feat: import from sqlite or postgresql
This commit is contained in:
parent
592e9b7f5c
commit
675af77965
81
cli/imp/imp.go
Normal file
81
cli/imp/imp.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
package imp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
|
||||||
|
"git.cryptic.systems/volker.raschek/flucky/pkg/config"
|
||||||
|
"git.cryptic.systems/volker.raschek/flucky/pkg/repository"
|
||||||
|
"git.cryptic.systems/volker.raschek/go-logger"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
importSensors bool
|
||||||
|
importHumidities bool
|
||||||
|
importPressures bool
|
||||||
|
importTemperatures bool
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitCmd(cmd *cobra.Command) error {
|
||||||
|
|
||||||
|
importCmd := &cobra.Command{
|
||||||
|
Use: "import",
|
||||||
|
Short: "Import data from passed URL",
|
||||||
|
RunE: importSources,
|
||||||
|
}
|
||||||
|
importCmd.Flags().BoolVar(&importSensors, "sensors", true, "Import sensors")
|
||||||
|
importCmd.Flags().BoolVar(&importHumidities, "humidities", true, "Import humidities")
|
||||||
|
importCmd.Flags().BoolVar(&importPressures, "pressures", true, "Import pressures")
|
||||||
|
importCmd.Flags().BoolVar(&importTemperatures, "temperatures", true, "Import temperatures")
|
||||||
|
|
||||||
|
cmd.AddCommand(importCmd)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func importSources(cmd *cobra.Command, args []string) error {
|
||||||
|
configFile, err := cmd.Flags().GetString("config")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("No config file defined")
|
||||||
|
}
|
||||||
|
|
||||||
|
cnf, err := config.Read(configFile)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
destURL, err := url.Parse(cnf.DSN)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logLevelString, err := cmd.Flags().GetString("loglevel")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
logLevel, err := logger.ParseLogLevel(logLevelString)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
flogger := logger.NewLogger(logLevel)
|
||||||
|
|
||||||
|
sourceURL, err := url.Parse(args[0])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to parse source url: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = repository.Import(sourceURL, destURL, flogger, repository.OptImport{
|
||||||
|
Sensors: importSensors,
|
||||||
|
Humidities: importHumidities,
|
||||||
|
Pressures: importPressures,
|
||||||
|
Temperatures: importTemperatures,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to import: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -9,6 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.cryptic.systems/volker.raschek/flucky/cli/daemon"
|
"git.cryptic.systems/volker.raschek/flucky/cli/daemon"
|
||||||
|
imp "git.cryptic.systems/volker.raschek/flucky/cli/imp"
|
||||||
"git.cryptic.systems/volker.raschek/flucky/cli/sensor"
|
"git.cryptic.systems/volker.raschek/flucky/cli/sensor"
|
||||||
"git.cryptic.systems/volker.raschek/flucky/cli/temperature"
|
"git.cryptic.systems/volker.raschek/flucky/cli/temperature"
|
||||||
"git.cryptic.systems/volker.raschek/flucky/pkg/config"
|
"git.cryptic.systems/volker.raschek/flucky/pkg/config"
|
||||||
@ -38,6 +39,7 @@ func Execute(version *semver.Version) error {
|
|||||||
|
|
||||||
subCommands := []func(cmd *cobra.Command) error{
|
subCommands := []func(cmd *cobra.Command) error{
|
||||||
daemon.InitCmd,
|
daemon.InitCmd,
|
||||||
|
imp.InitCmd,
|
||||||
sensor.InitCmd,
|
sensor.InitCmd,
|
||||||
temperature.InitCmd,
|
temperature.InitCmd,
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package repository
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
@ -214,3 +215,94 @@ func New(dsnURL *url.URL, flogger logger.Logger) (*Repository, error) {
|
|||||||
database: database,
|
database: database,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type OptImport struct {
|
||||||
|
Sensors bool
|
||||||
|
Humidities bool
|
||||||
|
Pressures bool
|
||||||
|
Temperatures bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func Import(sourceDSNURL *url.URL, destDNSURL *url.URL, flogger logger.Logger, optImport OptImport) error {
|
||||||
|
|
||||||
|
importMap := map[string]bool{
|
||||||
|
"humidities": optImport.Humidities,
|
||||||
|
"pressures": optImport.Pressures,
|
||||||
|
"temperatures": optImport.Temperatures,
|
||||||
|
}
|
||||||
|
|
||||||
|
// enable sensors if one measured value is enabled
|
||||||
|
if !optImport.Sensors {
|
||||||
|
for key, value := range importMap {
|
||||||
|
if value {
|
||||||
|
flogger.Info("Enable import option sensors. It's required as foreign key for %v", key)
|
||||||
|
optImport.Sensors = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sourceRepo, err := New(sourceDSNURL, flogger)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to open the source repo: %w", err)
|
||||||
|
}
|
||||||
|
defer sourceRepo.Close()
|
||||||
|
|
||||||
|
destRepo, err := New(destDNSURL, flogger)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to open the destination repo: %w", err)
|
||||||
|
}
|
||||||
|
defer destRepo.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
devices, err := sourceRepo.database.SelectDevices(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to fetch devices from source repo: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
flogger.Debug("Found %v devices", len(devices))
|
||||||
|
|
||||||
|
for i := range devices {
|
||||||
|
err := destRepo.AddDevices(devices[i])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to add device %v into dest repo: %w", devices[i].Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if optImport.Sensors {
|
||||||
|
sensors, err := sourceRepo.database.SelectSensors(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to fetch sensors from source repo: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
flogger.Debug("Found %v sensors", len(sensors))
|
||||||
|
|
||||||
|
for i := range sensors {
|
||||||
|
err := destRepo.AddSensors(sensors[i])
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to add sensor %v into dest repo: %w", sensors[i].Name, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for key, f := range map[string]func(context.Context) ([]*types.MeasuredValue, error){
|
||||||
|
"humidities": sourceRepo.database.SelectHumidities,
|
||||||
|
"pressures": sourceRepo.database.SelectPressures,
|
||||||
|
"temperatures": sourceRepo.database.SelectTemperatures,
|
||||||
|
} {
|
||||||
|
if importMap[key] {
|
||||||
|
measuredValues, err := f(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to select %v from source repo: %w", key, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
flogger.Debug("Found %v %v values", len(measuredValues), key)
|
||||||
|
|
||||||
|
err = destRepo.AddMeasuredValues(measuredValues...)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("Failed to add %v into dest repo: %w", key, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user