fix(pkg/daemon): write values into logfile

This commit is contained in:
Markus Pesch 2020-01-11 17:24:18 +01:00
parent 002f3e9e25
commit 12246aae0c
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
3 changed files with 172 additions and 92 deletions

View File

@ -2,9 +2,7 @@ package daemon
import ( import (
"log" "log"
"time"
"github.com/Masterminds/semver"
"github.com/go-flucky/flucky/pkg/config" "github.com/go-flucky/flucky/pkg/config"
"github.com/go-flucky/flucky/pkg/daemon" "github.com/go-flucky/flucky/pkg/daemon"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@ -12,13 +10,11 @@ import (
) )
var ( var (
cleanCacheInterval string cachedMeasuredValues uint
compression bool compression bool
configFile *string configFile *string
round float64 round float64
temperatureUnit string temperatureUnit string
version *semver.Version
) )
var daemonCmd = &cobra.Command{ var daemonCmd = &cobra.Command{
@ -31,24 +27,18 @@ var daemonCmd = &cobra.Command{
log.Fatalln(err) log.Fatalln(err)
} }
duration, err := time.ParseDuration(cleanCacheInterval)
if err != nil {
log.Fatalf("Can not parse clean cache interval into duration time: %v", err)
}
logger := logger.NewDefaultLogger(logger.LogLevelDebug) logger := logger.NewDefaultLogger(logger.LogLevelDebug)
daemon.SetLogger(logger) daemon.SetLogger(logger)
daemon.Start(cnf, duration, compression, round, version) daemon.Start(cnf, cachedMeasuredValues, compression, round)
}, },
} }
func InitCmd(cmd *cobra.Command, cnfFile *string, sversion *semver.Version) { func InitCmd(cmd *cobra.Command, cnfFile *string) {
configFile = cnfFile configFile = cnfFile
version = sversion
cmd.AddCommand(daemonCmd) cmd.AddCommand(daemonCmd)
daemonCmd.Flags().BoolVar(&compression, "compression", true, "Compress measured values") daemonCmd.Flags().BoolVar(&compression, "compression", true, "Compress measured values")
daemonCmd.Flags().StringVar(&cleanCacheInterval, "clean-cache-interval", "5m", "Minute intervall to clean cache and write measured values into logfile") daemonCmd.Flags().UintVar(&cachedMeasuredValues, "cached-values", 500, "Number of cached values before saveing into the storage endpoint")
daemonCmd.Flags().Float64Var(&round, "round", 0.5, "Round values. The value 0 deactivates the function") daemonCmd.Flags().Float64Var(&round, "round", 0.5, "Round values. The value 0 deactivates the function")
} }

93
pkg/daemon/cache.go Normal file
View File

@ -0,0 +1,93 @@
package daemon
import (
"fmt"
"net/url"
"sync"
"github.com/go-flucky/flucky/pkg/storage"
"github.com/go-flucky/flucky/pkg/storage/logfile"
"github.com/go-flucky/flucky/pkg/types"
)
type cacheStore struct {
compression bool
cache []*types.MeasuredValue
mux *sync.Mutex
round float64
URL *url.URL
}
func (cs *cacheStore) Add(measuredValue *types.MeasuredValue) {
cs.mux.Lock()
defer cs.mux.Unlock()
cs.cache = append(cs.cache, measuredValue)
}
func (cs *cacheStore) Flush(measuredValue *types.MeasuredValue) *cacheStore {
cs.mux.Lock()
defer cs.mux.Unlock()
cs.cache = make([]*types.MeasuredValue, 0)
return cs
}
func (cs *cacheStore) Get(id string) *types.MeasuredValue {
cs.mux.Lock()
defer cs.mux.Unlock()
for _, measuredValue := range cs.cache {
if measuredValue.ID == id {
return measuredValue
}
}
return nil
}
func (cs *cacheStore) Size() int {
cs.mux.Lock()
defer cs.mux.Unlock()
return len(cs.cache)
}
func (cs *cacheStore) WriteToEndpoint() error {
cs.mux.Lock()
defer cs.mux.Unlock()
defer func() { cs.cache = make([]*types.MeasuredValue, 0) }()
switch cs.URL.Scheme {
case "file":
return cs.logfile()
case "postgres":
return fmt.Errorf("Not yet implemented to store values into a postgres database")
}
return nil
}
func (cs *cacheStore) logfile() error {
newMeasuredValues := make([]*types.MeasuredValue, 0)
for _, measuredValue := range cs.cache {
newMeasuredValues = append(newMeasuredValues, measuredValue)
}
if cs.round != 0 {
storage.Round(newMeasuredValues, cs.round)
}
measuredLogfile := logfile.New(cs.URL.Path)
measuredValues, err := measuredLogfile.Read()
if err != nil {
return err
}
measuredValues = append(measuredValues, newMeasuredValues...)
if cs.compression {
measuredValues = storage.Compression(measuredValues)
}
err = measuredLogfile.Write(measuredValues)
if err != nil {
return err
}
return nil
}

View File

@ -5,14 +5,11 @@ import (
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time"
"github.com/Masterminds/semver"
"github.com/go-flucky/flucky/pkg/config" "github.com/go-flucky/flucky/pkg/config"
"github.com/go-flucky/flucky/pkg/sensor" "github.com/go-flucky/flucky/pkg/sensor"
"github.com/go-flucky/flucky/pkg/storage"
"github.com/go-flucky/flucky/pkg/storage/db"
"github.com/go-flucky/flucky/pkg/types" "github.com/go-flucky/flucky/pkg/types"
"github.com/volker-raschek/go-logger/pkg/logger" "github.com/volker-raschek/go-logger/pkg/logger"
) )
@ -32,15 +29,25 @@ func SetLogger(logger logger.Logger) {
} }
// Start the daemon // Start the daemon
func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compression bool, round float64, version *semver.Version) { func Start(cnf *config.Configuration, cacheSize uint, compression bool, round float64) error {
storageEndpointURL, err := cnf.GetStorageEndpointURL()
if err != nil {
return err
}
cache := &cacheStore{
compression: compression,
cache: make([]*types.MeasuredValue, 0),
mux: new(sync.Mutex),
round: round,
URL: storageEndpointURL,
}
// Context // Context
parentCtx := context.Background() parentCtx := context.Background()
ctx, cancel := context.WithCancel(parentCtx) ctx, cancel := context.WithCancel(parentCtx)
// Ticker
// saveTicker := time.Tick(cleanCacheInterval)
// channels // channels
debugChannel := make(chan string, 0) debugChannel := make(chan string, 0)
infoChannel := make(chan string, 0) infoChannel := make(chan string, 0)
@ -52,14 +59,9 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
measuredValueChannel := make(chan *types.MeasuredValue, 0) measuredValueChannel := make(chan *types.MeasuredValue, 0)
// Info // Info
flogger.Info("Use clean-cache-interval: %v", cleanCacheInterval.String())
flogger.Info("Use compression: %v", compression) flogger.Info("Use compression: %v", compression)
flogger.Info("Round: %v", round) flogger.Info("Round: %v", round)
ticker := time.Tick(cleanCacheInterval)
measuredValuesCache := make([]*types.MeasuredValue, 0)
// measuredValuesLogfile := logfile.New(cnf.Logfile)
// Init semaphoreChannel // Init semaphoreChannel
semaphoreChannels := make(map[string]chan struct{}) semaphoreChannels := make(map[string]chan struct{})
for _, sensor := range cnf.GetSensors(config.ENABLED) { for _, sensor := range cnf.GetSensors(config.ENABLED) {
@ -83,9 +85,8 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
errorChannel <- err errorChannel <- err
return return
} }
for _, measmeasuredValue := range measuredValues { for _, measuredValue := range measuredValues {
debugChannel <- fmt.Sprintf("%v: %v: %v", sensor.GetID(), measmeasuredValue.ValueType, measmeasuredValue.Value) measuredValueChannel <- measuredValue
measuredValueChannel <- measmeasuredValue
} }
} }
} }
@ -105,8 +106,26 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
}(s) }(s)
} }
// Distributor go func() {
//measuredValueChannels := distribute.MeasuredValues(ctx, 5, measuredValueChannel) for {
select {
case <-ctx.Done():
debugChannel <- fmt.Sprintf("Stop consumer of measured values: Closed context: %v", ctx.Err().Error())
return
case measuredValue := <-measuredValueChannel:
cache.Add(measuredValue)
debugChannel <- fmt.Sprintf("CacheStore ID %v: %v - %v - %v", cache.Size(), measuredValue.SensorID, measuredValue.ValueType, measuredValue.Value)
if cache.Size() >= int(cacheSize) {
debugChannel <- fmt.Sprint("Write cache into storage endpoint")
err := cache.WriteToEndpoint()
if err != nil {
errorChannel <- err
}
}
}
}
}()
for { for {
select { select {
@ -120,70 +139,48 @@ func Start(cnf *config.Configuration, cleanCacheInterval time.Duration, compress
flogger.Error("%v", err) flogger.Error("%v", err)
case fatal, _ := <-fatalChannel: case fatal, _ := <-fatalChannel:
flogger.Fatal("Received a fatal error: %v", fatal) flogger.Fatal("Received a fatal error: %v", fatal)
case interrupt := <-interruptChannel: case <-interruptChannel:
flogger.Info("Received OS Signal: %v", interrupt) flogger.Debug("Write %v cached values into storage endpoint", cache.Size())
flogger.Info("Close context") cache.WriteToEndpoint()
flogger.Debug("Close context")
cancel() cancel()
flogger.Info("Close channels") flogger.Debug("Close channels")
close(debugChannel) close(debugChannel)
close(infoChannel) close(infoChannel)
close(warnChannel) close(warnChannel)
close(errorChannel) close(errorChannel)
close(interruptChannel) close(interruptChannel)
return return nil
case <-ticker:
if round != 0 {
storage.Round(measuredValuesCache, round)
}
if compression {
measuredValuesCache = storage.Compression(measuredValuesCache)
}
// if err := logfile.Append(measuredValuesLogfile, measuredValuesCache); err != nil {
// flogger.Error("Can not save caches measured values in logfile: %v", err)
// }
measuredValuesCache = make([]*types.MeasuredValue, 0)
case measuredValue, open := <-measuredValueChannel:
if !open {
errorChannel <- fmt.Errorf("MeasuredValue channel closed")
cancel()
}
measuredValuesCache = append(measuredValuesCache, measuredValue)
} }
} }
} }
func checkDeviceInDatabase(ctx context.Context, device *types.Device, database db.Database) { // func checkDeviceInDatabase(ctx context.Context, device *types.Device, database db.Database) {
_, err := database.SelectDeviceByID(ctx, device.ID) // _, err := database.SelectDeviceByID(ctx, device.ID)
if err != nil { // if err != nil {
flogger.Debug("It's seems the current device is not registered in the database. Register the device now") // flogger.Debug("It's seems the current device is not registered in the database. Register the device now")
err2 := database.InsertDevices(ctx, []*types.Device{device}) // err2 := database.InsertDevices(ctx, []*types.Device{device})
if err2 != nil { // if err2 != nil {
flogger.Fatal("Can not register device into database: %v", err2) // flogger.Fatal("Can not register device into database: %v", err2)
} // }
flogger.Debug("Device successfully registered into the database") // flogger.Debug("Device successfully registered into the database")
return // return
} // }
flogger.Debug("Device already registered into the database") // flogger.Debug("Device already registered into the database")
} // }
func checkSensorsInDatabase(ctx context.Context, sensors []*types.Sensor, database db.Database) { // func checkSensorsInDatabase(ctx context.Context, sensors []*types.Sensor, database db.Database) {
for _, sensor := range sensors { // for _, sensor := range sensors {
_, err := database.SelectSensorByID(ctx, sensor.ID) // _, err := database.SelectSensorByID(ctx, sensor.ID)
if err != nil { // if err != nil {
flogger.Debug("It's seems the sensor %v is not registered in the database. Register the sensor now", sensor.Name) // flogger.Debug("It's seems the sensor %v is not registered in the database. Register the sensor now", sensor.Name)
err2 := database.InsertSensors(ctx, []*types.Sensor{sensor}) // err2 := database.InsertSensors(ctx, []*types.Sensor{sensor})
if err2 != nil { // if err2 != nil {
flogger.Fatal("Can not register sensor %v into database: %v", sensor.Name, err2) // flogger.Fatal("Can not register sensor %v into database: %v", sensor.Name, err2)
} // }
flogger.Debug("Sensor %v successfully registered into the database", sensor.Name) // flogger.Debug("Sensor %v successfully registered into the database", sensor.Name)
continue // continue
} // }
flogger.Debug("Sensor %v is already registered into the database", sensor.Name) // flogger.Debug("Sensor %v is already registered into the database", sensor.Name)
} // }
} // }