fix(pkg/daemon): save measured values into postgres database if defined
This commit is contained in:
129
pkg/storage/logfile/csv.go
Normal file
129
pkg/storage/logfile/csv.go
Normal file
@ -0,0 +1,129 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"encoding/csv"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-flucky/flucky/pkg/internal/format"
|
||||
"github.com/go-flucky/flucky/pkg/types"
|
||||
)
|
||||
|
||||
type csvLogfile struct {
|
||||
logfile string
|
||||
}
|
||||
|
||||
func (cl *csvLogfile) Read() ([]*types.MeasuredValue, error) {
|
||||
if _, err := os.Stat(cl.logfile); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("%v: %v", errorLogfileNotFound, cl.logfile)
|
||||
}
|
||||
|
||||
f, err := os.Open(cl.logfile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v: %v", errorLogfileOpen, cl.logfile)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
r := csv.NewReader(f)
|
||||
records, err := r.ReadAll()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorLogfileDecode, cl.logfile, err)
|
||||
}
|
||||
|
||||
measuredValues := make([]*types.MeasuredValue, 0)
|
||||
|
||||
for _, record := range records {
|
||||
|
||||
// ValueType
|
||||
valueType, err := types.SelectMeasuredValueType(record[1])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorParseFloat, record[1], err)
|
||||
}
|
||||
|
||||
// Value
|
||||
value, err := strconv.ParseFloat(record[2], 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorParseFloat, record[2], err)
|
||||
}
|
||||
|
||||
// Times
|
||||
times := make([]time.Time, 0)
|
||||
for _, i := range []int{3, 4} {
|
||||
time, err := time.Parse(format.TimeFormat, record[i])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorParseTime, record[i], err)
|
||||
}
|
||||
times = append(times, time)
|
||||
}
|
||||
|
||||
measuredValue := &types.MeasuredValue{
|
||||
ID: record[0],
|
||||
ValueType: *valueType,
|
||||
Value: value,
|
||||
FromDate: times[0],
|
||||
TillDate: times[1],
|
||||
SensorID: record[5],
|
||||
}
|
||||
|
||||
// Creation date
|
||||
creationDate, err := time.Parse(format.TimeFormat, record[6])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorParseTime, record[6], err)
|
||||
}
|
||||
measuredValue.CreationDate = creationDate
|
||||
|
||||
if record[7] != "null" {
|
||||
updateDate, err := time.Parse(format.TimeFormat, record[7])
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorParseTime, record[7], err)
|
||||
}
|
||||
measuredValue.UpdateDate = &updateDate
|
||||
}
|
||||
|
||||
measuredValues = append(measuredValues, measuredValue)
|
||||
|
||||
}
|
||||
|
||||
return measuredValues, nil
|
||||
|
||||
}
|
||||
|
||||
func (cl *csvLogfile) Write(measuredValues []*types.MeasuredValue) error {
|
||||
f, err := os.Create(cl.logfile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v", errorLogfileCreate, cl.logfile)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
writeCreationDate(measuredValues)
|
||||
|
||||
w := csv.NewWriter(f)
|
||||
|
||||
for _, measuredValue := range measuredValues {
|
||||
|
||||
record := []string{
|
||||
measuredValue.ID,
|
||||
fmt.Sprintf("%v", measuredValue.ValueType),
|
||||
fmt.Sprintf("%v", measuredValue.Value),
|
||||
measuredValue.FromDate.Format(format.TimeFormat),
|
||||
measuredValue.TillDate.Format(format.TimeFormat),
|
||||
measuredValue.SensorID,
|
||||
}
|
||||
|
||||
record = append(record, measuredValue.CreationDate.Format(format.TimeFormat))
|
||||
|
||||
if measuredValue.UpdateDate != nil {
|
||||
record = append(record, measuredValue.UpdateDate.Format(format.TimeFormat))
|
||||
} else {
|
||||
record = append(record, "null")
|
||||
}
|
||||
|
||||
w.Write(record)
|
||||
}
|
||||
|
||||
w.Flush()
|
||||
|
||||
return nil
|
||||
}
|
28
pkg/storage/logfile/errors.go
Normal file
28
pkg/storage/logfile/errors.go
Normal file
@ -0,0 +1,28 @@
|
||||
package logfile
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
errorLogfileCreate = errors.New("Can not create logfile")
|
||||
errorLogfileDecode = errors.New("Can not decode from reader")
|
||||
errorLogfileEncode = errors.New("Can not encode from writer")
|
||||
errorLogfileMarshal = errors.New("Can not marshal values")
|
||||
errorLogfileNotFound = errors.New("Can not find logfile")
|
||||
errorLogfileOpen = errors.New("Can not open logfile")
|
||||
errorLogfileRead = errors.New("Can not read from given reader")
|
||||
errorLogfileUnmarshal = errors.New("Can not unmarshal values")
|
||||
errorLogfileWrite = errors.New("Can not write with given writer")
|
||||
|
||||
errorParseFloat = errors.New("Can not parse float")
|
||||
errorParseMeasurementUnit = errors.New("Can not parse mesaurement unit")
|
||||
errorParseTime = errors.New("Can not parse time")
|
||||
|
||||
errorNoValidHumidityID = errors.New("No valid humidity id detected or available")
|
||||
errorNoValidMesuredValue = errors.New("No mesured value detected or available")
|
||||
errorNoValidSensorID = errors.New("No sensor id detected or available")
|
||||
errorNoValidTemperatureID = errors.New("No valid temperature id detected or available")
|
||||
errorNoValidTime = errors.New("No time detected or available")
|
||||
errorNoValidTimePeriods = errors.New("No valid time periods")
|
||||
|
||||
errorTypeSwitch = errors.New("Can not detect type via type switch")
|
||||
)
|
11
pkg/storage/logfile/interfaces.go
Normal file
11
pkg/storage/logfile/interfaces.go
Normal file
@ -0,0 +1,11 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"github.com/go-flucky/flucky/pkg/types"
|
||||
)
|
||||
|
||||
// Logfile is an interface for various logfiles
|
||||
type Logfile interface {
|
||||
Read() ([]*types.MeasuredValue, error)
|
||||
Write(measuredValues []*types.MeasuredValue) error
|
||||
}
|
60
pkg/storage/logfile/json.go
Normal file
60
pkg/storage/logfile/json.go
Normal file
@ -0,0 +1,60 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/go-flucky/flucky/pkg/types"
|
||||
)
|
||||
|
||||
type jsonLogfile struct {
|
||||
logfile string
|
||||
}
|
||||
|
||||
func (jl *jsonLogfile) Read() ([]*types.MeasuredValue, error) {
|
||||
|
||||
if _, err := os.Stat(jl.logfile); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("%v: %v", errorLogfileNotFound, jl.logfile)
|
||||
}
|
||||
|
||||
f, err := os.Open(jl.logfile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorLogfileOpen, jl.logfile, err)
|
||||
}
|
||||
|
||||
measuredValues := make([]*types.MeasuredValue, 0)
|
||||
|
||||
if err := json.NewDecoder(f).Decode(&measuredValues); err != nil {
|
||||
return nil, fmt.Errorf("%v %v: %v", errorLogfileDecode, jl.logfile, err)
|
||||
}
|
||||
|
||||
return measuredValues, nil
|
||||
}
|
||||
|
||||
func (jl *jsonLogfile) Write(measuredValues []*types.MeasuredValue) error {
|
||||
|
||||
if _, err := os.Stat(filepath.Dir(jl.logfile)); os.IsNotExist(err) {
|
||||
if err := os.MkdirAll(filepath.Dir(jl.logfile), 755); err != nil {
|
||||
return fmt.Errorf("Directory for the logfile can not be created: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
writeCreationDate(measuredValues)
|
||||
|
||||
f, err := os.Create(jl.logfile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v %v: %v", errorLogfileCreate, jl.logfile, err)
|
||||
}
|
||||
|
||||
jsonEncoder := json.NewEncoder(f)
|
||||
jsonEncoder.SetIndent("", " ")
|
||||
err = jsonEncoder.Encode(measuredValues)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v %v: %v", errorLogfileEncode, jl.logfile, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
}
|
132
pkg/storage/logfile/logfile.go
Normal file
132
pkg/storage/logfile/logfile.go
Normal file
@ -0,0 +1,132 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"math"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
|
||||
"github.com/go-flucky/flucky/pkg/internal/format"
|
||||
"github.com/go-flucky/flucky/pkg/types"
|
||||
)
|
||||
|
||||
// var validUUID = regexp.MustCompile("^[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-4[a-fA-F0-9]{3}-[8|9|aA|bB][a-fA-F0-9]{3}-[a-fA-F0-9]{12}$")
|
||||
|
||||
// Append adds an array of several measured values to a logfile
|
||||
func Append(logfile Logfile, compression bool, round float64, measuredValues []*types.MeasuredValue) error {
|
||||
|
||||
if round != 0 {
|
||||
for _, measuredValue := range measuredValues {
|
||||
measuredValue.Value = math.Round(measuredValue.Value/round) * round
|
||||
}
|
||||
}
|
||||
|
||||
allMeasuredValues, err := logfile.Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
allMeasuredValues = append(allMeasuredValues, measuredValues...)
|
||||
|
||||
if compression {
|
||||
allMeasuredValues = Compression(allMeasuredValues)
|
||||
}
|
||||
|
||||
err = logfile.Write(allMeasuredValues)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compression the measured values. The system checks whether the measured values
|
||||
// of the same type correspond to those of the predecessor. If this is the case,
|
||||
// the current value is discarded and the validity date of the previous value is
|
||||
// set to that of the current value. This means that no information is lost.
|
||||
// Only the validity period of the measured value is increased.
|
||||
func Compression(measuredValues []*types.MeasuredValue) []*types.MeasuredValue {
|
||||
compressedMeasuredValues := make([]*types.MeasuredValue, 0)
|
||||
lastMeasuredValuesBySensors := make(map[string]map[types.MeasuredValueType]*types.MeasuredValue, 0)
|
||||
|
||||
// Sort all measured values according to the start time of the validity date
|
||||
// in order to successfully implement the subsequent compression.
|
||||
sort.SliceStable(measuredValues, func(i int, j int) bool {
|
||||
return measuredValues[i].FromDate.Before(measuredValues[j].TillDate)
|
||||
})
|
||||
|
||||
now := format.FormatedTime()
|
||||
|
||||
for _, measuredValue := range measuredValues {
|
||||
if _, ok := lastMeasuredValuesBySensors[measuredValue.SensorID]; !ok {
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID] = make(map[types.MeasuredValueType]*types.MeasuredValue, 0)
|
||||
}
|
||||
|
||||
if _, ok := lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType]; !ok {
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType] = measuredValue
|
||||
continue
|
||||
}
|
||||
|
||||
if lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].Value == measuredValue.Value {
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].TillDate = measuredValue.TillDate
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].UpdateDate = &now
|
||||
} else if lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType].Value != measuredValue.Value {
|
||||
compressedMeasuredValues = append(compressedMeasuredValues, lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType])
|
||||
delete(lastMeasuredValuesBySensors[measuredValue.SensorID], measuredValue.ValueType)
|
||||
lastMeasuredValuesBySensors[measuredValue.SensorID][measuredValue.ValueType] = measuredValue
|
||||
}
|
||||
}
|
||||
|
||||
// Copy all remaining entries from the map into the cache array
|
||||
for _, lastMeasuredValuesBySensor := range lastMeasuredValuesBySensors {
|
||||
for _, measuredValueType := range types.MeasuredValueTypes {
|
||||
if measuredValue, ok := lastMeasuredValuesBySensor[measuredValueType]; ok {
|
||||
compressedMeasuredValues = append(compressedMeasuredValues, measuredValue)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort all measured values again to include the measured values from the
|
||||
// cache.
|
||||
sort.SliceStable(compressedMeasuredValues, func(i int, j int) bool {
|
||||
return compressedMeasuredValues[i].FromDate.Before(compressedMeasuredValues[j].FromDate)
|
||||
})
|
||||
|
||||
return compressedMeasuredValues
|
||||
}
|
||||
|
||||
// New returns a log file with basic functions for reading and writing data. The
|
||||
// file extension of the logfile is taken into account to format the logfile
|
||||
// into the correct format.
|
||||
func New(logfile string) Logfile {
|
||||
|
||||
ext := filepath.Ext(logfile)
|
||||
|
||||
switch ext {
|
||||
case ".csv":
|
||||
return &csvLogfile{
|
||||
logfile: logfile,
|
||||
}
|
||||
case ".json":
|
||||
return &jsonLogfile{
|
||||
logfile: logfile,
|
||||
}
|
||||
case ".xml":
|
||||
return &xmlLogfile{
|
||||
logfile: logfile,
|
||||
}
|
||||
default:
|
||||
return &jsonLogfile{
|
||||
logfile: logfile,
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func writeCreationDate(measuredValues []*types.MeasuredValue) error {
|
||||
for _, measuredValue := range measuredValues {
|
||||
now := format.FormatedTime()
|
||||
measuredValue.CreationDate = now
|
||||
}
|
||||
return nil
|
||||
}
|
1
pkg/storage/logfile/logfile_test.go
Normal file
1
pkg/storage/logfile/logfile_test.go
Normal file
@ -0,0 +1 @@
|
||||
package logfile_test
|
19
pkg/storage/logfile/types.go
Normal file
19
pkg/storage/logfile/types.go
Normal file
@ -0,0 +1,19 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
|
||||
"github.com/go-flucky/flucky/pkg/types"
|
||||
)
|
||||
|
||||
// MeasuredValues is an XML Wrapper for an array of measured values
|
||||
type MeasuredValues struct {
|
||||
XMLName xml.Name `xml:"measured_values"`
|
||||
MeasuredValues []*MeasuredValue `xml:"measured_value"`
|
||||
}
|
||||
|
||||
// MeasuredValue is an XML Wrapper for the original measured value struct
|
||||
type MeasuredValue struct {
|
||||
XMLName xml.Name `xml:"measured_value"`
|
||||
*types.MeasuredValue
|
||||
}
|
74
pkg/storage/logfile/xml.go
Normal file
74
pkg/storage/logfile/xml.go
Normal file
@ -0,0 +1,74 @@
|
||||
package logfile
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"os"
|
||||
|
||||
"github.com/go-flucky/flucky/pkg/types"
|
||||
)
|
||||
|
||||
type xmlLogfile struct {
|
||||
logfile string
|
||||
}
|
||||
|
||||
func (xl *xmlLogfile) GetLogfile() string {
|
||||
return xl.logfile
|
||||
}
|
||||
|
||||
func (xl *xmlLogfile) Read() ([]*types.MeasuredValue, error) {
|
||||
if _, err := os.Stat(xl.logfile); os.IsNotExist(err) {
|
||||
return nil, fmt.Errorf("%v: %v", errorLogfileNotFound, xl.logfile)
|
||||
}
|
||||
|
||||
f, err := os.Open(xl.logfile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%v: %v", errorLogfileOpen, xl.logfile)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
measuredValues := new(MeasuredValues)
|
||||
|
||||
if err := xml.NewDecoder(f).Decode(&measuredValues); err != nil {
|
||||
return nil, fmt.Errorf("%v: %v", errorLogfileDecode, err)
|
||||
}
|
||||
|
||||
cachedMeasuredValues := make([]*types.MeasuredValue, 0)
|
||||
for _, measuredValue := range measuredValues.MeasuredValues {
|
||||
cachedMeasuredValues = append(cachedMeasuredValues, measuredValue.MeasuredValue)
|
||||
}
|
||||
|
||||
return cachedMeasuredValues, nil
|
||||
}
|
||||
|
||||
func (xl *xmlLogfile) Write(measuredValues []*types.MeasuredValue) error {
|
||||
f, err := os.Create(xl.logfile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v", errorLogfileCreate, xl.logfile)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
writeCreationDate(measuredValues)
|
||||
|
||||
cachedMeasuredValues := new(MeasuredValues)
|
||||
|
||||
for _, measuredValue := range measuredValues {
|
||||
cachedMeasuredValue := &MeasuredValue{
|
||||
MeasuredValue: measuredValue,
|
||||
}
|
||||
|
||||
cachedMeasuredValues.MeasuredValues = append(cachedMeasuredValues.MeasuredValues, cachedMeasuredValue)
|
||||
}
|
||||
|
||||
bytes, err := xml.MarshalIndent(cachedMeasuredValues, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v", errorLogfileMarshal, err)
|
||||
}
|
||||
|
||||
_, err = f.Write(bytes)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%v: %v", errorLogfileWrite, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Reference in New Issue
Block a user