Skip to content

Commit

Permalink
Merge pull request #1185 from vrothberg/pull-improvements
Browse files Browse the repository at this point in the history
copy to storage: commit layers as early as possible
  • Loading branch information
rhatdan authored Mar 23, 2021
2 parents 05fa757 + 538c2f8 commit 4aa0a1e
Show file tree
Hide file tree
Showing 3 changed files with 348 additions and 122 deletions.
54 changes: 45 additions & 9 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/containers/image/v5/image"
internalblobinfocache "github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/pkg/platform"
internalTypes "github.com/containers/image/v5/internal/types"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache"
"github.com/containers/image/v5/pkg/compression"
Expand Down Expand Up @@ -920,7 +921,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool)
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool, index)
}
data[index] = cld
}
Expand Down Expand Up @@ -1116,7 +1117,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar)
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1)
if err != nil {
return types.BlobInfo{}, err
}
Expand All @@ -1142,7 +1143,7 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps (de/re/)compressing it,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, layerIndex int) (types.BlobInfo, digest.Digest, error) {
// If the srcInfo doesn't contain compression information, try to compute it from the
// MediaType, which was either read from a manifest by way of LayerInfos() or constructed
// by LayerInfosForCopy(), if it was supplied at all. If we succeed in copying the blob,
Expand Down Expand Up @@ -1175,7 +1176,26 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// a failure when we eventually try to update the manifest with the digest and MIME type of the reused blob.
// Fixing that will probably require passing more information to TryReusingBlob() than the current version of
// the ImageDestination interface lets us pass in.
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
var (
blobInfo types.BlobInfo
reused bool
err error
)
// Note: the storage destination optimizes the committing of
// layers which requires passing the index of the layer.
// Hence, we need to special case and cast.
dest, ok := ic.c.dest.(internalTypes.ImageDestinationWithOptions)
if ok {
options := internalTypes.TryReusingBlobOptions{
Cache: ic.c.blobInfoCache,
CanSubstitute: ic.canSubstituteBlobs,
LayerIndex: &layerIndex,
}
reused, blobInfo, err = dest.TryReusingBlobWithOptions(ctx, srcInfo, options)
} else {
reused, blobInfo, err = ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
}

if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
Expand Down Expand Up @@ -1217,7 +1237,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar)
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex)
if err != nil {
return types.BlobInfo{}, "", err
}
Expand Down Expand Up @@ -1248,7 +1268,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// perhaps (de/re/)compressing the stream,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, toEncrypt bool, bar *mpb.Bar) (types.BlobInfo, <-chan diffIDResult, error) {
diffIDIsNeeded bool, toEncrypt bool, bar *mpb.Bar, layerIndex int) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

Expand All @@ -1273,7 +1293,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
}
}

blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, toEncrypt, bar) // Sets err to nil on success
blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.canModifyManifest, false, toEncrypt, bar, layerIndex) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
Expand Down Expand Up @@ -1325,7 +1345,7 @@ func (r errorAnnotationReader) Read(b []byte) (n int, err error) {
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, toEncrypt bool, bar *mpb.Bar) (types.BlobInfo, error) {
canModifyBlob bool, isConfig bool, toEncrypt bool, bar *mpb.Bar, layerIndex int) (types.BlobInfo, error) {
if isConfig { // This is guaranteed by the caller, but set it here to be explicit.
canModifyBlob = false
}
Expand Down Expand Up @@ -1521,7 +1541,23 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, &errorAnnotationReader{destStream}, inputInfo, c.blobInfoCache, isConfig)
var uploadedInfo types.BlobInfo
// Note: the storage destination optimizes the committing of layers
// which requires passing the index of the layer. Hence, we need to
// special case and cast.
dest, ok := c.dest.(internalTypes.ImageDestinationWithOptions)
if ok {
options := internalTypes.PutBlobOptions{
Cache: c.blobInfoCache,
IsConfig: isConfig,
}
if !isConfig {
options.LayerIndex = &layerIndex
}
uploadedInfo, err = dest.PutBlobWithOptions(ctx, &errorAnnotationReader{destStream}, inputInfo, options)
} else {
uploadedInfo, err = c.dest.PutBlob(ctx, &errorAnnotationReader{destStream}, inputInfo, c.blobInfoCache, isConfig)
}
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
}
Expand Down
53 changes: 53 additions & 0 deletions internal/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package types

import (
"context"
"io"

publicTypes "github.com/containers/image/v5/types"
)

// ImageDestinationWithOptions is an internal extension to the ImageDestination
// interface.
type ImageDestinationWithOptions interface {
publicTypes.ImageDestination

// PutBlobWithOptions is a wrapper around PutBlob. If
// options.LayerIndex is set, the blob will be committed directly.
// Either by the calling goroutine or by another goroutine already
// committing layers.
//
// Please note that TryReusingBlobWithOptions and PutBlobWithOptions
// *must* be used the together. Mixing the two with non "WithOptions"
// functions is not supported.
PutBlobWithOptions(ctx context.Context, stream io.Reader, blobinfo publicTypes.BlobInfo, options PutBlobOptions) (publicTypes.BlobInfo, error)

// TryReusingBlobWithOptions is a wrapper around TryReusingBlob. If
// options.LayerIndex is set, the reused blob will be recoreded as
// already pulled.
//
// Please note that TryReusingBlobWithOptions and PutBlobWithOptions
// *must* be used the together. Mixing the two with non "WithOptions"
// functions is not supported.
TryReusingBlobWithOptions(ctx context.Context, blobinfo publicTypes.BlobInfo, options TryReusingBlobOptions) (bool, publicTypes.BlobInfo, error)
}

// PutBlobOptions are used in PutBlobWithOptions.
type PutBlobOptions struct {
// Cache to look up blob infos.
Cache publicTypes.BlobInfoCache
// Denotes whether the blob is a config or not.
IsConfig bool
// The corresponding index in the layer slice.
LayerIndex *int
}

// TryReusingBlobOptions are used in TryReusingBlobWithOptions.
type TryReusingBlobOptions struct {
// Cache to look up blob infos.
Cache publicTypes.BlobInfoCache
// Use an equivalent of the desired blob.
CanSubstitute bool
// The corresponding index in the layer slice.
LayerIndex *int
}
Loading

0 comments on commit 4aa0a1e

Please sign in to comment.