2018-11-20 21:55:06 +00:00
|
|
|
package sensor
|
|
|
|
|
2019-02-24 21:46:36 +00:00
|
|
|
import (
|
2020-09-21 17:36:42 +00:00
|
|
|
"context"
|
2020-05-03 12:04:08 +00:00
|
|
|
"errors"
|
|
|
|
"sync"
|
|
|
|
|
2020-06-10 19:13:05 +00:00
|
|
|
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
|
2019-02-24 21:46:36 +00:00
|
|
|
)
|
|
|
|
|
2020-05-03 12:04:08 +00:00
|
|
|
// ErrSensorModelNotMatched is the sentinel error returned by New when the
// model of the passed sensor is not one of the supported sensor models.
var ErrSensorModelNotMatched = errors.New("Sensor model not matched")
|
2019-06-27 07:31:40 +00:00
|
|
|
|
2020-09-21 17:36:42 +00:00
|
|
|
// FilterMeasuredValuesByTypes filters measured values by type
|
|
|
|
func FilterMeasuredValuesByTypes(ctx context.Context, inChannel <-chan *types.MeasuredValue, measuredValueTypes ...types.MeasuredValueType) <-chan *types.MeasuredValue {
|
2021-05-16 21:02:46 +00:00
|
|
|
outChannel := make(chan *types.MeasuredValue, 1)
|
2020-09-21 17:36:42 +00:00
|
|
|
go func() {
|
|
|
|
LOOP:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case measuredValue, open := <-inChannel:
|
|
|
|
if !open {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range measuredValueTypes {
|
|
|
|
if measuredValueTypes[i] == measuredValue.ValueType {
|
|
|
|
outChannel <- measuredValue
|
|
|
|
continue LOOP
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
return outChannel
|
|
|
|
}
|
|
|
|
|
|
|
|
// FilterMeasuredValuesBySensorIDs filters measured values by sensor id
|
|
|
|
func FilterMeasuredValuesBySensorIDs(ctx context.Context, inChannel <-chan *types.MeasuredValue, sensorIDs ...string) <-chan *types.MeasuredValue {
|
2021-05-16 21:02:46 +00:00
|
|
|
outChannel := make(chan *types.MeasuredValue, 1)
|
2020-09-21 17:36:42 +00:00
|
|
|
go func() {
|
|
|
|
LOOP:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case measuredValue, open := <-inChannel:
|
|
|
|
if !open {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range sensorIDs {
|
|
|
|
if sensorIDs[i] == measuredValue.SensorID {
|
|
|
|
outChannel <- measuredValue
|
|
|
|
continue LOOP
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
return outChannel
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadPipeline pipes for each sensor measured values until the context has been
|
|
|
|
// closed. The returned channels will be closed
|
|
|
|
func ReadPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
|
|
|
|
var (
|
2021-05-16 21:02:46 +00:00
|
|
|
errorChannel = make(chan error, 1)
|
|
|
|
measuredValueChannel = make(chan *types.MeasuredValue, 1)
|
2020-09-21 17:36:42 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
wg := new(sync.WaitGroup)
|
|
|
|
for i := range sensors {
|
|
|
|
wg.Add(1)
|
|
|
|
|
|
|
|
go func(s Sensor) {
|
|
|
|
defer wg.Done()
|
|
|
|
measuredValues, err := s.Read()
|
|
|
|
if err != nil {
|
|
|
|
errorChannel <- err
|
|
|
|
return
|
|
|
|
}
|
|
|
|
for i := range measuredValues {
|
|
|
|
measuredValueChannel <- measuredValues[i]
|
|
|
|
}
|
|
|
|
}(sensors[i])
|
|
|
|
}
|
|
|
|
|
|
|
|
wg.Wait()
|
|
|
|
close(errorChannel)
|
|
|
|
close(measuredValueChannel)
|
|
|
|
}()
|
|
|
|
|
|
|
|
return measuredValueChannel, errorChannel
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadTickingPipeline pipes for every tick on each sensor measured values until
|
|
|
|
// the context has been closed
|
|
|
|
func ReadTickingPipeline(ctx context.Context, sensors ...Sensor) (<-chan *types.MeasuredValue, <-chan error) {
|
|
|
|
var (
|
2021-05-16 21:02:46 +00:00
|
|
|
errorChannel = make(chan error, 1)
|
|
|
|
measuredValueChannel = make(chan *types.MeasuredValue, 1)
|
2020-09-21 17:36:42 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
for i := range sensors {
|
|
|
|
go func(s Sensor) {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-ctx.Done():
|
|
|
|
return
|
|
|
|
case <-s.GetTicker().C:
|
|
|
|
measuredValues, err := s.Read()
|
|
|
|
if err != nil {
|
|
|
|
errorChannel <- err
|
|
|
|
break
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range measuredValues {
|
|
|
|
measuredValueChannel <- measuredValues[i]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}(sensors[i])
|
|
|
|
}
|
|
|
|
|
|
|
|
return measuredValueChannel, errorChannel
|
|
|
|
}
|
|
|
|
|
2020-05-03 12:04:08 +00:00
|
|
|
// New returns a new sensor
|
|
|
|
func New(sensor *types.Sensor) (Sensor, error) {
|
|
|
|
switch sensor.Model {
|
|
|
|
case "BME280":
|
|
|
|
return &BME280{
|
|
|
|
Sensor: sensor,
|
|
|
|
mutex: new(sync.Mutex),
|
|
|
|
}, nil
|
|
|
|
case "DHT11":
|
|
|
|
return &DHT11{
|
|
|
|
Sensor: sensor,
|
|
|
|
mutex: new(sync.Mutex),
|
|
|
|
}, nil
|
|
|
|
case "DHT22":
|
|
|
|
return &DHT22{
|
|
|
|
Sensor: sensor,
|
|
|
|
mutex: new(sync.Mutex),
|
|
|
|
}, nil
|
|
|
|
case "DS18B20":
|
|
|
|
return &DS18B20{
|
|
|
|
Sensor: sensor,
|
|
|
|
mutex: new(sync.Mutex),
|
|
|
|
}, nil
|
|
|
|
default:
|
|
|
|
return nil, ErrSensorModelNotMatched
|
2019-02-24 21:46:36 +00:00
|
|
|
}
|
2019-06-13 19:25:32 +00:00
|
|
|
}
|