diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index c6edd73..93e374b 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -28,7 +28,7 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress signal.Notify(interrupt, os.Interrupt, os.Kill, syscall.SIGTERM) errorChannel := make(chan error, 0) - measuredValueChannel := make(chan types.MeasuredValue, 0) + measuredValuesChannel := make(chan []types.MeasuredValue, 0) ctx := context.Background() childContext, cancel := context.WithCancel(ctx) @@ -37,7 +37,7 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress measuredValuesCache := make([]types.MeasuredValue, 0) - go sensor.ReadContinuously(childContext, cnf.GetTemperatureSensors(config.ENABLED), measuredValueChannel, errorChannel) + go sensor.ReadContinuously(childContext, cnf.GetTemperatureSensors(config.ENABLED), measuredValuesChannel, errorChannel) rgbLEDs := cnf.GetRGBLEDs(config.ENABLED) @@ -79,8 +79,8 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress } measuredValuesCache = make([]types.MeasuredValue, 0) - case measuredValue, _ := <-measuredValueChannel: - measuredValuesCache = append(measuredValuesCache, measuredValue) + case measuredValues, _ := <-measuredValuesChannel: + measuredValuesCache = append(measuredValuesCache, measuredValues...) case killSignal := <-interrupt: logger.Warn("Daemon was interruped by system signal %v\n", killSignal) diff --git a/pkg/internal/collect/measuredValues.go b/pkg/internal/collect/measuredValues.go index 634414a..629e8d8 100644 --- a/pkg/internal/collect/measuredValues.go +++ b/pkg/internal/collect/measuredValues.go @@ -4,13 +4,13 @@ import ( "github.com/go-flucky/flucky/pkg/types" ) -func MeasuredValues(measuredValueChannel <-chan types.MeasuredValue) []types.MeasuredValue { +func MeasuredValues(measuredValuesChannel <-chan []types.MeasuredValue) []types.MeasuredValue { cachedMeasuredValues := make([]types.MeasuredValue, 0) for { select { - case measuredValue, more := <-measuredValueChannel: + case measuredValues, more := <-measuredValuesChannel: if more { - cachedMeasuredValues = append(cachedMeasuredValues, measuredValue) + cachedMeasuredValues = append(cachedMeasuredValues, measuredValues...) continue } default: diff --git a/pkg/sensor/dht11.go b/pkg/sensor/dht11.go index 18e721b..5b2b98d 100644 --- a/pkg/sensor/dht11.go +++ b/pkg/sensor/dht11.go @@ -66,7 +66,7 @@ func (s *DHT11) Read() ([]types.MeasuredValue, error) { // ReadChannel reads the measured values from the sensor and writes them to a // channel. -func (s *DHT11) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { +func (s *DHT11) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -77,21 +77,20 @@ func (s *DHT11) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, err return } - for _, measuredValue := range measuredValues { - measuredValueChannel <- measuredValue - } + measuredValuesChannel <- measuredValues + } // ReadContinously reads the measured values continously from the sensor and // writes them to a channel. -func (s *DHT11) ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { +func (s *DHT11) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) { for { select { case <-ctx.Done(): errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err()) return default: - s.ReadChannel(measuredValueChannel, errorChannel, nil) + s.ReadChannel(measuredValuesChannel, errorChannel, nil) } } } diff --git a/pkg/sensor/dht22.go b/pkg/sensor/dht22.go index 1675bce..2ca5eb9 100644 --- a/pkg/sensor/dht22.go +++ b/pkg/sensor/dht22.go @@ -66,7 +66,7 @@ func (s *DHT22) Read() ([]types.MeasuredValue, error) { // ReadChannel reads the measured values from the sensor and writes them to a // channel. -func (s *DHT22) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { +func (s *DHT22) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -77,21 +77,20 @@ func (s *DHT22) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, err return } - for _, measuredValue := range measuredValues { - measuredValueChannel <- measuredValue - } + measuredValuesChannel <- measuredValues + } // ReadContinously reads the measured values continously from the sensor and // writes them to a channel. -func (s *DHT22) ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { +func (s *DHT22) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) { for { select { case <-ctx.Done(): errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err()) return default: - s.ReadChannel(measuredValueChannel, errorChannel, nil) + s.ReadChannel(measuredValuesChannel, errorChannel, nil) } } } diff --git a/pkg/sensor/ds18b20.go b/pkg/sensor/ds18b20.go index 9ed73bf..46211f5 100644 --- a/pkg/sensor/ds18b20.go +++ b/pkg/sensor/ds18b20.go @@ -66,7 +66,7 @@ func (s *DS18B20) Read() ([]types.MeasuredValue, error) { // ReadChannel reads the measured values from the sensor and writes them to a // channel. -func (s *DS18B20) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { +func (s *DS18B20) ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { if wg != nil { defer wg.Done() } @@ -77,22 +77,20 @@ func (s *DS18B20) ReadChannel(measuredValueChannel chan<- types.MeasuredValue, e return } - for _, measuredValue := range measuredValues { - measuredValueChannel <- measuredValue - } + measuredValuesChannel <- measuredValues } // ReadContinously reads the measured values continously from the sensor and // writes them to a channel. -func (s *DS18B20) ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { +func (s *DS18B20) ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) { for { select { case <-ctx.Done(): errorChannel <- fmt.Errorf("%v: Context closed: %v", s.SensorName, ctx.Err()) return default: - s.ReadChannel(measuredValueChannel, errorChannel, nil) + s.ReadChannel(measuredValuesChannel, errorChannel, nil) } } } diff --git a/pkg/sensor/interfaces.go b/pkg/sensor/interfaces.go index fc5d028..08ee48f 100644 --- a/pkg/sensor/interfaces.go +++ b/pkg/sensor/interfaces.go @@ -10,6 +10,6 @@ import ( type Sensor interface { GetSensorModel() types.SensorModel Read() ([]types.MeasuredValue, error) - ReadChannel(measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) - ReadContinously(ctx context.Context, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) + ReadChannel(measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) + ReadContinously(ctx context.Context, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) } diff --git a/pkg/sensor/sensor.go b/pkg/sensor/sensor.go index 4f7ca48..154a7b3 100644 --- a/pkg/sensor/sensor.go +++ b/pkg/sensor/sensor.go @@ -5,45 +5,54 @@ import ( "fmt" "sync" + "github.com/go-flucky/flucky/pkg/internal/collect" + "github.com/go-flucky/flucky/pkg/internal/prittyprint" + "github.com/go-flucky/flucky/pkg/types" ) // Read measured values from sensors func Read(ctx context.Context, sensors []Sensor) ([]types.MeasuredValue, error) { - // TODO: Execute Read with go function - cachesMeasuredValues := make([]types.MeasuredValue, 0) + measuredValuesChannel := make(chan []types.MeasuredValue, len(sensors)) + errorChannel := make(chan error, len(sensors)) - for _, sensor := range sensors { - measuredValues, err := sensor.Read() - if err != nil { - return nil, err - } - cachesMeasuredValues = append(cachesMeasuredValues, measuredValues...) + ReadChannel(ctx, sensors, measuredValuesChannel, errorChannel) + + errors := collect.Errors(errorChannel) + if len(errors) > 0 { + return nil, prittyprint.FormatErrors(errors) } - return cachesMeasuredValues, nil + measuredValues := collect.MeasuredValues(measuredValuesChannel) + + return measuredValues, nil } // ReadChannel reads the measured values from sensors and writes them to a // channel. -func ReadChannel(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) { - // TODO: Execute ReadCallel with go function if wg is available +func ReadChannel(ctx context.Context, sensors []Sensor, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) { + + wg := new(sync.WaitGroup) + wg.Add(len(sensors)) + for _, sensor := range sensors { - sensor.ReadChannel(measuredValueChannel, errorChannel, wg) + go sensor.ReadChannel(measuredValuesChannel, errorChannel, wg) } + + wg.Wait() } // ReadContinuously reads the measured values continously from sensors and writes // them to a channel. -func ReadContinuously(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- types.MeasuredValue, errorChannel chan<- error) { +func ReadContinuously(ctx context.Context, sensors []Sensor, measuredValuesChannel chan<- []types.MeasuredValue, errorChannel chan<- error) { for { select { case <-ctx.Done(): errorChannel <- fmt.Errorf("Context closed: %v", ctx.Err()) return default: - ReadChannel(ctx, sensors, measuredValueChannel, errorChannel, nil) + ReadChannel(ctx, sensors, measuredValuesChannel, errorChannel) } } }