Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Random storage-related cleanups #2287

Merged
merged 9 commits into from
Feb 8, 2024
2 changes: 1 addition & 1 deletion oci/layout/oci_src_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestGetBlobForRemoteLayers(t *testing.T) {
imageSource := createImageSource(t, &types.SystemContext{})
defer imageSource.Close()
layerInfo := types.BlobInfo{
Digest: digest.FromBytes([]byte("Hello world")),
Digest: digest.FromString("Hello world"),
Size: -1,
URLs: []string{
"brokenurl",
Expand Down
80 changes: 42 additions & 38 deletions storage/storage_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ type storageImageDestination struct {
stubs.ImplementsPutBlobPartial
stubs.AlwaysSupportsSignatures

imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID atomic.Int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
manifestDigest digest.Digest // Valid if len(manifest) != 0
signatures []byte // Signature contents, temporary
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // Sizes of each manifest's signature slice
imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID atomic.Int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
manifestDigest digest.Digest // Valid if len(manifest) != 0
signatures []byte // Signature contents, temporary
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
metadata storageImageMetadata // Metadata contents being built

// A storage destination may be used concurrently. Accesses are
// serialized via a mutex. Please refer to the individual comments
Expand All @@ -74,8 +73,9 @@ type storageImageDestination struct {
// time, can only be executed by *one* goroutine. Please refer to
// `queueOrCommit()` for further details on how the single-caller
// guarantee is implemented.
indexToStorageID map[int]*string
// All accesses to below data are protected by `lock` which is made
indexToStorageID map[int]string
// All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls
// (but not necessarily during the final Commit) protected by `lock` which is made
// *explicit* in the code.
uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs.
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
Expand Down Expand Up @@ -124,11 +124,13 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
indexToStorageID: make(map[int]*string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
indexToStorageID: make(map[int]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
}
dest.Compat = impl.AddCompat(dest)
return dest, nil
Expand Down Expand Up @@ -586,14 +588,18 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Start with an empty string or the previous layer ID. Note that
// `s.indexToStorageID` can only be accessed by *one* goroutine at any
// given time. Hence, we don't need to lock accesses.
var lastLayer string
if prev := s.indexToStorageID[index-1]; prev != nil {
lastLayer = *prev
var parentLayer string
if index != 0 {
prev, ok := s.indexToStorageID[index-1]
if !ok {
return false, fmt.Errorf("Internal error: commitLayer called with previous layer %d not committed yet", index-1)
}
parentLayer = prev
}

// Carry over the previous ID for empty non-base layers.
if info.emptyLayer {
s.indexToStorageID[index] = &lastLayer
s.indexToStorageID[index] = parentLayer
return false, nil
}

Expand Down Expand Up @@ -626,13 +632,12 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}
}
id := diffIDOrTOCDigest.Hex()
if lastLayer != "" {
id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffIDOrTOCDigest.Hex())).Hex()
if parentLayer != "" {
id = digest.Canonical.FromString(parentLayer + "+" + diffIDOrTOCDigest.Hex()).Hex()
}
if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil {
// There's already a layer that should have the right contents, just reuse it.
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
s.indexToStorageID[index] = layer.ID
return false, nil
}

Expand Down Expand Up @@ -664,7 +669,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index)
}

layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
layer, err := s.imageRef.transport.store.CreateLayer(id, parentLayer, nil, "", false, nil)
if err != nil {
return false, err
}
Expand All @@ -682,20 +687,19 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
return false, err
}

s.indexToStorageID[index] = &layer.ID
s.indexToStorageID[index] = layer.ID
return false, nil
}

s.lock.Lock()
al, ok := s.blobAdditionalLayer[info.digest]
s.lock.Unlock()
if ok {
layer, err := al.PutAs(id, lastLayer, nil)
layer, err := al.PutAs(id, parentLayer, nil)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return false, fmt.Errorf("failed to put layer from digest and labels: %w", err)
}
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
s.indexToStorageID[index] = layer.ID
return false, nil
}

Expand Down Expand Up @@ -761,15 +765,15 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
defer file.Close()
// Build the new layer using the diff, regardless of where it came from.
// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, &storage.LayerOptions{
layer, _, err := s.imageRef.transport.store.PutLayer(id, parentLayer, nil, "", false, &storage.LayerOptions{
OriginalDigest: info.digest,
UncompressedDigest: diffIDOrTOCDigest,
}, file)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
}

s.indexToStorageID[index] = &layer.ID
s.indexToStorageID[index] = layer.ID
return false, nil
}

Expand Down Expand Up @@ -827,12 +831,12 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
}
}
var lastLayer string
if len(layerBlobs) > 0 { // Can happen when using caches
prev := s.indexToStorageID[len(layerBlobs)-1]
if prev == nil {
if len(layerBlobs) > 0 { // Zero-layer images rarely make sense, but it is technically possible, and may happen for non-image artifacts.
prev, ok := s.indexToStorageID[len(layerBlobs)-1]
if !ok {
return fmt.Errorf("Internal error: storageImageDestination.Commit(): previous layer %d hasn't been committed (lastLayer == nil)", len(layerBlobs)-1)
}
lastLayer = *prev
lastLayer = prev
}

// If one of those blobs was a configuration blob, then we can try to dig out the date when the image
Expand Down Expand Up @@ -906,7 +910,7 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
}

// Set up to save our metadata.
metadata, err := json.Marshal(s)
metadata, err := json.Marshal(s.metadata)
if err != nil {
return fmt.Errorf("encoding metadata for image: %w", err)
}
Expand Down Expand Up @@ -1011,15 +1015,15 @@ func (s *storageImageDestination) PutSignaturesWithFormat(ctx context.Context, s
}
if instanceDigest == nil {
s.signatures = sigblob
s.SignatureSizes = sizes
s.metadata.SignatureSizes = sizes
if len(s.manifest) > 0 {
manifestDigest := s.manifestDigest
instanceDigest = &manifestDigest
}
}
if instanceDigest != nil {
s.signatureses[*instanceDigest] = sigblob
s.SignaturesSizes[*instanceDigest] = sizes
s.metadata.SignaturesSizes[*instanceDigest] = sizes
}
return nil
}
16 changes: 11 additions & 5 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ var (
ErrNoSuchImage = storage.ErrNotAnImage
)

type storageImageCloser struct {
types.ImageCloser
size int64
}

// manifestBigDataKey returns a key suitable for recording a manifest with the specified digest using storage.Store.ImageBigData and related functions.
// If a specific manifest digest is explicitly requested by the user, the key returned by this function should be used preferably;
// for compatibility, if a manifest is not available under this key, check also storage.ImageDigestBigDataKey
Expand All @@ -36,6 +31,17 @@ func signatureBigDataKey(digest digest.Digest) string {
return "signature-" + digest.Encoded()
}

// storageImageMetadata is stored, as JSON, in storage.Image.Metadata
type storageImageMetadata struct {
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // Sizes of each manifest's signature slice
}

type storageImageCloser struct {
types.ImageCloser
size int64
}

// Size() returns the previously-computed size of the image, with no error.
func (s *storageImageCloser) Size() (int64, error) {
return s.size, nil
Expand Down
27 changes: 14 additions & 13 deletions storage/storage_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ type storageImageSource struct {
imageRef storageReference
image *storage.Image
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
metadata storageImageMetadata
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
getBlobMutexProtected getBlobMutexProtected
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice
}

const expectedLayerDiffIDFlag = "expected-layer-diffid"
Expand All @@ -71,19 +70,21 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora
}),
NoGetBlobAtInitialize: stubs.NoGetBlobAt(imageRef),

imageRef: imageRef,
systemContext: sys,
image: img,
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
imageRef: imageRef,
systemContext: sys,
image: img,
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
getBlobMutexProtected: getBlobMutexProtected{
digestToLayerID: make(map[digest.Digest]string),
layerPosition: make(map[digest.Digest]int),
},
}
image.Compat = impl.AddCompat(image)
if img.Metadata != "" {
if err := json.Unmarshal([]byte(img.Metadata), image); err != nil {
if err := json.Unmarshal([]byte(img.Metadata), &image.metadata); err != nil {
return nil, fmt.Errorf("decoding metadata for source image: %w", err)
}
}
Expand Down Expand Up @@ -375,11 +376,11 @@ func buildLayerInfosForCopy(manifestInfos []manifest.LayerInfo, physicalInfos []
func (s *storageImageSource) GetSignaturesWithFormat(ctx context.Context, instanceDigest *digest.Digest) ([]signature.Signature, error) {
var offset int
signatureBlobs := []byte{}
signatureSizes := s.SignatureSizes
signatureSizes := s.metadata.SignatureSizes
key := "signatures"
instance := "default instance"
if instanceDigest != nil {
signatureSizes = s.SignaturesSizes[*instanceDigest]
signatureSizes = s.metadata.SignaturesSizes[*instanceDigest]
key = signatureBigDataKey(*instanceDigest)
instance = instanceDigest.Encoded()
}
Expand Down Expand Up @@ -425,7 +426,7 @@ func (s *storageImageSource) getSize() (int64, error) {
sum += bigSize
}
// Add the signature sizes.
for _, sigSize := range s.SignatureSizes {
for _, sigSize := range s.metadata.SignatureSizes {
sum += int64(sigSize)
}
// Walk the layer list.
Expand Down