Skip to content

Commit

Permalink
Skip pulling layers reusable from additional layer store
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <[email protected]>
  • Loading branch information
ktock committed Jan 12, 2021
1 parent 122b16a commit 97321ea
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 15 deletions.
30 changes: 26 additions & 4 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
}

data := make([]copyLayerData, numLayers)
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress) {
copyLayerHelper := func(index int, srcLayer types.BlobInfo, toEncrypt bool, pool *mpb.Progress, srcRef reference.Named) {
defer copySemaphore.Release(1)
defer copyGroup.Done()
cld := copyLayerData{}
Expand All @@ -830,7 +830,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name())
}
} else {
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool)
cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool, srcRef)
}
data[index] = cld
}
Expand Down Expand Up @@ -867,7 +867,7 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error {
return errors.Wrapf(err, "Can't acquire semaphore")
}
copyGroup.Add(1)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool)
go copyLayerHelper(i, srcLayer, encLayerBitmap[i], progressPool, ic.c.rawSource.Reference().DockerReference())
}

// A call to copyGroup.Wait() is done at this point by the defer above.
Expand Down Expand Up @@ -1051,7 +1051,7 @@ type diffIDResult struct {

// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress) (types.BlobInfo, digest.Digest, error) {
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, srcRef reference.Named) (types.BlobInfo, digest.Digest, error) {
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
// Diffs are needed if we are encrypting an image or trying to decrypt an image
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == "" || toEncrypt || (isOciEncrypted(srcInfo.MediaType) && ic.c.ociDecryptConfig != nil)
Expand All @@ -1076,6 +1076,28 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
return blobInfo, cachedDiffID, nil
}

rDest, ok := ic.c.dest.(types.ImageDestinationReusableWithRef)
if ok {
reused, blobInfo, err = rDest.TryReusingBlobWithRef(ctx, srcInfo, srcRef)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination with ref %q", srcInfo.Digest, srcRef.String())
}
if reused {
logrus.Debugf("Skipping blob %s (reusable with ref %q):", srcInfo.Digest, srcRef.String())
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)

// Throw an event that the layer has been skipped
if ic.c.progress != nil && ic.c.progressInterval > 0 {
ic.c.progress <- types.ProgressProperties{
Event: types.ProgressEventSkipped,
Artifact: srcInfo,
}
}
return blobInfo, cachedDiffID, nil
}
}
}

// Fallback: copy the layer, computing the diffID if we need to do so
Expand Down
63 changes: 52 additions & 11 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,17 +54,18 @@ type storageImageSource struct {

type storageImageDestination struct {
imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
signatures []byte // Signature contents, temporary
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // Sizes of each manifest's signature slice
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
signatures []byte // Signature contents, temporary
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
putBlobMutex sync.Mutex // Mutex to sync state for parallel PutBlob executions
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
blobLabels map[digest.Digest]map[string]string // Mapping from layer blobsums to their corresponding additional layer store labels.
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // Sizes of each manifest's signature slice
}

type storageImageCloser struct {
Expand Down Expand Up @@ -352,6 +353,7 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
directory: directory,
signatureses: make(map[digest.Digest][]byte),
blobDiffIDs: make(map[digest.Digest]digest.Digest),
blobLabels: make(map[digest.Digest]map[string]string),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
SignatureSizes: []int{},
Expand Down Expand Up @@ -459,6 +461,36 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader,
}, nil
}

const (
// layerTargetReference is a key of a default label usable by registry-backed store.
// This contains the image reference.
layerTargetReference = "containers/image/target.reference"
)

func (s *storageImageDestination) TryReusingBlobWithRef(ctx context.Context, blobinfo types.BlobInfo, ref reference.Named) (bool, types.BlobInfo, error) {
// Check if we have a was-compressed layer in storage that's based on that blob.
labels := make(map[string]string)
for k, v := range blobinfo.Annotations {
labels[k] = v
}
labels[layerTargetReference] = ref.String()
layer, err := s.imageRef.transport.store.LayerFromAdditionalLayerStore(blobinfo.Digest, labels)
if err != nil {
if errors.Cause(err) == storage.ErrLayerUnknown {
return false, types.BlobInfo{}, nil
}
return false, types.BlobInfo{}, errors.Wrapf(err, `Error looking for compressed layers with digest %q and labels`, blobinfo.Digest)
}
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[blobinfo.Digest] = layer.UncompressedDigest
s.blobLabels[blobinfo.Digest] = labels
return true, types.BlobInfo{
Digest: blobinfo.Digest,
Size: layer.CompressedSize,
MediaType: blobinfo.MediaType,
}, nil
}

// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
Expand Down Expand Up @@ -682,6 +714,15 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
lastLayer = layer.ID
continue
}
if labels, ok := s.blobLabels[blob.Digest]; ok {
layer, err := s.imageRef.transport.store.PutFromAdditionalLayerStore(id, lastLayer, blob.Digest, labels)
if err != nil {
return errors.Wrapf(err, "failed to put layer from digest and labels")
}
lastLayer = layer.ID
continue
}

// Check if we previously cached a file with that blob's contents. If we didn't,
// then we need to read the desired contents from a layer.
filename, ok := s.filenames[blob.Digest]
Expand Down
8 changes: 8 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,14 @@ type ImageDestination interface {
Commit(ctx context.Context, unparsedToplevel UnparsedImage) error
}

type ImageDestinationReusableWithRef interface {
ImageDestination

// TryReusingBlobWithRef checks whether the destination can efficiently reuse a blob, and if so, applies it to the current destination.
// This allows the destination to search the blob contents with taking the image reference into consideration.
TryReusingBlobWithRef(ctx context.Context, blobinfo BlobInfo, ref reference.Named) (bool, BlobInfo, error)
}

// ManifestTypeRejectedError is returned by ImageDestination.PutManifest if the destination is in principle available,
// refuses specifically this manifest type, but may accept a different manifest type.
type ManifestTypeRejectedError struct { // We only use a struct to allow a type assertion, without limiting the contents of the error otherwise.
Expand Down

0 comments on commit 97321ea

Please sign in to comment.