From b7a2e8a2c3d02d90952dd24c37853ac7b3dede59 Mon Sep 17 00:00:00 2001 From: Ben McClelland Date: Fri, 4 Oct 2024 21:40:39 -0700 Subject: [PATCH] fix: unexpected errors during upload races This fixes the cases for racing uploads with the same object names. Before we were making some bad assumptions about what would cause an error when trying to link/rename the final object name into the namespace, but missed the case that another upload for the same name could be racing with this upload and causing an incorrect error. This also changes the order of setting metadata to prevent accidental setting of metadata for the current upload to another racing upload. This also fix auth.CheckObjectAccess() when objects are removed while this runs. Fixes #854 --- auth/object_lock.go | 3 + backend/meta/meta.go | 6 +- backend/meta/xattr.go | 17 +- backend/posix/posix.go | 379 +++++++++++++++------------- backend/posix/with_otmpfile.go | 23 +- backend/scoutfs/scoutfs.go | 81 +++--- backend/scoutfs/scoutfs_compat.go | 4 + backend/scoutfs/scoutfs_incompat.go | 8 +- cmd/versitygw/gateway_test.go | 4 + go.mod | 1 + go.sum | 2 + tests/integration/group-tests.go | 17 +- tests/integration/s3conf.go | 1 + tests/integration/tests.go | 300 ++++++++++++++++++++-- tests/integration/utils.go | 47 ++-- 15 files changed, 608 insertions(+), 285 deletions(-) diff --git a/auth/object_lock.go b/auth/object_lock.go index 40801257..8abfa8d8 100644 --- a/auth/object_lock.go +++ b/auth/object_lock.go @@ -227,6 +227,9 @@ func CheckObjectAccess(ctx context.Context, bucket, userAccess string, objects [ status, err := be.GetObjectLegalHold(ctx, bucket, key, versionId) if err != nil { + if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchKey)) { + continue + } if errors.Is(err, s3err.GetAPIError(s3err.ErrNoSuchObjectLockConfiguration)) { checkLegalHold = false } else { diff --git a/backend/meta/meta.go b/backend/meta/meta.go index 98ac6d3b..4376a73e 100644 --- a/backend/meta/meta.go +++ b/backend/meta/meta.go @@ -14,17 +14,19 @@ package meta +import "os" + // MetadataStorer defines the interface for managing metadata. // When object == "", the operation is on the bucket. type MetadataStorer interface { // RetrieveAttribute retrieves the value of a specific attribute for an object or a bucket. // Returns the value of the attribute, or an error if the attribute does not exist. - RetrieveAttribute(bucket, object, attribute string) ([]byte, error) + RetrieveAttribute(f *os.File, bucket, object, attribute string) ([]byte, error) // StoreAttribute stores the value of a specific attribute for an object or a bucket. // If attribute already exists, new attribute should replace existing. // Returns an error if the operation fails. - StoreAttribute(bucket, object, attribute string, value []byte) error + StoreAttribute(f *os.File, bucket, object, attribute string, value []byte) error // DeleteAttribute removes the value of a specific attribute for an object or a bucket. // Returns an error if the operation fails. diff --git a/backend/meta/xattr.go b/backend/meta/xattr.go index f3789916..89277b15 100644 --- a/backend/meta/xattr.go +++ b/backend/meta/xattr.go @@ -17,6 +17,7 @@ package meta import ( "errors" "fmt" + "os" "path/filepath" "strings" "syscall" @@ -36,7 +37,15 @@ var ( type XattrMeta struct{} // RetrieveAttribute retrieves the value of a specific attribute for an object in a bucket. -func (x XattrMeta) RetrieveAttribute(bucket, object, attribute string) ([]byte, error) { +func (x XattrMeta) RetrieveAttribute(f *os.File, bucket, object, attribute string) ([]byte, error) { + if f != nil { + b, err := xattr.FGet(f, xattrPrefix+attribute) + if errors.Is(err, xattr.ENOATTR) { + return nil, ErrNoSuchKey + } + return b, err + } + b, err := xattr.Get(filepath.Join(bucket, object), xattrPrefix+attribute) if errors.Is(err, xattr.ENOATTR) { return nil, ErrNoSuchKey @@ -45,7 +54,11 @@ func (x XattrMeta) RetrieveAttribute(bucket, object, attribute string) ([]byte, } // StoreAttribute stores the value of a specific attribute for an object in a bucket. -func (x XattrMeta) StoreAttribute(bucket, object, attribute string, value []byte) error { +func (x XattrMeta) StoreAttribute(f *os.File, bucket, object, attribute string, value []byte) error { + if f != nil { + return xattr.FSet(f, xattrPrefix+attribute, value) + } + return xattr.Set(filepath.Join(bucket, object), xattrPrefix+attribute, value) } diff --git a/backend/posix/posix.go b/backend/posix/posix.go index 950add79..2b55eb2b 100644 --- a/backend/posix/posix.go +++ b/backend/posix/posix.go @@ -153,10 +153,10 @@ func New(rootdir string, meta meta.MetadataStorer, opts PosixOpts) (*Posix, erro if !vDir.IsDir() { return nil, fmt.Errorf("versioning path should be a directory") } - - fmt.Printf("Bucket versioning enabled with directory: %v\n", verioningdirAbs) } + fmt.Printf("Bucket versioning enabled with directory: %v\n", verioningdirAbs) + return &Posix{ meta: meta, rootfd: f, @@ -220,7 +220,7 @@ func (p *Posix) ListBuckets(_ context.Context, owner string, isAdmin bool) (s3re continue } - aclTag, err := p.meta.RetrieveAttribute(entry.Name(), "", aclkey) + aclTag, err := p.meta.RetrieveAttribute(nil, entry.Name(), "", aclkey) if errors.Is(err, meta.ErrNoSuchKey) { // skip buckets without acl tag continue @@ -292,7 +292,7 @@ func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, a err := os.Mkdir(bucket, defaultDirPerm) if err != nil && os.IsExist(err) { - aclJSON, err := p.meta.RetrieveAttribute(bucket, "", aclkey) + aclJSON, err := p.meta.RetrieveAttribute(nil, bucket, "", aclkey) if err != nil { return fmt.Errorf("get bucket acl: %w", err) } @@ -317,10 +317,12 @@ func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, a } } - if err := p.meta.StoreAttribute(bucket, "", aclkey, acl); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", aclkey, acl) + if err != nil { return fmt.Errorf("set acl: %w", err) } - if err := p.meta.StoreAttribute(bucket, "", ownershipkey, []byte(input.ObjectOwnership)); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", ownershipkey, []byte(input.ObjectOwnership)) + if err != nil { return fmt.Errorf("set ownership: %w", err) } @@ -345,7 +347,8 @@ func (p *Posix) CreateBucket(ctx context.Context, input *s3.CreateBucketInput, a return fmt.Errorf("parse default bucket lock state: %w", err) } - if err := p.meta.StoreAttribute(bucket, "", bucketLockKey, defaultLockParsed); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", bucketLockKey, defaultLockParsed) + if err != nil { return fmt.Errorf("set default bucket lock: %w", err) } } @@ -400,7 +403,8 @@ func (p *Posix) PutBucketOwnershipControls(_ context.Context, bucket string, own return fmt.Errorf("stat bucket: %w", err) } - if err := p.meta.StoreAttribute(bucket, "", ownershipkey, []byte(ownership)); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", ownershipkey, []byte(ownership)) + if err != nil { return fmt.Errorf("set ownership: %w", err) } @@ -416,7 +420,7 @@ func (p *Posix) GetBucketOwnershipControls(_ context.Context, bucket string) (ty return ownship, fmt.Errorf("stat bucket: %w", err) } - ownership, err := p.meta.RetrieveAttribute(bucket, "", ownershipkey) + ownership, err := p.meta.RetrieveAttribute(nil, bucket, "", ownershipkey) if errors.Is(err, meta.ErrNoSuchKey) { return ownship, s3err.GetAPIError(s3err.ErrOwnershipControlsNotFound) } @@ -435,7 +439,8 @@ func (p *Posix) DeleteBucketOwnershipControls(_ context.Context, bucket string) return fmt.Errorf("stat bucket: %w", err) } - if err := p.meta.DeleteAttribute(bucket, "", ownershipkey); err != nil { + err = p.meta.DeleteAttribute(bucket, "", ownershipkey) + if err != nil { if errors.Is(err, meta.ErrNoSuchKey) { return nil } @@ -483,7 +488,8 @@ func (p *Posix) PutBucketVersioning(ctx context.Context, bucket string, status t versioning = []byte{0} } - if err := p.meta.StoreAttribute(bucket, "", versioningKey, versioning); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", versioningKey, versioning) + if err != nil { return fmt.Errorf("set versioning: %w", err) } @@ -503,7 +509,7 @@ func (p *Posix) GetBucketVersioning(_ context.Context, bucket string) (s3respons return s3response.GetBucketVersioningOutput{}, fmt.Errorf("stat bucket: %w", err) } - vData, err := p.meta.RetrieveAttribute(bucket, "", versioningKey) + vData, err := p.meta.RetrieveAttribute(nil, bucket, "", versioningKey) if errors.Is(err, meta.ErrNoSuchKey) { return s3response.GetBucketVersioningOutput{}, nil } else if err != nil { @@ -563,7 +569,7 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun defer sf.Close() var versionId string - data, err := p.meta.RetrieveAttribute(bucket, key, versionIdKey) + data, err := p.meta.RetrieveAttribute(sf, bucket, key, versionIdKey) if err == nil { versionId = string(data) } else { @@ -596,22 +602,23 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun return versionPath, err } - if err := f.link(); err != nil { - return versionPath, err - } - // Copy the object attributes(metadata) for _, attr := range attrs { - data, err := p.meta.RetrieveAttribute(bucket, key, attr) + data, err := p.meta.RetrieveAttribute(sf, bucket, key, attr) if err != nil { return versionPath, fmt.Errorf("list %v attribute: %w", attr, err) } - if err := p.meta.StoreAttribute(versionPath, "", attr, data); err != nil { + err = p.meta.StoreAttribute(f.File(), versionPath, "", attr, data) + if err != nil { return versionPath, fmt.Errorf("store %v attribute: %w", attr, err) } } + if err := f.link(); err != nil { + return versionPath, err + } + return versionPath, nil } @@ -673,7 +680,7 @@ func getBoolPtr(b bool) *bool { // Check if the given object is a delete marker func (p *Posix) isObjDeleteMarker(bucket, object string) (bool, error) { - _, err := p.meta.RetrieveAttribute(bucket, object, deleteMarkerKey) + _, err := p.meta.RetrieveAttribute(nil, bucket, object, deleteMarkerKey) if errors.Is(err, fs.ErrNotExist) { return false, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -704,7 +711,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc { if d.IsDir() { // directory object only happens if directory empty // check to see if this is a directory object by checking etag - etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey) + etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey) if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) { return nil, backend.ErrSkipObj } @@ -744,7 +751,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc { } // file object, get object info and fill out object data - etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey) + etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey) if errors.Is(err, fs.ErrNotExist) { return nil, backend.ErrSkipObj } @@ -757,7 +764,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc { // If the object doesn't have versionId, it's 'null' versionId := "null" - versionIdBytes, err := p.meta.RetrieveAttribute(bucket, path, versionIdKey) + versionIdBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, versionIdKey) if err == nil { versionId = string(versionIdBytes) } @@ -858,7 +865,7 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc { continue } - etagBytes, err := p.meta.RetrieveAttribute(versionPath, versionId, etagkey) + etagBytes, err := p.meta.RetrieveAttribute(nil, versionPath, versionId, etagkey) if errors.Is(err, fs.ErrNotExist) { return nil, backend.ErrSkipObj } @@ -971,7 +978,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa // set an attribute with the original object name so that we can // map the hashed name back to the original object name - err = p.meta.StoreAttribute(bucket, objdir, onameAttr, []byte(object)) + err = p.meta.StoreAttribute(nil, bucket, objdir, onameAttr, []byte(object)) if err != nil { // if we fail, cleanup the container directories // but ignore errors because there might still be @@ -983,7 +990,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa // set user metadata for k, v := range mpu.Metadata { - err := p.meta.StoreAttribute(bucket, filepath.Join(objdir, uploadID), + err := p.meta.StoreAttribute(nil, bucket, filepath.Join(objdir, uploadID), fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) if err != nil { // cleanup object if returning error @@ -1007,7 +1014,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa // set content-type ctype := getString(mpu.ContentType) if ctype != "" { - err := p.meta.StoreAttribute(bucket, filepath.Join(objdir, uploadID), + err := p.meta.StoreAttribute(nil, bucket, filepath.Join(objdir, uploadID), contentTypeHdr, []byte(*mpu.ContentType)) if err != nil { // cleanup object if returning error @@ -1020,7 +1027,7 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa // set content-encoding cenc := getString(mpu.ContentEncoding) if cenc != "" { - err := p.meta.StoreAttribute(bucket, filepath.Join(objdir, uploadID), contentEncHdr, + err := p.meta.StoreAttribute(nil, bucket, filepath.Join(objdir, uploadID), contentEncHdr, []byte(*mpu.ContentEncoding)) if err != nil { // cleanup object if returning error @@ -1032,7 +1039,8 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa // set object legal hold if mpu.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn { - if err := p.PutObjectLegalHold(ctx, bucket, filepath.Join(objdir, uploadID), "", true); err != nil { + err := p.PutObjectLegalHold(ctx, bucket, filepath.Join(objdir, uploadID), "", true) + if err != nil { // cleanup object if returning error os.RemoveAll(filepath.Join(tmppath, uploadID)) os.Remove(tmppath) @@ -1053,7 +1061,8 @@ func (p *Posix) CreateMultipartUpload(ctx context.Context, mpu *s3.CreateMultipa os.Remove(tmppath) return s3response.InitiateMultipartUploadResult{}, fmt.Errorf("parse object lock retention: %w", err) } - if err := p.PutObjectRetention(ctx, bucket, filepath.Join(objdir, uploadID), "", true, retParsed); err != nil { + err = p.PutObjectRetention(ctx, bucket, filepath.Join(objdir, uploadID), "", true, retParsed) + if err != nil { // cleanup object if returning error os.RemoveAll(filepath.Join(tmppath, uploadID)) os.Remove(tmppath) @@ -1151,7 +1160,7 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM return nil, s3err.GetAPIError(s3err.ErrInvalidPart) } - b, err := p.meta.RetrieveAttribute(bucket, partObjPath, etagkey) + b, err := p.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey) etag := string(b) if err != nil { etag = "" @@ -1221,97 +1230,89 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM } } - err = f.link() - if err != nil { - return nil, fmt.Errorf("link object in namespace: %w", err) - } - // if the versioning is enabled, generate a new versionID for the object var versionID string if p.versioningEnabled() && vEnabled { versionID = ulid.Make().String() - if err := p.meta.StoreAttribute(bucket, object, versionIdKey, []byte(versionID)); err != nil { + err := p.meta.StoreAttribute(f.File(), bucket, object, versionIdKey, []byte(versionID)) + if err != nil { return nil, fmt.Errorf("set versionId attr: %w", err) } } for k, v := range userMetaData { - err = p.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) + err = p.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) if err != nil { - // cleanup object if returning error - os.Remove(objname) return nil, fmt.Errorf("set user attr %q: %w", k, err) } } // load and set tagging - tagging, err := p.meta.RetrieveAttribute(bucket, upiddir, tagHdr) + tagging, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("get object tagging: %w", err) + } if err == nil { - if err := p.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil { - // cleanup object - os.Remove(objname) + err := p.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging) + if err != nil { return nil, fmt.Errorf("set object tagging: %w", err) } } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("get object tagging: %w", err) - } // set content-type if cType != "" { - if err := p.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil { - // cleanup object - os.Remove(objname) + err := p.meta.StoreAttribute(f.File(), bucket, object, contentTypeHdr, []byte(cType)) + if err != nil { return nil, fmt.Errorf("set object content type: %w", err) } } // set content-encoding if cEnc != "" { - if err := p.meta.StoreAttribute(bucket, object, contentEncHdr, []byte(cEnc)); err != nil { - // cleanup object - os.Remove(objname) + err := p.meta.StoreAttribute(f.File(), bucket, object, contentEncHdr, []byte(cEnc)) + if err != nil { return nil, fmt.Errorf("set object content encoding: %w", err) } } // load and set legal hold - lHold, err := p.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey) + lHold, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("get object legal hold: %w", err) + } if err == nil { - if err := p.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil { - // cleanup object - os.Remove(objname) + err := p.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold) + if err != nil { return nil, fmt.Errorf("set object legal hold: %w", err) } } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("get object legal hold: %w", err) - } // load and set retention - ret, err := p.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey) + ret, err := p.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("get object retention: %w", err) + } if err == nil { - if err := p.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil { - // cleanup object - os.Remove(objname) + err := p.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret) + if err != nil { return nil, fmt.Errorf("set object retention: %w", err) } } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("get object retention: %w", err) - } // Calculate s3 compatible md5sum for complete multipart. s3MD5 := backend.GetMultipartMD5(parts) - err = p.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5)) + err = p.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5)) if err != nil { - // cleanup object if returning error - os.Remove(objname) return nil, fmt.Errorf("set etag attr: %w", err) } + err = f.link() + if err != nil { + return nil, fmt.Errorf("link object in namespace: %w", err) + } + // cleanup tmp dirs os.RemoveAll(filepath.Join(bucket, objdir, uploadID)) // use Remove for objdir in case there are still other uploads @@ -1363,7 +1364,7 @@ func (p *Posix) loadUserMetaData(bucket, object string, m map[string]string) (st if !isValidMeta(e) { continue } - b, err := p.meta.RetrieveAttribute(bucket, object, e) + b, err := p.meta.RetrieveAttribute(nil, bucket, object, e) if err != nil { continue } @@ -1375,10 +1376,10 @@ func (p *Posix) loadUserMetaData(bucket, object string, m map[string]string) (st } var contentType, contentEncoding string - b, _ := p.meta.RetrieveAttribute(bucket, object, contentTypeHdr) + b, _ := p.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr) contentType = string(b) - b, _ = p.meta.RetrieveAttribute(bucket, object, contentEncHdr) + b, _ = p.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr) contentEncoding = string(b) return contentType, contentEncoding @@ -1480,7 +1481,7 @@ func (p *Posix) ListMultipartUploads(_ context.Context, mpu *s3.ListMultipartUpl continue } - b, err := p.meta.RetrieveAttribute(bucket, filepath.Join(metaTmpMultipartDir, obj.Name()), onameAttr) + b, err := p.meta.RetrieveAttribute(nil, bucket, filepath.Join(metaTmpMultipartDir, obj.Name()), onameAttr) if err != nil { continue } @@ -1650,7 +1651,7 @@ func (p *Posix) ListParts(_ context.Context, input *s3.ListPartsInput) (s3respon } partPath := filepath.Join(objdir, uploadID, e.Name()) - b, err := p.meta.RetrieveAttribute(bucket, partPath, etagkey) + b, err := p.meta.RetrieveAttribute(nil, bucket, partPath, etagkey) etag := string(b) if err != nil { etag = "" @@ -1752,6 +1753,7 @@ func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (stri } return "", fmt.Errorf("open temp file: %w", err) } + defer f.cleanup() hash := md5.New() tr := io.TeeReader(r, hash) @@ -1763,20 +1765,18 @@ func (p *Posix) UploadPart(ctx context.Context, input *s3.UploadPartInput) (stri return "", fmt.Errorf("write part data: %w", err) } - err = f.link() - if err != nil { - return "", fmt.Errorf("link object in namespace: %w", err) - } - - f.cleanup() - dataSum := hash.Sum(nil) etag := hex.EncodeToString(dataSum) - err = p.meta.StoreAttribute(bucket, partPath, etagkey, []byte(etag)) + err = p.meta.StoreAttribute(f.File(), bucket, partPath, etagkey, []byte(etag)) if err != nil { return "", fmt.Errorf("set etag attr: %w", err) } + err = f.link() + if err != nil { + return "", fmt.Errorf("link object in namespace: %w", err) + } + return etag, nil } @@ -1839,7 +1839,7 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) if !p.versioningEnabled() || !vEnabled { return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrInvalidVersionId) } - vId, err := p.meta.RetrieveAttribute(srcBucket, srcObject, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return s3response.CopyObjectResult{}, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -1912,18 +1912,18 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput) return s3response.CopyObjectResult{}, fmt.Errorf("copy part data: %w", err) } - err = f.link() - if err != nil { - return s3response.CopyObjectResult{}, fmt.Errorf("link object in namespace: %w", err) - } - dataSum := hash.Sum(nil) etag := hex.EncodeToString(dataSum) - err = p.meta.StoreAttribute(*upi.Bucket, partPath, etagkey, []byte(etag)) + err = p.meta.StoreAttribute(f.File(), *upi.Bucket, partPath, etagkey, []byte(etag)) if err != nil { return s3response.CopyObjectResult{}, fmt.Errorf("set etag attr: %w", err) } + err = f.link() + if err != nil { + return s3response.CopyObjectResult{}, fmt.Errorf("link object in namespace: %w", err) + } + fi, err = os.Stat(filepath.Join(*upi.Bucket, partPath)) if err != nil { return s3response.CopyObjectResult{}, fmt.Errorf("stat part path: %w", err) @@ -1999,7 +1999,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons } for k, v := range po.Metadata { - err := p.meta.StoreAttribute(*po.Bucket, *po.Key, + err := p.meta.StoreAttribute(nil, *po.Bucket, *po.Key, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) if err != nil { return s3response.PutObjectOutput{}, fmt.Errorf("set user attr %q: %w", k, err) @@ -2007,7 +2007,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons } // set etag attribute to signify this dir was specifically put - err = p.meta.StoreAttribute(*po.Bucket, *po.Key, etagkey, + err = p.meta.StoreAttribute(nil, *po.Bucket, *po.Key, etagkey, []byte(emptyMD5)) if err != nil { return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err) @@ -2066,6 +2066,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons } return s3response.PutObjectOutput{}, fmt.Errorf("write object data: %w", err) } + dir := filepath.Dir(name) if dir != "" { err = backend.MkdirAll(dir, uid, gid, doChown) @@ -2074,22 +2075,73 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons } } - err = f.link() - if err != nil { - return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory) + dataSum := hash.Sum(nil) + etag := hex.EncodeToString(dataSum[:]) + + // if the versioning is enabled, generate a new versionID for the object + var versionID string + if p.versioningEnabled() && vEnabled { + versionID = ulid.Make().String() } for k, v := range po.Metadata { - err := p.meta.StoreAttribute(*po.Bucket, *po.Key, + err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) if err != nil { return s3response.PutObjectOutput{}, fmt.Errorf("set user attr %q: %w", k, err) } } + err = p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, etagkey, []byte(etag)) + if err != nil { + return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err) + } + + ctype := getString(po.ContentType) + if ctype != "" { + err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, contentTypeHdr, + []byte(*po.ContentType)) + if err != nil { + return s3response.PutObjectOutput{}, fmt.Errorf("set content-type attr: %w", err) + } + } + + cenc := getString(po.ContentEncoding) + if cenc != "" { + err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, contentEncHdr, + []byte(*po.ContentEncoding)) + if err != nil { + return s3response.PutObjectOutput{}, fmt.Errorf("set content-encoding attr: %w", err) + } + } + + if versionID != "" { + err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, versionIdKey, []byte(versionID)) + if err != nil { + return s3response.PutObjectOutput{}, fmt.Errorf("set versionId attr: %w", err) + } + } + + err = f.link() + if errors.Is(err, syscall.EEXIST) { + return s3response.PutObjectOutput{ + ETag: etag, + VersionID: versionID, + }, nil + } + if err != nil { + return s3response.PutObjectOutput{}, s3err.GetAPIError(s3err.ErrExistingObjectIsDirectory) + } + // Set object tagging if tagsStr != "" { err := p.PutObjectTagging(ctx, *po.Bucket, *po.Key, tags) + if errors.Is(err, fs.ErrNotExist) { + return s3response.PutObjectOutput{ + ETag: etag, + VersionID: versionID, + }, nil + } if err != nil { return s3response.PutObjectOutput{}, err } @@ -2097,7 +2149,8 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons // Set object legal hold if po.ObjectLockLegalHoldStatus == types.ObjectLockLegalHoldStatusOn { - if err := p.PutObjectLegalHold(ctx, *po.Bucket, *po.Key, "", true); err != nil { + err := p.PutObjectLegalHold(ctx, *po.Bucket, *po.Key, "", true) + if err != nil { return s3response.PutObjectOutput{}, err } } @@ -2112,43 +2165,9 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons if err != nil { return s3response.PutObjectOutput{}, fmt.Errorf("parse object lock retention: %w", err) } - if err := p.PutObjectRetention(ctx, *po.Bucket, *po.Key, "", true, retParsed); err != nil { - return s3response.PutObjectOutput{}, err - } - } - - dataSum := hash.Sum(nil) - etag := hex.EncodeToString(dataSum[:]) - err = p.meta.StoreAttribute(*po.Bucket, *po.Key, etagkey, []byte(etag)) - if err != nil { - return s3response.PutObjectOutput{}, fmt.Errorf("set etag attr: %w", err) - } - - ctype := getString(po.ContentType) - if ctype != "" { - err := p.meta.StoreAttribute(*po.Bucket, *po.Key, contentTypeHdr, - []byte(*po.ContentType)) + err = p.PutObjectRetention(ctx, *po.Bucket, *po.Key, "", true, retParsed) if err != nil { - return s3response.PutObjectOutput{}, fmt.Errorf("set content-type attr: %w", err) - } - } - - cenc := getString(po.ContentEncoding) - if cenc != "" { - err := p.meta.StoreAttribute(*po.Bucket, *po.Key, contentEncHdr, - []byte(*po.ContentEncoding)) - if err != nil { - return s3response.PutObjectOutput{}, fmt.Errorf("set content-encoding attr: %w", err) - } - } - - // if the versioning is enabled, generate a new versionID for the object - var versionID string - if p.versioningEnabled() && vEnabled { - versionID = ulid.Make().String() - - if err := p.meta.StoreAttribute(*po.Bucket, *po.Key, versionIdKey, []byte(versionID)); err != nil { - return s3response.PutObjectOutput{}, fmt.Errorf("set versionId attr: %w", err) + return s3response.PutObjectOutput{}, err } } @@ -2213,12 +2232,14 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) ( } // Mark the object as a delete marker - if err := p.meta.StoreAttribute(bucket, object, deleteMarkerKey, []byte{}); err != nil { + err = p.meta.StoreAttribute(nil, bucket, object, deleteMarkerKey, []byte{}) + if err != nil { return nil, fmt.Errorf("set delete marker: %w", err) } // Generate & set a unique versionId for the delete marker versionId := ulid.Make().String() - if err := p.meta.StoreAttribute(bucket, object, versionIdKey, []byte(versionId)); err != nil { + err = p.meta.StoreAttribute(nil, bucket, object, versionIdKey, []byte(versionId)) + if err != nil { return nil, fmt.Errorf("set versionId: %w", err) } @@ -2229,7 +2250,7 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) ( } else { versionPath := p.genObjVersionPath(bucket, object) - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if err != nil && !errors.Is(err, meta.ErrNoSuchKey) && !errors.Is(err, fs.ErrNotExist) { return nil, fmt.Errorf("get obj versionId: %w", err) } @@ -2302,17 +2323,19 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) ( } for _, attr := range attrs { - data, err := p.meta.RetrieveAttribute(versionPath, srcVersionId, attr) + data, err := p.meta.RetrieveAttribute(nil, versionPath, srcVersionId, attr) if err != nil { return nil, fmt.Errorf("load %v attribute", attr) } - if err := p.meta.StoreAttribute(bucket, object, attr, data); err != nil { + err = p.meta.StoreAttribute(nil, bucket, object, attr, data) + if err != nil { return nil, fmt.Errorf("store %v attribute", attr) } } - if err := os.Remove(filepath.Join(versionPath, srcVersionId)); err != nil { + err = os.Remove(filepath.Join(versionPath, srcVersionId)) + if err != nil { return nil, fmt.Errorf("remove obj version %w", err) } @@ -2403,7 +2426,7 @@ func (p *Posix) removeParents(bucket, object string) error { break } - _, err := p.meta.RetrieveAttribute(bucket, parent, etagkey) + _, err := p.meta.RetrieveAttribute(nil, bucket, parent, etagkey) if err == nil { // a directory with a valid etag means this was specifically // uploaded with a put object, so stop here and leave this @@ -2497,7 +2520,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO object := *input.Key if versionId != "" { - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -2586,7 +2609,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO _, contentEncoding := p.loadUserMetaData(bucket, object, userMetaData) contentType := backend.DirContentType - b, err := p.meta.RetrieveAttribute(bucket, object, etagkey) + b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey) etag := string(b) if err != nil { etag = "" @@ -2619,7 +2642,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO // If versioning is configured get the object versionId if p.versioningEnabled() && versionId == "" { - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, meta.ErrNoSuchKey) { versionId = nullVersionId } else if err != nil { @@ -2633,7 +2656,7 @@ func (p *Posix) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.GetO contentType, contentEncoding := p.loadUserMetaData(bucket, object, userMetaData) - b, err := p.meta.RetrieveAttribute(bucket, object, etagkey) + b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey) etag := string(b) if err != nil { etag = "" @@ -2718,7 +2741,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. return nil, fmt.Errorf("stat part: %w", err) } - b, err := p.meta.RetrieveAttribute(bucket, partPath, etagkey) + b, err := p.meta.RetrieveAttribute(nil, bucket, partPath, etagkey) etag := string(b) if err != nil { etag = "" @@ -2744,7 +2767,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. } if *input.VersionId != "" { - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -2806,7 +2829,7 @@ func (p *Posix) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s3. contentType = backend.DirContentType } - b, err := p.meta.RetrieveAttribute(bucket, object, etagkey) + b, err := p.meta.RetrieveAttribute(nil, bucket, object, etagkey) etag := string(b) if err != nil { etag = "" @@ -2908,7 +2931,7 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3. if !p.versioningEnabled() || !vEnabled { return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) } - vId, err := p.meta.RetrieveAttribute(srcBucket, srcObject, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, srcBucket, srcObject, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -2977,16 +3000,16 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3. } } for k, v := range input.Metadata { - err := p.meta.StoreAttribute(dstBucket, dstObject, + err := p.meta.StoreAttribute(nil, dstBucket, dstObject, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) if err != nil { return nil, fmt.Errorf("set user attr %q: %w", k, err) } } - b, _ := p.meta.RetrieveAttribute(dstBucket, dstObject, etagkey) + b, _ := p.meta.RetrieveAttribute(nil, dstBucket, dstObject, etagkey) etag = string(b) - vId, _ := p.meta.RetrieveAttribute(dstBucket, dstObject, versionIdKey) + vId, _ := p.meta.RetrieveAttribute(nil, dstBucket, dstObject, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3078,7 +3101,7 @@ func (p *Posix) fileToObj(bucket string) backend.GetObjFunc { if d.IsDir() { // directory object only happens if directory empty // check to see if this is a directory object by checking etag - etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey) + etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey) if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) { return s3response.Object{}, backend.ErrSkipObj } @@ -3114,7 +3137,7 @@ func (p *Posix) fileToObj(bucket string) backend.GetObjFunc { } // file object, get object info and fill out object data - etagBytes, err := p.meta.RetrieveAttribute(bucket, path, etagkey) + etagBytes, err := p.meta.RetrieveAttribute(nil, bucket, path, etagkey) if errors.Is(err, fs.ErrNotExist) { return s3response.Object{}, backend.ErrSkipObj } @@ -3218,7 +3241,8 @@ func (p *Posix) PutBucketAcl(_ context.Context, bucket string, data []byte) erro return fmt.Errorf("stat bucket: %w", err) } - if err := p.meta.StoreAttribute(bucket, "", aclkey, data); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", aclkey, data) + if err != nil { return fmt.Errorf("set acl: %w", err) } @@ -3237,7 +3261,7 @@ func (p *Posix) GetBucketAcl(_ context.Context, input *s3.GetBucketAclInput) ([] return nil, fmt.Errorf("stat bucket: %w", err) } - b, err := p.meta.RetrieveAttribute(*input.Bucket, "", aclkey) + b, err := p.meta.RetrieveAttribute(nil, *input.Bucket, "", aclkey) if errors.Is(err, meta.ErrNoSuchKey) { return []byte{}, nil } @@ -3270,7 +3294,7 @@ func (p *Posix) PutBucketTagging(_ context.Context, bucket string, tags map[stri return fmt.Errorf("marshal tags: %w", err) } - err = p.meta.StoreAttribute(bucket, "", tagHdr, b) + err = p.meta.StoreAttribute(nil, bucket, "", tagHdr, b) if err != nil { return fmt.Errorf("set tags: %w", err) } @@ -3313,7 +3337,7 @@ func (p *Posix) GetObjectTagging(_ context.Context, bucket, object string) (map[ func (p *Posix) getAttrTags(bucket, object string) (map[string]string, error) { tags := make(map[string]string) - b, err := p.meta.RetrieveAttribute(bucket, object, tagHdr) + b, err := p.meta.RetrieveAttribute(nil, bucket, object, tagHdr) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3360,7 +3384,7 @@ func (p *Posix) PutObjectTagging(_ context.Context, bucket, object string, tags return fmt.Errorf("marshal tags: %w", err) } - err = p.meta.StoreAttribute(bucket, object, tagHdr, b) + err = p.meta.StoreAttribute(nil, bucket, object, tagHdr, b) if errors.Is(err, fs.ErrNotExist) { return s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3397,7 +3421,7 @@ func (p *Posix) PutBucketPolicy(ctx context.Context, bucket string, policy []byt return nil } - err = p.meta.StoreAttribute(bucket, "", policykey, policy) + err = p.meta.StoreAttribute(nil, bucket, "", policykey, policy) if err != nil { return fmt.Errorf("set policy: %w", err) } @@ -3414,7 +3438,7 @@ func (p *Posix) GetBucketPolicy(ctx context.Context, bucket string) ([]byte, err return nil, fmt.Errorf("stat bucket: %w", err) } - policy, err := p.meta.RetrieveAttribute(bucket, "", policykey) + policy, err := p.meta.RetrieveAttribute(nil, bucket, "", policykey) if errors.Is(err, meta.ErrNoSuchKey) { return nil, s3err.GetAPIError(s3err.ErrNoSuchBucketPolicy) } @@ -3441,7 +3465,7 @@ func (p *Posix) PutObjectLockConfiguration(ctx context.Context, bucket string, c return fmt.Errorf("stat bucket: %w", err) } - cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey) + cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey) if errors.Is(err, meta.ErrNoSuchKey) { return s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotAllowed) } @@ -3458,7 +3482,8 @@ func (p *Posix) PutObjectLockConfiguration(ctx context.Context, bucket string, c return s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotAllowed) } - if err := p.meta.StoreAttribute(bucket, "", bucketLockKey, config); err != nil { + err = p.meta.StoreAttribute(nil, bucket, "", bucketLockKey, config) + if err != nil { return fmt.Errorf("set object lock config: %w", err) } @@ -3474,7 +3499,7 @@ func (p *Posix) GetObjectLockConfiguration(_ context.Context, bucket string) ([] return nil, fmt.Errorf("stat bucket: %w", err) } - cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey) + cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey) if errors.Is(err, meta.ErrNoSuchKey) { return nil, s3err.GetAPIError(s3err.ErrObjectLockConfigurationNotFound) } @@ -3494,7 +3519,7 @@ func (p *Posix) PutObjectLegalHold(_ context.Context, bucket, object, versionId return fmt.Errorf("stat bucket: %w", err) } - cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey) + cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey) if errors.Is(err, meta.ErrNoSuchKey) { return s3err.GetAPIError(s3err.ErrInvalidBucketObjectLockConfiguration) } @@ -3523,7 +3548,7 @@ func (p *Posix) PutObjectLegalHold(_ context.Context, bucket, object, versionId //TODO: Maybe we need to return our custom error here? return s3err.GetAPIError(s3err.ErrInvalidVersionId) } - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3537,7 +3562,7 @@ func (p *Posix) PutObjectLegalHold(_ context.Context, bucket, object, versionId } } - err = p.meta.StoreAttribute(bucket, object, objectLegalHoldKey, statusData) + err = p.meta.StoreAttribute(nil, bucket, object, objectLegalHoldKey, statusData) if errors.Is(err, fs.ErrNotExist) { if versionId != "" { return s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -3565,7 +3590,7 @@ func (p *Posix) GetObjectLegalHold(_ context.Context, bucket, object, versionId //TODO: Maybe we need to return our custom error here? return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) } - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3579,7 +3604,7 @@ func (p *Posix) GetObjectLegalHold(_ context.Context, bucket, object, versionId } } - data, err := p.meta.RetrieveAttribute(bucket, object, objectLegalHoldKey) + data, err := p.meta.RetrieveAttribute(nil, bucket, object, objectLegalHoldKey) if errors.Is(err, fs.ErrNotExist) { if versionId != "" { return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -3607,7 +3632,7 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId return fmt.Errorf("stat bucket: %w", err) } - cfg, err := p.meta.RetrieveAttribute(bucket, "", bucketLockKey) + cfg, err := p.meta.RetrieveAttribute(nil, bucket, "", bucketLockKey) if errors.Is(err, meta.ErrNoSuchKey) { return s3err.GetAPIError(s3err.ErrInvalidBucketObjectLockConfiguration) } @@ -3629,7 +3654,7 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId //TODO: Maybe we need to return our custom error here? return s3err.GetAPIError(s3err.ErrInvalidVersionId) } - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3643,7 +3668,7 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId } } - objectLockCfg, err := p.meta.RetrieveAttribute(bucket, object, objectRetentionKey) + objectLockCfg, err := p.meta.RetrieveAttribute(nil, bucket, object, objectRetentionKey) if errors.Is(err, fs.ErrNotExist) { if versionId != "" { return s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -3651,7 +3676,8 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId return s3err.GetAPIError(s3err.ErrNoSuchKey) } if errors.Is(err, meta.ErrNoSuchKey) { - if err := p.meta.StoreAttribute(bucket, object, objectRetentionKey, retention); err != nil { + err := p.meta.StoreAttribute(nil, bucket, object, objectRetentionKey, retention) + if err != nil { return fmt.Errorf("set object lock config: %w", err) } @@ -3677,7 +3703,8 @@ func (p *Posix) PutObjectRetention(_ context.Context, bucket, object, versionId } } - if err := p.meta.StoreAttribute(bucket, object, objectRetentionKey, retention); err != nil { + err = p.meta.StoreAttribute(nil, bucket, object, objectRetentionKey, retention) + if err != nil { return fmt.Errorf("set object lock config: %w", err) } @@ -3698,7 +3725,7 @@ func (p *Posix) GetObjectRetention(_ context.Context, bucket, object, versionId //TODO: Maybe we need to return our custom error here? return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) } - vId, err := p.meta.RetrieveAttribute(bucket, object, versionIdKey) + vId, err := p.meta.RetrieveAttribute(nil, bucket, object, versionIdKey) if errors.Is(err, fs.ErrNotExist) { return nil, s3err.GetAPIError(s3err.ErrNoSuchKey) } @@ -3712,7 +3739,7 @@ func (p *Posix) GetObjectRetention(_ context.Context, bucket, object, versionId } } - data, err := p.meta.RetrieveAttribute(bucket, object, objectRetentionKey) + data, err := p.meta.RetrieveAttribute(nil, bucket, object, objectRetentionKey) if errors.Is(err, fs.ErrNotExist) { if versionId != "" { return nil, s3err.GetAPIError(s3err.ErrInvalidVersionId) @@ -3749,7 +3776,7 @@ func (p *Posix) ListBucketsAndOwners(ctx context.Context) (buckets []s3response. continue } - aclTag, err := p.meta.RetrieveAttribute(entry.Name(), "", aclkey) + aclTag, err := p.meta.RetrieveAttribute(nil, entry.Name(), "", aclkey) if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { return buckets, fmt.Errorf("get acl tag: %w", err) } diff --git a/backend/posix/with_otmpfile.go b/backend/posix/with_otmpfile.go index fc49827f..cc41da26 100644 --- a/backend/posix/with_otmpfile.go +++ b/backend/posix/with_otmpfile.go @@ -134,6 +134,9 @@ func (tmp *tmpfile) falloc() error { } func (tmp *tmpfile) link() error { + // make sure this is cleaned up in all error cases + defer tmp.f.Close() + // We use Linkat/Rename as the atomic operation for object puts. The // upload is written to a temp (or unnamed/O_TMPFILE) file to not conflict // with any other simultaneous uploads. The final operation is to move the @@ -170,11 +173,21 @@ func (tmp *tmpfile) link() error { } defer dirf.Close() - err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), - int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW) - if err != nil { - return fmt.Errorf("link tmpfile (%q in %q): %w", - filepath.Dir(objPath), filepath.Base(tmp.f.Name()), err) + for { + err = unix.Linkat(int(procdir.Fd()), filepath.Base(tmp.f.Name()), + int(dirf.Fd()), filepath.Base(objPath), unix.AT_SYMLINK_FOLLOW) + if errors.Is(err, syscall.EEXIST) { + err := os.Remove(objPath) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return fmt.Errorf("remove stale path: %w", err) + } + continue + } + if err != nil { + return fmt.Errorf("link tmpfile (fd %q as %q): %w", + filepath.Base(tmp.f.Name()), objPath, err) + } + break } err = tmp.f.Close() diff --git a/backend/scoutfs/scoutfs.go b/backend/scoutfs/scoutfs.go index 59c44448..c57651de 100644 --- a/backend/scoutfs/scoutfs.go +++ b/backend/scoutfs/scoutfs.go @@ -226,7 +226,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet return nil, s3err.GetAPIError(s3err.ErrInvalidPart) } - b, err := s.meta.RetrieveAttribute(bucket, partObjPath, etagkey) + b, err := s.meta.RetrieveAttribute(nil, bucket, partObjPath, etagkey) etag := string(b) if err != nil { etag = "" @@ -262,7 +262,7 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet // scoutfs move data is a metadata only operation that moves the data // extent references from the source, appeding to the destination. // this needs to be 4k aligned. - err = moveData(pf, f.f) + err = moveData(pf, f.File()) pf.Close() if err != nil { return nil, fmt.Errorf("move blocks part %v: %v", *part.PartNumber, err) @@ -282,78 +282,71 @@ func (s *ScoutFS) CompleteMultipartUpload(ctx context.Context, input *s3.Complet return nil, err } } - err = f.link() - if err != nil { - return nil, fmt.Errorf("link object in namespace: %w", err) - } for k, v := range userMetaData { - err = s.meta.StoreAttribute(bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) + err = s.meta.StoreAttribute(f.File(), bucket, object, fmt.Sprintf("%v.%v", metaHdr, k), []byte(v)) if err != nil { - // cleanup object if returning error - os.Remove(objname) return nil, fmt.Errorf("set user attr %q: %w", k, err) } } // load and set tagging - tagging, err := s.meta.RetrieveAttribute(bucket, upiddir, tagHdr) + tagging, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, tagHdr) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("get object tagging: %w", err) + } if err == nil { - if err := s.meta.StoreAttribute(bucket, object, tagHdr, tagging); err != nil { - // cleanup object - os.Remove(objname) + err := s.meta.StoreAttribute(f.File(), bucket, object, tagHdr, tagging) + if err != nil { return nil, fmt.Errorf("set object tagging: %w", err) } } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("get object tagging: %w", err) - } // set content-type if cType != "" { - if err := s.meta.StoreAttribute(bucket, object, contentTypeHdr, []byte(cType)); err != nil { - // cleanup object - os.Remove(objname) + err := s.meta.StoreAttribute(f.File(), bucket, object, contentTypeHdr, []byte(cType)) + if err != nil { return nil, fmt.Errorf("set object content type: %w", err) } } // load and set legal hold - lHold, err := s.meta.RetrieveAttribute(bucket, upiddir, objectLegalHoldKey) + lHold, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectLegalHoldKey) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("get object legal hold: %w", err) + } if err == nil { - if err := s.meta.StoreAttribute(bucket, object, objectLegalHoldKey, lHold); err != nil { - // cleanup object - os.Remove(objname) + err := s.meta.StoreAttribute(f.File(), bucket, object, objectLegalHoldKey, lHold) + if err != nil { return nil, fmt.Errorf("set object legal hold: %w", err) } } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("get object legal hold: %w", err) - } // load and set retention - ret, err := s.meta.RetrieveAttribute(bucket, upiddir, objectRetentionKey) + ret, err := s.meta.RetrieveAttribute(nil, bucket, upiddir, objectRetentionKey) + if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { + return nil, fmt.Errorf("get object retention: %w", err) + } if err == nil { - if err := s.meta.StoreAttribute(bucket, object, objectRetentionKey, ret); err != nil { - // cleanup object - os.Remove(objname) + err := s.meta.StoreAttribute(f.File(), bucket, object, objectRetentionKey, ret) + if err != nil { return nil, fmt.Errorf("set object retention: %w", err) } } - if err != nil && !errors.Is(err, meta.ErrNoSuchKey) { - return nil, fmt.Errorf("get object retention: %w", err) - } // Calculate s3 compatible md5sum for complete multipart. s3MD5 := backend.GetMultipartMD5(parts) - err = s.meta.StoreAttribute(bucket, object, etagkey, []byte(s3MD5)) + err = s.meta.StoreAttribute(f.File(), bucket, object, etagkey, []byte(s3MD5)) if err != nil { - // cleanup object if returning error - os.Remove(objname) return nil, fmt.Errorf("set etag attr: %w", err) } + err = f.link() + if err != nil { + return nil, fmt.Errorf("link object in namespace: %w", err) + } + // cleanup tmp dirs os.RemoveAll(upiddir) // use Remove for objdir in case there are still other uploads @@ -392,7 +385,7 @@ func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) ( if !isValidMeta(e) { continue } - b, err := s.meta.RetrieveAttribute(bucket, object, e) + b, err := s.meta.RetrieveAttribute(nil, bucket, object, e) if err != nil { continue } @@ -404,13 +397,13 @@ func (s *ScoutFS) loadUserMetaData(bucket, object string, m map[string]string) ( } var contentType, contentEncoding string - b, _ := s.meta.RetrieveAttribute(bucket, object, contentTypeHdr) + b, _ := s.meta.RetrieveAttribute(nil, bucket, object, contentTypeHdr) contentType = string(b) if contentType != "" { m[contentTypeHdr] = contentType } - b, _ = s.meta.RetrieveAttribute(bucket, object, contentEncHdr) + b, _ = s.meta.RetrieveAttribute(nil, bucket, object, contentEncHdr) contentEncoding = string(b) if contentEncoding != "" { m[contentEncHdr] = contentEncoding @@ -466,7 +459,7 @@ func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s return nil, fmt.Errorf("stat part: %w", err) } - b, err := s.meta.RetrieveAttribute(bucket, partPath, etagkey) + b, err := s.meta.RetrieveAttribute(nil, bucket, partPath, etagkey) etag := string(b) if err != nil { etag = "" @@ -514,7 +507,7 @@ func (s *ScoutFS) HeadObject(ctx context.Context, input *s3.HeadObjectInput) (*s contentType = "application/x-directory" } - b, err := s.meta.RetrieveAttribute(bucket, object, etagkey) + b, err := s.meta.RetrieveAttribute(nil, bucket, object, etagkey) etag := string(b) if err != nil { etag = "" @@ -685,7 +678,7 @@ func (s *ScoutFS) GetObject(_ context.Context, input *s3.GetObjectInput) (*s3.Ge contentType, contentEncoding := s.loadUserMetaData(bucket, object, userMetaData) - b, err := s.meta.RetrieveAttribute(bucket, object, etagkey) + b, err := s.meta.RetrieveAttribute(nil, bucket, object, etagkey) etag := string(b) if err != nil { etag = "" @@ -840,7 +833,7 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc { if d.IsDir() { // directory object only happens if directory empty // check to see if this is a directory object by checking etag - etagBytes, err := s.meta.RetrieveAttribute(bucket, path, etagkey) + etagBytes, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey) if errors.Is(err, meta.ErrNoSuchKey) || errors.Is(err, fs.ErrNotExist) { return s3response.Object{}, backend.ErrSkipObj } @@ -869,7 +862,7 @@ func (s *ScoutFS) fileToObj(bucket string) backend.GetObjFunc { } // file object, get object info and fill out object data - b, err := s.meta.RetrieveAttribute(bucket, path, etagkey) + b, err := s.meta.RetrieveAttribute(nil, bucket, path, etagkey) if errors.Is(err, fs.ErrNotExist) { return s3response.Object{}, backend.ErrSkipObj } diff --git a/backend/scoutfs/scoutfs_compat.go b/backend/scoutfs/scoutfs_compat.go index fe937636..9482cc07 100644 --- a/backend/scoutfs/scoutfs_compat.go +++ b/backend/scoutfs/scoutfs_compat.go @@ -174,6 +174,10 @@ func (tmp *tmpfile) cleanup() { tmp.f.Close() } +func (tmp *tmpfile) File() *os.File { + return tmp.f +} + func moveData(from *os.File, to *os.File) error { return scoutfs.MoveData(from, to) } diff --git a/backend/scoutfs/scoutfs_incompat.go b/backend/scoutfs/scoutfs_incompat.go index 4a5fda2b..8d3f09c1 100644 --- a/backend/scoutfs/scoutfs_incompat.go +++ b/backend/scoutfs/scoutfs_incompat.go @@ -28,9 +28,7 @@ func New(rootdir string, opts ScoutfsOpts) (*ScoutFS, error) { return nil, fmt.Errorf("scoutfs only available on linux") } -type tmpfile struct { - f *os.File -} +type tmpfile struct{} var ( errNotSupported = errors.New("not supported") @@ -56,6 +54,10 @@ func (tmp *tmpfile) Write(b []byte) (int, error) { func (tmp *tmpfile) cleanup() { } +func (tmp *tmpfile) File() *os.File { + return nil +} + func moveData(_, _ *os.File) error { return errNotSupported } diff --git a/cmd/versitygw/gateway_test.go b/cmd/versitygw/gateway_test.go index 505d2f80..14369b4d 100644 --- a/cmd/versitygw/gateway_test.go +++ b/cmd/versitygw/gateway_test.go @@ -7,6 +7,7 @@ import ( "path/filepath" "sync" "testing" + "time" "github.com/versity/versitygw/backend/meta" "github.com/versity/versitygw/backend/posix" @@ -75,6 +76,9 @@ func initPosix(ctx context.Context) { } wg.Done() }() + + // wait for server to start + time.Sleep(1 * time.Second) } func TestIntegration(t *testing.T) { diff --git a/go.mod b/go.mod index 46a4443d..3768b9da 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/urfave/cli/v2 v2.27.4 github.com/valyala/fasthttp v1.56.0 github.com/versity/scoutfs-go v0.0.0-20240325223134-38eb2f5f7d44 + golang.org/x/sync v0.8.0 golang.org/x/sys v0.26.0 ) diff --git a/go.sum b/go.sum index 1ef6f24a..6fdb4aad 100644 --- a/go.sum +++ b/go.sum @@ -215,6 +215,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/tests/integration/group-tests.go b/tests/integration/group-tests.go index 655b8977..c1fa7392 100644 --- a/tests/integration/group-tests.go +++ b/tests/integration/group-tests.go @@ -134,6 +134,7 @@ func TestPutObject(s *S3Conf) { PutObject_missing_object_lock_retention_config(s) PutObject_with_object_lock(s) PutObject_success(s) + PutObject_racey_success(s) PutObject_invalid_credentials(s) } @@ -304,6 +305,9 @@ func TestCompleteMultipartUpload(s *S3Conf) { CompleteMultipartUpload_invalid_part_number(s) CompleteMultipartUpload_invalid_ETag(s) CompleteMultipartUpload_success(s) + if !s.azureTests { + CompleteMultipartUpload_racey_success(s) + } } func TestPutBucketAcl(s *S3Conf) { @@ -571,17 +575,19 @@ func TestVersioning(s *S3Conf) { Versioning_Enable_object_lock(s) Versioning_status_switch_to_suspended_with_object_lock(s) // Object-Lock Retention - Versionsin_PutObjectRetention_invalid_versionId(s) + Versioning_PutObjectRetention_invalid_versionId(s) Versioning_GetObjectRetention_invalid_versionId(s) Versioning_Put_GetObjectRetention_success(s) // Object-Lock Legal hold - Versionsin_PutObjectLegalHold_invalid_versionId(s) + Versioning_PutObjectLegalHold_invalid_versionId(s) Versioning_GetObjectLegalHold_invalid_versionId(s) Versioning_Put_GetObjectLegalHold_success(s) // WORM protection Versioning_WORM_obj_version_locked_with_legal_hold(s) Versioning_WORM_obj_version_locked_with_governance_retention(s) Versioning_WORM_obj_version_locked_with_compliance_retention(s) + // Concurrent requests + //Versioninig_concurrent_upload_object(s) } func TestVersioningDisabled(s *S3Conf) { @@ -677,6 +683,7 @@ func GetIntTests() IntTests { "PutObject_special_chars": PutObject_special_chars, "PutObject_invalid_long_tags": PutObject_invalid_long_tags, "PutObject_success": PutObject_success, + "PutObject_racey_success": PutObject_racey_success, "HeadObject_non_existing_object": HeadObject_non_existing_object, "HeadObject_invalid_part_number": HeadObject_invalid_part_number, "HeadObject_non_existing_mp": HeadObject_non_existing_mp, @@ -790,6 +797,7 @@ func GetIntTests() IntTests { "CompleteMultipartUpload_invalid_part_number": CompleteMultipartUpload_invalid_part_number, "CompleteMultipartUpload_invalid_ETag": CompleteMultipartUpload_invalid_ETag, "CompleteMultipartUpload_success": CompleteMultipartUpload_success, + "CompleteMultipartUpload_racey_success": CompleteMultipartUpload_racey_success, "PutBucketAcl_non_existing_bucket": PutBucketAcl_non_existing_bucket, "PutBucketAcl_disabled": PutBucketAcl_disabled, "PutBucketAcl_none_of_the_options_specified": PutBucketAcl_none_of_the_options_specified, @@ -939,14 +947,15 @@ func GetIntTests() IntTests { "Versioning_UploadPartCopy_from_an_object_version": Versioning_UploadPartCopy_from_an_object_version, "Versioning_Enable_object_lock": Versioning_Enable_object_lock, "Versioning_status_switch_to_suspended_with_object_lock": Versioning_status_switch_to_suspended_with_object_lock, - "Versionsin_PutObjectRetention_invalid_versionId": Versionsin_PutObjectRetention_invalid_versionId, + "Versioning_PutObjectRetention_invalid_versionId": Versioning_PutObjectRetention_invalid_versionId, "Versioning_GetObjectRetention_invalid_versionId": Versioning_GetObjectRetention_invalid_versionId, "Versioning_Put_GetObjectRetention_success": Versioning_Put_GetObjectRetention_success, - "Versionsin_PutObjectLegalHold_invalid_versionId": Versionsin_PutObjectLegalHold_invalid_versionId, + "Versioning_PutObjectLegalHold_invalid_versionId": Versioning_PutObjectLegalHold_invalid_versionId, "Versioning_GetObjectLegalHold_invalid_versionId": Versioning_GetObjectLegalHold_invalid_versionId, "Versioning_Put_GetObjectLegalHold_success": Versioning_Put_GetObjectLegalHold_success, "Versioning_WORM_obj_version_locked_with_legal_hold": Versioning_WORM_obj_version_locked_with_legal_hold, "Versioning_WORM_obj_version_locked_with_governance_retention": Versioning_WORM_obj_version_locked_with_governance_retention, "Versioning_WORM_obj_version_locked_with_compliance_retention": Versioning_WORM_obj_version_locked_with_compliance_retention, + "Versioning_concurrent_upload_object": Versioning_concurrent_upload_object, } } diff --git a/tests/integration/s3conf.go b/tests/integration/s3conf.go index 1a862ed2..5e2cb844 100644 --- a/tests/integration/s3conf.go +++ b/tests/integration/s3conf.go @@ -115,6 +115,7 @@ func (c *S3Conf) Config() aws.Config { config.WithRegion(c.awsRegion), config.WithCredentialsProvider(creds), config.WithHTTPClient(client), + config.WithRetryMaxAttempts(1), } if c.checksumDisable { diff --git a/tests/integration/tests.go b/tests/integration/tests.go index ece769f2..a961e094 100644 --- a/tests/integration/tests.go +++ b/tests/integration/tests.go @@ -19,6 +19,7 @@ import ( "context" "crypto/rand" "crypto/sha256" + "encoding/hex" "encoding/xml" "errors" "fmt" @@ -27,6 +28,7 @@ import ( "net/url" "regexp" "strings" + "sync" "time" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -34,6 +36,7 @@ import ( "github.com/versity/versitygw/backend" "github.com/versity/versitygw/s3err" "github.com/versity/versitygw/s3response" + "golang.org/x/sync/errgroup" ) var ( @@ -2865,6 +2868,52 @@ func PutObject_with_object_lock(s *S3Conf) error { return nil } +func PutObject_racey_success(s *S3Conf) error { + testName := "PutObject_racey_success" + runF(testName) + bucket, obj, lockStatus := getBucketName(), "my-obj", true + + client := s3.NewFromConfig(s.Config()) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err := client.CreateBucket(ctx, &s3.CreateBucketInput{ + Bucket: &bucket, + ObjectLockEnabledForBucket: &lockStatus, + }) + cancel() + if err != nil { + failF("%v: %v", testName, err) + return fmt.Errorf("%v: %w", testName, err) + } + + eg := errgroup.Group{} + for i := 0; i < 10; i++ { + eg.Go(func() error { + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }) + cancel() + return err + }) + } + err = eg.Wait() + + if err != nil { + failF("%v: %v", testName, err) + return fmt.Errorf("%v: %w", testName, err) + } + + err = teardown(s, bucket) + if err != nil { + failF("%v: %v", err) + return fmt.Errorf("%v: %w", testName, err) + } + + passF(testName) + return nil +} + func PutObject_success(s *S3Conf) error { testName := "PutObject_success" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -2943,7 +2992,7 @@ func HeadObject_mp_success(s *S3Conf) error { testName := "HeadObject_mp_success" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { obj := "my-obj" - partCount, partSize := 5, 1024 + partCount, partSize := int64(5), int64(1024) partNumber := int32(3) mp, err := createMp(s3client, bucket, obj) @@ -2951,7 +3000,7 @@ func HeadObject_mp_success(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, partCount*partSize, partCount, bucket, obj, *mp.UploadId) + parts, _, err := uploadParts(s3client, partCount*partSize, partCount, bucket, obj, *mp.UploadId) if err != nil { return err } @@ -5245,7 +5294,7 @@ func CreateMultipartUpload_with_metadata(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) + parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) if err != nil { return err } @@ -5319,7 +5368,7 @@ func CreateMultipartUpload_with_content_type(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) + parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) if err != nil { return err } @@ -5382,7 +5431,7 @@ func CreateMultipartUpload_with_object_lock(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) + parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) if err != nil { return err } @@ -5540,7 +5589,7 @@ func CreateMultipartUpload_with_tagging(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) + parts, _, err := uploadParts(s3client, 100, 1, bucket, obj, *out.UploadId) if err != nil { return err } @@ -6228,7 +6277,7 @@ func ListParts_truncated(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId) + parts, _, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId) if err != nil { return err } @@ -6292,7 +6341,7 @@ func ListParts_success(s *S3Conf) error { return err } - parts, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId) + parts, _, err := uploadParts(s3client, 5*1024*1024, 5, bucket, obj, *out.UploadId) if err != nil { return err } @@ -6781,8 +6830,8 @@ func CompleteMultipartUpload_success(s *S3Conf) error { return err } - objSize := 5 * 1024 * 1024 - parts, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) + objSize := int64(5 * 1024 * 1024) + parts, csum, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) if err != nil { return err } @@ -6830,10 +6879,162 @@ func CompleteMultipartUpload_success(s *S3Conf) error { return fmt.Errorf("expected the uploaded object size to be %v, instead got %v", objSize, resp.ContentLength) } + ctx, cancel = context.WithTimeout(context.Background(), shortTimeout) + defer cancel() + rget, err := s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + }) + if err != nil { + return err + } + + if *rget.ContentLength != int64(objSize) { + return fmt.Errorf("expected the object content-length to be %v, instead got %v", objSize, *rget.ContentLength) + } + + bdy, err := io.ReadAll(rget.Body) + if err != nil { + return err + } + defer rget.Body.Close() + + sum := sha256.Sum256(bdy) + getsum := hex.EncodeToString(sum[:]) + + if csum != getsum { + return fmt.Errorf("expected the object checksum to be %v, instead got %v", csum, getsum) + } + return nil }) } +type mpinfo struct { + uploadId *string + parts []types.CompletedPart +} + +func CompleteMultipartUpload_racey_success(s *S3Conf) error { + testName := "CompleteMultipartUpload_racey_success" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + + var mu sync.RWMutex + uploads := make([]mpinfo, 10) + sums := make([]string, 10) + objSize := int64(5 * 1024 * 1024) + + eg := errgroup.Group{} + for i := 0; i < 10; i++ { + func(i int) { + eg.Go(func() error { + out, err := createMp(s3client, bucket, obj) + if err != nil { + return err + } + + parts, csum, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) + mu.Lock() + sums[i] = csum + mu.Unlock() + if err != nil { + return err + } + + compParts := []types.CompletedPart{} + for _, el := range parts { + compParts = append(compParts, types.CompletedPart{ + ETag: el.ETag, + PartNumber: el.PartNumber, + }) + } + + mu.Lock() + uploads[i] = mpinfo{ + uploadId: out.UploadId, + parts: compParts, + } + mu.Unlock() + return nil + }) + }(i) + } + + err := eg.Wait() + if err != nil { + return err + } + + eg = errgroup.Group{} + for i := 0; i < 10; i++ { + func(i int) { + eg.Go(func() error { + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + mu.RLock() + res, err := s3client.CompleteMultipartUpload(ctx, &s3.CompleteMultipartUploadInput{ + Bucket: &bucket, + Key: &obj, + UploadId: uploads[i].uploadId, + MultipartUpload: &types.CompletedMultipartUpload{ + Parts: uploads[i].parts, + }, + }) + mu.RUnlock() + cancel() + if err != nil { + fmt.Println("GOT ERROR: ", err) + return err + } + + if *res.Key != obj { + return fmt.Errorf("expected object key to be %v, instead got %v", obj, *res.Key) + } + + return nil + }) + }(i) + } + + err = eg.Wait() + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + defer cancel() + out, err := s3client.GetObject(ctx, &s3.GetObjectInput{ + Bucket: &bucket, + Key: &obj, + }) + if err != nil { + return err + } + + if *out.ContentLength != int64(objSize) { + return fmt.Errorf("expected the object content-length to be %v, instead got %v", objSize, *out.ContentLength) + } + + bdy, err := io.ReadAll(out.Body) + if err != nil { + return err + } + defer out.Body.Close() + + sum := sha256.Sum256(bdy) + csum := hex.EncodeToString(sum[:]) + + mu.RLock() + defer mu.RUnlock() + for _, s := range sums { + if csum == s { + return nil + } + } + return fmt.Errorf("expected the object checksum to be one of %v, instead got %v", sums, csum) + }) +} + func PutBucketAcl_non_existing_bucket(s *S3Conf) error { testName := "PutBucketAcl_non_existing_bucket" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -11594,8 +11795,8 @@ func Versioning_Multipart_Upload_success(s *S3Conf) error { return err } - objSize := 5 * 1024 * 1024 - parts, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) + objSize := int64(5 * 1024 * 1024) + parts, _, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) if err != nil { return err } @@ -11674,8 +11875,8 @@ func Versioning_Multipart_Upload_overwrite_an_object(s *S3Conf) error { return err } - objSize := 5 * 1024 * 1024 - parts, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) + objSize := int64(5 * 1024 * 1024) + parts, _, err := uploadParts(s3client, objSize, 5, bucket, obj, *out.UploadId) if err != nil { return err } @@ -11900,8 +12101,8 @@ func Versioning_status_switch_to_suspended_with_object_lock(s *S3Conf) error { }, withLock()) } -func Versionsin_PutObjectRetention_invalid_versionId(s *S3Conf) error { - testName := "Versionsin_PutObjectRetention_invalid_versionId" +func Versioning_PutObjectRetention_invalid_versionId(s *S3Conf) error { + testName := "Versioning_PutObjectRetention_invalid_versionId" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { obj := "my-obj" _, err := createObjVersions(s3client, bucket, obj, 3) @@ -12002,8 +12203,8 @@ func Versioning_Put_GetObjectRetention_success(s *S3Conf) error { }, withLock(), withVersioning()) } -func Versionsin_PutObjectLegalHold_invalid_versionId(s *S3Conf) error { - testName := "Versionsin_PutObjectLegalHold_invalid_versionId" +func Versioning_PutObjectLegalHold_invalid_versionId(s *S3Conf) error { + testName := "Versioning_PutObjectLegalHold_invalid_versionId" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { obj := "my-obj" _, err := createObjVersions(s3client, bucket, obj, 3) @@ -12251,6 +12452,7 @@ func VersioningDisabled_GetBucketVersioning_not_configured(s *S3Conf) error { return nil }) } + func VersioningDisabled_PutBucketVersioning_not_configured(s *S3Conf) error { testName := "VersioningDisabled_PutBucketVersioning_not_configured" return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { @@ -12266,3 +12468,65 @@ func VersioningDisabled_PutBucketVersioning_not_configured(s *S3Conf) error { return nil }) } + +func Versioning_concurrent_upload_object(s *S3Conf) error { + testName := "Versioninig_concurrent_upload_object" + return actionHandler(s, testName, func(s3client *s3.Client, bucket string) error { + obj := "my-obj" + versionCount := 5 + // Channel to collect errors + errCh := make(chan error, versionCount) + + uploadVersion := func(wg *sync.WaitGroup) { + defer wg.Done() + + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &obj, + }) + cancel() + if err != nil { + // Send error to the channel + errCh <- err + return + } + + fmt.Printf("uploaded object successfully: versionId: %v\n", *res.VersionId) + } + + wg := &sync.WaitGroup{} + wg.Add(versionCount) + + for i := 0; i < versionCount; i++ { + go uploadVersion(wg) + } + + wg.Wait() + close(errCh) + + // Check if there were any errors + for err := range errCh { + if err != nil { + fmt.Printf("error uploading an object: %v\n", err.Error()) + return err + } + } + + // List object versions after all uploads + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) + res, err := s3client.ListObjectVersions(ctx, &s3.ListObjectVersionsInput{ + Bucket: &bucket, + }) + cancel() + if err != nil { + return err + } + + if len(res.Versions) != versionCount { + return fmt.Errorf("expected %v object versions, instead got %v", versionCount, len(res.Versions)) + } + + return nil + }, withVersioning()) +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index e5e6a186..8b00a34b 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -28,7 +28,6 @@ import ( rnd "math/rand" "net/http" "net/url" - "os" "os/exec" "sort" "strings" @@ -707,39 +706,22 @@ func compareDelObjects(list1, list2 []types.DeletedObject) bool { return true } -func uploadParts(client *s3.Client, size, partCount int, bucket, key, uploadId string) (parts []types.Part, err error) { - dr := NewDataReader(size, size) - datafile := "rand.data" - w, err := os.Create(datafile) - if err != nil { - return parts, err - } - defer w.Close() - - _, err = io.Copy(w, dr) - if err != nil { - return parts, err - } - - fileInfo, err := w.Stat() - if err != nil { - return parts, err - } +func uploadParts(client *s3.Client, size, partCount int64, bucket, key, uploadId string) (parts []types.Part, csum string, err error) { + partSize := size / partCount - partSize := fileInfo.Size() / int64(partCount) - var offset int64 + hash := sha256.New() - for partNumber := int64(1); partNumber <= int64(partCount); partNumber++ { + for partNumber := int64(1); partNumber <= partCount; partNumber++ { partStart := (partNumber - 1) * partSize partEnd := partStart + partSize - 1 - if partEnd > fileInfo.Size()-1 { - partEnd = fileInfo.Size() - 1 + if partEnd > size-1 { + partEnd = size - 1 } + partBuffer := make([]byte, partEnd-partStart+1) - _, err := w.ReadAt(partBuffer, partStart) - if err != nil { - return parts, err - } + rand.Read(partBuffer) + hash.Write(partBuffer) + ctx, cancel := context.WithTimeout(context.Background(), shortTimeout) pn := int32(partNumber) out, err := client.UploadPart(ctx, &s3.UploadPartInput{ @@ -751,17 +733,20 @@ func uploadParts(client *s3.Client, size, partCount int, bucket, key, uploadId s }) cancel() if err != nil { - return parts, err + return parts, "", err } + parts = append(parts, types.Part{ ETag: out.ETag, PartNumber: &pn, Size: &partSize, }) - offset += partSize } - return parts, err + sum := hash.Sum(nil) + csum = hex.EncodeToString(sum[:]) + + return parts, csum, err } type user struct {