diff --git a/cli/imp/imp.go b/cli/imp/imp.go index 34d0ca4..630c9b6 100644 --- a/cli/imp/imp.go +++ b/cli/imp/imp.go @@ -1,6 +1,13 @@ package imp import ( + "context" + "fmt" + "net/url" + + "git.cryptic.systems/volker.raschek/flucky/pkg/config" + "git.cryptic.systems/volker.raschek/flucky/pkg/repository" + "git.cryptic.systems/volker.raschek/go-logger" "github.com/spf13/cobra" ) @@ -15,61 +22,74 @@ func InitCmd(cmd *cobra.Command) error { importCmd := &cobra.Command{ Use: "import", + Args: cobra.RangeArgs(1, 2), Short: "Import data from passed URL", - RunE: importSources, + Example: `import sqlite3:///var/cache/flucky/sqlite3.db +import sqlite3:///var/cache/flucky/sqlite3.db postgres://user:password@host:port/database?sslmode=disable`, + RunE: importSources, } - importCmd.Flags().BoolVar(&importSensors, "sensors", true, "Import sensors") - importCmd.Flags().BoolVar(&importHumidities, "humidities", true, "Import humidities") - importCmd.Flags().BoolVar(&importPressures, "pressures", true, "Import pressures") - importCmd.Flags().BoolVar(&importTemperatures, "temperatures", true, "Import temperatures") - cmd.AddCommand(importCmd) return nil } func importSources(cmd *cobra.Command, args []string) error { - // configFile, err := cmd.Flags().GetString("config") - // if err != nil { - // return fmt.Errorf("No config file defined") - // } - // cnf, err := config.Read(configFile) - // if err != nil { - // return err - // } + configFile, err := cmd.Flags().GetString("config") + if err != nil { + return fmt.Errorf("No config file defined") + } - // destURL, err := url.Parse(cnf.DSN) - // if err != nil { - // return err - // } + cnf, err := config.Read(configFile) + if err != nil { + return err + } - // logLevelString, err := cmd.Flags().GetString("loglevel") - // if err != nil { - // return err - // } + logLevelString, err := cmd.Flags().GetString("loglevel") + if err != nil { + return err + } - // logLevel, err := logger.ParseLogLevel(logLevelString) - // if err != nil { - // return err - // } + logLevel, err := logger.ParseLogLevel(logLevelString) + if err != nil { + return err + } - // flogger := logger.NewLogger(logLevel) + flogger := logger.NewLogger(logLevel) - // sourceURL, err := url.Parse(args[0]) - // if err != nil { - // return fmt.Errorf("Failed to parse source url: %w", err) - // } + var ( + srcURL *url.URL + destURL *url.URL + ) - // err = repository.Import(sourceURL, destURL, flogger, repository.OptImport{ - // Sensors: importSensors, - // Humidities: importHumidities, - // Pressures: importPressures, - // Temperatures: importTemperatures, - // }) - // if err != nil { - // return fmt.Errorf("Failed to import: %w", err) - // } + srcURL, err = url.Parse(args[0]) + if err != nil { + return err + } - return nil + switch len(args) { + case 1: + destURL, err = url.Parse(cnf.DSN) + if err != nil { + return err + } + + case 2: + destURL, err = url.Parse(args[1]) + if err != nil { + return err + } + } + + srcRepository, err := repository.New(srcURL, flogger) + if err != nil { + return err + } + + destRepository, err := repository.New(destURL, flogger) + if err != nil { + return err + } + + return destRepository.Import(context.Background(), srcRepository) } diff --git a/pkg/repository/postgres/postgres.go b/pkg/repository/postgres.go similarity index 60% rename from pkg/repository/postgres/postgres.go rename to pkg/repository/postgres.go index 8cb81ef..b6f59c6 100644 --- a/pkg/repository/postgres/postgres.go +++ b/pkg/repository/postgres.go @@ -1,121 +1,26 @@ -package postgres +package repository import ( "context" "database/sql" - "embed" "errors" "fmt" "net/url" "time" + "git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres" "git.cryptic.systems/volker.raschek/flucky/pkg/types" "git.cryptic.systems/volker.raschek/go-logger" "github.com/golang-migrate/migrate/v4" "github.com/johejo/golang-migrate-extra/source/iofs" ) -var ( - //go:embed ddl/*.sql - ddlAssets embed.FS - - //go:embed dml/deleteDeviceByID.sql - deleteDeviceByIDSQL string - - //go:embed dml/deleteDeviceByName.sql - deleteDeviceByNameSQL string - - //go:embed dml/deleteSensorByID.sql - deleteSensorByIDSQL string - - //go:embed dml/deleteSensorByName.sql - deleteSensorByNameSQL string - - //go:embed dml/insertDevice.sql - insertDeviceSQL string - - //go:embed dml/insertHumidity.sql - insertHumiditySQL string - - //go:embed dml/insertOrUpdateDevice.sql - insertOrUpdateDeviceSQL string - - //go:embed dml/insertOrUpdateHumidity.sql - insertOrUpdateHumiditySQL string - - //go:embed dml/insertOrUpdatePressure.sql - insertOrUpdatePressureSQL string - - //go:embed dml/insertOrUpdateSensor.sql - insertOrUpdateSensorSQL string - - //go:embed dml/insertOrUpdateTemperature.sql - insertOrUpdateTemperatureSQL string - - //go:embed dml/insertPressure.sql - insertPressureSQL string - - //go:embed dml/insertSensor.sql - insertSensorSQL string - - //go:embed dml/insertTemperature.sql - insertTemperatureSQL string - - //go:embed dml/selectDeviceByID.sql - selectDeviceByIDSQL string - - //go:embed dml/selectDeviceByName.sql - selectDeviceByNameSQL string - - //go:embed dml/selectDevices.sql - selectDevicesSQL string - - //go:embed dml/selectHumidities.sql - selectHumiditiesSQL string - - //go:embed dml/selectHumidityByID.sql - selectHumidityByIDSQL string - - //go:embed dml/selectPressureByID.sql - selectPressureByIDSQL string - - //go:embed dml/selectPressures.sql - selectPressuresSQL string - - //go:embed dml/selectSensorByID.sql - selectSensorByIDSQL string - - //go:embed dml/selectSensors.sql - selectSensorsSQL string - - //go:embed dml/selectSensorsByDeviceID.sql - selectSensorsByDeviceIDSQL string - - //go:embed dml/selectSensorsByModel.sql - selectSensorsByModelSQL string - - //go:embed dml/selectSensorsByName.sql - selectSensorsByNameSQL string - - //go:embed dml/selectTemperatureByID.sql - selectTemperatureByIDSQL string - - //go:embed dml/selectTemperatures.sql - selectTemperaturesSQL string - - //go:embed dml/updateDevice.sql - updateDeviceSQL string - - //go:embed dml/updateSensor.sql - updateSensorSQL string -) - -type Opts struct { +type PostgresOpts struct { DatabaseURL *url.URL Logger logger.Logger } -func (o *Opts) Validate() error { +func (o *PostgresOpts) Validate() error { for k, v := range map[string]interface{}{ "DatabaseURL": o.DatabaseURL, "Logger": o.Logger, @@ -132,7 +37,7 @@ func (o *Opts) Validate() error { return nil } -func New(opts Opts) (*Postgres, error) { +func NewPostgres(opts PostgresOpts) (Repository, error) { if err := opts.Validate(); err != nil { return nil, err } @@ -157,14 +62,14 @@ type Postgres struct { } // AddDevices into the database -func (postgres *Postgres) AddDevices(ctx context.Context, devices ...*types.Device) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) AddDevices(ctx context.Context, devices ...*types.Device) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - err = postgres.insertDevices(tx, devices...) + err = d.insertDevices(tx, devices...) if err != nil { return err } @@ -172,8 +77,8 @@ func (postgres *Postgres) AddDevices(ctx context.Context, devices ...*types.Devi return tx.Commit() } -func (postgres *Postgres) insertDevices(tx *sql.Tx, devices ...*types.Device) error { - stmt, err := tx.Prepare(insertDeviceSQL) +func (d *Postgres) insertDevices(tx *sql.Tx, devices ...*types.Device) error { + stmt, err := tx.Prepare(postgres.InsertDeviceSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -200,14 +105,23 @@ func (postgres *Postgres) insertDevices(tx *sql.Tx, devices ...*types.Device) er return nil } -func (postgres *Postgres) AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertOrUpdateDeviceSQL) + err = d.insertOrUpdateDevices(tx, devices...) + if err != nil { + return err + } + + return tx.Commit() +} + +func (d *Postgres) insertOrUpdateDevices(tx *sql.Tx, devices ...*types.Device) error { + stmt, err := tx.Prepare(postgres.InsertOrUpdateDeviceSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %w", err) } @@ -231,11 +145,11 @@ func (postgres *Postgres) AddOrUpdateDevices(ctx context.Context, devices ...*ty } } - return tx.Commit() + return nil } // AddMeasuredValues into the database -func (postgres *Postgres) AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { +func (d *Postgres) AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0) for _, measuredValue := range measuredValues { @@ -245,58 +159,27 @@ func (postgres *Postgres) AddMeasuredValues(ctx context.Context, measuredValues splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue) } - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - // General insert function - insert := func(tx *sql.Tx, query string, measuredValues []*types.MeasuredValue) error { - stmt, err := tx.Prepare(query) - if err != nil { - return fmt.Errorf("Failed to prepare statement: %v", err) - } - defer stmt.Close() - - for _, measuredValue := range measuredValues { - - if measuredValue.CreationDate.Equal(time.Time{}) { - measuredValue.CreationDate = time.Now() - } - - _, err := stmt.Exec( - &measuredValue.ID, - &measuredValue.Value, - &measuredValue.Date, - &measuredValue.SensorID, - &measuredValue.CreationDate, - &measuredValue.UpdateDate, - ) - - if err != nil { - return fmt.Errorf("Failed to execute statement: %v", err) - } - } - - return nil - } - for measuredValueType, measuredValues := range splittedMeasuredValues { var queryFile string switch measuredValueType { case types.Humidity: - queryFile = insertHumiditySQL + queryFile = postgres.InsertHumiditySQL case types.Pressure: - queryFile = insertPressureSQL + queryFile = postgres.InsertPressureSQL case types.Temperature: - queryFile = insertTemperatureSQL + queryFile = postgres.InsertTemperatureSQL default: return fmt.Errorf("Measured value type %v not supported", measuredValueType) } - err := insert(tx, queryFile, measuredValues) + err := d.insertMeasuredValues(tx, queryFile, measuredValues...) if err != nil { return err } @@ -305,8 +188,38 @@ func (postgres *Postgres) AddMeasuredValues(ctx context.Context, measuredValues return tx.Commit() } +func (d *Postgres) insertMeasuredValues(tx *sql.Tx, query string, measuredValues ...*types.MeasuredValue) error { + stmt, err := tx.Prepare(query) + if err != nil { + return fmt.Errorf("Failed to prepare statement: %v", err) + } + defer stmt.Close() + + for _, measuredValue := range measuredValues { + + if measuredValue.CreationDate.Equal(time.Time{}) { + measuredValue.CreationDate = time.Now() + } + + _, err := stmt.Exec( + &measuredValue.ID, + &measuredValue.Value, + &measuredValue.Date, + &measuredValue.SensorID, + &measuredValue.CreationDate, + &measuredValue.UpdateDate, + ) + + if err != nil { + return fmt.Errorf("Failed to execute statement: %v", err) + } + } + + return nil +} + // AddOrUpdateMeasuredValues into the database -func (postgres *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { +func (d *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0) for _, measuredValue := range measuredValues { @@ -316,58 +229,27 @@ func (postgres *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measure splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue) } - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - // General insert function - insert := func(tx *sql.Tx, query string, measuredValues []*types.MeasuredValue) error { - stmt, err := tx.Prepare(query) - if err != nil { - return fmt.Errorf("Failed to prepare statement: %v", err) - } - defer stmt.Close() - - for _, measuredValue := range measuredValues { - - if measuredValue.CreationDate.Equal(time.Time{}) { - measuredValue.CreationDate = time.Now() - } - - _, err := stmt.Exec( - &measuredValue.ID, - &measuredValue.Value, - &measuredValue.Date, - &measuredValue.SensorID, - &measuredValue.CreationDate, - &measuredValue.UpdateDate, - ) - - if err != nil { - return fmt.Errorf("Failed to execute statement: %v", err) - } - } - - return nil - } - for measuredValueType, measuredValues := range splittedMeasuredValues { var queryFile string switch measuredValueType { case types.Humidity: - queryFile = insertOrUpdateHumiditySQL + queryFile = postgres.InsertOrUpdateHumiditySQL case types.Pressure: - queryFile = insertOrUpdatePressureSQL + queryFile = postgres.InsertOrUpdatePressureSQL case types.Temperature: - queryFile = insertOrUpdateTemperatureSQL + queryFile = postgres.InsertOrUpdateTemperatureSQL default: return fmt.Errorf("Measured value type %v not supported", measuredValueType) } - err := insert(tx, queryFile, measuredValues) + err := d.insertOrUpdateMeasuredValues(tx, queryFile, measuredValues...) if err != nil { return err } @@ -376,15 +258,45 @@ func (postgres *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measure return tx.Commit() } +func (d *Postgres) insertOrUpdateMeasuredValues(tx *sql.Tx, query string, measuredValues ...*types.MeasuredValue) error { + stmt, err := tx.Prepare(query) + if err != nil { + return fmt.Errorf("Failed to prepare statement: %v", err) + } + defer stmt.Close() + + for _, measuredValue := range measuredValues { + + if measuredValue.CreationDate.Equal(time.Time{}) { + measuredValue.CreationDate = time.Now() + } + + _, err := stmt.Exec( + &measuredValue.ID, + &measuredValue.Value, + &measuredValue.Date, + &measuredValue.SensorID, + &measuredValue.CreationDate, + &measuredValue.UpdateDate, + ) + + if err != nil { + return fmt.Errorf("Failed to execute statement: %v", err) + } + } + + return nil +} + // AddSensors into the database -func (postgres *Postgres) AddSensors(ctx context.Context, sensors ...*types.Sensor) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) AddSensors(ctx context.Context, sensors ...*types.Sensor) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertSensorSQL) + stmt, err := tx.Prepare(postgres.InsertSensorSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -420,14 +332,23 @@ func (postgres *Postgres) AddSensors(ctx context.Context, sensors ...*types.Sens } // AddOrUpdateSensors into the database -func (postgres *Postgres) AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertOrUpdateSensorSQL) + err = d.insertOrUpdateSensors(tx, sensors...) + if err != nil { + return err + } + + return tx.Commit() +} + +func (d *Postgres) insertOrUpdateSensors(tx *sql.Tx, sensors ...*types.Sensor) error { + stmt, err := tx.Prepare(postgres.InsertOrUpdateSensorSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -459,7 +380,7 @@ func (postgres *Postgres) AddOrUpdateSensors(ctx context.Context, sensors ...*ty } } - return tx.Commit() + return nil } // Close closes the database and prevents new queries from starting. Close then @@ -469,13 +390,13 @@ func (postgres *Postgres) Close() error { } // Migrate creates all required tables if not exist -func (postgres *Postgres) Migrate(ctx context.Context) error { - sourceDriver, err := iofs.New(ddlAssets, "ddl") +func (d *Postgres) Migrate(ctx context.Context) error { + sourceDriver, err := iofs.New(postgres.DDLAssets, postgres.DDLAssetPath) if err != nil { return err } - m, err := migrate.NewWithSourceInstance("iofs", sourceDriver, postgres.databaseURL.String()) + m, err := migrate.NewWithSourceInstance("iofs", sourceDriver, d.databaseURL.String()) if err != nil { return err } @@ -490,14 +411,14 @@ func (postgres *Postgres) Migrate(ctx context.Context) error { } // GetDevice from database -func (postgres *Postgres) GetDeviceByID(ctx context.Context, id string) (*types.Device, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetDeviceByID(ctx context.Context, id string) (*types.Device, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - devices, err := postgres.selectDevices(tx, selectDeviceByIDSQL, id) + devices, err := d.selectDevices(tx, postgres.SelectDeviceByIDSQL, id) if err != nil { return nil, err } @@ -515,14 +436,14 @@ func (postgres *Postgres) GetDeviceByID(ctx context.Context, id string) (*types. } // GetDevice from database -func (postgres *Postgres) GetDeviceByName(ctx context.Context, name string) (*types.Device, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetDeviceByName(ctx context.Context, name string) (*types.Device, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - devices, err := postgres.selectDevices(tx, selectDeviceByNameSQL, name) + devices, err := d.selectDevices(tx, postgres.SelectDeviceByNameSQL, name) if err != nil { return nil, err } @@ -540,14 +461,14 @@ func (postgres *Postgres) GetDeviceByName(ctx context.Context, name string) (*ty } // GetDevices from the database -func (postgres *Postgres) GetDevices(ctx context.Context) ([]*types.Device, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetDevices(ctx context.Context) ([]*types.Device, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - devices, err := postgres.selectDevices(tx, selectDevicesSQL) + devices, err := d.selectDevices(tx, postgres.SelectDevicesSQL) if err != nil { return nil, err } @@ -560,7 +481,7 @@ func (postgres *Postgres) GetDevices(ctx context.Context) ([]*types.Device, erro return devices, nil } -func (postgres *Postgres) selectDevices(tx *sql.Tx, query string, args ...interface{}) ([]*types.Device, error) { +func (d *Postgres) selectDevices(tx *sql.Tx, query string, args ...interface{}) ([]*types.Device, error) { stmt, err := tx.Prepare(query) if err != nil { return nil, fmt.Errorf("Failed to prepare statement: %v", err) @@ -592,14 +513,14 @@ func (postgres *Postgres) selectDevices(tx *sql.Tx, query string, args ...interf } // GetHumidity returns humidity from the database -func (postgres *Postgres) GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := postgres.selectMeasuredValue(tx, selectHumidityByIDSQL, id) + measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectHumidityByIDSQL, id) if err != nil { return nil, err } @@ -621,14 +542,14 @@ func (postgres *Postgres) GetHumidityByID(ctx context.Context, id string) (*type } // GetHumidities returns humidities from the database -func (postgres *Postgres) GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := postgres.selectMeasuredValue(tx, selectHumiditiesSQL) + measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectHumiditiesSQL) if err != nil { return nil, err } @@ -645,7 +566,7 @@ func (postgres *Postgres) GetHumidities(ctx context.Context) ([]*types.MeasuredV return measuredValues, nil } -func (postgres *Postgres) selectMeasuredValue(tx *sql.Tx, query string, args ...interface{}) ([]*types.MeasuredValue, error) { +func (d *Postgres) selectMeasuredValue(tx *sql.Tx, query string, args ...interface{}) ([]*types.MeasuredValue, error) { stmt, err := tx.Prepare(query) if err != nil { return nil, err @@ -680,14 +601,14 @@ func (postgres *Postgres) selectMeasuredValue(tx *sql.Tx, query string, args ... } // GetPressure returns pressure from the database -func (postgres *Postgres) GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := postgres.selectMeasuredValue(tx, selectPressureByIDSQL, id) + measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectPressureByIDSQL, id) if err != nil { return nil, err } @@ -709,14 +630,14 @@ func (postgres *Postgres) GetPressureByID(ctx context.Context, id string) (*type } // GetPressures returns pressure from the database -func (postgres *Postgres) GetPressures(ctx context.Context) ([]*types.MeasuredValue, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetPressures(ctx context.Context) ([]*types.MeasuredValue, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := postgres.selectMeasuredValue(tx, selectPressuresSQL) + measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectPressuresSQL) if err != nil { return nil, err } @@ -734,14 +655,14 @@ func (postgres *Postgres) GetPressures(ctx context.Context) ([]*types.MeasuredVa } // GetSensor from database -func (postgres *Postgres) GetSensorByID(ctx context.Context, id string) (*types.Sensor, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetSensorByID(ctx context.Context, id string) (*types.Sensor, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - sensors, err := postgres.selectSensors(tx, selectSensorByIDSQL, id) + sensors, err := d.selectSensors(tx, postgres.SelectSensorByIDSQL, id) if err != nil { return nil, err } @@ -759,14 +680,14 @@ func (postgres *Postgres) GetSensorByID(ctx context.Context, id string) (*types. } // GetSensors from the database -func (postgres *Postgres) GetSensors(ctx context.Context) ([]*types.Sensor, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetSensors(ctx context.Context) ([]*types.Sensor, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - sensors, err := postgres.selectSensors(tx, selectSensorsSQL) + sensors, err := d.selectSensors(tx, postgres.SelectSensorsSQL) if err != nil { return nil, err } @@ -780,8 +701,8 @@ func (postgres *Postgres) GetSensors(ctx context.Context) ([]*types.Sensor, erro } // GetSensorsByModels from the database -func (postgres *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } @@ -789,7 +710,7 @@ func (postgres *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs . cachedSensors := make([]*types.Sensor, 0) for i := range deviceIDs { - sensors, err := postgres.selectSensors(tx, selectSensorsByDeviceIDSQL, deviceIDs[i]) + sensors, err := d.selectSensors(tx, postgres.SelectSensorsByDeviceIDSQL, deviceIDs[i]) if err != nil { return nil, err } @@ -806,15 +727,15 @@ func (postgres *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs . } // GetSensorsByModel from the database -func (postgres *Postgres) GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } cachedSensors := make([]*types.Sensor, 0) for i := range sensorModels { - sensors, err := postgres.selectSensors(tx, selectSensorsByModelSQL, sensorModels[i]) + sensors, err := d.selectSensors(tx, postgres.SelectSensorsByModelSQL, sensorModels[i]) if err != nil { return nil, err } @@ -831,8 +752,8 @@ func (postgres *Postgres) GetSensorsByModels(ctx context.Context, sensorModels . } // GetSensorsByModel from the database -func (postgres *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } @@ -840,7 +761,7 @@ func (postgres *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ... cachedSensors := make([]*types.Sensor, 0) for i := range sensorNames { - sensors, err := postgres.selectSensors(tx, selectSensorsByNameSQL, sensorNames[i]) + sensors, err := d.selectSensors(tx, postgres.SelectSensorsByNameSQL, sensorNames[i]) if err != nil { return nil, err } @@ -856,7 +777,7 @@ func (postgres *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ... return cachedSensors, nil } -func (postgres *Postgres) selectSensors(tx *sql.Tx, query string, args ...interface{}) ([]*types.Sensor, error) { +func (d *Postgres) selectSensors(tx *sql.Tx, query string, args ...interface{}) ([]*types.Sensor, error) { stmt, err := tx.Prepare(query) if err != nil { return nil, fmt.Errorf("Failed to prepare statement: %v", err) @@ -897,14 +818,14 @@ func (postgres *Postgres) selectSensors(tx *sql.Tx, query string, args ...interf } // GetTemperature returns temperatures from the database -func (postgres *Postgres) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := postgres.selectMeasuredValue(tx, selectTemperatureByIDSQL, id) + measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectTemperatureByIDSQL, id) if err != nil { return nil, err } @@ -926,14 +847,14 @@ func (postgres *Postgres) GetTemperatureByID(ctx context.Context, id string) (*t } // GetTemperatures returns temperatures from the database -func (postgres *Postgres) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) +func (d *Postgres) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := postgres.selectMeasuredValue(tx, selectTemperaturesSQL) + measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectTemperaturesSQL) if err != nil { return nil, err } @@ -950,15 +871,62 @@ func (postgres *Postgres) GetTemperatures(ctx context.Context) ([]*types.Measure return measuredValues, nil } +// Import imports devices, sensors and all measured values from a source +// repository. Existing entries will be updated. +func (d *Postgres) Import(ctx context.Context, src Repository) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + if err != nil { + return err + } + defer tx.Rollback() + + devices, err := src.GetDevices(ctx) + if err != nil { + return err + } + + err = d.insertOrUpdateDevices(tx, devices...) + if err != nil { + return err + } + + sensors, err := src.GetSensors(ctx) + err = d.insertOrUpdateSensors(tx, sensors...) + if err != nil { + return err + } + + for _, value := range []struct { + f func(ctx context.Context) ([]*types.MeasuredValue, error) + query string + }{ + {f: src.GetHumidities, query: postgres.InsertOrUpdateHumiditySQL}, + {f: src.GetPressures, query: postgres.InsertOrUpdatePressureSQL}, + {f: src.GetTemperatures, query: postgres.InsertOrUpdateTemperatureSQL}, + } { + measuredValues, err := value.f(ctx) + if err != nil { + return err + } + + err = d.insertOrUpdateMeasuredValues(tx, value.query, measuredValues...) + if err != nil { + return err + } + } + + return tx.Commit() +} + // RemoveDevices from the database -func (postgres *Postgres) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteDeviceByIDSQL) + stmt, err := tx.Prepare(postgres.DeleteDeviceByIDSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -975,14 +943,14 @@ func (postgres *Postgres) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...s } // RemoveDevices from the database -func (postgres *Postgres) RemoveDevicesByNames(ctx context.Context, names ...string) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) RemoveDevicesByNames(ctx context.Context, names ...string) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteDeviceByNameSQL) + stmt, err := tx.Prepare(postgres.DeleteDeviceByNameSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -999,14 +967,14 @@ func (postgres *Postgres) RemoveDevicesByNames(ctx context.Context, names ...str } // RemoveSensors from the database -func (postgres *Postgres) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteSensorByIDSQL) + stmt, err := tx.Prepare(postgres.DeleteSensorByIDSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -1023,14 +991,14 @@ func (postgres *Postgres) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...s } // RemoveSensors from the database -func (postgres *Postgres) RemoveSensorsByNames(ctx context.Context, sensorIDs ...string) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) RemoveSensorsByNames(ctx context.Context, sensorIDs ...string) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteSensorByNameSQL) + stmt, err := tx.Prepare(postgres.DeleteSensorByNameSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -1047,14 +1015,14 @@ func (postgres *Postgres) RemoveSensorsByNames(ctx context.Context, sensorIDs .. } // UpdateDevices updates a device in the database -func (postgres *Postgres) UpdateDevices(ctx context.Context, devices ...*types.Device) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) UpdateDevices(ctx context.Context, devices ...*types.Device) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return err } defer tx.Rollback() - stmt, err := tx.Prepare(updateDeviceSQL) + stmt, err := tx.Prepare(postgres.UpdateDeviceSQL) if err != nil { return err } @@ -1080,14 +1048,14 @@ func (postgres *Postgres) UpdateDevices(ctx context.Context, devices ...*types.D } // UpdateSensors updates a sensor in the database -func (postgres *Postgres) UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { - tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) +func (d *Postgres) UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return err } defer tx.Rollback() - stmt, err := tx.Prepare(updateSensorSQL) + stmt, err := tx.Prepare(postgres.UpdateSensorSQL) if err != nil { return err } diff --git a/pkg/repository/postgres/statements.go b/pkg/repository/postgres/statements.go new file mode 100644 index 0000000..c1dccd6 --- /dev/null +++ b/pkg/repository/postgres/statements.go @@ -0,0 +1,100 @@ +package postgres + +import "embed" + +var ( + DDLAssetPath = "ddl" + + //go:embed ddl/*.sql + DDLAssets embed.FS + + //go:embed dml/deleteDeviceByID.sql + DeleteDeviceByIDSQL string + + //go:embed dml/deleteDeviceByName.sql + DeleteDeviceByNameSQL string + + //go:embed dml/deleteSensorByID.sql + DeleteSensorByIDSQL string + + //go:embed dml/deleteSensorByName.sql + DeleteSensorByNameSQL string + + //go:embed dml/insertDevice.sql + InsertDeviceSQL string + + //go:embed dml/insertHumidity.sql + InsertHumiditySQL string + + //go:embed dml/insertOrUpdateDevice.sql + InsertOrUpdateDeviceSQL string + + //go:embed dml/insertOrUpdateHumidity.sql + InsertOrUpdateHumiditySQL string + + //go:embed dml/insertOrUpdatePressure.sql + InsertOrUpdatePressureSQL string + + //go:embed dml/insertOrUpdateSensor.sql + InsertOrUpdateSensorSQL string + + //go:embed dml/insertOrUpdateTemperature.sql + InsertOrUpdateTemperatureSQL string + + //go:embed dml/insertPressure.sql + InsertPressureSQL string + + //go:embed dml/insertSensor.sql + InsertSensorSQL string + + //go:embed dml/insertTemperature.sql + InsertTemperatureSQL string + + //go:embed dml/selectDeviceByID.sql + SelectDeviceByIDSQL string + + //go:embed dml/selectDeviceByName.sql + SelectDeviceByNameSQL string + + //go:embed dml/selectDevices.sql + SelectDevicesSQL string + + //go:embed dml/selectHumidities.sql + SelectHumiditiesSQL string + + //go:embed dml/selectHumidityByID.sql + SelectHumidityByIDSQL string + + //go:embed dml/selectPressureByID.sql + SelectPressureByIDSQL string + + //go:embed dml/selectPressures.sql + SelectPressuresSQL string + + //go:embed dml/selectSensorByID.sql + SelectSensorByIDSQL string + + //go:embed dml/selectSensors.sql + SelectSensorsSQL string + + //go:embed dml/selectSensorsByDeviceID.sql + SelectSensorsByDeviceIDSQL string + + //go:embed dml/selectSensorsByModel.sql + SelectSensorsByModelSQL string + + //go:embed dml/selectSensorsByName.sql + SelectSensorsByNameSQL string + + //go:embed dml/selectTemperatureByID.sql + SelectTemperatureByIDSQL string + + //go:embed dml/selectTemperatures.sql + SelectTemperaturesSQL string + + //go:embed dml/updateDevice.sql + UpdateDeviceSQL string + + //go:embed dml/updateSensor.sql + UpdateSensorSQL string +) diff --git a/pkg/repository/repository.go b/pkg/repository/repository.go index e91b80e..e8ae62c 100644 --- a/pkg/repository/repository.go +++ b/pkg/repository/repository.go @@ -5,8 +5,6 @@ import ( "fmt" "net/url" - "git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres" - "git.cryptic.systems/volker.raschek/flucky/pkg/repository/sqlite3" "git.cryptic.systems/volker.raschek/flucky/pkg/types" "git.cryptic.systems/volker.raschek/go-logger" ) @@ -33,6 +31,7 @@ type Repository interface { GetSensors(ctx context.Context) ([]*types.Sensor, error) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) + Import(ctx context.Context, src Repository) error Migrate(ctx context.Context) error RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error RemoveDevicesByNames(ctx context.Context, names ...string) error @@ -46,7 +45,7 @@ type Repository interface { func New(databaseURL *url.URL, flogger logger.Logger) (Repository, error) { switch databaseURL.Scheme { case "postgres": - repo, err := postgres.New(postgres.Opts{ + repo, err := NewPostgres(PostgresOpts{ DatabaseURL: databaseURL, Logger: flogger, }) @@ -63,7 +62,7 @@ func New(databaseURL *url.URL, flogger logger.Logger) (Repository, error) { return repo, nil case "sqlite3": - repo, err := sqlite3.New(sqlite3.Opts{ + repo, err := NewSQLite(SQLiteOpts{ DatabaseURL: databaseURL, Logger: flogger, }) diff --git a/pkg/repository/sqlite3/sqlite.go b/pkg/repository/sqlite.go similarity index 58% rename from pkg/repository/sqlite3/sqlite.go rename to pkg/repository/sqlite.go index d87ddb6..7d87327 100644 --- a/pkg/repository/sqlite3/sqlite.go +++ b/pkg/repository/sqlite.go @@ -1,9 +1,8 @@ -package sqlite3 +package repository import ( "context" "database/sql" - "embed" "errors" "fmt" "net/url" @@ -11,113 +10,20 @@ import ( "path/filepath" "sync" + "git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres" + "git.cryptic.systems/volker.raschek/flucky/pkg/repository/sqlite3" "git.cryptic.systems/volker.raschek/flucky/pkg/types" "git.cryptic.systems/volker.raschek/go-logger" "github.com/golang-migrate/migrate/v4" "github.com/johejo/golang-migrate-extra/source/iofs" ) -var ( - //go:embed ddl/*.sql - ddlAssets embed.FS - - //go:embed dml/deleteDeviceByID.sql - deleteDeviceByIDSQL string - - //go:embed dml/deleteDeviceByName.sql - deleteDeviceByNameSQL string - - //go:embed dml/deleteSensorByID.sql - deleteSensorByIDSQL string - - //go:embed dml/deleteSensorByName.sql - deleteSensorByNameSQL string - - //go:embed dml/insertDevice.sql - insertDeviceSQL string - - //go:embed dml/insertHumidity.sql - insertHumiditySQL string - - //go:embed dml/insertOrUpdateDevice.sql - insertOrUpdateDeviceSQL string - - //go:embed dml/insertOrUpdateHumidity.sql - insertOrUpdateHumiditySQL string - - //go:embed dml/insertOrUpdatePressure.sql - insertOrUpdatePressureSQL string - - //go:embed dml/insertOrUpdateSensor.sql - insertOrUpdateSensorSQL string - - //go:embed dml/insertOrUpdateTemperature.sql - insertOrUpdateTemperatureSQL string - - //go:embed dml/insertPressure.sql - insertPressureSQL string - - //go:embed dml/insertSensor.sql - insertSensorSQL string - - //go:embed dml/insertTemperature.sql - insertTemperatureSQL string - - //go:embed dml/selectDeviceByID.sql - selectDeviceByIDSQL string - - //go:embed dml/selectDeviceByName.sql - selectDeviceByNameSQL string - - //go:embed dml/selectDevices.sql - selectDevicesSQL string - - //go:embed dml/selectHumidities.sql - selectHumiditiesSQL string - - //go:embed dml/selectHumidityByID.sql - selectHumidityByIDSQL string - - //go:embed dml/selectPressureByID.sql - selectPressureByIDSQL string - - //go:embed dml/selectPressures.sql - selectPressuresSQL string - - //go:embed dml/selectSensorByID.sql - selectSensorByIDSQL string - - //go:embed dml/selectSensors.sql - selectSensorsSQL string - - //go:embed dml/selectSensorsByDeviceID.sql - selectSensorsByDeviceIDSQL string - - //go:embed dml/selectSensorsByModel.sql - selectSensorsByModelSQL string - - //go:embed dml/selectSensorsByName.sql - selectSensorsByNameSQL string - - //go:embed dml/selectTemperatureByID.sql - selectTemperatureByIDSQL string - - //go:embed dml/selectTemperatures.sql - selectTemperaturesSQL string - - //go:embed dml/updateDevice.sql - updateDeviceSQL string - - //go:embed dml/updateSensor.sql - updateSensorSQL string -) - -type Opts struct { +type SQLiteOpts struct { DatabaseURL *url.URL Logger logger.Logger } -func (o *Opts) Validate() error { +func (o *SQLiteOpts) Validate() error { for k, v := range map[string]interface{}{ "DatabaseURL": o.DatabaseURL, "Logger": o.Logger, @@ -134,7 +40,7 @@ func (o *Opts) Validate() error { return nil } -func New(opts Opts) (*SQLite, error) { +func NewSQLite(opts SQLiteOpts) (Repository, error) { if err := opts.Validate(); err != nil { return nil, err } @@ -175,17 +81,26 @@ type SQLite struct { } // AddDevices into the database -func (sqlite *SQLite) AddDevices(ctx context.Context, devices ...*types.Device) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) AddDevices(ctx context.Context, devices ...*types.Device) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertDeviceSQL) + err = d.insertDevices(tx, devices...) + if err != nil { + return err + } + + return tx.Commit() +} + +func (d *SQLite) insertDevices(tx *sql.Tx, devices ...*types.Device) error { + stmt, err := tx.Prepare(sqlite3.InsertDeviceSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -198,21 +113,30 @@ func (sqlite *SQLite) AddDevices(ctx context.Context, devices ...*types.Device) } } - return tx.Commit() + return nil } // AddOrUpdateDevices into the database -func (sqlite *SQLite) AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertOrUpdateDeviceSQL) + err = d.insertOrUpdateDevices(tx, devices...) + if err != nil { + return err + } + + return tx.Commit() +} + +func (d *SQLite) insertOrUpdateDevices(tx *sql.Tx, devices ...*types.Device) error { + stmt, err := tx.Prepare(sqlite3.InsertOrUpdateDeviceSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -225,13 +149,13 @@ func (sqlite *SQLite) AddOrUpdateDevices(ctx context.Context, devices ...*types. } } - return tx.Commit() + return nil } // AddMeasuredValues into the database -func (sqlite *SQLite) AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { + d.mutex.Lock() + defer d.mutex.Unlock() splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0) @@ -242,53 +166,27 @@ func (sqlite *SQLite) AddMeasuredValues(ctx context.Context, measuredValues ...* splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue) } - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - // General insert function - insert := func(tx *sql.Tx, query string, measuredValues []*types.MeasuredValue) error { - stmt, err := tx.Prepare(query) - if err != nil { - return fmt.Errorf("Failed to prepare statement: %v", err) - } - defer stmt.Close() - - for _, measuredValue := range measuredValues { - _, err := stmt.Exec( - &measuredValue.ID, - &measuredValue.Value, - &measuredValue.Date, - &measuredValue.SensorID, - &measuredValue.CreationDate, - &measuredValue.UpdateDate, - ) - - if err != nil { - return fmt.Errorf("Failed to execute statement: %v", err) - } - } - - return nil - } - for measuredValueType, measuredValues := range splittedMeasuredValues { var query string switch measuredValueType { case types.Humidity: - query = insertHumiditySQL + query = sqlite3.InsertHumiditySQL case types.Pressure: - query = insertPressureSQL + query = sqlite3.InsertPressureSQL case types.Temperature: - query = insertTemperatureSQL + query = sqlite3.InsertTemperatureSQL default: return fmt.Errorf("Measured value type %v not supported", measuredValueType) } - err := insert(tx, query, measuredValues) + err := d.insertMeasuredValues(tx, query, measuredValues...) if err != nil { return err } @@ -297,10 +195,35 @@ func (sqlite *SQLite) AddMeasuredValues(ctx context.Context, measuredValues ...* return tx.Commit() } +func (d *SQLite) insertMeasuredValues(tx *sql.Tx, query string, measuredValues ...*types.MeasuredValue) error { + stmt, err := tx.Prepare(query) + if err != nil { + return fmt.Errorf("Failed to prepare statement: %v", err) + } + defer stmt.Close() + + for _, measuredValue := range measuredValues { + _, err := stmt.Exec( + &measuredValue.ID, + &measuredValue.Value, + &measuredValue.Date, + &measuredValue.SensorID, + &measuredValue.CreationDate, + &measuredValue.UpdateDate, + ) + + if err != nil { + return fmt.Errorf("Failed to execute statement: %v", err) + } + } + + return nil +} + // AddOrUpdateMeasuredValues into the database -func (sqlite *SQLite) AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error { + d.mutex.Lock() + defer d.mutex.Unlock() splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0) @@ -311,54 +234,27 @@ func (sqlite *SQLite) AddOrUpdateMeasuredValues(ctx context.Context, measuredVal splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue) } - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - // General insert function - insert := func(tx *sql.Tx, query string, measuredValues []*types.MeasuredValue) error { - - stmt, err := tx.Prepare(query) - if err != nil { - return fmt.Errorf("Failed to prepare statement: %v", err) - } - defer stmt.Close() - - for _, measuredValue := range measuredValues { - _, err := stmt.Exec( - &measuredValue.ID, - &measuredValue.Value, - &measuredValue.Date, - &measuredValue.SensorID, - &measuredValue.CreationDate, - &measuredValue.UpdateDate, - ) - - if err != nil { - return fmt.Errorf("Failed to execute statement: %v", err) - } - } - - return nil - } - for measuredValueType, measuredValues := range splittedMeasuredValues { - var queryFile string + var query string switch measuredValueType { case types.Humidity: - queryFile = insertOrUpdateHumiditySQL + query = sqlite3.InsertOrUpdateHumiditySQL case types.Pressure: - queryFile = insertOrUpdatePressureSQL + query = sqlite3.InsertOrUpdatePressureSQL case types.Temperature: - queryFile = insertOrUpdateTemperatureSQL + query = sqlite3.InsertOrUpdateTemperatureSQL default: return fmt.Errorf("Measured value type %v not supported", measuredValueType) } - err := insert(tx, queryFile, measuredValues) + err := d.insertOrUpdateMeasuredValues(tx, query, measuredValues...) if err != nil { return err } @@ -367,18 +263,43 @@ func (sqlite *SQLite) AddOrUpdateMeasuredValues(ctx context.Context, measuredVal return tx.Commit() } -// AddSensors into the database -func (sqlite *SQLite) AddSensors(ctx context.Context, sensors ...*types.Sensor) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) insertOrUpdateMeasuredValues(tx *sql.Tx, query string, measuredValues ...*types.MeasuredValue) error { + stmt, err := tx.Prepare(query) + if err != nil { + return fmt.Errorf("Failed to prepare statement: %v", err) + } + defer stmt.Close() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + for _, measuredValue := range measuredValues { + _, err := stmt.Exec( + &measuredValue.ID, + &measuredValue.Value, + &measuredValue.Date, + &measuredValue.SensorID, + &measuredValue.CreationDate, + &measuredValue.UpdateDate, + ) + + if err != nil { + return fmt.Errorf("Failed to execute statement: %v", err) + } + } + + return nil +} + +// AddSensors into the database +func (d *SQLite) AddSensors(ctx context.Context, sensors ...*types.Sensor) error { + d.mutex.Lock() + defer d.mutex.Unlock() + + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertSensorSQL) + stmt, err := tx.Prepare(sqlite3.InsertSensorSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -409,17 +330,26 @@ func (sqlite *SQLite) AddSensors(ctx context.Context, sensors ...*types.Sensor) } // AddOrUpdateSensors into the database -func (sqlite *SQLite) AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(insertOrUpdateSensorSQL) + err = d.insertOrUpdateSensors(tx, sensors...) + if err != nil { + return err + } + + return tx.Commit() +} + +func (d *SQLite) insertOrUpdateSensors(tx *sql.Tx, sensors ...*types.Sensor) error { + stmt, err := tx.Prepare(sqlite3.InsertOrUpdateSensorSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -446,28 +376,28 @@ func (sqlite *SQLite) AddOrUpdateSensors(ctx context.Context, sensors ...*types. } } - return tx.Commit() + return nil } // Close closes the database and prevents new queries from starting. Close then // waits for all queries that have started processing on the server to finish. -func (sqlite *SQLite) Close() error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() - return sqlite.dbo.Close() +func (d *SQLite) Close() error { + d.mutex.Lock() + defer d.mutex.Unlock() + return d.dbo.Close() } // Migrate creates all required tables if not exist -func (sqlite *SQLite) Migrate(ctx context.Context) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) Migrate(ctx context.Context) error { + d.mutex.Lock() + defer d.mutex.Unlock() - sourceDriver, err := iofs.New(ddlAssets, "ddl") + sourceDriver, err := iofs.New(sqlite3.DDLAssets, sqlite3.DDLAssetPath) if err != nil { return err } - m, err := migrate.NewWithSourceInstance("iofs", sourceDriver, sqlite.databaseURL.String()) + m, err := migrate.NewWithSourceInstance("iofs", sourceDriver, d.databaseURL.String()) if err != nil { return err } @@ -482,17 +412,17 @@ func (sqlite *SQLite) Migrate(ctx context.Context) error { } // GetDevice from database -func (sqlite *SQLite) GetDeviceByID(ctx context.Context, id string) (*types.Device, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetDeviceByID(ctx context.Context, id string) (*types.Device, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - devices, err := sqlite.selectDevices(tx, selectDeviceByIDSQL, id) + devices, err := d.selectDevices(tx, sqlite3.SelectDeviceByIDSQL, id) if err != nil { return nil, err } @@ -510,17 +440,17 @@ func (sqlite *SQLite) GetDeviceByID(ctx context.Context, id string) (*types.Devi } // GetDevice from database -func (sqlite *SQLite) GetDeviceByName(ctx context.Context, name string) (*types.Device, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetDeviceByName(ctx context.Context, name string) (*types.Device, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - devices, err := sqlite.selectDevices(tx, selectDeviceByNameSQL, name) + devices, err := d.selectDevices(tx, sqlite3.SelectDeviceByNameSQL, name) if err != nil { return nil, err } @@ -538,17 +468,17 @@ func (sqlite *SQLite) GetDeviceByName(ctx context.Context, name string) (*types. } // GetDevices from the database -func (sqlite *SQLite) GetDevices(ctx context.Context) ([]*types.Device, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetDevices(ctx context.Context) ([]*types.Device, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - devices, err := sqlite.selectDevices(tx, selectDevicesSQL) + devices, err := d.selectDevices(tx, sqlite3.SelectDevicesSQL) if err != nil { return nil, err } @@ -561,7 +491,7 @@ func (sqlite *SQLite) GetDevices(ctx context.Context) ([]*types.Device, error) { return devices, nil } -func (sqlite *SQLite) selectDevices(tx *sql.Tx, query string, args ...interface{}) ([]*types.Device, error) { +func (d *SQLite) selectDevices(tx *sql.Tx, query string, args ...interface{}) ([]*types.Device, error) { stmt, err := tx.Prepare(query) if err != nil { return nil, fmt.Errorf("Failed to prepare statement: %v", err) @@ -594,17 +524,17 @@ func (sqlite *SQLite) selectDevices(tx *sql.Tx, query string, args ...interface{ } // GetHumidity returns humidity from the database -func (sqlite *SQLite) GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := sqlite.selectMeasuredValue(tx, selectHumidityByIDSQL, id) + measuredValues, err := d.selectMeasuredValue(tx, sqlite3.SelectHumidityByIDSQL, id) if err != nil { return nil, err } @@ -626,17 +556,17 @@ func (sqlite *SQLite) GetHumidityByID(ctx context.Context, id string) (*types.Me } // GetHumidities returns humidities from the database -func (sqlite *SQLite) GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := sqlite.selectMeasuredValue(tx, selectHumiditiesSQL) + measuredValues, err := d.selectMeasuredValue(tx, sqlite3.SelectHumiditiesSQL) if err != nil { return nil, err } @@ -653,7 +583,7 @@ func (sqlite *SQLite) GetHumidities(ctx context.Context) ([]*types.MeasuredValue return measuredValues, nil } -func (sqlite *SQLite) selectMeasuredValue(tx *sql.Tx, query string, args ...interface{}) ([]*types.MeasuredValue, error) { +func (d *SQLite) selectMeasuredValue(tx *sql.Tx, query string, args ...interface{}) ([]*types.MeasuredValue, error) { stmt, err := tx.Prepare(query) if err != nil { return nil, err @@ -689,17 +619,17 @@ func (sqlite *SQLite) selectMeasuredValue(tx *sql.Tx, query string, args ...inte } // GetPressure returns pressure from the database -func (sqlite *SQLite) GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := sqlite.selectMeasuredValue(tx, selectPressureByIDSQL, id) + measuredValues, err := d.selectMeasuredValue(tx, sqlite3.SelectPressureByIDSQL, id) if err != nil { return nil, err } @@ -721,17 +651,17 @@ func (sqlite *SQLite) GetPressureByID(ctx context.Context, id string) (*types.Me } // GetPressures returns pressure from the database -func (sqlite *SQLite) GetPressures(ctx context.Context) ([]*types.MeasuredValue, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetPressures(ctx context.Context) ([]*types.MeasuredValue, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := sqlite.selectMeasuredValue(tx, selectPressuresSQL) + measuredValues, err := d.selectMeasuredValue(tx, sqlite3.SelectPressuresSQL) if err != nil { return nil, err } @@ -749,17 +679,17 @@ func (sqlite *SQLite) GetPressures(ctx context.Context) ([]*types.MeasuredValue, } // GetSensor from database -func (sqlite *SQLite) GetSensorByID(ctx context.Context, id string) (*types.Sensor, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetSensorByID(ctx context.Context, id string) (*types.Sensor, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - sensors, err := sqlite.selectSensors(tx, selectSensorByIDSQL, id) + sensors, err := d.selectSensors(tx, sqlite3.SelectSensorByIDSQL, id) if err != nil { return nil, err } @@ -777,17 +707,17 @@ func (sqlite *SQLite) GetSensorByID(ctx context.Context, id string) (*types.Sens } // GetSensors from the database -func (sqlite *SQLite) GetSensors(ctx context.Context) ([]*types.Sensor, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetSensors(ctx context.Context) ([]*types.Sensor, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - sensors, err := sqlite.selectSensors(tx, selectSensorsSQL) + sensors, err := d.selectSensors(tx, sqlite3.SelectSensorsSQL) if err != nil { return nil, err } @@ -801,11 +731,11 @@ func (sqlite *SQLite) GetSensors(ctx context.Context) ([]*types.Sensor, error) { } // GetSensorsByModels from the database -func (sqlite *SQLite) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } @@ -813,7 +743,7 @@ func (sqlite *SQLite) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...st cachedSensors := make([]*types.Sensor, 0) for i := range deviceIDs { - sensors, err := sqlite.selectSensors(tx, selectSensorsByDeviceIDSQL, deviceIDs[i]) + sensors, err := d.selectSensors(tx, sqlite3.SelectSensorsByDeviceIDSQL, deviceIDs[i]) if err != nil { return nil, err } @@ -830,11 +760,11 @@ func (sqlite *SQLite) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...st } // GetSensorsByModels from the database -func (sqlite *SQLite) GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } @@ -842,7 +772,7 @@ func (sqlite *SQLite) GetSensorsByModels(ctx context.Context, sensorModels ...st cachedSensors := make([]*types.Sensor, 0) for i := range sensorModels { - sensors, err := sqlite.selectSensors(tx, selectSensorsByModelSQL, sensorModels[i]) + sensors, err := d.selectSensors(tx, sqlite3.SelectSensorsByModelSQL, sensorModels[i]) if err != nil { return nil, err } @@ -859,11 +789,11 @@ func (sqlite *SQLite) GetSensorsByModels(ctx context.Context, sensorModels ...st } // GetSensorsByModels from the database -func (sqlite *SQLite) GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } @@ -871,7 +801,7 @@ func (sqlite *SQLite) GetSensorsByNames(ctx context.Context, sensorNames ...stri cachedSensors := make([]*types.Sensor, 0) for i := range sensorNames { - sensors, err := sqlite.selectSensors(tx, selectSensorsByNameSQL, sensorNames[i]) + sensors, err := d.selectSensors(tx, sqlite3.SelectSensorsByNameSQL, sensorNames[i]) if err != nil { return nil, err } @@ -887,7 +817,7 @@ func (sqlite *SQLite) GetSensorsByNames(ctx context.Context, sensorNames ...stri return cachedSensors, nil } -func (sqlite *SQLite) selectSensors(tx *sql.Tx, query string, args ...interface{}) ([]*types.Sensor, error) { +func (d *SQLite) selectSensors(tx *sql.Tx, query string, args ...interface{}) ([]*types.Sensor, error) { stmt, err := tx.Prepare(query) if err != nil { return nil, fmt.Errorf("Failed to prepare statement: %v", err) @@ -929,17 +859,17 @@ func (sqlite *SQLite) selectSensors(tx *sql.Tx, query string, args ...interface{ } // GetTemperature returns temperatures from the database -func (sqlite *SQLite) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := sqlite.selectMeasuredValue(tx, selectTemperatureByIDSQL, id) + measuredValues, err := d.selectMeasuredValue(tx, sqlite3.SelectTemperatureByIDSQL, id) if err != nil { return nil, err } @@ -961,17 +891,17 @@ func (sqlite *SQLite) GetTemperatureByID(ctx context.Context, id string) (*types } // GetTemperatures returns temperatures from the database -func (sqlite *SQLite) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, err } defer tx.Rollback() - measuredValues, err := sqlite.selectMeasuredValue(tx, selectTemperaturesSQL) + measuredValues, err := d.selectMeasuredValue(tx, sqlite3.SelectTemperaturesSQL) if err != nil { return nil, err } @@ -988,18 +918,65 @@ func (sqlite *SQLite) GetTemperatures(ctx context.Context) ([]*types.MeasuredVal return measuredValues, nil } -// RemoveDevices from the database -func (sqlite *SQLite) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +// Import imports devices, sensors and all measured values from a source +// repository. Existing entries will be updated. +func (d *SQLite) Import(ctx context.Context, src Repository) error { + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + if err != nil { + return err + } + defer tx.Rollback() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + devices, err := src.GetDevices(ctx) + if err != nil { + return err + } + + err = d.insertOrUpdateDevices(tx, devices...) + if err != nil { + return err + } + + sensors, err := src.GetSensors(ctx) + err = d.insertOrUpdateSensors(tx, sensors...) + if err != nil { + return err + } + + for _, value := range []struct { + f func(ctx context.Context) ([]*types.MeasuredValue, error) + query string + }{ + {f: src.GetHumidities, query: postgres.InsertOrUpdateHumiditySQL}, + {f: src.GetPressures, query: postgres.InsertOrUpdatePressureSQL}, + {f: src.GetTemperatures, query: postgres.InsertOrUpdateTemperatureSQL}, + } { + measuredValues, err := value.f(ctx) + if err != nil { + return err + } + + err = d.insertOrUpdateMeasuredValues(tx, value.query, measuredValues...) + if err != nil { + return err + } + } + + return tx.Commit() +} + +// RemoveDevices from the database +func (d *SQLite) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error { + d.mutex.Lock() + defer d.mutex.Unlock() + + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteDeviceByIDSQL) + stmt, err := tx.Prepare(sqlite3.DeleteDeviceByIDSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -1016,17 +993,17 @@ func (sqlite *SQLite) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...strin } // RemoveDevices from the database -func (sqlite *SQLite) RemoveDevicesByNames(ctx context.Context, names ...string) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) RemoveDevicesByNames(ctx context.Context, names ...string) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteDeviceByNameSQL) + stmt, err := tx.Prepare(sqlite3.DeleteDeviceByNameSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -1043,17 +1020,17 @@ func (sqlite *SQLite) RemoveDevicesByNames(ctx context.Context, names ...string) } // RemoveSensors from the database -func (sqlite *SQLite) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteSensorByIDSQL) + stmt, err := tx.Prepare(sqlite3.DeleteSensorByIDSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -1070,17 +1047,17 @@ func (sqlite *SQLite) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...strin } // RemoveSensors from the database -func (sqlite *SQLite) RemoveSensorsByNames(ctx context.Context, names ...string) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) RemoveSensorsByNames(ctx context.Context, names ...string) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } defer tx.Rollback() - stmt, err := tx.Prepare(deleteSensorByNameSQL) + stmt, err := tx.Prepare(sqlite3.DeleteSensorByNameSQL) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } @@ -1097,17 +1074,17 @@ func (sqlite *SQLite) RemoveSensorsByNames(ctx context.Context, names ...string) } // UpdateDevices updates a device in the database -func (sqlite *SQLite) UpdateDevices(ctx context.Context, devices ...*types.Device) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) UpdateDevices(ctx context.Context, devices ...*types.Device) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return err } defer tx.Rollback() - stmt, err := tx.Prepare(updateDeviceSQL) + stmt, err := tx.Prepare(sqlite3.UpdateDeviceSQL) if err != nil { return err } @@ -1130,17 +1107,17 @@ func (sqlite *SQLite) UpdateDevices(ctx context.Context, devices ...*types.Devic } // UpdateSensors updates a sensor in the database -func (sqlite *SQLite) UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { - sqlite.mutex.Lock() - defer sqlite.mutex.Unlock() +func (d *SQLite) UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error { + d.mutex.Lock() + defer d.mutex.Unlock() - tx, err := sqlite.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) + tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return err } defer tx.Rollback() - stmt, err := tx.Prepare(updateSensorSQL) + stmt, err := tx.Prepare(sqlite3.UpdateSensorSQL) if err != nil { return err } diff --git a/pkg/repository/sqlite3/statements.go b/pkg/repository/sqlite3/statements.go new file mode 100644 index 0000000..9c0d625 --- /dev/null +++ b/pkg/repository/sqlite3/statements.go @@ -0,0 +1,100 @@ +package sqlite3 + +import "embed" + +var ( + DDLAssetPath = "ddl" + + //go:embed ddl/*.sql + DDLAssets embed.FS + + //go:embed dml/deleteDeviceByID.sql + DeleteDeviceByIDSQL string + + //go:embed dml/deleteDeviceByName.sql + DeleteDeviceByNameSQL string + + //go:embed dml/deleteSensorByID.sql + DeleteSensorByIDSQL string + + //go:embed dml/deleteSensorByName.sql + DeleteSensorByNameSQL string + + //go:embed dml/insertDevice.sql + InsertDeviceSQL string + + //go:embed dml/insertHumidity.sql + InsertHumiditySQL string + + //go:embed dml/insertOrUpdateDevice.sql + InsertOrUpdateDeviceSQL string + + //go:embed dml/insertOrUpdateHumidity.sql + InsertOrUpdateHumiditySQL string + + //go:embed dml/insertOrUpdatePressure.sql + InsertOrUpdatePressureSQL string + + //go:embed dml/insertOrUpdateSensor.sql + InsertOrUpdateSensorSQL string + + //go:embed dml/insertOrUpdateTemperature.sql + InsertOrUpdateTemperatureSQL string + + //go:embed dml/insertPressure.sql + InsertPressureSQL string + + //go:embed dml/insertSensor.sql + InsertSensorSQL string + + //go:embed dml/insertTemperature.sql + InsertTemperatureSQL string + + //go:embed dml/selectDeviceByID.sql + SelectDeviceByIDSQL string + + //go:embed dml/selectDeviceByName.sql + SelectDeviceByNameSQL string + + //go:embed dml/selectDevices.sql + SelectDevicesSQL string + + //go:embed dml/selectHumidities.sql + SelectHumiditiesSQL string + + //go:embed dml/selectHumidityByID.sql + SelectHumidityByIDSQL string + + //go:embed dml/selectPressureByID.sql + SelectPressureByIDSQL string + + //go:embed dml/selectPressures.sql + SelectPressuresSQL string + + //go:embed dml/selectSensorByID.sql + SelectSensorByIDSQL string + + //go:embed dml/selectSensors.sql + SelectSensorsSQL string + + //go:embed dml/selectSensorsByDeviceID.sql + SelectSensorsByDeviceIDSQL string + + //go:embed dml/selectSensorsByModel.sql + SelectSensorsByModelSQL string + + //go:embed dml/selectSensorsByName.sql + SelectSensorsByNameSQL string + + //go:embed dml/selectTemperatureByID.sql + SelectTemperatureByIDSQL string + + //go:embed dml/selectTemperatures.sql + SelectTemperaturesSQL string + + //go:embed dml/updateDevice.sql + UpdateDeviceSQL string + + //go:embed dml/updateSensor.sql + UpdateSensorSQL string +)