package db import ( "context" "database/sql" "fmt" "path/filepath" "strings" "github.com/Masterminds/semver" "github.com/go-flucky/flucky/pkg/types" _ "github.com/lib/pq" ) type Postgres struct { dbo *sql.DB } func (p *Postgres) Close() error { return p.Close() } // Schema create or upgrade database schema to the version of the flucky binary func (p *Postgres) Schema(ctx context.Context, version *semver.Version) error { query := "SELECT value FROM info WHERE key='version';" stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { asset := "pkg/db/sql/psql/schema.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query = string(queryBytes) _, err = p.dbo.ExecContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } return nil } schemaVersion := "" row := stmt.QueryRowContext(ctx) err = row.Scan(&schemaVersion) if err != nil { return fmt.Errorf("%v: %v", errorScanRow, err) } fromVersion, err := semver.NewVersion(schemaVersion) if err != nil { return fmt.Errorf("Can not create new sematic version from database: %v", err) } if err := schema(fromVersion, version); err != nil { return err } // The result will be 0 if from == to, -1 if from < to, or +1 if from > to. switch fromVersion.Compare(version) { case -1: return schema(fromVersion, version) case 0: // fromVersion and toVersion are equal return nil case 1: // fromVersion < toVersion return fmt.Errorf("Can not downgrade the database schema. Update the flucky binary") } return nil } func (p *Postgres) DeleteDevices(ctx context.Context, devices []*types.Device) error { asset := "pkg/db/sql/psql/deleteDevice.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, device := range devices { _, err := stmt.ExecContext(ctx, &device.DeviceID) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) DeleteSensors(ctx context.Context, sensors []*types.Sensor) error { asset := "pkg/db/sql/psql/deleteSensor.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, sensor := range sensors { _, err := stmt.ExecContext(ctx, &sensor.SensorID) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) DeleteMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error { deleteMeasuredValue := func(ctx context.Context, query string, measuredValues []*types.MeasuredValue) error { stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, measuredValue := range measuredValues { _, err := stmt.ExecContext(ctx, &measuredValue.ID) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } sortedMeasuredValueTypes := make(map[types.MeasuredValueType][]*types.MeasuredValue) for _, measuredValue := range measuredValues { if _, ok := sortedMeasuredValueTypes[measuredValue.ValueType]; !ok { sortedMeasuredValueTypes[measuredValue.ValueType] = make([]*types.MeasuredValue, 0) } sortedMeasuredValueTypes[measuredValue.ValueType] = append(sortedMeasuredValueTypes[measuredValue.ValueType], measuredValue) } assetFunc := func(queryFile string) (string, error) { queryBytes, err := Asset(queryFile) if err != nil { return "", fmt.Errorf("%v: %v", errorGetAsset, err) } return string(queryBytes), nil } for measuredValueType, sortedMeasuredValues := range sortedMeasuredValueTypes { switch measuredValueType { case types.MeasuredValueTypeHumidity: query, err := assetFunc("pkg/db/sql/psql/deleteHumidity.sql") if err != nil { return err } if err := deleteMeasuredValue(ctx, query, sortedMeasuredValues); err != nil { return err } case types.MeasuredValueTypePressure: query, err := assetFunc("pkg/db/sql/psql/deletePressure.sql") if err != nil { return err } if err := deleteMeasuredValue(ctx, query, sortedMeasuredValues); err != nil { return err } case types.MeasuredValueTypeTemperature: query, err := assetFunc("pkg/db/sql/psql/deleteTemperature.sql") if err != nil { return err } if err := deleteMeasuredValue(ctx, query, sortedMeasuredValues); err != nil { return err } } } return nil } func (p *Postgres) InsertDevices(ctx context.Context, devices []*types.Device) error { asset := "pkg/db/sql/psql/insertDevice.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, device := range devices { _, err := stmt.ExecContext(ctx, &device.DeviceID, &device.DeviceName, &device.DeviceLocation, &device.CreationDate) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error { sortedMeasuredValueTypes := make(map[types.MeasuredValueType][]*types.MeasuredValue) for _, measuredValue := range measuredValues { if _, ok := sortedMeasuredValueTypes[measuredValue.ValueType]; !ok { sortedMeasuredValueTypes[measuredValue.ValueType] = make([]*types.MeasuredValue, 0) } sortedMeasuredValueTypes[measuredValue.ValueType] = append(sortedMeasuredValueTypes[measuredValue.ValueType], measuredValue) } for measuredValueType, sortedMeasuredValues := range sortedMeasuredValueTypes { switch measuredValueType { case types.MeasuredValueTypeHumidity: if err := p.insertHumidity(ctx, sortedMeasuredValues); err != nil { return err } case types.MeasuredValueTypePressure: if err := p.insertPressure(ctx, sortedMeasuredValues); err != nil { return err } case types.MeasuredValueTypeTemperature: if err := p.insertTemperature(ctx, sortedMeasuredValues); err != nil { return err } } } return nil } func (p *Postgres) insertHumidity(ctx context.Context, measuredValues []*types.MeasuredValue) error { asset := "pkg/db/sql/psql/insertHumidity.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, measuredValue := range measuredValues { if measuredValue.ValueType != types.MeasuredValueTypeHumidity { continue } _, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) insertPressure(ctx context.Context, measuredValues []*types.MeasuredValue) error { asset := "pkg/db/sql/psql/insertPressure.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, measuredValue := range measuredValues { if measuredValue.ValueType != types.MeasuredValueTypePressure { continue } _, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) insertTemperature(ctx context.Context, measuredValues []*types.MeasuredValue) error { asset := "pkg/db/sql/psql/insertTemperature.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, measuredValue := range measuredValues { if measuredValue.ValueType != types.MeasuredValueTypeTemperature { continue } _, err := stmt.ExecContext(ctx, &measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) InsertSensors(ctx context.Context, sensors []*types.Sensor) error { asset := "pkg/db/sql/psql/insertSensor.sql" queryBytes, err := Asset(asset) if err != nil { return fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return fmt.Errorf("%v: %v", errorPrepareStatement, err) } defer stmt.Close() for _, sensor := range sensors { _, err := stmt.ExecContext(ctx, &sensor.SensorID, &sensor.SensorName, &sensor.SensorLocation, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.SensorModel, &sensor.SensorEnabled, &sensor.DeviceID, &sensor.CreationDate) if err != nil { return fmt.Errorf("%v: %v", errorStatementExecute, err) } } return nil } func (p *Postgres) SelectDeviceByID(ctx context.Context, id string) (*types.Device, error) { asset := "pkg/db/sql/psql/selectDeviceByID.sql" queryBytes, err := Asset(asset) if err != nil { return nil, fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err) } row := stmt.QueryRowContext(ctx, id) if row == nil { return nil, errorRowNotFound } device := new(types.Device) err = row.Scan(&device.DeviceID, &device.DeviceName, &device.DeviceLocation, &device.CreationDate) if err != nil { return nil, fmt.Errorf("%v: %v", errorScanRow, err) } return device, nil } func (p *Postgres) SelectMeasuredValuesByIDAndType(ctx context.Context, id string, valueType types.MeasuredValueType) (*types.MeasuredValue, error) { switch valueType { case types.MeasuredValueTypeHumidity: return p.SelectHumidityByID(ctx, id) case types.MeasuredValueTypePressure: return p.SelectPressureByID(ctx, id) case types.MeasuredValueTypeTemperature: return p.SelectTemperatureByID(ctx, id) default: return nil, fmt.Errorf("%v: %v", errorUnknownMeasuredValueType, valueType) } } func (p *Postgres) SelectHumidities(ctx context.Context) ([]*types.MeasuredValue, error) { queryFile := "pkg/db/sql/psql/selectHumidities.sql" measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeHumidity, queryFile, nil) if err != nil { return nil, err } return measuredValues, nil } func (p *Postgres) SelectHumidityByID(ctx context.Context, id string) (*types.MeasuredValue, error) { queryFile := "pkg/db/sql/psql/selectHumidityByID.sql" args := []interface{}{id} measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeHumidity, queryFile, args) if err != nil { return nil, err } if len(measuredValues) == 0 { return nil, fmt.Errorf("%v: %v", errorRowNotFound, id) } return measuredValues[0], nil } func (p *Postgres) SelectPressures(ctx context.Context) ([]*types.MeasuredValue, error) { queryFile := "pkg/db/sql/psql/selectPressures.sql" measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypePressure, queryFile, nil) if err != nil { return nil, err } return measuredValues, nil } func (p *Postgres) SelectPressureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { queryFile := "pkg/db/sql/psql/selectPressureByID.sql" args := []interface{}{id} measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypePressure, queryFile, args) if err != nil { return nil, err } if len(measuredValues) == 0 { return nil, fmt.Errorf("%v: %v", errorRowNotFound, id) } return measuredValues[0], nil } func (p *Postgres) SelectSensorByID(ctx context.Context, id string) (*types.Sensor, error) { asset := "pkg/db/sql/psql/selectSensorByID.sql" queryBytes, err := Asset(asset) if err != nil { return nil, fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err) } row := stmt.QueryRowContext(ctx, id) if row == nil { return nil, errorRowNotFound } sensor := new(types.Sensor) err = row.Scan(&sensor.SensorID, &sensor.SensorName, &sensor.SensorLocation, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.SensorModel, &sensor.SensorEnabled, &sensor.DeviceID, &sensor.CreationDate) if err != nil { return nil, fmt.Errorf("%v: %v", errorScanRow, err) } return sensor, nil } func (p *Postgres) SelectTemperatures(ctx context.Context) ([]*types.MeasuredValue, error) { queryFile := "pkg/db/sql/psql/selectTemperatures.sql" measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeTemperature, queryFile, nil) if err != nil { return nil, err } return measuredValues, nil } func (p *Postgres) SelectTemperatureByID(ctx context.Context, id string) (*types.MeasuredValue, error) { queryFile := "pkg/db/sql/psql/selectTemperatureByID.sql" args := []interface{}{id} measuredValues, err := p.selectMeasuredValues(ctx, types.MeasuredValueTypeTemperature, queryFile, args) if err != nil { return nil, err } if len(measuredValues) == 0 { return nil, fmt.Errorf("%v: %v", errorRowNotFound, id) } return measuredValues[0], nil } func (p *Postgres) selectMeasuredValues(ctx context.Context, measuredValueType types.MeasuredValueType, queryFile string, queryArgs []interface{}) ([]*types.MeasuredValue, error) { queryBytes, err := Asset(queryFile) if err != nil { return nil, fmt.Errorf("%v: %v", errorGetAsset, err) } query := string(queryBytes) stmt, err := p.dbo.PrepareContext(ctx, query) if err != nil { return nil, fmt.Errorf("%v: %v", errorPrepareStatement, err) } rows, err := stmt.QueryContext(ctx, queryArgs...) if err != nil { return nil, fmt.Errorf("%v: %v", errorStatementQuery, err) } measuredValues := make([]*types.MeasuredValue, 0) for rows.Next() { measuredValue := new(types.MeasuredValue) measuredValue.ValueType = measuredValueType rows.Scan(&measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate) measuredValues = append(measuredValues, measuredValue) } return measuredValues, nil } func (p *Postgres) UpdateDevices(ctx context.Context, devices []*types.Device) error { return nil } func (p *Postgres) UpdateMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error { return nil } func (p *Postgres) UpdateSensors(ctx context.Context, sensots []*types.Sensor) error { return nil } func schema(fromVersion *semver.Version, toVersion *semver.Version) error { sqlAssetFiles, err := parseAssetDir("pkg/db/sql/psql/schema") if err != nil { return fmt.Errorf("Can not restore sql files: %v", err) } for _, sqlAssetFile := range sqlAssetFiles { version := strings.Replace() sqlSchemaInformations := &schemaInformation{ Version: filepath.Split() Asset: sqlAssetFile, } } return nil } func parseAssetDir(dir string) ([]string, error) { assetFiles, err := AssetDir(dir) if err != nil { return nil, fmt.Errorf("Can not load asset directory %v: %v", dir, err) } goldenAssetFiles := assetFiles for i, assetFile := range assetFiles { if strings.HasPrefix(assetFile, "/") { // is directory- SKIP goldenAssetFiles = append(goldenAssetFiles[:i], goldenAssetFiles[i+1:]...) } } return goldenAssetFiles, nil }