package repository

import (
	"context"
	"fmt"
	"net/url"
	"strings"

	"git.cryptic.systems/volker.raschek/flucky/pkg/repository/db"
	"git.cryptic.systems/volker.raschek/flucky/pkg/types"
	"git.cryptic.systems/volker.raschek/go-logger"
)

// Repository represents a repository in which all devices, sensors and
// measured values are stored. It is a thin layer on top of a db.Database;
// every call is issued with context.Background().
type Repository struct {
	database db.Database
}

// AddDevices inserts the passed devices into the repository.
func (repo *Repository) AddDevices(devices ...*types.Device) error {
	return repo.database.InsertDevices(context.Background(), devices...)
}

// AddMeasuredValues inserts the passed measured values into the repository.
func (repo *Repository) AddMeasuredValues(measuredValues ...*types.MeasuredValue) error {
	return repo.database.InsertMeasuredValues(context.Background(), measuredValues...)
}

// AddOrUpdateDevices inserts the passed devices into the repository or updates
// them, as implemented by the database's InsertOrUpdateDevices.
func (repo *Repository) AddOrUpdateDevices(devices ...*types.Device) error {
	return repo.database.InsertOrUpdateDevices(context.Background(), devices...)
}

// AddOrUpdateMeasuredValues inserts the passed measured values into the
// repository or updates them, as implemented by the database's
// InsertOrUpdateMeasuredValues.
func (repo *Repository) AddOrUpdateMeasuredValues(measuredValues ...*types.MeasuredValue) error {
	return repo.database.InsertOrUpdateMeasuredValues(context.Background(), measuredValues...)
}

// AddOrUpdateSensors inserts the passed sensors into the repository or updates
// them, as implemented by the database's InsertOrUpdateSensors.
func (repo *Repository) AddOrUpdateSensors(sensors ...*types.Sensor) error {
	return repo.database.InsertOrUpdateSensors(context.Background(), sensors...)
}

// AddSensors inserts the passed sensors into the repository.
func (repo *Repository) AddSensors(sensors ...*types.Sensor) error {
	return repo.database.InsertSensors(context.Background(), sensors...)
}

// Close closes the underlying database and returns its error, if any.
//
// NOTE(review): whether Close waits for in-flight queries depends on the
// db.Database implementation - confirm there.
func (repo *Repository) Close() error {
	return repo.database.Close()
}

// DisableSensorsByNames disables all sensors whose name equals one of the
// passed sensorNames and stores the change via UpdateSensors.
func (repo *Repository) DisableSensorsByNames(sensorNames ...string) error {
	return repo.setSensorsEnabledByNames(false, sensorNames...)
}

// EnableSensorsByNames enables all sensors whose name equals one of the passed
// sensorNames and stores the change via UpdateSensors.
func (repo *Repository) EnableSensorsByNames(sensorNames ...string) error {
	return repo.setSensorsEnabledByNames(true, sensorNames...)
}

// setSensorsEnabledByNames sets the Enabled flag of every sensor whose name
// equals one of sensorNames and passes the matched sensors to UpdateSensors.
// UpdateSensors is called even when no sensor matched.
func (repo *Repository) setSensorsEnabledByNames(enabled bool, sensorNames ...string) error {
	sensors, err := repo.GetSensors()
	if err != nil {
		return err
	}

	matchedSensors := make([]*types.Sensor, 0)
	for _, sensor := range sensors {
		for _, sensorName := range sensorNames {
			if sensor.Name == sensorName {
				sensor.Enabled = enabled
				matchedSensors = append(matchedSensors, sensor)
				// Stop at the first hit so that a name listed more than once
				// does not queue the same sensor twice.
				break
			}
		}
	}

	return repo.UpdateSensors(matchedSensors...)
}

// GetDevice returns the device with the passed id. The lookup, including the
// result for an unknown id, is delegated to the database.
func (repo *Repository) GetDevice(deviceID string) (*types.Device, error) {
	return repo.database.SelectDevice(context.Background(), deviceID)
}

// GetDevices returns all devices as selected by the database.
func (repo *Repository) GetDevices() ([]*types.Device, error) {
	return repo.database.SelectDevices(context.Background())
}

// GetHumidity returns a humidity value by its id.
func (repo *Repository) GetHumidity(humidityID string) (*types.MeasuredValue, error) {
	return repo.database.SelectHumidity(context.Background(), humidityID)
}

// GetPressure returns a pressure value by its id.
func (repo *Repository) GetPressure(pressureID string) (*types.MeasuredValue, error) {
	return repo.database.SelectPressure(context.Background(), pressureID)
}

// GetSensor returns the sensor with the passed id. The lookup, including the
// result for an unknown id, is delegated to the database.
func (repo *Repository) GetSensor(sensorID string) (*types.Sensor, error) {
	return repo.database.SelectSensor(context.Background(), sensorID)
}

// GetSensors returns the sensors stored in the repository.
//
// Without models, the result of the database is returned unchanged. With one
// or more models, only sensors whose Model equals one of them (compared
// case-insensitively) are returned; in that case the result is an empty,
// non-nil slice when nothing matches.
func (repo *Repository) GetSensors(models ...string) ([]*types.Sensor, error) {
	sensors, err := repo.database.SelectSensors(context.Background())
	if err != nil {
		return nil, err
	}

	if len(models) == 0 {
		return sensors, nil
	}

	matchedSensors := make([]*types.Sensor, 0)
	for _, sensor := range sensors {
		for _, model := range models {
			// EqualFold compares case-insensitively without allocating
			// lower-cased copies of both strings.
			if strings.EqualFold(sensor.Model, model) {
				matchedSensors = append(matchedSensors, sensor)
				break
			}
		}
	}

	return matchedSensors, nil
}

// GetSensorsByDeviceID returns all sensors whose DeviceID equals the passed
// deviceID. If no sensor matches, an empty, non-nil slice is returned.
func (repo *Repository) GetSensorsByDeviceID(deviceID string) ([]*types.Sensor, error) {
	sensors, err := repo.GetSensors()
	if err != nil {
		return nil, err
	}

	matchedSensors := make([]*types.Sensor, 0)
	for _, sensor := range sensors {
		if sensor.DeviceID == deviceID {
			matchedSensors = append(matchedSensors, sensor)
		}
	}

	return matchedSensors, nil
}

// GetTemperature returns a temperature value by its id.
func (repo *Repository) GetTemperature(temperatureID string) (*types.MeasuredValue, error) {
	return repo.database.SelectTemperature(context.Background(), temperatureID)
}

// RemoveDevices removes devices by their ids from the repository.
//
// NOTE(review): removal of the related sensors and measured values is not done
// here; it is presumably handled by the database's DeleteDevices - confirm.
func (repo *Repository) RemoveDevices(deviceIDs ...string) error {
	return repo.database.DeleteDevices(context.Background(), deviceIDs...)
}

// RenameSensors renames all sensors whose current name equals oldName to
// newName and stores the change via UpdateSensors. UpdateSensors is called even
// when no sensor matched.
func (repo *Repository) RenameSensors(oldName string, newName string) error {
	sensors, err := repo.GetSensors()
	if err != nil {
		return err
	}

	matchedSensors := make([]*types.Sensor, 0)
	for _, sensor := range sensors {
		if sensor.Name == oldName {
			sensor.Name = newName
			matchedSensors = append(matchedSensors, sensor)
		}
	}

	return repo.UpdateSensors(matchedSensors...)
}

// RemoveSensors removes sensors by their ids from the repository.
//
// NOTE(review): removal of the related measured values is not done here; it is
// presumably handled by the database's DeleteSensors - confirm.
func (repo *Repository) RemoveSensors(sensorIDs ...string) error {
	return repo.database.DeleteSensors(context.Background(), sensorIDs...)
}

// RemoveSensorsByNames removes all sensors whose name equals one of the passed
// sensorNames. RemoveSensors is called even when no sensor matched.
func (repo *Repository) RemoveSensorsByNames(sensorNames ...string) error {
	sensors, err := repo.GetSensors()
	if err != nil {
		return err
	}

	matchedSensorIDs := make([]string, 0)
	for _, sensor := range sensors {
		for _, sensorName := range sensorNames {
			if sensor.Name == sensorName {
				matchedSensorIDs = append(matchedSensorIDs, sensor.ID)
				// Stop at the first hit so that a name listed more than once
				// does not queue the same id twice.
				break
			}
		}
	}

	return repo.RemoveSensors(matchedSensorIDs...)
}

// UpdateDevices passes the devices to the database's UpdateDevices.
//
// NOTE(review): devices are presumably matched by their id, which therefore can
// not be changed through this call - confirm against the database layer.
func (repo *Repository) UpdateDevices(devices ...*types.Device) error {
	return repo.database.UpdateDevices(context.Background(), devices...)
}

// UpdateSensors passes the sensors to the database's UpdateSensors.
//
// NOTE(review): sensors are presumably matched by their id, which therefore can
// not be changed through this call - confirm against the database layer.
func (repo *Repository) UpdateSensors(sensors ...*types.Sensor) error {
	return repo.database.UpdateSensors(context.Background(), sensors...)
}

// New returns a new repository based on the data source name (dsn). The
// database is created by db.New with the passed URL and logger.
func New(dsnURL *url.URL, flogger logger.Logger) (*Repository, error) {
	database, err := db.New(dsnURL, flogger)
	if err != nil {
		return nil, err
	}

	return &Repository{
		database: database,
	}, nil
}

// OptImport selects what Import copies in addition to the devices, which are
// always imported.
type OptImport struct {
	// Sensors enables the import of sensors. Import switches it on
	// automatically as soon as one kind of measured value is selected.
	Sensors bool

	// Humidities enables the import of humidity values.
	Humidities bool

	// Pressures enables the import of pressure values.
	Pressures bool

	// Temperatures enables the import of temperature values.
	Temperatures bool
}

// Import copies data from the repository behind sourceDSNURL into the
// repository behind destDSNURL using add-or-update operations.
//
// Devices are always imported. Sensors are imported when optImport.Sensors is
// set or when at least one kind of measured value is selected. Measured values
// are imported per selected kind in the fixed order humidities, pressures,
// temperatures. The first failure aborts the import and is returned wrapped.
func Import(sourceDSNURL *url.URL, destDSNURL *url.URL, flogger logger.Logger, optImport OptImport) error {
	// A slice fixes the processing order; ranging over a map would make the
	// order of log messages and imports random between runs.
	measuredValueKeys := []string{"humidities", "pressures", "temperatures"}

	importMap := map[string]bool{
		"humidities":   optImport.Humidities,
		"pressures":    optImport.Pressures,
		"temperatures": optImport.Temperatures,
	}

	// enable sensors if one measured value is enabled
	if !optImport.Sensors {
		for _, key := range measuredValueKeys {
			if importMap[key] {
				flogger.Info("Enable import option sensors. It's required as foreign key for %v", key)
				optImport.Sensors = true
			}
		}
	}

	// Initialize source repository
	sourceRepo, err := New(sourceDSNURL, flogger)
	if err != nil {
		return fmt.Errorf("Failed to open the source repo: %w", err)
	}
	// The error returned by Close is not handled here.
	defer sourceRepo.Close()

	// Initialize destination repository
	destRepo, err := New(destDSNURL, flogger)
	if err != nil {
		return fmt.Errorf("Failed to open the destination repo: %w", err)
	}
	// The error returned by Close is not handled here.
	defer destRepo.Close()

	ctx := context.Background()

	// AddOrUpdate: Devices
	devices, err := sourceRepo.database.SelectDevices(ctx)
	if err != nil {
		return fmt.Errorf("Failed to fetch devices from source repo: %w", err)
	}
	flogger.Debug("Found %v devices", len(devices))

	// Devices are written one by one so that a failure can name the device.
	for _, device := range devices {
		if err := destRepo.AddOrUpdateDevices(device); err != nil {
			return fmt.Errorf("Failed to add or update device %v into dest repo: %w", device.Name, err)
		}
	}

	// AddOrUpdate: Sensors
	if optImport.Sensors {
		sensors, err := sourceRepo.database.SelectSensors(ctx)
		if err != nil {
			return fmt.Errorf("Failed to fetch sensors from source repo: %w", err)
		}
		flogger.Debug("Found %v sensors", len(sensors))

		// Sensors are written one by one so that a failure can name the sensor.
		for _, sensor := range sensors {
			if err := destRepo.AddOrUpdateSensors(sensor); err != nil {
				return fmt.Errorf("Failed to add or update sensor %v into dest repo: %w", sensor.Name, err)
			}
		}
	}

	// AddOrUpdate: Measured values
	selectFuncs := map[string]func(context.Context) ([]*types.MeasuredValue, error){
		"humidities":   sourceRepo.database.SelectHumidities,
		"pressures":    sourceRepo.database.SelectPressures,
		"temperatures": sourceRepo.database.SelectTemperatures,
	}

	for _, key := range measuredValueKeys {
		if !importMap[key] {
			continue
		}

		measuredValues, err := selectFuncs[key](ctx)
		if err != nil {
			return fmt.Errorf("Failed to select %v from source repo: %w", key, err)
		}
		flogger.Debug("Found %v %v values", len(measuredValues), key)

		if err := destRepo.AddOrUpdateMeasuredValues(measuredValues...); err != nil {
			return fmt.Errorf("Failed to add or update %v into dest repo: %w", key, err)
		}
	}

	return nil
}