package dockerutils

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/events"
)

var (
	// ErrDied is wrapped into the error reported to a listener when its
	// container is dead, is not running, or emits a "die" event.
	ErrDied = errors.New("died")
	// ErrUnhealthy is wrapped into the error reported to a listener when its
	// container's health status is (or becomes) "unhealthy".
	ErrUnhealthy = errors.New("went unhealthy")
)

// Watcher is a helper to listen on docker API events to notify if a container
// is healthy or not.
//
// Listeners are keyed by container ID. The three maps always hold entries for
// the same set of IDs and, together with lastError, are guarded by mutex.
type Watcher struct {
	client        *Client
	errorChannels map[string]chan<- error
	doneChannels  map[string]chan<- struct{}
	errorMapper   map[string]ErrorMapper
	mutex         *sync.RWMutex
	// lastError is set once the event loop has terminated; afterwards no new
	// listener can be registered.
	lastError error
	// cancelFunc cancels the context of the event subscription created by start.
	cancelFunc context.CancelFunc
}

// notifyDone signals completion on doneChannel. It blocks until the value is
// received, so callers run it in its own goroutine.
func notifyDone(doneChannel chan<- struct{}) {
	doneChannel <- struct{}{}
}

// notifyError sends errorMapper(err) on errorChannel and then signals
// completion on doneChannel. It blocks until both values are received, so
// callers run it in its own goroutine.
func notifyError(errorChannel chan<- error, doneChannel chan<- struct{}, errorMapper ErrorMapper, err error) {
	errorChannel <- errorMapper(err)
	doneChannel <- struct{}{}
}

// AddListenerWithErrorMapper registers interest in the health of the container
// identified by containerID.
//
// The container is inspected first:
//   - dead or not running: an error wrapping ErrDied is reported;
//   - no health check configured, or already "healthy": only done is signaled;
//   - "unhealthy": an error wrapping ErrUnhealthy is reported;
//   - "starting": the listener is stored and notified later by the event loop;
//   - any other status: an "unknown state" error is reported.
//
// Whenever an error is reported it is first passed through errorMapper and sent
// on errorChannel, followed by one value on doneChannel. All sends happen in
// separate goroutines, so this method never blocks on the listener's channels.
// A nil errorMapper is treated as the identity mapper.
//
// It returns the watcher's terminal error if the event loop has already
// stopped, or a wrapped error if the container cannot be inspected.
func (watcher *Watcher) AddListenerWithErrorMapper(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}, errorMapper ErrorMapper) error {
	const inspectTimeout = 20 * time.Second

	if errorMapper == nil {
		errorMapper = defaultErrorMapper
	}

	// The lock is held across the inspect call: the event loop needs the read
	// lock to look up listeners, so an event arriving after the snapshot below
	// cannot be processed before the listener has been stored.
	watcher.mutex.Lock()
	defer watcher.mutex.Unlock()

	if watcher.lastError != nil {
		return watcher.lastError
	}

	ctx, cancel := context.WithTimeout(context.Background(), inspectTimeout)
	defer cancel()

	containerJSON, err := watcher.client.ContainerInspect(ctx, containerID)
	if err != nil {
		return fmt.Errorf("Unable to check if container is already unhealthy: %w", err)
	}

	state := containerJSON.State
	if state.Dead || !state.Running {
		go notifyError(errorChannel, doneChannel, errorMapper, fmt.Errorf("Container %v: %w", containerID, ErrDied))
		return nil
	}

	// Without a health check there is nothing to wait for.
	if state.Health == nil {
		go notifyDone(doneChannel)
		return nil
	}

	switch status := state.Health.Status; status {
	case "starting":
		watcher.errorChannels[containerID] = errorChannel
		watcher.doneChannels[containerID] = doneChannel
		watcher.errorMapper[containerID] = errorMapper
	case "healthy":
		go notifyDone(doneChannel)
	case "unhealthy":
		go notifyError(errorChannel, doneChannel, errorMapper, fmt.Errorf("Container %v: %w", containerID, ErrUnhealthy))
	default:
		go notifyError(errorChannel, doneChannel, errorMapper, fmt.Errorf("Container %v went in an unknown state during startup: %s", containerID, status))
	}

	return nil
}

// AddListener behaves like AddListenerWithErrorMapper with an identity error
// mapper: errors are delivered to errorChannel unchanged.
func (watcher *Watcher) AddListener(containerID string, errorChannel chan<- error, doneChannel chan<- struct{}) error {
	return watcher.AddListenerWithErrorMapper(containerID, errorChannel, doneChannel, defaultErrorMapper)
}

// removeListener forgets every channel and mapper stored for containerID.
// Removing an unknown ID is a no-op.
func (watcher *Watcher) removeListener(containerID string) {
	watcher.mutex.Lock()
	defer watcher.mutex.Unlock()

	delete(watcher.doneChannels, containerID)
	delete(watcher.errorChannels, containerID)
	delete(watcher.errorMapper, containerID)
}

// start subscribes to the docker event stream and launches the goroutine that
// dispatches container events to the registered listeners.
//
// The goroutine ends when the event subscription reports an error or when stop
// cancels the subscription context. In both cases the terminal error is stored
// in lastError and reported to every listener still registered.
func (watcher *Watcher) start() {
	ctx, cancel := context.WithCancel(context.Background())
	watcher.cancelFunc = cancel

	dockerEvents, eventErrors := watcher.client.Events(ctx, types.EventsOptions{})

	// fail records the terminal error and reports it to all remaining
	// listeners. Both happen under the same lock so that a concurrent
	// AddListenerWithErrorMapper either observes lastError or has its listener
	// included in the broadcast.
	fail := func(err error) {
		watcher.mutex.Lock()
		defer watcher.mutex.Unlock()

		watcher.lastError = err
		for containerID, errorChannel := range watcher.errorChannels {
			go notifyError(errorChannel, watcher.doneChannels[containerID], watcher.errorMapper[containerID], err)
		}
	}

	go func() {
		for {
			select {
			case event := <-dockerEvents:
				containerID := event.Actor.ID

				// Take a snapshot of the listener and release the read lock
				// before any early exit, so the lock is never leaked.
				watcher.mutex.RLock()
				errorChannel, present := watcher.errorChannels[containerID]
				doneChannel := watcher.doneChannels[containerID]
				errorMapper := watcher.errorMapper[containerID]
				watcher.mutex.RUnlock()

				if !present || event.Type != events.ContainerEventType {
					continue
				}

				// Only terminal outcomes notify and unregister the listener;
				// every other action is ignored.
				switch event.Action {
				case "health_status: healthy":
					go notifyDone(doneChannel)
					watcher.removeListener(containerID)
				case "health_status: unhealthy":
					go notifyError(errorChannel, doneChannel, errorMapper, fmt.Errorf("Container %v: %w", containerID, ErrUnhealthy))
					watcher.removeListener(containerID)
				case "die":
					go notifyError(errorChannel, doneChannel, errorMapper, fmt.Errorf("Container %v: %w", containerID, ErrDied))
					watcher.removeListener(containerID)
				}
			case err := <-eventErrors:
				fail(err)
				return
			case <-ctx.Done():
				fail(errors.New("Watcher was canceled"))
				return
			}
		}
	}()
}

// stop cancels the event subscription created by start, which makes the event
// loop report a cancellation error to the remaining listeners and exit.
// start must have been called before stop.
func (watcher *Watcher) stop() {
	watcher.cancelFunc()
}

// ErrorMapper transforms an error before it is delivered to a listener's error
// channel.
type ErrorMapper func(error) error

// defaultErrorMapper is the identity ErrorMapper.
func defaultErrorMapper(err error) error {
	return err
}