165 lines
4.4 KiB
Go
165 lines
4.4 KiB
Go
|
package dockerutils
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/docker/api/types"
|
||
|
"github.com/docker/docker/api/types/events"
|
||
|
)
|
||
|
|
||
|
// ErrDied is the sentinel wrapped into the error sent to a listener when its
// container is found dead or not running, or when a "die" event arrives for it.
var ErrDied = errors.New("died")

// ErrUnhealthy is the sentinel wrapped into the error sent to a listener when
// its container reports an "unhealthy" health status.
var ErrUnhealthy = errors.New("went unhealthy")
|
||
|
|
||
|
type Watcher struct {
|
||
|
client *Client
|
||
|
errorChannels map[string]chan<- error
|
||
|
doneChannels map[string]chan<- struct{}
|
||
|
errorMapper map[string]ErrorMapper
|
||
|
mutex *sync.RWMutex
|
||
|
lastError error
|
||
|
cancelFunc context.CancelFunc
|
||
|
}
|
||
|
|
||
|
func (watcher *Watcher) AddListenerWithErrorMapper(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}, errorMapper ErrorMapper) error {
|
||
|
watcher.mutex.Lock()
|
||
|
defer watcher.mutex.Unlock()
|
||
|
|
||
|
if watcher.lastError != nil {
|
||
|
return watcher.lastError
|
||
|
}
|
||
|
|
||
|
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(20*time.Second))
|
||
|
defer cancel()
|
||
|
|
||
|
containerJSON, err := watcher.client.ContainerInspect(ctx, containerID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("Unable to check if container is already unhealthy: %v", err)
|
||
|
}
|
||
|
|
||
|
if containerJSON.State.Dead || !containerJSON.State.Running {
|
||
|
go func() {
|
||
|
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrDied))
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
if containerJSON.State.Health == nil {
|
||
|
go func() {
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
switch containerJSON.State.Health.Status {
|
||
|
case "starting":
|
||
|
watcher.errorChannels[containerID] = errorChannel
|
||
|
watcher.doneChannels[containerID] = doneChannel
|
||
|
watcher.errorMapper[containerID] = errorMapper
|
||
|
case "healthy":
|
||
|
go func() {
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
case "unhealthy":
|
||
|
go func() {
|
||
|
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", containerID, ErrUnhealthy))
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
default:
|
||
|
go func() {
|
||
|
errorChannel <- errorMapper(fmt.Errorf("Container %v went in an unknown state during startup: %s", containerID, containerJSON.State.Health.Status))
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (watcher *Watcher) AddListener(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}) error {
|
||
|
return watcher.AddListenerWithErrorMapper(containerID, errorChannel, doneChannel, defaultErrorMapper)
|
||
|
}
|
||
|
|
||
|
func (watcher *Watcher) removeListener(containerID string) {
|
||
|
watcher.mutex.Lock()
|
||
|
defer watcher.mutex.Unlock()
|
||
|
delete(watcher.doneChannels, containerID)
|
||
|
delete(watcher.errorChannels, containerID)
|
||
|
delete(watcher.errorMapper, containerID)
|
||
|
}
|
||
|
|
||
|
func (watcher *Watcher) start() {
|
||
|
ctx, cancel := context.WithCancel(context.Background())
|
||
|
watcher.cancelFunc = cancel
|
||
|
dockerEvents, errors := watcher.client.Events(ctx, types.EventsOptions{})
|
||
|
|
||
|
sendErrorFunc := func() {
|
||
|
watcher.mutex.Lock()
|
||
|
for containerID, errorChannel := range watcher.errorChannels {
|
||
|
go func(errorChannel chan<- error, doneChannel chan<- struct{}, err error, errorMapper ErrorMapper) {
|
||
|
errorChannel <- errorMapper(err)
|
||
|
doneChannel <- struct{}{}
|
||
|
}(errorChannel, watcher.doneChannels[containerID], watcher.lastError, watcher.errorMapper[containerID])
|
||
|
}
|
||
|
watcher.mutex.Unlock()
|
||
|
}
|
||
|
|
||
|
go func() {
|
||
|
for {
|
||
|
select {
|
||
|
case event := <-dockerEvents:
|
||
|
watcher.mutex.RLock()
|
||
|
errorChannel, present := watcher.errorChannels[event.Actor.ID]
|
||
|
if !present || event.Type != events.ContainerEventType {
|
||
|
continue
|
||
|
}
|
||
|
doneChannel := watcher.doneChannels[event.Actor.ID]
|
||
|
errorMapper := watcher.errorMapper[event.Actor.ID]
|
||
|
watcher.mutex.RUnlock()
|
||
|
|
||
|
switch event.Action {
|
||
|
case "health_status: healthy":
|
||
|
go func() {
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
watcher.removeListener(event.Actor.ID)
|
||
|
case "health_status: unhealthy":
|
||
|
go func() {
|
||
|
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrUnhealthy))
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
watcher.removeListener(event.Actor.ID)
|
||
|
case "die":
|
||
|
go func() {
|
||
|
errorChannel <- errorMapper(fmt.Errorf("Container %v: %w", event.Actor.ID, ErrDied))
|
||
|
doneChannel <- struct{}{}
|
||
|
}()
|
||
|
watcher.removeListener(event.Actor.ID)
|
||
|
}
|
||
|
case err := <-errors:
|
||
|
watcher.lastError = err
|
||
|
sendErrorFunc()
|
||
|
return
|
||
|
case <-ctx.Done():
|
||
|
watcher.lastError = fmt.Errorf("Watcher was canceled")
|
||
|
sendErrorFunc()
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (watcher *Watcher) stop() {
|
||
|
watcher.cancelFunc()
|
||
|
}
|
||
|
|
||
|
// ErrorMapper converts an error produced by the watcher into the error that is
// actually delivered on a listener's error channel.
type ErrorMapper func(err error) error
|
||
|
|
||
|
// defaultErrorMapper is the identity ErrorMapper: it hands back the error it
// was given without modification.
func defaultErrorMapper(err error) (mapped error) {
	mapped = err
	return mapped
}
|