From 661f4e86a227adf1343d7a911bf20ed80620d949 Mon Sep 17 00:00:00 2001 From: Markus Pesch Date: Sat, 7 Nov 2020 19:40:39 +0100 Subject: [PATCH] Intial Commit --- .drone.yml | 35 ++++ .editorconfig | 12 ++ .gitattributes | 1 + .gitignore | 5 + LICENSE | 13 ++ Makefile | 52 ++++++ README.md | 74 +++++++++ builder.go | 236 ++++++++++++++++++++++++++ client.go | 402 ++++++++++++++++++++++++++++++++++++++++++++ client_test.go | 443 +++++++++++++++++++++++++++++++++++++++++++++++++ go.mod | 15 ++ go.sum | 35 ++++ watcher.go | 166 ++++++++++++++++++ 13 files changed, 1489 insertions(+) create mode 100644 .drone.yml create mode 100644 .editorconfig create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 builder.go create mode 100644 client.go create mode 100644 client_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 watcher.go diff --git a/.drone.yml b/.drone.yml new file mode 100644 index 0000000..785e5d6 --- /dev/null +++ b/.drone.yml @@ -0,0 +1,35 @@ +kind: pipeline +type: docker +name: amd64 +steps: +- name: unit test + image: docker.io/volkerraschek/build-image:latest + commands: + - make --jobs=$(nproc) test/coverage + volumes: + - name: dockerAPI + path: /var/run/docker.sock + when: + event: + - push + - pull_request + - tag +- name: notify + image: drillster/drone-email + environment: + PLUGIN_HOST: + from_secret: smtp_host + PLUGIN_USERNAME: + from_secret: smtp_username + PLUGIN_PASSWORD: + from_secret: smtp_password + PLUGIN_FROM: + from_secret: smtp_mail_address + when: + status: + - changed + - failure +volumes: +- name: dockerAPI + host: + path: /var/run/docker.sock \ No newline at end of file diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..b53e68c --- /dev/null +++ b/.editorconfig @@ -0,0 +1,12 @@ +root = true + +[*] +indent_style = space +indent_size = 2 +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = false + +[Makefile] +indent_style = tab \ No newline at end of file diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..dcd9d00 --- /dev/null +++ b/.gitattributes @@ -0,0 +1 @@ +Makefile eol=lf \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5180bd1 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +# directories +.vscode/ + +# coverage files +coverage* \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..0098739 --- /dev/null +++ b/LICENSE @@ -0,0 +1,13 @@ +Copyright 2020 Markus Pesch + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c2fa743 --- /dev/null +++ b/Makefile @@ -0,0 +1,52 @@ +# GOPROXY settings +# If no GOPROXY environment variable available, the pre-defined GOPROXY from go +# env to download and validate go modules is used. Exclude downloading and +# validation of all private modules which are pre-defined in GOPRIVATE. If no +# GOPRIVATE variable defined, the variable of go env is used. +GOPROXY?=$(shell go env GOPROXY) +GOPRIVATE?=$(shell go env GOPRIVATE) + +# CONTAINER_RUNTIME +# The CONTAINER_RUNTIME variable will be used to specified the path to a +# container runtime. This is needed to start and run a container images. +CONTAINER_RUNTIME?=$(shell which docker) +CONTAINER_IMAGE_VERSION?=latest + +# TEST +# ============================================================================== +PHONY+=test/unit +test/unit: + go test -v -race -coverprofile=coverage.txt -covermode=atomic -timeout 600s -count=1 ./... + +test/coverage: test/unit + go tool cover -html=coverage.txt + +# CONTAINER STEPS - TEST +# ============================================================================== +PHONY+=container-run/test/unit +container-run/test/unit: + $(MAKE) container-run COMMAND=${@:container-run/%=%} + +PHONY+=container-run/test/coverage +container-run/test/coverage: + $(MAKE) container-run COMMAND=${@:container-run/%=%} + +# GENERAL CONTAINER COMMAND +# ============================================================================== +PHONY+=container-run +container-run: + ${CONTAINER_RUNTIME} run \ + --rm \ + --volume ${PWD}:/workspace \ + --volume /var/run/docker.sock:/var/run/docker.sock \ + docker.io/volkerraschek/build-image:latest \ + make ${COMMAND} \ + GOPROXY=${GOPROXY} \ + GOPRIVATE=${GOPRIVATE} \ + VERSION=${VERSION:v%=%} + +# PHONY +# ============================================================================== +# Declare the contents of the PHONY variable as phony. We keep that information +# in a variable so we can use it in if_changed. +.PHONY: ${PHONY} \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d45157c --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +# dockerutils + +[![Build Status](https://drone.cryptic.systems/api/badges/volker.raschek/dockerutils/status.svg)](https://drone.cryptic.systems/volker.raschek/dockerutils) + +dockerutils is a small library which extends the official docker library to +create and start images over a builder. Additionally the library provide +functions for easy removeing resources based on ids, names or labels which the +official library does not directly supports. + +## Installing + +Install the library by the following command: + +```bash +go get git.cryptic.systems/volker.raschek/dockerutils +``` + +## Usage + +### Example: Create and remove postgreSQL container + +```go +package main + +import "git.cryptic.systems/volker.raschek/dockerutils" + +func noErr(err){ + if err != nil { + panic(err) + } +} + +func main(){ + dockerClient, err := dockerutils.New() + noErr(err) + + postgresContainerID, err := dockerClient.NewBuilder("postgres:13-alpine"). + Port(fmt.Sprintf("5432:5432/tcp", postgresHostPort)). + Pull(). + AddEnv("PGTZ", "Europe/Berlin"). + AddEnv("POSTGRES_PASSWORD", postgres). + AddEnv("TZ", "Europe/Berlin"). + Mount("/etc/localtime", "/etc/localtime"). + Start(context.Background()) + noErr(err) + defer func(){dockerClient.ContainerRemoveByIDs(context.Background(), postgresContainerID)} +} +``` + +### Example: Create and remove container network + +```go +package main + +import ( + "git.cryptic.systems/volker.raschek/dockerutils" + "github.com/docker/docker/api/types" +) + +func noErr(err){ + if err != nil { + panic(err) + } +} + +func main(){ + dockerClient, err := dockerutils.New() + noErr(err) + + containerNetwork, err := dockerClient.NetworkCreate(ctx, "my-network", tt.NetworkCreate{Labels: map[string]string{"key": "value"}}) + noErr(err) + defer func(){dockerClient.NetworkRemove(context.Background(), containerNetwork.ID)} +} +``` diff --git a/builder.go b/builder.go new file mode 100644 index 0000000..0655aa7 --- /dev/null +++ b/builder.go @@ -0,0 +1,236 @@ +package dockerutils + +import ( + "context" + "fmt" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/network" + "github.com/docker/go-connections/nat" +) + +// Builder is a wrapper around the official docker API to start a container +// image. +type Builder struct { + client *Client + containerConfig *container.Config + containerName string + hostConfig *container.HostConfig + networkConfig *network.NetworkingConfig + networks map[string][]string + ports []string + pull bool + waitForHealthy bool +} + +// AddEnv to the container +func (builder *Builder) AddEnv(key string, value string) *Builder { + builder.containerConfig.Env = append(builder.containerConfig.Env, fmt.Sprintf("%v=%v", key, value)) + return builder +} + +// AddLabel to the container +func (builder *Builder) AddLabel(key string, value string) *Builder { + builder.containerConfig.Labels[key] = value + return builder +} + +// Env set environment variables to the container +func (builder *Builder) Env(env map[string]string) *Builder { + builder.containerConfig.Labels = make(map[string]string) + for key, value := range env { + builder.containerConfig.Env = append(builder.containerConfig.Env, fmt.Sprintf("%v=%v", key, value)) + } + return builder +} + +// Labels set labels to the container +func (builder *Builder) Labels(labels map[string]string) *Builder { + builder.containerConfig.Labels = labels + return builder +} + +// Memory defines a memory limit for the container +func (builder *Builder) Memory(limit int64) *Builder { + builder.hostConfig.Memory = limit + return builder +} + +// Mount a source volume or hostpath into the filesystem of the container +func (builder *Builder) Mount(source string, dest string) *Builder { + builder.hostConfig.Binds = append(builder.hostConfig.Binds, fmt.Sprintf("%v:%v", source, dest)) + return builder +} + +// Mounts a set of source volumes or hostpath into the filesystem of the +// container +func (builder *Builder) Mounts(mounts map[string]string) *Builder { + for source, dest := range mounts { + builder.Mount(source, dest) + } + return builder +} + +// Network add the container with aliasses to a specific network +func (builder *Builder) Network(networkName string, aliasses ...string) *Builder { + builder.networks[networkName] = aliasses + return builder +} + +// Port defines a port forwarding from the host machine to the container +// Examples: +// - 8080:8080 +// - 10.6.231.10:8080:8080 +// - 10.6.231.10:8080:8080/tcp +func (builder *Builder) Port(port string) *Builder { + builder.ports = append(builder.ports, port) + return builder +} + +// Ports defines a set port forwarding rules from the host machine to the +// container +// Examples: +// - 8080:8080 +// - 10.6.231.10:8080:8080 +// - 10.6.231.10:8080:8080/tcp +func (builder *Builder) Ports(ports []string) *Builder { + builder.ports = ports + return builder +} + +// Pull the image if absent +func (builder *Builder) Pull() *Builder { + builder.pull = true + return builder +} + +// Start the container +func (builder *Builder) Start(ctx context.Context) (string, error) { + + // Pull container image if absent + if builder.pull { + err := builder.client.PullQuiet(ctx, builder.containerConfig.Image) + if err != nil { + return "", err + } + } + + // Network: portbinding Host->Container + exposedPorts, portBindings, err := nat.ParsePortSpecs(builder.ports) + if err != nil { + return "", fmt.Errorf("unabel to parse ports: %w", err) + } + if len(portBindings) > 0 { + time.Sleep(1 * time.Second) + builder.containerConfig.ExposedPorts = exposedPorts + builder.hostConfig.PortBindings = portBindings + } + + // Network: Add container to container networks + // Add the container to the first defined container network, if any one is + // defined. If no one is defined, the docker API will add the container to + // their default bridge docker0. The other networks will be added to the + // container after the container start. + var ( + networkNames = make([]string, 0) + networks = make([]types.NetworkResource, 0) + ) + if len(builder.networks) > 0 { + for networkName := range builder.networks { + networkNames = append(networkNames, networkName) + } + var err error + networks, err = builder.client.NetworkListByNames(ctx, networkNames...) + if err != nil { + return "", err + } + + endpointSetting := &network.EndpointSettings{ + NetworkID: networks[0].ID, + Aliases: builder.networks[networkNames[0]], + } + + builder.networkConfig.EndpointsConfig[networkNames[0]] = endpointSetting + + networkNames = networkNames[1:] + networks = networks[1:] + } + + // Container: Create + resp, err := builder.client.ContainerCreate( + ctx, + builder.containerConfig, + builder.hostConfig, + builder.networkConfig, + builder.containerName, + ) + if err != nil { + return "", fmt.Errorf("Unable to create container: %w", err) + } + + err = builder.client.ContainerStart(ctx, resp.ID, types.ContainerStartOptions{}) + if err != nil { + + shutdownErr := builder.client.ContainerStopByIDs(ctx, 1*time.Second, resp.ID) + if shutdownErr != nil { + return "", fmt.Errorf("Unable to start container: %w\nUnable to remove container %s: %v\nManual cleanup necessary", err, resp.ID, shutdownErr) + } + return "", fmt.Errorf("Unable to start container: %w", err) + } + + // Network: Add more container networks + for i, networkName := range networkNames { + endpointSetting := &network.EndpointSettings{ + NetworkID: networks[i].ID, + Aliases: builder.networks[networkName], + } + err := builder.client.NetworkConnect(ctx, networks[i].ID, resp.ID, endpointSetting) + if err != nil { + return "", fmt.Errorf("Unable to append container endpoint to network %v", networkName) + } + } + + if builder.waitForHealthy { + watcher := builder.client.GetWatcher() + errors := make(chan error, 1) + done := make(chan struct{}) + err = watcher.AddListener(resp.ID, errors, done) + if err != nil { + containerRemoveError := builder.client.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{Force: true}) + if containerRemoveError != nil { + return "", fmt.Errorf("error while watching for container status: %w - unable to remove container: %v", err, containerRemoveError) + } + return "", fmt.Errorf("error while watching for container status: %w", err) + } + + select { + case err := <-errors: + if err != nil { + containerRemoveError := builder.client.ContainerRemove(ctx, resp.ID, types.ContainerRemoveOptions{Force: true}) + if containerRemoveError != nil { + return "", fmt.Errorf("%w - unable to remove container: %v", err, containerRemoveError) + } + return "", err + } + case <-done: + + } + } + + return resp.ID, err +} + +// WaitForHealthy set the option to wait during the start process until the +// container is healthy. +func (builder *Builder) WaitForHealthy() *Builder { + builder.waitForHealthy = true + return builder +} + +// WithName set the name of the container +func (builder *Builder) WithName(containerName string) *Builder { + builder.containerName = containerName + return builder +} diff --git a/client.go b/client.go new file mode 100644 index 0000000..e0c07be --- /dev/null +++ b/client.go @@ -0,0 +1,402 @@ +package dockerutils + +import ( + "context" + "fmt" + "io" + "io/ioutil" + "strings" + "sync" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/network" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/client" +) + +// Client from the docker API with additional functions +type Client struct { + *client.Client + watcher *Watcher + mutex *sync.Mutex +} + +// Close docker connection +func (client *Client) Close() error { + client.mutex.Lock() + defer client.mutex.Unlock() + if client.watcher != nil { + client.watcher.stop() + } + + return client.Client.Close() +} + +// ContainerListByLabels returns only containers which match by given labels +func (client *Client) ContainerListByLabels(ctx context.Context, all bool, containerLabels map[string]string) ([]types.Container, error) { + filterArgs := filters.NewArgs() + for key, value := range containerLabels { + filterArgs.Add("label", fmt.Sprintf("%v=%v", key, value)) + } + containers, err := client.ContainerList(ctx, types.ContainerListOptions{ + All: all, + Filters: filterArgs, + }) + if err != nil { + return nil, err + } + if containers == nil { + return nil, fmt.Errorf("No containers found by given labels") + } + return containers, nil +} + +// ContainerListByNames returns only containers which match by given labels +func (client *Client) ContainerListByNames(ctx context.Context, all bool, containerNames ...string) ([]types.Container, error) { + filterArgs := filters.NewArgs() + for _, name := range containerNames { + filterArgs.Add("name", name) + } + containers, err := client.ContainerList(ctx, types.ContainerListOptions{ + All: all, + Filters: filterArgs, + }) + if err != nil { + return nil, err + } + if containers == nil { + return nil, fmt.Errorf("No containers found by given names") + } + return containers, nil +} + +// ContainerRemoveByIDs deletes all containers which match by their container ids +func (client *Client) ContainerRemoveByIDs(ctx context.Context, containerIDs ...string) error { + for _, containerID := range containerIDs { + err := client.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{Force: true}) + if err != nil { + return err + } + } + return nil +} + +// ContainerRemoveByLabels deletes all containers which match by given labels +func (client *Client) ContainerRemoveByLabels(ctx context.Context, containerLabels map[string]string) error { + containers, err := client.ContainerListByLabels(ctx, true, containerLabels) + if err != nil { + return err + } + for _, container := range containers { + err := client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) + if err != nil { + return err + } + } + return nil +} + +// ContainerRemoveByNames deletes all containers which match by their names +func (client *Client) ContainerRemoveByNames(ctx context.Context, containerNames ...string) error { + containers, err := client.ContainerListByNames(ctx, true, containerNames...) + if err != nil { + return err + } + for _, container := range containers { + err := client.ContainerRemove(ctx, container.ID, types.ContainerRemoveOptions{Force: true}) + if err != nil { + return err + } + } + return nil +} + +// ContainerStopByIDs deletes all containers which match by their container ids +func (client *Client) ContainerStopByIDs(ctx context.Context, timeout time.Duration, containerIDs ...string) error { + for _, containerID := range containerIDs { + err := client.ContainerStop(ctx, containerID, &timeout) + if err != nil { + return err + } + } + return nil +} + +// ContainerStopByLabels shutdown containters which match by given labels +func (client *Client) ContainerStopByLabels(ctx context.Context, timeout time.Duration, containerLabels map[string]string) error { + containers, err := client.ContainerListByLabels(ctx, true, containerLabels) + if err != nil { + return err + } + for _, container := range containers { + err := client.ContainerStop(ctx, container.ID, &timeout) + if err != nil { + return err + } + } + return nil +} + +// ContainerStopByNames shutdown containters matching by their names +func (client *Client) ContainerStopByNames(ctx context.Context, timeout time.Duration, containerNames ...string) error { + containers, err := client.ContainerListByNames(ctx, true, containerNames...) + if err != nil { + return err + } + for _, container := range containers { + err := client.ContainerStop(ctx, container.ID, &timeout) + if err != nil { + return err + } + } + return nil +} + +// GetWatcher returns a watcher for container health states +func (client *Client) GetWatcher() *Watcher { + if client.watcher != nil { + return client.watcher + } + + client.mutex.Lock() + defer client.mutex.Unlock() + + client.watcher = &Watcher{ + client: client, + errorChannels: make(map[string]chan<- error), + doneChannels: make(map[string]chan<- struct{}), + errorMapper: make(map[string]ErrorMapper), + mutex: new(sync.RWMutex), + } + + client.watcher.start() + return client.watcher +} + +// NetworkListByLabels returns networks which match by given labels +func (client *Client) NetworkListByLabels(ctx context.Context, networkLabels map[string]string) ([]types.NetworkResource, error) { + args := filters.NewArgs() + for key, value := range networkLabels { + args.Add("label", fmt.Sprintf("%v=%v", key, value)) + } + + return client.NetworkList(ctx, types.NetworkListOptions{ + Filters: args, + }) +} + +// NetworkListByNames returns networks which match by their names. If a +// network can not be found, the function returns an error +func (client *Client) NetworkListByNames(ctx context.Context, networkNames ...string) ([]types.NetworkResource, error) { + networks, err := client.NetworkList(ctx, types.NetworkListOptions{}) + if err != nil { + return nil, err + } + + foundNetwork := make(map[string]bool, 0) + for _, networkName := range networkNames { + foundNetwork[networkName] = false + } + + filteredNetworks := make([]types.NetworkResource, 0) + for _, networkName := range networkNames { + for _, network := range networks { + if network.Name == networkName { + filteredNetworks = append(filteredNetworks, network) + foundNetwork[networkName] = true + } + } + } + + for _, networkName := range networkNames { + if !foundNetwork[networkName] { + return nil, fmt.Errorf("Network %v not found", networkName) + } + } + + return filteredNetworks, nil +} + +// NetworkRemoveByLabels remove all networks which match by given labels +func (client *Client) NetworkRemoveByLabels(ctx context.Context, containerLabels map[string]string) error { + networks, err := client.NetworkListByLabels(ctx, containerLabels) + if err != nil { + return err + } + + for _, network := range networks { + err := client.NetworkRemove(ctx, network.ID) + if err != nil { + return err + } + } + + return nil +} + +// NetworkRemoveByNames remove all networks match by their names. If a +// network can not be found, the function returns an error +func (client *Client) NetworkRemoveByNames(ctx context.Context, networkNames ...string) error { + networks, err := client.NetworkListByNames(ctx, networkNames...) + if err != nil { + return err + } + + for _, network := range networks { + err := client.NetworkRemove(ctx, network.ID) + if err != nil { + return err + } + } + + return nil +} + +// NetworkRemoveByIDs remove all networks match by their id +func (client *Client) NetworkRemoveByIDs(ctx context.Context, containerIDs ...string) error { + for _, containerID := range containerIDs { + err := client.NetworkRemove(ctx, containerID) + if err != nil { + return err + } + } + return nil +} + +// NewBuilder returns a new builder for containers +func (client *Client) NewBuilder(image string) *Builder { + return &Builder{ + client: client, + containerConfig: &container.Config{ + Image: image, + }, + hostConfig: new(container.HostConfig), + networkConfig: &network.NetworkingConfig{ + EndpointsConfig: make(map[string]*network.EndpointSettings, 0), + }, + networks: make(map[string][]string, 0), + ports: make([]string, 0), + pull: false, + waitForHealthy: false, + } +} + +// Pull image +func (client *Client) Pull(ctx context.Context, image string, w io.Writer) error { + + parts := strings.Split(image, "/") + switch len(parts) { + case 1: + image = fmt.Sprintf("docker.io/library/%v", parts[0]) + case 2: + if strings.Compare(parts[0], "library") == 0 || + strings.Compare(parts[0], "docker.io") == 0 { + image = fmt.Sprintf("docker.io/library/%v", parts[1]) + } + } + + readCloser, err := client.ImagePull(ctx, image, types.ImagePullOptions{}) + if err != nil { + return err + } + + _, err = io.Copy(w, readCloser) + if err != nil { + return err + } + + return nil +} + +// PullQuiet image +func (client *Client) PullQuiet(ctx context.Context, image string) error { + return client.Pull(ctx, image, ioutil.Discard) +} + +// VolumeListByLabels returns volumes which match by given labels +func (client *Client) VolumeListByLabels(ctx context.Context, volumeLabels map[string]string) (volume.VolumesListOKBody, error) { + args := filters.NewArgs() + for key, value := range volumeLabels { + args.Add("label", fmt.Sprintf("%v=%v", key, value)) + } + + return client.VolumeList(ctx, args) +} + +// VolumeListByNames returns volumes which match by their names. If a +// volume can not be found, the function returns an error +func (client *Client) VolumeListByNames(ctx context.Context, volumeNames ...string) (volume.VolumesListOKBody, error) { + args := filters.NewArgs() + foundVolumes := make(map[string]bool, 0) + for _, volumeName := range volumeNames { + foundVolumes[volumeName] = false + args.Add("name", volumeName) + } + + volumes, err := client.VolumeList(ctx, args) + if err != nil { + return volume.VolumesListOKBody{}, err + } + + for _, volume := range volumes.Volumes { + foundVolumes[volume.Name] = true + } + + for _, volumeName := range volumeNames { + if foundVolumes[volumeName] != true { + return volume.VolumesListOKBody{}, fmt.Errorf("Volume %v not found", volumeName) + } + } + return volumes, err +} + +// VolumeRemoveByLabels remove all volumes match by their labels +func (client *Client) VolumeRemoveByLabels(ctx context.Context, volumeLabels map[string]string) error { + volumes, err := client.VolumeListByLabels(ctx, volumeLabels) + if err != nil { + return err + } + + for _, volume := range volumes.Volumes { + err := client.VolumeRemove(ctx, volume.Name, true) + if err != nil { + return err + } + } + + return nil +} + +// VolumeRemoveByNames remove all volumes match by their names. If a +// volume can not be found, the function returns an error +func (client *Client) VolumeRemoveByNames(ctx context.Context, volumeNames ...string) error { + volumes, err := client.VolumeListByNames(ctx, volumeNames...) + if err != nil { + return err + } + + for _, volume := range volumes.Volumes { + err := client.VolumeRemove(ctx, volume.Name, true) + if err != nil { + return err + } + } + + return nil +} + +// New returns a new dockerutil client +func New() (*Client, error) { + dockerClient, err := client.NewEnvClient() + if err != nil { + return nil, err + } + return &Client{ + dockerClient, + nil, + new(sync.Mutex), + }, nil +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..86a4f11 --- /dev/null +++ b/client_test.go @@ -0,0 +1,443 @@ +package dockerutils + +import ( + "context" + "fmt" + "net/http" + "strings" + "testing" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/volume" + uuid "github.com/satori/go.uuid" + "github.com/stretchr/testify/require" +) + +// TestContainerCRUD +// Test the following API functions: +// - ContainerListByLabels +// - ContainerListByNames +// - ContainerRemoveByNames +// - ContainerRemoveByLabels +// - ContainerRemoveByIDs +func TestContainerCRUD(t *testing.T) { + var ( + ctx = context.Background() + require = require.New(t) + + iterations = 5 + + cleanupLabels = map[string]string{ + uuid.NewV4().String(): uuid.NewV4().String(), + } + ) + + dockerClient, err := New() + require.NoError(err) + t.Cleanup(func() { + dockerClient.ContainerRemoveByLabels(ctx, cleanupLabels) + }) + + // Create Containers + containerIDs := make([]string, 0) + containerNames := make([]string, 0) + for i := 0; i < iterations; i++ { + containerName := uuid.NewV4().String() + containerID, err := dockerClient.NewBuilder("nginx:alpine"). + Labels(cleanupLabels). + Port("80"). + Pull(). + WithName(containerName). + Start(ctx) + require.NoError(err) + + containerNames = append(containerNames, containerName) + containerIDs = append(containerIDs, containerID) + } + + // ListByLabels + containers, err := dockerClient.ContainerListByLabels(ctx, true, cleanupLabels) + require.NoError(err) + require.Len(containers, iterations) + for _, container := range containers { + require.Contains(containerIDs, container.ID) + require.Contains(containerNames, strings.Split(container.Names[0], "/")[1]) + } + + // ListByNames + containers, err = dockerClient.ContainerListByNames(ctx, true, containerNames...) + require.NoError(err) + require.Len(containers, iterations) + for _, container := range containers { + require.Contains(containerIDs, container.ID) + require.Contains(containerNames, strings.Split(container.Names[0], "/")[1]) + } + + // RemoveByLabels + err = dockerClient.ContainerRemoveByLabels(ctx, cleanupLabels) + require.NoError(err) + + containers, err = dockerClient.ContainerListByLabels(ctx, true, cleanupLabels) + require.NoError(err) + require.Len(containers, 0) + + // Create + containerIDs = make([]string, 0) + containerNames = make([]string, 0) + for i := 0; i < iterations; i++ { + containerName := uuid.NewV4().String() + containerID, err := dockerClient.NewBuilder("nginx:alpine"). + Labels(cleanupLabels). + Port("80"). + Pull(). + WithName(containerName). + Start(ctx) + require.NoError(err) + + containerNames = append(containerNames, containerName) + containerIDs = append(containerIDs, containerID) + } + + // RemoveByNames + err = dockerClient.ContainerRemoveByNames(ctx, containerNames...) + require.NoError(err) + + containers, err = dockerClient.ContainerListByNames(ctx, true, containerNames...) + require.NoError(err) + require.Len(containers, 0) + + // Create + containerIDs = make([]string, 0) + containerNames = make([]string, 0) + for i := 0; i < iterations; i++ { + containerName := uuid.NewV4().String() + containerID, err := dockerClient.NewBuilder("nginx:alpine"). + Labels(cleanupLabels). + Port("80"). + Pull(). + WithName(containerName). + Start(ctx) + require.NoError(err) + + containerNames = append(containerNames, containerName) + containerIDs = append(containerIDs, containerID) + } + + // RemoveByID + err = dockerClient.ContainerRemoveByIDs(ctx, containerIDs...) + require.NoError(err) + + containers, err = dockerClient.ContainerListByLabels(ctx, true, cleanupLabels) + require.NoError(err) + require.Len(containers, 0) +} + +// TestNetworkCRUD +// Test the following API functions: +// - NetworkListByLabels +// - NetworkListByNames +// - NetworkRemoveByLabels +// - NetworkRemoveByNames +// - NetworkRemoveByIDs +func TestNetworkCRUD(t *testing.T) { + var ( + ctx = context.Background() + require = require.New(t) + + iterations = 5 + + cleanupLabels = map[string]string{ + uuid.NewV4().String(): uuid.NewV4().String(), + } + ) + + dockerClient, err := New() + require.NoError(err) + t.Cleanup(func() { + dockerClient.NetworkRemoveByLabels(ctx, cleanupLabels) + }) + + // Create Networks + networkIDs := make([]string, 0) + networkNames := make([]string, 0) + for i := 0; i < iterations; i++ { + networkName := uuid.NewV4().String() + resp, err := dockerClient.NetworkCreate(ctx, networkName, types.NetworkCreate{ + Labels: cleanupLabels, + }) + require.NoError(err) + + networkNames = append(networkNames, networkName) + networkIDs = append(networkIDs, resp.ID) + } + + // ListByLabels + networks, err := dockerClient.NetworkListByLabels(ctx, cleanupLabels) + require.NoError(err) + require.Len(networks, iterations) + for _, network := range networks { + require.Contains(networkIDs, network.ID) + require.Contains(networkNames, network.Name) + } + + // ListByLabels, network with label does not exist + networks, err = dockerClient.NetworkListByLabels(ctx, map[string]string{uuid.NewV4().String(): uuid.NewV4().String()}) + require.NoError(err) + require.Len(networks, 0) + + // ListByNames + networks, err = dockerClient.NetworkListByNames(ctx, networkNames...) + require.NoError(err) + require.Len(networks, iterations) + for _, network := range networks { + require.Contains(networkIDs, network.ID) + require.Contains(networkNames, network.Name) + } + + // ListByNames, network with names does not exist + networks, err = dockerClient.NetworkListByNames(ctx, uuid.NewV4().String(), uuid.NewV4().String()) + require.Error(err) + require.Nil(networks) + + // RemoveByLabels + err = dockerClient.NetworkRemoveByLabels(ctx, cleanupLabels) + require.NoError(err) + + networks, err = dockerClient.NetworkListByLabels(ctx, cleanupLabels) + require.NoError(err) + require.Len(networks, 0) + + // RemoveByLabels, label does not exists + err = dockerClient.NetworkRemoveByLabels(ctx, map[string]string{uuid.NewV4().String(): uuid.NewV4().String()}) + require.NoError(err) + + // Create Networks + networkIDs = make([]string, 0) + networkNames = make([]string, 0) + for i := 0; i < iterations; i++ { + networkName := uuid.NewV4().String() + resp, err := dockerClient.NetworkCreate(ctx, networkName, types.NetworkCreate{ + Labels: cleanupLabels, + }) + require.NoError(err) + + networkNames = append(networkNames, networkName) + networkIDs = append(networkIDs, resp.ID) + } + + // RemoveByNames + err = dockerClient.NetworkRemoveByNames(ctx, networkNames...) + require.NoError(err) + + networks, err = dockerClient.NetworkListByNames(ctx, networkNames...) + require.Error(err) + require.Nil(networks) + + // RemoveByNames, name does not exists + err = dockerClient.NetworkRemoveByNames(ctx, uuid.NewV4().String()) + require.Error(err) + + // Create Networks + networkIDs = make([]string, 0) + networkNames = make([]string, 0) + for i := 0; i < iterations; i++ { + networkName := uuid.NewV4().String() + resp, err := dockerClient.NetworkCreate(ctx, networkName, types.NetworkCreate{ + Labels: cleanupLabels, + }) + require.NoError(err) + + networkNames = append(networkNames, networkName) + networkIDs = append(networkIDs, resp.ID) + } + + // RemoveByIDs + err = dockerClient.NetworkRemoveByIDs(ctx, networkIDs...) + require.NoError(err) + + networks, err = dockerClient.NetworkListByNames(ctx, networkNames...) + require.Error(err) + require.Nil(networks) + + // RemoveByID, id does not exists + err = dockerClient.NetworkRemoveByIDs(ctx, uuid.NewV4().String()) + require.Error(err) +} + +func TestVolumeCRUD(t *testing.T) { + var ( + ctx = context.Background() + require = require.New(t) + + iterations = 5 + + cleanupLabels = map[string]string{ + uuid.NewV4().String(): uuid.NewV4().String(), + } + ) + + dockerClient, err := New() + require.NoError(err) + t.Cleanup(func() { + dockerClient.VolumeRemoveByLabels(ctx, cleanupLabels) + }) + + // Create Volumes + volumeNames := make([]string, 0) + for i := 0; i < iterations; i++ { + volumeName := uuid.NewV4().String() + volume, err := dockerClient.VolumeCreate(ctx, volume.VolumesCreateBody{ + Name: volumeName, + Labels: cleanupLabels, + }) + require.NoError(err) + + volumeNames = append(volumeNames, volume.Name) + } + + // ListByLabels + volumes, err := dockerClient.VolumeListByLabels(ctx, cleanupLabels) + require.NoError(err) + require.Len(volumes.Volumes, iterations) + for _, volume := range volumes.Volumes { + require.Contains(volumeNames, volume.Name) + } + + // ListByLabels, network with label does not exist + volumes, err = dockerClient.VolumeListByLabels(ctx, map[string]string{uuid.NewV4().String(): uuid.NewV4().String()}) + require.NoError(err) + require.Len(volumes.Volumes, 0) + + // ListByNames + volumes, err = dockerClient.VolumeListByNames(ctx, volumeNames...) + require.NoError(err) + require.Len(volumes.Volumes, iterations) + for _, volume := range volumes.Volumes { + require.Contains(volumeNames, volume.Name) + } + + // ListByNames, network with names does not exist + volumes, err = dockerClient.VolumeListByNames(ctx, uuid.NewV4().String(), uuid.NewV4().String()) + require.Error(err) + require.Nil(volumes.Volumes) + + // RemoveByLabels + err = dockerClient.VolumeRemoveByLabels(ctx, cleanupLabels) + require.NoError(err) + + volumes, err = dockerClient.VolumeListByLabels(ctx, cleanupLabels) + require.NoError(err) + require.Len(volumes.Volumes, 0) + + // RemoveByLabels, labels does not exists + err = dockerClient.NetworkRemoveByLabels(ctx, map[string]string{uuid.NewV4().String(): uuid.NewV4().String()}) + require.NoError(err) + + // Create Volumes + volumeNames = make([]string, 0) + for i := 0; i < iterations; i++ { + volumeName := uuid.NewV4().String() + volume, err := dockerClient.VolumeCreate(ctx, volume.VolumesCreateBody{ + Name: volumeName, + Labels: cleanupLabels, + }) + require.NoError(err) + + volumeNames = append(volumeNames, volume.Name) + } + + // RemoveByNames + err = dockerClient.VolumeRemoveByNames(ctx, volumeNames...) + require.NoError(err) + + volumes, err = dockerClient.VolumeListByNames(ctx, volumeNames...) + require.Error(err) + require.Nil(volumes.Volumes) + + // RemoveByNames, name does not exists + err = dockerClient.NetworkRemoveByNames(ctx, uuid.NewV4().String()) + require.Error(err) +} + +// TestContainerMultipleNetworks +// Test if a container can be accessed over multiple networks/ips. +func TestContainerMultipleNetworks(t *testing.T) { + var ( + ctx = context.Background() + require = require.New(t) + + iterations = 5 + + cleanupLabels = map[string]string{ + uuid.NewV4().String(): uuid.NewV4().String(), + } + ) + + dockerClient, err := New() + require.NoError(err) + t.Cleanup(func() { + dockerClient.ContainerRemoveByLabels(ctx, cleanupLabels) + dockerClient.NetworkRemoveByLabels(ctx, cleanupLabels) + }) + + // Create Containers + containerIDs := make([]string, 0) + containerNames := make([]string, 0) + containersNetworks := make(map[string]map[string][]string, 0) + for i := 0; i < iterations; i++ { + containerName := uuid.NewV4().String() + + containerNetworks := map[string][]string{ + uuid.NewV4().String(): { + uuid.NewV4().String(), + uuid.NewV4().String(), + }, + uuid.NewV4().String(): { + uuid.NewV4().String(), + uuid.NewV4().String(), + }, + } + + builder := dockerClient.NewBuilder("nginx:alpine"). + Labels(cleanupLabels). + Port("80"). + Pull(). + WithName(containerName) + + for networkName, aliasses := range containerNetworks { + _, err := dockerClient.NetworkCreate(ctx, networkName, types.NetworkCreate{ + Labels: cleanupLabels, + }) + require.NoError(err) + + builder.Network(networkName, aliasses...) + } + + containerID, err := builder.Start(ctx) + require.NoError(err) + + containerNames = append(containerNames, containerName) + containerIDs = append(containerIDs, containerID) + containersNetworks[containerID] = containerNetworks + } + + for containerID, containerNetworks := range containersNetworks { + for networkName := range containerNetworks { + networks, err := dockerClient.NetworkListByNames(ctx, networkName) + require.NoError(err) + for _, network := range networks { + if _, present := network.Containers[containerID]; !present { + require.Fail("Container %v not found in network %v", containerID, network.ID) + } + + networkIPParts := strings.Split(network.Containers[containerID].IPv4Address, "/") + + url := fmt.Sprintf("http://%v", networkIPParts[0]) + resp, err := http.Get(url) + require.NoError(err) + defer resp.Body.Close() + require.Equal(http.StatusOK, resp.StatusCode) + } + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..3476bce --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module git.cryptic.systems/volker.raschek/dockerutils + +go 1.15 + +require ( + github.com/docker/distribution v2.7.1+incompatible // indirect + github.com/docker/docker v1.13.1 + github.com/docker/go-connections v0.4.0 + github.com/docker/go-units v0.4.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/satori/go.uuid v1.2.0 + github.com/stretchr/testify v1.6.1 + golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..d6a2c2d --- /dev/null +++ b/go.sum @@ -0,0 +1,35 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug= +github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= +github.com/docker/docker v1.13.1 h1:IkZjBSIc8hBjLpqeAbeE5mca5mNgeatLHBy3GO78BWo= +github.com/docker/docker v1.13.1/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= +github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= +github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw= +github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201031054903-ff519b6c9102 h1:42cLlJJdEh+ySyeUUbEQ5bsTiq8voBeTuweGVkY6Puw= +golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/watcher.go b/watcher.go new file mode 100644 index 0000000..b03f940 --- /dev/null +++ b/watcher.go @@ -0,0 +1,166 @@ +package dockerutils + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/events" +) + +var ( + ErrDied = errors.New("died") + ErrUnhealthy = errors.New("went unhealthy") +) + +// Watcher is a helper to listen on docker API events to notify if a container +// is healthy or not. +type Watcher struct { + client *Client + errorChannels map[string]chan<- error + doneChannels map[string]chan<- struct{} + errorMapper map[string]ErrorMapper + mutex *sync.RWMutex + lastError error + cancelFunc context.CancelFunc +} + +func (watcher *Watcher) AddListenerWithErrorMapper(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}, errorMapper ErrorMapper) error { + watcher.mutex.Lock() + defer watcher.mutex.Unlock() + + if watcher.lastError != nil { + return watcher.lastError + } + + ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second)) + defer cancel() + + containerJSON, err := watcher.client.ContainerInspect(ctx, containerID) + if err != nil { + return fmt.Errorf("Unable to check if container is already unhealthy: %w", err) + } + + if containerJSON.State.Dead || !containerJSON.State.Running { + go func() { + errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrDied)) + doneChannel <- struct{}{} + }() + return nil + } + + if containerJSON.State.Health == nil { + go func() { + doneChannel <- struct{}{} + }() + return nil + } + + switch containerJSON.State.Health.Status { + case "starting": + watcher.errorChannels[containerID] = errorChannel + watcher.doneChannels[containerID] = doneChannel + watcher.errorMapper[containerID] = errorMapper + case "healthy": + go func() { + doneChannel <- struct{}{} + }() + case "unhealthy": + go func() { + errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrUnhealthy)) + doneChannel <- struct{}{} + }() + default: + go func() { + errorChannel <- errorMapper(fmt.Errorf("Container %v went in an unknown state during startup: %s", containerID, containerJSON.State.Health.Status)) + doneChannel <- struct{}{} + }() + } + return nil +} + +func (watcher *Watcher) AddListener(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}) error { + return watcher.AddListenerWithErrorMapper(containerID, errorChannel, doneChannel, defaultErrorMapper) +} + +func (watcher *Watcher) removeListener(containerID string) { + watcher.mutex.Lock() + defer watcher.mutex.Unlock() + delete(watcher.doneChannels, containerID) + delete(watcher.errorChannels, containerID) + delete(watcher.errorMapper, containerID) +} + +func (watcher *Watcher) start() { + ctx, cancel := context.WithCancel(context.Background()) + watcher.cancelFunc = cancel + dockerEvents, errors := watcher.client.Events(ctx, types.EventsOptions{}) + + sendErrorFunc := func() { + watcher.mutex.Lock() + for containerID, errorChannel := range watcher.errorChannels { + go func(errorChannel chan<- error, doneChannel chan<- struct{}, err error, errorMapper ErrorMapper) { + errorChannel <- errorMapper(err) + doneChannel <- struct{}{} + }(errorChannel, watcher.doneChannels[containerID], watcher.lastError, watcher.errorMapper[containerID]) + } + watcher.mutex.Unlock() + } + + go func() { + for { + select { + case event := <-dockerEvents: + watcher.mutex.RLock() + errorChannel, present := watcher.errorChannels[event.Actor.ID] + if !present || event.Type != events.ContainerEventType { + continue + } + doneChannel := watcher.doneChannels[event.Actor.ID] + errorMapper := watcher.errorMapper[event.Actor.ID] + watcher.mutex.RUnlock() + + switch event.Action { + case "health_status: healthy": + go func() { + doneChannel <- struct{}{} + }() + watcher.removeListener(event.Actor.ID) + case "health_status: unhealthy": + go func() { + errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrUnhealthy)) + doneChannel <- struct{}{} + }() + watcher.removeListener(event.Actor.ID) + case "die": + go func() { + errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrDied)) + doneChannel <- struct{}{} + }() + watcher.removeListener(event.Actor.ID) + } + case err := <-errors: + watcher.lastError = err + sendErrorFunc() + return + case <-ctx.Done(): + watcher.lastError = fmt.Errorf("Watcher was canceled") + sendErrorFunc() + return + } + } + }() +} + +func (watcher *Watcher) stop() { + watcher.cancelFunc() +} + +type ErrorMapper func(error) error + +func defaultErrorMapper(err error) error { + return err +}