diff --git a/cmd/humidity/read.go b/cmd/humidity/read.go index ed064b7..fde1988 100644 --- a/cmd/humidity/read.go +++ b/cmd/humidity/read.go @@ -5,13 +5,12 @@ import ( "log" "os" - "github.com/go-flucky/flucky/pkg/storage" - "github.com/go-flucky/flucky/pkg/types" - "github.com/go-flucky/flucky/pkg/cli" "github.com/go-flucky/flucky/pkg/config" "github.com/go-flucky/flucky/pkg/rgbled" "github.com/go-flucky/flucky/pkg/sensor" + "github.com/go-flucky/flucky/pkg/storage" + "github.com/go-flucky/flucky/pkg/types" "github.com/spf13/cobra" ) @@ -45,9 +44,8 @@ var readHumidityCmd = &cobra.Command{ log.Fatalln(err) } - ctx := context.Background() - measuredValues, err := sensor.Read(ctx, sensors) - if err != nil { + measuredValues, err := sensor.Read(sensors, types.MeasuredValueTypeTemperature) + if err := rgbled.Run(rgbLEDs); err != nil { log.Fatalln(err) } @@ -61,6 +59,8 @@ var readHumidityCmd = &cobra.Command{ log.Fatalln(err) } + ctx := context.Background() + err = storage.Write(ctx, measuredValues, storageEndpoint) if err != nil { log.Fatalln(err) diff --git a/cmd/pressure/read.go b/cmd/pressure/read.go index b9e938d..f05fb31 100644 --- a/cmd/pressure/read.go +++ b/cmd/pressure/read.go @@ -45,14 +45,11 @@ var readPressureCmd = &cobra.Command{ log.Fatalln(err) } - ctx := context.Background() - measuredValues, err := sensor.Read(ctx, sensors) - if err != nil { + measuredValues, err := sensor.Read(sensors, types.MeasuredValueTypePressure) + if err := rgbled.Run(rgbLEDs); err != nil { log.Fatalln(err) } - measuredValues = types.SelectMeasuredValues(types.MeasuredValueTypePressure, measuredValues) - cli.PrintMeasuredValues(measuredValues, cnf, os.Stdout) if logs { @@ -61,6 +58,8 @@ var readPressureCmd = &cobra.Command{ log.Fatalln(err) } + ctx := context.Background() + err = storage.Write(ctx, measuredValues, storageEndpoint) if err != nil { log.Fatalln(err) diff --git a/cmd/temperature/read.go b/cmd/temperature/read.go index d122033..165e6c0 100644 --- a/cmd/temperature/read.go +++ b/cmd/temperature/read.go @@ -6,12 +6,12 @@ import ( "log" "os" - "github.com/go-flucky/flucky/pkg/storage" - "github.com/go-flucky/flucky/pkg/types" - "github.com/go-flucky/flucky/pkg/cli" "github.com/go-flucky/flucky/pkg/config" + "github.com/go-flucky/flucky/pkg/rgbled" "github.com/go-flucky/flucky/pkg/sensor" + "github.com/go-flucky/flucky/pkg/storage" + "github.com/go-flucky/flucky/pkg/types" "github.com/spf13/cobra" ) @@ -39,36 +39,33 @@ var readTemperatureCmd = &cobra.Command{ log.Fatalln("No sensors found, specified or configured") } - ctx := context.Background() - measuredValues, err := sensor.Read(ctx, sensors) - if err != nil { + rgbLEDs := cnf.GetRGBLEDs(config.ENABLED) + if err := rgbled.Run(rgbLEDs); err != nil { log.Fatalln(err) } - storage.Round(measuredValues, round) + measuredValues, err := sensor.Read(sensors, types.MeasuredValueTypeTemperature) + if err := rgbled.Run(rgbLEDs); err != nil { + log.Fatalln(err) + } - measuredValues = types.SelectMeasuredValues(types.MeasuredValueTypeTemperature, measuredValues) - - // print temperatures on stdout cli.PrintMeasuredValues(measuredValues, cnf, os.Stdout) - // Save the new measured values, if desired if logs { - - if compression { - measuredValues = storage.Compression(measuredValues) - } - storageEndpoint, err := cnf.GetStorageEndpointURL() if err != nil { log.Fatalln(err) } + ctx := context.Background() + err = storage.Write(ctx, measuredValues, storageEndpoint) if err != nil { log.Fatalln(err) } } + + rgbled.Off(rgbLEDs) }, } diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index d688071..edfc851 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -2,6 +2,7 @@ package daemon import ( "context" + "fmt" "os" "os/signal" "syscall" @@ -59,8 +60,49 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress measuredValuesCache := make([]*types.MeasuredValue, 0) // measuredValuesLogfile := logfile.New(cnf.Logfile) - // Producer - go sensor.ReadContinuously(ctx, cnf.GetSensors(config.ENABLED), measuredValueChannel, errorChannel) + // Init semaphoreChannel + semaphoreChannels := make(map[string]chan struct{}) + for _, sensor := range cnf.GetSensors(config.ENABLED) { + semaphoreChannels[sensor.ID()] = make(chan struct{}, 1) + } + + // Start producers + for _, s := range cnf.GetSensors(config.ENABLED) { + + // start go routine for each sensor + go func(sensor sensor.Sensor) { + // run forever + for { + select { + case <-ctx.Done(): + errorChannel <- fmt.Errorf("Closed context: %v", ctx.Err().Error()) + return + case <-semaphoreChannels[sensor.ID()]: + measuredValues, err := sensor.Read() + if err != nil { + errorChannel <- err + return + } + for _, measmeasuredValue := range measuredValues { + measuredValueChannel <- measmeasuredValue + } + } + } + }(s) + + // start ticker for each sensor + go func(sensor sensor.Sensor) { + for { + select { + case <-ctx.Done(): + errorChannel <- fmt.Errorf("Closed context: %v", ctx.Err().Error()) + return + case <-sensor.Ticker().C: + semaphoreChannels[sensor.ID()] <- struct{}{} + } + } + }(s) + } // Distributor //measuredValueChannels := distribute.MeasuredValues(ctx, 5, measuredValueChannel) @@ -105,7 +147,11 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress measuredValuesCache = make([]*types.MeasuredValue, 0) - case measuredValue, _ := <-measuredValueChannel: + case measuredValue, open := <-measuredValueChannel: + if !open { + errorChannel <- fmt.Errorf("MeasuredValue channel closed") + cancel() + } measuredValuesCache = append(measuredValuesCache, measuredValue) } } diff --git a/pkg/sensor/bme280.go b/pkg/sensor/bme280.go index 27ae6de..3899104 100644 --- a/pkg/sensor/bme280.go +++ b/pkg/sensor/bme280.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "sync" + "time" "github.com/d2r2/go-bsbmp" "github.com/d2r2/go-i2c" @@ -19,9 +20,8 @@ type BME280 struct { *types.Sensor } -// GetSensorModel returns the sensor model -func (s *BME280) GetSensorModel() types.SensorModel { - return s.Sensor.SensorModel +func (s *BME280) ID() string { + return s.SensorID } // Read measured values @@ -124,3 +124,12 @@ func (s *BME280) ReadContinously(ctx context.Context, measuredValueChannel chan< } } } + +// Ticker returns a new ticker, which tick every when the sensor should be read +func (s *BME280) Ticker() *time.Ticker { + duration, err := time.ParseDuration(s.TickDuration) + if err != nil { + duration = time.Minute + } + return time.NewTicker(duration) +} diff --git a/pkg/sensor/dht11.go b/pkg/sensor/dht11.go index 9a62e1c..d0bb73f 100644 --- a/pkg/sensor/dht11.go +++ b/pkg/sensor/dht11.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/go-flucky/flucky/pkg/internal/format" "github.com/go-flucky/flucky/pkg/types" @@ -16,9 +17,8 @@ type DHT11 struct { *types.Sensor } -// GetSensorModel returns the sensor model -func (s *DHT11) GetSensorModel() types.SensorModel { - return s.Sensor.SensorModel +func (s *DHT11) ID() string { + return s.SensorID } // Read measured values @@ -98,3 +98,12 @@ func (s *DHT11) ReadContinously(ctx context.Context, measuredValueChannel chan<- } } } + +// Ticker returns a new ticker, which tick every when the sensor should be read +func (s *DHT11) Ticker() *time.Ticker { + duration, err := time.ParseDuration(s.TickDuration) + if err != nil { + duration = time.Minute + } + return time.NewTicker(duration) +} diff --git a/pkg/sensor/dht22.go b/pkg/sensor/dht22.go index 734822e..e9e2b49 100644 --- a/pkg/sensor/dht22.go +++ b/pkg/sensor/dht22.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/go-flucky/flucky/pkg/internal/format" "github.com/go-flucky/flucky/pkg/types" @@ -16,9 +17,8 @@ type DHT22 struct { *types.Sensor } -// GetSensorModel returns the sensor model -func (s *DHT22) GetSensorModel() types.SensorModel { - return s.Sensor.SensorModel +func (s *DHT22) ID() string { + return s.SensorID } // Read measured values @@ -98,3 +98,12 @@ func (s *DHT22) ReadContinously(ctx context.Context, measuredValueChannel chan<- } } } + +// Ticker returns a new ticker, which tick every when the sensor should be read +func (s *DHT22) Ticker() *time.Ticker { + duration, err := time.ParseDuration(s.TickDuration) + if err != nil { + duration = time.Minute + } + return time.NewTicker(duration) +} diff --git a/pkg/sensor/ds18b20.go b/pkg/sensor/ds18b20.go index 6c254b6..da7ff0b 100644 --- a/pkg/sensor/ds18b20.go +++ b/pkg/sensor/ds18b20.go @@ -8,6 +8,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/go-flucky/flucky/pkg/internal/format" "github.com/go-flucky/flucky/pkg/types" @@ -19,9 +20,8 @@ type DS18B20 struct { *types.Sensor } -// GetSensorModel returns the sensor model -func (s *DS18B20) GetSensorModel() types.SensorModel { - return s.Sensor.SensorModel +func (s *DS18B20) ID() string { + return s.SensorID } // Read measured values @@ -40,12 +40,12 @@ func (s *DS18B20) Read() ([]*types.MeasuredValue, error) { i := strings.LastIndex(raw, "t=") if i == -1 { - return nil, ErrReadSensor + return nil, errorReadData } c, err := strconv.ParseFloat(raw[i+2:len(raw)-1], 64) if err != nil { - return nil, ErrParseData + return nil, errorParseData } temperatureValue := c / 1000 @@ -97,3 +97,12 @@ func (s *DS18B20) ReadContinously(ctx context.Context, measuredValueChannel chan } } } + +// Ticker returns a new ticker, which tick every when the sensor should be read +func (s *DS18B20) Ticker() *time.Ticker { + duration, err := time.ParseDuration(s.TickDuration) + if err != nil { + duration = time.Minute + } + return time.NewTicker(duration) +} diff --git a/pkg/sensor/errors.go b/pkg/sensor/errors.go index 054ff47..5bdd95e 100644 --- a/pkg/sensor/errors.go +++ b/pkg/sensor/errors.go @@ -4,5 +4,7 @@ import ( "errors" ) -var ErrParseData = errors.New("Can not parse data") -var ErrReadSensor = errors.New("Can not read data from Sensor") +var ( + errorParseData = errors.New("Failed to parse data") + errorReadData = errors.New("Failed to read data from sensor") +) diff --git a/pkg/sensor/interfaces.go b/pkg/sensor/interfaces.go index f7f7203..3a719a2 100644 --- a/pkg/sensor/interfaces.go +++ b/pkg/sensor/interfaces.go @@ -1,15 +1,12 @@ package sensor import ( - "context" - "sync" - "github.com/go-flucky/flucky/pkg/types" + "time" ) type Sensor interface { - GetSensorModel() types.SensorModel + ID() string Read() ([]*types.MeasuredValue, error) - ReadChannel(measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error, wg *sync.WaitGroup) - ReadContinously(ctx context.Context, measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error) + Ticker() *time.Ticker } diff --git a/pkg/sensor/sensor.go b/pkg/sensor/sensor.go index 4219885..b731f88 100644 --- a/pkg/sensor/sensor.go +++ b/pkg/sensor/sensor.go @@ -1,58 +1,46 @@ package sensor import ( - "context" - "fmt" - "sync" - - "github.com/go-flucky/flucky/pkg/internal/collect" - "github.com/go-flucky/flucky/pkg/internal/prittyprint" - "github.com/go-flucky/flucky/pkg/types" ) -// Read measured values from sensors -func Read(ctx context.Context, sensors []Sensor) ([]*types.MeasuredValue, error) { +func Read(sensors []Sensor, measuredValueType types.MeasuredValueType) ([]*types.MeasuredValue, error) { - measuredValueChannel := make(chan *types.MeasuredValue, len(sensors)) - errorChannel := make(chan error, len(sensors)) - - ReadChannel(ctx, sensors, measuredValueChannel, errorChannel) - - errors := collect.Errors(errorChannel) - if len(errors) > 0 { - return nil, prittyprint.FormatErrors(errors) + type result struct { + measuredValues []*types.MeasuredValue + err error } - measuredValues := collect.MeasuredValues(measuredValueChannel) + resultChannel := make(chan *result, len(sensors)) - return measuredValues, nil -} - -// ReadChannel reads the measured values from sensors and writes them to a -// channel. -func ReadChannel(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error) { - - wg := new(sync.WaitGroup) - wg.Add(len(sensors)) - - for _, sensor := range sensors { - go sensor.ReadChannel(measuredValueChannel, errorChannel, wg) + // producers + // read measured values + for _, s := range sensors { + go func(s Sensor) { + measuredValues, err := s.Read() + resultChannel <- &result{ + measuredValues: measuredValues, + err: err, + } + }(s) } - wg.Wait() -} - -// ReadContinuously reads the measured values continously from sensors and writes -// them to a channel. -func ReadContinuously(ctx context.Context, sensors []Sensor, measuredValueChannel chan<- *types.MeasuredValue, errorChannel chan<- error) { + // consumer + measuredValues := make([]*types.MeasuredValue, 0) + counter := len(sensors) for { + if counter == 0 { + break + } select { - case <-ctx.Done(): - errorChannel <- fmt.Errorf("Context closed: %v", ctx.Err()) - return - default: - ReadChannel(ctx, sensors, measuredValueChannel, errorChannel) + case result := <-resultChannel: + counter-- + if result.err != nil { + return nil, result.err + } + measuredValues = append(measuredValues, result.measuredValues...) } } + + return types.SelectMeasuredValues(measuredValueType, measuredValues), nil } diff --git a/pkg/types/sensor.go b/pkg/types/sensor.go index 0245979..fdc007f 100644 --- a/pkg/types/sensor.go +++ b/pkg/types/sensor.go @@ -19,6 +19,7 @@ type Sensor struct { SensorModel SensorModel `json:"sensor_model" xml:"sensor_model"` SensorEnabled bool `json:"sensor_enabled" xml:"sensor_enabled"` SensorLastContact *time.Time `json:"sensor_last_contact" xml:"sensor_last_contact"` + TickDuration string `json:"sensor_tick_duration" xml:"sensor_tick_duration"` DeviceID string `json:"device_id" xml:"device_id"` CreationDate time.Time `json:"creation_date" xml:"creation_date"` }