fix: splited temperatures

Changes:
- Split temperatures from log file into blocks.
  Every block has a size of 500 entries. Every block would be send to
  the remote host
This commit is contained in:
Markus Pesch 2018-12-07 22:50:28 +01:00
parent 81600154f0
commit 500d1a5823
Signed by: volker.raschek
GPG Key ID: 852BCC170D81A982
8 changed files with 175 additions and 102 deletions

View File

@ -13,6 +13,7 @@ var quiet bool
var listRemoteCmd = &cobra.Command{ var listRemoteCmd = &cobra.Command{
Use: "list", Use: "list",
Short: "List Remove Servers", Short: "List Remove Servers",
Aliases: []string{"ls"},
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
if err := remote.Print(os.Stdout, configDir, quiet); err != nil { if err := remote.Print(os.Stdout, configDir, quiet); err != nil {

View File

@ -12,6 +12,7 @@ var all bool
var rmRemoteCmd = &cobra.Command{ var rmRemoteCmd = &cobra.Command{
Use: "rm", Use: "rm",
Short: "Remove Remote Server", Short: "Remove Remote Server",
Aliases: []string{"remove"},
Args: cobra.RangeArgs(0, 1), Args: cobra.RangeArgs(0, 1),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {

View File

@ -11,6 +11,7 @@ var force bool
var syncRemoteCmd = &cobra.Command{ var syncRemoteCmd = &cobra.Command{
Use: "sync", Use: "sync",
Aliases: []string{"synchronize"},
Short: "Synchronise Device Values with Remote Servers", Short: "Synchronise Device Values with Remote Servers",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {

View File

@ -11,8 +11,9 @@ import (
var quiet bool var quiet bool
var listSensorCmd = &cobra.Command{ var listSensorCmd = &cobra.Command{
Use: "list", Use: "ls",
Short: "List Sensors", Short: "List Sensors",
Aliases: []string{"list"},
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
if err := sensor.Print(os.Stdout, configDir, quiet); err != nil { if err := sensor.Print(os.Stdout, configDir, quiet); err != nil {
log.Fatal(err) log.Fatal(err)

View File

@ -10,6 +10,7 @@ import (
var rmSensorCmd = &cobra.Command{ var rmSensorCmd = &cobra.Command{
Use: "rm", Use: "rm",
Short: "Remove Sensor", Short: "Remove Sensor",
Aliases: []string{"remove"},
Args: cobra.ExactArgs(1), Args: cobra.ExactArgs(1),
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
if err := sensor.Remove(args[0], configDir); err != nil { if err := sensor.Remove(args[0], configDir); err != nil {

View File

@ -12,6 +12,7 @@ var force bool
var syncSensorCmd = &cobra.Command{ var syncSensorCmd = &cobra.Command{
Use: "sync", Use: "sync",
Short: "Synchronise Sensors with Remote Servers", Short: "Synchronise Sensors with Remote Servers",
Aliases: []string{"synchronize"},
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
if err := httpcall.SyncSensors(configDir, force); err != nil { if err := httpcall.SyncSensors(configDir, force); err != nil {

View File

@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math"
"net/http" "net/http"
"git.cryptic.systems/fh-trier/go-flucky/pkg/config" "git.cryptic.systems/fh-trier/go-flucky/pkg/config"
@ -20,15 +21,34 @@ func SendTemperatures(temperatures []*stypes.Temperature, configDir string) erro
return err return err
} }
temperaturesAsBytes, err := json.Marshal(temperatures) // split temperatures in blocks to reduce server data size in single request
blockSize := 500
temperatureBlocks := make(map[int][]*stypes.Temperature)
length := int(math.Ceil(float64(len(temperatures)) / float64(blockSize)))
for i := 0; i < length; i++ {
start := i * blockSize
end := (i + 1) * blockSize
if end > len(temperatures) {
end = len(temperatures)
}
temperatureBlocks[i] = temperatures[start:end]
}
for _, remote := range cnf.Remotes {
if remote.Enabled {
for _, temperatureBlock := range temperatureBlocks {
temperaturesAsBytes, err := json.Marshal(temperatureBlock)
if err != nil { if err != nil {
return fmt.Errorf("Can not marshal temperatures to JSON: %v", err) return fmt.Errorf("Can not marshal temperatures to JSON: %v", err)
} }
temperaturesAsReader := bytes.NewReader(temperaturesAsBytes) temperaturesAsReader := bytes.NewReader(temperaturesAsBytes)
for _, remote := range cnf.Remotes {
if remote.Enabled {
requestURL := fmt.Sprintf("%s%s", remote.Address, "/temperatures") requestURL := fmt.Sprintf("%s%s", remote.Address, "/temperatures")
req, err := http.NewRequest("POST", requestURL, temperaturesAsReader) req, err := http.NewRequest("POST", requestURL, temperaturesAsReader)
if err != nil { if err != nil {
@ -51,6 +71,7 @@ func SendTemperatures(temperatures []*stypes.Temperature, configDir string) erro
} }
} }
} }
}
return nil return nil

View File

@ -12,6 +12,7 @@ import (
stypes "git.cryptic.systems/fh-trier/go-flucky-server/pkg/types" stypes "git.cryptic.systems/fh-trier/go-flucky-server/pkg/types"
"git.cryptic.systems/fh-trier/go-flucky/pkg/httpcall" "git.cryptic.systems/fh-trier/go-flucky/pkg/httpcall"
"git.cryptic.systems/fh-trier/go-flucky/pkg/logs" "git.cryptic.systems/fh-trier/go-flucky/pkg/logs"
"git.cryptic.systems/fh-trier/go-flucky/pkg/types"
"git.cryptic.systems/fh-trier/go-flucky/pkg/config" "git.cryptic.systems/fh-trier/go-flucky/pkg/config"
@ -30,49 +31,20 @@ func Get(sensorNames []string, push bool, configDir string, w io.Writer) error {
return err return err
} }
// get sensors
sensors, err := getSensors(sensorNames, cnf)
if err != nil {
return err
}
// get temperatures
temperatures, err := getTemperatures(sensors)
if err != nil {
return err
}
tw := tabwriter.NewWriter(w, 0, 0, 5, ' ', 0) tw := tabwriter.NewWriter(w, 0, 0, 5, ' ', 0)
// filter sensors
var sensors []*stypes.Sensor
if len(sensorNames) > 0 {
for _, sensorName := range sensorNames {
for _, sensor := range cnf.Sensors {
if sensorName == *sensor.SensorName && *sensor.SensorEnabled ||
sensorName == sensor.SensorID && *sensor.SensorEnabled ||
sensorName == *sensor.WireID && *sensor.SensorEnabled {
sensors = append(sensors, sensor)
}
}
}
} else {
for _, sensor := range cnf.Sensors {
if *sensor.SensorEnabled {
sensors = append(sensors, sensor)
}
}
}
if len(sensors) == 0 {
return fmt.Errorf("No matched Sensors. Check if your sensor is configured or enabled")
}
// check if sensor exists in cnf and has a wire id
for _, filterdSensor := range sensors {
if filterdSensor.WireID == nil || *filterdSensor.WireID == "" {
return fmt.Errorf("Sensor %v has no wire id", filterdSensor.SensorID)
}
var found bool
for _, cnfSensor := range cnf.Sensors {
if filterdSensor.SensorID == cnfSensor.SensorID {
found = true
}
}
if !found {
return fmt.Errorf("Can not found sensor %v in config", filterdSensor.SensorID)
}
}
// headlines // headlines
for _, sensor := range sensors { for _, sensor := range sensors {
if sensor.SensorName != nil && *sensor.SensorName != "" { if sensor.SensorName != nil && *sensor.SensorName != "" {
@ -84,10 +56,6 @@ func Get(sensorNames []string, push bool, configDir string, w io.Writer) error {
fmt.Fprint(tw, "\n") fmt.Fprint(tw, "\n")
// body // body
temperatures, err := getTemperatures(sensors)
if err != nil {
return err
}
for _, temperature := range temperatures { for _, temperature := range temperatures {
fmt.Fprintf(tw, "%v\t", temperature.TemperatureValue) fmt.Fprintf(tw, "%v\t", temperature.TemperatureValue)
} }
@ -110,6 +78,43 @@ func Get(sensorNames []string, push bool, configDir string, w io.Writer) error {
} }
// Fetch ...
func Fetch(sensorNames []string, push bool, configDir string) error {
// get cnf
cnf, err := config.Read(configDir)
if err != nil {
return err
}
// get sensors
sensors, err := getSensors(sensorNames, cnf)
if err != nil {
return err
}
// get temperatures
temperatures, err := getTemperatures(sensors)
if err != nil {
return err
}
// write logfiles or push
if !push {
if err := logs.AppendTemperature(temperatures, cnf); err != nil {
return err
}
} else {
if err := httpcall.SendTemperatures(temperatures, configDir); err != nil {
return err
}
}
return nil
}
// Follow ...
func Follow(sensorNames []string, push bool, configDir string, w io.Writer) error { func Follow(sensorNames []string, push bool, configDir string, w io.Writer) error {
// get cnf // get cnf
@ -118,39 +123,29 @@ func Follow(sensorNames []string, push bool, configDir string, w io.Writer) erro
return err return err
} }
var temperatures []*stypes.Temperature // get sensors
sensors, err := getSensors(sensorNames, cnf)
if err != nil {
return err
}
// tabwriter
tw := tabwriter.NewWriter(w, 0, 0, 5, ' ', 0) tw := tabwriter.NewWriter(w, 0, 0, 5, ' ', 0)
// headlines // headlines
var sensors []*stypes.Sensor for _, sensor := range sensors {
if sensor.SensorName != nil && *sensor.SensorName != "" {
numOfSensors := len(cnf.Sensors)
for _, sensor := range cnf.Sensors {
switch {
case sensor.SensorName != nil && numOfSensors <= 1:
sensors = append(sensors, sensor)
case sensor.SensorName == nil && numOfSensors <= 1:
sensors = append(sensors, sensor)
case sensor.SensorName != nil && numOfSensors > 1:
fmt.Fprintf(tw, "%v\t", *sensor.SensorName) fmt.Fprintf(tw, "%v\t", *sensor.SensorName)
sensors = append(sensors, sensor) } else {
break
case sensor.SensorName == nil && numOfSensors > 1:
fmt.Fprintf(tw, "%v\t", sensor.SensorID) fmt.Fprintf(tw, "%v\t", sensor.SensorID)
sensors = append(sensors, sensor)
break
} }
} }
if numOfSensors > 1 {
fmt.Fprint(tw, "\n") fmt.Fprint(tw, "\n")
}
// body // body
temperatures := []*stypes.Temperature{}
ticker := time.NewTicker(1 * time.Second) ticker := time.NewTicker(1 * time.Second)
// TODO: create channel for own sensor
go func() { go func() {
for { for {
select { select {
@ -160,8 +155,16 @@ func Follow(sensorNames []string, push bool, configDir string, w io.Writer) erro
} }
// get temperatures from sensors and write them into writer // get temperatures from sensors and write them into writer
temperatures, err = getTemperatures(sensors) t, err := getTemperatures(sensors)
for _, temperature := range temperatures { // TODO: get error back over channel
if err != nil {
return
}
// append temperatures to temperature list
temperatures = append(temperatures, t...)
for _, temperature := range t {
fmt.Fprintf(tw, "%3.3f\t", temperature.TemperatureValue) fmt.Fprintf(tw, "%3.3f\t", temperature.TemperatureValue)
} }
@ -244,3 +247,46 @@ func getTemperatures(sensors []*stypes.Sensor) ([]*stypes.Temperature, error) {
return temperatures, nil return temperatures, nil
} }
func getSensors(sensorNames []string, cnf *types.Config) ([]*stypes.Sensor, error) {
sensors := []*stypes.Sensor{}
if len(sensorNames) > 0 {
for _, sensorName := range sensorNames {
for _, sensor := range cnf.Sensors {
if sensorName == *sensor.SensorName && *sensor.SensorEnabled ||
sensorName == sensor.SensorID && *sensor.SensorEnabled ||
sensorName == *sensor.WireID && *sensor.SensorEnabled {
sensors = append(sensors, sensor)
}
}
}
} else {
for _, sensor := range cnf.Sensors {
if *sensor.SensorEnabled {
sensors = append(sensors, sensor)
}
}
}
if len(sensors) == 0 {
return nil, fmt.Errorf("No matched Sensors. Check if your sensor is configured or enabled")
}
// check if sensor exists in cnf and has a wire id
for _, filterdSensor := range sensors {
if filterdSensor.WireID == nil || *filterdSensor.WireID == "" {
return nil, fmt.Errorf("Sensor %v has no wire id", filterdSensor.SensorID)
}
var found bool
for _, cnfSensor := range cnf.Sensors {
if filterdSensor.SensorID == cnfSensor.SensorID {
found = true
}
}
if !found {
return nil, fmt.Errorf("Can not found sensor %v in config", filterdSensor.SensorID)
}
}
return sensors, nil
}