package storage

import (
	"context"
	"fmt"
	"math"
	"net/url"
	"sort"

	"github.com/go-flucky/flucky/pkg/internal/format"
	"github.com/go-flucky/flucky/pkg/storage/db"
	"github.com/go-flucky/flucky/pkg/storage/logfile"
	"github.com/go-flucky/flucky/pkg/types"
)

// Compression compresses the measured values. For every sensor and value type
// the function checks whether a measured value equals its predecessor. If this
// is the case, the current value is discarded and the end of the validity
// period (TillDate) of the predecessor is set to that of the current value.
// This means that no information is lost, only the validity period of the
// predecessor is extended.
//
// Side effects: the passed slice is sorted in place by FromDate, and the
// TillDate and UpdateDate fields of values that absorb a successor are
// modified in place. The returned slice holds pointers to the same
// MeasuredValue objects as the input and is sorted by FromDate.
//
// Note that only the values are compared; the function does not check whether
// the validity periods of merged values are adjacent.
func Compression(measuredValues []*types.MeasuredValue) []*types.MeasuredValue {
	compressedMeasuredValues := make([]*types.MeasuredValue, 0)
	lastMeasuredValuesBySensors := make(map[string]map[types.MeasuredValueType]*types.MeasuredValue)

	// Sort all measured values according to the start time of the validity
	// period in order to successfully implement the subsequent compression.
	// Both sides of the comparison must use FromDate: comparing FromDate with
	// TillDate is not a strict weak ordering and yields an undefined order.
	sort.SliceStable(measuredValues, func(i int, j int) bool {
		return measuredValues[i].FromDate.Before(measuredValues[j].FromDate)
	})

	// One shared timestamp marks every value that was extended in this run.
	now := format.FormatedTime()

	for _, measuredValue := range measuredValues {
		lastByType, ok := lastMeasuredValuesBySensors[measuredValue.SensorID]
		if !ok {
			lastByType = make(map[types.MeasuredValueType]*types.MeasuredValue)
			lastMeasuredValuesBySensors[measuredValue.SensorID] = lastByType
		}

		// First value of this type for this sensor: remember it as the
		// predecessor for the following values.
		last, ok := lastByType[measuredValue.ValueType]
		if !ok {
			lastByType[measuredValue.ValueType] = measuredValue
			continue
		}

		// Same value as the predecessor: extend the validity period of the
		// predecessor and drop the current value.
		if last.Value == measuredValue.Value {
			last.TillDate = measuredValue.TillDate
			last.UpdateDate = &now
			continue
		}

		// The value changed: the predecessor is final, the current value
		// becomes the new predecessor.
		compressedMeasuredValues = append(compressedMeasuredValues, last)
		lastByType[measuredValue.ValueType] = measuredValue
	}

	// Copy all remaining predecessors from the map into the result. Only
	// value types listed in types.MeasuredValueTypes are considered.
	for _, lastByType := range lastMeasuredValuesBySensors {
		for _, measuredValueType := range types.MeasuredValueTypes {
			if measuredValue, ok := lastByType[measuredValueType]; ok {
				compressedMeasuredValues = append(compressedMeasuredValues, measuredValue)
			}
		}
	}

	// Sort all measured values again to include the measured values which
	// were copied from the map.
	sort.SliceStable(compressedMeasuredValues, func(i int, j int) bool {
		return compressedMeasuredValues[i].FromDate.Before(compressedMeasuredValues[j].FromDate)
	})

	return compressedMeasuredValues
}

// Read measured values from the given storage endpoint url. The scheme must be
// matched to a provider ("file" or "postgres"). If the scheme is not
// implemented, the function returns an error.
func Read(ctx context.Context, storageEndpoint *url.URL) ([]*types.MeasuredValue, error) {
	switch storageEndpoint.Scheme {
	case "file":
		measuredValueLogfile := logfile.New(storageEndpoint.Path)
		return measuredValueLogfile.Read()
	case "postgres":
		database, err := db.New(storageEndpoint)
		if err != nil {
			return nil, err
		}
		defer database.Close()

		return database.SelectMeasuredValues(ctx)
	}

	return nil, fmt.Errorf("No supported scheme")
}

// Round rounds the value of every measured value in place to the nearest
// multiple of round, for example to the nearest 0.5 for round = 0.5. A round
// of 0 leaves the measured values untouched, because dividing by zero would
// turn every value into NaN.
func Round(measuredValues []*types.MeasuredValue, round float64) {
	if round == 0 {
		return
	}

	for _, measuredValue := range measuredValues {
		measuredValue.Value = math.Round(measuredValue.Value/round) * round
	}
}

// Write measured values to the given storage endpoint url. The scheme must be
// matched to a provider ("file" or "postgres"). If the scheme is not
// implemented, the function returns an error.
func Write(ctx context.Context, measuredValues []*types.MeasuredValue, storageEndpoint *url.URL) error {
	switch storageEndpoint.Scheme {
	case "file":
		measuredValueLogfile := logfile.New(storageEndpoint.Path)
		return measuredValueLogfile.Write(measuredValues)
	case "postgres":
		database, err := db.New(storageEndpoint)
		if err != nil {
			return err
		}
		defer database.Close()

		// Return the result of the insert directly. Falling out of the switch
		// would report a successful write as an unsupported scheme.
		return database.InsertMeasuredValues(ctx, measuredValues)
	}

	return fmt.Errorf("No supported scheme")
}