package daemon

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"syscall"

	"git.cryptic.systems/volker.raschek/flucky/pkg/config"
	"git.cryptic.systems/volker.raschek/flucky/pkg/repository"
	"git.cryptic.systems/volker.raschek/flucky/pkg/sensor"
	"git.cryptic.systems/volker.raschek/flucky/pkg/types"
	"git.cryptic.systems/volker.raschek/go-logger"
)

// Start runs the measurement daemon until the process receives an interrupt
// or termination signal.
//
// It opens the repository addressed by cnf.DSN, registers cnf.Device in the
// repository if it is not yet known, and starts one collector goroutine for
// every enabled sensor that belongs to the device. Measured values are cached
// in memory and written to the repository in batches; the remaining cached
// values are written when the daemon is stopped.
//
// Start returns an error if the DSN cannot be parsed, the repository cannot be
// opened or queried, no sensors are registered for the device, or a sensor
// cannot be initialised. It returns nil after a signal-triggered shutdown.
// Errors that occur while reading sensors, writing measured values or closing
// the repository are logged via flogger and do not abort the daemon.
func Start(cnf *config.Config, flogger logger.Logger) error {
	// Number of measured values that are cached before they are written to
	// the repository in one batch.
	const cacheSize = 10

	// load data source name (dsn)
	dsnURL, err := url.Parse(cnf.DSN)
	if err != nil {
		return err
	}

	repo, err := repository.New(dsnURL, flogger)
	if err != nil {
		return err
	}
	// Close the repository on every exit path, not only on a regular
	// shutdown. The deferred call runs after the final cache flush below.
	defer func() {
		flogger.Debug("Close repository")
		if err := repo.Close(); err != nil {
			flogger.Error("%v", err)
		}
	}()

	// Look up the configured device and add it to the repository if missing.
	repoDevice, err := repo.GetDevice(cnf.Device.ID)
	switch {
	case err != nil:
		return err
	case repoDevice == nil:
		if err := repo.AddDevices(cnf.Device); err != nil {
			return err
		}
		repoDevice, err = repo.GetDevice(cnf.Device.ID)
		if err != nil {
			return err
		}
	}

	repoSensors, err := repo.GetSensorsByDeviceID(repoDevice.ID)
	switch {
	case err != nil:
		return err
	case len(repoSensors) == 0:
		return fmt.Errorf("No sensors found")
	}

	// Only enabled sensors that belong to this device are read.
	sensors := make([]sensor.Sensor, 0, len(repoSensors))
	for _, repoSensor := range repoSensors {
		if !repoSensor.Enabled || repoSensor.DeviceID != repoDevice.ID {
			continue
		}

		flogger.Debug("Found sensor %v", repoSensor.GetName())

		s, err := sensor.New(repoSensor)
		if err != nil {
			return err
		}
		sensors = append(sensors, s)
	}

	// os.Kill (SIGKILL) cannot be trapped, so listen for SIGTERM instead to
	// get the chance to flush the cache on a regular termination request.
	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(interruptChannel)

	// Collection
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	measuredValueChannel := make(chan *types.MeasuredValue)

	for _, s := range sensors {
		go func(s sensor.Sensor) {
			for {
				select {
				case <-ctx.Done():
					return
				case <-s.GetTicker().C:
					values, err := s.Read()
					if err != nil {
						flogger.Error("%v", err)
						continue
					}

					for _, value := range values {
						// The send must observe cancellation as well:
						// after shutdown nobody receives from the channel
						// anymore and the goroutine would block forever.
						select {
						case measuredValueChannel <- value:
						case <-ctx.Done():
							return
						}
					}
				}
			}
		}(s)
	}

	measuredValues := make([]*types.MeasuredValue, 0, cacheSize)
	for {
		select {
		case measuredValue := <-measuredValueChannel:
			flogger.Debug("%v\t%v\t%v", measuredValue.ID, measuredValue.ValueType, measuredValue.Value)
			measuredValues = append(measuredValues, measuredValue)

			if len(measuredValues) >= cacheSize {
				flogger.Debug("Flush cache with %v values", len(measuredValues))
				if err := repo.AddMeasuredValues(measuredValues...); err != nil {
					flogger.Error("%v", err)
				}
				// Start with a fresh slice instead of reusing the backing
				// array, in case the repository keeps a reference to it.
				measuredValues = make([]*types.MeasuredValue, 0, cacheSize)
			}

		case sig := <-interruptChannel:
			// Cancelling the context stops the collectors. The channel is
			// intentionally not closed here: this is the receiving side and
			// closing it could panic a collector that is about to send.
			cancel()

			flogger.Info("Stopping daemon: Received process signal %v", sig.String())

			flogger.Debug("Flush cache with %v remaining values", len(measuredValues))
			if err := repo.AddMeasuredValues(measuredValues...); err != nil {
				flogger.Error("%v", err)
			}

			return nil
		}
	}
}