fix: cli temperature read

changes:
- fix: read temperature values without daemon
  Add subcommand to read temperature values without starting the daemon

- fix: implement measured value types
  Replace measured value types with constants

- fix: add sensor pipelines
  Add functions which returns a channel with measured values

- fix: filter measured values from a channel
  Add functions to filter measured values by sensor id or measured
  value types.
This commit is contained in:
Markus Pesch 2020-09-21 19:36:42 +02:00
parent 7cbd80c726
commit 3a090d190e
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
17 changed files with 481 additions and 77 deletions

View File

@ -10,6 +10,7 @@ import (
"git.cryptic.systems/volker.raschek/flucky/cli/daemon" "git.cryptic.systems/volker.raschek/flucky/cli/daemon"
"git.cryptic.systems/volker.raschek/flucky/cli/sensor" "git.cryptic.systems/volker.raschek/flucky/cli/sensor"
"git.cryptic.systems/volker.raschek/flucky/cli/temperature"
"git.cryptic.systems/volker.raschek/flucky/pkg/config" "git.cryptic.systems/volker.raschek/flucky/pkg/config"
"github.com/Masterminds/semver" "github.com/Masterminds/semver"
@ -38,6 +39,7 @@ func Execute(version *semver.Version) error {
subCommands := []func(cmd *cobra.Command) error{ subCommands := []func(cmd *cobra.Command) error{
daemon.InitCmd, daemon.InitCmd,
sensor.InitCmd, sensor.InitCmd,
temperature.InitCmd,
} }
for _, subCommand := range subCommands { for _, subCommand := range subCommands {

View File

@ -0,0 +1,133 @@
package temperature
import (
"context"
"fmt"
"net/url"
"os"
"git.cryptic.systems/volker.raschek/flucky/pkg/cli"
"git.cryptic.systems/volker.raschek/flucky/pkg/config"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository"
"git.cryptic.systems/volker.raschek/flucky/pkg/sensor"
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
"git.cryptic.systems/volker.raschek/go-logger"
"github.com/spf13/cobra"
)
func InitCmd(cmd *cobra.Command) error {
temperatureCmd := &cobra.Command{
Use: "temperature",
Short: "Read and list temperature values",
}
readTemperatureCmd := &cobra.Command{
Use: "read",
Short: "Read temperature values from sensors",
RunE: readTemperature,
}
readTemperatureCmd.Flags().Bool("persist", true, "Persist measured values to the repository")
temperatureCmd.AddCommand(readTemperatureCmd)
cmd.AddCommand(temperatureCmd)
return nil
}
func readTemperature(cmd *cobra.Command, args []string) error {
configFile, err := cmd.Flags().GetString("config")
if err != nil {
return fmt.Errorf("No config file defined")
}
persist, err := cmd.Flags().GetBool("persist")
if err != nil {
return fmt.Errorf("Flag persist not defined: %v", err)
}
cnf, err := config.Read(configFile)
if err != nil {
return err
}
dsnURL, err := url.Parse(cnf.DSN)
if err != nil {
return err
}
logLevelString, err := cmd.Flags().GetString("loglevel")
if err != nil {
return err
}
logLevel, err := logger.ParseLogLevel(logLevelString)
if err != nil {
return err
}
flogger := logger.NewLogger(logLevel)
repo, err := repository.New(dsnURL, flogger)
if err != nil {
return err
}
sensorTypes, err := repo.GetSensors()
if err != nil {
return err
}
sensorTypes, err = types.FilterSensorByMeasuredValueTypes(sensorTypes, types.Temperature)
if err != nil {
return err
}
sensors := make([]sensor.Sensor, 0)
for i := range sensorTypes {
s, err := sensor.New(sensorTypes[i])
if err != nil {
return err
}
sensors = append(sensors, s)
}
measuredValueChannel, errorChannel := sensor.ReadPipeline(context.TODO(), sensors...)
go func() {
for {
select {
case err, open := <-errorChannel:
if !open {
return
}
flogger.Error("%v", err)
}
}
}()
measuredValues := make([]*types.MeasuredValue, 0)
LOOP:
for {
select {
case measuredValue, open := <-measuredValueChannel:
if !open {
break LOOP
}
measuredValues = append(measuredValues, measuredValue)
}
}
err = cli.PrintMeasuredValues(measuredValues, os.Stdout)
if err != nil {
return err
}
if persist {
err = repo.AddMeasuredValues(measuredValues...)
if err != nil {
return err
}
}
return nil
}

View File

@ -8,6 +8,25 @@ import (
"git.cryptic.systems/volker.raschek/flucky/pkg/types" "git.cryptic.systems/volker.raschek/flucky/pkg/types"
) )
func PrintMeasuredValues(measuredValues []*types.MeasuredValue, w io.Writer) error {
// declar tabwriter
tw := tabwriter.NewWriter(w, 0, 0, 3, ' ', 0)
fmt.Fprint(tw, "timestamp\ttype\tvalue\n")
for i := range measuredValues {
fmt.Fprintf(tw, "%v\t%v\t%v\n", measuredValues[i].Date.String(), measuredValues[i].ValueType, measuredValues[i].Value)
}
err := tw.Flush()
if err != nil {
return err
}
return nil
}
// PrintSensors displays a list with all configured sensors // PrintSensors displays a list with all configured sensors
func PrintSensors(sensors []*types.Sensor, w io.Writer) error { func PrintSensors(sensors []*types.Sensor, w io.Writer) error {
@ -42,7 +61,10 @@ func PrintSensors(sensors []*types.Sensor, w io.Writer) error {
fmt.Fprintf(tw, "%v\t%v\n", sensor.TickDuration, sensor.Enabled) fmt.Fprintf(tw, "%v\t%v\n", sensor.TickDuration, sensor.Enabled)
} }
tw.Flush() err := tw.Flush()
if err != nil {
return err
}
return nil return nil
} }

View File

@ -16,8 +16,6 @@ import (
func Start(cnf *config.Config, flogger logger.Logger) error { func Start(cnf *config.Config, flogger logger.Logger) error {
measuredValueChannel := make(chan *types.MeasuredValue, 0)
// load data source name (dsn) // load data source name (dsn)
dsnURL, err := url.Parse(cnf.DSN) dsnURL, err := url.Parse(cnf.DSN)
if err != nil { if err != nil {
@ -69,7 +67,7 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
continue continue
} }
flogger.Debug("Found sensor %v", repoSensor.GetName()) flogger.Debug("Found sensor %v", repoSensor.Name)
sensor, err := sensor.New(repoSensor) sensor, err := sensor.New(repoSensor)
if err != nil { if err != nil {
@ -85,25 +83,23 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
parentCtx := context.Background() parentCtx := context.Background()
ctx, cancel := context.WithCancel(parentCtx) ctx, cancel := context.WithCancel(parentCtx)
for _, s := range sensors { measuredValueChannel, errorChannel := sensor.ReadTickingPipeline(ctx, sensors...)
go func(sensor sensor.Sensor) {
go func() {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-sensor.GetTicker().C: case err, open := <-errorChannel:
measuredValues, err := sensor.Read() if !open {
return
}
if err != nil { if err != nil {
flogger.Error("%v", err) flogger.Error("%v", err)
continue
}
for _, measuredValue := range measuredValues {
measuredValueChannel <- measuredValue
} }
} }
} }
}(s) }()
}
measuredValues := make([]*types.MeasuredValue, 0, 10) measuredValues := make([]*types.MeasuredValue, 0, 10)
for { for {
@ -123,7 +119,6 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
case signal := <-interruptChannel: case signal := <-interruptChannel:
cancel() cancel()
close(measuredValueChannel)
flogger.Info("Stopping daemon: Received process signal %v", signal.String()) flogger.Info("Stopping daemon: Received process signal %v", signal.String())

View File

@ -115,7 +115,7 @@ func (postgres *Postgres) InsertDevices(ctx context.Context, devices ...*types.D
// InsertMeasuredValues into the database // InsertMeasuredValues into the database
func (postgres *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { func (postgres *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
splittedMeasuredValues := make(map[string][]*types.MeasuredValue, 0) splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0)
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
if _, ok := splittedMeasuredValues[measuredValue.ValueType]; !ok { if _, ok := splittedMeasuredValues[measuredValue.ValueType]; !ok {
@ -164,11 +164,11 @@ func (postgres *Postgres) InsertMeasuredValues(ctx context.Context, measuredValu
var queryFile string var queryFile string
switch measuredValueType { switch measuredValueType {
case "humidity": case types.Humidity:
queryFile = "insertHumidity.sql" queryFile = "insertHumidity.sql"
case "pressure": case types.Pressure:
queryFile = "insertPressure.sql" queryFile = "insertPressure.sql"
case "temperature": case types.Temperature:
queryFile = "insertTemperature.sql" queryFile = "insertTemperature.sql"
default: default:
tx.Rollback() tx.Rollback()
@ -361,7 +361,7 @@ func (postgres *Postgres) SelectHumidity(ctx context.Context, id string) (*types
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "humidity" measuredValue.ValueType = types.Humidity
} }
return measuredValues[0], nil return measuredValues[0], nil
@ -391,7 +391,7 @@ func (postgres *Postgres) SelectHumidities(ctx context.Context) ([]*types.Measur
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "humidity" measuredValue.ValueType = types.Humidity
} }
return measuredValues, nil return measuredValues, nil
@ -462,7 +462,7 @@ func (postgres *Postgres) SelectPressure(ctx context.Context, id string) (*types
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "pressure" measuredValue.ValueType = types.Pressure
} }
return measuredValues[0], nil return measuredValues[0], nil
@ -492,7 +492,7 @@ func (postgres *Postgres) SelectPressures(ctx context.Context) ([]*types.Measure
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "pressure" measuredValue.ValueType = types.Pressure
} }
return measuredValues, nil return measuredValues, nil
@ -622,7 +622,7 @@ func (postgres *Postgres) SelectTemperature(ctx context.Context, id string) (*ty
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "temperatures" measuredValue.ValueType = types.Temperature
} }
return measuredValues[0], nil return measuredValues[0], nil
@ -652,7 +652,7 @@ func (postgres *Postgres) SelectTemperatures(ctx context.Context) ([]*types.Meas
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "temperatures" measuredValue.ValueType = types.Temperature
} }
return measuredValues, nil return measuredValues, nil

View File

@ -114,7 +114,7 @@ func (sqlite *SQLite) InsertDevices(ctx context.Context, devices ...*types.Devic
// InsertMeasuredValues into the database // InsertMeasuredValues into the database
func (sqlite *SQLite) InsertMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { func (sqlite *SQLite) InsertMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
splittedMeasuredValues := make(map[string][]*types.MeasuredValue, 0) splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0)
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
if _, ok := splittedMeasuredValues[measuredValue.ValueType]; !ok { if _, ok := splittedMeasuredValues[measuredValue.ValueType]; !ok {
@ -163,11 +163,11 @@ func (sqlite *SQLite) InsertMeasuredValues(ctx context.Context, measuredValues .
var queryFile string var queryFile string
switch measuredValueType { switch measuredValueType {
case "humidity": case types.Humidity:
queryFile = "insertHumidity.sql" queryFile = "insertHumidity.sql"
case "pressure": case types.Pressure:
queryFile = "insertPressure.sql" queryFile = "insertPressure.sql"
case "temperature": case types.Temperature:
queryFile = "insertTemperature.sql" queryFile = "insertTemperature.sql"
default: default:
tx.Rollback() tx.Rollback()
@ -360,7 +360,7 @@ func (sqlite *SQLite) SelectHumidity(ctx context.Context, id string) (*types.Mea
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "humidity" measuredValue.ValueType = types.Humidity
} }
return measuredValues[0], nil return measuredValues[0], nil
@ -390,7 +390,7 @@ func (sqlite *SQLite) SelectHumidities(ctx context.Context) ([]*types.MeasuredVa
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "humidity" measuredValue.ValueType = types.Humidity
} }
return measuredValues, nil return measuredValues, nil
@ -461,7 +461,7 @@ func (sqlite *SQLite) SelectPressure(ctx context.Context, id string) (*types.Mea
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "pressure" measuredValue.ValueType = types.Pressure
} }
return measuredValues[0], nil return measuredValues[0], nil
@ -491,7 +491,7 @@ func (sqlite *SQLite) SelectPressures(ctx context.Context) ([]*types.MeasuredVal
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "pressure" measuredValue.ValueType = types.Pressure
} }
return measuredValues, nil return measuredValues, nil
@ -621,7 +621,7 @@ func (sqlite *SQLite) SelectTemperature(ctx context.Context, id string) (*types.
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "temperatures" measuredValue.ValueType = types.Temperature
} }
return measuredValues[0], nil return measuredValues[0], nil
@ -651,7 +651,7 @@ func (sqlite *SQLite) SelectTemperatures(ctx context.Context) ([]*types.Measured
} }
for _, measuredValue := range measuredValues { for _, measuredValue := range measuredValues {
measuredValue.ValueType = "temperatures" measuredValue.ValueType = types.Temperature
} }
return measuredValues, nil return measuredValues, nil

View File

@ -98,8 +98,28 @@ func (repo *Repository) GetSensor(sensorID string) (*types.Sensor, error) {
// GetSensors returns all sensors. If no sensors has been found, the function // GetSensors returns all sensors. If no sensors has been found, the function
// returns nil. // returns nil.
func (repo *Repository) GetSensors() ([]*types.Sensor, error) { func (repo *Repository) GetSensors(models ...string) ([]*types.Sensor, error) {
return repo.database.SelectSensors(context.Background()) sensors, err := repo.database.SelectSensors(context.Background())
switch {
case err != nil:
return nil, err
case len(models) > 0:
cachedSensors := make([]*types.Sensor, 0)
LOOP:
for i := range sensors {
for j := range models {
if strings.ToLower(sensors[i].Model) == strings.ToLower(models[j]) {
cachedSensors = append(cachedSensors, sensors[i])
continue LOOP
}
}
}
return cachedSensors, nil
case len(models) <= 0:
fallthrough
default:
return sensors, err
}
} }
// GetSensorsByDeviceID returns all sensors by a device id. If no sensor has // GetSensorsByDeviceID returns all sensors by a device id. If no sensor has

View File

@ -188,6 +188,26 @@ func testBackend(t *testing.T, repo *repository.Repository) {
require.NoError(err) require.NoError(err)
require.Len(sensors, len(expectedSensors)) require.Len(sensors, len(expectedSensors))
sensors, err = repo.GetSensors("BME280")
require.NoError(err)
require.Len(sensors, 1)
require.JSONEq(jsonEncoder(expectedSensors[2]), jsonEncoder(sensors[0]))
sensors, err = repo.GetSensors("DS18B20")
require.NoError(err)
require.Len(sensors, 1)
require.JSONEq(jsonEncoder(expectedSensors[1]), jsonEncoder(sensors[0]))
sensors, err = repo.GetSensors("DHT11")
require.NoError(err)
require.Len(sensors, 1)
require.JSONEq(jsonEncoder(expectedSensors[0]), jsonEncoder(sensors[0]))
sensors, err = repo.GetSensors("DHT11", "DS18B20")
require.NoError(err)
require.Len(sensors, 2)
require.ElementsMatch(expectedSensors[0:2], sensors)
// Test: GetSensor // Test: GetSensor
sensor, err := repo.GetSensor(expectedSensors[0].ID) sensor, err := repo.GetSensor(expectedSensors[0].ID)
require.NoError(err) require.NoError(err)
@ -278,7 +298,7 @@ func testBackend(t *testing.T, repo *repository.Repository) {
{ {
ID: "2e5a297a-3da0-46ae-89d2-0fcab0f1d5f7", ID: "2e5a297a-3da0-46ae-89d2-0fcab0f1d5f7",
Value: 32, Value: 32,
ValueType: "humidity", ValueType: types.Humidity,
Date: *timeNow(require), Date: *timeNow(require),
SensorID: "8c74397f-8e60-4c9d-960d-3197747cef9a", SensorID: "8c74397f-8e60-4c9d-960d-3197747cef9a",
CreationDate: *timeNow(require), CreationDate: *timeNow(require),
@ -287,7 +307,7 @@ func testBackend(t *testing.T, repo *repository.Repository) {
{ {
ID: "d69f1b62-0c6c-4058-b42c-4a2821bd220c", ID: "d69f1b62-0c6c-4058-b42c-4a2821bd220c",
Value: 38, Value: 38,
ValueType: "pressure", ValueType: types.Pressure,
Date: *timeNow(require), Date: *timeNow(require),
SensorID: "8c74397f-8e60-4c9d-960d-3197747cef9a", SensorID: "8c74397f-8e60-4c9d-960d-3197747cef9a",
CreationDate: *timeNow(require), CreationDate: *timeNow(require),
@ -296,7 +316,7 @@ func testBackend(t *testing.T, repo *repository.Repository) {
{ {
ID: "ea945ae0-412b-4561-a191-1f8f1f909fa4", ID: "ea945ae0-412b-4561-a191-1f8f1f909fa4",
Value: 35.4, Value: 35.4,
ValueType: "temperature", ValueType: types.Temperature,
Date: *timeNow(require), Date: *timeNow(require),
SensorID: "8c74397f-8e60-4c9d-960d-3197747cef9a", SensorID: "8c74397f-8e60-4c9d-960d-3197747cef9a",
CreationDate: *timeNow(require), CreationDate: *timeNow(require),

View File

@ -66,21 +66,21 @@ func (bme280 *BME280) Read() ([]*types.MeasuredValue, error) {
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(humidityValue), Value: float64(humidityValue),
ValueType: "humidity", ValueType: types.Humidity,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: bme280.ID, SensorID: bme280.ID,
}, },
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(pressureValue), Value: float64(pressureValue),
ValueType: "pressure", ValueType: types.Pressure,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: bme280.ID, SensorID: bme280.ID,
}, },
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(temperatureValue), Value: float64(temperatureValue),
ValueType: "temperature", ValueType: types.Temperature,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: bme280.ID, SensorID: bme280.ID,
}, },

View File

@ -42,14 +42,14 @@ func (dht11 *DHT11) Read() ([]*types.MeasuredValue, error) {
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(humidityValue), Value: float64(humidityValue),
ValueType: "humidity", ValueType: types.Humidity,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: dht11.ID, SensorID: dht11.ID,
}, },
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(temperatureValue), Value: float64(temperatureValue),
ValueType: "temperature", ValueType: types.Temperature,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: dht11.ID, SensorID: dht11.ID,
}, },

View File

@ -42,14 +42,14 @@ func (dht22 *DHT22) Read() ([]*types.MeasuredValue, error) {
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(humidityValue), Value: float64(humidityValue),
ValueType: "humidity", ValueType: types.Humidity,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: dht22.ID, SensorID: dht22.ID,
}, },
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(temperatureValue), Value: float64(temperatureValue),
ValueType: "temperature", ValueType: types.Temperature,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: dht22.ID, SensorID: dht22.ID,
}, },

View File

@ -59,7 +59,7 @@ func (ds18b20 *DS18B20) Read() ([]*types.MeasuredValue, error) {
{ {
ID: uuid.NewV4().String(), ID: uuid.NewV4().String(),
Value: float64(temperatureValue), Value: float64(temperatureValue),
ValueType: "temperature", ValueType: types.Temperature,
Date: format.FormatedTime(), Date: format.FormatedTime(),
SensorID: ds18b20.ID, SensorID: ds18b20.ID,
}, },

View File

@ -7,7 +7,8 @@ import (
) )
type Sensor interface { type Sensor interface {
GetID() string
GetTicker() *time.Ticker GetTicker() *time.Ticker
// Read single measured values from sensor
Read() ([]*types.MeasuredValue, error) Read() ([]*types.MeasuredValue, error)
} }

View File

@ -1,6 +1,7 @@
package sensor package sensor
import ( import (
"context"
"errors" "errors"
"sync" "sync"
@ -11,6 +12,124 @@ var (
ErrSensorModelNotMatched = errors.New("Sensor model not matched") ErrSensorModelNotMatched = errors.New("Sensor model not matched")
) )
// FilterMeasuredValuesByTypes filters measured values by type
func FilterMeasuredValuesByTypes(ctx context.Context, inChannel <-chan *types.MeasuredValue, measuredValueTypes ...types.MeasuredValueType) <-chan *types.MeasuredValue {
outChannel := make(chan *types.MeasuredValue, 0)
go func() {
LOOP:
for {
select {
case <-ctx.Done():
return
case measuredValue, open := <-inChannel:
if !open {
return
}
for i := range measuredValueTypes {
if measuredValueTypes[i] == measuredValue.ValueType {
outChannel <- measuredValue
continue LOOP
}
}
}
}
}()
return outChannel
}
// FilterMeasuredValuesBySensorIDs filters measured values by sensor id
func FilterMeasuredValuesBySensorIDs(ctx context.Context, inChannel <-chan *types.MeasuredValue, sensorIDs ...string) <-chan *types.MeasuredValue {
outChannel := make(chan *types.MeasuredValue, 0)
go func() {
LOOP:
for {
select {
case <-ctx.Done():
return
case measuredValue, open := <-inChannel:
if !open {
return
}
for i := range sensorIDs {
if sensorIDs[i] == measuredValue.SensorID {
outChannel <- measuredValue
continue LOOP
}
}
}
}
}()
return outChannel
}
// ReadPipeline pipes for each sensor measured values until the context has been
// closed. The returned channels will be closed
func ReadPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
var (
errorChannel = make(chan error, 0)
measuredValueChannel = make(chan *types.MeasuredValue, 0)
)
go func() {
wg := new(sync.WaitGroup)
for i := range sensors {
wg.Add(1)
go func(s Sensor) {
defer wg.Done()
measuredValues, err := s.Read()
if err != nil {
errorChannel <- err
return
}
for i := range measuredValues {
measuredValueChannel <- measuredValues[i]
}
}(sensors[i])
}
wg.Wait()
close(errorChannel)
close(measuredValueChannel)
}()
return measuredValueChannel, errorChannel
}
// ReadTickingPipeline pipes for every tick on each sensor measured values until
// the context has been closed
func ReadTickingPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
var (
errorChannel = make(chan error, 0)
measuredValueChannel = make(chan *types.MeasuredValue, 0)
)
for i := range sensors {
go func(s Sensor) {
for {
select {
case <-ctx.Done():
return
case <-s.GetTicker().C:
measuredValues, err := s.Read()
if err != nil {
errorChannel <- err
break
}
for i := range measuredValues {
measuredValueChannel <- measuredValues[i]
}
}
}
}(sensors[i])
}
return measuredValueChannel, errorChannel
}
// New returns a new sensor // New returns a new sensor
func New(sensor *types.Sensor) (Sensor, error) { func New(sensor *types.Sensor) (Sensor, error) {
switch sensor.Model { switch sensor.Model {

View File

@ -10,9 +10,17 @@ import (
type MeasuredValue struct { type MeasuredValue struct {
ID string `json:"id" xml:"id"` ID string `json:"id" xml:"id"`
Value float64 `json:"value,string" xml:"value,string"` Value float64 `json:"value,string" xml:"value,string"`
ValueType string `json:"value_type" xml:"value_type"` ValueType MeasuredValueType `json:"value_type" xml:"value_type"`
Date time.Time `json:"date" xml:"date"` Date time.Time `json:"date" xml:"date"`
SensorID string `json:"sensor_id" xml:"sensor_id"` SensorID string `json:"sensor_id" xml:"sensor_id"`
CreationDate time.Time `json:"creation_date" xml:"creation_date"` CreationDate time.Time `json:"creation_date" xml:"creation_date"`
UpdateDate *time.Time `json:"update_date" xml:"update_date"` UpdateDate *time.Time `json:"update_date" xml:"update_date"`
} }
type MeasuredValueType string
const (
Humidity MeasuredValueType = "humidity"
Pressure = "pressure"
Temperature = "temperature"
)

View File

@ -1,6 +1,7 @@
package types package types
import ( import (
"fmt"
"time" "time"
) )
@ -22,21 +23,6 @@ type Sensor struct {
UpdateDate *time.Time `json:"update_date" xml:"update_date"` UpdateDate *time.Time `json:"update_date" xml:"update_date"`
} }
// GetID returns the UUID of the sensor.
func (s *Sensor) GetID() string {
return s.ID
}
// GetDeviceID returns the UUID of the configured device.
func (s *Sensor) GetDeviceID() string {
return s.DeviceID
}
// GetName returns the name of the sensor.
func (s *Sensor) GetName() string {
return s.Name
}
// GetTicker returns a new ticker, which tick every when the sensor should be // GetTicker returns a new ticker, which tick every when the sensor should be
// read // read
func (s *Sensor) GetTicker() *time.Ticker { func (s *Sensor) GetTicker() *time.Ticker {
@ -46,3 +32,34 @@ func (s *Sensor) GetTicker() *time.Ticker {
} }
return time.NewTicker(duration) return time.NewTicker(duration)
} }
// FilterSensorByMeasuredValueTypes filters sensors by the measured values types
// which they measure
func FilterSensorByMeasuredValueTypes(sensors []*Sensor, measuredValueTypes ...MeasuredValueType) ([]*Sensor, error) {
cachedSensors := make([]*Sensor, 0)
mapping := map[MeasuredValueType][]string{
Humidity: {"BME280", "DHT11", "DHT22"},
Pressure: {"BME280"},
Temperature: {"BME280", "DHT11", "DHT22", "DS18B20"},
}
for i := range measuredValueTypes {
mappedSensors, ok := mapping[measuredValueTypes[i]]
if !ok {
return nil, fmt.Errorf("No mapping for measured values type %v available", measuredValueTypes[i])
}
LOOP:
for j := range sensors {
for k := range mappedSensors {
if sensors[j].Model == mappedSensors[k] {
cachedSensors = append(cachedSensors, sensors[j])
continue LOOP
}
}
}
}
return cachedSensors, nil
}

67
pkg/types/sensor_test.go Normal file
View File

@ -0,0 +1,67 @@
package types_test
import (
"testing"
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
"github.com/stretchr/testify/require"
)
func TestFilterSensorByMeasuredValueTypes(t *testing.T) {
require := require.New(t)
testCases := []struct {
sensors []*types.Sensor
expectedSensors []*types.Sensor
measuredValueTypes []types.MeasuredValueType
}{
{
sensors: []*types.Sensor{
{ID: "af7f845f-9a79-4638-8005-292754367d33", Model: "BME280"},
{ID: "2d2c20e4-2628-4660-9f8c-a301b4e5933b", Model: "DHT11"},
{ID: "30e69d52-c0cd-48a7-841a-3918b2ed5941", Model: "DHT22"},
{ID: "2d1ecf2e-2dc6-48f6-8158-822cab6e561b", Model: "DS18B20"},
},
expectedSensors: []*types.Sensor{
{ID: "af7f845f-9a79-4638-8005-292754367d33", Model: "BME280"},
{ID: "2d2c20e4-2628-4660-9f8c-a301b4e5933b", Model: "DHT11"},
{ID: "30e69d52-c0cd-48a7-841a-3918b2ed5941", Model: "DHT22"},
{ID: "2d1ecf2e-2dc6-48f6-8158-822cab6e561b", Model: "DS18B20"},
},
measuredValueTypes: []types.MeasuredValueType{types.Temperature},
},
{
sensors: []*types.Sensor{
{ID: "af7f845f-9a79-4638-8005-292754367d33", Model: "BME280"},
{ID: "2d2c20e4-2628-4660-9f8c-a301b4e5933b", Model: "DHT11"},
{ID: "30e69d52-c0cd-48a7-841a-3918b2ed5941", Model: "DHT22"},
{ID: "2d1ecf2e-2dc6-48f6-8158-822cab6e561b", Model: "DS18B20"},
},
expectedSensors: []*types.Sensor{
{ID: "af7f845f-9a79-4638-8005-292754367d33", Model: "BME280"},
{ID: "2d2c20e4-2628-4660-9f8c-a301b4e5933b", Model: "DHT11"},
{ID: "30e69d52-c0cd-48a7-841a-3918b2ed5941", Model: "DHT22"},
},
measuredValueTypes: []types.MeasuredValueType{types.Humidity},
},
{
sensors: []*types.Sensor{
{ID: "af7f845f-9a79-4638-8005-292754367d33", Model: "BME280"},
{ID: "2d2c20e4-2628-4660-9f8c-a301b4e5933b", Model: "DHT11"},
{ID: "30e69d52-c0cd-48a7-841a-3918b2ed5941", Model: "DHT22"},
{ID: "2d1ecf2e-2dc6-48f6-8158-822cab6e561b", Model: "DS18B20"},
},
expectedSensors: []*types.Sensor{
{ID: "af7f845f-9a79-4638-8005-292754367d33", Model: "BME280"},
},
measuredValueTypes: []types.MeasuredValueType{types.Pressure},
},
}
for i := range testCases {
s, err := types.FilterSensorByMeasuredValueTypes(testCases[i].sensors, testCases[i].measuredValueTypes...)
require.NoError(err)
require.ElementsMatch(testCases[i].expectedSensors, s)
require.Equal(len(testCases[i].expectedSensors), len(s))
}
}