package db

import (
	"context"
	"database/sql"
	"fmt"
	"path/filepath"
	"sort"
	"strings"

	"github.com/Masterminds/semver"
	"github.com/go-flucky/flucky/pkg/types"

	// PostgreSQL lib
	_ "github.com/lib/pq"
)

var (
	// postgresAssetPath is the asset directory which contains all SQL files
	// used by the postgres backend.
	postgresAssetPath = "pkg/storage/db/sql/psql"
)

// Postgres provides functions to interact with a postgres database.
type Postgres struct {
	dbo *sql.DB
}

// Close closes the database connection.
func (p *Postgres) Close() error {
	return p.dbo.Close()
}

// Schema creates or updates the database schema. Normally the passed version
// is the same as the flucky binary version.
//
// The version currently stored in the info table is read first. If it can not
// be read, the schema is assumed to be uninitialized and every schema asset is
// applied. Otherwise only schema assets with a version greater than the stored
// one are applied. After each applied asset the stored version is written back
// to the info table.
//
// NOTE(review): the target version is passed through to the internal helper
// but is not consulted there; all newer schema assets are applied regardless
// of it.
func (p *Postgres) Schema(ctx context.Context, version *semver.Version) error {

	schemaFunc := func(ctx context.Context, fromVersion *semver.Version, toVersion *semver.Version) error {

		assetPath := fmt.Sprintf("%v/schema", postgresAssetPath)
		sqlAssetFiles, err := AssetDir(assetPath)
		if err != nil {
			return fmt.Errorf("Can not restore asset directory %v: %v", assetPath, err)
		}

		// Map every parsed version to the asset file it was parsed from. The
		// map is keyed by pointer, which is safe because the identical
		// pointers are stored in the sorted slice below.
		postgreSQLVersionChanges := make(map[*semver.Version]string, len(sqlAssetFiles))
		postgreSQLVersions := make([]*semver.Version, len(sqlAssetFiles))

		for i, sqlAssetFile := range sqlAssetFiles {
			fileSemVersion, err := semver.NewVersion(strings.TrimSuffix(sqlAssetFile, ".sql"))
			if err != nil {
				return fmt.Errorf("Can not create semantic version from file asset %v: %v", sqlAssetFile, err)
			}

			postgreSQLVersionChanges[fileSemVersion] = sqlAssetFile
			postgreSQLVersions[i] = fileSemVersion
		}

		// Apply the schema changes in ascending version order.
		sort.Sort(semver.Collection(postgreSQLVersions))

		// A version row exists in the info table exactly when a version could
		// be read from the database. Only the very first write on a fresh
		// schema is an insert, every other write is an update. Deciding this
		// by loop index would try to insert a second row whenever the stored
		// version is older than the first schema asset.
		versionStored := fromVersion != nil

		for _, postgreSQLVersion := range postgreSQLVersions {
			if fromVersion != nil {
				if postgreSQLVersion.LessThan(fromVersion) || postgreSQLVersion.Equal(fromVersion) {
					flogger.Debug("SKIP: PostgreSQL schema version '%v' is less or eqal then the local version changes '%v'", postgreSQLVersion.String(), fromVersion.String())
					continue
				}
			}

			asset := postgreSQLVersionChanges[postgreSQLVersion]
			queryBytes, err := Asset(filepath.Join(assetPath, asset))
			if err != nil {
				return fmt.Errorf("Can not restore asset %v, %v", asset, err)
			}
			query := string(queryBytes)

			if _, err := p.dbo.ExecContext(ctx, query); err != nil {
				return fmt.Errorf("%v: %v", errorStatementExecute, err)
			}

			if !versionStored {
				if err := p.InsertInfo(ctx, "version", postgreSQLVersion.String()); err != nil {
					return fmt.Errorf("Can not insert version %v into info table: %v", postgreSQLVersion.String(), err)
				}
				versionStored = true
				continue
			}

			if err := p.UpdateInfo(ctx, "version", postgreSQLVersion.String()); err != nil {
				return fmt.Errorf("Can not update version %v into info table: %v", postgreSQLVersion.String(), err)
			}
		}

		return nil
	}

	dbVersion, err := p.SelectInfo(ctx, "version")
	if err != nil {
		// can not select version from database, maybe the schema is not
		// initialized: create the db schema from scratch
		return schemaFunc(ctx, nil, version)
	}

	fromVersion, err := semver.NewVersion(dbVersion)
	if err != nil {
		return fmt.Errorf("Can not create semantic version from database entry %v: %v", dbVersion, err)
	}

	return schemaFunc(ctx, fromVersion, version)
}

// DeleteDevices executes the deleteDevice.sql statement once for every
// specified device, passing the device ID as the only argument. Processing
// stops at the first failing statement.
func (p *Postgres) DeleteDevices(ctx context.Context, devices []*types.Device) error {
	asset := fmt.Sprintf("%v/deleteDevice.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, device := range devices {
		_, err := stmt.ExecContext(ctx, &device.ID)
		if err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}

// DeleteSensors executes the deleteSensor.sql statement once for every
// specified sensor, passing the sensor ID as the only argument. Processing
// stops at the first failing statement.
func (p *Postgres) DeleteSensors(ctx context.Context, sensors []*types.Sensor) error {
	asset := fmt.Sprintf("%v/deleteSensor.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, sensor := range sensors {
		_, err := stmt.ExecContext(ctx, &sensor.ID)
		if err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}

// DeleteMeasuredValues deletes all specified measured values. The values are
// grouped by their value type and every group is deleted with the delete
// statement of that type. Values of a type without a delete statement are
// skipped silently.
func (p *Postgres) DeleteMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {

	// deleteMeasuredValue prepares the query once and executes it for the ID
	// of every passed measured value.
	deleteMeasuredValue := func(ctx context.Context, query string, measuredValues []*types.MeasuredValue) error {
		stmt, err := p.dbo.PrepareContext(ctx, query)
		if err != nil {
			return fmt.Errorf("%v: %v", errorPrepareStatement, err)
		}
		defer stmt.Close()

		for _, measuredValue := range measuredValues {
			_, err := stmt.ExecContext(ctx, &measuredValue.ID)
			if err != nil {
				return fmt.Errorf("%v: %v", errorStatementExecute, err)
			}
		}

		return nil
	}

	// assetFunc returns the content of an SQL asset as string.
	assetFunc := func(queryFile string) (string, error) {
		queryBytes, err := Asset(queryFile)
		if err != nil {
			return "", fmt.Errorf("%v: %v", errorGetAsset, err)
		}
		return string(queryBytes), nil
	}

	// Group by value type; append allocates the slices on demand.
	sortedMeasuredValueTypes := make(map[types.MeasuredValueType][]*types.MeasuredValue)
	for _, measuredValue := range measuredValues {
		sortedMeasuredValueTypes[measuredValue.ValueType] = append(sortedMeasuredValueTypes[measuredValue.ValueType], measuredValue)
	}

	for measuredValueType, sortedMeasuredValues := range sortedMeasuredValueTypes {
		var queryFile string

		switch measuredValueType {
		case types.MeasuredValueTypeHumidity:
			queryFile = fmt.Sprintf("%v/deleteHumidity.sql", postgresAssetPath)
		case types.MeasuredValueTypePressure:
			queryFile = fmt.Sprintf("%v/deletePressure.sql", postgresAssetPath)
		case types.MeasuredValueTypeTemperature:
			queryFile = fmt.Sprintf("%v/deleteTemperature.sql", postgresAssetPath)
		default:
			// no delete statement for this value type
			continue
		}

		query, err := assetFunc(queryFile)
		if err != nil {
			return err
		}

		if err := deleteMeasuredValue(ctx, query, sortedMeasuredValues); err != nil {
			return err
		}
	}

	return nil
}

// InsertDevices executes the insertDevice.sql statement once for every
// specified device, passing ID, name, location and creation date. Processing
// stops at the first failing statement.
func (p *Postgres) InsertDevices(ctx context.Context, devices []*types.Device) error {
	asset := fmt.Sprintf("%v/insertDevice.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, device := range devices {
		_, err := stmt.ExecContext(ctx, &device.ID, &device.Name, &device.Location, &device.CreationDate)
		if err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}

// InsertInfo executes the insertInfo.sql statement with the given key and
// value to store additional information based on a key value syntax.
func (p *Postgres) InsertInfo(ctx context.Context, key string, value string) error {
	asset := fmt.Sprintf("%v/insertInfo.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	_, err = stmt.ExecContext(ctx, key, value)
	if err != nil {
		return fmt.Errorf("%v: %v", errorStatementExecute, err)
	}

	return nil
}

// InsertMeasuredValues inserts all specified measured values into the
// database. The values are grouped by their value type and every group is
// handed to the insert function of that type. Values of a type without an
// insert function are skipped silently.
func (p *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {

	// Group by value type; append allocates the slices on demand.
	sortedMeasuredValueTypes := make(map[types.MeasuredValueType][]*types.MeasuredValue)
	for _, measuredValue := range measuredValues {
		sortedMeasuredValueTypes[measuredValue.ValueType] = append(sortedMeasuredValueTypes[measuredValue.ValueType], measuredValue)
	}

	for measuredValueType, sortedMeasuredValues := range sortedMeasuredValueTypes {
		switch measuredValueType {
		case types.MeasuredValueTypeHumidity:
			if err := p.insertHumidity(ctx, sortedMeasuredValues); err != nil {
				return err
			}
		case types.MeasuredValueTypePressure:
			if err := p.insertPressure(ctx, sortedMeasuredValues); err != nil {
				return err
			}
		case types.MeasuredValueTypeTemperature:
			if err := p.insertTemperature(ctx, sortedMeasuredValues); err != nil {
				return err
			}
		}
	}

	return nil
}

// insertHumidity executes the insertHumidity.sql statement for every passed
// measured value of type humidity. Values of any other type are skipped.
func (p *Postgres) insertHumidity(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	asset := fmt.Sprintf("%v/insertHumidity.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, measuredValue := range measuredValues {

		if measuredValue.ValueType != types.MeasuredValueTypeHumidity {
			continue
		}

		_, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate)
		if err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}

// insertPressure executes the insertPressure.sql statement for every passed
// measured value of type pressure. Values of any other type are skipped.
func (p *Postgres) insertPressure(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	asset := fmt.Sprintf("%v/insertPressure.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, measuredValue := range measuredValues {

		if measuredValue.ValueType != types.MeasuredValueTypePressure {
			continue
		}

		_, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate)
		if err != nil {
			return fmt.Errorf("%v: Measured value id %v: %v", errorStatementExecute, measuredValue.ID, err)
		}
	}

	return nil
}

// insertTemperature executes the insertTemperature.sql statement for every
// passed measured value of type temperature. Values of any other type are
// skipped.
func (p *Postgres) insertTemperature(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	asset := fmt.Sprintf("%v/insertTemperature.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, measuredValue := range measuredValues {

		if measuredValue.ValueType != types.MeasuredValueTypeTemperature {
			continue
		}

		_, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate)
		if err != nil {
			return fmt.Errorf("%v: Measured value id %v: %v", errorStatementExecute, measuredValue.ID, err)
		}
	}

	return nil
}

// InsertSensors executes the insertSensor.sql statement once for every
// specified sensor. Processing stops at the first failing statement.
func (p *Postgres) InsertSensors(ctx context.Context, sensors []*types.Sensor) error {
	asset := fmt.Sprintf("%v/insertSensor.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	for _, sensor := range sensors {
		_, err := stmt.ExecContext(ctx, &sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
		if err != nil {
			return fmt.Errorf("%v: %v", errorStatementExecute, err)
		}
	}

	return nil
}

// SelectDeviceByID returns the device which is returned by the
// selectDeviceByID.sql statement for the given ID. An error is returned if
// the result row can not be scanned, which includes the case that no row
// exists.
func (p *Postgres) SelectDeviceByID(ctx context.Context, id string) (*types.Device, error) {
	asset := fmt.Sprintf("%v/selectDeviceByID.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	// The statement is only needed until the row is scanned below.
	defer stmt.Close()

	// QueryRowContext never returns nil; query errors and missing rows are
	// reported by Scan.
	row := stmt.QueryRowContext(ctx, id)

	device := new(types.Device)
	err = row.Scan(&device.ID, &device.Name, &device.Location, &device.CreationDate)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorScanRow, err)
	}

	return device, nil
}

// SelectInfo returns the value which is returned by the selectInfo.sql
// statement for the given key. An error is returned if the result row can not
// be scanned, which includes the case that no row exists.
func (p *Postgres) SelectInfo(ctx context.Context, key string) (string, error) {
	asset := fmt.Sprintf("%v/selectInfo.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return "", fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return "", fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	// The statement is only needed until the row is scanned below.
	defer stmt.Close()

	// QueryRowContext never returns nil; query errors and missing rows are
	// reported by Scan.
	row := stmt.QueryRowContext(ctx, key)

	value := ""
	err = row.Scan(&value)
	if err != nil {
		return "", fmt.Errorf("%v: %v", errorScanRow, err)
	}

	return value, nil
}

// SelectHumidities returns the measured values selected by the
// selectHumidities.sql statement, marked as humidity values.
func (p *Postgres) SelectHumidities(ctx context.Context) ([]*types.MeasuredValue, error) {
	queryFile := fmt.Sprintf("%v/selectHumidities.sql", postgresAssetPath)
	measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeHumidity, queryFile, nil)
	if err != nil {
		return nil, err
	}
	return measuredValues, nil
}

// SelectHumidityByID returns the first humidity value selected by the
// selectHumidityByID.sql statement for the given ID. An error is returned if
// the statement selects no value.
func (p *Postgres) SelectHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
	queryFile := fmt.Sprintf("%v/selectHumidityByID.sql", postgresAssetPath)
	args := []interface{}{id}
	measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeHumidity, queryFile, args)
	if err != nil {
		return nil, err
	}

	if len(measuredValues) == 0 {
		return nil, fmt.Errorf("%v: %v", errorRowNotFound, id)
	}

	return measuredValues[0], nil
}

// SelectMeasuredValues returns the measured values of all different value
// types: humidities first, followed by pressures and temperatures. The first
// failing query aborts the selection.
func (p *Postgres) SelectMeasuredValues(ctx context.Context) ([]*types.MeasuredValue, error) {
	measuredValues := make([]*types.MeasuredValue, 0)

	// MeasuredValue query functions
	queryFunctions := []func(ctx context.Context) ([]*types.MeasuredValue, error){
		p.SelectHumidities,
		p.SelectPressures,
		p.SelectTemperatures,
	}

	// Execute query functions
	for _, queryFunction := range queryFunctions {
		queriedMeasuredValues, err := queryFunction(ctx)
		if err != nil {
			return nil, err
		}
		measuredValues = append(measuredValues, queriedMeasuredValues...)
	}

	return measuredValues, nil
}

// SelectMeasuredValuesByIDAndType returns a measured value by its ID and
// value type. An error is returned for a value type other than humidity,
// pressure or temperature.
func (p *Postgres) SelectMeasuredValuesByIDAndType(ctx context.Context, id string, valueType types.MeasuredValueType) (*types.MeasuredValue, error) {
	switch valueType {
	case types.MeasuredValueTypeHumidity:
		return p.SelectHumidityByID(ctx, id)
	case types.MeasuredValueTypePressure:
		return p.SelectPressureByID(ctx, id)
	case types.MeasuredValueTypeTemperature:
		return p.SelectTemperatureByID(ctx, id)
	default:
		return nil, fmt.Errorf("%v: %v", errorUnknownMeasuredValueType, valueType)
	}
}

// SelectPressures returns the measured values selected by the
// selectPressures.sql statement, marked as pressure values.
func (p *Postgres) SelectPressures(ctx context.Context) ([]*types.MeasuredValue, error) {
	queryFile := fmt.Sprintf("%v/selectPressures.sql", postgresAssetPath)
	measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypePressure, queryFile, nil)
	if err != nil {
		return nil, err
	}
	return measuredValues, nil
}

// SelectPressureByID returns the first pressure value selected by the
// selectPressureByID.sql statement for the given ID. An error is returned if
// the statement selects no value.
func (p *Postgres) SelectPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
	queryFile := fmt.Sprintf("%v/selectPressureByID.sql", postgresAssetPath)
	args := []interface{}{id}
	measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypePressure, queryFile, args)
	if err != nil {
		return nil, err
	}

	if len(measuredValues) == 0 {
		return nil, fmt.Errorf("%v: %v", errorRowNotFound, id)
	}

	return measuredValues[0], nil
}

// SelectSensorByID returns the sensor which is returned by the
// selectSensorByID.sql statement for the given ID. An error is returned if
// the result row can not be scanned, which includes the case that no row
// exists.
func (p *Postgres) SelectSensorByID(ctx context.Context, id string) (*types.Sensor, error) {
	asset := fmt.Sprintf("%v/selectSensorByID.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	// The statement is only needed until the row is scanned below.
	defer stmt.Close()

	// QueryRowContext never returns nil; query errors and missing rows are
	// reported by Scan.
	row := stmt.QueryRowContext(ctx, id)

	sensor := new(types.Sensor)
	err = row.Scan(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorScanRow, err)
	}

	return sensor, nil
}

// SelectTemperatures returns the measured values selected by the
// selectTemperatures.sql statement, marked as temperature values.
func (p *Postgres) SelectTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) {
	queryFile := fmt.Sprintf("%v/selectTemperatures.sql", postgresAssetPath)
	measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeTemperature, queryFile, nil)
	if err != nil {
		return nil, err
	}
	return measuredValues, nil
}

// SelectTemperatureByID returns the first temperature value selected by the
// selectTemperatureByID.sql statement for the given ID. An error is returned
// if the statement selects no value.
func (p *Postgres) SelectTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) {
	queryFile := fmt.Sprintf("%v/selectTemperatureByID.sql", postgresAssetPath)
	args := []interface{}{id}
	measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeTemperature, queryFile, args)
	if err != nil {
		return nil, err
	}

	if len(measuredValues) == 0 {
		return nil, fmt.Errorf("%v: %v", errorRowNotFound, id)
	}

	return measuredValues[0], nil
}

// selectMeasuredValues executes the SQL asset queryFile with the given query
// arguments and scans every result row into a measured value of the given
// value type. The returned slice is empty, but never nil, if the query
// selects no rows. A row which can not be scanned or a failure while
// iterating over the result aborts the selection with an error.
func (p *Postgres) selectMeasuredValues(ctx context.Context, measuredValueType types.MeasuredValueType, queryFile string, queryArgs []interface{}) ([]*types.MeasuredValue, error) {
	queryBytes, err := Asset(queryFile)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	rows, err := stmt.QueryContext(ctx, queryArgs...)
	if err != nil {
		return nil, fmt.Errorf("%v: %v", errorStatementQuery, err)
	}
	// Deferred calls run in reverse order: the rows are closed before the
	// statement they belong to.
	defer rows.Close()

	measuredValues := make([]*types.MeasuredValue, 0)
	for rows.Next() {
		measuredValue := new(types.MeasuredValue)
		measuredValue.ValueType = measuredValueType

		err := rows.Scan(&measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate)
		if err != nil {
			return nil, fmt.Errorf("%v: %v", errorScanRow, err)
		}

		measuredValues = append(measuredValues, measuredValue)
	}

	// Next returns false on the end of the result as well as on an error, so
	// the iteration error must be checked explicitly.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("%v: %v", errorStatementQuery, err)
	}

	return measuredValues, nil
}

// UpdateDevices is not implemented yet: it ignores the specified devices and
// always returns nil.
func (p *Postgres) UpdateDevices(ctx context.Context, devices []*types.Device) error {
	return nil
}

// UpdateInfo executes the updateInfo.sql statement with the given key and
// value to update the value which is stored to a key in the database.
func (p *Postgres) UpdateInfo(ctx context.Context, key string, value string) error {
	asset := fmt.Sprintf("%v/updateInfo.sql", postgresAssetPath)
	queryBytes, err := Asset(asset)
	if err != nil {
		return fmt.Errorf("%v: %v", errorGetAsset, err)
	}
	query := string(queryBytes)

	stmt, err := p.dbo.PrepareContext(ctx, query)
	if err != nil {
		return fmt.Errorf("%v: %v", errorPrepareStatement, err)
	}
	defer stmt.Close()

	_, err = stmt.ExecContext(ctx, key, value)
	if err != nil {
		return fmt.Errorf("%v: %v", errorStatementExecute, err)
	}

	return nil
}

// UpdateMeasuredValues is not implemented yet: it ignores the specified
// measured values and always returns nil.
func (p *Postgres) UpdateMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {
	return nil
}

// UpdateSensors is not implemented yet: it ignores the specified sensors and
// always returns nil.
func (p *Postgres) UpdateSensors(ctx context.Context, sensots []*types.Sensor) error {
	return nil
}