Skip to content

Commit

Permalink
Userspace Convertor: Manifest Deduplication
Browse files Browse the repository at this point in the history
Adds manifest deduplication, which prevents re-conversion of an already
converted image if its result is known and already stored within the
registry. This change also includes adjustments to support cross repo
mounts for whole images, improvements for the userspace db functionality,
usage samples and corresponding unit tests.

Signed-off-by: Esteban Rey <[email protected]>
  • Loading branch information
estebanreyl committed Feb 7, 2024
1 parent abd058d commit c7e6fee
Show file tree
Hide file tree
Showing 26 changed files with 707 additions and 123 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
bin/
.vscode
vendor/
tmp_conv/
tmp/
9 changes: 9 additions & 0 deletions cmd/convertor/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
alreadyConverted := make([]chan *v1.Descriptor, b.layers)
downloaded := make([]chan error, b.layers)
converted := make([]chan error, b.layers)

// check if manifest conversion result is already present in registry, if so, we can avoid conversion.
// when errors are encountered fallback to regular conversion
if convertedDesc, err := b.engine.CheckForConvertedManifest(ctx); err == nil && convertedDesc.Digest != "" {
logrus.Infof("Image found already converted in registry with digest %s", convertedDesc.Digest)
return nil
}

// Errgroups will close the context after wait returns so the operations need their own
// derived context.
g, rctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -231,6 +239,7 @@ func (b *overlaybdBuilder) Build(ctx context.Context) error {
if err := b.engine.UploadImage(ctx); err != nil {
return errors.Wrap(err, "failed to upload manifest or config")
}
b.engine.StoreConvertedManifestDetails(ctx)
logrus.Info("convert finished")
return nil
}
Expand Down
33 changes: 25 additions & 8 deletions cmd/convertor/builder/builder_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,21 +51,34 @@ type builderEngine interface {
// UploadImage upload new manifest and config
UploadImage(ctx context.Context) error

// Cleanup removes workdir
Cleanup()

Deduplicateable
}

// Deduplicateable groups the operations that let a conversion reuse earlier results
// instead of repeating work. It is embedded in builderEngine and covers both
// per-layer and whole-manifest deduplication.
type Deduplicateable interface {
// CheckForConvertedLayer finds the already converted layer for index idx in the db
// and validates its presence in the registry.
CheckForConvertedLayer(ctx context.Context, idx int) (specs.Descriptor, error)

// DownloadConvertedLayer downloads the already converted layer described by desc.
DownloadConvertedLayer(ctx context.Context, idx int, desc specs.Descriptor) error

// StoreConvertedLayerDetails stores the chainID -> converted layer mapping used
// for layer deduplication.
StoreConvertedLayerDetails(ctx context.Context, idx int) error

// Cleanup removes workdir
Cleanup()
// CheckForConvertedManifest looks up an already converted manifest for the input
// manifest digest so that re-conversion can be avoided. Callers treat a nil error
// together with a non-empty descriptor digest as "already converted".
CheckForConvertedManifest(ctx context.Context) (specs.Descriptor, error)

// StoreConvertedManifestDetails stores the manifest digest -> converted manifest
// mapping so that later runs can avoid re-conversion.
StoreConvertedManifestDetails(ctx context.Context) error
}

type builderEngineBase struct {
resolver remotes.Resolver
fetcher remotes.Fetcher
pusher remotes.Pusher
manifest specs.Manifest
Expand All @@ -77,6 +90,8 @@ type builderEngineBase struct {
db database.ConversionDatabase
host string
repository string
inputDesc specs.Descriptor // original manifest descriptor
outputDesc specs.Descriptor // converted manifest descriptor
reserve bool
noUpload bool
dumpManifest bool
Expand Down Expand Up @@ -172,6 +187,7 @@ func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) error {
if err = uploadBytes(ctx, e.pusher, manifestDesc, cbuf); err != nil {
return errors.Wrapf(err, "failed to upload manifest")
}
e.outputDesc = manifestDesc
logrus.Infof("manifest uploaded")
}
if e.dumpManifest {
Expand All @@ -181,7 +197,6 @@ func (e *builderEngineBase) uploadManifestAndConfig(ctx context.Context) error {
}
logrus.Infof("manifest dumped")
}

return nil
}

Expand All @@ -203,9 +218,11 @@ func getBuilderEngineBase(ctx context.Context, resolver remotes.Resolver, ref, t
return nil, errors.Wrap(err, "failed to fetch manifest and config")
}
return &builderEngineBase{
fetcher: fetcher,
pusher: pusher,
manifest: *manifest,
config: *config,
resolver: resolver,
fetcher: fetcher,
pusher: pusher,
manifest: *manifest,
config: *config,
inputDesc: desc,
}, nil
}
14 changes: 14 additions & 0 deletions cmd/convertor/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,20 @@ func (e *mockFuzzBuilderEngine) StoreConvertedLayerDetails(ctx context.Context,
return nil
}

// CheckForConvertedManifest is the fuzzing stand-in for the manifest lookup: it draws
// one value from the engine's fixed random source and fails when that value is below
// failRate. On success it still reports an empty descriptor, i.e. "nothing converted yet".
func (e *mockFuzzBuilderEngine) CheckForConvertedManifest(ctx context.Context) (specs.Descriptor, error) {
	var empty specs.Descriptor
	shouldFail := e.fixedRand.Float64() < failRate
	if !shouldFail {
		return empty, nil
	}
	return empty, fmt.Errorf("random error on CheckForConvertedManifest")
}

// StoreConvertedManifestDetails is the fuzzing stand-in for persisting the manifest
// mapping: it draws one value from the engine's fixed random source and returns an
// error when that value is below failRate, otherwise nil.
func (e *mockFuzzBuilderEngine) StoreConvertedManifestDetails(ctx context.Context) error {
	shouldFail := e.fixedRand.Float64() < failRate
	if !shouldFail {
		return nil
	}
	return fmt.Errorf("random error on StoreConvertedManifestDetails")
}

func (e *mockFuzzBuilderEngine) DownloadConvertedLayer(ctx context.Context, idx int, desc specs.Descriptor) error {
if e.fixedRand.Float64() < failRate {
return fmt.Errorf("random error on DownloadConvertedLayer")
Expand Down
7 changes: 1 addition & 6 deletions cmd/convertor/builder/builder_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,7 @@ func uploadBytes(ctx context.Context, pusher remotes.Pusher, desc specs.Descript
return err
}
defer cw.Close()

err = content.Copy(ctx, cw, bytes.NewReader(data), desc.Size, desc.Digest)
if err != nil {
return err
}
return nil
return content.Copy(ctx, cw, bytes.NewReader(data), desc.Size, desc.Digest)
}

func buildArchiveFromFiles(ctx context.Context, target string, compress compression.Compression, files ...string) error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/convertor/builder/builder_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func Test_uploadBlob(t *testing.T) {
ctx := context.Background()
// Create a new inmemory registry to push to
reg := testingresources.GetTestRegistry(t, ctx, testingresources.RegistryOptions{
InmemoryOnly: true,
InmemoryRegistryOnly: true,
ManifestPushIgnoresLayers: false,
})

Expand Down
128 changes: 122 additions & 6 deletions cmd/convertor/builder/overlaybd_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"archive/tar"
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -180,8 +181,8 @@ func (e *overlaybdBuilderEngine) CheckForConvertedLayer(ctx context.Context, idx
}
chainID := e.overlaybdLayers[idx].chainID

// try to find in the same repo then check existence on registry
entry := e.db.GetEntryForRepo(ctx, e.host, e.repository, chainID)
// try to find the layer in the target repo
entry := e.db.GetLayerEntryForRepo(ctx, e.host, e.repository, chainID)
if entry != nil && entry.ChainID != "" {
desc := specs.Descriptor{
MediaType: e.mediaTypeImageLayer(),
Expand All @@ -197,15 +198,16 @@ func (e *overlaybdBuilderEngine) CheckForConvertedLayer(ctx context.Context, idx
}
if errdefs.IsNotFound(err) {
// invalid record in db, which is not found in registry, remove it
err := e.db.DeleteEntry(ctx, e.host, e.repository, chainID)
err := e.db.DeleteLayerEntry(ctx, e.host, e.repository, chainID)
if err != nil {
return specs.Descriptor{}, err
}
}
}

// fallback to a registry wide search
// found record in other repos, try mounting it to the target repo
entries := e.db.GetCrossRepoEntries(ctx, e.host, chainID)
entries := e.db.GetCrossRepoLayerEntries(ctx, e.host, chainID)
for _, entry := range entries {
desc := specs.Descriptor{
MediaType: e.mediaTypeImageLayer(),
Expand All @@ -220,7 +222,7 @@ func (e *overlaybdBuilderEngine) CheckForConvertedLayer(ctx context.Context, idx
if errdefs.IsAlreadyExists(err) {
desc.Annotations = nil

if err := e.db.CreateEntry(ctx, e.host, e.repository, entry.ConvertedDigest, chainID, entry.DataSize); err != nil {
if err := e.db.CreateLayerEntry(ctx, e.host, e.repository, entry.ConvertedDigest, chainID, entry.DataSize); err != nil {
continue // try a different repo if available
}

Expand All @@ -234,11 +236,125 @@ func (e *overlaybdBuilderEngine) CheckForConvertedLayer(ctx context.Context, idx
return specs.Descriptor{}, errdefs.ErrNotFound
}

// CheckForConvertedManifest reports whether the input manifest already has a converted
// counterpart, so that the whole conversion can be skipped (e.g. during tag reuse or
// cross repo mounts).
//
// Note: the lookup is output media type sensitive. If the manifest was converted to a
// different media type, no match is reported and the image is converted normally.
//
// The target repository is consulted first. If it has no usable record, records from
// other repositories on the same host are tried in order, and the first image that can
// be mounted into the target repository is used. Database records whose manifest is no
// longer present in the registry are deleted along the way.
//
// It returns the descriptor of the converted manifest, or errdefs.ErrNotFound when no
// database is configured or no usable record exists. Failures to delete a stale record
// or to obtain a fetcher for another repository are returned as-is.
func (e *overlaybdBuilderEngine) CheckForConvertedManifest(ctx context.Context) (specs.Descriptor, error) {
	if e.db == nil {
		return specs.Descriptor{}, errdefs.ErrNotFound
	}

	// try to find the manifest in the target repo
	entry := e.db.GetManifestEntryForRepo(ctx, e.host, e.repository, e.mediaTypeManifest(), e.inputDesc.Digest)
	if entry != nil && entry.ConvertedDigest != "" {
		convertedDesc := specs.Descriptor{
			// The record points at a manifest, so the descriptor must carry the manifest
			// media type (as the cross repo path below does). Describing it as a layer
			// would presumably make the fetcher look the digest up as a blob, miss it,
			// and cause the valid record to be deleted below.
			MediaType: e.mediaTypeManifest(),
			Digest:    entry.ConvertedDigest,
			Size:      entry.DataSize,
		}
		rc, err := e.fetcher.Fetch(ctx, convertedDesc)
		if err == nil {
			// Only existence matters here; the content itself is not needed.
			rc.Close()
			logrus.Infof("manifest %s found in remote with resulting digest %s", e.inputDesc.Digest, convertedDesc.Digest)
			return convertedDesc, nil
		}
		if errdefs.IsNotFound(err) {
			// invalid record in db, which is not found in registry, remove it
			if err := e.db.DeleteManifestEntry(ctx, e.host, e.repository, e.mediaTypeManifest(), e.inputDesc.Digest); err != nil {
				return specs.Descriptor{}, err
			}
		}
		// Any other fetch error is not fatal: fall through to the registry wide search.
	}

	// fallback to a registry wide search
	entries := e.db.GetCrossRepoManifestEntries(ctx, e.host, e.mediaTypeManifest(), e.inputDesc.Digest)
	for _, entry := range entries {
		convertedDesc := specs.Descriptor{
			MediaType: e.mediaTypeManifest(),
			Digest:    entry.ConvertedDigest,
			Size:      entry.DataSize,
		}
		// The shared fetcher is bound to the target repository, so a fetcher scoped to
		// the repository that owns this record is resolved instead.
		fetcher, err := e.resolver.Fetcher(ctx, fmt.Sprintf("%s/%s@%s", entry.Host, entry.Repository, convertedDesc.Digest.String()))
		if err != nil {
			return specs.Descriptor{}, err
		}
		manifest, err := fetchManifest(ctx, fetcher, convertedDesc)
		if err != nil {
			if errdefs.IsNotFound(err) {
				// invalid record in db, which is not found in registry, remove it
				if err := e.db.DeleteManifestEntry(ctx, entry.Host, entry.Repository, e.mediaTypeManifest(), e.inputDesc.Digest); err != nil {
					return specs.Descriptor{}, err
				}
			}
			continue
		}
		if err := e.mountImage(ctx, *manifest, convertedDesc, entry.Repository); err != nil {
			continue // try a different repo if available
		}
		// Record the result for the target repository so the next lookup hits directly.
		if err := e.db.CreateManifestEntry(ctx, e.host, e.repository, e.mediaTypeManifest(), e.inputDesc.Digest, convertedDesc.Digest, entry.DataSize); err != nil {
			continue // try a different repo if available
		}
		logrus.Infof("manifest %s mount from %s was successful", convertedDesc.Digest, entry.Repository)
		return convertedDesc, nil
	}

	logrus.Infof("manifest %s not found already converted in remote", e.inputDesc.Digest)
	return specs.Descriptor{}, errdefs.ErrNotFound
}

// mountImage makes the image described by manifest available in the target repository
// by reusing content from mountRepository. The config blob and every layer are pushed
// with a distribution-source annotation naming mountRepository; an "already exists"
// result from the pusher is treated as a successful mount. Finally the manifest itself
// is serialized and uploaded under desc.
//
// NOTE(review): the manifest is re-serialized with json.Marshal while uploadBytes is
// given desc's size and digest, so this presumably relies on the marshalled bytes
// matching the original manifest bytes — confirm.
func (e *overlaybdBuilderEngine) mountImage(ctx context.Context, manifest specs.Manifest, desc specs.Descriptor, mountRepository string) error {
	sourceKey := fmt.Sprintf("%s.%s", labelDistributionSource, e.host)

	// Mount Config Blobs
	configDesc := manifest.Config
	configDesc.Annotations = map[string]string{sourceKey: mountRepository}
	switch _, err := e.pusher.Push(ctx, configDesc); {
	case errdefs.IsAlreadyExists(err):
		logrus.Infof("config blob mount from %s was successful", mountRepository)
	case err != nil:
		return fmt.Errorf("Failed to mount config blob from %s repository : %w", mountRepository, err)
	}

	// Mount Layer Blobs
	for idx := range manifest.Layers {
		// Work on a copy so the annotation does not end up in the uploaded manifest.
		layerDesc := manifest.Layers[idx]
		layerDesc.Annotations = map[string]string{sourceKey: mountRepository}
		switch _, err := e.pusher.Push(ctx, layerDesc); {
		case errdefs.IsAlreadyExists(err):
			logrus.Infof("layer %d mount from %s was successful", idx, mountRepository)
		case err != nil:
			return fmt.Errorf("failed to mount all layers from %s repository : %w", mountRepository, err)
		}
	}

	// Push Manifest
	payload, err := json.Marshal(manifest)
	if err != nil {
		return err
	}
	return uploadBytes(ctx, e.pusher, desc, payload)
}

// StoreConvertedManifestDetails records the input manifest digest -> converted manifest
// mapping (digest and size) for the engine's host, repository and output manifest media
// type. It is a no-op when no database is configured, and fails when no converted
// manifest descriptor has been recorded on the engine yet.
func (e *overlaybdBuilderEngine) StoreConvertedManifestDetails(ctx context.Context) error {
	switch {
	case e.db == nil:
		return nil
	case e.outputDesc.Digest == "":
		return errors.New("manifest is not yet converted")
	}

	converted := e.outputDesc
	return e.db.CreateManifestEntry(ctx, e.host, e.repository, e.mediaTypeManifest(), e.inputDesc.Digest, converted.Digest, converted.Size)
}

// StoreConvertedLayerDetails records the chainID -> converted layer mapping (digest and
// size) of layer idx for the engine's host and repository. It is a no-op when no
// database is configured.
//
// NOTE(review): two return statements follow each other below, so the second one
// (CreateLayerEntry) is unreachable as written. This looks like an old/new pair left
// over from a CreateEntry -> CreateLayerEntry rename — confirm which call is current.
func (e *overlaybdBuilderEngine) StoreConvertedLayerDetails(ctx context.Context, idx int) error {
if e.db == nil {
return nil
}
return e.db.CreateEntry(ctx, e.host, e.repository, e.overlaybdLayers[idx].desc.Digest, e.overlaybdLayers[idx].chainID, e.overlaybdLayers[idx].desc.Size)
return e.db.CreateLayerEntry(ctx, e.host, e.repository, e.overlaybdLayers[idx].desc.Digest, e.overlaybdLayers[idx].chainID, e.overlaybdLayers[idx].desc.Size)
}

func (e *overlaybdBuilderEngine) DownloadConvertedLayer(ctx context.Context, idx int, desc specs.Descriptor) error {
Expand Down
Loading

0 comments on commit c7e6fee

Please sign in to comment.