fix: implement repository test
changes: - Implement repository test for the sqlite backend - Add testutils package to start container images - Remove deprecated till_date in measured values - Renamed columns of the table humidities, pressures and temperatures
This commit is contained in:
200
pkg/testutils/dockerutils/builder.go
Normal file
200
pkg/testutils/dockerutils/builder.go
Normal file
@ -0,0 +1,200 @@
|
||||
package dockerutils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/network"
|
||||
"github.com/docker/go-connections/nat"
|
||||
)
|
||||
|
||||
type Builder struct {
|
||||
client *Client
|
||||
containerConfig *container.Config
|
||||
containerName string
|
||||
hostConfig *container.HostConfig
|
||||
networkConfig *network.NetworkingConfig
|
||||
networks map[string][]string
|
||||
ports []string
|
||||
pull bool
|
||||
waitForHealthy bool
|
||||
}
|
||||
|
||||
// AddEnv to the container
|
||||
func (builder *Builder) AddEnv(key string, value string) *Builder {
|
||||
builder.containerConfig.Env = append(builder.containerConfig.Env, fmt.Sprintf("%v=%v", key, value))
|
||||
return builder
|
||||
}
|
||||
|
||||
// AddLabel to the container
|
||||
func (builder *Builder) AddLabel(key string, value string) *Builder {
|
||||
builder.containerConfig.Labels[key] = value
|
||||
return builder
|
||||
}
|
||||
|
||||
// Env set environment variables to the container
|
||||
func (builder *Builder) Env(env map[string]string) *Builder {
|
||||
builder.containerConfig.Labels = make(map[string]string)
|
||||
for key, value := range env {
|
||||
builder.containerConfig.Labels[key] = value
|
||||
}
|
||||
return builder
|
||||
}
|
||||
|
||||
// Labels set labels to the container
|
||||
func (builder *Builder) Labels(labels map[string]string) *Builder {
|
||||
builder.containerConfig.Labels = labels
|
||||
return builder
|
||||
}
|
||||
|
||||
// Memory defines a memory limit for the container
|
||||
func (builder *Builder) Memory(limit int64) *Builder {
|
||||
builder.hostConfig.Memory = limit
|
||||
return builder
|
||||
}
|
||||
|
||||
// Mount a source volume or hostpath into the filesystem of the container
|
||||
func (builder *Builder) Mount(source string, dest string) *Builder {
|
||||
builder.hostConfig.Binds = append(builder.hostConfig.Binds, fmt.Sprintf("%v:%v", source, dest))
|
||||
return builder
|
||||
}
|
||||
|
||||
// Mounts a set of source volumes or hostpath into the filesystem of the
|
||||
// container
|
||||
func (builder *Builder) Mounts(mounts map[string]string) *Builder {
|
||||
for source, dest := range mounts {
|
||||
builder.Mount(source, dest)
|
||||
}
|
||||
return builder
|
||||
}
|
||||
|
||||
// Port defines a port forwarding from the host machine to the container
|
||||
// Examples:
|
||||
// - 8080:8080
|
||||
// - 10.6.231.10:8080:8080
|
||||
// - 10.6.231.10:8080:8080/tcp
|
||||
func (builder *Builder) Port(port string) *Builder {
|
||||
builder.ports = append(builder.ports, port)
|
||||
return builder
|
||||
}
|
||||
|
||||
// Ports defines a set port forwarding rules from the host machine to the
|
||||
// container
|
||||
// Examples:
|
||||
// - 8080:8080
|
||||
// - 10.6.231.10:8080:8080
|
||||
// - 10.6.231.10:8080:8080/tcp
|
||||
func (builder *Builder) Ports(ports []string) *Builder {
|
||||
builder.ports = ports
|
||||
return builder
|
||||
}
|
||||
|
||||
// Pull the image if absent
|
||||
func (builder *Builder) Pull() *Builder {
|
||||
builder.pull = true
|
||||
return builder
|
||||
}
|
||||
|
||||
// Start the container
|
||||
func (builder *Builder) Start(ctx context.Context) (string, error) {
|
||||
|
||||
// Pull container image if absent
|
||||
if builder.pull {
|
||||
err := builder.client.PullQuiet(ctx, builder.containerConfig.Image)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
// check if ports are available
|
||||
exposedPorts, portBindings, err := nat.ParsePortSpecs(builder.ports)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Failed to parse ports %v: %v", builder.ports, err)
|
||||
}
|
||||
|
||||
if len(portBindings) > 0 {
|
||||
builder.containerConfig.ExposedPorts = exposedPorts
|
||||
builder.hostConfig.PortBindings = portBindings
|
||||
}
|
||||
|
||||
// add endpoint settings to existing networks
|
||||
networkExist := make(map[string]bool, 0)
|
||||
for networkName := range builder.networks {
|
||||
networkExist[networkName] = false
|
||||
}
|
||||
|
||||
networks, err := builder.client.NetworkList(ctx, types.NetworkListOptions{})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Failed to list networks: %v", err)
|
||||
}
|
||||
|
||||
for _, nw := range networks {
|
||||
if aliases, present := builder.networks[nw.Name]; present {
|
||||
networkExist[nw.Name] = true
|
||||
builder.networkConfig.EndpointsConfig[nw.Name] = &network.EndpointSettings{
|
||||
Aliases: aliases,
|
||||
NetworkID: nw.ID,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for nw, found := range networkExist {
|
||||
if !found {
|
||||
return "", fmt.Errorf("Failed to add endpoint settings for network %v. It does not exist", nw)
|
||||
}
|
||||
}
|
||||
|
||||
// create container
|
||||
resp, err := builder.client.ContainerCreate(ctx, builder.containerConfig, builder.hostConfig, builder.networkConfig, builder.containerName)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("Failed to create container %v: %v", builder.containerName, err)
|
||||
}
|
||||
|
||||
// start container
|
||||
err = builder.client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{})
|
||||
if err != nil {
|
||||
stopError := builder.client.ContainerStopByIDs(ctx, time.Second, resp.ID)
|
||||
if stopError != nil {
|
||||
return "", fmt.Errorf("Failed to start container %v: %v\nUnable to remove container %v. Manual cleanup necessary", builder.containerName, err, builder.containerName)
|
||||
}
|
||||
return "", fmt.Errorf("Failed to start container %v: %v", builder.containerName, err)
|
||||
}
|
||||
|
||||
// wait for healthy
|
||||
if builder.waitForHealthy {
|
||||
|
||||
watcher := builder.client.GetWatcher()
|
||||
errorChannel := make(chan error, 1)
|
||||
doneChannel := make(chan struct{})
|
||||
err = watcher.AddListener(resp.ID, errorChannel, doneChannel)
|
||||
if err != nil {
|
||||
containerRemoveError := builder.client.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{Force: true})
|
||||
if containerRemoveError != nil {
|
||||
return "", fmt.Errorf("Failed while watching status for container %v: %v\nUnable to remove container %v: %v", resp.ID, err, resp.ID, containerRemoveError)
|
||||
}
|
||||
return "", fmt.Errorf("Failed while watching status for container %v: %v", resp.ID, err)
|
||||
}
|
||||
|
||||
select {
|
||||
case err := <-errorChannel:
|
||||
if err != nil {
|
||||
containerRemoveError := builder.client.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{Force: true})
|
||||
if containerRemoveError != nil {
|
||||
return "", fmt.Errorf("Unable to remove container %v: %v", resp.ID, containerRemoveError)
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
case <-doneChannel:
|
||||
}
|
||||
}
|
||||
|
||||
return resp.ID, nil
|
||||
}
|
||||
|
||||
func (builder *Builder) WaitForHealthy() *Builder {
|
||||
builder.waitForHealthy = true
|
||||
return builder
|
||||
}
|
220
pkg/testutils/dockerutils/client.go
Normal file
220
pkg/testutils/dockerutils/client.go
Normal file
@ -0,0 +1,220 @@
|
||||
package dockerutils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/docker/docker/api/types/filters"
|
||||
"github.com/docker/docker/api/types/network"
|
||||
"github.com/docker/docker/client"
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
*client.Client
|
||||
watcher *Watcher
|
||||
mutex *sync.Mutex
|
||||
}
|
||||
|
||||
// Close docker connection
|
||||
func (client *Client) Close() error {
|
||||
client.mutex.Lock()
|
||||
defer client.mutex.Unlock()
|
||||
if client.watcher != nil {
|
||||
client.watcher.stop()
|
||||
}
|
||||
return client.Close()
|
||||
}
|
||||
|
||||
// ContainerListByLabels returns only containers which match by given labels
|
||||
func (client *Client) ContainerListByLabels(ctx context.Context, all bool, labels map[string]string) ([]types.Container, error) {
|
||||
filterArgs := filters.NewArgs()
|
||||
for key, value := range labels {
|
||||
filterArgs.Add("label", fmt.Sprintf("%v=%v", key, value))
|
||||
}
|
||||
containers, err := client.ContainerList(ctx, types.ContainerListOptions{
|
||||
All: all,
|
||||
Filters: filterArgs,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if containers == nil {
|
||||
return nil, fmt.Errorf("No containers found by given labels")
|
||||
}
|
||||
return containers, nil
|
||||
}
|
||||
|
||||
// ContainerListByNames returns only containers which match by given labels
|
||||
func (client *Client) ContainerListByNames(ctx context.Context, all bool, names ...string) ([]types.Container, error) {
|
||||
filterArgs := filters.NewArgs()
|
||||
for _, name := range names {
|
||||
filterArgs.Add("name", name)
|
||||
}
|
||||
containers, err := client.ContainerList(ctx, types.ContainerListOptions{
|
||||
All: all,
|
||||
Filters: filterArgs,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if containers == nil {
|
||||
return nil, fmt.Errorf("No containers found by given names")
|
||||
}
|
||||
return containers, nil
|
||||
}
|
||||
|
||||
// ContainerRemoveByIDs deletes all containers which match by their container ids
|
||||
func (client *Client) ContainerRemoveByIDs(ctx context.Context, containerIDs ...string) error {
|
||||
for _, containerID := range containerIDs {
|
||||
err := client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ContainerRemoveByLabels deletes all containers which match by given labels
|
||||
func (client *Client) ContainerRemoveByLabels(ctx context.Context, labels map[string]string) error {
|
||||
containers, err := client.ContainerListByLabels(ctx, true, labels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, container := range containers {
|
||||
err := client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ContainerRemoveByNames deletes all containers which match by their names
|
||||
func (client *Client) ContainerRemoveByNames(ctx context.Context, names ...string) error {
|
||||
containers, err := client.ContainerListByNames(ctx, true, names...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, container := range containers {
|
||||
err := client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ContainerStopByIDs deletes all containers which match by their container ids
|
||||
func (client *Client) ContainerStopByIDs(ctx context.Context, timeout time.Duration, containerIDs ...string) error {
|
||||
for _, containerID := range containerIDs {
|
||||
err := client.ContainerStop(ctx, containerID, &timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ContainerStopByLabels shutdown containters which match by given labels
|
||||
func (client *Client) ContainerStopByLabels(ctx context.Context, timeout time.Duration, labels map[string]string) error {
|
||||
containers, err := client.ContainerListByLabels(ctx, true, labels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, container := range containers {
|
||||
err := client.ContainerStop(ctx, container.ID, &timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetWatcher returns a watcher for container health states
|
||||
func (client *Client) GetWatcher() *Watcher {
|
||||
if client.watcher != nil {
|
||||
return client.watcher
|
||||
}
|
||||
|
||||
client.mutex.Lock()
|
||||
defer client.mutex.Unlock()
|
||||
|
||||
client.watcher = &Watcher{
|
||||
client: client,
|
||||
errorChannels: make(map[string]chan<- error),
|
||||
doneChannels: make(map[string]chan<- struct{}),
|
||||
errorMapper: make(map[string]ErrorMapper),
|
||||
mutex: new(sync.RWMutex),
|
||||
}
|
||||
|
||||
client.watcher.start()
|
||||
return client.watcher
|
||||
}
|
||||
|
||||
// NewBuilder returns a new builder for containers
|
||||
func (client *Client) NewBuilder(image string) *Builder {
|
||||
return &Builder{
|
||||
client: client,
|
||||
containerConfig: &container.Config{
|
||||
Image: image,
|
||||
},
|
||||
hostConfig: new(container.HostConfig),
|
||||
networkConfig: new(network.NetworkingConfig),
|
||||
networks: make(map[string][]string, 0),
|
||||
ports: make([]string, 0),
|
||||
pull: false,
|
||||
waitForHealthy: false,
|
||||
}
|
||||
}
|
||||
|
||||
// Pull image
|
||||
func (client *Client) Pull(ctx context.Context, image string, w io.Writer) error {
|
||||
|
||||
parts := strings.Split(image, "/")
|
||||
switch len(parts) {
|
||||
case 1:
|
||||
image = fmt.Sprintf("docker.io/library/%v", parts[0])
|
||||
case 2:
|
||||
if strings.Compare(parts[0], "library") == 0 ||
|
||||
strings.Compare(parts[0], "docker.io") == 0 {
|
||||
image = fmt.Sprintf("docker.io/library/%v", parts[1])
|
||||
}
|
||||
}
|
||||
|
||||
readCloser, err := client.ImagePull(ctx, image, types.ImagePullOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = io.Copy(w, readCloser)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// PullQuiet image
|
||||
func (client *Client) PullQuiet(ctx context.Context, image string) error {
|
||||
return client.Pull(ctx, image, ioutil.Discard)
|
||||
}
|
||||
|
||||
// New returns a new dockerutil client
|
||||
func New() (*Client, error) {
|
||||
dockerClient, err := client.NewEnvClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Client{
|
||||
dockerClient,
|
||||
nil,
|
||||
new(sync.Mutex),
|
||||
}, nil
|
||||
}
|
164
pkg/testutils/dockerutils/watcher.go
Normal file
164
pkg/testutils/dockerutils/watcher.go
Normal file
@ -0,0 +1,164 @@
|
||||
package dockerutils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/events"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrDied = errors.New("died")
|
||||
ErrUnhealthy = errors.New("went unhealthy")
|
||||
)
|
||||
|
||||
type Watcher struct {
|
||||
client *Client
|
||||
errorChannels map[string]chan<- error
|
||||
doneChannels map[string]chan<- struct{}
|
||||
errorMapper map[string]ErrorMapper
|
||||
mutex *sync.RWMutex
|
||||
lastError error
|
||||
cancelFunc context.CancelFunc
|
||||
}
|
||||
|
||||
func (watcher *Watcher) AddListenerWithErrorMapper(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}, errorMapper ErrorMapper) error {
|
||||
watcher.mutex.Lock()
|
||||
defer watcher.mutex.Unlock()
|
||||
|
||||
if watcher.lastError != nil {
|
||||
return watcher.lastError
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second))
|
||||
defer cancel()
|
||||
|
||||
containerJSON, err := watcher.client.ContainerInspect(ctx, containerID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Unable to check if container is already unhealthy: %v", err)
|
||||
}
|
||||
|
||||
if containerJSON.State.Dead || !containerJSON.State.Running {
|
||||
go func() {
|
||||
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrDied))
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
if containerJSON.State.Health == nil {
|
||||
go func() {
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
switch containerJSON.State.Health.Status {
|
||||
case "starting":
|
||||
watcher.errorChannels[containerID] = errorChannel
|
||||
watcher.doneChannels[containerID] = doneChannel
|
||||
watcher.errorMapper[containerID] = errorMapper
|
||||
case "healthy":
|
||||
go func() {
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
case "unhealthy":
|
||||
go func() {
|
||||
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrUnhealthy))
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
default:
|
||||
go func() {
|
||||
errorChannel <- errorMapper(fmt.Errorf("Container %v went in an unknown state during startup: %s", containerID, containerJSON.State.Health.Status))
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (watcher *Watcher) AddListener(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}) error {
|
||||
return watcher.AddListenerWithErrorMapper(containerID, errorChannel, doneChannel, defaultErrorMapper)
|
||||
}
|
||||
|
||||
func (watcher *Watcher) removeListener(containerID string) {
|
||||
watcher.mutex.Lock()
|
||||
defer watcher.mutex.Unlock()
|
||||
delete(watcher.doneChannels, containerID)
|
||||
delete(watcher.errorChannels, containerID)
|
||||
delete(watcher.errorMapper, containerID)
|
||||
}
|
||||
|
||||
func (watcher *Watcher) start() {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
watcher.cancelFunc = cancel
|
||||
dockerEvents, errors := watcher.client.Events(ctx, types.EventsOptions{})
|
||||
|
||||
sendErrorFunc := func() {
|
||||
watcher.mutex.Lock()
|
||||
for containerID, errorChannel := range watcher.errorChannels {
|
||||
go func(errorChannel chan<- error, doneChannel chan<- struct{}, err error, errorMapper ErrorMapper) {
|
||||
errorChannel <- errorMapper(err)
|
||||
doneChannel <- struct{}{}
|
||||
}(errorChannel, watcher.doneChannels[containerID], watcher.lastError, watcher.errorMapper[containerID])
|
||||
}
|
||||
watcher.mutex.Unlock()
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event := <-dockerEvents:
|
||||
watcher.mutex.RLock()
|
||||
errorChannel, present := watcher.errorChannels[event.Actor.ID]
|
||||
if !present || event.Type != events.ContainerEventType {
|
||||
continue
|
||||
}
|
||||
doneChannel := watcher.doneChannels[event.Actor.ID]
|
||||
errorMapper := watcher.errorMapper[event.Actor.ID]
|
||||
watcher.mutex.RUnlock()
|
||||
|
||||
switch event.Action {
|
||||
case "health_status: healthy":
|
||||
go func() {
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
watcher.removeListener(event.Actor.ID)
|
||||
case "health_status: unhealthy":
|
||||
go func() {
|
||||
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrUnhealthy))
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
watcher.removeListener(event.Actor.ID)
|
||||
case "die":
|
||||
go func() {
|
||||
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrDied))
|
||||
doneChannel <- struct{}{}
|
||||
}()
|
||||
watcher.removeListener(event.Actor.ID)
|
||||
}
|
||||
case err := <-errors:
|
||||
watcher.lastError = err
|
||||
sendErrorFunc()
|
||||
return
|
||||
case <-ctx.Done():
|
||||
watcher.lastError = fmt.Errorf("Watcher was canceled")
|
||||
sendErrorFunc()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (watcher *Watcher) stop() {
|
||||
watcher.cancelFunc()
|
||||
}
|
||||
|
||||
type ErrorMapper func(error) error
|
||||
|
||||
func defaultErrorMapper(err error) error {
|
||||
return err
|
||||
}
|
Reference in New Issue
Block a user