package storage import ( "context" "database/sql" "fmt" "net/url" "path/filepath" _ "github.com/lib/pq" "github.com/volker-raschek/flucky/pkg/types" "github.com/volker-raschek/go-logger/pkg/logger" ) // Storage is a general interface for a storage endpoint type Storage interface { InsertDevice(ctx context.Context, device *types.Device) error InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error InsertSensor(ctx context.Context, sensor *types.Sensor) error RemoveSensorByID(ctx context.Context, sensorID string) error RemoveSensorByName(ctx context.Context, sensorName string) error SelectDevice(ctx context.Context, id string) (*types.Device, error) SelectSensor(ctx context.Context, id string) (*types.Sensor, error) } var ( postgresAssetPath = "pkg/storage/postgres" ) // Postgres implementation type Postgres struct { dbo *sql.DB flogger logger.Logger } // InsertDevice into the database func (postgres *Postgres) InsertDevice(ctx context.Context, device *types.Device) error { asset := filepath.Join(postgresAssetPath, "insertDevice.sql") queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } stmt, err := tx.Prepare(query) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() _, err = stmt.Exec(&device.ID, &device.Name, &device.Location, &device.CreationDate) if err != nil { tx.Rollback() return fmt.Errorf("Failed to execute statement: %v", err) } return tx.Commit() } // InsertMeasuredValues into the database func (postgres *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error { splittedMeasuredValues := make(map[string][]*types.MeasuredValue, 0) for _, measuredValue := range measuredValues { if _, ok := splittedMeasuredValues[measuredValue.ValueType]; !ok { splittedMeasuredValues[measuredValue.ValueType] = make([]*types.MeasuredValue, 0) } splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue) } tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } // General insert function insert := func(tx *sql.Tx, asset string, measuredValues []*types.MeasuredValue) error { queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) stmt, err := tx.Prepare(query) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() for _, measuredValue := range measuredValues { _, err := stmt.Exec(&measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate) if err != nil { return fmt.Errorf("Failed to execute statement: %v", err) } } return nil } for measuredValueType, measuredValues := range splittedMeasuredValues { var asset string switch measuredValueType { case "humidity": asset = filepath.Join(postgresAssetPath, "insertHumidity.sql") case "pressure": asset = filepath.Join(postgresAssetPath, "insertPressure.sql") case "temperature": asset = filepath.Join(postgresAssetPath, "insertTemperature.sql") default: tx.Rollback() return fmt.Errorf("Measured value type %v not supported", measuredValueType) } err := insert(tx, asset, measuredValues) if err != nil { tx.Rollback() return err } } return tx.Commit() } // InsertSensor into the database func (postgres *Postgres) InsertSensor(ctx context.Context, sensor *types.Sensor) error { asset := filepath.Join(postgresAssetPath, "insertSensor.sql") queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } stmt, err := tx.Prepare(query) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() _, err = stmt.Exec(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate) if err != nil { tx.Rollback() return fmt.Errorf("Failed to execute statement: %v", err) } return tx.Commit() } // RemoveSensorByID from the database func (postgres *Postgres) RemoveSensorByID(ctx context.Context, sensorID string) error { asset := filepath.Join(postgresAssetPath, "removeSensorByID.sql") queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } stmt, err := tx.Prepare(query) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() _, err = stmt.Exec(sensorID) if err != nil { tx.Rollback() return err } return tx.Commit() } // RemoveSensorByName from the database func (postgres *Postgres) RemoveSensorByName(ctx context.Context, sensorID string) error { asset := filepath.Join(postgresAssetPath, "removeSensorByName.sql") queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false}) if err != nil { return fmt.Errorf("Failed to begin new transaction: %v", err) } stmt, err := tx.Prepare(query) if err != nil { return fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() _, err = stmt.Exec(sensorID) if err != nil { tx.Rollback() return err } return tx.Commit() } // SelectDevice from database func (postgres *Postgres) SelectDevice(ctx context.Context, id string) (*types.Device, error) { asset := filepath.Join(postgresAssetPath, "selectDeviceByID.sql") queryBytes, err := Asset(asset) if err != nil { return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } stmt, err := tx.Prepare(query) if err != nil { return nil, fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() row := stmt.QueryRow(id) device := new(types.Device) err = row.Scan(&device.ID, &device.Name, &device.Location, &device.CreationDate) if err != nil { return nil, fmt.Errorf("Failed to scan row: %v", err) } return device, nil } // SelectSensor from database func (postgres *Postgres) SelectSensor(ctx context.Context, id string) (*types.Sensor, error) { asset := filepath.Join(postgresAssetPath, "selectSensorByID.sql") queryBytes, err := Asset(asset) if err != nil { return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err) } query := string(queryBytes) tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}) if err != nil { return nil, fmt.Errorf("Failed to begin new transaction: %v", err) } stmt, err := tx.Prepare(query) if err != nil { return nil, fmt.Errorf("Failed to prepare statement: %v", err) } defer stmt.Close() row := stmt.QueryRow(id) sensor := new(types.Sensor) err = row.Scan(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate) if err != nil { return nil, fmt.Errorf("Failed to scan row: %v", err) } return sensor, nil } // New returns a new storage provider func New(storageEndpoint string, flogger logger.Logger) (Storage, error) { storageEndpointURL, err := url.Parse(storageEndpoint) if err != nil { return nil, err } switch storageEndpointURL.Scheme { case "postgres": newDBO, err := sql.Open(storageEndpointURL.Scheme, storageEndpointURL.String()) if err != nil { return nil, err } return &Postgres{ dbo: newDBO, flogger: flogger, }, nil default: return nil, fmt.Errorf("Unsupported database scheme: %v", storageEndpointURL.Scheme) } }