PKGBUILD/pkg/daemon/daemon.go

144 lines
3.1 KiB
Go

package daemon
import (
"context"
"fmt"
"net/url"
"os"
"os/signal"
"syscall"
"git.cryptic.systems/volker.raschek/flucky/pkg/config"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository"
"git.cryptic.systems/volker.raschek/flucky/pkg/sensor"
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
"git.cryptic.systems/volker.raschek/go-logger"
)
func Start(cnf *config.Config, cachedEntries uint, flogger logger.Logger) error {
// load data source name (dsn)
dsnURL, err := url.Parse(cnf.DSN)
if err != nil {
return err
}
repo, err := repository.New(dsnURL, flogger)
if err != nil {
return err
}
ctx := context.Background()
// Add
repoDevice, err := repo.GetDeviceByID(ctx, cnf.DeviceID)
switch {
case err != nil:
return err
case repoDevice == nil:
hostname, err := os.Hostname()
if err != nil {
return err
}
err = repo.AddDevices(ctx, &types.Device{
ID: cnf.DeviceID,
Name: hostname,
})
if err != nil {
return err
}
repoDevice, err = repo.GetDeviceByID(ctx, cnf.DeviceID)
if err != nil {
return err
}
}
repoSensors, err := repo.GetSensorsByDeviceIDs(ctx, repoDevice.ID)
switch {
case err != nil:
return err
case repoSensors == nil, len(repoSensors) <= 0:
return fmt.Errorf("No sensors found")
}
sensors := make([]sensor.Sensor, 0)
for _, repoSensor := range repoSensors {
if !repoSensor.Enabled || repoSensor.DeviceID != repoDevice.ID {
continue
}
flogger.Debug("Found sensor %v", repoSensor.Name)
sensor, err := sensor.New(repoSensor)
if err != nil {
return err
}
sensors = append(sensors, sensor)
}
interruptChannel := make(chan os.Signal, 1)
signal.Notify(interruptChannel, syscall.SIGTERM)
// Collection
parentCtx := context.Background()
ctx, cancel := context.WithCancel(parentCtx)
measuredValueChannel, errorChannel := sensor.ReadTickingPipeline(ctx, sensors...)
go func() {
for {
select {
case <-ctx.Done():
return
case err, open := <-errorChannel:
if !open {
return
}
if err != nil {
flogger.Error("%v", err)
}
}
}
}()
measuredValues := make([]*types.MeasuredValue, 0, cachedEntries)
for {
select {
case measuredValue := <-measuredValueChannel:
flogger.Debug("%v\t%v\t%v", measuredValue.ID, measuredValue.ValueType, measuredValue.Value)
measuredValues = append(measuredValues, measuredValue)
if cap(measuredValues) == len(measuredValues) {
flogger.Debug("Flush cache with %v values", len(measuredValues))
err := repo.AddMeasuredValues(ctx, measuredValues...)
if err != nil {
flogger.Error("%v", err)
}
measuredValues = make([]*types.MeasuredValue, 0, cachedEntries)
}
case signal := <-interruptChannel:
cancel()
flogger.Info("Stopping daemon: Received process signal %v", signal.String())
flogger.Debug("Flush cache with %v remaining values", len(measuredValues))
err := repo.AddMeasuredValues(ctx, measuredValues...)
if err != nil {
flogger.Error("%v", err)
}
flogger.Debug("Close repository")
err = repo.Close()
if err != nil {
flogger.Error("%v", err)
}
return nil
}
}
}