Edit the manifest when pushing uncompressed data from c/storage #2273

Merged
73 changes: 56 additions & 17 deletions copy/compression.go
@@ -70,14 +70,27 @@ func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobI

// bpCompressionStepData contains data that the copy pipeline needs about the compression step.
type bpCompressionStepData struct {
operation types.LayerCompression // Operation to use for updating the blob metadata.
operation bpcOperation // What we are actually doing
uploadedOperation types.LayerCompression // Operation to use for updating the blob metadata (matching the end state, not necessarily what we do)
uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits.
uploadedAnnotations map[string]string // Annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed.
srcCompressorName string // Compressor name to record in the blob info cache for the source blob.
uploadedCompressorName string // Compressor name to record in the blob info cache for the uploaded blob.
closers []io.Closer // Objects to close after the upload is done, if any.
}

type bpcOperation int

const (
bpcOpInvalid bpcOperation = iota
bpcOpPreserveOpaque // We are preserving something where compression is not applicable
bpcOpPreserveCompressed // We are preserving a compressed, and decompressible, layer
bpcOpPreserveUncompressed // We are preserving an uncompressed, and compressible, layer
bpcOpCompressUncompressed // We are compressing uncompressed data
bpcOpRecompressCompressed // We are recompressing compressed data
bpcOpDecompressCompressed // We are decompressing compressed data
)

// blobPipelineCompressionStep updates *stream to compress and/or decompress it.
// srcInfo is primarily used for error messages.
// Returns data for other steps; the caller should eventually call updateCompressionEdits and perhaps recordValidatedBlobData,
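The heart of the change is that `operation` now records what the pipeline actually does to the stream, while `uploadedOperation` records the `types.LayerCompression` value used for the metadata edits; in particular, preserving an already-uncompressed layer is reported as `types.Decompress`, so the edited manifest describes the layer with an uncompressed media type. A minimal, self-contained sketch of that mapping, using hypothetical local stand-ins rather than the real c/image types:

```go
package main

import "fmt"

// layerCompression stands in for types.LayerCompression in this sketch.
type layerCompression string

const (
	preserveOriginal layerCompression = "PreserveOriginal"
	compressLayer    layerCompression = "Compress"
	decompressLayer  layerCompression = "Decompress"
)

// op stands in for bpcOperation: what the copy pipeline actually does to the stream.
type op int

const (
	opPreserveOpaque       op = iota // compression not applicable (e.g. encrypted blobs)
	opPreserveCompressed             // keep a compressed layer as-is
	opPreserveUncompressed           // keep an uncompressed layer as-is
	opCompressUncompressed           // compress uncompressed data
	opRecompressCompressed           // change the compression of compressed data
	opDecompressCompressed           // decompress compressed data
)

// uploadedOperation returns the LayerCompression-style value to record in the
// blob metadata for a given internal operation, following the pairs in the diff.
// Preserving already-uncompressed data is reported as Decompress, which is what
// makes the manifest edits use an uncompressed media type.
func uploadedOperation(o op) layerCompression {
	switch o {
	case opCompressUncompressed:
		return compressLayer
	case opDecompressCompressed, opPreserveUncompressed:
		return decompressLayer
	default: // opPreserveOpaque, opPreserveCompressed, opRecompressCompressed
		return preserveOriginal
	}
}

func main() {
	fmt.Println(uploadedOperation(opPreserveUncompressed)) // Decompress
	fmt.Println(uploadedOperation(opRecompressCompressed)) // PreserveOriginal
}
```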
@@ -112,10 +125,11 @@ func (ic *imageCopier) blobPipelineCompressionStep(stream *sourceStream, canModi
// bpcPreserveEncrypted checks if the input is encrypted, and returns a *bpCompressionStepData if so.
func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if isOciEncrypted(stream.info.MediaType) {
// We can’t do anything with an encrypted blob unless decrypted.
logrus.Debugf("Using original blob without modification for encrypted blob")
// PreserveOriginal due to any compression not being able to be done on an encrypted blob unless decrypted
return &bpCompressionStepData{
operation: types.PreserveOriginal,
operation: bpcOpPreserveOpaque,
uploadedOperation: types.PreserveOriginal,
uploadedAlgorithm: nil,
srcCompressorName: internalblobinfocache.UnknownCompression,
uploadedCompressorName: internalblobinfocache.UnknownCompression,
@@ -143,7 +157,8 @@ func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bp
Size: -1,
}
return &bpCompressionStepData{
operation: types.Compress,
operation: bpcOpCompressUncompressed,
uploadedOperation: types.Compress,
uploadedAlgorithm: uploadedAlgorithm,
uploadedAnnotations: annotations,
srcCompressorName: detected.srcCompressorName,
@@ -182,7 +197,8 @@ func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bp
}
succeeded = true
return &bpCompressionStepData{
operation: types.PreserveOriginal,
operation: bpcOpRecompressCompressed,
uploadedOperation: types.PreserveOriginal,
uploadedAlgorithm: ic.compressionFormat,
uploadedAnnotations: annotations,
srcCompressorName: detected.srcCompressorName,
@@ -208,7 +224,8 @@ func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bp
Size: -1,
}
return &bpCompressionStepData{
operation: types.Decompress,
operation: bpcOpDecompressCompressed,
uploadedOperation: types.Decompress,
uploadedAlgorithm: nil,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: internalblobinfocache.Uncompressed,
@@ -232,14 +249,26 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom
// But don’t touch blobs in objects where we can’t change compression,
// so that src.UpdatedImage() doesn’t fail; assume that for such blobs
// LayerInfosForCopy() should not be making any changes in the first place.
var bpcOp bpcOperation
var uploadedOp types.LayerCompression
var algorithm *compressiontypes.Algorithm
if layerCompressionChangeSupported && detected.isCompressed {
switch {
case !layerCompressionChangeSupported:
bpcOp = bpcOpPreserveOpaque
uploadedOp = types.PreserveOriginal
algorithm = nil
case detected.isCompressed:
bpcOp = bpcOpPreserveCompressed
uploadedOp = types.PreserveOriginal
algorithm = &detected.format
} else {
default:
bpcOp = bpcOpPreserveUncompressed
uploadedOp = types.Decompress
algorithm = nil
}
return &bpCompressionStepData{
operation: types.PreserveOriginal,
operation: bpcOp,
uploadedOperation: uploadedOp,
uploadedAlgorithm: algorithm,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: detected.srcCompressorName,
@@ -248,7 +277,7 @@ func (ic *imageCopier) bpcPreserveOriginal(_ *sourceStream, detected bpDetectCom

// updateCompressionEdits sets *operation, *algorithm and updates *annotations, if necessary.
func (d *bpCompressionStepData) updateCompressionEdits(operation *types.LayerCompression, algorithm **compressiontypes.Algorithm, annotations *map[string]string) {
*operation = d.operation
*operation = d.uploadedOperation
// If we can modify the layer's blob, set the desired algorithm for it to be set in the manifest.
*algorithm = d.uploadedAlgorithm
if *annotations == nil {
@@ -257,7 +286,8 @@ func (d *bpCompressionStepData) updateCompressionEdits(operation *types.LayerCom
maps.Copy(*annotations, d.uploadedAnnotations)
}

// recordValidatedBlobData updates b.blobInfoCache with data about the created uploadedInfo adnd the original srcInfo.
// recordValidatedBlobData updates b.blobInfoCache with data about the created uploadedInfo (as returned by PutBlob)
// and the original srcInfo (which the caller guarantees has been validated).
// This must ONLY be called if all data has been validated by OUR code, and is not coming from third parties.
func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInfo types.BlobInfo, srcInfo types.BlobInfo,
encryptionStep *bpEncryptionStepData, decryptionStep *bpDecryptionStepData) error {
@@ -268,17 +298,26 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
// in the blob info cache (which would probably be necessary for any more complex logic),
// and the simplicity is attractive.
if !encryptionStep.encrypting && !decryptionStep.decrypting {
// If d.operation != types.PreserveOriginal, we now have two reliable digest values:
// If d.operation != bpcOpPreserve*, we now have two reliable digest values:
// srcinfo.Digest describes the pre-d.operation input, verified by digestingReader
// uploadedInfo.Digest describes the post-d.operation output, computed by PutBlob
// (because stream.info.Digest == "", this must have been computed afresh).
// (because we set stream.info.Digest == "", this must have been computed afresh).
switch d.operation {
case types.PreserveOriginal:
break // Do nothing, we have only one digest and we might not have even verified it.
case types.Compress:
case bpcOpPreserveOpaque:
// No useful information
case bpcOpCompressUncompressed:
c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest)
case types.Decompress:
case bpcOpDecompressCompressed:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest)
case bpcOpRecompressCompressed, bpcOpPreserveCompressed:
// We know one or two compressed digests. BlobInfoCache associates compression variants via the uncompressed digest,
// and we don’t know that one.
// That also means that repeated copies with the same recompression don’t identify reuse opportunities (unless
// RecordDigestUncompressedPair was called for both compressed variants for some other reason).
case bpcOpPreserveUncompressed:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, srcInfo.Digest)
case bpcOpInvalid:
fallthrough
default:
return fmt.Errorf("Internal error: Unexpected d.operation value %#v", d.operation)
}
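Reading the switch above: each branch that knows an uncompressed digest records a (digest → uncompressed digest) pair, the preserve-uncompressed case records the digest against itself, and the recompress/preserve-compressed branches record nothing because no uncompressed digest is known. A rough sketch of that bookkeeping, with a plain map and string digests standing in for the real BlobInfoCache:

```go
package main

import "fmt"

// uncompressedOf maps any known digest to the digest of its uncompressed form,
// loosely mirroring RecordDigestUncompressedPair in this sketch.
var uncompressedOf = map[string]string{}

func recordDigestUncompressedPair(anyDigest, uncompressed string) {
	uncompressedOf[anyDigest] = uncompressed
}

func main() {
	// bpcOpCompressUncompressed: uploaded (compressed) digest -> source (uncompressed) digest.
	recordDigestUncompressedPair("sha256:aaa-gzip", "sha256:aaa-plain")

	// bpcOpDecompressCompressed: source (compressed) digest -> uploaded (uncompressed) digest.
	recordDigestUncompressedPair("sha256:bbb-gzip", "sha256:bbb-plain")

	// bpcOpPreserveUncompressed: the blob is its own uncompressed form, so it is
	// recorded against itself — the pair added by this change.
	recordDigestUncompressedPair("sha256:ccc-plain", "sha256:ccc-plain")

	// bpcOpRecompressCompressed / bpcOpPreserveCompressed: only compressed digests
	// are known here, so nothing can be recorded.

	fmt.Println(len(uncompressedOf)) // 3
}
```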
19 changes: 12 additions & 7 deletions copy/single.go
@@ -383,7 +383,7 @@ func (ic *imageCopier) compareImageDestinationManifestEqual(ctx context.Context,

compressionAlgos := set.New[string]()
for _, srcInfo := range ic.src.LayerInfos() {
if c := compressionAlgorithmFromMIMEType(srcInfo); c != nil {
if _, c := compressionEditsFromMIMEType(srcInfo); c != nil {
compressionAlgos.Add(c.Name())
}
}
@@ -636,17 +636,22 @@ type diffIDResult struct {
err error
}

func compressionAlgorithmFromMIMEType(srcInfo types.BlobInfo) *compressiontypes.Algorithm {
// compressionEditsFromMIMEType returns a (CompressionOperation, CompressionAlgorithm) value pair suitable
// for types.BlobInfo, based on a MIME type of srcInfo.
func compressionEditsFromMIMEType(srcInfo types.BlobInfo) (types.LayerCompression, *compressiontypes.Algorithm) {
// This MIME type → compression mapping belongs in manifest-specific code in our manifest
// package (but we should preferably replace/change UpdatedImage instead of productizing
// this workaround).
switch srcInfo.MediaType {
case manifest.DockerV2Schema2LayerMediaType, imgspecv1.MediaTypeImageLayerGzip:
return &compression.Gzip
return types.PreserveOriginal, &compression.Gzip
case imgspecv1.MediaTypeImageLayerZstd:
return &compression.Zstd
return types.PreserveOriginal, &compression.Zstd
case manifest.DockerV2SchemaLayerMediaTypeUncompressed, imgspecv1.MediaTypeImageLayer:
return types.Decompress, nil
default:
return types.PreserveOriginal, nil
}
return nil
}
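
For reference, a self-contained mirror of the MIME-type mapping above, with the usual media type strings written out literally and plain strings in place of the c/image compression types (an illustration only, not the productized mapping the comment asks for):

```go
package main

import "fmt"

// editsFromMIMEType mirrors compressionEditsFromMIMEType, returning a
// (CompressionOperation, CompressionAlgorithm) pair as plain strings;
// "" means "no algorithm".
func editsFromMIMEType(mediaType string) (operation, algorithm string) {
	switch mediaType {
	case "application/vnd.docker.image.rootfs.diff.tar.gzip", // DockerV2Schema2LayerMediaType
		"application/vnd.oci.image.layer.v1.tar+gzip": // MediaTypeImageLayerGzip
		return "PreserveOriginal", "gzip"
	case "application/vnd.oci.image.layer.v1.tar+zstd": // MediaTypeImageLayerZstd
		return "PreserveOriginal", "zstd"
	case "application/vnd.docker.image.rootfs.diff.tar", // DockerV2SchemaLayerMediaTypeUncompressed
		"application/vnd.oci.image.layer.v1.tar": // MediaTypeImageLayer
		// Uncompressed media types report Decompress, so the edited manifest
		// keeps an uncompressed media type for the layer.
		return "Decompress", ""
	default:
		return "PreserveOriginal", ""
	}
}

func main() {
	op, algo := editsFromMIMEType("application/vnd.oci.image.layer.v1.tar")
	fmt.Printf("%s %q\n", op, algo) // Decompress ""
}
```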

// copyLayer copies a layer with srcInfo (with known Digest and Annotations and possibly known Size) in src to dest, perhaps (de/re/)compressing it,
@@ -660,8 +665,8 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// which uses the compression information to compute the updated MediaType values.
// (Sadly UpdatedImage() is documented to not update MediaTypes from
// ManifestUpdateOptions.LayerInfos[].MediaType, so we are doing it indirectly.)
if srcInfo.CompressionAlgorithm == nil {
srcInfo.CompressionAlgorithm = compressionAlgorithmFromMIMEType(srcInfo)
if srcInfo.CompressionOperation == types.PreserveOriginal && srcInfo.CompressionAlgorithm == nil {
srcInfo.CompressionOperation, srcInfo.CompressionAlgorithm = compressionEditsFromMIMEType(srcInfo)
}

ic.c.printCopyInfo("blob", srcInfo)
4 changes: 2 additions & 2 deletions types/types.go
@@ -135,8 +135,8 @@ type BlobInfo struct {
// CompressionOperation is used in Image.UpdateLayerInfos to instruct
// whether the original layer's "compressed or not" should be preserved,
// possibly while changing the compression algorithm from one to another,
// or if it should be compressed or decompressed. The field defaults to
// preserve the original layer's compressedness.
// or if it should be changed to compressed or decompressed.
// The field defaults to preserve the original layer's compressedness.
// TODO: To remove together with CryptoOperation in re-design to remove
// field out of BlobInfo.
CompressionOperation LayerCompression
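One detail worth spelling out: the zero value of `types.LayerCompression` is `PreserveOriginal` (consistent with the "defaults to preserve" wording here), which is what lets `copyLayer` treat `CompressionOperation == types.PreserveOriginal && CompressionAlgorithm == nil` as "the source requested no explicit edits" and derive them from the media type. A tiny sketch of that zero-value convention, with a hypothetical local type rather than the real one:

```go
package main

import "fmt"

// layerCompression stands in for types.LayerCompression; iota makes the
// "preserve" value the zero value, so an unset field means "preserve".
type layerCompression int

const (
	preserveOriginal layerCompression = iota
	decompress
	compress
)

type blobInfo struct {
	CompressionOperation layerCompression
	CompressionAlgorithm *string
}

func main() {
	var src blobInfo // zero value: the source set no explicit compression edits
	// Mirrors the guard in copyLayer: only derive edits from the media type
	// when the source left both fields at their defaults.
	if src.CompressionOperation == preserveOriginal && src.CompressionAlgorithm == nil {
		fmt.Println("deriving compression edits from the MIME type")
	}
}
```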