// Package daemon runs the measurement loop: it reads every enabled sensor on
// the sensor's own ticker and collects the measured values in an in-memory
// cache.
package daemon

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/Masterminds/semver"
	"github.com/go-flucky/flucky/pkg/config"
	"github.com/go-flucky/flucky/pkg/sensor"
	"github.com/go-flucky/flucky/pkg/storage"
	"github.com/go-flucky/flucky/pkg/storage/db"
	"github.com/go-flucky/flucky/pkg/types"
	"github.com/volker-raschek/go-logger/pkg/logger"
)

var (
	// NOTE(review): the postgres* values are hard-coded connection settings
	// that nothing in this file references. Confirm whether they are still
	// needed; if so, the credentials should come from the configuration.
	postgresHost     = "markus-pc.trier.cryptic.systems"
	postgresPort     = "5432"
	postgresDatabase = "postgres"
	postgresUser     = "postgres"
	postgresPassword = "postgres"

	// flogger is the logger used by this package. It is initialised with
	// logger.NewSilentLogger() and can be replaced through SetLogger.
	flogger = logger.NewSilentLogger()
)

// SetLogger replaces the logger used by this package.
func SetLogger(logger logger.Logger) {
	flogger = logger
}

// Start runs the daemon and blocks until an interrupt or SIGTERM signal is
// received.
//
// For every enabled sensor of cnf two goroutines are started: one forwards the
// ticks of the sensor's ticker as read requests, the other performs the reads
// and hands the measured values to the main loop, which appends them to an
// in-memory cache. Every cleanCacheInterval the cache is rounded (when round is
// not 0), compressed (when compression is set) and then reset. A non-positive
// cleanCacheInterval disables that periodic processing.
//
// The version parameter is currently not used.
func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compression bool, round float64, version *semver.Version) {
	ctx, cancel := context.WithCancel(context.Background())
	// Guarantee that the producer goroutines are released on every return path.
	defer cancel()

	// Channels. Nothing in this function sends on the debug, info, warn or
	// fatal channel yet; their select cases are kept as plumbing.
	debugChannel := make(chan string)
	infoChannel := make(chan string)
	warnChannel := make(chan string)
	errorChannel := make(chan error)
	fatalChannel := make(chan error, 1)
	measuredValueChannel := make(chan *types.MeasuredValue)

	// SIGKILL (os.Kill) cannot be caught, so it is not registered here.
	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, os.Interrupt, syscall.SIGTERM)
	// Unregister instead of closing the channel: the signal package may still
	// deliver to a registered channel, and a send on a closed channel panics.
	defer signal.Stop(interruptChannel)

	// Info
	flogger.Info("Use clean-cache-interval: %v", cleanCacheInterval.String())
	flogger.Info("Use compression: %v", compression)
	flogger.Info("Round: %v", round)

	// time.NewTicker panics on a non-positive duration. Leaving the channel nil
	// keeps the previous behaviour of time.Tick for that input: the case never
	// fires.
	var cacheTick <-chan time.Time
	if cleanCacheInterval > 0 {
		cacheTicker := time.NewTicker(cleanCacheInterval)
		defer cacheTicker.Stop()
		cacheTick = cacheTicker.C
	}

	measuredValuesCache := make([]*types.MeasuredValue, 0)
	// measuredValuesLogfile := logfile.New(cnf.Logfile)

	// Start producers
	for _, s := range cnf.GetSensors(config.ENABLED) {
		// A buffer of one lets a single read request queue up while the
		// previous read is still running.
		semaphore := make(chan struct{}, 1)

		// Reader: performs one read per request. It stops after the first
		// failed read, which is reported through errorChannel.
		go func(s sensor.Sensor, semaphore <-chan struct{}) {
			for {
				select {
				case <-ctx.Done():
					return
				case <-semaphore:
					measuredValues, err := s.Read()
					if err != nil {
						// Never block on reporting once the main loop is gone.
						select {
						case errorChannel <- err:
						case <-ctx.Done():
						}
						return
					}
					for _, measuredValue := range measuredValues {
						select {
						case measuredValueChannel <- measuredValue:
						case <-ctx.Done():
							return
						}
					}
				}
			}
		}(s, semaphore)

		// Ticker: turns every tick of the sensor's ticker into a read request.
		go func(s sensor.Sensor, semaphore chan<- struct{}) {
			for {
				select {
				case <-ctx.Done():
					return
				case <-s.GetTicker().C:
					select {
					case semaphore <- struct{}{}:
					case <-ctx.Done():
						return
					}
				}
			}
		}(s, semaphore)
	}

	// Distributor
	// measuredValueChannels := distribute.MeasuredValues(ctx, 5, measuredValueChannel)

	for {
		select {
		case debug := <-debugChannel:
			flogger.Debug("%v", debug)

		case info := <-infoChannel:
			flogger.Info("%v", info)

		case warn := <-warnChannel:
			flogger.Warn("%v", warn)

		case err := <-errorChannel:
			flogger.Error("%v", err)

		case fatal := <-fatalChannel:
			flogger.Fatal("Received a fatal error: %v", fatal)

		case interrupt := <-interruptChannel:
			flogger.Info("Received OS Signal: %v", interrupt)
			flogger.Info("Close context")
			// The channels are deliberately not closed: producers may still
			// hold them, and cancelling the context is what stops them.
			cancel()
			return

		case <-cacheTick:
			if round != 0 {
				storage.Round(measuredValuesCache, round)
			}
			if compression {
				measuredValuesCache = storage.Compression(measuredValuesCache)
			}

			// if err := logfile.Append(measuredValuesLogfile, measuredValuesCache); err != nil {
			// 	flogger.Error("Can not save caches measured values in logfile: %v", err)
			// }

			// NOTE: with the logfile append disabled above, the cached values
			// are dropped here without being persisted.
			measuredValuesCache = make([]*types.MeasuredValue, 0)

		case measuredValue, open := <-measuredValueChannel:
			if !open {
				// Log directly: this loop is the only receiver of errorChannel,
				// so sending on it from here would block forever.
				flogger.Error("%v", fmt.Errorf("MeasuredValue channel closed"))
				cancel()
				return
			}
			measuredValuesCache = append(measuredValuesCache, measuredValue)
		}
	}
}

// checkDeviceInDatabase makes sure device exists in database. Any error from
// the lookup by ID is treated as "not registered" and triggers an insert; a
// failing insert is reported through flogger.Fatal.
func checkDeviceInDatabase(ctx context.Context, device *types.Device, database db.Database) {
	_, err := database.SelectDeviceByID(ctx, device.ID)
	if err != nil {
		flogger.Debug("It's seems the current device is not registered in the database. Register the device now")
		if insertErr := database.InsertDevices(ctx, []*types.Device{device}); insertErr != nil {
			flogger.Fatal("Can not register device into database: %v", insertErr)
		}
		flogger.Debug("Device successfully registered into the database")
		return
	}
	flogger.Debug("Device already registered into the database")
}

// checkSensorsInDatabase makes sure every entry of sensors exists in database.
// Any error from the lookup by ID is treated as "not registered" and triggers
// an insert of that sensor; a failing insert is reported through
// flogger.Fatal.
func checkSensorsInDatabase(ctx context.Context, sensors []*types.Sensor, database db.Database) {
	for _, s := range sensors {
		_, err := database.SelectSensorByID(ctx, s.ID)
		if err != nil {
			flogger.Debug("It's seems the sensor %v is not registered in the database. Register the sensor now", s.Name)
			if insertErr := database.InsertSensors(ctx, []*types.Sensor{s}); insertErr != nil {
				flogger.Fatal("Can not register sensor %v into database: %v", s.Name, insertErr)
			}
			flogger.Debug("Sensor %v successfully registered into the database", s.Name)
			continue
		}
		flogger.Debug("Sensor %v is already registered into the database", s.Name)
	}
}