feat: import from sqlite or postgres

This commit is contained in:
Markus Pesch 2021-04-09 16:29:09 +02:00
parent 8c2090a316
commit 749f2697c7
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
6 changed files with 772 additions and 608 deletions

View File

@ -1,6 +1,13 @@
package imp
import (
"context"
"fmt"
"net/url"
"git.cryptic.systems/volker.raschek/flucky/pkg/config"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository"
"git.cryptic.systems/volker.raschek/go-logger"
"github.com/spf13/cobra"
)
@ -15,61 +22,74 @@ func InitCmd(cmd *cobra.Command) error {
importCmd := &cobra.Command{
Use: "import",
Args: cobra.RangeArgs(1, 2),
Short: "Import data from passed URL",
RunE: importSources,
Example: `import sqlite3:///var/cache/flucky/sqlite3.db
import sqlite3:///var/cache/flucky/sqlite3.db postgres://user:password@host:port/database?sslmode=disable`,
RunE: importSources,
}
importCmd.Flags().BoolVar(&importSensors, "sensors", true, "Import sensors")
importCmd.Flags().BoolVar(&importHumidities, "humidities", true, "Import humidities")
importCmd.Flags().BoolVar(&importPressures, "pressures", true, "Import pressures")
importCmd.Flags().BoolVar(&importTemperatures, "temperatures", true, "Import temperatures")
cmd.AddCommand(importCmd)
return nil
}
func importSources(cmd *cobra.Command, args []string) error {
// configFile, err := cmd.Flags().GetString("config")
// if err != nil {
// return fmt.Errorf("No config file defined")
// }
// cnf, err := config.Read(configFile)
// if err != nil {
// return err
// }
configFile, err := cmd.Flags().GetString("config")
if err != nil {
return fmt.Errorf("No config file defined")
}
// destURL, err := url.Parse(cnf.DSN)
// if err != nil {
// return err
// }
cnf, err := config.Read(configFile)
if err != nil {
return err
}
// logLevelString, err := cmd.Flags().GetString("loglevel")
// if err != nil {
// return err
// }
logLevelString, err := cmd.Flags().GetString("loglevel")
if err != nil {
return err
}
// logLevel, err := logger.ParseLogLevel(logLevelString)
// if err != nil {
// return err
// }
logLevel, err := logger.ParseLogLevel(logLevelString)
if err != nil {
return err
}
// flogger := logger.NewLogger(logLevel)
flogger := logger.NewLogger(logLevel)
// sourceURL, err := url.Parse(args[0])
// if err != nil {
// return fmt.Errorf("Failed to parse source url: %w", err)
// }
var (
srcURL *url.URL
destURL *url.URL
)
// err = repository.Import(sourceURL, destURL, flogger, repository.OptImport{
// Sensors: importSensors,
// Humidities: importHumidities,
// Pressures: importPressures,
// Temperatures: importTemperatures,
// })
// if err != nil {
// return fmt.Errorf("Failed to import: %w", err)
// }
srcURL, err = url.Parse(args[0])
if err != nil {
return err
}
return nil
switch len(args) {
case 1:
destURL, err = url.Parse(cnf.DSN)
if err != nil {
return err
}
case 2:
destURL, err = url.Parse(args[1])
if err != nil {
return err
}
}
srcRepository, err := repository.New(srcURL, flogger)
if err != nil {
return err
}
destRepository, err := repository.New(destURL, flogger)
if err != nil {
return err
}
return destRepository.Import(context.Background(), srcRepository)
}

View File

@ -1,121 +1,26 @@
package postgres
package repository
import (
"context"
"database/sql"
"embed"
"errors"
"fmt"
"net/url"
"time"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres"
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
"git.cryptic.systems/volker.raschek/go-logger"
"github.com/golang-migrate/migrate/v4"
"github.com/johejo/golang-migrate-extra/source/iofs"
)
var (
//go:embed ddl/*.sql
ddlAssets embed.FS
//go:embed dml/deleteDeviceByID.sql
deleteDeviceByIDSQL string
//go:embed dml/deleteDeviceByName.sql
deleteDeviceByNameSQL string
//go:embed dml/deleteSensorByID.sql
deleteSensorByIDSQL string
//go:embed dml/deleteSensorByName.sql
deleteSensorByNameSQL string
//go:embed dml/insertDevice.sql
insertDeviceSQL string
//go:embed dml/insertHumidity.sql
insertHumiditySQL string
//go:embed dml/insertOrUpdateDevice.sql
insertOrUpdateDeviceSQL string
//go:embed dml/insertOrUpdateHumidity.sql
insertOrUpdateHumiditySQL string
//go:embed dml/insertOrUpdatePressure.sql
insertOrUpdatePressureSQL string
//go:embed dml/insertOrUpdateSensor.sql
insertOrUpdateSensorSQL string
//go:embed dml/insertOrUpdateTemperature.sql
insertOrUpdateTemperatureSQL string
//go:embed dml/insertPressure.sql
insertPressureSQL string
//go:embed dml/insertSensor.sql
insertSensorSQL string
//go:embed dml/insertTemperature.sql
insertTemperatureSQL string
//go:embed dml/selectDeviceByID.sql
selectDeviceByIDSQL string
//go:embed dml/selectDeviceByName.sql
selectDeviceByNameSQL string
//go:embed dml/selectDevices.sql
selectDevicesSQL string
//go:embed dml/selectHumidities.sql
selectHumiditiesSQL string
//go:embed dml/selectHumidityByID.sql
selectHumidityByIDSQL string
//go:embed dml/selectPressureByID.sql
selectPressureByIDSQL string
//go:embed dml/selectPressures.sql
selectPressuresSQL string
//go:embed dml/selectSensorByID.sql
selectSensorByIDSQL string
//go:embed dml/selectSensors.sql
selectSensorsSQL string
//go:embed dml/selectSensorsByDeviceID.sql
selectSensorsByDeviceIDSQL string
//go:embed dml/selectSensorsByModel.sql
selectSensorsByModelSQL string
//go:embed dml/selectSensorsByName.sql
selectSensorsByNameSQL string
//go:embed dml/selectTemperatureByID.sql
selectTemperatureByIDSQL string
//go:embed dml/selectTemperatures.sql
selectTemperaturesSQL string
//go:embed dml/updateDevice.sql
updateDeviceSQL string
//go:embed dml/updateSensor.sql
updateSensorSQL string
)
type Opts struct {
type PostgresOpts struct {
DatabaseURL *url.URL
Logger logger.Logger
}
func (o *Opts) Validate() error {
func (o *PostgresOpts) Validate() error {
for k, v := range map[string]interface{}{
"DatabaseURL": o.DatabaseURL,
"Logger": o.Logger,
@ -132,7 +37,7 @@ func (o *Opts) Validate() error {
return nil
}
func New(opts Opts) (*Postgres, error) {
func NewPostgres(opts PostgresOpts) (Repository, error) {
if err := opts.Validate(); err != nil {
return nil, err
}
@ -157,14 +62,14 @@ type Postgres struct {
}
// AddDevices into the database
func (postgres *Postgres) AddDevices(ctx context.Context, devices ...*types.Device) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) AddDevices(ctx context.Context, devices ...*types.Device) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
err = postgres.insertDevices(tx, devices...)
err = d.insertDevices(tx, devices...)
if err != nil {
return err
}
@ -172,8 +77,8 @@ func (postgres *Postgres) AddDevices(ctx context.Context, devices ...*types.Devi
return tx.Commit()
}
func (postgres *Postgres) insertDevices(tx *sql.Tx, devices ...*types.Device) error {
stmt, err := tx.Prepare(insertDeviceSQL)
func (d *Postgres) insertDevices(tx *sql.Tx, devices ...*types.Device) error {
stmt, err := tx.Prepare(postgres.InsertDeviceSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -200,14 +105,23 @@ func (postgres *Postgres) insertDevices(tx *sql.Tx, devices ...*types.Device) er
return nil
}
func (postgres *Postgres) AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) AddOrUpdateDevices(ctx context.Context, devices ...*types.Device) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(insertOrUpdateDeviceSQL)
err = d.insertOrUpdateDevices(tx, devices...)
if err != nil {
return err
}
return tx.Commit()
}
func (d *Postgres) insertOrUpdateDevices(tx *sql.Tx, devices ...*types.Device) error {
stmt, err := tx.Prepare(postgres.InsertOrUpdateDeviceSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %w", err)
}
@ -231,11 +145,11 @@ func (postgres *Postgres) AddOrUpdateDevices(ctx context.Context, devices ...*ty
}
}
return tx.Commit()
return nil
}
// AddMeasuredValues into the database
func (postgres *Postgres) AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
func (d *Postgres) AddMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0)
for _, measuredValue := range measuredValues {
@ -245,58 +159,27 @@ func (postgres *Postgres) AddMeasuredValues(ctx context.Context, measuredValues
splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue)
}
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
// General insert function
insert := func(tx *sql.Tx, query string, measuredValues []*types.MeasuredValue) error {
stmt, err := tx.Prepare(query)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
defer stmt.Close()
for _, measuredValue := range measuredValues {
if measuredValue.CreationDate.Equal(time.Time{}) {
measuredValue.CreationDate = time.Now()
}
_, err := stmt.Exec(
&measuredValue.ID,
&measuredValue.Value,
&measuredValue.Date,
&measuredValue.SensorID,
&measuredValue.CreationDate,
&measuredValue.UpdateDate,
)
if err != nil {
return fmt.Errorf("Failed to execute statement: %v", err)
}
}
return nil
}
for measuredValueType, measuredValues := range splittedMeasuredValues {
var queryFile string
switch measuredValueType {
case types.Humidity:
queryFile = insertHumiditySQL
queryFile = postgres.InsertHumiditySQL
case types.Pressure:
queryFile = insertPressureSQL
queryFile = postgres.InsertPressureSQL
case types.Temperature:
queryFile = insertTemperatureSQL
queryFile = postgres.InsertTemperatureSQL
default:
return fmt.Errorf("Measured value type %v not supported", measuredValueType)
}
err := insert(tx, queryFile, measuredValues)
err := d.insertMeasuredValues(tx, queryFile, measuredValues...)
if err != nil {
return err
}
@ -305,8 +188,38 @@ func (postgres *Postgres) AddMeasuredValues(ctx context.Context, measuredValues
return tx.Commit()
}
func (d *Postgres) insertMeasuredValues(tx *sql.Tx, query string, measuredValues ...*types.MeasuredValue) error {
stmt, err := tx.Prepare(query)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
defer stmt.Close()
for _, measuredValue := range measuredValues {
if measuredValue.CreationDate.Equal(time.Time{}) {
measuredValue.CreationDate = time.Now()
}
_, err := stmt.Exec(
&measuredValue.ID,
&measuredValue.Value,
&measuredValue.Date,
&measuredValue.SensorID,
&measuredValue.CreationDate,
&measuredValue.UpdateDate,
)
if err != nil {
return fmt.Errorf("Failed to execute statement: %v", err)
}
}
return nil
}
// AddOrUpdateMeasuredValues into the database
func (postgres *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
func (d *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
splittedMeasuredValues := make(map[types.MeasuredValueType][]*types.MeasuredValue, 0)
for _, measuredValue := range measuredValues {
@ -316,58 +229,27 @@ func (postgres *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measure
splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue)
}
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
// General insert function
insert := func(tx *sql.Tx, query string, measuredValues []*types.MeasuredValue) error {
stmt, err := tx.Prepare(query)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
defer stmt.Close()
for _, measuredValue := range measuredValues {
if measuredValue.CreationDate.Equal(time.Time{}) {
measuredValue.CreationDate = time.Now()
}
_, err := stmt.Exec(
&measuredValue.ID,
&measuredValue.Value,
&measuredValue.Date,
&measuredValue.SensorID,
&measuredValue.CreationDate,
&measuredValue.UpdateDate,
)
if err != nil {
return fmt.Errorf("Failed to execute statement: %v", err)
}
}
return nil
}
for measuredValueType, measuredValues := range splittedMeasuredValues {
var queryFile string
switch measuredValueType {
case types.Humidity:
queryFile = insertOrUpdateHumiditySQL
queryFile = postgres.InsertOrUpdateHumiditySQL
case types.Pressure:
queryFile = insertOrUpdatePressureSQL
queryFile = postgres.InsertOrUpdatePressureSQL
case types.Temperature:
queryFile = insertOrUpdateTemperatureSQL
queryFile = postgres.InsertOrUpdateTemperatureSQL
default:
return fmt.Errorf("Measured value type %v not supported", measuredValueType)
}
err := insert(tx, queryFile, measuredValues)
err := d.insertOrUpdateMeasuredValues(tx, queryFile, measuredValues...)
if err != nil {
return err
}
@ -376,15 +258,45 @@ func (postgres *Postgres) AddOrUpdateMeasuredValues(ctx context.Context, measure
return tx.Commit()
}
func (d *Postgres) insertOrUpdateMeasuredValues(tx *sql.Tx, query string, measuredValues ...*types.MeasuredValue) error {
stmt, err := tx.Prepare(query)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
defer stmt.Close()
for _, measuredValue := range measuredValues {
if measuredValue.CreationDate.Equal(time.Time{}) {
measuredValue.CreationDate = time.Now()
}
_, err := stmt.Exec(
&measuredValue.ID,
&measuredValue.Value,
&measuredValue.Date,
&measuredValue.SensorID,
&measuredValue.CreationDate,
&measuredValue.UpdateDate,
)
if err != nil {
return fmt.Errorf("Failed to execute statement: %v", err)
}
}
return nil
}
// AddSensors into the database
func (postgres *Postgres) AddSensors(ctx context.Context, sensors ...*types.Sensor) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) AddSensors(ctx context.Context, sensors ...*types.Sensor) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(insertSensorSQL)
stmt, err := tx.Prepare(postgres.InsertSensorSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -420,14 +332,23 @@ func (postgres *Postgres) AddSensors(ctx context.Context, sensors ...*types.Sens
}
// AddOrUpdateSensors into the database
func (postgres *Postgres) AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) AddOrUpdateSensors(ctx context.Context, sensors ...*types.Sensor) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(insertOrUpdateSensorSQL)
err = d.insertOrUpdateSensors(tx, sensors...)
if err != nil {
return err
}
return tx.Commit()
}
func (d *Postgres) insertOrUpdateSensors(tx *sql.Tx, sensors ...*types.Sensor) error {
stmt, err := tx.Prepare(postgres.InsertOrUpdateSensorSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -459,7 +380,7 @@ func (postgres *Postgres) AddOrUpdateSensors(ctx context.Context, sensors ...*ty
}
}
return tx.Commit()
return nil
}
// Close closes the database and prevents new queries from starting. Close then
@ -469,13 +390,13 @@ func (postgres *Postgres) Close() error {
}
// Migrate creates all required tables if not exist
func (postgres *Postgres) Migrate(ctx context.Context) error {
sourceDriver, err := iofs.New(ddlAssets, "ddl")
func (d *Postgres) Migrate(ctx context.Context) error {
sourceDriver, err := iofs.New(postgres.DDLAssets, postgres.DDLAssetPath)
if err != nil {
return err
}
m, err := migrate.NewWithSourceInstance("iofs", sourceDriver, postgres.databaseURL.String())
m, err := migrate.NewWithSourceInstance("iofs", sourceDriver, d.databaseURL.String())
if err != nil {
return err
}
@ -490,14 +411,14 @@ func (postgres *Postgres) Migrate(ctx context.Context) error {
}
// GetDevice from database
func (postgres *Postgres) GetDeviceByID(ctx context.Context, id string) (*types.Device, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetDeviceByID(ctx context.Context, id string) (*types.Device, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
devices, err := postgres.selectDevices(tx, selectDeviceByIDSQL, id)
devices, err := d.selectDevices(tx, postgres.SelectDeviceByIDSQL, id)
if err != nil {
return nil, err
}
@ -515,14 +436,14 @@ func (postgres *Postgres) GetDeviceByID(ctx context.Context, id string) (*types.
}
// GetDevice from database
func (postgres *Postgres) GetDeviceByName(ctx context.Context, name string) (*types.Device, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetDeviceByName(ctx context.Context, name string) (*types.Device, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
devices, err := postgres.selectDevices(tx, selectDeviceByNameSQL, name)
devices, err := d.selectDevices(tx, postgres.SelectDeviceByNameSQL, name)
if err != nil {
return nil, err
}
@ -540,14 +461,14 @@ func (postgres *Postgres) GetDeviceByName(ctx context.Context, name string) (*ty
}
// GetDevices from the database
func (postgres *Postgres) GetDevices(ctx context.Context) ([]*types.Device, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetDevices(ctx context.Context) ([]*types.Device, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
devices, err := postgres.selectDevices(tx, selectDevicesSQL)
devices, err := d.selectDevices(tx, postgres.SelectDevicesSQL)
if err != nil {
return nil, err
}
@ -560,7 +481,7 @@ func (postgres *Postgres) GetDevices(ctx context.Context) ([]*types.Device, erro
return devices, nil
}
func (postgres *Postgres) selectDevices(tx *sql.Tx, query string, args ...interface{}) ([]*types.Device, error) {
func (d *Postgres) selectDevices(tx *sql.Tx, query string, args ...interface{}) ([]*types.Device, error) {
stmt, err := tx.Prepare(query)
if err != nil {
return nil, fmt.Errorf("Failed to prepare statement: %v", err)
@ -592,14 +513,14 @@ func (postgres *Postgres) selectDevices(tx *sql.Tx, query string, args ...interf
}
// GetHumidity returns humidity from the database
func (postgres *Postgres) GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}
defer tx.Rollback()
measuredValues, err := postgres.selectMeasuredValue(tx, selectHumidityByIDSQL, id)
measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectHumidityByIDSQL, id)
if err != nil {
return nil, err
}
@ -621,14 +542,14 @@ func (postgres *Postgres) GetHumidityByID(ctx context.Context, id string) (*type
}
// GetHumidities returns humidities from the database
func (postgres *Postgres) GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetHumidities(ctx context.Context) ([]*types.MeasuredValue, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}
defer tx.Rollback()
measuredValues, err := postgres.selectMeasuredValue(tx, selectHumiditiesSQL)
measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectHumiditiesSQL)
if err != nil {
return nil, err
}
@ -645,7 +566,7 @@ func (postgres *Postgres) GetHumidities(ctx context.Context) ([]*types.MeasuredV
return measuredValues, nil
}
func (postgres *Postgres) selectMeasuredValue(tx *sql.Tx, query string, args ...interface{}) ([]*types.MeasuredValue, error) {
func (d *Postgres) selectMeasuredValue(tx *sql.Tx, query string, args ...interface{}) ([]*types.MeasuredValue, error) {
stmt, err := tx.Prepare(query)
if err != nil {
return nil, err
@ -680,14 +601,14 @@ func (postgres *Postgres) selectMeasuredValue(tx *sql.Tx, query string, args ...
}
// GetPressure returns pressure from the database
func (postgres *Postgres) GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}
defer tx.Rollback()
measuredValues, err := postgres.selectMeasuredValue(tx, selectPressureByIDSQL, id)
measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectPressureByIDSQL, id)
if err != nil {
return nil, err
}
@ -709,14 +630,14 @@ func (postgres *Postgres) GetPressureByID(ctx context.Context, id string) (*type
}
// GetPressures returns pressure from the database
func (postgres *Postgres) GetPressures(ctx context.Context) ([]*types.MeasuredValue, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetPressures(ctx context.Context) ([]*types.MeasuredValue, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}
defer tx.Rollback()
measuredValues, err := postgres.selectMeasuredValue(tx, selectPressuresSQL)
measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectPressuresSQL)
if err != nil {
return nil, err
}
@ -734,14 +655,14 @@ func (postgres *Postgres) GetPressures(ctx context.Context) ([]*types.MeasuredVa
}
// GetSensor from database
func (postgres *Postgres) GetSensorByID(ctx context.Context, id string) (*types.Sensor, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetSensorByID(ctx context.Context, id string) (*types.Sensor, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
sensors, err := postgres.selectSensors(tx, selectSensorByIDSQL, id)
sensors, err := d.selectSensors(tx, postgres.SelectSensorByIDSQL, id)
if err != nil {
return nil, err
}
@ -759,14 +680,14 @@ func (postgres *Postgres) GetSensorByID(ctx context.Context, id string) (*types.
}
// GetSensors from the database
func (postgres *Postgres) GetSensors(ctx context.Context) ([]*types.Sensor, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetSensors(ctx context.Context) ([]*types.Sensor, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
sensors, err := postgres.selectSensors(tx, selectSensorsSQL)
sensors, err := d.selectSensors(tx, postgres.SelectSensorsSQL)
if err != nil {
return nil, err
}
@ -780,8 +701,8 @@ func (postgres *Postgres) GetSensors(ctx context.Context) ([]*types.Sensor, erro
}
// GetSensorsByModels from the database
func (postgres *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs ...string) ([]*types.Sensor, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
@ -789,7 +710,7 @@ func (postgres *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs .
cachedSensors := make([]*types.Sensor, 0)
for i := range deviceIDs {
sensors, err := postgres.selectSensors(tx, selectSensorsByDeviceIDSQL, deviceIDs[i])
sensors, err := d.selectSensors(tx, postgres.SelectSensorsByDeviceIDSQL, deviceIDs[i])
if err != nil {
return nil, err
}
@ -806,15 +727,15 @@ func (postgres *Postgres) GetSensorsByDeviceIDs(ctx context.Context, deviceIDs .
}
// GetSensorsByModel from the database
func (postgres *Postgres) GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetSensorsByModels(ctx context.Context, sensorModels ...string) ([]*types.Sensor, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
cachedSensors := make([]*types.Sensor, 0)
for i := range sensorModels {
sensors, err := postgres.selectSensors(tx, selectSensorsByModelSQL, sensorModels[i])
sensors, err := d.selectSensors(tx, postgres.SelectSensorsByModelSQL, sensorModels[i])
if err != nil {
return nil, err
}
@ -831,8 +752,8 @@ func (postgres *Postgres) GetSensorsByModels(ctx context.Context, sensorModels .
}
// GetSensorsByModel from the database
func (postgres *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ...string) ([]*types.Sensor, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
}
@ -840,7 +761,7 @@ func (postgres *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ...
cachedSensors := make([]*types.Sensor, 0)
for i := range sensorNames {
sensors, err := postgres.selectSensors(tx, selectSensorsByNameSQL, sensorNames[i])
sensors, err := d.selectSensors(tx, postgres.SelectSensorsByNameSQL, sensorNames[i])
if err != nil {
return nil, err
}
@ -856,7 +777,7 @@ func (postgres *Postgres) GetSensorsByNames(ctx context.Context, sensorNames ...
return cachedSensors, nil
}
func (postgres *Postgres) selectSensors(tx *sql.Tx, query string, args ...interface{}) ([]*types.Sensor, error) {
func (d *Postgres) selectSensors(tx *sql.Tx, query string, args ...interface{}) ([]*types.Sensor, error) {
stmt, err := tx.Prepare(query)
if err != nil {
return nil, fmt.Errorf("Failed to prepare statement: %v", err)
@ -897,14 +818,14 @@ func (postgres *Postgres) selectSensors(tx *sql.Tx, query string, args ...interf
}
// GetTemperature returns temperatures from the database
func (postgres *Postgres) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}
defer tx.Rollback()
measuredValues, err := postgres.selectMeasuredValue(tx, selectTemperatureByIDSQL, id)
measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectTemperatureByIDSQL, id)
if err != nil {
return nil, err
}
@ -926,14 +847,14 @@ func (postgres *Postgres) GetTemperatureByID(ctx context.Context, id string) (*t
}
// GetTemperatures returns temperatures from the database
func (postgres *Postgres) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
func (d *Postgres) GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
if err != nil {
return nil, err
}
defer tx.Rollback()
measuredValues, err := postgres.selectMeasuredValue(tx, selectTemperaturesSQL)
measuredValues, err := d.selectMeasuredValue(tx, postgres.SelectTemperaturesSQL)
if err != nil {
return nil, err
}
@ -950,15 +871,62 @@ func (postgres *Postgres) GetTemperatures(ctx context.Context) ([]*types.Measure
return measuredValues, nil
}
// Import imports devices, sensors and all measured values from a source
// repository. Existing entries will be updated.
func (d *Postgres) Import(ctx context.Context, src Repository) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return err
}
defer tx.Rollback()
devices, err := src.GetDevices(ctx)
if err != nil {
return err
}
err = d.insertOrUpdateDevices(tx, devices...)
if err != nil {
return err
}
sensors, err := src.GetSensors(ctx)
err = d.insertOrUpdateSensors(tx, sensors...)
if err != nil {
return err
}
for _, value := range []struct {
f func(ctx context.Context) ([]*types.MeasuredValue, error)
query string
}{
{f: src.GetHumidities, query: postgres.InsertOrUpdateHumiditySQL},
{f: src.GetPressures, query: postgres.InsertOrUpdatePressureSQL},
{f: src.GetTemperatures, query: postgres.InsertOrUpdateTemperatureSQL},
} {
measuredValues, err := value.f(ctx)
if err != nil {
return err
}
err = d.insertOrUpdateMeasuredValues(tx, value.query, measuredValues...)
if err != nil {
return err
}
}
return tx.Commit()
}
// RemoveDevices from the database
func (postgres *Postgres) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(deleteDeviceByIDSQL)
stmt, err := tx.Prepare(postgres.DeleteDeviceByIDSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -975,14 +943,14 @@ func (postgres *Postgres) RemoveDevicesByIDs(ctx context.Context, deviceIDs ...s
}
// RemoveDevices from the database
func (postgres *Postgres) RemoveDevicesByNames(ctx context.Context, names ...string) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) RemoveDevicesByNames(ctx context.Context, names ...string) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(deleteDeviceByNameSQL)
stmt, err := tx.Prepare(postgres.DeleteDeviceByNameSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -999,14 +967,14 @@ func (postgres *Postgres) RemoveDevicesByNames(ctx context.Context, names ...str
}
// RemoveSensors from the database
func (postgres *Postgres) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...string) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(deleteSensorByIDSQL)
stmt, err := tx.Prepare(postgres.DeleteSensorByIDSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -1023,14 +991,14 @@ func (postgres *Postgres) RemoveSensorsByIDs(ctx context.Context, sensorIDs ...s
}
// RemoveSensors from the database
func (postgres *Postgres) RemoveSensorsByNames(ctx context.Context, sensorIDs ...string) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) RemoveSensorsByNames(ctx context.Context, sensorIDs ...string) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return fmt.Errorf("Failed to begin new transaction: %v", err)
}
defer tx.Rollback()
stmt, err := tx.Prepare(deleteSensorByNameSQL)
stmt, err := tx.Prepare(postgres.DeleteSensorByNameSQL)
if err != nil {
return fmt.Errorf("Failed to prepare statement: %v", err)
}
@ -1047,14 +1015,14 @@ func (postgres *Postgres) RemoveSensorsByNames(ctx context.Context, sensorIDs ..
}
// UpdateDevices updates a device in the database
func (postgres *Postgres) UpdateDevices(ctx context.Context, devices ...*types.Device) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) UpdateDevices(ctx context.Context, devices ...*types.Device) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare(updateDeviceSQL)
stmt, err := tx.Prepare(postgres.UpdateDeviceSQL)
if err != nil {
return err
}
@ -1080,14 +1048,14 @@ func (postgres *Postgres) UpdateDevices(ctx context.Context, devices ...*types.D
}
// UpdateSensors updates a sensor in the database
func (postgres *Postgres) UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error {
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
func (d *Postgres) UpdateSensors(ctx context.Context, sensors ...*types.Sensor) error {
tx, err := d.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare(updateSensorSQL)
stmt, err := tx.Prepare(postgres.UpdateSensorSQL)
if err != nil {
return err
}

View File

@ -0,0 +1,100 @@
package postgres
import "embed"
var (
DDLAssetPath = "ddl"
//go:embed ddl/*.sql
DDLAssets embed.FS
//go:embed dml/deleteDeviceByID.sql
DeleteDeviceByIDSQL string
//go:embed dml/deleteDeviceByName.sql
DeleteDeviceByNameSQL string
//go:embed dml/deleteSensorByID.sql
DeleteSensorByIDSQL string
//go:embed dml/deleteSensorByName.sql
DeleteSensorByNameSQL string
//go:embed dml/insertDevice.sql
InsertDeviceSQL string
//go:embed dml/insertHumidity.sql
InsertHumiditySQL string
//go:embed dml/insertOrUpdateDevice.sql
InsertOrUpdateDeviceSQL string
//go:embed dml/insertOrUpdateHumidity.sql
InsertOrUpdateHumiditySQL string
//go:embed dml/insertOrUpdatePressure.sql
InsertOrUpdatePressureSQL string
//go:embed dml/insertOrUpdateSensor.sql
InsertOrUpdateSensorSQL string
//go:embed dml/insertOrUpdateTemperature.sql
InsertOrUpdateTemperatureSQL string
//go:embed dml/insertPressure.sql
InsertPressureSQL string
//go:embed dml/insertSensor.sql
InsertSensorSQL string
//go:embed dml/insertTemperature.sql
InsertTemperatureSQL string
//go:embed dml/selectDeviceByID.sql
SelectDeviceByIDSQL string
//go:embed dml/selectDeviceByName.sql
SelectDeviceByNameSQL string
//go:embed dml/selectDevices.sql
SelectDevicesSQL string
//go:embed dml/selectHumidities.sql
SelectHumiditiesSQL string
//go:embed dml/selectHumidityByID.sql
SelectHumidityByIDSQL string
//go:embed dml/selectPressureByID.sql
SelectPressureByIDSQL string
//go:embed dml/selectPressures.sql
SelectPressuresSQL string
//go:embed dml/selectSensorByID.sql
SelectSensorByIDSQL string
//go:embed dml/selectSensors.sql
SelectSensorsSQL string
//go:embed dml/selectSensorsByDeviceID.sql
SelectSensorsByDeviceIDSQL string
//go:embed dml/selectSensorsByModel.sql
SelectSensorsByModelSQL string
//go:embed dml/selectSensorsByName.sql
SelectSensorsByNameSQL string
//go:embed dml/selectTemperatureByID.sql
SelectTemperatureByIDSQL string
//go:embed dml/selectTemperatures.sql
SelectTemperaturesSQL string
//go:embed dml/updateDevice.sql
UpdateDeviceSQL string
//go:embed dml/updateSensor.sql
UpdateSensorSQL string
)

View File

@ -5,8 +5,6 @@ import (
"fmt"
"net/url"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository/postgres"
"git.cryptic.systems/volker.raschek/flucky/pkg/repository/sqlite3"
"git.cryptic.systems/volker.raschek/flucky/pkg/types"
"git.cryptic.systems/volker.raschek/go-logger"
)
@ -33,6 +31,7 @@ type Repository interface {
GetSensors(ctx context.Context) ([]*types.Sensor, error)
GetTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error)
GetTemperatures(ctx context.Context) ([]*types.MeasuredValue, error)
Import(ctx context.Context, src Repository) error
Migrate(ctx context.Context) error
RemoveDevicesByIDs(ctx context.Context, deviceIDs ...string) error
RemoveDevicesByNames(ctx context.Context, names ...string) error
@ -46,7 +45,7 @@ type Repository interface {
func New(databaseURL *url.URL, flogger logger.Logger) (Repository, error) {
switch databaseURL.Scheme {
case "postgres":
repo, err := postgres.New(postgres.Opts{
repo, err := NewPostgres(PostgresOpts{
DatabaseURL: databaseURL,
Logger: flogger,
})
@ -63,7 +62,7 @@ func New(databaseURL *url.URL, flogger logger.Logger) (Repository, error) {
return repo, nil
case "sqlite3":
repo, err := sqlite3.New(sqlite3.Opts{
repo, err := NewSQLite(SQLiteOpts{
DatabaseURL: databaseURL,
Logger: flogger,
})

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,100 @@
package sqlite3
import "embed"
var (
DDLAssetPath = "ddl"
//go:embed ddl/*.sql
DDLAssets embed.FS
//go:embed dml/deleteDeviceByID.sql
DeleteDeviceByIDSQL string
//go:embed dml/deleteDeviceByName.sql
DeleteDeviceByNameSQL string
//go:embed dml/deleteSensorByID.sql
DeleteSensorByIDSQL string
//go:embed dml/deleteSensorByName.sql
DeleteSensorByNameSQL string
//go:embed dml/insertDevice.sql
InsertDeviceSQL string
//go:embed dml/insertHumidity.sql
InsertHumiditySQL string
//go:embed dml/insertOrUpdateDevice.sql
InsertOrUpdateDeviceSQL string
//go:embed dml/insertOrUpdateHumidity.sql
InsertOrUpdateHumiditySQL string
//go:embed dml/insertOrUpdatePressure.sql
InsertOrUpdatePressureSQL string
//go:embed dml/insertOrUpdateSensor.sql
InsertOrUpdateSensorSQL string
//go:embed dml/insertOrUpdateTemperature.sql
InsertOrUpdateTemperatureSQL string
//go:embed dml/insertPressure.sql
InsertPressureSQL string
//go:embed dml/insertSensor.sql
InsertSensorSQL string
//go:embed dml/insertTemperature.sql
InsertTemperatureSQL string
//go:embed dml/selectDeviceByID.sql
SelectDeviceByIDSQL string
//go:embed dml/selectDeviceByName.sql
SelectDeviceByNameSQL string
//go:embed dml/selectDevices.sql
SelectDevicesSQL string
//go:embed dml/selectHumidities.sql
SelectHumiditiesSQL string
//go:embed dml/selectHumidityByID.sql
SelectHumidityByIDSQL string
//go:embed dml/selectPressureByID.sql
SelectPressureByIDSQL string
//go:embed dml/selectPressures.sql
SelectPressuresSQL string
//go:embed dml/selectSensorByID.sql
SelectSensorByIDSQL string
//go:embed dml/selectSensors.sql
SelectSensorsSQL string
//go:embed dml/selectSensorsByDeviceID.sql
SelectSensorsByDeviceIDSQL string
//go:embed dml/selectSensorsByModel.sql
SelectSensorsByModelSQL string
//go:embed dml/selectSensorsByName.sql
SelectSensorsByNameSQL string
//go:embed dml/selectTemperatureByID.sql
SelectTemperatureByIDSQL string
//go:embed dml/selectTemperatures.sql
SelectTemperaturesSQL string
//go:embed dml/updateDevice.sql
UpdateDeviceSQL string
//go:embed dml/updateSensor.sql
UpdateSensorSQL string
)