Skip to content

Commit

Permalink
Merge pull request #1084 from giuseppe/zstd-chunked
Browse files Browse the repository at this point in the history
pkg/compression: new zstd variant zstd:chunked
  • Loading branch information
rhatdan authored Jul 7, 2021
2 parents 008a60e + be50ba7 commit eeb9819
Show file tree
Hide file tree
Showing 10 changed files with 503 additions and 49 deletions.
147 changes: 121 additions & 26 deletions copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type copier struct {
ociEncryptConfig *encconfig.EncryptConfig
maxParallelDownloads uint
downloadForeignLayers bool
fetchPartialBlobs bool
}

// imageCopier tracks state specific to a single image (possibly an item of a manifest list)
Expand Down Expand Up @@ -194,15 +195,21 @@ type Options struct {
// OciDecryptConfig contains the config that can be used to decrypt an image if it is
// encrypted if non-nil. If nil, it does not attempt to decrypt an image.
OciDecryptConfig *encconfig.DecryptConfig

// MaxParallelDownloads indicates the maximum layers to pull at the same time. A reasonable default is used if this is left as 0.
MaxParallelDownloads uint

// When OptimizeDestinationImageAlreadyExists is set, optimize the copy assuming that the destination image already
// exists (and is equivalent). Making the eventual (no-op) copy more performant for this case. Enabling the option
// is slightly pessimistic if the destination image doesn't exist, or is not equivalent.
OptimizeDestinationImageAlreadyExists bool

// Download layer contents with "nondistributable" media types ("foreign" layers) and translate the layer media type
// to not indicate "nondistributable".
DownloadForeignLayers bool

// FetchPartialBlobs indicates whether to attempt to fetch the blob partially. Experimental.
FetchPartialBlobs bool
}

// validateImageListSelection returns an error if the passed-in value is not one that we recognize as a valid ImageListSelection value
Expand Down Expand Up @@ -283,6 +290,7 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
ociEncryptConfig: options.OciEncryptConfig,
maxParallelDownloads: options.MaxParallelDownloads,
downloadForeignLayers: options.DownloadForeignLayers,
fetchPartialBlobs: options.FetchPartialBlobs,
}
// Default to using gzip compression unless specified otherwise.
if options.DestinationCtx == nil || options.DestinationCtx.CompressionFormat == nil {
Expand Down Expand Up @@ -1072,9 +1080,25 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
}
}

// customPartialBlobCounter provides a decorator function for the partial blobs retrieval progress bar
func customPartialBlobCounter(filler interface{}, wcc ...decor.WC) decor.Decorator {
producer := func(filler interface{}) decor.DecorFunc {
return func(s decor.Statistics) string {
if s.Total == 0 {
pairFmt := "%.1f / %.1f (skipped: %.1f)"
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill))
}
pairFmt := "%.1f / %.1f (skipped: %.1f = %.2f%%)"
percentage := 100.0 * float64(s.Refill) / float64(s.Total)
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill), percentage)
}
}
return decor.Any(producer(filler), wcc...)
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

Expand All @@ -1091,18 +1115,30 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
// Use a normal progress bar when we know the size (i.e., size > 0).
// Otherwise, use a spinner to indicate that something's happening.
var bar *mpb.Bar
sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft()
if info.Size > 0 {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
if partial {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
customPartialBlobCounter(sstyle.Build()),
),
)
} else {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
}
} else {
sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft()
bar = pool.Add(0,
sstyle.Build(),
mpb.BarFillerClearOnComplete(),
Expand All @@ -1129,7 +1165,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false)
if err != nil {
return types.BlobInfo{}, err
Expand Down Expand Up @@ -1217,7 +1253,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)

// Throw an event that the layer has been skipped
Expand All @@ -1244,14 +1280,57 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}

// A partial pull is managed by the destination storage, that decides what portions
// of the source file are not known yet and must be fetched.
// Attempt a partial only when the source allows to retrieve a blob partially and
// the destination has support for it.
imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable)
imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial)
if ic.c.fetchPartialBlobs && okSource && okDest && !diffIDIsNeeded {
bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done")

progress := make(chan int64)
terminate := make(chan interface{})

defer close(terminate)
defer close(progress)

proxy := imageSourceSeekableProxy{
source: imgSource,
progress: progress,
}
go func() {
for {
select {
case written := <-progress:
bar.IncrInt64(written)
case <-terminate:
return
}
}
}()

bar.SetTotal(srcInfo.Size, false)
info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache)
if err == nil {
bar.SetRefill(srcInfo.Size - bar.Current())
bar.SetCurrent(srcInfo.Size)
bar.SetTotal(srcInfo.Size, true)
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
return info, cachedDiffID, nil
}
bar.Abort(true)
logrus.Errorf("Failed to retrieve partial blob: %v", err)
}

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "reading blob %s", srcInfo.Digest)
}
defer srcStream.Close()

bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done")

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
if err != nil {
Expand Down Expand Up @@ -1425,6 +1504,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
originalLayerReader = destStream
}

compressionMetadata := map[string]string{}
// === Deal with layer compression/decompression if necessary
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
// short-circuit conditions
Expand Down Expand Up @@ -1453,7 +1533,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
// we don’t care.
go c.compressGoroutine(pipeWriter, destStream, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, destStream, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
inputInfo.Size = -1
Expand All @@ -1473,7 +1553,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

go c.compressGoroutine(pipeWriter, s, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, s, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter

destStream = pipeReader
inputInfo.Digest = ""
Expand Down Expand Up @@ -1640,23 +1720,38 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, srcCompressorName)
}
}

// Copy all the metadata generated by the compressor into the annotations.
if uploadedInfo.Annotations == nil {
uploadedInfo.Annotations = map[string]string{}
}
for k, v := range compressionMetadata {
uploadedInfo.Annotations[k] = v
}

return uploadedInfo, nil
}

// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, compressionFormat compression.Algorithm) {
err := errors.New("Internal error: unexpected panic in compressGoroutine")
defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}()

compressor, err := compression.CompressStream(dest, compressionFormat, c.compressionLevel)
// doCompression reads all input from src and writes its compressed equivalent to dest.
func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm, compressionLevel *int) error {
compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, compressionLevel)
if err != nil {
return
return err
}
defer compressor.Close()

buf := make([]byte, compressionBufferSize)

_, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close()
if err != nil {
compressor.Close()
return err
}

return compressor.Close()
}

// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm) {
err := doCompression(dest, src, metadata, compressionFormat, c.compressionLevel)
_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}
25 changes: 25 additions & 0 deletions copy/progress_reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package copy

import (
"context"
"io"
"time"

internalTypes "github.com/containers/image/v5/internal/types"
"github.com/containers/image/v5/types"
)

Expand Down Expand Up @@ -77,3 +79,26 @@ func (r *progressReader) Read(p []byte) (int, error) {
}
return n, err
}

// imageSourceSeekableProxy wraps ImageSourceSeekable and keeps track of how many bytes
// are received.
type imageSourceSeekableProxy struct {
// source is the seekable input to read from.
source internalTypes.ImageSourceSeekable
// progress is the chan where the total number of bytes read so far are reported.
progress chan int64
}

// GetBlobAt reads from the ImageSourceSeekable and report how many bytes were received
// to the progress chan.
func (s imageSourceSeekableProxy) GetBlobAt(ctx context.Context, bInfo types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
rc, errs, err := s.source.GetBlobAt(ctx, bInfo, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
total += int64(c.Length)
}
s.progress <- total
}
return rc, errs, err
}
Loading

0 comments on commit eeb9819

Please sign in to comment.