fix(pkg/sensor): reduce interface functions for better error handling

This commit is contained in:
Markus Pesch 2020-01-10 19:42:19 +01:00
parent ca4269fff8
commit 95fb1f6745
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
12 changed files with 159 additions and 93 deletions

View File

@ -5,13 +5,12 @@ import (
"log" "log"
"os" "os"
"github.com/go-flucky/flucky/pkg/storage"
"github.com/go-flucky/flucky/pkg/types"
"github.com/go-flucky/flucky/pkg/cli" "github.com/go-flucky/flucky/pkg/cli"
"github.com/go-flucky/flucky/pkg/config" "github.com/go-flucky/flucky/pkg/config"
"github.com/go-flucky/flucky/pkg/rgbled" "github.com/go-flucky/flucky/pkg/rgbled"
"github.com/go-flucky/flucky/pkg/sensor" "github.com/go-flucky/flucky/pkg/sensor"
"github.com/go-flucky/flucky/pkg/storage"
"github.com/go-flucky/flucky/pkg/types"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -45,9 +44,8 @@ var readHumidityCmd = &cobra.Command{
log.Fatalln(err) log.Fatalln(err)
} }
ctx := context.Background() measuredValues, err := sensor.Read(sensors, types.MeasuredValueTypeTemperature)
measuredValues, err := sensor.Read(ctx, sensors) if err := rgbled.Run(rgbLEDs); err != nil {
if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
@ -61,6 +59,8 @@ var readHumidityCmd = &cobra.Command{
log.Fatalln(err) log.Fatalln(err)
} }
ctx := context.Background()
err = storage.Write(ctx, measuredValues, storageEndpoint) err = storage.Write(ctx, measuredValues, storageEndpoint)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)

View File

@ -45,14 +45,11 @@ var readPressureCmd = &cobra.Command{
log.Fatalln(err) log.Fatalln(err)
} }
ctx := context.Background() measuredValues, err := sensor.Read(sensors, types.MeasuredValueTypePressure)
measuredValues, err := sensor.Read(ctx, sensors) if err := rgbled.Run(rgbLEDs); err != nil {
if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
measuredValues = types.SelectMeasuredValues(types.MeasuredValueTypePressure, measuredValues)
cli.PrintMeasuredValues(measuredValues, cnf, os.Stdout) cli.PrintMeasuredValues(measuredValues, cnf, os.Stdout)
if logs { if logs {
@ -61,6 +58,8 @@ var readPressureCmd = &cobra.Command{
log.Fatalln(err) log.Fatalln(err)
} }
ctx := context.Background()
err = storage.Write(ctx, measuredValues, storageEndpoint) err = storage.Write(ctx, measuredValues, storageEndpoint)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)

View File

@ -6,12 +6,12 @@ import (
"log" "log"
"os" "os"
"github.com/go-flucky/flucky/pkg/storage"
"github.com/go-flucky/flucky/pkg/types"
"github.com/go-flucky/flucky/pkg/cli" "github.com/go-flucky/flucky/pkg/cli"
"github.com/go-flucky/flucky/pkg/config" "github.com/go-flucky/flucky/pkg/config"
"github.com/go-flucky/flucky/pkg/rgbled"
"github.com/go-flucky/flucky/pkg/sensor" "github.com/go-flucky/flucky/pkg/sensor"
"github.com/go-flucky/flucky/pkg/storage"
"github.com/go-flucky/flucky/pkg/types"
"github.com/spf13/cobra" "github.com/spf13/cobra"
) )
@ -39,36 +39,33 @@ var readTemperatureCmd = &cobra.Command{
log.Fatalln("No sensors found, specified or configured") log.Fatalln("No sensors found, specified or configured")
} }
ctx := context.Background() rgbLEDs := cnf.GetRGBLEDs(config.ENABLED)
measuredValues, err := sensor.Read(ctx, sensors) if err := rgbled.Run(rgbLEDs); err != nil {
if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
storage.Round(measuredValues, round) measuredValues, err := sensor.Read(sensors, types.MeasuredValueTypeTemperature)
if err := rgbled.Run(rgbLEDs); err != nil {
log.Fatalln(err)
}
measuredValues = types.SelectMeasuredValues(types.MeasuredValueTypeTemperature, measuredValues)
// print temperatures on stdout
cli.PrintMeasuredValues(measuredValues, cnf, os.Stdout) cli.PrintMeasuredValues(measuredValues, cnf, os.Stdout)
// Save the new measured values, if desired
if logs { if logs {
if compression {
measuredValues = storage.Compression(measuredValues)
}
storageEndpoint, err := cnf.GetStorageEndpointURL() storageEndpoint, err := cnf.GetStorageEndpointURL()
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
ctx := context.Background()
err = storage.Write(ctx, measuredValues, storageEndpoint) err = storage.Write(ctx, measuredValues, storageEndpoint)
if err != nil { if err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
} }
rgbled.Off(rgbLEDs)
}, },
} }

View File

@ -2,6 +2,7 @@ package daemon
import ( import (
"context" "context"
"fmt"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
@ -59,8 +60,49 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
measuredValuesCache := make([]*types.MeasuredValue, 0) measuredValuesCache := make([]*types.MeasuredValue, 0)
// measuredValuesLogfile := logfile.New(cnf.Logfile) // measuredValuesLogfile := logfile.New(cnf.Logfile)
// Producer // Init semaphoreChannel
go sensor.ReadContinuously(ctx, cnf.GetSensors(config.ENABLED), measuredValueChannel, errorChannel) semaphoreChannels := make(map[string]chan struct{})
for _, sensor := range cnf.GetSensors(config.ENABLED) {
semaphoreChannels[sensor.ID()] = make(chan struct{}, 1)
}
// Start producers
for _, s := range cnf.GetSensors(config.ENABLED) {
// start go routine for each sensor
go func(sensor sensor.Sensor) {
// run forever
for {
select {
case <-ctx.Done():
errorChannel <- fmt.Errorf("Closed context: %v", ctx.Err().Error())
return
case <-semaphoreChannels[sensor.ID()]:
measuredValues, err := sensor.Read()
if err != nil {
errorChannel <- err
return
}
for _, measmeasuredValue := range measuredValues {
measuredValueChannel <- measmeasuredValue
}
}
}
}(s)
// start ticker for each sensor
go func(sensor sensor.Sensor) {
for {
select {
case <-ctx.Done():
errorChannel <- fmt.Errorf("Closed context: %v", ctx.Err().Error())
return
case <-sensor.Ticker().C:
semaphoreChannels[sensor.ID()] <- struct{}{}
}
}
}(s)
}
// Distributor // Distributor
//measuredValueChannels := distribute.MeasuredValues(ctx, 5, measuredValueChannel) //measuredValueChannels := distribute.MeasuredValues(ctx, 5, measuredValueChannel)
@ -105,7 +147,11 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
measuredValuesCache = make([]*types.MeasuredValue, 0) measuredValuesCache = make([]*types.MeasuredValue, 0)
case measuredValue, _ := <-measuredValueChannel: case measuredValue, open := <-measuredValueChannel:
if !open {
errorChannel <- fmt.Errorf("MeasuredValue channel closed")
cancel()
}
measuredValuesCache = append(measuredValuesCache, measuredValue) measuredValuesCache = append(measuredValuesCache, measuredValue)
} }
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"log" "log"
"sync" "sync"
"time"
"github.com/d2r2/go-bsbmp" "github.com/d2r2/go-bsbmp"
"github.com/d2r2/go-i2c" "github.com/d2r2/go-i2c"
@ -19,9 +20,8 @@ type BME280 struct {
*types.Sensor *types.Sensor
} }
// GetSensorModel returns the sensor model func (s *BME280) ID() string {
func (s *BME280) GetSensorModel() types.SensorModel { return s.SensorID
return s.Sensor.SensorModel
} }
// Read measured values // Read measured values
@ -124,3 +124,12 @@ func (s *BME280) ReadContinously(ctx context.Context, measuredValueChannel chan<
} }
} }
} }
// Ticker returns a new ticker, which tick every when the sensor should be read
func (s *BME280) Ticker() *time.Ticker {
duration, err := time.ParseDuration(s.TickDuration)
if err != nil {
duration = time.Minute
}
return time.NewTicker(duration)
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/go-flucky/flucky/pkg/internal/format" "github.com/go-flucky/flucky/pkg/internal/format"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
@ -16,9 +17,8 @@ type DHT11 struct {
*types.Sensor *types.Sensor
} }
// GetSensorModel returns the sensor model func (s *DHT11) ID() string {
func (s *DHT11) GetSensorModel() types.SensorModel { return s.SensorID
return s.Sensor.SensorModel
} }
// Read measured values // Read measured values
@ -98,3 +98,12 @@ func (s *DHT11) ReadContinously(ctx context.Context, measuredValueChannel chan<-
} }
} }
} }
// Ticker returns a new ticker, which tick every when the sensor should be read
func (s *DHT11) Ticker() *time.Ticker {
duration, err := time.ParseDuration(s.TickDuration)
if err != nil {
duration = time.Minute
}
return time.NewTicker(duration)
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"sync" "sync"
"time"
"github.com/go-flucky/flucky/pkg/internal/format" "github.com/go-flucky/flucky/pkg/internal/format"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
@ -16,9 +17,8 @@ type DHT22 struct {
*types.Sensor *types.Sensor
} }
// GetSensorModel returns the sensor model func (s *DHT22) ID() string {
func (s *DHT22) GetSensorModel() types.SensorModel { return s.SensorID
return s.Sensor.SensorModel
} }
// Read measured values // Read measured values
@ -98,3 +98,12 @@ func (s *DHT22) ReadContinously(ctx context.Context, measuredValueChannel chan<-
} }
} }
} }
// Ticker returns a new ticker, which tick every when the sensor should be read
func (s *DHT22) Ticker() *time.Ticker {
duration, err := time.ParseDuration(s.TickDuration)
if err != nil {
duration = time.Minute
}
return time.NewTicker(duration)
}

View File

@ -8,6 +8,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"time"
"github.com/go-flucky/flucky/pkg/internal/format" "github.com/go-flucky/flucky/pkg/internal/format"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
@ -19,9 +20,8 @@ type DS18B20 struct {
*types.Sensor *types.Sensor
} }
// GetSensorModel returns the sensor model func (s *DS18B20) ID() string {
func (s *DS18B20) GetSensorModel() types.SensorModel { return s.SensorID
return s.Sensor.SensorModel
} }
// Read measured values // Read measured values
@ -40,12 +40,12 @@ func (s *DS18B20) Read() ([]*types.MeasuredValue, error) {
i := strings.LastIndex(raw, "t=") i := strings.LastIndex(raw, "t=")
if i == -1 { if i == -1 {
return nil, ErrReadSensor return nil, errorReadData
} }
c, err := strconv.ParseFloat(raw[i+2:len(raw)-1], 64) c, err := strconv.ParseFloat(raw[i+2:len(raw)-1], 64)
if err != nil { if err != nil {
return nil, ErrParseData return nil, errorParseData
} }
temperatureValue := c / 1000 temperatureValue := c / 1000
@ -97,3 +97,12 @@ func (s *DS18B20) ReadContinously(ctx context.Context, measuredValueChannel chan
} }
} }
} }
// Ticker returns a new ticker, which tick every when the sensor should be read
func (s *DS18B20) Ticker() *time.Ticker {
duration, err := time.ParseDuration(s.TickDuration)
if err != nil {
duration = time.Minute
}
return time.NewTicker(duration)
}

View File

@ -4,5 +4,7 @@ import (
"errors" "errors"
) )
var ErrParseData = errors.New("Can not parse data") var (
var ErrReadSensor = errors.New("Can not read data from Sensor") errorParseData = errors.New("Failed to parse data")
errorReadData = errors.New("Failed to read data from sensor")
)

View File

@ -1,15 +1,12 @@
package sensor package sensor
import ( import (
"context"
"sync"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
"time"
) )
type Sensor interface { type Sensor interface {
GetSensorModel() types.SensorModel ID() string
Read() ([]*types.MeasuredValue, error) Read() ([]*types.MeasuredValue, error)
ReadChannel(measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) Ticker() *time.Ticker
ReadContinously(ctx context.Context, measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error)
} }

View File

@ -1,58 +1,46 @@
package sensor package sensor
import ( import (
"context"
"fmt"
"sync"
"github.com/go-flucky/flucky/pkg/internal/collect"
"github.com/go-flucky/flucky/pkg/internal/prittyprint"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
) )
// Read measured values from sensors func Read(sensors []Sensor, measuredValueType types.MeasuredValueType) ([]*types.MeasuredValue, error) {
func Read(ctx context.Context, sensors []Sensor) ([]*types.MeasuredValue, error) {
measuredValueChannel := make(chan *types.MeasuredValue, len(sensors)) type result struct {
errorChannel := make(chan error, len(sensors)) measuredValues []*types.MeasuredValue
err error
ReadChannel(ctx, sensors, measuredValueChannel, errorChannel)
errors := collect.Errors(errorChannel)
if len(errors) > 0 {
return nil, prittyprint.FormatErrors(errors)
} }
measuredValues := collect.MeasuredValues(measuredValueChannel) resultChannel := make(chan *result, len(sensors))
return measuredValues, nil // producers
} // read measured values
for _, s := range sensors {
// ReadChannel reads the measured values from sensors and writes them to a go func(s Sensor) {
// channel. measuredValues, err := s.Read()
func ReadChannel(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error) { resultChannel <- &result{
measuredValues: measuredValues,
wg := new(sync.WaitGroup) err: err,
wg.Add(len(sensors)) }
}(s)
for _, sensor := range sensors {
go sensor.ReadChannel(measuredValueChannel, errorChannel, wg)
} }
wg.Wait() // consumer
} measuredValues := make([]*types.MeasuredValue, 0)
counter := len(sensors)
// ReadContinuously reads the measured values continously from sensors and writes
// them to a channel.
func ReadContinuously(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error) {
for { for {
if counter == 0 {
break
}
select { select {
case <-ctx.Done(): case result := <-resultChannel:
errorChannel <- fmt.Errorf("Context closed: %v", ctx.Err()) counter--
return if result.err != nil {
default: return nil, result.err
ReadChannel(ctx, sensors, measuredValueChannel, errorChannel) }
measuredValues = append(measuredValues, result.measuredValues...)
} }
} }
return types.SelectMeasuredValues(measuredValueType, measuredValues), nil
} }

View File

@ -19,6 +19,7 @@ type Sensor struct {
SensorModel SensorModel `json:"sensor_model" xml:"sensor_model"` SensorModel SensorModel `json:"sensor_model" xml:"sensor_model"`
SensorEnabled bool `json:"sensor_enabled" xml:"sensor_enabled"` SensorEnabled bool `json:"sensor_enabled" xml:"sensor_enabled"`
SensorLastContact *time.Time `json:"sensor_last_contact" xml:"sensor_last_contact"` SensorLastContact *time.Time `json:"sensor_last_contact" xml:"sensor_last_contact"`
TickDuration string `json:"sensor_tick_duration" xml:"sensor_tick_duration"`
DeviceID string `json:"device_id" xml:"device_id"` DeviceID string `json:"device_id" xml:"device_id"`
CreationDate time.Time `json:"creation_date" xml:"creation_date"` CreationDate time.Time `json:"creation_date" xml:"creation_date"`
} }