fix: implement repository pkg
This commit is contained in:
@ -2,13 +2,14 @@ package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/url"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"github.com/volker-raschek/flucky/pkg/config"
|
||||
"github.com/volker-raschek/flucky/pkg/repository/db"
|
||||
"github.com/volker-raschek/flucky/pkg/sensor"
|
||||
"github.com/volker-raschek/flucky/pkg/storage"
|
||||
"github.com/volker-raschek/flucky/pkg/types"
|
||||
"github.com/volker-raschek/go-logger/pkg/logger"
|
||||
)
|
||||
@ -31,7 +32,12 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
|
||||
measuredValueChannel := make(chan *types.MeasuredValue, 0)
|
||||
|
||||
// load storage endpoint
|
||||
storageEndpoint, err := storage.New(cnf.StorageEndpoint, flogger)
|
||||
storageEndpointURL, err := url.Parse(cnf.StorageEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
backend, err := db.New(storageEndpointURL, flogger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -43,18 +49,18 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
|
||||
parentCtx := context.Background()
|
||||
|
||||
// Insert device if not exist
|
||||
device, _ := storageEndpoint.SelectDevice(parentCtx, cnf.Device.ID)
|
||||
device, _ := backend.SelectDevice(parentCtx, cnf.Device.ID)
|
||||
if device == nil {
|
||||
if err := storageEndpoint.InsertDevice(parentCtx, cnf.Device); err != nil {
|
||||
if err := backend.InsertDevices(parentCtx, cnf.Device); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Insert sensors if not exist
|
||||
for _, cnfSensor := range cnf.Sensors {
|
||||
sensor, _ := storageEndpoint.SelectSensor(parentCtx, cnfSensor.ID)
|
||||
sensor, _ := backend.SelectSensor(parentCtx, cnfSensor.ID)
|
||||
if sensor == nil {
|
||||
if err := storageEndpoint.InsertSensor(parentCtx, cnfSensor); err != nil {
|
||||
if err := backend.InsertSensors(parentCtx, cnfSensor); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -91,7 +97,7 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
|
||||
|
||||
if cap(measuredValues) == len(measuredValues) {
|
||||
flogger.Debug("Flush cache")
|
||||
err := storageEndpoint.InsertMeasuredValues(ctx, measuredValues)
|
||||
err := backend.InsertMeasuredValues(ctx, measuredValues...)
|
||||
if err != nil {
|
||||
flogger.Error("%v", err)
|
||||
}
|
||||
@ -102,7 +108,7 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
|
||||
cancel()
|
||||
close(measuredValueChannel)
|
||||
|
||||
err := storageEndpoint.InsertMeasuredValues(ctx, measuredValues)
|
||||
err := backend.InsertMeasuredValues(ctx, measuredValues...)
|
||||
if err != nil {
|
||||
flogger.Error("%v", err)
|
||||
}
|
||||
@ -110,6 +116,4 @@ func Start(cnf *config.Config, flogger logger.Logger) error {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user