Markus Pesch
10069568f9
Changes: - Renamed storage endpoint to dsn (data source name). - Added an additional dsn fallback property. This dsn will be used in the future to store information if the main dsn backend does not work correctly, for example if no connection to a database can be established over the network.
106 lines
2.2 KiB
Go
106 lines
2.2 KiB
Go
package daemon
|
|
|
|
import (
|
|
"context"
|
|
"net/url"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
|
|
"github.com/volker-raschek/flucky/pkg/config"
|
|
"github.com/volker-raschek/flucky/pkg/repository"
|
|
"github.com/volker-raschek/flucky/pkg/sensor"
|
|
"github.com/volker-raschek/flucky/pkg/types"
|
|
"github.com/volker-raschek/go-logger/pkg/logger"
|
|
)
|
|
|
|
func Start(cnf *config.Config, flogger logger.Logger) error {
|
|
|
|
measuredValueChannel := make(chan *types.MeasuredValue, 0)
|
|
|
|
// load data source name (dsn)
|
|
dsnURL, err := url.Parse(cnf.DSN)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
repo, err := repository.New(dsnURL, flogger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
repoSensors, err := repo.GetSensors()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
sensors := make([]sensor.Sensor, 0)
|
|
for _, repoSensor := range repoSensors {
|
|
if !repoSensor.Enabled {
|
|
continue
|
|
}
|
|
|
|
sensor, err := sensor.New(repoSensor)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sensors = append(sensors, sensor)
|
|
}
|
|
|
|
interruptChannel := make(chan os.Signal, 1)
|
|
signal.Notify(interruptChannel, os.Kill, syscall.SIGTERM)
|
|
|
|
// Collection
|
|
parentCtx := context.Background()
|
|
ctx, cancel := context.WithCancel(parentCtx)
|
|
|
|
for _, s := range sensors {
|
|
go func(sensor sensor.Sensor) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-sensor.GetTicker().C:
|
|
measuredValues, err := sensor.Read()
|
|
if err != nil {
|
|
flogger.Error("%v", err)
|
|
continue
|
|
}
|
|
for _, measuredValue := range measuredValues {
|
|
measuredValueChannel <- measuredValue
|
|
}
|
|
}
|
|
}
|
|
}(s)
|
|
}
|
|
|
|
measuredValues := make([]*types.MeasuredValue, 0, 10)
|
|
for {
|
|
select {
|
|
case measuredValue := <-measuredValueChannel:
|
|
flogger.Debug("%v\t%v\t%v", measuredValue.ID, measuredValue.ValueType, measuredValue.Value)
|
|
measuredValues = append(measuredValues, measuredValue)
|
|
|
|
if cap(measuredValues) == len(measuredValues) {
|
|
flogger.Debug("Flush cache")
|
|
err := repo.AddMeasuredValues(measuredValues...)
|
|
if err != nil {
|
|
flogger.Error("%v", err)
|
|
}
|
|
measuredValues = make([]*types.MeasuredValue, 0, 10)
|
|
}
|
|
|
|
case <-interruptChannel:
|
|
cancel()
|
|
close(measuredValueChannel)
|
|
|
|
err := repo.AddMeasuredValues(measuredValues...)
|
|
if err != nil {
|
|
flogger.Error("%v", err)
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
}
|