From e600adae27aece4a70060490655160310c90843b Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Tue, 19 Sep 2023 09:02:37 +0200 Subject: [PATCH] storage: store the diffID and use for validation when copying a partial image, store the expected diffID so that it can be later used to validate the obtained layer stream. Signed-off-by: Giuseppe Scrivano --- storage/storage_dest.go | 75 ++++++++++++++++++++++++++++------------- storage/storage_src.go | 41 ++++++++++++++++++++-- 2 files changed, 91 insertions(+), 25 deletions(-) diff --git a/storage/storage_dest.go b/storage/storage_dest.go index 6e1e83ecdb..84219337c6 100644 --- a/storage/storage_dest.go +++ b/storage/storage_dest.go @@ -541,7 +541,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) } s.lock.Unlock() // Note: commitLayer locks on-demand. - if err := s.commitLayer(index, info, -1); err != nil { + if stopQueue, err := s.commitLayer(index, info, -1); stopQueue || err != nil { return err } s.lock.Lock() @@ -570,15 +570,17 @@ func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest // commitLayer commits the specified layer with the given index to the storage. // size can usually be -1; it can be provided if the layer is not known to be already present in uncompressedOrTocDigest. // +// If the layer cannot be committed yet, the function returns (true, nil). +// // Note that the previous layer is expected to already be committed. // // Caution: this function must be called without holding `s.lock`. Callers // must guarantee that, at any given time, at most one goroutine may execute // `commitLayer()`. -func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) error { +func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, size int64) (bool, error) { // Already committed? Return early. if _, alreadyCommitted := s.indexToStorageID[index]; alreadyCommitted { - return nil + return false, nil } // Start with an empty string or the previous layer ID. Note that @@ -592,7 +594,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Carry over the previous ID for empty non-base layers. if info.emptyLayer { s.indexToStorageID[index] = &lastLayer - return nil + return false, nil } // Check if there's already a layer with the ID that we'd give to the result of applying @@ -613,14 +615,14 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si CanSubstitute: false, }) if err != nil { - return fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err) + return false, fmt.Errorf("checking for a layer based on blob %q: %w", info.digest.String(), err) } if !has { - return fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String()) + return false, fmt.Errorf("error determining uncompressed digest or TOC digest for blob %q", info.digest.String()) } diffIDOrTOCDigest, haveDiffIDOrTOCDigest = s.getDiffIDOrTOCDigest(info.digest) if !haveDiffIDOrTOCDigest { - return fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String()) + return false, fmt.Errorf("we have blob %q, but don't know its uncompressed or TOC digest", info.digest.String()) } } id := diffIDOrTOCDigest.Hex() @@ -631,28 +633,53 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // There's already a layer that should have the right contents, just reuse it. lastLayer = layer.ID s.indexToStorageID[index] = &lastLayer - return nil + return false, nil } s.lock.Lock() diffOutput, ok := s.diffOutputs[info.digest] s.lock.Unlock() if ok { + if s.manifest == nil { + logrus.Debugf("Skipping commit for TOC=%q, manifest not yet available", id) + return true, nil + } + + man, err := manifest.FromBlob(s.manifest, manifest.GuessMIMEType(s.manifest)) + if err != nil { + return false, fmt.Errorf("parsing manifest: %w", err) + } + + cb, err := s.getConfigBlob(man.ConfigInfo()) + if err != nil { + return false, err + } + + // retrieve the expected uncompressed digest from the config blob. + configOCI := &imgspecv1.Image{} + if err := json.Unmarshal(cb, configOCI); err != nil { + return false, err + } + if index >= len(configOCI.RootFS.DiffIDs) { + return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index) + } + layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil) if err != nil { - return err + return false, err } - // FIXME: what to do with the uncompressed digest? - diffOutput.UncompressedDigest = info.digest + // let the storage layer know what was the original uncompressed layer. + diffOutput.UncompressedDigest = configOCI.RootFS.DiffIDs[index] + logrus.Debugf("Setting uncompressed digest to %q for layer %q", diffOutput.UncompressedDigest, id) if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil { _ = s.imageRef.transport.store.Delete(layer.ID) - return err + return false, err } s.indexToStorageID[index] = &layer.ID - return nil + return false, nil } s.lock.Lock() @@ -661,11 +688,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si if ok { layer, err := al.PutAs(id, lastLayer, nil) if err != nil && !errors.Is(err, storage.ErrDuplicateID) { - return fmt.Errorf("failed to put layer from digest and labels: %w", err) + return false, fmt.Errorf("failed to put layer from digest and labels: %w", err) } lastLayer = layer.ID s.indexToStorageID[index] = &lastLayer - return nil + return false, nil } // Check if we previously cached a file with that blob's contents. If we didn't, @@ -686,7 +713,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si } } if layer == "" { - return fmt.Errorf("locating layer for blob %q: %w", info.digest, err2) + return false, fmt.Errorf("locating layer for blob %q: %w", info.digest, err2) } // Read the layer's contents. noCompression := archive.Uncompressed @@ -695,7 +722,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si } diff, err2 := s.imageRef.transport.store.Diff("", layer, diffOptions) if err2 != nil { - return fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2) + return false, fmt.Errorf("reading layer %q for blob %q: %w", layer, info.digest, err2) } // Copy the layer diff to a file. Diff() takes a lock that it holds // until the ReadCloser that it returns is closed, and PutLayer() wants @@ -705,7 +732,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si file, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY|os.O_EXCL, 0o600) if err != nil { diff.Close() - return fmt.Errorf("creating temporary file %q: %w", filename, err) + return false, fmt.Errorf("creating temporary file %q: %w", filename, err) } // Copy the data to the file. // TODO: This can take quite some time, and should ideally be cancellable using @@ -714,7 +741,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si diff.Close() file.Close() if err != nil { - return fmt.Errorf("storing blob to file %q: %w", filename, err) + return false, fmt.Errorf("storing blob to file %q: %w", filename, err) } // Make sure that we can find this file later, should we need the layer's // contents again. @@ -725,7 +752,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Read the cached blob and use it as a diff. file, err := os.Open(filename) if err != nil { - return fmt.Errorf("opening file %q: %w", filename, err) + return false, fmt.Errorf("opening file %q: %w", filename, err) } defer file.Close() // Build the new layer using the diff, regardless of where it came from. @@ -735,11 +762,11 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si UncompressedDigest: diffIDOrTOCDigest, }, file) if err != nil && !errors.Is(err, storage.ErrDuplicateID) { - return fmt.Errorf("adding layer with blob %q: %w", info.digest, err) + return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err) } s.indexToStorageID[index] = &layer.ID - return nil + return false, nil } // Commit marks the process of storing the image as successful and asks for the image to be persisted. @@ -786,11 +813,13 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t // Extract, commit, or find the layers. for i, blob := range layerBlobs { - if err := s.commitLayer(i, addedLayerInfo{ + if stopQueue, err := s.commitLayer(i, addedLayerInfo{ digest: blob.Digest, emptyLayer: blob.EmptyLayer, }, blob.Size); err != nil { return err + } else if stopQueue { + return fmt.Errorf("Internal error: storageImageDestination.Commit(): commitLayer() not ready to commit for layer %q", blob.Digest) } } var lastLayer string diff --git a/storage/storage_src.go b/storage/storage_src.go index d9d0bfd716..f8d615f125 100644 --- a/storage/storage_src.go +++ b/storage/storage_src.go @@ -42,6 +42,10 @@ type storageImageSource struct { getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice + + // fromDigestToTOC converts from a layer diffID returned by LayerInfosForCopy to the TOC digest. The TOC digest + // must be used for lookups in the underyling storage. + fromDigestToTOC map[digest.Digest]digest.Digest } // newImageSource sets up an image for reading. @@ -65,6 +69,7 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora layerPosition: make(map[digest.Digest]int), SignatureSizes: []int{}, SignaturesSizes: make(map[digest.Digest][]int), + fromDigestToTOC: make(map[digest.Digest]digest.Digest), } image.Compat = impl.AddCompat(image) if img.Metadata != "" { @@ -91,6 +96,18 @@ func (s *storageImageSource) Close() error { func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (rc io.ReadCloser, n int64, err error) { // We need a valid digest value. digest := info.Digest + + digestIsTOC := false + + // If the digest was overriden by LayerInfosForCopy, then we need to use the TOC digest + // to retrieve it from the storage. + s.getBlobMutex.Lock() + if v, ok := s.fromDigestToTOC[digest]; ok { + digest = v + digestIsTOC = true + } + s.getBlobMutex.Unlock() + err = digest.Validate() if err != nil { return nil, 0, err @@ -103,7 +120,12 @@ func (s *storageImageSource) GetBlob(ctx context.Context, info types.BlobInfo, c // Check if the blob corresponds to a diff that was used to initialize any layers. Our // callers should try to retrieve layers using their uncompressed digests, so no need to // check if they're using one of the compressed digests, which we can't reproduce anyway. - layers, _ := s.imageRef.transport.store.LayersByUncompressedDigest(digest) + var layers []storage.Layer + if digestIsTOC { + layers, _ = s.imageRef.transport.store.LayersByTOCDigest(digest) + } else { + layers, _ = s.imageRef.transport.store.LayersByUncompressedDigest(digest) + } // If it's not a layer, then it must be a data item. if len(layers) == 0 { @@ -273,8 +295,23 @@ func (s *storageImageSource) LayerInfosForCopy(ctx context.Context, instanceDige if layer.UncompressedSize < 0 { return nil, fmt.Errorf("uncompressed size for layer %q is unknown", layerID) } + + blobDigest := layer.UncompressedDigest + + if layer.Flags != nil { + if v, ok := layer.Flags[storage.ExpectedLayerDiffIDFlag]; ok { + if expectedDigest, ok := v.(string); ok { + // if the layer is stored by its TOC, report the expected diffID as the layer Digest + // but store the TOC digest so we can later retrieve it from the storage. + blobDigest = digest.Digest(expectedDigest) + s.getBlobMutex.Lock() + s.fromDigestToTOC[blobDigest] = layer.TOCDigest + s.getBlobMutex.Unlock() + } + } + } blobInfo := types.BlobInfo{ - Digest: layer.UncompressedDigest, + Digest: blobDigest, Size: layer.UncompressedSize, MediaType: uncompressedLayerType, }