markus
522fe2746a
Add additional functions to the repository to add or update devices, sensors or measured values. Furthermore the test has been adapt to the new functions.
339 lines
10 KiB
Go
339 lines
10 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
|
|
"git.cryptic.systems/volker.raschek/flucky/pkg/repository/db"
|
|
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
|
|
"git.cryptic.systems/volker.raschek/go-logger"
|
|
)
|
|
|
|
// Repository represent a repository where all devices, sensors and measured
|
|
// values are stored.
|
|
type Repository struct {
|
|
database db.Database
|
|
}
|
|
|
|
// AddDevices to the repository
|
|
func (repo *Repository) AddDevices(devices ...*types.Device) error {
|
|
return repo.database.InsertDevices(context.Background(), devices...)
|
|
}
|
|
|
|
// AddMeasuredValues to the repository
|
|
func (repo *Repository) AddMeasuredValues(measuredValues ...*types.MeasuredValue) error {
|
|
return repo.database.InsertMeasuredValues(context.Background(), measuredValues...)
|
|
}
|
|
|
|
// AddOrUpdateDevices to the repository
|
|
func (repo *Repository) AddOrUpdateDevices(devices ...*types.Device) error {
|
|
return repo.database.InsertOrUpdateDevices(context.Background(), devices...)
|
|
}
|
|
|
|
// AddOrUpdateMeasuredValues to the repository
|
|
func (repo *Repository) AddOrUpdateMeasuredValues(measuredValues ...*types.MeasuredValue) error {
|
|
return repo.database.InsertOrUpdateMeasuredValues(context.Background(), measuredValues...)
|
|
}
|
|
|
|
// AddOrUpdateSensors to the repository
|
|
func (repo *Repository) AddOrUpdateSensors(sensors ...*types.Sensor) error {
|
|
return repo.database.InsertOrUpdateSensors(context.Background(), sensors...)
|
|
}
|
|
|
|
// AddSensors to the repository
|
|
func (repo *Repository) AddSensors(sensors ...*types.Sensor) error {
|
|
return repo.database.InsertSensors(context.Background(), sensors...)
|
|
}
|
|
|
|
// Close closes the repository and prevents new queries from starting. Close
|
|
// then waits for all queries that have started processing on the server to
|
|
// finish.
|
|
func (repo *Repository) Close() error {
|
|
return repo.database.Close()
|
|
}
|
|
|
|
// DisableSensorsByNames disable all sensors which match bei their name
|
|
func (repo *Repository) DisableSensorsByNames(sensorNames ...string) error {
|
|
sensors, err := repo.GetSensors()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
matchedSensors := make([]*types.Sensor, 0)
|
|
for _, sensor := range sensors {
|
|
for _, sensorName := range sensorNames {
|
|
if strings.Compare(sensor.Name, sensorName) == 0 {
|
|
sensor.Enabled = false
|
|
matchedSensors = append(matchedSensors, sensor)
|
|
}
|
|
}
|
|
}
|
|
|
|
return repo.UpdateSensors(matchedSensors...)
|
|
}
|
|
|
|
// EnableSensorsByNames enable all sensors which match bei their name
|
|
func (repo *Repository) EnableSensorsByNames(sensorNames ...string) error {
|
|
sensors, err := repo.GetSensors()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
matchedSensors := make([]*types.Sensor, 0)
|
|
for _, sensor := range sensors {
|
|
for _, sensorName := range sensorNames {
|
|
if strings.Compare(sensor.Name, sensorName) == 0 {
|
|
sensor.Enabled = true
|
|
matchedSensors = append(matchedSensors, sensor)
|
|
}
|
|
}
|
|
}
|
|
|
|
return repo.UpdateSensors(matchedSensors...)
|
|
}
|
|
|
|
// GetDevice returns a device by his id. If no device has been found, the
|
|
// function returns nil.
|
|
func (repo *Repository) GetDevice(deviceID string) (*types.Device, error) {
|
|
return repo.database.SelectDevice(context.Background(), deviceID)
|
|
}
|
|
|
|
// GetDevices returns all devices. If no devices has been found, the function
|
|
// returns nil.
|
|
func (repo *Repository) GetDevices() ([]*types.Device, error) {
|
|
return repo.database.SelectDevices(context.Background())
|
|
}
|
|
|
|
// GetHumidity returns a humidity value by id
|
|
func (repo *Repository) GetHumidity(humidityID string) (*types.MeasuredValue, error) {
|
|
return repo.database.SelectHumidity(context.Background(), humidityID)
|
|
}
|
|
|
|
// GetPressure returns a pressure value by id
|
|
func (repo *Repository) GetPressure(pressureID string) (*types.MeasuredValue, error) {
|
|
return repo.database.SelectPressure(context.Background(), pressureID)
|
|
}
|
|
|
|
// GetSensor returns a sensor by his id. If no sensor has been found, the
|
|
// function returns nil.
|
|
func (repo *Repository) GetSensor(sensorID string) (*types.Sensor, error) {
|
|
return repo.database.SelectSensor(context.Background(), sensorID)
|
|
}
|
|
|
|
// GetSensors returns all sensors. If no sensors has been found, the function
|
|
// returns nil.
|
|
func (repo *Repository) GetSensors(models ...string) ([]*types.Sensor, error) {
|
|
sensors, err := repo.database.SelectSensors(context.Background())
|
|
switch {
|
|
case err != nil:
|
|
return nil, err
|
|
case len(models) > 0:
|
|
cachedSensors := make([]*types.Sensor, 0)
|
|
LOOP:
|
|
for i := range sensors {
|
|
for j := range models {
|
|
if strings.ToLower(sensors[i].Model) == strings.ToLower(models[j]) {
|
|
cachedSensors = append(cachedSensors, sensors[i])
|
|
continue LOOP
|
|
}
|
|
}
|
|
}
|
|
return cachedSensors, nil
|
|
case len(models) <= 0:
|
|
fallthrough
|
|
default:
|
|
return sensors, err
|
|
}
|
|
}
|
|
|
|
// GetSensorsByDeviceID returns all sensors by a device id. If no sensor has
|
|
// been found by the id, the function returns nil.
|
|
func (repo *Repository) GetSensorsByDeviceID(deviceID string) ([]*types.Sensor, error) {
|
|
cachedSensors := make([]*types.Sensor, 0)
|
|
sensors, err := repo.GetSensors()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, sensor := range sensors {
|
|
if strings.Compare(sensor.DeviceID, deviceID) == 0 {
|
|
cachedSensors = append(cachedSensors, sensor)
|
|
}
|
|
}
|
|
|
|
return cachedSensors, nil
|
|
}
|
|
|
|
// GetTemperature returns a temperature value by id
|
|
func (repo *Repository) GetTemperature(pressureID string) (*types.MeasuredValue, error) {
|
|
return repo.database.SelectTemperature(context.Background(), pressureID)
|
|
}
|
|
|
|
// RemoveDevices removes devices by their ids from the repository. Additional
|
|
// all sensors and measured values, which are in relation with the device
|
|
// respectively the sensors will also be deleted.
|
|
func (repo *Repository) RemoveDevices(deviceIDs ...string) error {
|
|
return repo.database.DeleteDevices(context.Background(), deviceIDs...)
|
|
}
|
|
|
|
// RenameSensors all sensors which match by their current name to the new name
|
|
func (repo *Repository) RenameSensors(oldName string, newName string) error {
|
|
sensors, err := repo.GetSensors()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
matchedSensors := make([]*types.Sensor, 0)
|
|
for _, sensor := range sensors {
|
|
if strings.Compare(sensor.Name, oldName) == 0 {
|
|
sensor.Name = newName
|
|
matchedSensors = append(matchedSensors, sensor)
|
|
}
|
|
}
|
|
|
|
return repo.UpdateSensors(matchedSensors...)
|
|
}
|
|
|
|
// RemoveSensors removes sensors by their ids from the repository. Additional
|
|
// all measured values, which are in relation with the sensor will also be
|
|
// deleted.
|
|
func (repo *Repository) RemoveSensors(sensorIDs ...string) error {
|
|
return repo.database.DeleteSensors(context.Background(), sensorIDs...)
|
|
}
|
|
|
|
// RemoveSensorsByNames removes all sensors which match bei their name
|
|
func (repo *Repository) RemoveSensorsByNames(sensorNames ...string) error {
|
|
sensors, err := repo.GetSensors()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
matchedSensorIDs := make([]string, 0)
|
|
for _, sensor := range sensors {
|
|
for _, sensorName := range sensorNames {
|
|
if strings.Compare(sensor.Name, sensorName) == 0 {
|
|
matchedSensorIDs = append(matchedSensorIDs, sensor.ID)
|
|
}
|
|
}
|
|
}
|
|
|
|
return repo.RemoveSensors(matchedSensorIDs...)
|
|
}
|
|
|
|
// UpdateDevices update devices which are stored into the repository by their
|
|
// id. The id of the device can not be updated.
|
|
func (repo *Repository) UpdateDevices(devices ...*types.Device) error {
|
|
return repo.database.UpdateDevices(context.Background(), devices...)
|
|
}
|
|
|
|
// UpdateSensors update sensors which are stored into the repository by their
|
|
// id. The id of the sensor can not be updated.
|
|
func (repo *Repository) UpdateSensors(sensors ...*types.Sensor) error {
|
|
return repo.database.UpdateSensors(context.Background(), sensors...)
|
|
}
|
|
|
|
// New returns a new repository based on the data source name (dsn)
|
|
func New(dsnURL *url.URL, flogger logger.Logger) (*Repository, error) {
|
|
database, err := db.New(dsnURL, flogger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &Repository{
|
|
database: database,
|
|
}, nil
|
|
}
|
|
|
|
type OptImport struct {
|
|
Sensors bool
|
|
Humidities bool
|
|
Pressures bool
|
|
Temperatures bool
|
|
}
|
|
|
|
func Import(sourceDSNURL *url.URL, destDNSURL *url.URL, flogger logger.Logger, optImport OptImport) error {
|
|
|
|
importMap := map[string]bool{
|
|
"humidities": optImport.Humidities,
|
|
"pressures": optImport.Pressures,
|
|
"temperatures": optImport.Temperatures,
|
|
}
|
|
|
|
// enable sensors if one measured value is enabled
|
|
if !optImport.Sensors {
|
|
for key, value := range importMap {
|
|
if value {
|
|
flogger.Info("Enable import option sensors. It's required as foreign key for %v", key)
|
|
optImport.Sensors = true
|
|
}
|
|
}
|
|
}
|
|
|
|
sourceRepo, err := New(sourceDSNURL, flogger)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to open the source repo: %w", err)
|
|
}
|
|
defer sourceRepo.Close()
|
|
|
|
destRepo, err := New(destDNSURL, flogger)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to open the destination repo: %w", err)
|
|
}
|
|
defer destRepo.Close()
|
|
|
|
ctx := context.Background()
|
|
devices, err := sourceRepo.database.SelectDevices(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to fetch devices from source repo: %w", err)
|
|
}
|
|
|
|
flogger.Debug("Found %v devices", len(devices))
|
|
|
|
for i := range devices {
|
|
err := destRepo.AddDevices(devices[i])
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to add device %v into dest repo: %w", devices[i].Name, err)
|
|
}
|
|
}
|
|
|
|
if optImport.Sensors {
|
|
sensors, err := sourceRepo.database.SelectSensors(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to fetch sensors from source repo: %w", err)
|
|
}
|
|
|
|
flogger.Debug("Found %v sensors", len(sensors))
|
|
|
|
for i := range sensors {
|
|
err := destRepo.AddSensors(sensors[i])
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to add sensor %v into dest repo: %w", sensors[i].Name, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
for key, f := range map[string]func(context.Context) ([]*types.MeasuredValue, error){
|
|
"humidities": sourceRepo.database.SelectHumidities,
|
|
"pressures": sourceRepo.database.SelectPressures,
|
|
"temperatures": sourceRepo.database.SelectTemperatures,
|
|
} {
|
|
if importMap[key] {
|
|
measuredValues, err := f(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to select %v from source repo: %w", key, err)
|
|
}
|
|
|
|
flogger.Debug("Found %v %v values", len(measuredValues), key)
|
|
|
|
err = destRepo.AddMeasuredValues(measuredValues...)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to add %v into dest repo: %w", key, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|