From 40a9359dd3dbf613fe2a11c949884ff99794e7eb Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Tue, 30 Jan 2024 12:49:34 +0100 Subject: [PATCH 1/2] dest: propagate layer index to PutBlobPartial Signed-off-by: Giuseppe Scrivano --- copy/single.go | 2 +- internal/imagedestination/stubs/put_blob_partial.go | 2 +- internal/private/private.go | 2 +- oci/archive/oci_dest.go | 4 ++-- openshift/openshift_dest.go | 4 ++-- pkg/blobcache/dest.go | 4 ++-- storage/storage_dest.go | 2 +- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/copy/single.go b/copy/single.go index c233619427..99e7121057 100644 --- a/copy/single.go +++ b/copy/single.go @@ -746,7 +746,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to wrapped: ic.c.rawSource, bar: bar, } - uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, ic.c.blobInfoCache) + uploadedBlob, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, &layerIndex, ic.c.blobInfoCache) if err == nil { if srcInfo.Size != -1 { refill := srcInfo.Size - bar.Current() diff --git a/internal/imagedestination/stubs/put_blob_partial.go b/internal/imagedestination/stubs/put_blob_partial.go index 0dc6bd5af7..c0c822822c 100644 --- a/internal/imagedestination/stubs/put_blob_partial.go +++ b/internal/imagedestination/stubs/put_blob_partial.go @@ -39,7 +39,7 @@ func (stub NoPutBlobPartialInitialize) SupportsPutBlobPartial() bool { // It is available only if SupportsPutBlobPartial(). // Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller // should fall back to PutBlobWithOptions. 
-func (stub NoPutBlobPartialInitialize) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { +func (stub NoPutBlobPartialInitialize) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, index *int, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { return private.UploadedBlob{}, fmt.Errorf("internal error: PutBlobPartial is not supported by the %q transport", stub.transportName) } diff --git a/internal/private/private.go b/internal/private/private.go index 7037755bfc..bbe0c76912 100644 --- a/internal/private/private.go +++ b/internal/private/private.go @@ -55,7 +55,7 @@ type ImageDestinationInternalOnly interface { // It is available only if SupportsPutBlobPartial(). // Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller // should fall back to PutBlobWithOptions. - PutBlobPartial(ctx context.Context, chunkAccessor BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (UploadedBlob, error) + PutBlobPartial(ctx context.Context, chunkAccessor BlobChunkAccessor, srcInfo types.BlobInfo, index *int, cache blobinfocache.BlobInfoCache2) (UploadedBlob, error) // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). diff --git a/oci/archive/oci_dest.go b/oci/archive/oci_dest.go index 6ca618e351..f76b62139b 100644 --- a/oci/archive/oci_dest.go +++ b/oci/archive/oci_dest.go @@ -120,8 +120,8 @@ func (d *ociArchiveImageDestination) PutBlobWithOptions(ctx context.Context, str // It is available only if SupportsPutBlobPartial(). 
// Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller // should fall back to PutBlobWithOptions. -func (d *ociArchiveImageDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { - return d.unpackedDest.PutBlobPartial(ctx, chunkAccessor, srcInfo, cache) +func (d *ociArchiveImageDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, index *int, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { + return d.unpackedDest.PutBlobPartial(ctx, chunkAccessor, srcInfo, index, cache) } // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination diff --git a/openshift/openshift_dest.go b/openshift/openshift_dest.go index 50a5339e1b..5c2ef0fbb1 100644 --- a/openshift/openshift_dest.go +++ b/openshift/openshift_dest.go @@ -128,8 +128,8 @@ func (d *openshiftImageDestination) PutBlobWithOptions(ctx context.Context, stre // It is available only if SupportsPutBlobPartial(). // Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller // should fall back to PutBlobWithOptions. 
-func (d *openshiftImageDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { - return d.docker.PutBlobPartial(ctx, chunkAccessor, srcInfo, cache) +func (d *openshiftImageDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, index *int, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { + return d.docker.PutBlobPartial(ctx, chunkAccessor, srcInfo, index, cache) } // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index f32fccf805..757af2dcb6 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -227,8 +227,8 @@ func (d *blobCacheDestination) SupportsPutBlobPartial() bool { // It is available only if SupportsPutBlobPartial(). // Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller // should fall back to PutBlobWithOptions. 
-func (d *blobCacheDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { - return d.destination.PutBlobPartial(ctx, chunkAccessor, srcInfo, cache) +func (d *blobCacheDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, index *int, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { + return d.destination.PutBlobPartial(ctx, chunkAccessor, srcInfo, index, cache) } // TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination diff --git a/storage/storage_dest.go b/storage/storage_dest.go index 53569793e6..0cfb2d80a6 100644 --- a/storage/storage_dest.go +++ b/storage/storage_dest.go @@ -269,7 +269,7 @@ func (f *zstdFetcher) GetBlobAt(chunks []chunked.ImageSourceChunk) (chan io.Read // It is available only if SupportsPutBlobPartial(). // Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller // should fall back to PutBlobWithOptions. 
-func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { +func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, index *int, cache blobinfocache.BlobInfoCache2) (private.UploadedBlob, error) { fetcher := zstdFetcher{ chunkAccessor: chunkAccessor, ctx: ctx, From 56e34435727962e5fb4258bfc6f783d76d5967be Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 11 Dec 2023 14:24:06 +0100 Subject: [PATCH 2/2] storage, dest: clarify when TOCDigest is used This update introduces an enhancement in the blob handling mechanism, specifically by separating the TOC digest from the uncompressed/compressed digest. Follow-up for: #1080. Signed-off-by: Giuseppe Scrivano --- copy/single.go | 14 ++- internal/private/private.go | 12 +- storage/storage_dest.go | 243 +++++++++++++++++++++++------------- 3 files changed, 176 insertions(+), 93 deletions(-) diff --git a/copy/single.go b/copy/single.go index 99e7121057..eb8ffc351b 100644 --- a/copy/single.go +++ b/copy/single.go @@ -690,11 +690,16 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to requiredCompression = ic.compressionFormat } + var tocDigest digest.Digest + // Check if we have a chunked layer in storage that's based on that blob. These layers are stored by their TOC digest. 
- tocDigest, err := chunkedToc.GetTOCDigest(srcInfo.Annotations) + d, err := chunkedToc.GetTOCDigest(srcInfo.Annotations) if err != nil { return types.BlobInfo{}, "", err } + if d != nil { + tocDigest = *d + } reused, reusedBlob, err := ic.c.dest.TryReusingBlobWithOptions(ctx, srcInfo, private.TryReusingBlobOptions{ Cache: ic.c.blobInfoCache, @@ -713,7 +718,12 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to if reused { logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) func() { // A scope for defer - bar := ic.c.createProgressBar(pool, false, types.BlobInfo{Digest: reusedBlob.Digest, Size: 0}, "blob", "skipped: already exists") + d := reusedBlob.Digest + label := "skipped: already exists" + if reusedBlob.TOCDigest != "" { + label = "skipped: already exists (found by TOC)" + } + bar := ic.c.createProgressBar(pool, false, types.BlobInfo{Digest: d, Size: 0}, "blob", label) defer bar.Abort(false) bar.mark100PercentComplete() }() diff --git a/internal/private/private.go b/internal/private/private.go index bbe0c76912..40efb47c84 100644 --- a/internal/private/private.go +++ b/internal/private/private.go @@ -81,8 +81,9 @@ type ImageDestination interface { // UploadedBlob is information about a blob written to a destination. // It is the subset of types.BlobInfo fields the transport is responsible for setting; all fields must be provided. type UploadedBlob struct { - Digest digest.Digest - Size int64 + Digest digest.Digest + Size int64 + TOCDigest digest.Digest } // PutBlobOptions are used in PutBlobWithOptions. @@ -118,14 +119,15 @@ type TryReusingBlobOptions struct { PossibleManifestFormats []string // A set of possible manifest formats; at least one should support the reused layer blob. 
RequiredCompression *compression.Algorithm // If set, reuse blobs with a matching algorithm as per implementations in internal/imagedestination/impl.helpers.go OriginalCompression *compression.Algorithm // May be nil to indicate “uncompressed” or “unknown”. - TOCDigest *digest.Digest // If specified, the blob can be looked up in the destination also by its TOC digest. + TOCDigest digest.Digest // If specified, the blob can be looked up in the destination also by its TOC digest. } // ReusedBlob is information about a blob reused in a destination. // It is the subset of types.BlobInfo fields the transport is responsible for setting. type ReusedBlob struct { - Digest digest.Digest // Must be provided - Size int64 // Must be provided + Digest digest.Digest // Must be provided, can be empty if TOCDigest is present + TOCDigest digest.Digest // Must be provided, can be empty if Digest is present + Size int64 // Must be provided // The following compression fields should be set when the reuse substitutes // a differently-compressed blob. CompressionOperation types.LayerCompression // Compress/Decompress, matching the reused blob; PreserveOriginal if N/A diff --git a/storage/storage_dest.go b/storage/storage_dest.go index 0cfb2d80a6..f73a30808f 100644 --- a/storage/storage_dest.go +++ b/storage/storage_dest.go @@ -77,19 +77,21 @@ type storageImageDestination struct { indexToStorageID map[int]*string // All accesses to below data are protected by `lock` which is made // *explicit* in the code. - uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs. 
- fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed) - indexToAddedLayerInfo map[int]addedLayerInfo // Mapping from layer (by index) to blob to add to the image - blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer - diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + indexToTocDigest map[int]digest.Digest // Mapping from layer index to their corresponding TOC Digest, if used + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed) + indexToAddedLayerInfo map[int]addedLayerInfo // Mapping from layer (by index) to blob to add to the image + blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer + diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output } // addedLayerInfo records data about a layer to use in this image. type addedLayerInfo struct { - digest digest.Digest - emptyLayer bool // The layer is an “empty”/“throwaway” one, and may or may not be physically represented in various transport / storage systems. false if the manifest type does not have the concept. + digest digest.Digest // Mandatory, the digest of the layer. + tocDigest digest.Digest // Optional. If tocDigest != "", the layer was pulled by its TOC. 
+ emptyLayer bool // The layer is an “empty”/“throwaway” one, and may or may not be physically represented in various transport / storage systems. false if the manifest type does not have the concept. } // newImageDestination sets us up to write a new image, caching blobs in a temporary directory until @@ -117,18 +119,19 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (* HasThreadSafePutBlob: true, }), - imageRef: imageRef, - directory: directory, - signatureses: make(map[digest.Digest][]byte), - uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest), - blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), - SignatureSizes: []int{}, - SignaturesSizes: make(map[digest.Digest][]int), - indexToStorageID: make(map[int]*string), - indexToAddedLayerInfo: make(map[int]addedLayerInfo), - diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), + imageRef: imageRef, + directory: directory, + signatureses: make(map[digest.Digest][]byte), + blobDiffIDs: make(map[digest.Digest]digest.Digest), + indexToTocDigest: make(map[int]digest.Digest), + blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + SignatureSizes: []int{}, + SignaturesSizes: make(map[digest.Digest][]int), + indexToStorageID: make(map[int]*string), + indexToAddedLayerInfo: make(map[int]addedLayerInfo), + diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), } dest.Compat = impl.AddCompat(dest) return dest, nil @@ -227,7 +230,7 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf // Record information about the blob. 
s.lock.Lock() - s.uncompressedOrTocDigest[blobDigest] = diffID.Digest() + s.blobDiffIDs[blobDigest] = diffID.Digest() s.fileSizes[blobDigest] = counter.Count s.filenames[blobDigest] = filename s.lock.Unlock() @@ -286,13 +289,19 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces return private.UploadedBlob{}, err } + if out.TOCDigest == "" { + return private.UploadedBlob{}, errors.New("TOC digest is empty") + } + blobDigest := srcInfo.Digest s.lock.Lock() - s.uncompressedOrTocDigest[blobDigest] = blobDigest s.fileSizes[blobDigest] = 0 s.filenames[blobDigest] = "" s.diffOutputs[blobDigest] = out + if index != nil { + s.indexToTocDigest[*index] = out.TOCDigest + } s.lock.Unlock() return private.UploadedBlob{ @@ -317,72 +326,79 @@ func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context, return reused, info, s.queueOrCommit(*options.LayerIndex, addedLayerInfo{ digest: info.Digest, + tocDigest: options.TOCDigest, emptyLayer: options.EmptyLayer, }) } -// tryReusingBlobAsPending implements TryReusingBlobWithOptions for (digest, size or -1), filling s.uncompressedOrTocDigest and other metadata. +// tryReusingBlobAsPending implements TryReusingBlobWithOptions for (blobDigest, size or -1), filling s.blobDiffIDs and other metadata. // The caller must arrange the blob to be eventually committed using s.commitLayer(). -func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, size int64, options *private.TryReusingBlobOptions) (bool, private.ReusedBlob, error) { +func (s *storageImageDestination) tryReusingBlobAsPending(blobDigest digest.Digest, size int64, options *private.TryReusingBlobOptions) (bool, private.ReusedBlob, error) { // lock the entire method as it executes fairly quickly s.lock.Lock() defer s.lock.Unlock() if options.SrcRef != nil { // Check if we have the layer in the underlying additional layer store. 
- aLayer, err := s.imageRef.transport.store.LookupAdditionalLayer(digest, options.SrcRef.String()) + aLayer, err := s.imageRef.transport.store.LookupAdditionalLayer(blobDigest, options.SrcRef.String()) if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { - return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, digest, err) + return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, blobDigest, err) } else if err == nil { // Record the uncompressed value so that we can use it to calculate layer IDs. - s.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest() - s.blobAdditionalLayer[digest] = aLayer + s.blobDiffIDs[blobDigest] = aLayer.UncompressedDigest() + s.blobAdditionalLayer[blobDigest] = aLayer return true, private.ReusedBlob{ - Digest: digest, + Digest: blobDigest, Size: aLayer.CompressedSize(), }, nil } } - if digest == "" { + if blobDigest == "" { return false, private.ReusedBlob{}, errors.New(`Can not check for a blob with unknown digest`) } - if err := digest.Validate(); err != nil { + if err := blobDigest.Validate(); err != nil { return false, private.ReusedBlob{}, fmt.Errorf("Can not check for a blob with invalid digest: %w", err) } + if options.TOCDigest != "" { + if err := options.TOCDigest.Validate(); err != nil { + return false, private.ReusedBlob{}, fmt.Errorf("Can not check for a blob with invalid digest: %w", err) + } + } + + // Check if we have a wasn't-compressed layer in storage that's based on that blob. // Check if we've already cached it in a file. - if size, ok := s.fileSizes[digest]; ok { + if size, ok := s.fileSizes[blobDigest]; ok { return true, private.ReusedBlob{ - Digest: digest, + Digest: blobDigest, Size: size, }, nil } - // Check if we have a wasn't-compressed layer in storage that's based on that blob. 
- layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(digest) + layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(blobDigest) if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { - return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with digest %q: %w`, digest, err) + return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with digest %q: %w`, blobDigest, err) } if len(layers) > 0 { // Save this for completeness. - s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest + s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ - Digest: digest, + Digest: blobDigest, Size: layers[0].UncompressedSize, }, nil } // Check if we have a was-compressed layer in storage that's based on that blob. - layers, err = s.imageRef.transport.store.LayersByCompressedDigest(digest) + layers, err = s.imageRef.transport.store.LayersByCompressedDigest(blobDigest) if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { - return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q: %w`, digest, err) + return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q: %w`, blobDigest, err) } if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. - s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest + s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ - Digest: digest, + Digest: blobDigest, Size: layers[0].CompressedSize, }, nil } @@ -391,23 +407,23 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, // Because we must return the size, which is unknown for unavailable compressed blobs, the returned BlobInfo refers to the // uncompressed layer, and that can happen only if options.CanSubstitute, or if the incoming manifest already specifies the size. 
if options.CanSubstitute || size != -1 { - if uncompressedDigest := options.Cache.UncompressedDigest(digest); uncompressedDigest != "" && uncompressedDigest != digest { + if uncompressedDigest := options.Cache.UncompressedDigest(blobDigest); uncompressedDigest != "" && uncompressedDigest != blobDigest { layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(uncompressedDigest) if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with digest %q: %w`, uncompressedDigest, err) } if len(layers) > 0 { if size != -1 { - s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest + s.blobDiffIDs[blobDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ - Digest: digest, + Digest: blobDigest, Size: size, }, nil } if !options.CanSubstitute { - return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", digest) + return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", blobDigest) } - s.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest + s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -416,23 +432,37 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } } - tocDigest := digest - if options.TOCDigest != nil { - tocDigest = *options.TOCDigest - } + if options.TOCDigest != "" && options.LayerIndex != nil { + // we are already holding s.lock + diffOutput, ok := s.diffOutputs[blobDigest] + if ok { + // Save this for completeness. 
+ s.indexToTocDigest[*options.LayerIndex] = options.TOCDigest + return true, private.ReusedBlob{ + Digest: blobDigest, + TOCDigest: diffOutput.TOCDigest, + Size: diffOutput.Size, + }, nil + } - // Check if we have a chunked layer in storage with the same TOC digest. - layers, err = s.imageRef.transport.store.LayersByTOCDigest(tocDigest) - if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { - return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with TOC digest %q: %w`, tocDigest, err) - } - if len(layers) > 0 { - // Save this for completeness. - s.uncompressedOrTocDigest[digest] = layers[0].TOCDigest - return true, private.ReusedBlob{ - Digest: layers[0].TOCDigest, - Size: layers[0].UncompressedSize, - }, nil + // Check if we have a chunked layer in storage with the same TOC digest. + layers, err := s.imageRef.transport.store.LayersByTOCDigest(options.TOCDigest) + + if err != nil && !errors.Is(err, storage.ErrLayerUnknown) { + return false, private.ReusedBlob{}, fmt.Errorf(`looking for layers with TOC digest %q: %w`, options.TOCDigest, err) + } + if len(layers) > 0 { + if options.LayerIndex != nil { + // Save this for completeness. + s.indexToTocDigest[*options.LayerIndex] = options.TOCDigest + } + + return true, private.ReusedBlob{ + Digest: blobDigest, + TOCDigest: layers[0].TOCDigest, + Size: layers[0].UncompressedSize, + }, nil + } } // Nope, we don't have it. @@ -457,8 +487,9 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string { continue } blobSum := m.FSLayers[i].BlobSum - diffID, ok := s.uncompressedOrTocDigest[blobSum] + diffID, ok := s.blobDiffIDs[blobSum] if !ok { + // this can, in principle, legitimately happen when a layer is reused by TOC. 
logrus.Infof("error looking up diffID for layer %q", blobSum.String()) return "" } @@ -468,8 +499,16 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string { // We know the ID calculation doesn't actually use the diffIDs, so we don't need to populate // the diffID list. case *manifest.OCI1: - for _, l := range m.Layers { - diffIDs = append(diffIDs, l.Digest) + for i, l := range m.Layers { + if l.Digest != "" { + // if a layer was pulled using a partial blob, we need to use the TOC digest + // to calculate the image ID, since the layer digest was not validated. + if tocDigest, found := s.indexToTocDigest[i]; found { + diffIDs = append(diffIDs, tocDigest) + } else { + diffIDs = append(diffIDs, l.Digest) + } + } } default: return "" @@ -555,20 +594,26 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) return nil } -// getDiffIDOrTOCDigest returns the diffID for the specified digest or the digest for the TOC, if known. -func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest.Digest) (digest.Digest, bool) { +// getLayerID returns the string used as the layer ID: the hex of the TOC digest followed by "-toc" if the layer is known by its TOC, otherwise the hex of the diffID recorded for the specified digest; the bool reports whether a value was found. +func (s *storageImageDestination) getLayerID(uncompressedDigest digest.Digest, tocDigest digest.Digest, index int) (string, bool) { s.lock.Lock() defer s.lock.Unlock() - if d, found := s.diffOutputs[uncompressedDigest]; found { - return d.TOCDigest, found + if d, found := s.indexToTocDigest[index]; found { + return d.Hex() + "-toc", found + } + + if d, found := s.diffOutputs[tocDigest]; found { + // the layer was pulled by its TOC. + return d.TOCDigest.Hex() + "-toc", found } - d, found := s.uncompressedOrTocDigest[uncompressedDigest] - return d, found + + d, found := s.blobDiffIDs[uncompressedDigest] + return d.Hex(), found } // commitLayer commits the specified layer with the given index to the storage.
// size can usually be -1; it can be provided if the layer is not known to be already present in uncompressedOrTocDigest. +// size can usually be -1; it can be provided if the layer is not known to be already present in blobDiffIDs. // // If the layer cannot be committed yet, the function returns (true, nil). // @@ -599,35 +644,43 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Check if there's already a layer with the ID that we'd give to the result of applying // this layer blob to its parent, if it has one, or the blob's hex value otherwise. - // The diffIDOrTOCDigest refers either to the DiffID or the digest of the TOC. - diffIDOrTOCDigest, haveDiffIDOrTOCDigest := s.getDiffIDOrTOCDigest(info.digest) - if !haveDiffIDOrTOCDigest { + // The layerID refers either to the DiffID or the digest of the TOC. + layerID, haveLayerID := s.getLayerID(info.digest, info.tocDigest, index) + if !haveLayerID { // Check if it's elsewhere and the caller just forgot to pass it to us in a PutBlob(), // or to even check if we had it. // Use none.NoCache to avoid a repeated DiffID lookup in the BlobInfoCache; a caller // that relies on using a blob digest that has never been seen by the store had better call // TryReusingBlob; not calling PutBlob already violates the documented API, so there’s only // so far we are going to accommodate that (if we should be doing that at all). - logrus.Debugf("looking for diffID or TOC digest for blob %+v", info.digest) + logrus.Debugf("looking for diffID or TOC digest for blob=%+v tocDigest=%+v", info.digest, info.tocDigest) + // Use tryReusingBlobAsPending, not the top-level TryReusingBlobWithOptions, to prevent recursion via queueOrCommit.
has, _, err := s.tryReusingBlobAsPending(info.digest, size, &private.TryReusingBlobOptions{ + TOCDigest: info.tocDigest, Cache: none.NoCache, CanSubstitute: false, }) if err != nil { - return false, fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err) + return false, fmt.Errorf("checking for a layer based on blob %q (tocDigest %q): %w", info.digest.String(), info.tocDigest.String(), err) } if !has { return false, fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String()) } - diffIDOrTOCDigest, haveDiffIDOrTOCDigest = s.getDiffIDOrTOCDigest(info.digest) - if !haveDiffIDOrTOCDigest { - return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String()) + + layerID, haveLayerID = s.getLayerID(info.digest, info.tocDigest, index) + if !haveLayerID { + d := info.digest + if d == "" { + d = info.tocDigest + } + return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", d.String()) } } - id := diffIDOrTOCDigest.Hex() + + id := layerID if lastLayer != "" { - id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffIDOrTOCDigest.Hex())).Hex() + id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + layerID)).Hex() } if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil { // There's already a layer that should have the right contents, just reuse it. @@ -686,6 +739,14 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si return false, nil } + s.lock.Lock() + diffID, ok := s.blobDiffIDs[info.digest] + s.lock.Unlock() + + if !ok { + return false, fmt.Errorf("failed to find diffID for layer: %q", info.digest) + } + s.lock.Lock() al, ok := s.blobAdditionalLayer[info.digest] s.lock.Unlock() @@ -707,7 +768,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si if !ok { // Try to find the layer with contents matching that blobsum. 
layer := "" - layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(diffIDOrTOCDigest) + layers, err2 := s.imageRef.transport.store.LayersByUncompressedDigest(diffID) if err2 == nil && len(layers) > 0 { layer = layers[0].ID } else { @@ -763,7 +824,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // TODO: This can take quite some time, and should ideally be cancellable using ctx.Done(). layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, &storage.LayerOptions{ OriginalDigest: info.digest, - UncompressedDigest: diffIDOrTOCDigest, + UncompressedDigest: diffID, }, file) if err != nil && !errors.Is(err, storage.ErrDuplicateID) { return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err) @@ -817,8 +878,18 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t // Extract, commit, or find the layers. for i, blob := range layerBlobs { + s.lock.Lock() + diffOutput, ok := s.diffOutputs[blob.Digest] + s.lock.Unlock() + + var tocDigest digest.Digest + if ok { + tocDigest = diffOutput.TOCDigest + } + if stopQueue, err := s.commitLayer(i, addedLayerInfo{ digest: blob.Digest, + tocDigest: tocDigest, emptyLayer: blob.EmptyLayer, }, blob.Size); err != nil { return err