package daemon import ( "context" "net/url" "os" "os/signal" "syscall" "github.com/volker-raschek/flucky/pkg/config" "github.com/volker-raschek/flucky/pkg/repository" "github.com/volker-raschek/flucky/pkg/sensor" "github.com/volker-raschek/flucky/pkg/types" "github.com/volker-raschek/go-logger/pkg/logger" ) func Start(cnf *config.Config, flogger logger.Logger) error { sensors := make([]sensor.Sensor, 0) for _, cnfSensor := range cnf.Sensors { if !cnfSensor.Enabled { continue } sensor, err := sensor.New(cnfSensor) if err != nil { return err } sensors = append(sensors, sensor) } measuredValueChannel := make(chan *types.MeasuredValue, 0) // load storage endpoint storageEndpointURL, err := url.Parse(cnf.StorageEndpoint) if err != nil { return err } repo, err := repository.New(storageEndpointURL, flogger) if err != nil { return err } interruptChannel := make(chan os.Signal, 1) signal.Notify(interruptChannel, os.Kill, syscall.SIGTERM) // Collection parentCtx := context.Background() // Insert device if not exist device, _ := repo.GetDevice(cnf.Device.ID) if device == nil { if err := repo.AddDevices(cnf.Device); err != nil { return err } } // Insert sensors if not exist for _, cnfSensor := range cnf.Sensors { sensor, _ := repo.GetSensor(cnfSensor.ID) if sensor == nil { if err := repo.AddSensors(cnfSensor); err != nil { return err } } } ctx, cancel := context.WithCancel(parentCtx) for _, s := range sensors { go func(sensor sensor.Sensor) { for { select { case <-ctx.Done(): return case <-sensor.GetTicker().C: measuredValues, err := sensor.Read() if err != nil { flogger.Error("%v", err) continue } for _, measuredValue := range measuredValues { measuredValueChannel <- measuredValue } } } }(s) } measuredValues := make([]*types.MeasuredValue, 0, 10) for { select { case measuredValue := <-measuredValueChannel: flogger.Debug("%v\t%v\t%v", measuredValue.ID, measuredValue.ValueType, measuredValue.Value) measuredValues = append(measuredValues, measuredValue) if cap(measuredValues) == len(measuredValues) { flogger.Debug("Flush cache") err := repo.AddMeasuredValues(measuredValues...) if err != nil { flogger.Error("%v", err) } measuredValues = make([]*types.MeasuredValue, 0, 10) } case <-interruptChannel: cancel() close(measuredValueChannel) err := repo.AddMeasuredValues(measuredValues...) if err != nil { flogger.Error("%v", err) } break } } }