PKGBUILD/pkg/daemon/daemon.go
Markus Pesch 10069568f9
fix: renamed storage endpoint into dsn
Changes:
- Renamed storage endpoint into dsn (data source name).
- Add an additional dsn fallback property. This dsn will be used in the
  future to store information if the main dsn backend does not work
  correctly, for example if no connection to a database can be
  established over the network.
2020-06-01 12:41:48 +02:00

106 lines
2.2 KiB
Go

package daemon
import (
"context"
"net/url"
"os"
"os/signal"
"syscall"
"github.com/volker-raschek/flucky/pkg/config"
"github.com/volker-raschek/flucky/pkg/repository"
"github.com/volker-raschek/flucky/pkg/sensor"
"github.com/volker-raschek/flucky/pkg/types"
"github.com/volker-raschek/go-logger/pkg/logger"
)
// Start runs the measurement daemon until the process receives SIGINT or
// SIGTERM.
//
// It opens the repository addressed by cnf.DSN, creates a sensor for every
// enabled sensor stored in that repository and starts one goroutine per sensor
// that reads the sensor whenever its ticker fires. Measured values are
// collected in an in-memory cache which is written to the repository each time
// it is full and once more on shutdown.
//
// Start returns an error if the DSN cannot be parsed, the repository cannot be
// opened, the sensors cannot be loaded or created, or the final flush on
// shutdown fails. Errors while reading a sensor or while flushing a full cache
// are only logged via flogger so that the daemon keeps running.
func Start(cnf *config.Config, flogger logger.Logger) error {
	// cacheSize is the number of measured values buffered before they are
	// written to the repository.
	const cacheSize = 10

	// load data source name (dsn)
	dsnURL, err := url.Parse(cnf.DSN)
	if err != nil {
		return err
	}

	repo, err := repository.New(dsnURL, flogger)
	if err != nil {
		return err
	}

	repoSensors, err := repo.GetSensors()
	if err != nil {
		return err
	}

	sensors := make([]sensor.Sensor, 0, len(repoSensors))
	for _, repoSensor := range repoSensors {
		if !repoSensor.Enabled {
			continue
		}
		// Named s (not sensor) so the imported package is not shadowed.
		s, err := sensor.New(repoSensor)
		if err != nil {
			return err
		}
		sensors = append(sensors, s)
	}

	// SIGKILL (os.Kill) can never be caught, so registering it has no effect.
	// Listen for SIGINT and SIGTERM, which are the signals that can actually
	// trigger a graceful shutdown.
	interruptChannel := make(chan os.Signal, 1)
	signal.Notify(interruptChannel, os.Interrupt, syscall.SIGTERM)
	defer signal.Stop(interruptChannel)

	// Collection
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The channel is intentionally never closed: the sensor goroutines are the
	// senders and are stopped through ctx instead, which avoids a send on a
	// closed channel during shutdown.
	measuredValueChannel := make(chan *types.MeasuredValue)

	for _, s := range sensors {
		go func(s sensor.Sensor) {
			for {
				select {
				case <-ctx.Done():
					return
				case <-s.GetTicker().C:
					measuredValues, err := s.Read()
					if err != nil {
						flogger.Error("%v", err)
						continue
					}
					for _, measuredValue := range measuredValues {
						// Also watch ctx here, otherwise the goroutine would
						// block forever once the receiver has stopped.
						select {
						case measuredValueChannel <- measuredValue:
						case <-ctx.Done():
							return
						}
					}
				}
			}
		}(s)
	}

	measuredValues := make([]*types.MeasuredValue, 0, cacheSize)
	for {
		select {
		case measuredValue := <-measuredValueChannel:
			flogger.Debug("%v\t%v\t%v", measuredValue.ID, measuredValue.ValueType, measuredValue.Value)
			measuredValues = append(measuredValues, measuredValue)
			if len(measuredValues) < cacheSize {
				continue
			}

			flogger.Debug("Flush cache")
			if err := repo.AddMeasuredValues(measuredValues...); err != nil {
				// The cached values are dropped on failure, as before; the
				// daemon keeps collecting new values.
				flogger.Error("%v", err)
			}
			// Allocate a fresh slice instead of reusing the backing array in
			// case the repository keeps a reference to the flushed values.
			measuredValues = make([]*types.MeasuredValue, 0, cacheSize)

		case <-interruptChannel:
			// Stop all sensor goroutines, then persist what is still cached.
			// Returning here (instead of a bare break, which would only leave
			// the select) is what actually terminates the daemon.
			cancel()
			if len(measuredValues) == 0 {
				return nil
			}
			if err := repo.AddMeasuredValues(measuredValues...); err != nil {
				return err
			}
			return nil
		}
	}
}