From 48f39f6f02efbf8332ceeb321ee6c4c9d64da4a1 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:45:51 +0100 Subject: [PATCH 01/12] compression: change return type for zstdWriterWithLevel Signed-off-by: Giuseppe Scrivano --- pkg/compression/zstd.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd.go index 962fe9676..867d3ddbc 100644 --- a/pkg/compression/zstd.go +++ b/pkg/compression/zstd.go @@ -40,7 +40,7 @@ func zstdWriter(dest io.Writer) (io.WriteCloser, error) { return zstd.NewWriter(dest) } -func zstdWriterWithLevel(dest io.Writer, level int) (io.WriteCloser, error) { +func zstdWriterWithLevel(dest io.Writer, level int) (*zstd.Encoder, error) { el := zstd.EncoderLevelFromZstd(level) return zstd.NewWriter(dest, zstd.WithEncoderLevel(el)) } From b167b4b6f2b7afd2c17b23d310497cb6df7716a6 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Sat, 28 Nov 2020 16:32:22 +0100 Subject: [PATCH 02/12] compression: support generating compressor metadata Signed-off-by: Giuseppe Scrivano --- copy/copy.go | 18 ++++++++++++++---- pkg/compression/compression.go | 15 +++++++++++---- pkg/compression/internal/types.go | 2 +- pkg/compression/zstd.go | 2 +- 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 57d952d7c..73c63d287 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -1425,6 +1425,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr originalLayerReader = destStream } + compressionMetadata := map[string]string{} // === Deal with layer compression/decompression if necessary // WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists // short-circuit conditions @@ -1453,7 +1454,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr // If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise, // e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed, // we don’t care. - go c.compressGoroutine(pipeWriter, destStream, *uploadCompressionFormat) // Closes pipeWriter + go c.compressGoroutine(pipeWriter, destStream, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter destStream = pipeReader inputInfo.Digest = "" inputInfo.Size = -1 @@ -1473,7 +1474,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr pipeReader, pipeWriter := io.Pipe() defer pipeReader.Close() - go c.compressGoroutine(pipeWriter, s, *uploadCompressionFormat) // Closes pipeWriter + go c.compressGoroutine(pipeWriter, s, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter destStream = pipeReader inputInfo.Digest = "" @@ -1640,17 +1641,26 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, srcCompressorName) } } + + // Copy all the metadata generated by the compressor into the annotations. + if uploadedInfo.Annotations == nil { + uploadedInfo.Annotations = map[string]string{} + } + for k, v := range compressionMetadata { + uploadedInfo.Annotations[k] = v + } + return uploadedInfo, nil } // compressGoroutine reads all input from src and writes its compressed equivalent to dest. -func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, compressionFormat compression.Algorithm) { +func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm) { err := errors.New("Internal error: unexpected panic in compressGoroutine") defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily. _ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil }() - compressor, err := compression.CompressStream(dest, compressionFormat, c.compressionLevel) + compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, c.compressionLevel) if err != nil { return } diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go index 015757061..d6b46f149 100644 --- a/pkg/compression/compression.go +++ b/pkg/compression/compression.go @@ -69,7 +69,7 @@ func XzDecompressor(r io.Reader) (io.ReadCloser, error) { } // gzipCompressor is a CompressorFunc for the gzip compression algorithm. -func gzipCompressor(r io.Writer, level *int) (io.WriteCloser, error) { +func gzipCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) { if level != nil { return pgzip.NewWriterLevel(r, *level) } @@ -77,18 +77,25 @@ func gzipCompressor(r io.Writer, level *int) (io.WriteCloser, error) { } // bzip2Compressor is a CompressorFunc for the bzip2 compression algorithm. -func bzip2Compressor(r io.Writer, level *int) (io.WriteCloser, error) { +func bzip2Compressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) { return nil, fmt.Errorf("bzip2 compression not supported") } // xzCompressor is a CompressorFunc for the xz compression algorithm. -func xzCompressor(r io.Writer, level *int) (io.WriteCloser, error) { +func xzCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) { return xz.NewWriter(r) } // CompressStream returns the compressor by its name func CompressStream(dest io.Writer, algo Algorithm, level *int) (io.WriteCloser, error) { - return internal.AlgorithmCompressor(algo)(dest, level) + m := map[string]string{} + return internal.AlgorithmCompressor(algo)(dest, m, level) +} + +// CompressStreamWithMetadata returns the compressor by its name. If the compression +// generates any metadata, it is written to the provided metadata map. +func CompressStreamWithMetadata(dest io.Writer, metadata map[string]string, algo Algorithm, level *int) (io.WriteCloser, error) { + return internal.AlgorithmCompressor(algo)(dest, metadata, level) } // DetectCompressionFormat returns an Algorithm and DecompressorFunc if the input is recognized as a compressed format, an invalid diff --git a/pkg/compression/internal/types.go b/pkg/compression/internal/types.go index 6092a9517..d2b023c9b 100644 --- a/pkg/compression/internal/types.go +++ b/pkg/compression/internal/types.go @@ -4,7 +4,7 @@ import "io" // CompressorFunc writes the compressed stream to the given writer using the specified compression level. // The caller must call Close() on the stream (even if the input stream does not need closing!). -type CompressorFunc func(io.Writer, *int) (io.WriteCloser, error) +type CompressorFunc func(io.Writer, map[string]string, *int) (io.WriteCloser, error) // DecompressorFunc returns the decompressed stream, given a compressed stream. // The caller must call Close() on the decompressed stream (even if the compressed input stream does not need closing!). diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd.go index 867d3ddbc..39ae014d2 100644 --- a/pkg/compression/zstd.go +++ b/pkg/compression/zstd.go @@ -46,7 +46,7 @@ func zstdWriterWithLevel(dest io.Writer, level int) (*zstd.Encoder, error) { } // zstdCompressor is a CompressorFunc for the zstd compression algorithm. -func zstdCompressor(r io.Writer, level *int) (io.WriteCloser, error) { +func zstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) { if level == nil { return zstdWriter(r) } From 7421a4899498f08929e13877482b3484001080a3 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Mon, 30 Nov 2020 14:51:49 +0100 Subject: [PATCH 03/12] compression: let algorithms register a MIME type Signed-off-by: Giuseppe Scrivano --- manifest/common.go | 2 +- pkg/compression/compression.go | 8 ++++---- pkg/compression/internal/types.go | 9 ++++++++- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/manifest/common.go b/manifest/common.go index 3ece948a0..6008812cc 100644 --- a/manifest/common.go +++ b/manifest/common.go @@ -68,7 +68,7 @@ func compressionVariantMIMEType(variantTable []compressionMIMETypeSet, mimeType if mt == mimeType { // Found the variant name := mtsUncompressed if algorithm != nil { - name = algorithm.Name() + name = algorithm.MIME() } if res, ok := variants[name]; ok { if res != mtsUnsupportedMIMEType { diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go index d6b46f149..2a42eb3ea 100644 --- a/pkg/compression/compression.go +++ b/pkg/compression/compression.go @@ -20,13 +20,13 @@ type Algorithm = types.Algorithm var ( // Gzip compression. - Gzip = internal.NewAlgorithm("gzip", []byte{0x1F, 0x8B, 0x08}, GzipDecompressor, gzipCompressor) + Gzip = internal.NewAlgorithm("gzip", "gzip", []byte{0x1F, 0x8B, 0x08}, GzipDecompressor, gzipCompressor) // Bzip2 compression. - Bzip2 = internal.NewAlgorithm("bzip2", []byte{0x42, 0x5A, 0x68}, Bzip2Decompressor, bzip2Compressor) + Bzip2 = internal.NewAlgorithm("bzip2", "bzip2", []byte{0x42, 0x5A, 0x68}, Bzip2Decompressor, bzip2Compressor) // Xz compression. - Xz = internal.NewAlgorithm("Xz", []byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor, xzCompressor) + Xz = internal.NewAlgorithm("Xz", "xz", []byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor, xzCompressor) // Zstd compression. - Zstd = internal.NewAlgorithm("zstd", []byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor, zstdCompressor) + Zstd = internal.NewAlgorithm("zstd", "zstd", []byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor, zstdCompressor) compressionAlgorithms = map[string]Algorithm{ Gzip.Name(): Gzip, diff --git a/pkg/compression/internal/types.go b/pkg/compression/internal/types.go index d2b023c9b..8b16e527a 100644 --- a/pkg/compression/internal/types.go +++ b/pkg/compression/internal/types.go @@ -13,6 +13,7 @@ type DecompressorFunc func(io.Reader) (io.ReadCloser, error) // Algorithm is a compression algorithm that can be used for CompressStream. type Algorithm struct { name string + mime string prefix []byte decompressor DecompressorFunc compressor CompressorFunc @@ -21,9 +22,10 @@ type Algorithm struct { // NewAlgorithm creates an Algorithm instance. // This function exists so that Algorithm instances can only be created by code that // is allowed to import this internal subpackage. -func NewAlgorithm(name string, prefix []byte, decompressor DecompressorFunc, compressor CompressorFunc) Algorithm { +func NewAlgorithm(name, mime string, prefix []byte, decompressor DecompressorFunc, compressor CompressorFunc) Algorithm { return Algorithm{ name: name, + mime: mime, prefix: prefix, decompressor: decompressor, compressor: compressor, @@ -35,6 +37,11 @@ func (c Algorithm) Name() string { return c.name } +// Name returns the MIME type to use for the compression algorithm. +func (c Algorithm) MIME() string { + return c.mime +} + // AlgorithmCompressor returns the compressor field of algo. // This is a function instead of a public method so that it is only callable from by code // that is allowed to import this internal subpackage. From 40848f40c8c3b6f8d0669fe3ab6469ba1a7cccc1 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:48:35 +0100 Subject: [PATCH 04/12] pkg/compression: new zstd variant zstd:chunked add a new custom variant of the zstd compression that permits to retrieve each file separately. The idea is based on CRFS and its stargz format for having seekable and indexable tarballs. One disadvantage of the stargz format is that a custom file is added to the tarball to store the layer metadata, and the metadata file is part of the image itself. Clients that are not aware of the stargz format will propagate the metadata file inside of the containers. The zstd compression supports embeddeding additional data as part of the stream that the zstd decompressor will ignore (skippable frame), so the issue above with CRFS can be solved directly within the zstd compression format. Beside this minor advantage, zstd is much faster and compresses better than gzip, so take this opportunity to push the zstd format further. The zstd compression is supported by the OCI image specs since August 2019: https://github.com/opencontainers/image-spec/pull/788 and has been supported by containers/image since then. Clients that are not aware of the zstd:chunked format, won't notice any difference when handling a blob that uses the variant. Signed-off-by: Giuseppe Scrivano --- pkg/compression/compression.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go index 2a42eb3ea..718b50c05 100644 --- a/pkg/compression/compression.go +++ b/pkg/compression/compression.go @@ -9,6 +9,7 @@ import ( "github.com/containers/image/v5/pkg/compression/internal" "github.com/containers/image/v5/pkg/compression/types" + "github.com/containers/storage/pkg/chunked" "github.com/klauspost/pgzip" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -27,12 +28,15 @@ var ( Xz = internal.NewAlgorithm("Xz", "xz", []byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor, xzCompressor) // Zstd compression. Zstd = internal.NewAlgorithm("zstd", "zstd", []byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor, zstdCompressor) + // Zstd:chunked compression. + ZstdChunked = internal.NewAlgorithm("zstd:chunked", "zstd", []byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor, chunked.ZstdCompressor) compressionAlgorithms = map[string]Algorithm{ - Gzip.Name(): Gzip, - Bzip2.Name(): Bzip2, - Xz.Name(): Xz, - Zstd.Name(): Zstd, + Gzip.Name(): Gzip, + Bzip2.Name(): Bzip2, + Xz.Name(): Xz, + Zstd.Name(): Zstd, + ZstdChunked.Name(): ZstdChunked, } ) From e8ce5b81a84fa3710399e1b43d0d848015c4a572 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:42:54 +0100 Subject: [PATCH 05/12] types: add interface for partial blob retrieval Signed-off-by: Giuseppe Scrivano --- internal/types/types.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/internal/types/types.go b/internal/types/types.go index 4a863ba34..4c9a7b09f 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -58,3 +58,17 @@ type TryReusingBlobOptions struct { // The reference of the image that contains the target blob. SrcRef reference.Named } + +// ImageSourceChunk is a portion of a blob. +// This API is experimental and can be changed without bumping the major version number. +type ImageSourceChunk struct { + Offset uint64 + Length uint64 +} + +// ImageSourceSeekable is an image source that permits to fetch chunks of the entire blob. +// This API is experimental and can be changed without bumping the major version number. +type ImageSourceSeekable interface { + // GetBlobAt returns a stream for the specified blob. + GetBlobAt(context.Context, publicTypes.BlobInfo, []ImageSourceChunk) (chan io.ReadCloser, chan error, error) +} From 05f70a3c24b6ba48743e2c6b842d55b6bcab6260 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:44:28 +0100 Subject: [PATCH 06/12] types: add interface for storing partial blob Signed-off-by: Giuseppe Scrivano --- internal/types/types.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/types/types.go b/internal/types/types.go index 4c9a7b09f..079adfcd3 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -72,3 +72,10 @@ type ImageSourceSeekable interface { // GetBlobAt returns a stream for the specified blob. GetBlobAt(context.Context, publicTypes.BlobInfo, []ImageSourceChunk) (chan io.ReadCloser, chan error, error) } + +// ImageDestinationPartial is a service to store a blob by requesting the missing chunks to a ImageSourceSeekable. +// This API is experimental and can be changed without bumping the major version number. +type ImageDestinationPartial interface { + // PutBlobPartial writes contents of stream and returns data representing the result. + PutBlobPartial(ctx context.Context, stream ImageSourceSeekable, srcInfo publicTypes.BlobInfo, cache publicTypes.BlobInfoCache) (publicTypes.BlobInfo, error) +} From 93477e61a3556798bfab89b375e68247e59f0b19 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:50:44 +0100 Subject: [PATCH 07/12] docker: support partial blob retrieval Signed-off-by: Giuseppe Scrivano --- docker/docker_image_src.go | 195 +++++++++++++++++++++++++++++++++++++ docker/errors.go | 5 + internal/types/types.go | 9 ++ 3 files changed, 209 insertions(+) diff --git a/docker/docker_image_src.go b/docker/docker_image_src.go index 5ed9ea8f6..c5a428ba0 100644 --- a/docker/docker_image_src.go +++ b/docker/docker_image_src.go @@ -6,14 +6,17 @@ import ( "io" "io/ioutil" "mime" + "mime/multipart" "net/http" "net/url" "os" "strconv" "strings" + "sync" "github.com/containers/image/v5/docker/reference" "github.com/containers/image/v5/internal/iolimits" + internalTypes "github.com/containers/image/v5/internal/types" "github.com/containers/image/v5/manifest" "github.com/containers/image/v5/pkg/sysregistriesv2" "github.com/containers/image/v5/types" @@ -275,6 +278,82 @@ func (s *dockerImageSource) HasThreadSafeGetBlob() bool { return true } +// GetBlobAt returns a stream for the specified blob. +func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + headers := make(map[string][]string) + + var rangeVals []string + for _, c := range chunks { + rangeVals = append(rangeVals, fmt.Sprintf("%d-%d", c.Offset, c.Offset+c.Length-1)) + } + + headers["Range"] = []string{fmt.Sprintf("bytes=%s", strings.Join(rangeVals, ","))} + + if len(info.URLs) != 0 { + return nil, nil, fmt.Errorf("external URLs not supported with GetBlobAt") + } + + path := fmt.Sprintf(blobsPath, reference.Path(s.physicalRef.ref), info.Digest.String()) + logrus.Debugf("Downloading %s", path) + res, err := s.c.makeRequest(ctx, "GET", path, headers, nil, v2Auth, nil) + if err != nil { + return nil, nil, err + } + if err := httpResponseToError(res, "Error fetching partial blob"); err != nil { + if res.Body != nil { + res.Body.Close() + } + return nil, nil, err + } + if res.StatusCode != http.StatusPartialContent { + res.Body.Close() + return nil, nil, errors.Errorf("invalid status code returned when fetching blob %d (%s)", res.StatusCode, http.StatusText(res.StatusCode)) + } + + mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type")) + if err != nil { + return nil, nil, err + } + + streams := make(chan io.ReadCloser) + errs := make(chan error) + + go func() { + defer close(streams) + defer close(errs) + if !strings.HasPrefix(mediaType, "multipart/") { + streams <- res.Body + return + } + boundary, found := params["boundary"] + if !found { + errs <- errors.Errorf("could not find boundary") + return + } + buffered := makeBufferedNetworkReader(res.Body, 64, 16384) + defer buffered.Close() + mr := multipart.NewReader(buffered, boundary) + for { + p, err := mr.NextPart() + if err != nil { + if err != io.EOF { + errs <- err + } + return + } + s := signalCloseReader{ + Closed: make(chan interface{}), + Stream: p, + } + streams <- s + // NextPart() cannot be called while the current part + // is being read, so wait until it is closed + <-s.Closed + } + }() + return streams, errs, nil +} + // GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown). // The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided. // May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location. @@ -498,3 +577,119 @@ func deleteImage(ctx context.Context, sys *types.SystemContext, ref dockerRefere return nil } + +type bufferedNetworkReaderBuffer struct { + data []byte + len int + consumed int + err error +} + +type bufferedNetworkReader struct { + stream io.Reader + emptyBuffer chan *bufferedNetworkReaderBuffer + readyBuffer chan *bufferedNetworkReaderBuffer + terminate chan bool + current *bufferedNetworkReaderBuffer + mutex sync.Mutex + gotEOF bool +} + +// handleBufferedNetworkReader runs in a goroutine +func handleBufferedNetworkReader(br *bufferedNetworkReader) { + defer close(br.readyBuffer) + for { + select { + case b := <-br.emptyBuffer: + b.len, b.err = br.stream.Read(b.data) + br.readyBuffer <- b + if b.err != nil { + return + } + case <-br.terminate: + return + } + } +} + +func (n *bufferedNetworkReader) Close() { + close(n.terminate) + close(n.emptyBuffer) +} + +func (n *bufferedNetworkReader) read(p []byte) (int, error) { + if n.current != nil { + copied := copy(p, n.current.data[n.current.consumed:n.current.len]) + n.current.consumed += copied + if n.current.consumed == n.current.len { + n.emptyBuffer <- n.current + n.current = nil + } + if copied > 0 { + return copied, nil + } + } + if n.gotEOF { + return 0, io.EOF + } + + var b *bufferedNetworkReaderBuffer + + select { + case b = <-n.readyBuffer: + if b.err != nil { + if b.err != io.EOF { + return b.len, b.err + } + n.gotEOF = true + } + b.consumed = 0 + n.current = b + return n.read(p) + case <-n.terminate: + return 0, io.EOF + } +} + +func (n *bufferedNetworkReader) Read(p []byte) (int, error) { + n.mutex.Lock() + defer n.mutex.Unlock() + + return n.read(p) +} + +func makeBufferedNetworkReader(stream io.Reader, nBuffers, bufferSize uint) *bufferedNetworkReader { + br := bufferedNetworkReader{ + stream: stream, + emptyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers), + readyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers), + terminate: make(chan bool), + } + + go func() { + handleBufferedNetworkReader(&br) + }() + + for i := uint(0); i < nBuffers; i++ { + b := bufferedNetworkReaderBuffer{ + data: make([]byte, bufferSize), + } + br.emptyBuffer <- &b + } + + return &br +} + +type signalCloseReader struct { + Closed chan interface{} + Stream io.ReadCloser +} + +func (s signalCloseReader) Read(p []byte) (int, error) { + return s.Stream.Read(p) +} + +func (s signalCloseReader) Close() error { + defer close(s.Closed) + return s.Stream.Close() +} diff --git a/docker/errors.go b/docker/errors.go index 282c6f3a6..6f2c5fde5 100644 --- a/docker/errors.go +++ b/docker/errors.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" + internalTypes "github.com/containers/image/v5/internal/types" "github.com/docker/distribution/registry/client" perrors "github.com/pkg/errors" ) @@ -32,11 +33,15 @@ func httpResponseToError(res *http.Response, context string) error { switch res.StatusCode { case http.StatusOK: return nil + case http.StatusPartialContent: + return nil case http.StatusTooManyRequests: return ErrTooManyRequests case http.StatusUnauthorized: err := client.HandleErrorResponse(res) return ErrUnauthorizedForCredentials{Err: err} + case http.StatusBadRequest: + return internalTypes.BadPartialRequestError{Status: res.Status} default: if context != "" { context = context + ": " diff --git a/internal/types/types.go b/internal/types/types.go index 079adfcd3..e0355a477 100644 --- a/internal/types/types.go +++ b/internal/types/types.go @@ -79,3 +79,12 @@ type ImageDestinationPartial interface { // PutBlobPartial writes contents of stream and returns data representing the result. PutBlobPartial(ctx context.Context, stream ImageSourceSeekable, srcInfo publicTypes.BlobInfo, cache publicTypes.BlobInfoCache) (publicTypes.BlobInfo, error) } + +// BadPartialRequestError is returned by ImageSourceSeekable.GetBlobAt on an invalid request. +type BadPartialRequestError struct { + Status string +} + +func (e BadPartialRequestError) Error() string { + return e.Status +} From 67a4c997599937c41772a65386c427983074ee01 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:50:25 +0100 Subject: [PATCH 08/12] storage: support partial storage with zstd:chunked Signed-off-by: Giuseppe Scrivano --- storage/storage_image.go | 98 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 6 deletions(-) diff --git a/storage/storage_image.go b/storage/storage_image.go index cf9282fd1..6b0fea61a 100644 --- a/storage/storage_image.go +++ b/storage/storage_image.go @@ -23,7 +23,9 @@ import ( "github.com/containers/image/v5/pkg/blobinfocache/none" "github.com/containers/image/v5/types" "github.com/containers/storage" + "github.com/containers/storage/drivers" "github.com/containers/storage/pkg/archive" + "github.com/containers/storage/pkg/chunked" "github.com/containers/storage/pkg/ioutils" digest "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -77,12 +79,13 @@ type storageImageDestination struct { indexToStorageID map[int]*string // All accesses to below data are protected by `lock` which is made // *explicit* in the code. - blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs - fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes - filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them - currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed) - indexToPulledLayerInfo map[int]*manifest.LayerInfo // Mapping from layer (by index) to pulled down blob - blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer + blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs + fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes + filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them + currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed) + indexToPulledLayerInfo map[int]*manifest.LayerInfo // Mapping from layer (by index) to pulled down blob + blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer + diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output } type storageImageCloser struct { @@ -404,6 +407,7 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (* SignaturesSizes: make(map[digest.Digest][]int), indexToStorageID: make(map[int]*string), indexToPulledLayerInfo: make(map[int]*manifest.LayerInfo), + diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), } return image, nil } @@ -419,6 +423,11 @@ func (s *storageImageDestination) Close() error { for _, al := range s.blobAdditionalLayer { al.Release() } + for _, v := range s.diffOutputs { + if v.Target != "" { + _ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target) + } + } return os.RemoveAll(s.directory) } @@ -573,6 +582,61 @@ func (s *storageImageDestination) tryReusingBlobWithSrcRef(ctx context.Context, return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute) } +type zstdFetcher struct { + stream internalTypes.ImageSourceSeekable + ctx context.Context + blobInfo types.BlobInfo +} + +// GetBlobAt converts from chunked.GetBlobAt to ImageSourceSeekable.GetBlobAt. +func (f *zstdFetcher) GetBlobAt(chunks []chunked.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + var newChunks []internalTypes.ImageSourceChunk + for _, v := range chunks { + i := internalTypes.ImageSourceChunk{ + Offset: v.Offset, + Length: v.Length, + } + newChunks = append(newChunks, i) + } + rc, errs, err := f.stream.GetBlobAt(f.ctx, f.blobInfo, newChunks) + if _, ok := err.(internalTypes.BadPartialRequestError); ok { + err = chunked.ErrBadRequest{} + } + return rc, errs, err + +} + +// PutBlobPartial attempts to create a blob using the data that is already present at the destination storage. stream is accessed +// in a non-sequential way to retrieve the missing chunks. +func (s *storageImageDestination) PutBlobPartial(ctx context.Context, stream internalTypes.ImageSourceSeekable, srcInfo types.BlobInfo, cache types.BlobInfoCache) (types.BlobInfo, error) { + fetcher := zstdFetcher{ + stream: stream, + ctx: ctx, + blobInfo: srcInfo, + } + + differ, err := chunked.GetDiffer(ctx, s.imageRef.transport.store, srcInfo.Size, srcInfo.Annotations, &fetcher) + if err != nil { + return srcInfo, err + } + + out, err := s.imageRef.transport.store.ApplyDiffWithDiffer("", nil, differ) + if err != nil { + return srcInfo, err + } + + blobDigest := srcInfo.Digest + + s.lock.Lock() + s.blobDiffIDs[blobDigest] = blobDigest + s.fileSizes[blobDigest] = 0 + s.filenames[blobDigest] = "" + s.diffOutputs[blobDigest] = out + s.lock.Unlock() + + return srcInfo, nil +} + // TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination // (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). // info.Digest must not be empty. @@ -844,6 +908,27 @@ func (s *storageImageDestination) commitLayer(ctx context.Context, blob manifest return nil } + s.lock.Lock() + diffOutput, ok := s.diffOutputs[blob.Digest] + s.lock.Unlock() + if ok { + layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil) + if err != nil { + return err + } + + // FIXME: what to do with the uncompressed digest? + diffOutput.UncompressedDigest = blob.Digest + + if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil { + _ = s.imageRef.transport.store.Delete(layer.ID) + return err + } + + s.indexToStorageID[index] = &layer.ID + return nil + } + s.lock.Lock() al, ok := s.blobAdditionalLayer[blob.Digest] s.lock.Unlock() @@ -969,6 +1054,7 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t return errors.Wrapf(err, "parsing manifest") } layerBlobs := man.LayerInfos() + // Extract, commit, or find the layers. for i, blob := range layerBlobs { if err := s.commitLayer(ctx, blob, i); err != nil { From 3250f2da77d61931d38513846f2bb7fbf9352460 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:37:41 +0100 Subject: [PATCH 09/12] copy: do not ignore errors on Close Signed-off-by: Giuseppe Scrivano --- copy/copy.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 73c63d287..52c6a1e3d 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -1653,20 +1653,26 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr return uploadedInfo, nil } -// compressGoroutine reads all input from src and writes its compressed equivalent to dest. -func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm) { - err := errors.New("Internal error: unexpected panic in compressGoroutine") - defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily. - _ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil - }() - - compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, c.compressionLevel) +// doCompression reads all input from src and writes its compressed equivalent to dest. +func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm, compressionLevel *int) error { + compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, compressionLevel) if err != nil { - return + return err } - defer compressor.Close() buf := make([]byte, compressionBufferSize) _, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close() + if err != nil { + compressor.Close() + return err + } + + return compressor.Close() +} + +// compressGoroutine reads all input from src and writes its compressed equivalent to dest. +func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compression.Algorithm) { + err := doCompression(dest, src, metadata, compressionFormat, c.compressionLevel) + _ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil } From 138ab62aa5bdcec0598b5eef92950a6324dcf0ae Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Wed, 18 Nov 2020 09:51:00 +0100 Subject: [PATCH 10/12] copy: use partial blob retrieval if possible Signed-off-by: Giuseppe Scrivano --- copy/copy.go | 42 +++++++++++++++++++++++++++++++++++++++++ copy/progress_reader.go | 25 ++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/copy/copy.go b/copy/copy.go index 52c6a1e3d..39efc2eea 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -1244,6 +1244,48 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } } + // A partial pull is managed by the destination storage, that decides what portions + // of the source file are not known yet and must be fetched. + // Attempt a partial only when the source allows to retrieve a blob partially and + // the destination has support for it. + imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable) + imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial) + if okSource && okDest && !diffIDIsNeeded { + bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") + + progress := make(chan int64) + terminate := make(chan interface{}) + + defer close(terminate) + defer close(progress) + + proxy := imageSourceSeekableProxy{ + source: imgSource, + progress: progress, + } + go func() { + for { + select { + case written := <-progress: + bar.IncrInt64(written) + case <-terminate: + return + } + } + + }() + + bar.SetTotal(srcInfo.Size, false) + info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache) + if err == nil { + bar.SetRefill(srcInfo.Size - bar.Current()) + bar.SetTotal(srcInfo.Size, true) + logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest) + return info, cachedDiffID, nil + } + logrus.Errorf("Failed to retrieve partial blob: %v", err) + } + // Fallback: copy the layer, computing the diffID if we need to do so srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache) if err != nil { diff --git a/copy/progress_reader.go b/copy/progress_reader.go index 0761065a2..42f490d32 100644 --- a/copy/progress_reader.go +++ b/copy/progress_reader.go @@ -1,9 +1,11 @@ package copy import ( + "context" "io" "time" + internalTypes "github.com/containers/image/v5/internal/types" "github.com/containers/image/v5/types" ) @@ -77,3 +79,26 @@ func (r *progressReader) Read(p []byte) (int, error) { } return n, err } + +// imageSourceSeekableProxy wraps ImageSourceSeekable and keeps track of how many bytes +// are received. +type imageSourceSeekableProxy struct { + // source is the seekable input to read from. + source internalTypes.ImageSourceSeekable + // progress is the chan where the total number of bytes read so far are reported. + progress chan int64 +} + +// GetBlobAt reads from the ImageSourceSeekable and report how many bytes were received +// to the progress chan. +func (s imageSourceSeekableProxy) GetBlobAt(ctx context.Context, bInfo types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) { + rc, errs, err := s.source.GetBlobAt(ctx, bInfo, chunks) + if err == nil { + total := int64(0) + for _, c := range chunks { + total += int64(c.Length) + } + s.progress <- total + } + return rc, errs, err +} From ee5014cf7d5b788f8cdfdba424861d60bf0293be Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 19 Nov 2020 10:46:23 +0100 Subject: [PATCH 11/12] copy: provide progress bar for partial blobs Signed-off-by: Giuseppe Scrivano --- copy/copy.go | 61 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/copy/copy.go b/copy/copy.go index 39efc2eea..e2655b9a6 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -1072,9 +1072,25 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) { } } +// customPartialBlobCounter provides a decorator function for the partial blobs retrieval progress bar +func customPartialBlobCounter(filler interface{}, wcc ...decor.WC) decor.Decorator { + producer := func(filler interface{}) decor.DecorFunc { + return func(s decor.Statistics) string { + if s.Total == 0 { + pairFmt := "%.1f / %.1f (skipped: %.1f)" + return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill)) + } + pairFmt := "%.1f / %.1f (skipped: %.1f = %.2f%%)" + percentage := 100.0 * float64(s.Refill) / float64(s.Total) + return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill), percentage) + } + } + return decor.Any(producer(filler), wcc...) +} + // createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter // is ioutil.Discard, the progress bar's output will be discarded -func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { +func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *mpb.Bar { // shortDigestLen is the length of the digest used for blobs. const shortDigestLen = 12 @@ -1091,18 +1107,30 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind // Use a normal progress bar when we know the size (i.e., size > 0). // Otherwise, use a spinner to indicate that something's happening. var bar *mpb.Bar + sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft() if info.Size > 0 { - bar = pool.AddBar(info.Size, - mpb.BarFillerClearOnComplete(), - mpb.PrependDecorators( - decor.OnComplete(decor.Name(prefix), onComplete), - ), - mpb.AppendDecorators( - decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""), - ), - ) + if partial { + bar = pool.AddBar(info.Size, + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.OnComplete(decor.Name(prefix), onComplete), + ), + mpb.AppendDecorators( + customPartialBlobCounter(sstyle.Build()), + ), + ) + } else { + bar = pool.AddBar(info.Size, + mpb.BarFillerClearOnComplete(), + mpb.PrependDecorators( + decor.OnComplete(decor.Name(prefix), onComplete), + ), + mpb.AppendDecorators( + decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""), + ), + ) + } } else { - sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft() bar = pool.Add(0, sstyle.Build(), mpb.BarFillerClearOnComplete(), @@ -1129,7 +1157,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error { destInfo, err := func() (types.BlobInfo, error) { // A scope for defer progressPool, progressCleanup := c.newProgressPool(ctx) defer progressCleanup() - bar := c.createProgressBar(progressPool, srcInfo, "config", "done") + bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done") destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false) if err != nil { return types.BlobInfo{}, err @@ -1217,7 +1245,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } if reused { logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest) - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists") + bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists") bar.SetTotal(0, true) // Throw an event that the layer has been skipped @@ -1251,7 +1279,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable) imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial) if okSource && okDest && !diffIDIsNeeded { - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") + bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done") progress := make(chan int64) terminate := make(chan interface{}) @@ -1272,17 +1300,18 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to return } } - }() bar.SetTotal(srcInfo.Size, false) info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache) if err == nil { bar.SetRefill(srcInfo.Size - bar.Current()) + bar.SetCurrent(srcInfo.Size) bar.SetTotal(srcInfo.Size, true) logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest) return info, cachedDiffID, nil } + bar.Abort(true) logrus.Errorf("Failed to retrieve partial blob: %v", err) } @@ -1293,7 +1322,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to } defer srcStream.Close() - bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done") + bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done") blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer) if err != nil { From be50ba78342fda966bc86e9113e3180f2408fd00 Mon Sep 17 00:00:00 2001 From: Giuseppe Scrivano Date: Thu, 19 Nov 2020 12:03:13 +0100 Subject: [PATCH 12/12] copy: make using partial blobs configurable Signed-off-by: Giuseppe Scrivano --- copy/copy.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/copy/copy.go b/copy/copy.go index e2655b9a6..ae182777b 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -123,6 +123,7 @@ type copier struct { ociEncryptConfig *encconfig.EncryptConfig maxParallelDownloads uint downloadForeignLayers bool + fetchPartialBlobs bool } // imageCopier tracks state specific to a single image (possibly an item of a manifest list) @@ -194,15 +195,21 @@ type Options struct { // OciDecryptConfig contains the config that can be used to decrypt an image if it is // encrypted if non-nil. If nil, it does not attempt to decrypt an image. OciDecryptConfig *encconfig.DecryptConfig + // MaxParallelDownloads indicates the maximum layers to pull at the same time. A reasonable default is used if this is left as 0. MaxParallelDownloads uint + // When OptimizeDestinationImageAlreadyExists is set, optimize the copy assuming that the destination image already // exists (and is equivalent). Making the eventual (no-op) copy more performant for this case. Enabling the option // is slightly pessimistic if the destination image doesn't exist, or is not equivalent. OptimizeDestinationImageAlreadyExists bool + // Download layer contents with "nondistributable" media types ("foreign" layers) and translate the layer media type // to not indicate "nondistributable". DownloadForeignLayers bool + + // FetchPartialBlobs indicates whether to attempt to fetch the blob partially. Experimental. + FetchPartialBlobs bool } // validateImageListSelection returns an error if the passed-in value is not one that we recognize as a valid ImageListSelection value @@ -283,6 +290,7 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef, ociEncryptConfig: options.OciEncryptConfig, maxParallelDownloads: options.MaxParallelDownloads, downloadForeignLayers: options.DownloadForeignLayers, + fetchPartialBlobs: options.FetchPartialBlobs, } // Default to using gzip compression unless specified otherwise. if options.DestinationCtx == nil || options.DestinationCtx.CompressionFormat == nil { @@ -1278,7 +1286,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to // the destination has support for it. imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable) imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial) - if okSource && okDest && !diffIDIsNeeded { + if ic.c.fetchPartialBlobs && okSource && okDest && !diffIDIsNeeded { bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done") progress := make(chan int64)