Markus Pesch
3a090d190e
changes: - fix: read temperature values without daemon. Add a subcommand to read temperature values without starting the daemon. - fix: implement measured value types. Replace measured value types with constants. - fix: add sensor pipelines. Add functions which return a channel with measured values. - fix: filter measured values from a channel. Add functions to filter measured values by sensor ID or measured value type.
160 lines
3.4 KiB
Go
160 lines
3.4 KiB
Go
package sensor
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
|
|
)
|
|
|
|
// ErrSensorModelNotMatched is the sentinel error returned by New when the
// model of the passed sensor does not match any supported sensor model.
var ErrSensorModelNotMatched = errors.New("Sensor model not matched")
|
|
|
|
// FilterMeasuredValuesByTypes filters measured values by type
|
|
func FilterMeasuredValuesByTypes(ctx context.Context, inChannel <-chan *types.MeasuredValue, measuredValueTypes ...types.MeasuredValueType) <-chan *types.MeasuredValue {
|
|
outChannel := make(chan *types.MeasuredValue, 0)
|
|
go func() {
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case measuredValue, open := <-inChannel:
|
|
if !open {
|
|
return
|
|
}
|
|
|
|
for i := range measuredValueTypes {
|
|
if measuredValueTypes[i] == measuredValue.ValueType {
|
|
outChannel <- measuredValue
|
|
continue LOOP
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return outChannel
|
|
}
|
|
|
|
// FilterMeasuredValuesBySensorIDs filters measured values by sensor id
|
|
func FilterMeasuredValuesBySensorIDs(ctx context.Context, inChannel <-chan *types.MeasuredValue, sensorIDs ...string) <-chan *types.MeasuredValue {
|
|
outChannel := make(chan *types.MeasuredValue, 0)
|
|
go func() {
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case measuredValue, open := <-inChannel:
|
|
if !open {
|
|
return
|
|
}
|
|
|
|
for i := range sensorIDs {
|
|
if sensorIDs[i] == measuredValue.SensorID {
|
|
outChannel <- measuredValue
|
|
continue LOOP
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return outChannel
|
|
}
|
|
|
|
// ReadPipeline pipes for each sensor measured values until the context has been
|
|
// closed. The returned channels will be closed
|
|
func ReadPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
|
|
var (
|
|
errorChannel = make(chan error, 0)
|
|
measuredValueChannel = make(chan *types.MeasuredValue, 0)
|
|
)
|
|
|
|
go func() {
|
|
wg := new(sync.WaitGroup)
|
|
for i := range sensors {
|
|
wg.Add(1)
|
|
|
|
go func(s Sensor) {
|
|
defer wg.Done()
|
|
measuredValues, err := s.Read()
|
|
if err != nil {
|
|
errorChannel <- err
|
|
return
|
|
}
|
|
for i := range measuredValues {
|
|
measuredValueChannel <- measuredValues[i]
|
|
}
|
|
}(sensors[i])
|
|
}
|
|
|
|
wg.Wait()
|
|
close(errorChannel)
|
|
close(measuredValueChannel)
|
|
}()
|
|
|
|
return measuredValueChannel, errorChannel
|
|
}
|
|
|
|
// ReadTickingPipeline pipes for every tick on each sensor measured values until
|
|
// the context has been closed
|
|
func ReadTickingPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
|
|
var (
|
|
errorChannel = make(chan error, 0)
|
|
measuredValueChannel = make(chan *types.MeasuredValue, 0)
|
|
)
|
|
|
|
for i := range sensors {
|
|
go func(s Sensor) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-s.GetTicker().C:
|
|
measuredValues, err := s.Read()
|
|
if err != nil {
|
|
errorChannel <- err
|
|
break
|
|
}
|
|
|
|
for i := range measuredValues {
|
|
measuredValueChannel <- measuredValues[i]
|
|
}
|
|
}
|
|
}
|
|
}(sensors[i])
|
|
}
|
|
|
|
return measuredValueChannel, errorChannel
|
|
}
|
|
|
|
// New returns a new sensor
|
|
func New(sensor *types.Sensor) (Sensor, error) {
|
|
switch sensor.Model {
|
|
case "BME280":
|
|
return &BME280{
|
|
Sensor: sensor,
|
|
mutex: new(sync.Mutex),
|
|
}, nil
|
|
case "DHT11":
|
|
return &DHT11{
|
|
Sensor: sensor,
|
|
mutex: new(sync.Mutex),
|
|
}, nil
|
|
case "DHT22":
|
|
return &DHT22{
|
|
Sensor: sensor,
|
|
mutex: new(sync.Mutex),
|
|
}, nil
|
|
case "DS18B20":
|
|
return &DS18B20{
|
|
Sensor: sensor,
|
|
mutex: new(sync.Mutex),
|
|
}, nil
|
|
default:
|
|
return nil, ErrSensorModelNotMatched
|
|
}
|
|
}
|