package daemon

import (
	"context"
	"fmt"
	"net/url"
	"os"
	"os/signal"
	"syscall"

	"git.cryptic.systems/volker.raschek/flucky/pkg/config"
	"git.cryptic.systems/volker.raschek/flucky/pkg/repository"
	"git.cryptic.systems/volker.raschek/flucky/pkg/sensor"
	"git.cryptic.systems/volker.raschek/flucky/pkg/types"
	"git.cryptic.systems/volker.raschek/go-logger"
)

// Start runs the measurement daemon until the process receives SIGTERM.
//
// It opens the repository addressed by cnf.DSN, makes sure a device with
// cnf.DeviceID exists (registering it under the local hostname when it does
// not), instantiates every enabled sensor that belongs to that device and
// then collects measured values from sensor.ReadTickingPipeline. Collected
// values are cached in memory and written to the repository whenever the
// cache holds cachedEntries values; the remaining values are written once
// more on shutdown.
//
// Start returns an error when the DSN cannot be parsed, the repository cannot
// be opened, the device or its sensors cannot be resolved, or the measured
// value channel is closed while the daemon is still running. It returns nil
// after a signal-triggered shutdown. Errors from flushing the cache or closing
// the repository are logged through flogger and not returned.
func Start(cnf *config.Config, cachedEntries uint, flogger logger.Logger) error {
	// Load the data source name (DSN).
	dsnURL, err := url.Parse(cnf.DSN)
	if err != nil {
		return err
	}

	repo, err := repository.New(dsnURL, flogger)
	if err != nil {
		return err
	}
	// Close the repository on every return path, not only on shutdown by
	// signal, so early error returns do not leak the handle.
	defer func() {
		flogger.Debug("Close repository")
		if closeErr := repo.Close(); closeErr != nil {
			flogger.Error("%v", closeErr)
		}
	}()

	setupCtx := context.Background()

	// Look up the device and register it under the hostname if it is unknown.
	repoDevice, err := repo.GetDeviceByID(setupCtx, cnf.DeviceID)
	if err != nil {
		return err
	}
	if repoDevice == nil {
		hostname, err := os.Hostname()
		if err != nil {
			return err
		}

		err = repo.AddDevices(setupCtx, &types.Device{
			ID:   cnf.DeviceID,
			Name: hostname,
		})
		if err != nil {
			return err
		}

		repoDevice, err = repo.GetDeviceByID(setupCtx, cnf.DeviceID)
		if err != nil {
			return err
		}
		// Guard against a nil dereference below if the device is still
		// not returned after it has been added.
		if repoDevice == nil {
			return fmt.Errorf("device %v not found after registration", cnf.DeviceID)
		}
	}

	repoSensors, err := repo.GetSensorsByDeviceIDs(setupCtx, repoDevice.ID)
	if err != nil {
		return err
	}
	if len(repoSensors) == 0 {
		return fmt.Errorf("No sensors found")
	}

	// Only enabled sensors that belong to this device are read.
	sensors := make([]sensor.Sensor, 0, len(repoSensors))
	for _, repoSensor := range repoSensors {
		if !repoSensor.Enabled || repoSensor.DeviceID != repoDevice.ID {
			continue
		}

		flogger.Debug("Found sensor %v", repoSensor.Name)

		s, err := sensor.New(repoSensor)
		if err != nil {
			return err
		}
		sensors = append(sensors, s)
	}

	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, syscall.SIGTERM)
	defer signal.Stop(interruptChannel)

	// Collection: ctx controls the lifetime of the sensor pipeline and of the
	// error logging goroutine below.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	measuredValueChannel, errorChannel := sensor.ReadTickingPipeline(ctx, sensors...)

	// Log pipeline errors until the pipeline is cancelled or its error
	// channel is closed.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case err, open := <-errorChannel:
				if !open {
					return
				}
				if err != nil {
					flogger.Error("%v", err)
				}
			}
		}
	}()

	measuredValues := make([]*types.MeasuredValue, 0, cachedEntries)

	// flushRemaining writes the cached values with a fresh context: ctx is
	// already cancelled during shutdown and must not be used for the final
	// write, otherwise a context-aware repository would reject it.
	flushRemaining := func() {
		flogger.Debug("Flush cache with %v remaining values", len(measuredValues))
		if len(measuredValues) == 0 {
			return
		}
		if err := repo.AddMeasuredValues(context.Background(), measuredValues...); err != nil {
			flogger.Error("%v", err)
		}
	}

	for {
		select {
		case measuredValue, open := <-measuredValueChannel:
			if !open {
				// A closed channel would otherwise yield nil values forever.
				flushRemaining()
				return fmt.Errorf("measured value channel closed unexpectedly")
			}
			if measuredValue == nil {
				continue
			}

			flogger.Debug("%v\t%v\t%v", measuredValue.ID, measuredValue.ValueType, measuredValue.Value)
			measuredValues = append(measuredValues, measuredValue)

			if cap(measuredValues) == len(measuredValues) {
				flogger.Debug("Flush cache with %v values", len(measuredValues))
				if err := repo.AddMeasuredValues(ctx, measuredValues...); err != nil {
					flogger.Error("%v", err)
				}
				// The cache is reset even when the write failed, so a
				// failing repository cannot make it grow without bound.
				measuredValues = make([]*types.MeasuredValue, 0, cachedEntries)
			}

		case sig := <-interruptChannel:
			cancel()
			flogger.Info("Stopping daemon: Received process signal %v", sig.String())
			flushRemaining()
			return nil
		}
	}
}