fix(pkg/sensors): Use channel of data type []measuredValues instead of measuredValues

This commit is contained in:
Markus Pesch 2019-06-27 09:31:40 +02:00
parent 1d8c86df67
commit 8005248262
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
7 changed files with 46 additions and 41 deletions

View File

@ -28,7 +28,7 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
signal.Notify(interrupt, os.Interrupt, os.Kill, syscall.SIGTERM) signal.Notify(interrupt, os.Interrupt, os.Kill, syscall.SIGTERM)
errorChannel := make(chan error, 0) errorChannel := make(chan error, 0)
measuredValueChannel := make(chan types.MeasuredValue, 0) measuredValuesChannel := make(chan []types.MeasuredValue, 0)
ctx := context.Background() ctx := context.Background()
childContext, cancel := context.WithCancel(ctx) childContext, cancel := context.WithCancel(ctx)
@ -37,7 +37,7 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
measuredValuesCache := make([]types.MeasuredValue, 0) measuredValuesCache := make([]types.MeasuredValue, 0)
go sensor.ReadContinuously(childContext, cnf.GetTemperatureSensors(config.ENABLED), measuredValueChannel, errorChannel) go sensor.ReadContinuously(childContext, cnf.GetTemperatureSensors(config.ENABLED), measuredValuesChannel, errorChannel)
rgbLEDs := cnf.GetRGBLEDs(config.ENABLED) rgbLEDs := cnf.GetRGBLEDs(config.ENABLED)
@ -79,8 +79,8 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
} }
measuredValuesCache = make([]types.MeasuredValue, 0) measuredValuesCache = make([]types.MeasuredValue, 0)
case measuredValue, _ := <-measuredValueChannel: case measuredValues, _ := <-measuredValuesChannel:
measuredValuesCache = append(measuredValuesCache, measuredValue) measuredValuesCache = append(measuredValuesCache, measuredValues...)
case killSignal := <-interrupt: case killSignal := <-interrupt:
logger.Warn("Daemon was interruped by system signal %v\n", killSignal) logger.Warn("Daemon was interruped by system signal %v\n", killSignal)

View File

@ -4,13 +4,13 @@ import (
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
) )
func MeasuredValues(measuredValueChannel <-chan types.MeasuredValue) []types.MeasuredValue { func MeasuredValues(measuredValuesChannel <-chan []types.MeasuredValue) []types.MeasuredValue {
cachedMeasuredValues := make([]types.MeasuredValue, 0) cachedMeasuredValues := make([]types.MeasuredValue, 0)
for { for {
select { select {
case measuredValue, more := <-measuredValueChannel: case measuredValues, more := <-measuredValuesChannel:
if more { if more {
cachedMeasuredValues = append(cachedMeasuredValues, measuredValue) cachedMeasuredValues = append(cachedMeasuredValues, measuredValues...)
continue continue
} }
default: default:

View File

@ -66,7 +66,7 @@ func (s *DHT11) Read() ([]types.MeasuredValue, error) {
// ReadChannel reads the measured values from the sensor and writes them to a // ReadChannel reads the measured values from the sensor and writes them to a
// channel. // channel.
func (s *DHT11) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { func (s *DHT11) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) {
if wg != nil { if wg != nil {
defer wg.Done() defer wg.Done()
} }
@ -77,21 +77,20 @@ func (s *DHT11) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, err
return return
} }
for _, measuredValue := range measuredValues { measuredValuesChannel <- measuredValues
measuredValueChannel <- measuredValue
}
} }
// ReadContinously reads the measured values continously from the sensor and // ReadContinously reads the measured values continously from the sensor and
// writes them to a channel. // writes them to a channel.
func (s *DHT11) ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { func (s *DHT11) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err()) errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err())
return return
default: default:
s.ReadChannel(measuredValueChannel, errorChannel, nil) s.ReadChannel(measuredValuesChannel, errorChannel, nil)
} }
} }
} }

View File

@ -66,7 +66,7 @@ func (s *DHT22) Read() ([]types.MeasuredValue, error) {
// ReadChannel reads the measured values from the sensor and writes them to a // ReadChannel reads the measured values from the sensor and writes them to a
// channel. // channel.
func (s *DHT22) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { func (s *DHT22) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) {
if wg != nil { if wg != nil {
defer wg.Done() defer wg.Done()
} }
@ -77,21 +77,20 @@ func (s *DHT22) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, err
return return
} }
for _, measuredValue := range measuredValues { measuredValuesChannel <- measuredValues
measuredValueChannel <- measuredValue
}
} }
// ReadContinously reads the measured values continously from the sensor and // ReadContinously reads the measured values continously from the sensor and
// writes them to a channel. // writes them to a channel.
func (s *DHT22) ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { func (s *DHT22) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err()) errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err())
return return
default: default:
s.ReadChannel(measuredValueChannel, errorChannel, nil) s.ReadChannel(measuredValuesChannel, errorChannel, nil)
} }
} }
} }

View File

@ -66,7 +66,7 @@ func (s *DS18B20) Read() ([]types.MeasuredValue, error) {
// ReadChannel reads the measured values from the sensor and writes them to a // ReadChannel reads the measured values from the sensor and writes them to a
// channel. // channel.
func (s *DS18B20) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { func (s *DS18B20) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) {
if wg != nil { if wg != nil {
defer wg.Done() defer wg.Done()
} }
@ -77,22 +77,20 @@ func (s *DS18B20) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, e
return return
} }
for _, measuredValue := range measuredValues { measuredValuesChannel <- measuredValues
measuredValueChannel <- measuredValue
}
} }
// ReadContinously reads the measured values continously from the sensor and // ReadContinously reads the measured values continously from the sensor and
// writes them to a channel. // writes them to a channel.
func (s *DS18B20) ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { func (s *DS18B20) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err()) errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err())
return return
default: default:
s.ReadChannel(measuredValueChannel, errorChannel, nil) s.ReadChannel(measuredValuesChannel, errorChannel, nil)
} }
} }
} }

View File

@ -10,6 +10,6 @@ import (
type Sensor interface { type Sensor interface {
GetSensorModel() types.SensorModel GetSensorModel() types.SensorModel
Read() ([]types.MeasuredValue, error) Read() ([]types.MeasuredValue, error)
ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup)
ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error)
} }

View File

@ -5,45 +5,54 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/go-flucky/flucky/pkg/internal/collect"
"github.com/go-flucky/flucky/pkg/internal/prittyprint"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
) )
// Read measured values from sensors // Read measured values from sensors
func Read(ctx context.Context, sensors []Sensor) ([]types.MeasuredValue, error) { func Read(ctx context.Context, sensors []Sensor) ([]types.MeasuredValue, error) {
// TODO: Execute Read with go function measuredValuesChannel := make(chan []types.MeasuredValue, len(sensors))
cachesMeasuredValues := make([]types.MeasuredValue, 0) errorChannel := make(chan error, len(sensors))
for _, sensor := range sensors { ReadChannel(ctx, sensors, measuredValuesChannel, errorChannel)
measuredValues, err := sensor.Read()
if err != nil { errors := collect.Errors(errorChannel)
return nil, err if len(errors) > 0 {
} return nil, prittyprint.FormatErrors(errors)
cachesMeasuredValues = append(cachesMeasuredValues, measuredValues...)
} }
return cachesMeasuredValues, nil measuredValues := collect.MeasuredValues(measuredValuesChannel)
return measuredValues, nil
} }
// ReadChannel reads the measured values from sensors and writes them to a // ReadChannel reads the measured values from sensors and writes them to a
// channel. // channel.
func ReadChannel(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { func ReadChannel(ctx context.Context, sensors []Sensor, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) {
// TODO: Execute ReadCallel with go function if wg is available
wg := new(sync.WaitGroup)
wg.Add(len(sensors))
for _, sensor := range sensors { for _, sensor := range sensors {
sensor.ReadChannel(measuredValueChannel, errorChannel, wg) go sensor.ReadChannel(measuredValuesChannel, errorChannel, wg)
} }
wg.Wait()
} }
// ReadContinuously reads the measured values continously from sensors and writes // ReadContinuously reads the measured values continously from sensors and writes
// them to a channel. // them to a channel.
func ReadContinuously(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { func ReadContinuously(ctx context.Context, sensors []Sensor, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
errorChannel <- fmt.Errorf("Context closed: %v", ctx.Err()) errorChannel <- fmt.Errorf("Context closed: %v", ctx.Err())
return return
default: default:
ReadChannel(ctx, sensors, measuredValueChannel, errorChannel, nil) ReadChannel(ctx, sensors, measuredValuesChannel, errorChannel)
} }
} }
} }