Skip to content

Commit

Permalink
chore: robustly ensure containers are stopped (#5489)
Browse files Browse the repository at this point in the history
Uses `docker ps` to ensure that the containers are stopped, instead of waiting 1 second and assuming that's enough time to stop them.



By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the Apache 2.0 License.
  • Loading branch information
CaptainCarpensir authored Dec 4, 2023
1 parent ab6e1da commit dae144e
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 12 deletions.
29 changes: 23 additions & 6 deletions internal/pkg/docker/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@ func (a *runTaskAction) Do(o *Orchestrator) error {
if err := o.stopTask(ctx, o.curTask); err != nil {
return fmt.Errorf("stop existing task: %w", err)
}

// ensure that containers are fully stopped after o.stopTask finishes blocking
// TODO(Aiden): Implement a container ID system or use `docker ps` to ensure containers are stopped
time.Sleep(1 * time.Second)
}

for name, ctr := range a.task.Containers {
Expand Down Expand Up @@ -399,8 +395,29 @@ func (o *Orchestrator) stopTask(ctx context.Context, task Task) error {
errCh <- fmt.Errorf("stop %q: %w", name, err)
return
}
fmt.Printf("Stopped %q\n", name)
errCh <- nil

// ensure that container is fully stopped before stopTask finishes blocking
for {
running, err := o.docker.IsContainerRunning(ctx, o.containerID(name))
if err != nil {
errCh <- fmt.Errorf("polling container %q for removal: %w", name, err)
return
}

if running {
select {
case <-time.After(1 * time.Second):
continue
case <-ctx.Done():
errCh <- fmt.Errorf("check container %q stopped: %w", name, ctx.Err())
return
}
}

fmt.Printf("Stopped %q\n", name)
errCh <- nil
return
}
}()
}

Expand Down
59 changes: 53 additions & 6 deletions internal/pkg/docker/orchestrator/orchestrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func TestOrchestrator(t *testing.T) {
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
StopFn: func(ctx context.Context, name string) error {
if name == "prefix-success" {
Expand All @@ -111,13 +114,45 @@ func TestOrchestrator(t *testing.T) {
`stop "bar": some error`,
},
},
"error polling tasks removed": {
logOptions: noLogs,
runUntilStopped: true,
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
if name == "prefix-pause" {
return true, nil
}
return false, errors.New("some error")
},
StopFn: func(ctx context.Context, name string) error {
return nil
},
}
return func(t *testing.T, o *Orchestrator) {
o.RunTask(Task{
Containers: map[string]ContainerDefinition{
"foo": {},
"bar": {},
},
})
}, de
},
errs: []string{
`polling container "foo" for removal: some error`,
`polling container "bar" for removal: some error`,
},
},
"error restarting new task due to pause changes": {
logOptions: noLogs,
runUntilStopped: true,
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
}
return func(t *testing.T, o *Orchestrator) {
Expand Down Expand Up @@ -151,7 +186,10 @@ func TestOrchestrator(t *testing.T) {
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error {
// validate pause container has correct ports and secrets
Expand Down Expand Up @@ -197,7 +235,10 @@ func TestOrchestrator(t *testing.T) {
stopPause := make(chan struct{})
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error {
if opts.ContainerName == "prefix-foo" {
Expand Down Expand Up @@ -232,7 +273,10 @@ func TestOrchestrator(t *testing.T) {
stopPause := make(chan struct{})
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
RunFn: func(ctx context.Context, opts *dockerengine.RunOptions) error {
if opts.ContainerName == "prefix-foo" {
Expand Down Expand Up @@ -377,7 +421,10 @@ func TestOrchestrator(t *testing.T) {
test: func(t *testing.T) (test, *dockerenginetest.Double) {
de := &dockerenginetest.Double{
IsContainerRunningFn: func(ctx context.Context, name string) (bool, error) {
return true, nil
if name == "prefix-pause" {
return true, nil
}
return false, nil
},
ExecFn: func(ctx context.Context, ctr string, w io.Writer, cmd string, args ...string) error {
if cmd == "aws" {
Expand Down

0 comments on commit dae144e

Please sign in to comment.