dockerutils/watcher.go

167 lines
4.5 KiB
Go

package dockerutils
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/events"
)
var (
ErrDied = errors.New("died")
ErrUnhealthy = errors.New("went unhealthy")
)
// Watcher is a helper to listen on docker API events to notify if a container
// is healthy or not.
type Watcher struct {
client *Client
errorChannels map[string]chan<- error
doneChannels map[string]chan<- struct{}
errorMapper map[string]ErrorMapper
mutex *sync.RWMutex
lastError error
cancelFunc context.CancelFunc
}
func (watcher *Watcher) AddListenerWithErrorMapper(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}, errorMapper ErrorMapper) error {
watcher.mutex.Lock()
defer watcher.mutex.Unlock()
if watcher.lastError != nil {
return watcher.lastError
}
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second))
defer cancel()
containerJSON, err := watcher.client.ContainerInspect(ctx, containerID)
if err != nil {
return fmt.Errorf("Unable to check if container is already unhealthy: %w", err)
}
if containerJSON.State.Dead || !containerJSON.State.Running {
go func() {
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrDied))
doneChannel <- struct{}{}
}()
return nil
}
if containerJSON.State.Health == nil {
go func() {
doneChannel <- struct{}{}
}()
return nil
}
switch containerJSON.State.Health.Status {
case "starting":
watcher.errorChannels[containerID] = errorChannel
watcher.doneChannels[containerID] = doneChannel
watcher.errorMapper[containerID] = errorMapper
case "healthy":
go func() {
doneChannel <- struct{}{}
}()
case "unhealthy":
go func() {
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrUnhealthy))
doneChannel <- struct{}{}
}()
default:
go func() {
errorChannel <- errorMapper(fmt.Errorf("Container %v went in an unknown state during startup: %s", containerID, containerJSON.State.Health.Status))
doneChannel <- struct{}{}
}()
}
return nil
}
func (watcher *Watcher) AddListener(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}) error {
return watcher.AddListenerWithErrorMapper(containerID, errorChannel, doneChannel, defaultErrorMapper)
}
func (watcher *Watcher) removeListener(containerID string) {
watcher.mutex.Lock()
defer watcher.mutex.Unlock()
delete(watcher.doneChannels, containerID)
delete(watcher.errorChannels, containerID)
delete(watcher.errorMapper, containerID)
}
func (watcher *Watcher) start() {
ctx, cancel := context.WithCancel(context.Background())
watcher.cancelFunc = cancel
dockerEvents, errors := watcher.client.Events(ctx, types.EventsOptions{})
sendErrorFunc := func() {
watcher.mutex.Lock()
for containerID, errorChannel := range watcher.errorChannels {
go func(errorChannel chan<- error, doneChannel chan<- struct{}, err error, errorMapper ErrorMapper) {
errorChannel <- errorMapper(err)
doneChannel <- struct{}{}
}(errorChannel, watcher.doneChannels[containerID], watcher.lastError, watcher.errorMapper[containerID])
}
watcher.mutex.Unlock()
}
go func() {
for {
select {
case event := <-dockerEvents:
watcher.mutex.RLock()
errorChannel, present := watcher.errorChannels[event.Actor.ID]
if !present || event.Type != events.ContainerEventType {
continue
}
doneChannel := watcher.doneChannels[event.Actor.ID]
errorMapper := watcher.errorMapper[event.Actor.ID]
watcher.mutex.RUnlock()
switch event.Action {
case "health_status: healthy":
go func() {
doneChannel <- struct{}{}
}()
watcher.removeListener(event.Actor.ID)
case "health_status: unhealthy":
go func() {
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrUnhealthy))
doneChannel <- struct{}{}
}()
watcher.removeListener(event.Actor.ID)
case "die":
go func() {
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrDied))
doneChannel <- struct{}{}
}()
watcher.removeListener(event.Actor.ID)
}
case err := <-errors:
watcher.lastError = err
sendErrorFunc()
return
case <-ctx.Done():
watcher.lastError = fmt.Errorf("Watcher was canceled")
sendErrorFunc()
return
}
}
}()
}
func (watcher *Watcher) stop() {
watcher.cancelFunc()
}
type ErrorMapper func(error) error
func defaultErrorMapper(err error) error {
return err
}