package db

import (
	"context"
	"database/sql"
	"fmt"
	"path/filepath"

	_ "github.com/lib/pq" // imported for its side effects only
	"github.com/volker-raschek/flucky/pkg/types"
	"github.com/volker-raschek/go-logger/pkg/logger"
)

var (
	// postgresAssetPath is the directory prefix under which the embedded SQL
	// assets used by this file are looked up.
	postgresAssetPath = "pkg/storage/postgres"
)

// Postgres implementation
//
// Every method loads its SQL text from an embedded asset, runs it inside a
// dedicated transaction and either commits or rolls back before returning.
type Postgres struct {
	dbo     *sql.DB
	flogger logger.Logger
}

// DeleteDevices executes the deleteDevice.sql statement once for each of the
// given device IDs. All executions share one read-write transaction: the
// first failing execution aborts the transaction and its error is returned
// unwrapped.
func (postgres *Postgres) DeleteDevices(ctx context.Context, deviceIDs ...string) error {
	asset := filepath.Join(postgresAssetPath, "deleteDevice.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
	if err != nil {
		return fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	for _, deviceID := range deviceIDs {
		if _, err := stmt.ExecContext(ctx, deviceID); err != nil {
			return err
		}
	}

	return tx.Commit()
}

// DeleteSensors executes the deleteSensor.sql statement once for each of the
// given sensor IDs. All executions share one read-write transaction: the
// first failing execution aborts the transaction and its error is returned
// unwrapped.
func (postgres *Postgres) DeleteSensors(ctx context.Context, sensorIDs ...string) error {
	asset := filepath.Join(postgresAssetPath, "deleteSensor.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
	if err != nil {
		return fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	for _, sensorID := range sensorIDs {
		if _, err := stmt.ExecContext(ctx, sensorID); err != nil {
			return err
		}
	}

	return tx.Commit()
}

// InsertDevices executes the insertDevice.sql statement once per device,
// binding ID, Name, Location and CreationDate in that order. All executions
// share one read-write transaction which is rolled back on the first error.
func (postgres *Postgres) InsertDevices(ctx context.Context, devices ...*types.Device) error {
	asset := filepath.Join(postgresAssetPath, "insertDevice.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
	if err != nil {
		return fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	for _, device := range devices {
		_, err = stmt.ExecContext(ctx, &device.ID, &device.Name, &device.Location, &device.CreationDate)
		if err != nil {
			return fmt.Errorf("Failed to execute statement: %v", err)
		}
	}

	return tx.Commit()
}

// InsertMeasuredValues groups the given measured values by their ValueType
// and inserts each group with the matching statement (insertHumidity.sql,
// insertPressure.sql or insertTemperature.sql). All groups are written in a
// single read-write transaction; an unsupported value type or any failing
// statement rolls the whole transaction back and returns an error.
func (postgres *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues ...*types.MeasuredValue) error {
	// append on a missing key starts from a nil slice, so no explicit
	// per-key initialisation is required.
	splittedMeasuredValues := make(map[string][]*types.MeasuredValue)
	for _, measuredValue := range measuredValues {
		splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue)
	}

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
	if err != nil {
		return fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	// General insert function: prepares the statement stored in asset on tx
	// and executes it once per measured value.
	insert := func(tx *sql.Tx, asset string, measuredValues []*types.MeasuredValue) error {
		queryBytes, err := Asset(asset)
		if err != nil {
			return fmt.Errorf("Failed to load asset %v: %v", asset, err)
		}
		query := string(queryBytes)

		stmt, err := tx.PrepareContext(ctx, query)
		if err != nil {
			return fmt.Errorf("Failed to prepare statement: %v", err)
		}
		defer stmt.Close()

		for _, measuredValue := range measuredValues {
			_, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.SensorID, &measuredValue.CreationDate)
			if err != nil {
				return fmt.Errorf("Failed to execute statement: %v", err)
			}
		}

		return nil
	}

	for measuredValueType, measuredValues := range splittedMeasuredValues {
		var asset string

		switch measuredValueType {
		case "humidity":
			asset = filepath.Join(postgresAssetPath, "insertHumidity.sql")
		case "pressure":
			asset = filepath.Join(postgresAssetPath, "insertPressure.sql")
		case "temperature":
			asset = filepath.Join(postgresAssetPath, "insertTemperature.sql")
		default:
			return fmt.Errorf("Measured value type %v not supported", measuredValueType)
		}

		if err := insert(tx, asset, measuredValues); err != nil {
			return err
		}
	}

	return tx.Commit()
}

// InsertSensors executes the insertSensor.sql statement once per sensor,
// binding ID, Name, Location, WireID, I2CBus, I2CAddress, GPIONumber, Model,
// Enabled, DeviceID and CreationDate in that order. All executions share one
// read-write transaction which is rolled back on the first error.
func (postgres *Postgres) InsertSensors(ctx context.Context, sensors ...*types.Sensor) error {
	asset := filepath.Join(postgresAssetPath, "insertSensor.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
	if err != nil {
		return fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	for _, sensor := range sensors {
		_, err = stmt.ExecContext(ctx, &sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
		if err != nil {
			return fmt.Errorf("Failed to execute statement: %v", err)
		}
	}

	return tx.Commit()
}

// SelectDevice runs the selectDevice.sql statement with id as its only
// parameter inside a read-only transaction and scans the resulting row into
// a new types.Device. A scan failure (including the absence of a row) is
// returned as an error together with a nil device.
func (postgres *Postgres) SelectDevice(ctx context.Context, id string) (*types.Device, error) {
	asset := filepath.Join(postgresAssetPath, "selectDevice.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	row := stmt.QueryRowContext(ctx, id)

	device := new(types.Device)
	err = row.Scan(&device.ID, &device.Name, &device.Location, &device.CreationDate)
	if err != nil {
		return nil, fmt.Errorf("Failed to scan row: %v", err)
	}

	err = tx.Commit()
	if err != nil {
		return nil, fmt.Errorf("Failed to commit transaction: %v", err)
	}

	return device, nil
}

// SelectDevices runs the selectDevices.sql statement inside a read-only
// transaction and scans every returned row into a types.Device. The result
// is a non-nil (possibly empty) slice on success and nil on error.
func (postgres *Postgres) SelectDevices(ctx context.Context) ([]*types.Device, error) {
	asset := filepath.Join(postgresAssetPath, "selectDevices.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	rows, err := stmt.QueryContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("Failed to query statement: %v", err)
	}
	// Ensures the result set is released when a scan fails mid-iteration.
	defer rows.Close()

	devices := make([]*types.Device, 0)
	for rows.Next() {
		device := new(types.Device)
		err = rows.Scan(&device.ID, &device.Name, &device.Location, &device.CreationDate)
		if err != nil {
			return nil, fmt.Errorf("Failed to scan row: %v", err)
		}
		devices = append(devices, device)
	}
	// rows.Next also returns false when iteration failed; without this check
	// a truncated result would be reported as success.
	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("Failed to iterate rows: %v", err)
	}

	err = tx.Commit()
	if err != nil {
		return nil, fmt.Errorf("Failed to commit transaction: %v", err)
	}

	return devices, nil
}

// SelectSensor runs the selectSensor.sql statement with id as its only
// parameter inside a read-only transaction and scans the resulting row into
// a new types.Sensor. A scan failure (including the absence of a row) is
// returned as an error together with a nil sensor.
func (postgres *Postgres) SelectSensor(ctx context.Context, id string) (*types.Sensor, error) {
	asset := filepath.Join(postgresAssetPath, "selectSensor.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	row := stmt.QueryRowContext(ctx, id)

	sensor := new(types.Sensor)
	err = row.Scan(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
	if err != nil {
		return nil, fmt.Errorf("Failed to scan row: %v", err)
	}

	err = tx.Commit()
	if err != nil {
		return nil, fmt.Errorf("Failed to commit transaction: %v", err)
	}

	return sensor, nil
}

// SelectSensors runs the selectSensors.sql statement inside a read-only
// transaction and scans every returned row into a types.Sensor. The result
// is a non-nil (possibly empty) slice on success and nil on error.
func (postgres *Postgres) SelectSensors(ctx context.Context) ([]*types.Sensor, error) {
	asset := filepath.Join(postgresAssetPath, "selectSensors.sql")
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err)
	}
	query := string(queryBytes)

	tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
	if err != nil {
		return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
	}
	// Rollback has no effect once the transaction has been committed, so
	// deferring it releases the transaction on every early-return path.
	defer tx.Rollback()

	stmt, err := tx.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("Failed to prepare statement: %v", err)
	}
	defer stmt.Close()

	rows, err := stmt.QueryContext(ctx)
	if err != nil {
		return nil, fmt.Errorf("Failed to query statement: %v", err)
	}
	// Ensures the result set is released when a scan fails mid-iteration.
	defer rows.Close()

	sensors := make([]*types.Sensor, 0)
	for rows.Next() {
		sensor := new(types.Sensor)
		err = rows.Scan(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
		if err != nil {
			return nil, fmt.Errorf("Failed to scan row: %v", err)
		}
		sensors = append(sensors, sensor)
	}
	// rows.Next also returns false when iteration failed; without this check
	// a truncated result would be reported as success.
	if err = rows.Err(); err != nil {
		return nil, fmt.Errorf("Failed to iterate rows: %v", err)
	}

	err = tx.Commit()
	if err != nil {
		return nil, fmt.Errorf("Failed to commit transaction: %v", err)
	}

	return sensors, nil
}

// UpdateDevices updates a device in the database
//
// NOTE(review): not implemented yet. The method currently ignores its
// arguments, performs no database access and always returns nil.
func (postgres *Postgres) UpdateDevices(ctx context.Context, devices ...*types.Device) error {
	return nil
}

// UpdateSensors updates a sensor in the database
//
// NOTE(review): not implemented yet. The method currently ignores its
// arguments, performs no database access and always returns nil.
func (postgres *Postgres) UpdateSensors(ctx context.Context, sensor ...*types.Sensor) error {
	return nil
}