flucky/pkg/sensor/sensor.go

160 lines
3.4 KiB
Go

package sensor
import (
"context"
"errors"
"sync"
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
)
var (
ErrSensorModelNotMatched = errors.New("Sensor model not matched")
)
// FilterMeasuredValuesByTypes filters measured values by type
func FilterMeasuredValuesByTypes(ctx context.Context, inChannel <-chan *types.MeasuredValue, measuredValueTypes ...types.MeasuredValueType) <-chan *types.MeasuredValue {
outChannel := make(chan *types.MeasuredValue, 1)
go func() {
LOOP:
for {
select {
case <-ctx.Done():
return
case measuredValue, open := <-inChannel:
if !open {
return
}
for i := range measuredValueTypes {
if measuredValueTypes[i] == measuredValue.ValueType {
outChannel <- measuredValue
continue LOOP
}
}
}
}
}()
return outChannel
}
// FilterMeasuredValuesBySensorIDs filters measured values by sensor id
func FilterMeasuredValuesBySensorIDs(ctx context.Context, inChannel <-chan *types.MeasuredValue, sensorIDs ...string) <-chan *types.MeasuredValue {
outChannel := make(chan *types.MeasuredValue, 1)
go func() {
LOOP:
for {
select {
case <-ctx.Done():
return
case measuredValue, open := <-inChannel:
if !open {
return
}
for i := range sensorIDs {
if sensorIDs[i] == measuredValue.SensorID {
outChannel <- measuredValue
continue LOOP
}
}
}
}
}()
return outChannel
}
// ReadPipeline pipes for each sensor measured values until the context has been
// closed. The returned channels will be closed
func ReadPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
var (
errorChannel = make(chan error, 1)
measuredValueChannel = make(chan *types.MeasuredValue, 1)
)
go func() {
wg := new(sync.WaitGroup)
for i := range sensors {
wg.Add(1)
go func(s Sensor) {
defer wg.Done()
measuredValues, err := s.Read()
if err != nil {
errorChannel <- err
return
}
for i := range measuredValues {
measuredValueChannel <- measuredValues[i]
}
}(sensors[i])
}
wg.Wait()
close(errorChannel)
close(measuredValueChannel)
}()
return measuredValueChannel, errorChannel
}
// ReadTickingPipeline pipes for every tick on each sensor measured values until
// the context has been closed
func ReadTickingPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
var (
errorChannel = make(chan error, 1)
measuredValueChannel = make(chan *types.MeasuredValue, 1)
)
for i := range sensors {
go func(s Sensor) {
for {
select {
case <-ctx.Done():
return
case <-s.GetTicker().C:
measuredValues, err := s.Read()
if err != nil {
errorChannel <- err
break
}
for i := range measuredValues {
measuredValueChannel <- measuredValues[i]
}
}
}
}(sensors[i])
}
return measuredValueChannel, errorChannel
}
// New returns a new sensor
func New(sensor *types.Sensor) (Sensor, error) {
switch sensor.Model {
case "BME280":
return &BME280{
Sensor: sensor,
mutex: new(sync.Mutex),
}, nil
case "DHT11":
return &DHT11{
Sensor: sensor,
mutex: new(sync.Mutex),
}, nil
case "DHT22":
return &DHT22{
Sensor: sensor,
mutex: new(sync.Mutex),
}, nil
case "DS18B20":
return &DS18B20{
Sensor: sensor,
mutex: new(sync.Mutex),
}, nil
default:
return nil, ErrSensorModelNotMatched
}
}