130 lines
5.3 KiB
Go
130 lines
5.3 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"net/url"
|
|
"sort"
|
|
|
|
"github.com/go-flucky/flucky/pkg/internal/format"
|
|
"github.com/go-flucky/flucky/pkg/storage/db"
|
|
"github.com/go-flucky/flucky/pkg/storage/logfile"
|
|
"github.com/go-flucky/flucky/pkg/types"
|
|
)
|
|
|
|
// Compression the measured values. The system checks whether the measured values
|
|
// of the same type correspond to those of the predecessor. If this is the case,
|
|
// the current value is discarded and the validity date of the previous value is
|
|
// set to that of the current value. This means that no information is lost.
|
|
// Only the validity period of the measured value is increased.
|
|
func Compression(measuredValues []*types.MeasuredValue) []*types.MeasuredValue {
|
|
compressedMeasuredValues := make([]*types.MeasuredValue, 0)
|
|
lastMeasuredValuesBySensors := make(map[string]map[types.MeasuredValueType]*types.MeasuredValue, 0)
|
|
|
|
// Sort all measured values according to the start time of the validity date
|
|
// in order to successfully implement the subsequent compression.
|
|
sort.SliceStable(measuredValues, func(i int, j int) bool {
|
|
return measuredValues[i].FromDate.Before(measuredValues[j].TillDate)
|
|
})
|
|
|
|
now := format.FormatedTime()
|
|
|
|
for _, measuredValue := range measuredValues {
|
|
|
|
// If the sensor id does not exist in the map, a new map is initialized,
|
|
// which can assume measured value types as the key. Behind this key there
|
|
// is a pointer which refers to a measured value in the memory. This new map
|
|
// is added to the map "lastMeasuredValuesBySensors" under the sensor ID.
|
|
// This makes it possible to store one measured value per measured value
|
|
// type per sensor.
|
|
if _, ok := lastMeasuredValuesBySensors[measuredValue.SensorID]; !ok {
|
|
lastMeasuredValuesBySensors[measuredValue.SensorID] = make(map[types.MeasuredValueType]*types.MeasuredValue, 0)
|
|
}
|
|
|
|
if _, ok := lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType]; !ok {
|
|
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType] = measuredValue
|
|
continue
|
|
}
|
|
|
|
if lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].Value == measuredValue.Value {
|
|
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].TillDate = measuredValue.TillDate
|
|
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].UpdateDate = &now
|
|
} else if lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].Value != measuredValue.Value {
|
|
compressedMeasuredValues = append(compressedMeasuredValues, lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType])
|
|
delete(lastMeasuredValuesBySensors[measuredValue.SensorID], measuredValue.ValueType)
|
|
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType] = measuredValue
|
|
}
|
|
}
|
|
|
|
// Copy all remaining entries from the map into the cache array
|
|
for _, lastMeasuredValuesBySensor := range lastMeasuredValuesBySensors {
|
|
for _, measuredValueType := range types.MeasuredValueTypes {
|
|
if measuredValue, ok := lastMeasuredValuesBySensor[measuredValueType]; ok {
|
|
compressedMeasuredValues = append(compressedMeasuredValues, measuredValue)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Sort all measured values again to include the measured values from the
|
|
// cache.
|
|
sort.SliceStable(compressedMeasuredValues, func(i int, j int) bool {
|
|
return compressedMeasuredValues[i].FromDate.Before(compressedMeasuredValues[j].FromDate)
|
|
})
|
|
|
|
return compressedMeasuredValues
|
|
}
|
|
|
|
// Read measured values from the given storage endpoint url. The scheme must be
|
|
// matched to a provider, if the scheme is not implemented, the function
|
|
// returns an error
|
|
func Read(ctx context.Context, storageEndpoint *url.URL) ([]*types.MeasuredValue, error) {
|
|
switch storageEndpoint.Scheme {
|
|
case "file":
|
|
measuredValueLogfile := logfile.New(storageEndpoint.Path)
|
|
return measuredValueLogfile.Read()
|
|
case "postgres":
|
|
database, err := db.New(storageEndpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer database.Close()
|
|
return database.SelectMeasuredValues(ctx)
|
|
}
|
|
return nil, fmt.Errorf("No supported scheme")
|
|
}
|
|
|
|
func Round(measuredValues []*types.MeasuredValue, round float64) {
|
|
for _, measuredValue := range measuredValues {
|
|
measuredValue.Value = math.Round(measuredValue.Value/round) * round
|
|
}
|
|
}
|
|
|
|
// Write measured values to the given storage endpoint url. If the storage
|
|
// provider defined to a file, the data will be overwritten. If a database
|
|
// provider is used, the data is simply added without deleting the existing
|
|
// data. The scheme must be matched to a storage provider, if the scheme is not
|
|
// implemented, the function returns an error
|
|
func Write(ctx context.Context, measuredValues []*types.MeasuredValue, storageEndpoint *url.URL) error {
|
|
switch storageEndpoint.Scheme {
|
|
case "file":
|
|
measuredValueLogfile := logfile.New(storageEndpoint.Path)
|
|
storedMeasuredValues, err := measuredValueLogfile.Read()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
storedMeasuredValues = append(storedMeasuredValues, measuredValues...)
|
|
return measuredValueLogfile.Write(storedMeasuredValues)
|
|
case "postgres":
|
|
database, err := db.New(storageEndpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer database.Close()
|
|
if err := database.InsertMeasuredValues(ctx, measuredValues); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return fmt.Errorf("No supported scheme")
|
|
}
|