Skip to content

Commit

Permalink
convertor: support multi-arch images
Browse files Browse the repository at this point in the history
Signed-off-by: zhuangbowei.zbw <[email protected]>
  • Loading branch information
WaberZhuang committed Mar 11, 2024
1 parent f4c734c commit db0aa5a
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 105 deletions.
274 changes: 228 additions & 46 deletions cmd/convertor/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,33 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"os"
"path/filepath"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/containerd/accelerated-container-image/cmd/convertor/database"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/reference"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/opencontainers/go-digest"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

type Builder interface {
Build(ctx context.Context) error
}

type BuilderOptions struct {
Ref string
TargetRef string
Expand All @@ -57,18 +62,214 @@ type BuilderOptions struct {
Reserve bool
NoUpload bool
DumpManifest bool

// ConcurrencyLimit limits the number of manifests that can be built at once
// 0 means no limit
ConcurrencyLimit int
}

type overlaybdBuilder struct {
layers int
config v1.Image
engine builderEngine
type graphBuilder struct {
// required
Resolver remotes.Resolver

// options
BuilderOptions

// private
fetcher remotes.Fetcher
pusher remotes.Pusher
tagPusher remotes.Pusher
group *errgroup.Group
sem chan struct{}
id atomic.Int32
}

func (b *graphBuilder) Build(ctx context.Context) error {
fetcher, err := b.Resolver.Fetcher(ctx, b.Ref)
if err != nil {
return fmt.Errorf("failed to obtain new fetcher: %w", err)
}
pusher, err := b.Resolver.Pusher(ctx, b.TargetRef+"@") // append '@' to avoid tag
if err != nil {
return fmt.Errorf("failed to obtain new pusher: %w", err)
}
tagPusher, err := b.Resolver.Pusher(ctx, b.TargetRef) // append '@' to avoid tag
if err != nil {
return fmt.Errorf("failed to obtain new tag pusher: %w", err)
}
b.fetcher = fetcher
b.pusher = pusher
b.tagPusher = tagPusher
_, src, err := b.Resolver.Resolve(ctx, b.Ref)
if err != nil {
return fmt.Errorf("failed to resolve: %w", err)
}

g, gctx := errgroup.WithContext(ctx)
b.group = g
if b.ConcurrencyLimit > 0 {
b.sem = make(chan struct{}, b.ConcurrencyLimit)
}
g.Go(func() error {
target, err := b.process(gctx, src, true)
if err != nil {
return fmt.Errorf("failed to build %q: %w", src.Digest, err)
}
log.G(gctx).Infof("converted to %q, digest: %q", b.TargetRef, target.Digest)
return nil
})
return g.Wait()
}

func NewOverlayBDBuilder(ctx context.Context, opt BuilderOptions) (Builder, error) {
func (b *graphBuilder) process(ctx context.Context, src v1.Descriptor, tag bool) (v1.Descriptor, error) {
switch src.MediaType {
case v1.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest:
return b.buildOne(ctx, src, tag)
case v1.MediaTypeImageIndex, images.MediaTypeDockerSchema2ManifestList:
var index v1.Index
rc, err := b.fetcher.Fetch(ctx, src)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to fetch index: %w", err)
}
defer rc.Close()
indexBytes, err := io.ReadAll(rc)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to read index: %w", err)
}
if err := json.Unmarshal(indexBytes, &index); err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to unmarshal index: %w", err)
}
var wg sync.WaitGroup
for _i, _m := range index.Manifests {
i := _i
m := _m
wg.Add(1)
b.group.Go(func() error {
defer wg.Done()
target, err := b.process(ctx, m, false)
if err != nil {
return fmt.Errorf("failed to build %q: %w", m.Digest, err)
}
index.Manifests[i] = target
return nil
})
}
wg.Wait()
if ctx.Err() != nil {
return v1.Descriptor{}, ctx.Err()
}

// upload index
indexBytes, err = json.Marshal(index)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to marshal index: %w", err)
}
expected := src
expected.Digest = digest.FromBytes(indexBytes)
expected.Size = int64(len(indexBytes))
var pusher remotes.Pusher
if tag {
pusher = b.tagPusher
} else {
pusher = b.pusher
}
if err := uploadBytes(ctx, pusher, expected, indexBytes); err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to upload index: %w", err)
}
return expected, nil
default:
return v1.Descriptor{}, fmt.Errorf("unsupported media type %q", src.MediaType)
}
}

func (b *graphBuilder) buildOne(ctx context.Context, src v1.Descriptor, tag bool) (v1.Descriptor, error) {
if b.sem != nil {
select {
case <-ctx.Done():
return v1.Descriptor{}, ctx.Err()
case b.sem <- struct{}{}:
}
}
defer func() {
if b.sem != nil {
select {
case <-ctx.Done():
case <-b.sem:
}
}
}()
id := b.id.Add(1)

var platform string
if src.Platform == nil {
platform = ""
} else {
platform = platforms.Format(*src.Platform)
ctx = log.WithLogger(ctx, log.G(ctx).WithField("platform", platform))
}
workdir := filepath.Join(b.WorkDir, fmt.Sprintf("%d-%s-%s", id, strings.ReplaceAll(platform, "/", "_"), src.Digest.Encoded()))
log.G(ctx).Infof("building %s ...", workdir)

// init build engine
manifest, config, err := fetchManifestAndConfig(ctx, b.fetcher, src)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to fetch manifest and config: %w", err)
}
var pusher remotes.Pusher
if tag {
pusher = b.tagPusher
} else {
pusher = b.pusher
}
engineBase := &builderEngineBase{
resolver: b.Resolver,
fetcher: b.fetcher,
pusher: pusher,
manifest: *manifest,
config: *config,
inputDesc: src,
}
engineBase.workDir = workdir
engineBase.oci = b.OCI
engineBase.mkfs = b.Mkfs
engineBase.vsize = b.Vsize
engineBase.db = b.DB
refspec, err := reference.Parse(b.Ref)
if err != nil {
return v1.Descriptor{}, err
}
engineBase.host = refspec.Hostname()
engineBase.repository = strings.TrimPrefix(refspec.Locator, engineBase.host+"/")
engineBase.reserve = b.Reserve
engineBase.noUpload = b.NoUpload
engineBase.dumpManifest = b.DumpManifest

var engine builderEngine
switch b.Engine {
case Overlaybd:
engine = NewOverlayBDBuilderEngine(engineBase)
case TurboOCI:
engine = NewTurboOCIBuilderEngine(engineBase)
}

// build
builder := &overlaybdBuilder{
layers: len(engineBase.manifest.Layers),
engine: engine,
}
desc, err := builder.Build(ctx)
if err != nil {
return v1.Descriptor{}, fmt.Errorf("failed to build %s: %w", workdir, err)
}
src.Digest = desc.Digest
src.Size = desc.Size
return src, nil
}

func Build(ctx context.Context, opt BuilderOptions) error {
tlsConfig, err := loadTLSConfig(opt.CertOption)
if err != nil {
return nil, fmt.Errorf("failed to load certifications: %w", err)
return fmt.Errorf("failed to load certifications: %w", err)
}
transport := &http.Transport{
DialContext: (&net.Dialer{
Expand Down Expand Up @@ -106,41 +307,21 @@ func NewOverlayBDBuilder(ctx context.Context, opt BuilderOptions) (Builder, erro
}),
),
})
engineBase, err := getBuilderEngineBase(ctx, resolver, opt.Ref, opt.TargetRef)
if err != nil {
return nil, err
}
engineBase.workDir = opt.WorkDir
engineBase.oci = opt.OCI
engineBase.mkfs = opt.Mkfs
engineBase.vsize = opt.Vsize
engineBase.db = opt.DB

refspec, err := reference.Parse(opt.Ref)
if err != nil {
return nil, err
}
engineBase.host = refspec.Hostname()
engineBase.repository = strings.TrimPrefix(refspec.Locator, engineBase.host+"/")
engineBase.reserve = opt.Reserve
engineBase.noUpload = opt.NoUpload
engineBase.dumpManifest = opt.DumpManifest
return (&graphBuilder{
Resolver: resolver,
BuilderOptions: opt,
}).Build(ctx)
}

var engine builderEngine
switch opt.Engine {
case Overlaybd:
engine = NewOverlayBDBuilderEngine(engineBase)
case TurboOCI:
engine = NewTurboOCIBuilderEngine(engineBase)
}
return &overlaybdBuilder{
layers: len(engineBase.manifest.Layers),
engine: engine,
config: engineBase.config,
}, nil
type overlaybdBuilder struct {
layers int
engine builderEngine
}

func (b *overlaybdBuilder) Build(ctx context.Context) error {
// Build return a descriptor of the converted target, as the caller may need it
// to tag or compose an index
func (b *overlaybdBuilder) Build(ctx context.Context) (v1.Descriptor, error) {
defer b.engine.Cleanup()
alreadyConverted := make([]chan *v1.Descriptor, b.layers)
downloaded := make([]chan error, b.layers)
Expand All @@ -150,7 +331,7 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
// when errors are encountered fallback to regular conversion
if convertedDesc, err := b.engine.CheckForConvertedManifest(ctx); err == nil && convertedDesc.Digest != "" {
logrus.Infof("Image found already converted in registry with digest %s", convertedDesc.Digest)
return nil
return convertedDesc, nil
}

// Errgroups will close the context after wait returns so the operations need their own
Expand Down Expand Up @@ -244,15 +425,16 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
})
}
if err := g.Wait(); err != nil {
return err
return v1.Descriptor{}, err
}

if err := b.engine.UploadImage(ctx); err != nil {
return errors.Wrap(err, "failed to upload manifest or config")
targetDesc, err := b.engine.UploadImage(ctx)
if err != nil {
return v1.Descriptor{}, errors.Wrap(err, "failed to upload manifest or config")
}
b.engine.StoreConvertedManifestDetails(ctx)
logrus.Info("convert finished")
return nil
return targetDesc, nil
}

// block until ctx.Done() or sent
Expand Down
Loading

0 comments on commit db0aa5a

Please sign in to comment.