Skip to content

Commit

Permalink
Use Docker API to pull images (#892)
Browse files Browse the repository at this point in the history
Replace a shell command with a call to the Docker API to pull images for
the Docker executor.

Also remove unused code and rename the `SetupAllSuite` functions to
testify's `SetupSuite`, so that testify calls them when the suite is
first created.

Fixes #283
  • Loading branch information
wjam authored Oct 14, 2022
1 parent 96aa5e2 commit 17d00d9
Show file tree
Hide file tree
Showing 21 changed files with 124 additions and 401 deletions.
2 changes: 1 addition & 1 deletion cmd/bacalhau/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *CreateSuite) TearDownTest() {

}

func (s *CreateSuite) TearDownAlls() {
func (s *CreateSuite) TearDownSuite() {

}

Expand Down
102 changes: 1 addition & 101 deletions pkg/devstack/lotus.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,19 @@ package devstack

import (
"archive/tar"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"sort"
"strings"
"sync"
"time"

dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/mount"
dockerclient "github.com/docker/docker/client"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/go-connections/nat"
"github.com/filecoin-project/bacalhau/pkg/docker"
"github.com/filecoin-project/bacalhau/pkg/system"
"github.com/filecoin-project/bacalhau/pkg/util/closer"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -55,7 +48,7 @@ func newLotusNode(ctx context.Context) (*LotusNode, error) {
return nil, err
}

if err := pullImage(ctx, dockerClient, image); err != nil {
if err := docker.PullImage(ctx, dockerClient, image); err != nil {
closer.CloseWithLogOnError("docker", dockerClient)
return nil, err
}
Expand Down Expand Up @@ -192,96 +185,3 @@ func (l *LotusNode) Close() error {

return nil
}

// pullImage ensures that image is available to the Docker daemon behind client.
//
// The image is inspected first: if the inspection succeeds nothing is pulled,
// and any inspection error other than "not found" is returned unchanged. When
// the image is missing it is pulled, and the daemon's JSON progress stream is
// consumed until EOF. While the pull is running, a background goroutine logs a
// summary of the per-layer state every three seconds.
//
// It returns nil once the stream ends cleanly, the decoding error if the stream
// cannot be parsed, or the error reported by the daemon inside the stream.
func pullImage(ctx context.Context, client dockerclient.ImageAPIClient, image string) error {
	switch _, _, inspectErr := client.ImageInspectWithRaw(ctx, image); {
	case inspectErr == nil:
		// Already present locally, so there is nothing to pull.
		return nil
	case !dockerclient.IsErrNotFound(inspectErr):
		return inspectErr
	}

	log.Debug().Str("image", image).Msg("Pulling image as it wasn't found")

	stream, pullErr := client.ImagePull(ctx, image, dockertypes.ImagePullOptions{})
	if pullErr != nil {
		return pullErr
	}
	defer closer.CloseWithLogOnError("image-pull", stream)

	// The channel is buffered so that the deferred send never blocks the
	// return path, even if the reporter goroutine is busy logging.
	quit := make(chan struct{}, 1)
	defer func() {
		quit <- struct{}{}
	}()

	ticker := time.NewTicker(3 * time.Second)
	defer ticker.Stop()

	// Latest message seen for each layer, keyed by the message ID.
	progress := &sync.Map{}
	go func() {
		for {
			select {
			case <-quit:
				return
			case <-ticker.C:
				logImagePullStatus(progress)
			}
		}
	}()

	decoder := json.NewDecoder(stream)
	for {
		var msg jsonmessage.JSONMessage
		switch decodeErr := decoder.Decode(&msg); {
		case decodeErr == io.EOF:
			// End of the progress stream: the pull has finished.
			return nil
		case decodeErr != nil:
			return decodeErr
		case msg.Aux != nil:
			// Auxiliary messages carry no layer status; skip them.
		case msg.Error != nil:
			return msg.Error
		default:
			progress.Store(msg.ID, msg)
		}
	}
}

// logImagePullStatus writes one debug-level log event summarizing the image
// pull state recorded in m.
//
// Every value in m must be a jsonmessage.JSONMessage (the function panics on
// any other type). Messages are grouped by their Status text:
//   - messages without progress information (nil Progress or Current <= 0) are
//     logged as a sorted list of their IDs under the status;
//   - messages with progress are logged as a dictionary of ID -> progress under
//     the status, where progress is a percentage when Total is known and the
//     amount transferred so far (with its units) when it is not.
func logImagePullStatus(m *sync.Map) {
	withUnits := map[string]*zerolog.Event{}
	withoutUnits := map[string][]string{}
	m.Range(func(_, value any) bool {
		mess := value.(jsonmessage.JSONMessage)

		if mess.Progress == nil || mess.Progress.Current <= 0 {
			withoutUnits[mess.Status] = append(withoutUnits[mess.Status], mess.ID)
			return true
		}

		var status string
		if mess.Progress.Total <= 0 {
			// The total is unknown, so the only meaningful figure is how much
			// has been transferred so far. Report Current, not Total (which is
			// zero or negative on this branch).
			units := mess.Progress.Units
			if units == "" {
				// Docker treats an empty unit as bytes.
				units = "bytes"
			}
			status = fmt.Sprintf("%d %s", mess.Progress.Current, units)
		} else {
			status = fmt.Sprintf("%.3f%%", float64(mess.Progress.Current)/float64(mess.Progress.Total)*100) //nolint:gomnd
		}

		dict, ok := withUnits[mess.Status]
		if !ok {
			dict = zerolog.Dict()
			withUnits[mess.Status] = dict
		}
		dict.Str(mess.ID, status)

		return true
	})

	e := log.Debug()
	for s, l := range withUnits {
		e = e.Dict(s, l)
	}
	for s, l := range withoutUnits {
		// Sort the IDs so the list for a given status is stable between calls.
		sort.Strings(l)
		e = e.Strs(s, l)
	}

	e.Msg("Pulling layers")
}
117 changes: 87 additions & 30 deletions pkg/docker/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@ package docker
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"sort"
"strings"
"sync"
"time"

"github.com/docker/docker/api/types"
dockerclient "github.com/docker/docker/client"
"github.com/filecoin-project/bacalhau/pkg/config"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/filecoin-project/bacalhau/pkg/system"
"github.com/filecoin-project/bacalhau/pkg/util/closer"
"github.com/moby/moby/pkg/stdcopy"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -141,25 +145,6 @@ func RemoveContainer(ctx context.Context, dockerClient *dockerclient.Client, nam
return nil
}

// WaitForContainer waits for the container identified by id to reach the
// "running" state.
//
// The check is delegated to a system.FunctionWaiter configured with maxAttempts
// and delay (presumably the retry count and the pause between attempts; confirm
// against system.FunctionWaiter). Each attempt looks the container up with
// GetContainer: a lookup error aborts the attempt with that error, a nil
// container counts as "not running yet", and otherwise the attempt succeeds
// only when the container's State equals "running".
func WaitForContainer(ctx context.Context, client *dockerclient.Client, id string, maxAttempts int, delay time.Duration) error {
	isRunning := func() (bool, error) {
		found, lookupErr := GetContainer(ctx, client, id)
		switch {
		case lookupErr != nil:
			return false, lookupErr
		case found == nil:
			// The container is not known yet; let the waiter try again.
			return false, nil
		default:
			return found.State == "running", nil
		}
	}

	w := &system.FunctionWaiter{
		Name:        fmt.Sprintf("wait for container to be running: %s", id),
		MaxAttempts: maxAttempts,
		Delay:       delay,
		Handler:     isRunning,
	}
	return w.Wait()
}

func WaitForContainerLogs(ctx context.Context,
client *dockerclient.Client,
id string,
Expand Down Expand Up @@ -195,22 +180,94 @@ func WaitForContainerLogs(ctx context.Context,
}

func PullImage(ctx context.Context, dockerClient *dockerclient.Client, image string) error {
imagePullStream, err := dockerClient.ImagePull(
ctx,
image,
types.ImagePullOptions{},
)
_, _, err := dockerClient.ImageInspectWithRaw(ctx, image)
if err == nil {
return nil
}
if !dockerclient.IsErrNotFound(err) {
return err
}

log.Debug().Str("image", image).Msg("Pulling image as it wasn't found")

output, err := dockerClient.ImagePull(ctx, image, types.ImagePullOptions{})
if err != nil {
return err
}

if config.IsDebug() {
_, err = io.Copy(os.Stdout, imagePullStream)
if err != nil {
defer closer.CloseWithLogOnError("image-pull", output)

stop := make(chan struct{}, 1)
defer func() {
stop <- struct{}{}
}()
t := time.NewTicker(3 * time.Second)
defer t.Stop()

layers := &sync.Map{}
go func() {
for {
select {
case <-stop:
return
case <-t.C:
logImagePullStatus(layers)
}
}
}()

dec := json.NewDecoder(output)
for {
var mess jsonmessage.JSONMessage
if err := dec.Decode(&mess); err != nil {
if err == io.EOF {
return nil
}
return err
}
if mess.Aux != nil {
continue
}
if mess.Error != nil {
return mess.Error
}
layers.Store(mess.ID, mess)
}
}

// logImagePullStatus writes one debug-level log event summarizing the image
// pull state recorded in m.
//
// Every value in m must be a jsonmessage.JSONMessage (the function panics on
// any other type). Messages are grouped by their Status text:
//   - messages without progress information (nil Progress or Current <= 0) are
//     logged as a sorted list of their IDs under the status;
//   - messages with progress are logged as a dictionary of ID -> progress under
//     the status, where progress is a percentage when Total is known and the
//     amount transferred so far (with its units) when it is not.
func logImagePullStatus(m *sync.Map) {
	withUnits := map[string]*zerolog.Event{}
	withoutUnits := map[string][]string{}
	m.Range(func(_, value any) bool {
		mess := value.(jsonmessage.JSONMessage)

		if mess.Progress == nil || mess.Progress.Current <= 0 {
			withoutUnits[mess.Status] = append(withoutUnits[mess.Status], mess.ID)
			return true
		}

		var status string
		if mess.Progress.Total <= 0 {
			// The total is unknown, so the only meaningful figure is how much
			// has been transferred so far. Report Current, not Total (which is
			// zero or negative on this branch).
			units := mess.Progress.Units
			if units == "" {
				// Docker treats an empty unit as bytes.
				units = "bytes"
			}
			status = fmt.Sprintf("%d %s", mess.Progress.Current, units)
		} else {
			status = fmt.Sprintf("%.3f%%", float64(mess.Progress.Current)/float64(mess.Progress.Total)*100) //nolint:gomnd
		}

		dict, ok := withUnits[mess.Status]
		if !ok {
			dict = zerolog.Dict()
			withUnits[mess.Status] = dict
		}
		dict.Str(mess.ID, status)

		return true
	})

	e := log.Debug()
	for s, l := range withUnits {
		e = e.Dict(s, l)
	}
	for s, l := range withoutUnits {
		// Sort the IDs so the list for a given status is stable between calls.
		sort.Strings(l)
		e = e.Strs(s, l)
	}

	e.Msg("Pulling layers")
}
34 changes: 7 additions & 27 deletions pkg/executor/docker/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *Executor) RunShard(

// the actual mounts we will give to the container
// these are paths for both input and output data
mounts := []mount.Mount{}
var mounts []mount.Mount

var err error

Expand Down Expand Up @@ -213,28 +213,10 @@ func (e *Executor) RunShard(
}

if os.Getenv("SKIP_IMAGE_PULL") == "" {
// TODO: #283 work out why this does not work in github actions
// err = docker.PullImage(e.Client, job.Spec.Vm.Image)
var im dockertypes.ImageInspect
im, _, err = e.Client.ImageInspectWithRaw(ctx, shard.Job.Spec.Docker.Image)
if err == nil {
log.Ctx(ctx).Debug().Msgf("Not pulling image %s, already have %s", shard.Job.Spec.Docker.Image, im.ID)
} else if dockerclient.IsErrNotFound(err) {
log.Ctx(ctx).Debug().Msgf("Pulling image %s", shard.Job.Spec.Docker.Image)

r, err := system.UnsafeForUserCodeRunCommand( //nolint:govet // shadowing ok
"docker",
[]string{"pull", shard.Job.Spec.Docker.Image},
)
if err != nil {
//nolint:stylecheck // Error message for user
err = fmt.Errorf(`Could not pull image - could be due to repo/image not existing,
or registry needing authorization. %s: %s, %s`, shard.Job.Spec.Docker.Image, err, r.STDOUT)
return returnStdErrWithErr(err.Error(), err), err
}
log.Ctx(ctx).Trace().Msgf("Pull image output: %s\n%s", shard.Job.Spec.Docker.Image, r.STDOUT)
} else {
err = fmt.Errorf("error checking if we have %s locally: %s", shard.Job.Spec.Docker.Image, err)
if err := docker.PullImage(ctx, e.Client, shard.Job.Spec.Docker.Image); err != nil { //nolint:govet // ignore err shadowing
//nolint:stylecheck // Error message for user
err = fmt.Errorf(`Could not pull image - could be due to repo/image not existing,
or registry needing authorization. %s: %s`, shard.Job.Spec.Docker.Image, err)
return returnStdErrWithErr(err.Error(), err), err
}
}
Expand Down Expand Up @@ -416,11 +398,9 @@ func (e *Executor) cleanupAll(ctx context.Context) {
return
}
// TODO: #287 Fix if when we care about optimization of memory (224 bytes copied per loop)
//nolint:gocritic // will fix when we care
for _, container := range containersWithLabel {
err = docker.RemoveContainer(ctx, e.Client, container.ID)
if err != nil {
log.Ctx(ctx).Error().Msgf("Non-critical error cleaning up container: %s", err.Error())
if err := docker.RemoveContainer(ctx, e.Client, container.ID); err != nil { //nolint:govet // ignore err shadowing
log.Ctx(ctx).Err(err).Msgf("Non-critical error cleaning up container")
}
}
}
Expand Down
36 changes: 0 additions & 36 deletions pkg/executor/docker/executor_test.go

This file was deleted.

Loading

0 comments on commit 17d00d9

Please sign in to comment.