From 9a87d2ecc1e4a6477fbbb6b35423cd161c9ddedd Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 1 Jun 2023 23:36:31 +0200 Subject: [PATCH] copy: do not always fail push if digest mismatches if the computed digest mismatches the expected one for a partial image, just log a warning and use the computed one. This is expected since partial images are stored by their TOC digest, not the diffID for the layer. Signed-off-by: Giuseppe Scrivano --- copy/blob.go | 15 ++++++--- copy/digesting_reader.go | 11 +++++-- copy/digesting_reader_test.go | 61 ++++++++++++++++++++++------------- copy/single.go | 19 +++++++---- 4 files changed, 70 insertions(+), 36 deletions(-) diff --git a/copy/blob.go b/copy/blob.go index f45b97f56c..9957022292 100644 --- a/copy/blob.go +++ b/copy/blob.go @@ -18,7 +18,7 @@ import ( // and returns a complete blobInfo of the copied blob. func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo, getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer, - isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) { + isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, incremental, emptyLayer bool) (types.BlobInfo, error) { // The copying happens through a pipeline of connected io.Readers; // that pipeline is built by updating stream. // === Input: srcReader @@ -27,13 +27,16 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read info: srcInfo, } + // If the layer is a partial layer, we can rewrite the digest + canRewriteDigest := incremental + // === Process input through digestingReader to validate against the expected digest. // Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader, // use a separate validation failure indicator. // Note that for this check we don't use the stronger "validationSucceeded" indicator, because // dest.PutBlob may detect that the layer already exists, in which case we don't // read stream to the end, and validation does not happen. - digestingReader, err := newDigestingReader(stream.reader, srcInfo.Digest) + digestingReader, err := newDigestingReader(stream.reader, srcInfo.Digest, canRewriteDigest) if err != nil { return types.BlobInfo{}, fmt.Errorf("preparing to verify blob %s: %w", srcInfo.Digest, err) } @@ -128,8 +131,12 @@ func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Read } } - if digestingReader.validationFailed { // Coverage: This should never happen. - return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, digest verification failed but was ignored", srcInfo.Digest) + if digestingReader.validationFailed { + if !canRewriteDigest { // Coverage: This should never happen. + return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, digest verification failed", srcInfo.Digest) + } + logrus.Warningf("Digest verification failed for blob %s but was ignored", srcInfo.Digest) + return uploadedInfo, nil } if stream.info.Digest != "" && uploadedInfo.Digest != stream.info.Digest { return types.BlobInfo{}, fmt.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, stream.info.Digest, uploadedInfo.Digest) diff --git a/copy/digesting_reader.go b/copy/digesting_reader.go index 901d10826f..24d5410504 100644 --- a/copy/digesting_reader.go +++ b/copy/digesting_reader.go @@ -15,12 +15,13 @@ type digestingReader struct { expectedDigest digest.Digest validationFailed bool validationSucceeded bool + canRewriteDigest bool } // newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error // or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest. // (neither is set if EOF is never reached). -func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digestingReader, error) { +func newDigestingReader(source io.Reader, expectedDigest digest.Digest, canRewriteDigest bool) (*digestingReader, error) { var digester digest.Digester if err := expectedDigest.Validate(); err != nil { return nil, fmt.Errorf("Invalid digest specification %s", expectedDigest) @@ -37,6 +38,7 @@ func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digest hash: digester.Hash(), expectedDigest: expectedDigest, validationFailed: false, + canRewriteDigest: canRewriteDigest, }, nil } @@ -54,9 +56,12 @@ func (d *digestingReader) Read(p []byte) (int, error) { actualDigest := d.digester.Digest() if actualDigest != d.expectedDigest { d.validationFailed = true - return 0, fmt.Errorf("Digest did not match, expected %s, got %s", d.expectedDigest, actualDigest) + if !d.canRewriteDigest { + return 0, fmt.Errorf("Digest did not match, expected %s, got %s", d.expectedDigest, actualDigest) + } + } else { + d.validationSucceeded = true } - d.validationSucceeded = true } return n, err } diff --git a/copy/digesting_reader_test.go b/copy/digesting_reader_test.go index 2e17437ae3..1ce42ae887 100644 --- a/copy/digesting_reader_test.go +++ b/copy/digesting_reader_test.go @@ -21,7 +21,7 @@ func TestNewDigestingReader(t *testing.T) { "sha256:0", // Invalid hex value "sha256:01", // Invalid length of hex value } { - _, err := newDigestingReader(source, input) + _, err := newDigestingReader(source, input, false) assert.Error(t, err, input.String()) } } @@ -37,41 +37,56 @@ func TestDigestingReaderRead(t *testing.T) { } // Valid input for _, c := range cases { - source := bytes.NewReader(c.input) - reader, err := newDigestingReader(source, c.digest) - require.NoError(t, err, c.digest.String()) - dest := bytes.Buffer{} - n, err := io.Copy(&dest, reader) - assert.NoError(t, err, c.digest.String()) - assert.Equal(t, int64(len(c.input)), n, c.digest.String()) - assert.Equal(t, c.input, dest.Bytes(), c.digest.String()) - assert.False(t, reader.validationFailed, c.digest.String()) - assert.True(t, reader.validationSucceeded, c.digest.String()) + for _, incremental := range []bool{false, true} { + source := bytes.NewReader(c.input) + reader, err := newDigestingReader(source, c.digest, incremental) + require.NoError(t, err, c.digest.String()) + dest := bytes.Buffer{} + n, err := io.Copy(&dest, reader) + assert.NoError(t, err, c.digest.String()) + assert.Equal(t, int64(len(c.input)), n, c.digest.String()) + assert.Equal(t, c.input, dest.Bytes(), c.digest.String()) + assert.False(t, reader.validationFailed, c.digest.String()) + assert.True(t, reader.validationSucceeded, c.digest.String()) + } } // Modified input for _, c := range cases { source := bytes.NewReader(bytes.Join([][]byte{c.input, []byte("x")}, nil)) - reader, err := newDigestingReader(source, c.digest) + reader, err := newDigestingReader(source, c.digest, false) require.NoError(t, err, c.digest.String()) dest := bytes.Buffer{} _, err = io.Copy(&dest, reader) assert.Error(t, err, c.digest.String()) assert.True(t, reader.validationFailed, c.digest.String()) assert.False(t, reader.validationSucceeded, c.digest.String()) + + // try with an incremental source + source = bytes.NewReader(bytes.Join([][]byte{c.input, []byte("x")}, nil)) + reader, err = newDigestingReader(source, c.digest, true) + require.NoError(t, err, c.digest.String()) + dest = bytes.Buffer{} + _, err = io.Copy(&dest, reader) + assert.NoError(t, err, c.digest.String()) + assert.True(t, reader.validationFailed, c.digest.String()) + assert.False(t, reader.validationSucceeded, c.digest.String()) + assert.NotEqual(t, c.digest.String(), reader.digester.Digest(), c.digest.String()) } // Truncated input for _, c := range cases { - source := bytes.NewReader(c.input) - reader, err := newDigestingReader(source, c.digest) - require.NoError(t, err, c.digest.String()) - if len(c.input) != 0 { - dest := bytes.Buffer{} - truncatedLen := int64(len(c.input) - 1) - n, err := io.CopyN(&dest, reader, truncatedLen) - assert.NoError(t, err, c.digest.String()) - assert.Equal(t, truncatedLen, n, c.digest.String()) + for _, incremental := range []bool{false, true} { + source := bytes.NewReader(c.input) + reader, err := newDigestingReader(source, c.digest, incremental) + require.NoError(t, err, c.digest.String()) + if len(c.input) != 0 { + dest := bytes.Buffer{} + truncatedLen := int64(len(c.input) - 1) + n, err := io.CopyN(&dest, reader, truncatedLen) + assert.NoError(t, err, c.digest.String()) + assert.Equal(t, truncatedLen, n, c.digest.String()) + } + assert.False(t, reader.validationFailed, c.digest.String()) + assert.False(t, reader.validationSucceeded, c.digest.String()) } - assert.False(t, reader.validationFailed, c.digest.String()) - assert.False(t, reader.validationSucceeded, c.digest.String()) } } diff --git a/copy/single.go b/copy/single.go index 037f7daff4..ac07b42c86 100644 --- a/copy/single.go +++ b/copy/single.go @@ -21,6 +21,7 @@ import ( "github.com/containers/image/v5/signature" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" + "github.com/containers/storage/pkg/chunked" digest "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" @@ -405,7 +406,13 @@ func (ic *imageCopier) copyLayers(ctx context.Context) error { logrus.Debugf("Skipping foreign layer %q copy to %s", cld.destInfo.Digest, ic.c.dest.Reference().Transport().Name()) } } else { - cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool, index, srcRef, manifestLayerInfos[index].EmptyLayer) + toc, err := chunked.GetTOCDigest(manifestLayerInfos[index].Annotations) + if err != nil { + cld.err = err + } else { + incremental := toc != nil + cld.destInfo, cld.diffID, cld.err = ic.copyLayer(ctx, srcLayer, toEncrypt, pool, index, srcRef, incremental, manifestLayerInfos[index].EmptyLayer) + } } data[index] = cld } @@ -550,7 +557,7 @@ func (ic *imageCopier) copyConfig(ctx context.Context, src types.Image) error { return types.BlobInfo{}, fmt.Errorf("reading config blob %s: %w", srcInfo.Digest, err) } - destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false) + destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false, false) if err != nil { return types.BlobInfo{}, err } @@ -578,7 +585,7 @@ type diffIDResult struct { // copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps (de/re/)compressing it, // and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded // srcRef can be used as an additional hint to the destination during checking whether a layer can be reused but srcRef can be nil. -func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, layerIndex int, srcRef reference.Named, emptyLayer bool) (types.BlobInfo, digest.Digest, error) { +func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, toEncrypt bool, pool *mpb.Progress, layerIndex int, srcRef reference.Named, incremental, emptyLayer bool) (types.BlobInfo, digest.Digest, error) { // If the srcInfo doesn't contain compression information, try to compute it from the // MediaType, which was either read from a manifest by way of LayerInfos() or constructed // by LayerInfosForCopy(), if it was supplied at all. If we succeed in copying the blob, @@ -696,7 +703,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() - blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer) + blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, incremental, emptyLayer) if err != nil { return types.BlobInfo{}, "", err } @@ -762,7 +769,7 @@ func updatedBlobInfoFromReuse(inputInfo types.BlobInfo, reusedBlob private.Reuse // perhaps (de/re/)compressing the stream, // and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller. func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) { + diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, incremental, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) { var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil var diffIDChan chan diffIDResult @@ -787,7 +794,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea } } - blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer) // Sets err to nil on success + blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, incremental, emptyLayer) // Sets err to nil on success return blobInfo, diffIDChan, err // We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan }