fix: new implementation
changes: - Remove cli Some cli commands are not complete tested and are deprecated. - Daemon - Old version has a very bad implementation of how to verify, if the device or the sensors are in the database insert. The current implementation can be improved but this one is betten then the old one. - Remove complete the cache store implementation. Use a normal array and query the length and capacity to determine how the array cache must be cleaned. - Type Remove unused types and functions
This commit is contained in:
@ -2,132 +2,239 @@ package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"math"
|
||||
"net/url"
|
||||
"sort"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/volker-raschek/flucky/pkg/internal/format"
|
||||
"github.com/volker-raschek/flucky/pkg/storage/db"
|
||||
"github.com/volker-raschek/flucky/pkg/storage/logfile"
|
||||
_ "github.com/lib/pq"
|
||||
"github.com/volker-raschek/flucky/pkg/types"
|
||||
"github.com/volker-raschek/go-logger/pkg/logger"
|
||||
)
|
||||
|
||||
// Compression the measured values. The system checks whether the measured values
|
||||
// of the same type correspond to those of the predecessor. If this is the case,
|
||||
// the current value is discarded and the validity date of the previous value is
|
||||
// set to that of the current value. This means that no information is lost.
|
||||
// Only the validity period of the measured value is increased.
|
||||
func Compression(measuredValues []*types.MeasuredValue) []*types.MeasuredValue {
|
||||
compressedMeasuredValues := make([]*types.MeasuredValue, 0)
|
||||
lastMeasuredValuesBySensors := make(map[string]map[types.MeasuredValueType]*types.MeasuredValue, 0)
|
||||
|
||||
// Sort all measured values according to the start time of the validity date
|
||||
// in order to successfully implement the subsequent compression.
|
||||
sort.SliceStable(measuredValues, func(i int, j int) bool {
|
||||
return measuredValues[i].FromDate.Before(measuredValues[j].TillDate)
|
||||
})
|
||||
|
||||
now := format.FormatedTime()
|
||||
|
||||
for _, measuredValue := range measuredValues {
|
||||
|
||||
// If the sensor id does not exist in the map, a new map is initialized,
|
||||
// which can assume measured value types as the key. Behind this key there
|
||||
// is a pointer which refers to a measured value in the memory. This new map
|
||||
// is added to the map "lastMeasuredValuesBySensors" under the sensor ID.
|
||||
// This makes it possible to store one measured value per measured value
|
||||
// type per sensor.
|
||||
if _, ok := lastMeasuredValuesBySensors[measuredValue.SensorID]; !ok {
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID] = make(map[types.MeasuredValueType]*types.MeasuredValue, 0)
|
||||
}
|
||||
|
||||
if _, ok := lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType]; !ok {
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType] = measuredValue
|
||||
continue
|
||||
}
|
||||
|
||||
if lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].Value == measuredValue.Value {
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].TillDate = measuredValue.TillDate
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].UpdateDate = &now
|
||||
} else if lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].Value != measuredValue.Value {
|
||||
compressedMeasuredValues = append(compressedMeasuredValues, lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType])
|
||||
delete(lastMeasuredValuesBySensors[measuredValue.SensorID], measuredValue.ValueType)
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType] = measuredValue
|
||||
}
|
||||
}
|
||||
|
||||
// Copy all remaining entries from the map into the cache array
|
||||
for _, lastMeasuredValuesBySensor := range lastMeasuredValuesBySensors {
|
||||
for _, measuredValueType := range types.MeasuredValueTypes {
|
||||
if measuredValue, ok := lastMeasuredValuesBySensor[measuredValueType]; ok {
|
||||
compressedMeasuredValues = append(compressedMeasuredValues, measuredValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort all measured values again to include the measured values from the
|
||||
// cache.
|
||||
sort.SliceStable(compressedMeasuredValues, func(i int, j int) bool {
|
||||
return compressedMeasuredValues[i].FromDate.Before(compressedMeasuredValues[j].FromDate)
|
||||
})
|
||||
|
||||
return compressedMeasuredValues
|
||||
// Storage is a general interface for a storage endpoint
|
||||
type Storage interface {
|
||||
InsertDevice(ctx context.Context, device *types.Device) error
|
||||
InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error
|
||||
InsertSensor(ctx context.Context, sensor *types.Sensor) error
|
||||
SelectDevice(ctx context.Context, id string) (*types.Device, error)
|
||||
SelectSensor(ctx context.Context, id string) (*types.Sensor, error)
|
||||
}
|
||||
|
||||
// Read measured values from the given storage endpoint url. The scheme must be
|
||||
// matched to a provider, if the scheme is not implemented, the function
|
||||
// returns an error
|
||||
func Read(ctx context.Context, storageEndpoint *url.URL) ([]*types.MeasuredValue, error) {
|
||||
switch storageEndpoint.Scheme {
|
||||
case "file":
|
||||
measuredValueLogfile := logfile.New(storageEndpoint.Path)
|
||||
return measuredValueLogfile.Read()
|
||||
var (
|
||||
postgresAssetPath = "pkg/storage/postgres"
|
||||
)
|
||||
|
||||
// Postgres implementation
|
||||
type Postgres struct {
|
||||
dbo *sql.DB
|
||||
flogger logger.Logger
|
||||
}
|
||||
|
||||
// InsertDevice into the database
|
||||
func (postgres *Postgres) InsertDevice(ctx context.Context, device *types.Device) error {
|
||||
asset := filepath.Join(postgresAssetPath, "insertDevice.sql")
|
||||
queryBytes, err := Asset(asset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to load asset %v: %v", asset, err)
|
||||
}
|
||||
query := string(queryBytes)
|
||||
|
||||
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to begin new transaction: %v", err)
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to prepare statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
_, err = stmt.Exec(&device.ID, &device.Name, &device.Location, &device.CreationDate)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return fmt.Errorf("Failed to execute statement: %v", err)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// InsertMeasuredValues into the database
|
||||
func (postgres *Postgres) InsertMeasuredValues(ctx context.Context, measuredValues []*types.MeasuredValue) error {
|
||||
splittedMeasuredValues := make(map[string][]*types.MeasuredValue, 0)
|
||||
|
||||
for _, measuredValue := range measuredValues {
|
||||
if _, ok := splittedMeasuredValues[measuredValue.ValueType]; !ok {
|
||||
splittedMeasuredValues[measuredValue.ValueType] = make([]*types.MeasuredValue, 0)
|
||||
}
|
||||
splittedMeasuredValues[measuredValue.ValueType] = append(splittedMeasuredValues[measuredValue.ValueType], measuredValue)
|
||||
}
|
||||
|
||||
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to begin new transaction: %v", err)
|
||||
}
|
||||
|
||||
// General insert function
|
||||
insert := func(tx *sql.Tx, asset string, measuredValues []*types.MeasuredValue) error {
|
||||
queryBytes, err := Asset(asset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to load asset %v: %v", asset, err)
|
||||
}
|
||||
query := string(queryBytes)
|
||||
|
||||
stmt, err := tx.Prepare(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to prepare statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
for _, measuredValue := range measuredValues {
|
||||
_, err := stmt.Exec(&measuredValue.ID, &measuredValue.Value, &measuredValue.FromDate, &measuredValue.TillDate, &measuredValue.SensorID, &measuredValue.CreationDate, &measuredValue.UpdateDate)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to execute statement: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
for measuredValueType, measuredValues := range splittedMeasuredValues {
|
||||
var asset string
|
||||
|
||||
switch measuredValueType {
|
||||
case "humidity":
|
||||
asset = filepath.Join(postgresAssetPath, "insertHumidity.sql")
|
||||
case "pressure":
|
||||
asset = filepath.Join(postgresAssetPath, "insertPressure.sql")
|
||||
case "temperature":
|
||||
asset = filepath.Join(postgresAssetPath, "insertTemperature.sql")
|
||||
default:
|
||||
tx.Rollback()
|
||||
return fmt.Errorf("Measured value type %v not supported", measuredValueType)
|
||||
}
|
||||
|
||||
err := insert(tx, asset, measuredValues)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// InsertSensor into the database
|
||||
func (postgres *Postgres) InsertSensor(ctx context.Context, sensor *types.Sensor) error {
|
||||
asset := filepath.Join(postgresAssetPath, "insertSensor.sql")
|
||||
queryBytes, err := Asset(asset)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to load asset %v: %v", asset, err)
|
||||
}
|
||||
query := string(queryBytes)
|
||||
|
||||
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: false})
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to begin new transaction: %v", err)
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to prepare statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
_, err = stmt.Exec(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
return fmt.Errorf("Failed to execute statement: %v", err)
|
||||
}
|
||||
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// SelectDevice from database
|
||||
func (postgres *Postgres) SelectDevice(ctx context.Context, id string) (*types.Device, error) {
|
||||
asset := filepath.Join(postgresAssetPath, "selectDeviceByID.sql")
|
||||
queryBytes, err := Asset(asset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err)
|
||||
}
|
||||
query := string(queryBytes)
|
||||
|
||||
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to prepare statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
row := stmt.QueryRow(id)
|
||||
|
||||
device := new(types.Device)
|
||||
err = row.Scan(&device.ID, &device.Name, &device.Location, &device.CreationDate)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to scan row: %v", err)
|
||||
}
|
||||
|
||||
return device, nil
|
||||
}
|
||||
|
||||
// SelectSensor from database
|
||||
func (postgres *Postgres) SelectSensor(ctx context.Context, id string) (*types.Sensor, error) {
|
||||
asset := filepath.Join(postgresAssetPath, "selectSensorByID.sql")
|
||||
queryBytes, err := Asset(asset)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to load asset %v: %v", asset, err)
|
||||
}
|
||||
query := string(queryBytes)
|
||||
|
||||
tx, err := postgres.dbo.BeginTx(ctx, &sql.TxOptions{ReadOnly: true})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to begin new transaction: %v", err)
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to prepare statement: %v", err)
|
||||
}
|
||||
defer stmt.Close()
|
||||
|
||||
row := stmt.QueryRow(id)
|
||||
|
||||
sensor := new(types.Sensor)
|
||||
err = row.Scan(&sensor.ID, &sensor.Name, &sensor.Location, &sensor.WireID, &sensor.I2CBus, &sensor.I2CAddress, &sensor.GPIONumber, &sensor.Model, &sensor.Enabled, &sensor.DeviceID, &sensor.CreationDate)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to scan row: %v", err)
|
||||
}
|
||||
|
||||
return sensor, nil
|
||||
}
|
||||
|
||||
// New returns a new storage provider
|
||||
func New(storageEndpoint string, flogger logger.Logger) (Storage, error) {
|
||||
storageEndpointURL, err := url.Parse(storageEndpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch storageEndpointURL.Scheme {
|
||||
case "postgres":
|
||||
database, err := db.New(storageEndpoint)
|
||||
newDBO, err := sql.Open(storageEndpointURL.Scheme, storageEndpointURL.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer database.Close()
|
||||
return database.SelectMeasuredValues(ctx)
|
||||
}
|
||||
return nil, fmt.Errorf("No supported scheme")
|
||||
}
|
||||
|
||||
func Round(measuredValues []*types.MeasuredValue, round float64) {
|
||||
for _, measuredValue := range measuredValues {
|
||||
measuredValue.Value = math.Round(measuredValue.Value/round) * round
|
||||
}
|
||||
}
|
||||
|
||||
// Write measured values to the given storage endpoint url. If the storage
|
||||
// provider defined to a file, the data will be overwritten. If a database
|
||||
// provider is used, the data is simply added without deleting the existing
|
||||
// data. The scheme must be matched to a storage provider, if the scheme is not
|
||||
// implemented, the function returns an error
|
||||
func Write(ctx context.Context, measuredValues []*types.MeasuredValue, storageEndpoint *url.URL) error {
|
||||
writeCreationDate(measuredValues)
|
||||
switch storageEndpoint.Scheme {
|
||||
case "file":
|
||||
measuredValueLogfile := logfile.New(storageEndpoint.Path)
|
||||
return measuredValueLogfile.Write(measuredValues)
|
||||
case "postgres":
|
||||
database, err := db.New(storageEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer database.Close()
|
||||
return database.InsertMeasuredValues(ctx, measuredValues)
|
||||
return &Postgres{
|
||||
dbo: newDBO,
|
||||
flogger: flogger,
|
||||
}, nil
|
||||
default:
|
||||
return fmt.Errorf("No supported scheme")
|
||||
}
|
||||
}
|
||||
|
||||
func writeCreationDate(measuredValues []*types.MeasuredValue) {
|
||||
now := format.FormatedTime()
|
||||
for _, measuredValue := range measuredValues {
|
||||
if measuredValue.CreationDate == nil {
|
||||
measuredValue.CreationDate = &now
|
||||
}
|
||||
return nil, fmt.Errorf("Unsupported database scheme: %v", storageEndpointURL.Scheme)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user