package repository

import (
	"context"
	"fmt"
	"net/url"

	"git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres"
	"git.cryptic.systems/volker.raschek/flucky/pkg/repository/sqlite3"
	"git.cryptic.systems/volker.raschek/flucky/pkg/types"
	"git.cryptic.systems/volker.raschek/go-logger"
)

// Repository is the set of operations a storage backend has to provide for
// devices, sensors and measured values. New returns the implementation that
// matches the scheme of the database URL.
type Repository interface {
	AddDevices(ctx context.Context, devices ...*types.Device) error
	AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error
	AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error
	AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error
	AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error
	AddSensors(ctx context.Context, sensors ...*types.Sensor) error

	// Close releases the backend. New calls it when the initial migration
	// fails.
	Close() error

	GetDeviceByID(ctx context.Context, deviceID string) (*types.Device, error)
	GetDeviceByName(ctx context.Context, name string) (*types.Device, error)
	GetDevices(ctx context.Context) ([]*types.Device, error)
	GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error)
	GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error)
	GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error)
	GetPressures(ctx context.Context) ([]*types.MeasuredValue, error)
	GetSensorByID(ctx context.Context, sensorID string) (*types.Sensor, error)
	GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error)
	GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error)
	GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error)
	GetSensors(ctx context.Context) ([]*types.Sensor, error)
	GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error)
	GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error)

	// Migrate is called by New right after the backend has been opened to
	// initialize the database scheme if it does not exist yet.
	Migrate(ctx context.Context) error

	RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error
	RemoveDevicesByNames(ctx context.Context, names ...string) error
	RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error
	RemoveSensorsByNames(ctx context.Context, names ...string) error
	UpdateDevices(ctx context.Context, devices ...*types.Device) error
	UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error
}

// New returns a new database backend interface.
//
// The backend is selected by the scheme of databaseURL: "postgres" and
// "sqlite3" are supported, every other scheme yields an error. A nil
// databaseURL is rejected with an error instead of causing a nil pointer
// dereference. After the backend has been opened its Migrate method is
// executed; if the migration fails the backend is closed again before the
// migration error is returned.
func New(databaseURL *url.URL, flogger logger.Logger) (Repository, error) {
	if databaseURL == nil {
		return nil, fmt.Errorf("repository: database URL must not be nil")
	}

	switch databaseURL.Scheme {
	case "postgres":
		repo, err := postgres.New(postgres.Opts{
			DatabaseURL: databaseURL,
			Logger:      flogger,
		})
		if err != nil {
			return nil, err
		}
		return migrateRepository(repo)

	case "sqlite3":
		repo, err := sqlite3.New(sqlite3.Opts{
			DatabaseURL: databaseURL,
			Logger:      flogger,
		})
		if err != nil {
			return nil, err
		}
		return migrateRepository(repo)

	default:
		return nil, fmt.Errorf("Unsupported repository scheme: %v", databaseURL.Scheme)
	}
}

// migrateRepository initializes the database scheme of repo if it does not
// exist and hands the repository back on success. On failure the repository
// is closed so that the caller of New is not left with an unreachable, still
// open backend.
func migrateRepository(repo Repository) (Repository, error) {
	if err := repo.Migrate(context.Background()); err != nil {
		// The migration error is the root cause the caller needs to see; an
		// additional error from Close would only obscure it, so it is
		// deliberately dropped.
		_ = repo.Close()
		return nil, err
	}
	return repo, nil
}

// NOTE(review): everything below is commented-out code. It looks like an
// earlier struct-based implementation of Repository that was kept for
// reference - confirm whether it can be removed.

// Repository represents a repository where all devices, sensors and measured
// values are stored.
// type Repository struct {
// 	database db.Database
// }

// // AddDevices to the repository
// func (repo *Repository) AddDevices(devices ...*types.Device) error {
// 	return repo.database.InsertDevices(context.Background(), devices...)
// }

// // AddMeasuredValues to the repository
// func (repo *Repository) AddMeasuredValues(measuredValues ...*types.MeasuredValue) error {
// 	return repo.database.InsertMeasuredValues(context.Background(), measuredValues...)
// }

// // AddOrUpdateDevices to the repository
// func (repo *Repository) AddOrUpdateDevices(devices ...*types.Device) error {
// 	return repo.database.InsertOrUpdateDevices(context.Background(), devices...)
// } // // AddOrUpdateMeasuredValues to the repository // func (repo *Repository) AddOrUpdateMeasuredValues(measuredValues ...*types.MeasuredValue) error { // return repo.database.InsertOrUpdateMeasuredValues(context.Background(), measuredValues...) // } // // AddOrUpdateSensors to the repository // func (repo *Repository) AddOrUpdateSensors(sensors ...*types.Sensor) error { // return repo.database.InsertOrUpdateSensors(context.Background(), sensors...) // } // // AddSensors to the repository // func (repo *Repository) AddSensors(sensors ...*types.Sensor) error { // return repo.database.InsertSensors(context.Background(), sensors...) // } // // Close closes the repository and prevents new queries from starting. Close // // then waits for all queries that have started processing on the server to // // finish. // func (repo *Repository) Close() error { // return repo.database.Close() // } // // DisableSensorsByNames disables all sensors which match by their name // func (repo *Repository) DisableSensorsByNames(sensorNames ...string) error { // sensors, err := repo.GetSensors() // if err != nil { // return err // } // matchedSensors := make([]*types.Sensor, 0) // for _, sensor := range sensors { // for _, sensorName := range sensorNames { // if strings.Compare(sensor.Name, sensorName) == 0 { // sensor.Enabled = false // matchedSensors = append(matchedSensors, sensor) // } // } // } // return repo.UpdateSensors(matchedSensors...) 
// } // // EnableSensorsByNames enables all sensors which match by their name // func (repo *Repository) EnableSensorsByNames(sensorNames ...string) error { // sensors, err := repo.GetSensors() // if err != nil { // return err // } // matchedSensors := make([]*types.Sensor, 0) // for _, sensor := range sensors { // for _, sensorName := range sensorNames { // if strings.Compare(sensor.Name, sensorName) == 0 { // sensor.Enabled = true // matchedSensors = append(matchedSensors, sensor) // } // } // } // return repo.UpdateSensors(matchedSensors...) // } // // GetDevice returns a device by its id. If no device has been found, the // // function returns nil. // func (repo *Repository) GetDevice(deviceID string) (*types.Device, error) { // return repo.database.SelectDevice(context.Background(), deviceID) // } // // GetDevices returns all devices. If no devices have been found, the function // // returns nil. // func (repo *Repository) GetDevices() ([]*types.Device, error) { // return repo.database.SelectDevices(context.Background()) // } // // GetHumidity returns a humidity value by id // func (repo *Repository) GetHumidity(humidityID string) (*types.MeasuredValue, error) { // return repo.database.SelectHumidity(context.Background(), humidityID) // } // // GetPressure returns a pressure value by id // func (repo *Repository) GetPressure(pressureID string) (*types.MeasuredValue, error) { // return repo.database.SelectPressure(context.Background(), pressureID) // } // // GetSensor returns a sensor by its id. If no sensor has been found, the // // function returns nil. // func (repo *Repository) GetSensor(sensorID string) (*types.Sensor, error) { // return repo.database.SelectSensor(context.Background(), sensorID) // } // // GetSensors returns all sensors. If no sensors have been found, the function // // returns nil. 
// func (repo *Repository) GetSensors(models ...string) ([]*types.Sensor, error) { // sensors, err := repo.database.SelectSensors(context.Background()) // switch { // case err != nil: // return nil, err // case len(models) > 0: // cachedSensors := make([]*types.Sensor, 0) // LOOP: // for i := range sensors { // for j := range models { // if strings.ToLower(sensors[i].Model) == strings.ToLower(models[j]) { // cachedSensors = append(cachedSensors, sensors[i]) // continue LOOP // } // } // } // return cachedSensors, nil // case len(models) <= 0: // fallthrough // default: // return sensors, err // } // } // // GetSensorsByDeviceID returns all sensors by a device id. If no sensor has // // been found by the id, the function returns nil. // func (repo *Repository) GetSensorsByDeviceID(deviceID string) ([]*types.Sensor, error) { // cachedSensors := make([]*types.Sensor, 0) // sensors, err := repo.GetSensors() // if err != nil { // return nil, err // } // for _, sensor := range sensors { // if strings.Compare(sensor.DeviceID, deviceID) == 0 { // cachedSensors = append(cachedSensors, sensor) // } // } // return cachedSensors, nil // } // // GetTemperature returns a temperature value by id // func (repo *Repository) GetTemperature(pressureID string) (*types.MeasuredValue, error) { // return repo.database.SelectTemperature(context.Background(), pressureID) // } // // RemoveDevices removes devices by their ids from the repository. Additional // // all sensors and measured values, which are in relation with the device // // respectively the sensors will also be deleted. // func (repo *Repository) RemoveDevices(deviceIDs ...string) error { // return repo.database.DeleteDevices(context.Background(), deviceIDs...) 
// } // // RenameSensors renames all sensors which match by their current name to the new name // func (repo *Repository) RenameSensors(oldName string, newName string) error { // sensors, err := repo.GetSensors() // if err != nil { // return err // } // matchedSensors := make([]*types.Sensor, 0) // for _, sensor := range sensors { // if strings.Compare(sensor.Name, oldName) == 0 { // sensor.Name = newName // matchedSensors = append(matchedSensors, sensor) // } // } // return repo.UpdateSensors(matchedSensors...) // } // // RemoveSensors removes sensors by their ids from the repository. Additionally, // // all measured values, which are in relation with the sensor will also be // // deleted. // func (repo *Repository) RemoveSensors(sensorIDs ...string) error { // return repo.database.DeleteSensors(context.Background(), sensorIDs...) // } // // RemoveSensorsByNames removes all sensors which match by their name // func (repo *Repository) RemoveSensorsByNames(sensorNames ...string) error { // sensors, err := repo.GetSensors() // if err != nil { // return err // } // matchedSensorIDs := make([]string, 0) // for _, sensor := range sensors { // for _, sensorName := range sensorNames { // if strings.Compare(sensor.Name, sensorName) == 0 { // matchedSensorIDs = append(matchedSensorIDs, sensor.ID) // } // } // } // return repo.RemoveSensors(matchedSensorIDs...) // } // // UpdateDevices updates devices which are stored in the repository by their // // id. The id of the device can not be updated. // func (repo *Repository) UpdateDevices(devices ...*types.Device) error { // return repo.database.UpdateDevices(context.Background(), devices...) // } // // UpdateSensors updates sensors which are stored in the repository by their // // id. The id of the sensor can not be updated. // func (repo *Repository) UpdateSensors(sensors ...*types.Sensor) error { // return repo.database.UpdateSensors(context.Background(), sensors...) 
// } // // New returns a new repository based on the data source name (dsn) // func New(dsnURL *url.URL, flogger logger.Logger) (*Repository, error) { // database, err := db.New(dsnURL, flogger) // if err != nil { // return nil, err // } // return &Repository{ // database: database, // }, nil // } // type OptImport struct { // Sensors bool // Humidities bool // Pressures bool // Temperatures bool // } // func Import(sourceDSNURL *url.URL, destDNSURL *url.URL, flogger logger.Logger, optImport OptImport) error { // importMap := map[string]bool{ // "humidities": optImport.Humidities, // "pressures": optImport.Pressures, // "temperatures": optImport.Temperatures, // } // // enable sensors if one measured value is enabled // if !optImport.Sensors { // for key, value := range importMap { // if value { // flogger.Info("Enable import option sensors. It's required as foreign key for %v", key) // optImport.Sensors = true // } // } // } // // Initialize source repository // sourceRepo, err := New(sourceDSNURL, flogger) // if err != nil { // return fmt.Errorf("Failed to open the source repo: %w", err) // } // defer sourceRepo.Close() // // Initialize destination repository // destRepo, err := New(destDNSURL, flogger) // if err != nil { // return fmt.Errorf("Failed to open the destination repo: %w", err) // } // defer destRepo.Close() // // AddOrUpdate: Devices // ctx := context.Background() // devices, err := sourceRepo.database.SelectDevices(ctx) // if err != nil { // return fmt.Errorf("Failed to fetch devices from source repo: %w", err) // } // flogger.Debug("Found %v devices", len(devices)) // for i := range devices { // err := destRepo.AddOrUpdateDevices(devices[i]) // if err != nil { // return fmt.Errorf("Failed to add or update device %v into dest repo: %w", devices[i].Name, err) // } // } // // AddOrUpdate: Sensors // if optImport.Sensors { // sensors, err := sourceRepo.database.SelectSensors(ctx) // if err != nil { // return fmt.Errorf("Failed to fetch sensors from 
source repo: %w", err) // } // flogger.Debug("Found %v sensors", len(sensors)) // for i := range sensors { // err := destRepo.AddOrUpdateSensors(sensors[i]) // if err != nil { // return fmt.Errorf("Failed to add or update sensor %v into dest repo: %w", sensors[i].Name, err) // } // } // } // for key, f := range map[string]func(context.Context) ([]*types.MeasuredValue, error){ // "humidities": sourceRepo.database.SelectHumidities, // "pressures": sourceRepo.database.SelectPressures, // "temperatures": sourceRepo.database.SelectTemperatures, // } { // if importMap[key] { // measuredValues, err := f(ctx) // if err != nil { // return fmt.Errorf("Failed to select %v from source repo: %w", key, err) // } // flogger.Debug("Found %v %v values", len(measuredValues), key) // err = destRepo.AddOrUpdateMeasuredValues(measuredValues...) // if err != nil { // return fmt.Errorf("Failed to add or update %v into dest repo: %w", key, err) // } // } // } // return nil // }