package storage

import (
	"context"
	"fmt"
	"math"
	"net/url"
	"sort"

	"github.com/volker-raschek/flucky/pkg/internal/format"
	"github.com/volker-raschek/flucky/pkg/storage/db"
	"github.com/volker-raschek/flucky/pkg/storage/logfile"
	"github.com/volker-raschek/flucky/pkg/types"
)

// Compression compresses the measured values. For every sensor and every
// measured value type the function checks whether the value is equal to the
// value of its predecessor. If this is the case, the current measured value is
// discarded and the end of the validity period (TillDate) of the predecessor is
// set to that of the current measured value. This means that no information is
// lost. Only the validity period of the measured value is increased.
//
// The passed slice is sorted in place and the measured values it points to are
// modified (TillDate and UpdateDate of extended values). The returned slice is
// sorted by the start of the validity period (FromDate) and is never nil.
func Compression(measuredValues []*types.MeasuredValue) []*types.MeasuredValue {
	compressedMeasuredValues := make([]*types.MeasuredValue, 0)
	lastMeasuredValuesBySensors := make(map[string]map[types.MeasuredValueType]*types.MeasuredValue)

	// Sort all measured values according to the start time of the validity
	// period in order to successfully implement the subsequent compression.
	// Both sides of the comparison must use FromDate: comparing FromDate with
	// TillDate is not a consistent ordering as soon as validity periods overlap.
	sort.SliceStable(measuredValues, func(i int, j int) bool {
		return measuredValues[i].FromDate.Before(measuredValues[j].FromDate)
	})

	now := format.FormatedTime()

	for _, measuredValue := range measuredValues {
		// One map per sensor ID, which holds the most recent measured value per
		// measured value type. This makes it possible to store one pending
		// measured value per measured value type per sensor.
		lastMeasuredValues, ok := lastMeasuredValuesBySensors[measuredValue.SensorID]
		if !ok {
			lastMeasuredValues = make(map[types.MeasuredValueType]*types.MeasuredValue)
			lastMeasuredValuesBySensors[measuredValue.SensorID] = lastMeasuredValues
		}

		// First measured value of this type for this sensor: remember it and
		// wait for its successor.
		lastMeasuredValue, ok := lastMeasuredValues[measuredValue.ValueType]
		if !ok {
			lastMeasuredValues[measuredValue.ValueType] = measuredValue
			continue
		}

		// Same value as the predecessor: extend the validity period of the
		// predecessor and drop the current measured value.
		if lastMeasuredValue.Value == measuredValue.Value {
			lastMeasuredValue.TillDate = measuredValue.TillDate
			lastMeasuredValue.UpdateDate = &now
			continue
		}

		// The value changed: the predecessor is complete and the current
		// measured value becomes the new predecessor.
		compressedMeasuredValues = append(compressedMeasuredValues, lastMeasuredValue)
		lastMeasuredValues[measuredValue.ValueType] = measuredValue
	}

	// Copy all remaining entries from the map into the result.
	for _, lastMeasuredValuesBySensor := range lastMeasuredValuesBySensors {
		for _, measuredValueType := range types.MeasuredValueTypes {
			if measuredValue, ok := lastMeasuredValuesBySensor[measuredValueType]; ok {
				compressedMeasuredValues = append(compressedMeasuredValues, measuredValue)
			}
		}
	}

	// Sort all measured values again to include the measured values that were
	// still pending in the map.
	sort.SliceStable(compressedMeasuredValues, func(i int, j int) bool {
		return compressedMeasuredValues[i].FromDate.Before(compressedMeasuredValues[j].FromDate)
	})

	return compressedMeasuredValues
}

// Read measured values from the given storage endpoint url.
The scheme must be // matched to a provider, if the scheme is not implemented, the function // returns an error func Read(ctx context.Context, storageEndpoint *url.URL) ([]*types.MeasuredValue, error) { switch storageEndpoint.Scheme { case "file": measuredValueLogfile := logfile.New(storageEndpoint.Path) return measuredValueLogfile.Read() case "postgres": database, err := db.New(storageEndpoint) if err != nil { return nil, err } defer database.Close() return database.SelectMeasuredValues(ctx) } return nil, fmt.Errorf("No supported scheme") } func Round(measuredValues []*types.MeasuredValue, round float64) { for _, measuredValue := range measuredValues { measuredValue.Value = math.Round(measuredValue.Value/round) * round } } // Write measured values to the given storage endpoint url. If the storage // provider defined to a file, the data will be overwritten. If a database // provider is used, the data is simply added without deleting the existing // data. The scheme must be matched to a storage provider, if the scheme is not // implemented, the function returns an error func Write(ctx context.Context, measuredValues []*types.MeasuredValue, storageEndpoint *url.URL) error { writeCreationDate(measuredValues) switch storageEndpoint.Scheme { case "file": measuredValueLogfile := logfile.New(storageEndpoint.Path) return measuredValueLogfile.Write(measuredValues) case "postgres": database, err := db.New(storageEndpoint) if err != nil { return err } defer database.Close() return database.InsertMeasuredValues(ctx, measuredValues) default: return fmt.Errorf("No supported scheme") } } func writeCreationDate(measuredValues []*types.MeasuredValue) { now := format.FormatedTime() for _, measuredValue := range measuredValues { if measuredValue.CreationDate == nil { measuredValue.CreationDate = &now } } }