PKGBUILD/pkg/storage/db/postgres.go

714 lines
22 KiB
Go

package db
import (
"context"
"database/sql"
"fmt"
"path/filepath"
"sort"
"strings"
"github.com/Masterminds/semver"
"github.com/volker-raschek/flucky/pkg/types"
// PostgreSQL lib
_ "github.com/lib/pq"
)
// postgresAssetPath is the path prefix of all PostgreSQL SQL assets. It is
// combined with a file or directory name and passed to Asset / AssetDir.
var (
postgresAssetPath = "pkg/storage/db/sql/psql"
)
// Postgres provides functions to interact with a PostgreSQL database.
type Postgres struct {
// dbo is the database handle on which all statements are prepared and
// executed.
dbo *sql.DB
}
// Close closes the underlying database handle and returns the error reported
// by it, if any.
func (p *Postgres) Close() error {
return p.dbo.Close()
}
// Schema creates or updates the database schema. Normally the version is the
// same as the flucky binary version.
//
// The schema files are listed from the "schema" asset directory. Every file
// name, without its ".sql" suffix, must be a valid semantic version. The files
// are executed in ascending version order. If the info table already holds a
// "version" entry, every file whose version is less than or equal to that
// entry is skipped. After each executed file its version is written to the
// info table.
//
// If the stored version can not be selected, the schema is assumed to be
// uninitialized and all schema files are applied.
//
// NOTE(review): version is handed to the inner function as toVersion, but it
// is not used to bound the applied schema files. Confirm whether files newer
// than version are supposed to be skipped.
func (p *Postgres) Schema(ctx context.Context, version *semver.Version) error {
	schemaFunc := func(ctx context.Context, fromVersion *semver.Version, toVersion *semver.Version) error {
		assetPath := fmt.Sprintf("%v/schema", postgresAssetPath)
		sqlAssetFiles, err := AssetDir(assetPath)
		if err != nil {
			return fmt.Errorf("Can not restore asset directory %v: %v", assetPath, err)
		}

		// Remember which file belongs to which version and collect the
		// versions so that they can be sorted afterwards.
		postgreSQLVersionChanges := make(map[*semver.Version]string, len(sqlAssetFiles))
		postgreSQLVersions := make([]*semver.Version, len(sqlAssetFiles))
		for i, sqlAssetFile := range sqlAssetFiles {
			// Only the trailing file extension is removed; the remainder of
			// the file name must parse as a semantic version.
			fileSemVersion, err := semver.NewVersion(strings.TrimSuffix(sqlAssetFile, ".sql"))
			if err != nil {
				return fmt.Errorf("Can not create semantic version from file asset %v: %v", sqlAssetFile, err)
			}
			postgreSQLVersionChanges[fileSemVersion] = sqlAssetFile
			postgreSQLVersions[i] = fileSemVersion
		}

		sort.Sort(semver.Collection(postgreSQLVersions))

		for i, postgreSQLVersion := range postgreSQLVersions {
			if fromVersion != nil {
				if postgreSQLVersion.LessThan(fromVersion) || postgreSQLVersion.Equal(fromVersion) {
					flogger.Debug("SKIP: PostgreSQL schema version '%v' is less or eqal then the local version changes '%v'", postgreSQLVersion.String(), fromVersion.String())
					continue
				}
			}

			asset := postgreSQLVersionChanges[postgreSQLVersion]
			queryBytes, err := Asset(filepath.Join(assetPath, asset))
			if err != nil {
				return fmt.Errorf("Can not restore asset %v, %v", asset, err)
			}

			query := string(queryBytes)
			if _, err := p.dbo.ExecContext(ctx, query); err != nil {
				return fmt.Errorf("%v: %v", errorStatementExecute, err)
			}

			// The version entry is inserted together with the first schema
			// file (index 0) and updated for every later one.
			if i == 0 {
				if err := p.InsertInfo(ctx, "version", postgreSQLVersion.String()); err != nil {
					return fmt.Errorf("Can not insert version %v into info table: %v", postgreSQLVersion.String(), err)
				}
			} else {
				if err := p.UpdateInfo(ctx, "version", postgreSQLVersion.String()); err != nil {
					return fmt.Errorf("Can not update version %v into info table: %v", postgreSQLVersion.String(), err)
				}
			}
		}

		return nil
	}

	dbVersion, err := p.SelectInfo(ctx, "version")
	if err != nil {
		// The version can not be selected from the database, maybe the schema
		// is not initialized yet: apply all schema files.
		return schemaFunc(ctx, nil, version)
	}

	fromVersion, err := semver.NewVersion(dbVersion)
	if err != nil {
		return fmt.Errorf("Can not create semantic version from database entry %v: %v", dbVersion, err)
	}

	return schemaFunc(ctx, fromVersion, version)
}
// DeleteDevices deletes all specified devices by executing the
// deleteDevice.sql statement once per device with the device ID as argument.
// The deletion is meant to remove the sensors and measured values of each
// device as well; whether it does depends on the SQL asset and the schema,
// which are not part of this file.
//
// The first failing execution aborts the loop and its error is returned.
func (p *Postgres) DeleteDevices(ctx context.Context, devices []*types.Device) error {
	rawQuery, err := Asset(postgresAssetPath + "/deleteDevice.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	deleteStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer deleteStmt.Close()

	for i := range devices {
		if _, err := deleteStmt.ExecContext(ctx, &devices[i].ID); err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}
// DeleteSensors deletes all specified sensors by executing the
// deleteSensor.sql statement once per sensor with the sensor ID as argument.
// The deletion is meant to remove the measured values of each sensor as well;
// whether it does depends on the SQL asset and the schema, which are not part
// of this file.
//
// The first failing execution aborts the loop and its error is returned.
func (p *Postgres) DeleteSensors(ctx context.Context, sensors []*types.Sensor) error {
	rawQuery, err := Asset(postgresAssetPath + "/deleteSensor.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	deleteStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer deleteStmt.Close()

	for i := range sensors {
		if _, err := deleteStmt.ExecContext(ctx, &sensors[i].ID); err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}
// DeleteInfo deletes a key together with its value by executing the
// deleteInfo.sql statement with key as the only argument.
func (p *Postgres) DeleteInfo(ctx context.Context, key string) error {
	rawQuery, err := Asset(postgresAssetPath + "/deleteInfo.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	deleteStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer deleteStmt.Close()

	if _, err := deleteStmt.ExecContext(ctx, &key); err != nil {
		return fmt.Errorf("%v: %v", errorStatementExecute, err)
	}

	return nil
}
// DeleteMeasuredValues deletes all specified measured values. The values are
// grouped by their value type, and for every group the matching delete
// statement (deleteHumidity.sql, deletePressure.sql or deleteTemperature.sql)
// is prepared once and executed for each value ID of that group.
//
// Values of any other type are skipped without an error. The groups are
// processed in map iteration order, so the order between types is not fixed.
func (p *Postgres) DeleteMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	// runDelete prepares query and executes it once per measured value ID.
	// The statement is closed as soon as the group has been processed.
	runDelete := func(query string, values []*types.MeasuredValue) error {
		deleteStmt, err := p.dbo.PrepareContext(ctx, query)
		if err != nil {
			return fmt.Errorf("%v: %v", errorPrepareStatement, err)
		}
		defer deleteStmt.Close()

		for i := range values {
			if _, err := deleteStmt.ExecContext(ctx, &values[i].ID); err != nil {
				return fmt.Errorf("%v: %v", errorStatementExecute, err)
			}
		}
		return nil
	}

	// Group the measured values by their type.
	byType := make(map[types.MeasuredValueType][]*types.MeasuredValue)
	for _, mv := range measuredValues {
		byType[mv.ValueType] = append(byType[mv.ValueType], mv)
	}

	for valueType, values := range byType {
		var queryFile string
		switch valueType {
		case types.MeasuredValueTypeHumidity:
			queryFile = postgresAssetPath + "/deleteHumidity.sql"
		case types.MeasuredValueTypePressure:
			queryFile = postgresAssetPath + "/deletePressure.sql"
		case types.MeasuredValueTypeTemperature:
			queryFile = postgresAssetPath + "/deleteTemperature.sql"
		default:
			// Other value types have no delete statement and are ignored.
			continue
		}

		rawQuery, err := Asset(queryFile)
		if err != nil {
			return fmt.Errorf("%v: %v", errorGetAsset, err)
		}

		if err := runDelete(string(rawQuery), values); err != nil {
			return err
		}
	}

	return nil
}
// InsertDevices inserts all specified devices into the database. The
// insertDevice.sql statement is prepared once and executed per device with
// the ID, name, location and creation date as arguments, in that order.
//
// The first failing execution aborts the loop and its error is returned.
func (p *Postgres) InsertDevices(ctx context.Context, devices []*types.Device) error {
	rawQuery, err := Asset(postgresAssetPath + "/insertDevice.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	insertStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer insertStmt.Close()

	for _, d := range devices {
		if _, err := insertStmt.ExecContext(ctx, &d.ID, &d.Name, &d.Location, &d.CreationDate); err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}
// InsertInfo stores additional information in the database as a key/value
// pair by executing the insertInfo.sql statement with key and value as
// arguments, in that order.
func (p *Postgres) InsertInfo(ctx context.Context, key string, value string) error {
	rawQuery, err := Asset(postgresAssetPath + "/insertInfo.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	insertStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer insertStmt.Close()

	if _, err := insertStmt.ExecContext(ctx, key, value); err != nil {
		return fmt.Errorf("%v: %v", errorStatementExecute, err)
	}

	return nil
}
// InsertMeasuredValues inserts all specified measured values into the
// database. The values are grouped by their value type and every group is
// handed to the insert helper of that type (insertHumidity, insertPressure or
// insertTemperature).
//
// Values of any other type are skipped without an error. The groups are
// processed in map iteration order, so the order between types is not fixed.
func (p *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	// Group the measured values by their type.
	byType := make(map[types.MeasuredValueType][]*types.MeasuredValue)
	for _, mv := range measuredValues {
		byType[mv.ValueType] = append(byType[mv.ValueType], mv)
	}

	for valueType, values := range byType {
		var insert func(context.Context, []*types.MeasuredValue) error
		switch valueType {
		case types.MeasuredValueTypeHumidity:
			insert = p.insertHumidity
		case types.MeasuredValueTypePressure:
			insert = p.insertPressure
		case types.MeasuredValueTypeTemperature:
			insert = p.insertTemperature
		default:
			// Other value types have no insert helper and are ignored.
			continue
		}

		if err := insert(ctx, values); err != nil {
			return err
		}
	}

	return nil
}
// insertHumidity inserts the humidity values of measuredValues into the
// database. The insertHumidity.sql statement is prepared once and executed per
// value with the ID, value, from date, till date, sensor ID, creation date and
// update date as arguments, in that order. Values of another type are skipped.
//
// The first failing execution aborts the loop. Its error names the ID of the
// affected measured value, as insertPressure and insertTemperature do.
func (p *Postgres) insertHumidity(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	asset := fmt.Sprintf("%v/insertHumidity.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	query := string(queryBytes)
	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, measuredValue := range measuredValues {
		if measuredValue.ValueType != types.MeasuredValueTypeHumidity {
			continue
		}

		_, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate)
		if err != nil {
			return fmt.Errorf("%v: Measured value id %v: %v", errorStatementExecute, measuredValue.ID, err)
		}
	}

	return nil
}
// insertPressure inserts the pressure values of measuredValues into the
// database. The insertPressure.sql statement is prepared once and executed per
// value with the ID, value, from date, till date, sensor ID, creation date and
// update date as arguments, in that order. Values of another type are skipped.
//
// The first failing execution aborts the loop; its error names the ID of the
// affected measured value.
func (p *Postgres) insertPressure(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	rawQuery, err := Asset(postgresAssetPath + "/insertPressure.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	insertStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer insertStmt.Close()

	for _, mv := range measuredValues {
		if mv.ValueType == types.MeasuredValueTypePressure {
			if _, err := insertStmt.ExecContext(ctx, &mv.ID, &mv.Value, &mv.FromDate, &mv.TillDate, &mv.SensorID, &mv.CreationDate, &mv.UpdateDate); err != nil {
				return fmt.Errorf("%v: Measured value id %v: %v", errorStatementExecute, mv.ID, err)
			}
		}
	}

	return nil
}
// insertTemperature inserts the temperature values of measuredValues into the
// database. The insertTemperature.sql statement is prepared once and executed
// per value with the ID, value, from date, till date, sensor ID, creation date
// and update date as arguments, in that order. Values of another type are
// skipped.
//
// The first failing execution aborts the loop; its error names the ID of the
// affected measured value.
func (p *Postgres) insertTemperature(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	rawQuery, err := Asset(postgresAssetPath + "/insertTemperature.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	insertStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer insertStmt.Close()

	for _, mv := range measuredValues {
		if mv.ValueType == types.MeasuredValueTypeTemperature {
			if _, err := insertStmt.ExecContext(ctx, &mv.ID, &mv.Value, &mv.FromDate, &mv.TillDate, &mv.SensorID, &mv.CreationDate, &mv.UpdateDate); err != nil {
				return fmt.Errorf("%v: Measured value id %v: %v", errorStatementExecute, mv.ID, err)
			}
		}
	}

	return nil
}
// InsertSensors inserts all specified sensors into the database. The
// insertSensor.sql statement is prepared once and executed per sensor with the
// ID, name, location, wire ID, I2C bus, I2C address, GPIO number, model,
// enabled flag, device ID and creation date as arguments, in that order.
//
// The first failing execution aborts the loop and its error is returned.
func (p *Postgres) InsertSensors(ctx context.Context, sensors []*types.Sensor) error {
	rawQuery, err := Asset(postgresAssetPath + "/insertSensor.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	insertStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer insertStmt.Close()

	for _, s := range sensors {
		if _, err := insertStmt.ExecContext(ctx, &s.ID, &s.Name, &s.Location, &s.WireID, &s.I2CBus, &s.I2CAddress, &s.GPIONumber, &s.Model, &s.Enabled, &s.DeviceID, &s.CreationDate); err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}
// SelectDeviceByID returns the device with the given ID. The
// selectDeviceByID.sql statement is executed with id as the only argument and
// the resulting row is scanned into the ID, name, location and creation date
// of a new device.
//
// If the query yields no row, errorRowNotFound is returned. Any other scan
// failure is returned prefixed with errorScanRow.
func (p *Postgres) SelectDeviceByID(ctx context.Context, id string) (*types.Device, error) {
	asset := fmt.Sprintf("%v/selectDeviceByID.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	query := string(queryBytes)
	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	// Release the prepared statement once the row has been scanned.
	defer stmt.Close()

	// QueryRowContext never returns nil; a missing row is reported by Scan as
	// sql.ErrNoRows.
	device := new(types.Device)
	err = stmt.QueryRowContext(ctx, id).Scan(&device.ID, &device.Name, &device.Location, &device.CreationDate)
	if err == sql.ErrNoRows {
		return nil, errorRowNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorScanRow, err)
	}

	return device, nil
}
// SelectInfo returns the value stored for key in the database. The
// selectInfo.sql statement is executed with key as the only argument and the
// resulting row is scanned into a string.
//
// If the query yields no row, errorRowNotFound is returned. Any other scan
// failure is returned prefixed with errorScanRow.
func (p *Postgres) SelectInfo(ctx context.Context, key string) (string, error) {
	asset := fmt.Sprintf("%v/selectInfo.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return "", fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	query := string(queryBytes)
	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return "", fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	// Release the prepared statement once the row has been scanned.
	defer stmt.Close()

	// QueryRowContext never returns nil; a missing row is reported by Scan as
	// sql.ErrNoRows.
	value := ""
	err = stmt.QueryRowContext(ctx, key).Scan(&value)
	if err == sql.ErrNoRows {
		return "", errorRowNotFound
	}
	if err != nil {
		return "", fmt.Errorf("%v: %v", errorScanRow, err)
	}

	return value, nil
}
// SelectHumidities returns the humidity values selected by the
// selectHumidities.sql statement, which is executed without arguments.
func (p *Postgres) SelectHumidities(ctx context.Context) ([]*types.MeasuredValue, error) {
	return p.selectMeasuredValues(
		ctx,
		types.MeasuredValueTypeHumidity,
		postgresAssetPath+"/selectHumidities.sql",
		nil,
	)
}
// SelectHumidityByID returns the humidity value with the given ID. The
// selectHumidityByID.sql statement is executed with id as the only argument
// and the first selected value is returned. If nothing is selected, an error
// built from errorRowNotFound and id is returned.
func (p *Postgres) SelectHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
	found, err := p.selectMeasuredValues(
		ctx,
		types.MeasuredValueTypeHumidity,
		postgresAssetPath+"/selectHumidityByID.sql",
		[]interface{}{id},
	)

	switch {
	case err != nil:
		return nil, err
	case len(found) == 0:
		return nil, fmt.Errorf("%v: %v", errorRowNotFound, id)
	}

	return found[0], nil
}
// SelectMeasuredValues returns the measured values of all value types. The
// result holds the humidity values first, followed by the pressure and the
// temperature values. The first failing query aborts the selection and its
// error is returned.
func (p *Postgres) SelectMeasuredValues(ctx context.Context) ([]*types.MeasuredValue, error) {
	humidities, err := p.SelectHumidities(ctx)
	if err != nil {
		return nil, err
	}

	pressures, err := p.SelectPressures(ctx)
	if err != nil {
		return nil, err
	}

	temperatures, err := p.SelectTemperatures(ctx)
	if err != nil {
		return nil, err
	}

	// Always hand back a non-nil slice, even if nothing was selected.
	all := make([]*types.MeasuredValue, 0, len(humidities)+len(pressures)+len(temperatures))
	all = append(all, humidities...)
	all = append(all, pressures...)
	all = append(all, temperatures...)

	return all, nil
}
// SelectMeasuredValuesByIDAndType returns a single measured value by its ID
// and type. The lookup is delegated to SelectHumidityByID, SelectPressureByID
// or SelectTemperatureByID. For any other value type an error built from
// errorUnknownMeasuredValueType and the type is returned.
func (p *Postgres) SelectMeasuredValuesByIDAndType(ctx context.Context, id string, valueType types.MeasuredValueType) (*types.MeasuredValue, error) {
	var selectByID func(context.Context, string) (*types.MeasuredValue, error)

	switch valueType {
	case types.MeasuredValueTypeHumidity:
		selectByID = p.SelectHumidityByID
	case types.MeasuredValueTypePressure:
		selectByID = p.SelectPressureByID
	case types.MeasuredValueTypeTemperature:
		selectByID = p.SelectTemperatureByID
	default:
		return nil, fmt.Errorf("%v: %v", errorUnknownMeasuredValueType, valueType)
	}

	return selectByID(ctx, id)
}
// SelectPressures returns the pressure values selected by the
// selectPressures.sql statement, which is executed without arguments.
func (p *Postgres) SelectPressures(ctx context.Context) ([]*types.MeasuredValue, error) {
	return p.selectMeasuredValues(
		ctx,
		types.MeasuredValueTypePressure,
		postgresAssetPath+"/selectPressures.sql",
		nil,
	)
}
// SelectPressureByID returns the pressure value with the given ID. The
// selectPressureByID.sql statement is executed with id as the only argument
// and the first selected value is returned. If nothing is selected, an error
// built from errorRowNotFound and id is returned.
func (p *Postgres) SelectPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
	found, err := p.selectMeasuredValues(
		ctx,
		types.MeasuredValueTypePressure,
		postgresAssetPath+"/selectPressureByID.sql",
		[]interface{}{id},
	)

	switch {
	case err != nil:
		return nil, err
	case len(found) == 0:
		return nil, fmt.Errorf("%v: %v", errorRowNotFound, id)
	}

	return found[0], nil
}
// SelectSensorByID returns the sensor with the given ID. The
// selectSensorByID.sql statement is executed with id as the only argument and
// the resulting row is scanned into the ID, name, location, wire ID, I2C bus,
// I2C address, GPIO number, model, enabled flag, device ID and creation date
// of a new sensor.
//
// If the query yields no row, errorRowNotFound is returned. Any other scan
// failure is returned prefixed with errorScanRow.
func (p *Postgres) SelectSensorByID(ctx context.Context, id string) (*types.Sensor, error) {
	asset := fmt.Sprintf("%v/selectSensorByID.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	query := string(queryBytes)
	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	// Release the prepared statement once the row has been scanned.
	defer stmt.Close()

	// QueryRowContext never returns nil; a missing row is reported by Scan as
	// sql.ErrNoRows.
	sensor := new(types.Sensor)
	err = stmt.QueryRowContext(ctx, id).Scan(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
	if err == sql.ErrNoRows {
		return nil, errorRowNotFound
	}
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorScanRow, err)
	}

	return sensor, nil
}
// SelectTemperatures returns the temperature values selected by the
// selectTemperatures.sql statement, which is executed without arguments.
func (p *Postgres) SelectTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) {
	return p.selectMeasuredValues(
		ctx,
		types.MeasuredValueTypeTemperature,
		postgresAssetPath+"/selectTemperatures.sql",
		nil,
	)
}
// SelectTemperatureByID returns the temperature value with the given ID. The
// selectTemperatureByID.sql statement is executed with id as the only argument
// and the first selected value is returned. If nothing is selected, an error
// built from errorRowNotFound and id is returned.
func (p *Postgres) SelectTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
	found, err := p.selectMeasuredValues(
		ctx,
		types.MeasuredValueTypeTemperature,
		postgresAssetPath+"/selectTemperatureByID.sql",
		[]interface{}{id},
	)

	switch {
	case err != nil:
		return nil, err
	case len(found) == 0:
		return nil, fmt.Errorf("%v: %v", errorRowNotFound, id)
	}

	return found[0], nil
}
// selectMeasuredValues loads the SQL statement stored in queryFile, executes
// it with queryArgs and scans every resulting row into a measured value. Each
// row is scanned into the ID, value, from date, till date, sensor ID, creation
// date and update date, in that order. The value type of every returned
// measured value is set to measuredValueType.
//
// The returned slice is never nil on success. A row which can not be scanned
// aborts the selection with an error prefixed by errorScanRow; an error which
// occurs while iterating over the rows is returned prefixed by
// errorStatementQuery.
func (p *Postgres) selectMeasuredValues(ctx context.Context, measuredValueType types.MeasuredValueType, queryFile string, queryArgs []interface{}) ([]*types.MeasuredValue, error) {
	queryBytes, err := Asset(queryFile)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	query := string(queryBytes)
	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	rows, err := stmt.QueryContext(ctx, queryArgs...)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorStatementQuery, err)
	}
	// Deferred calls run in reverse order: the rows are closed before the
	// statement they belong to.
	defer rows.Close()

	measuredValues := make([]*types.MeasuredValue, 0)
	for rows.Next() {
		measuredValue := new(types.MeasuredValue)
		measuredValue.ValueType = measuredValueType

		if err := rows.Scan(&measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate); err != nil {
			return nil, fmt.Errorf("%v: %v", errorScanRow, err)
		}

		measuredValues = append(measuredValues, measuredValue)
	}

	// rows.Next also returns false when the iteration failed; make sure such a
	// failure is not mistaken for the end of the result set.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("%v: %v", errorStatementQuery, err)
	}

	return measuredValues, nil
}
// UpdateDevices is meant to update all specified devices in the database.
//
// NOTE: not implemented. The method ignores its arguments, does not touch the
// database and always returns nil.
func (p *Postgres) UpdateDevices(ctx context.Context, devices []*types.Device) error {
return nil
}
// UpdateInfo updates the value which is stored for a key in the database by
// executing the updateInfo.sql statement with key and value as arguments, in
// that order. If the statement affects no row, errorNoRowsAffected is
// returned.
func (p *Postgres) UpdateInfo(ctx context.Context, key string, value string) error {
	rawQuery, err := Asset(postgresAssetPath + "/updateInfo.sql")
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}

	updateStmt, err := p.dbo.PrepareContext(ctx, string(rawQuery))
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer updateStmt.Close()

	result, err := updateStmt.ExecContext(ctx, key, value)
	if err != nil {
		return fmt.Errorf("%v: %v", errorStatementExecute, err)
	}

	switch count, err := result.RowsAffected(); {
	case err != nil:
		return err
	case count == 0:
		return errorNoRowsAffected
	}

	return nil
}
// UpdateMeasuredValues is meant to update the measured values which are stored
// in the database.
//
// NOTE: not implemented. The method ignores its arguments, does not touch the
// database and always returns nil.
func (p *Postgres) UpdateMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {
return nil
}
// UpdateSensors is meant to update the sensors which are stored in the
// database.
//
// NOTE: not implemented. The method ignores its arguments, does not touch the
// database and always returns nil.
func (p *Postgres) UpdateSensors(ctx context.Context, sensots []*types.Sensor) error {
return nil
}