PKGBUILD/pkg/repository/repository.go

416 lines
14 KiB
Go
Raw Normal View History

2020-05-21 15:40:24 +00:00
package repository
import (
"context"
2020-11-06 22:09:26 +00:00
"fmt"
2020-05-21 15:40:24 +00:00
"net/url"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository/sqlite3"
2020-06-10 19:13:05 +00:00
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
"git.cryptic.systems/volker.raschek/go-logger"
2020-05-21 15:40:24 +00:00
)
type Repository interface {
AddDevices(ctx context.Context, devices ...*types.Device) error
AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error
AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error
AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error
AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error
AddSensors(ctx context.Context, sensors ...*types.Sensor) error
Close() error
GetDeviceByID(ctx context.Context, deviceID string) (*types.Device, error)
GetDeviceByName(ctx context.Context, name string) (*types.Device, error)
GetDevices(ctx context.Context) ([]*types.Device, error)
GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error)
GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error)
GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error)
GetPressures(ctx context.Context) ([]*types.MeasuredValue, error)
GetSensorByID(ctx context.Context, sensorID string) (*types.Sensor, error)
GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error)
GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error)
GetSensorsByNames(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error)
GetSensors(ctx context.Context) ([]*types.Sensor, error)
GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error)
GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error)
Migrate(ctx context.Context) error
RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error
RemoveDevicesByNames(ctx context.Context, names ...string) error
RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error
RemoveSensorsByNames(ctx context.Context, names ...string) error
UpdateDevices(ctx context.Context, devices ...*types.Device) error
UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error
2020-05-21 15:40:24 +00:00
}
// New returns a new database backend interface
func New(databaseURL *url.URL, flogger logger.Logger) (Repository, error) {
switch databaseURL.Scheme {
case "postgres":
repo, err := postgres.New(postgres.Opts{
DatabaseURL: databaseURL,
Logger: flogger,
})
if err != nil {
return nil, err
2020-05-21 15:40:24 +00:00
}
// Initialize database scheme if not exists
err = repo.Migrate(context.Background())
if err != nil {
return nil, err
2020-11-06 22:09:26 +00:00
}
return repo, nil
2020-11-06 22:09:26 +00:00
case "sqlite3":
repo, err := sqlite3.New(sqlite3.Opts{
DatabaseURL: databaseURL,
Logger: flogger,
})
2020-11-06 22:09:26 +00:00
if err != nil {
return nil, err
2020-11-06 22:09:26 +00:00
}
// Initialize database scheme if not exists
err = repo.Migrate(context.Background())
2020-11-06 22:09:26 +00:00
if err != nil {
return nil, err
2020-11-06 22:09:26 +00:00
}
return repo, nil
2020-11-06 22:09:26 +00:00
default:
return nil, fmt.Errorf("Unsupported repository scheme: %v", databaseURL.Scheme)
2020-11-06 22:09:26 +00:00
}
}
// Repository represents a repository where all devices, sensors and measured
// values are stored.
// type Repository struct {
// database db.Database
// }
// // AddDevices to the repository
// func (repo *Repository) AddDevices(devices ...*types.Device) error {
// return repo.database.InsertDevices(context.Background(), devices...)
// }
// // AddMeasuredValues to the repository
// func (repo *Repository) AddMeasuredValues(measuredValues ...*types.MeasuredValue) error {
// return repo.database.InsertMeasuredValues(context.Background(), measuredValues...)
// }
// // AddOrUpdateDevices to the repository
// func (repo *Repository) AddOrUpdateDevices(devices ...*types.Device) error {
// return repo.database.InsertOrUpdateDevices(context.Background(), devices...)
// }
// // AddOrUpdateMeasuredValues to the repository
// func (repo *Repository) AddOrUpdateMeasuredValues(measuredValues ...*types.MeasuredValue) error {
// return repo.database.InsertOrUpdateMeasuredValues(context.Background(), measuredValues...)
// }
// // AddOrUpdateSensors to the repository
// func (repo *Repository) AddOrUpdateSensors(sensors ...*types.Sensor) error {
// return repo.database.InsertOrUpdateSensors(context.Background(), sensors...)
// }
// // AddSensors to the repository
// func (repo *Repository) AddSensors(sensors ...*types.Sensor) error {
// return repo.database.InsertSensors(context.Background(), sensors...)
// }
// // Close closes the repository and prevents new queries from starting. Close
// // then waits for all queries that have started processing on the server to
// // finish.
// func (repo *Repository) Close() error {
// return repo.database.Close()
// }
// // DisableSensorsByNames disables all sensors which match by their name
// func (repo *Repository) DisableSensorsByNames(sensorNames ...string) error {
// sensors, err := repo.GetSensors()
// if err != nil {
// return err
// }
// matchedSensors := make([]*types.Sensor, 0)
// for _, sensor := range sensors {
// for _, sensorName := range sensorNames {
// if strings.Compare(sensor.Name, sensorName) == 0 {
// sensor.Enabled = false
// matchedSensors = append(matchedSensors, sensor)
// }
// }
// }
// return repo.UpdateSensors(matchedSensors...)
// }
// // EnableSensorsByNames enables all sensors which match by their name
// func (repo *Repository) EnableSensorsByNames(sensorNames ...string) error {
// sensors, err := repo.GetSensors()
// if err != nil {
// return err
// }
// matchedSensors := make([]*types.Sensor, 0)
// for _, sensor := range sensors {
// for _, sensorName := range sensorNames {
// if strings.Compare(sensor.Name, sensorName) == 0 {
// sensor.Enabled = true
// matchedSensors = append(matchedSensors, sensor)
// }
// }
// }
// return repo.UpdateSensors(matchedSensors...)
// }
// // GetDevice returns a device by its id. If no device has been found, the
// // function returns nil.
// func (repo *Repository) GetDevice(deviceID string) (*types.Device, error) {
// return repo.database.SelectDevice(context.Background(), deviceID)
// }
// // GetDevices returns all devices. If no devices have been found, the function
// // returns nil.
// func (repo *Repository) GetDevices() ([]*types.Device, error) {
// return repo.database.SelectDevices(context.Background())
// }
// // GetHumidity returns a humidity value by id
// func (repo *Repository) GetHumidity(humidityID string) (*types.MeasuredValue, error) {
// return repo.database.SelectHumidity(context.Background(), humidityID)
// }
// // GetPressure returns a pressure value by id
// func (repo *Repository) GetPressure(pressureID string) (*types.MeasuredValue, error) {
// return repo.database.SelectPressure(context.Background(), pressureID)
// }
// // GetSensor returns a sensor by its id. If no sensor has been found, the
// // function returns nil.
// func (repo *Repository) GetSensor(sensorID string) (*types.Sensor, error) {
// return repo.database.SelectSensor(context.Background(), sensorID)
// }
// // GetSensors returns all sensors. If no sensors have been found, the function
// // returns nil.
// func (repo *Repository) GetSensors(models ...string) ([]*types.Sensor, error) {
// sensors, err := repo.database.SelectSensors(context.Background())
// switch {
// case err != nil:
// return nil, err
// case len(models) > 0:
// cachedSensors := make([]*types.Sensor, 0)
// LOOP:
// for i := range sensors {
// for j := range models {
// if strings.ToLower(sensors[i].Model) == strings.ToLower(models[j]) {
// cachedSensors = append(cachedSensors, sensors[i])
// continue LOOP
// }
// }
// }
// return cachedSensors, nil
// case len(models) <= 0:
// fallthrough
// default:
// return sensors, err
// }
// }
// // GetSensorsByDeviceID returns all sensors by a device id. If no sensor has
// // been found by the id, the function returns nil.
// func (repo *Repository) GetSensorsByDeviceID(deviceID string) ([]*types.Sensor, error) {
// cachedSensors := make([]*types.Sensor, 0)
// sensors, err := repo.GetSensors()
// if err != nil {
// return nil, err
// }
// for _, sensor := range sensors {
// if strings.Compare(sensor.DeviceID, deviceID) == 0 {
// cachedSensors = append(cachedSensors, sensor)
// }
// }
// return cachedSensors, nil
// }
// // GetTemperature returns a temperature value by id
// func (repo *Repository) GetTemperature(pressureID string) (*types.MeasuredValue, error) {
// return repo.database.SelectTemperature(context.Background(), pressureID)
// }
// // RemoveDevices removes devices by their ids from the repository.
// // Additionally, all sensors and measured values which are related to the
// // devices or their sensors will also be deleted.
// func (repo *Repository) RemoveDevices(deviceIDs ...string) error {
// return repo.database.DeleteDevices(context.Background(), deviceIDs...)
// }
// // RenameSensors renames all sensors which match by their current name to the new name
// func (repo *Repository) RenameSensors(oldName string, newName string) error {
// sensors, err := repo.GetSensors()
// if err != nil {
// return err
// }
// matchedSensors := make([]*types.Sensor, 0)
// for _, sensor := range sensors {
// if strings.Compare(sensor.Name, oldName) == 0 {
// sensor.Name = newName
// matchedSensors = append(matchedSensors, sensor)
// }
// }
// return repo.UpdateSensors(matchedSensors...)
// }
// // RemoveSensors removes sensors by their ids from the repository.
// // Additionally, all measured values which are related to the sensors will
// // also be deleted.
// func (repo *Repository) RemoveSensors(sensorIDs ...string) error {
// return repo.database.DeleteSensors(context.Background(), sensorIDs...)
// }
// // RemoveSensorsByNames removes all sensors which match by their name
// func (repo *Repository) RemoveSensorsByNames(sensorNames ...string) error {
// sensors, err := repo.GetSensors()
// if err != nil {
// return err
// }
// matchedSensorIDs := make([]string, 0)
// for _, sensor := range sensors {
// for _, sensorName := range sensorNames {
// if strings.Compare(sensor.Name, sensorName) == 0 {
// matchedSensorIDs = append(matchedSensorIDs, sensor.ID)
// }
// }
// }
// return repo.RemoveSensors(matchedSensorIDs...)
// }
// // UpdateDevices updates devices which are stored in the repository by their
// // id. The id of the device cannot be updated.
// func (repo *Repository) UpdateDevices(devices ...*types.Device) error {
// return repo.database.UpdateDevices(context.Background(), devices...)
// }
// // UpdateSensors updates sensors which are stored in the repository by their
// // id. The id of the sensor cannot be updated.
// func (repo *Repository) UpdateSensors(sensors ...*types.Sensor) error {
// return repo.database.UpdateSensors(context.Background(), sensors...)
// }
// // New returns a new repository based on the data source name (dsn)
// func New(dsnURL *url.URL, flogger logger.Logger) (*Repository, error) {
// database, err := db.New(dsnURL, flogger)
// if err != nil {
// return nil, err
// }
// return &Repository{
// database: database,
// }, nil
// }
// type OptImport struct {
// Sensors bool
// Humidities bool
// Pressures bool
// Temperatures bool
// }
// func Import(sourceDSNURL *url.URL, destDNSURL *url.URL, flogger logger.Logger, optImport OptImport) error {
// importMap := map[string]bool{
// "humidities": optImport.Humidities,
// "pressures": optImport.Pressures,
// "temperatures": optImport.Temperatures,
// }
// // enable sensors if one measured value is enabled
// if !optImport.Sensors {
// for key, value := range importMap {
// if value {
// flogger.Info("Enable import option sensors. It's required as foreign key for %v", key)
// optImport.Sensors = true
// }
// }
// }
// // Initialize source repository
// sourceRepo, err := New(sourceDSNURL, flogger)
// if err != nil {
// return fmt.Errorf("Failed to open the source repo: %w", err)
// }
// defer sourceRepo.Close()
// // Initialize destination repository
// destRepo, err := New(destDNSURL, flogger)
// if err != nil {
// return fmt.Errorf("Failed to open the destination repo: %w", err)
// }
// defer destRepo.Close()
// // AddOrUpdate: Devices
// ctx := context.Background()
// devices, err := sourceRepo.database.SelectDevices(ctx)
// if err != nil {
// return fmt.Errorf("Failed to fetch devices from source repo: %w", err)
// }
// flogger.Debug("Found %v devices", len(devices))
// for i := range devices {
// err := destRepo.AddOrUpdateDevices(devices[i])
// if err != nil {
// return fmt.Errorf("Failed to add or update device %v into dest repo: %w", devices[i].Name, err)
// }
// }
// // AddOrUpdate: Sensors
// if optImport.Sensors {
// sensors, err := sourceRepo.database.SelectSensors(ctx)
// if err != nil {
// return fmt.Errorf("Failed to fetch sensors from source repo: %w", err)
// }
// flogger.Debug("Found %v sensors", len(sensors))
// for i := range sensors {
// err := destRepo.AddOrUpdateSensors(sensors[i])
// if err != nil {
// return fmt.Errorf("Failed to add or update sensor %v into dest repo: %w", sensors[i].Name, err)
// }
// }
// }
// for key, f := range map[string]func(context.Context) ([]*types.MeasuredValue, error){
// "humidities": sourceRepo.database.SelectHumidities,
// "pressures": sourceRepo.database.SelectPressures,
// "temperatures": sourceRepo.database.SelectTemperatures,
// } {
// if importMap[key] {
// measuredValues, err := f(ctx)
// if err != nil {
// return fmt.Errorf("Failed to select %v from source repo: %w", key, err)
// }
// flogger.Debug("Found %v %v values", len(measuredValues), key)
// err = destRepo.AddOrUpdateMeasuredValues(measuredValues...)
// if err != nil {
// return fmt.Errorf("Failed to add or update %v into dest repo: %w", key, err)
// }
// }
// }
// return nil
// }