From df7cb3d0d7e49454352398756b11fcbb899b6a83 Mon Sep 17 00:00:00 2001 From: Steve Date: Sat, 9 Sep 2023 03:13:19 +0800 Subject: [PATCH] fix: multipart etag calculation --- s3/handlers/handlers_bucket.go | 2 + s3/handlers/handlers_multipart.go | 1 - s3/handlers/handlers_object.go | 2 +- s3/protocol/request.go | 1 - s3/requests/parsers.go | 2 +- s3/routers/routers.go | 1 - s3/services/object/proto.go | 15 ++- s3/services/object/service.go | 90 ++++++++++++++++- s3/services/object/service_bucket.go | 53 ++-------- s3/services/object/service_multipart.go | 48 +++------ s3/services/object/service_object.go | 123 ++---------------------- 11 files changed, 130 insertions(+), 208 deletions(-) diff --git a/s3/handlers/handlers_bucket.go b/s3/handlers/handlers_bucket.go index 8f26b417e..cdb3e2bad 100644 --- a/s3/handlers/handlers_bucket.go +++ b/s3/handlers/handlers_bucket.go @@ -16,6 +16,8 @@ func (h *Handlers) respErr(err error) (rerr *responses.Error) { switch err { case object.ErrBucketNotFound: rerr = responses.ErrNoSuchBucket + case object.ErrBucketeNotEmpty: + rerr = responses.ErrBucketNotEmpty case object.ErrObjectNotFound: rerr = responses.ErrNoSuchKey case object.ErrUploadNotFound: diff --git a/s3/handlers/handlers_multipart.go b/s3/handlers/handlers_multipart.go index a3c67bd93..efafc1299 100644 --- a/s3/handlers/handlers_multipart.go +++ b/s3/handlers/handlers_multipart.go @@ -247,4 +247,3 @@ func (h *Handlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http responses.WriteSuccessResponse(w, output, "CompleteMultipartUploadResult") } - diff --git a/s3/handlers/handlers_object.go b/s3/handlers/handlers_object.go index 3f28c65f6..f3ad3d3d5 100644 --- a/s3/handlers/handlers_object.go +++ b/s3/handlers/handlers_object.go @@ -243,7 +243,7 @@ func (h *Handlers) DeleteObjectsHandler(w http.ResponseWriter, r *http.Request) if input.Delete == nil || len(input.Delete.Objects) == 0 || - len(input.Delete.Objects) > consts.MaxObjectList { + len(input.Delete.Objects) > consts.MaxDeleteList { rerr := responses.ErrMalformedXML err = rerr responses.WriteErrorResponse(w, r, rerr) diff --git a/s3/protocol/request.go b/s3/protocol/request.go index fa0b2726b..031ef9a0b 100644 --- a/s3/protocol/request.go +++ b/s3/protocol/request.go @@ -159,7 +159,6 @@ func parseLocation(r *http.Request, inv reflect.Value) (err error) { fv = fv.Convert(byteSliceType) } - switch ft.Tag.Get("location") { case "headers": prefix := ft.Tag.Get("locationName") diff --git a/s3/requests/parsers.go b/s3/requests/parsers.go index 47b0f415c..d53e4af6a 100644 --- a/s3/requests/parsers.go +++ b/s3/requests/parsers.go @@ -95,4 +95,4 @@ func ParsePutBucketAclRequest(r *http.Request) (req *PutBucketACLRequest, rerr * } req.ACL, rerr = ParseBucketACL(r) return -} \ No newline at end of file +} diff --git a/s3/routers/routers.go b/s3/routers/routers.go index 94f4687ca..98ed85f41 100644 --- a/s3/routers/routers.go +++ b/s3/routers/routers.go @@ -25,7 +25,6 @@ func (routers *Routers) Register() http.Handler { hs := routers.handlers - root.Use( hs.Cors, hs.Log, diff --git a/s3/services/object/proto.go b/s3/services/object/proto.go index 949b50369..0d8ff5eaf 100644 --- a/s3/services/object/proto.go +++ b/s3/services/object/proto.go @@ -9,14 +9,12 @@ import ( ) var ( - ErrBucketNotFound = errors.New("bucket not found") - ErrObjectNotFound = errors.New("object not found") - ErrUploadNotFound = errors.New("upload not found") - ErrNotAllowed = errors.New("not allowed") - ErrBucketAlreadyExists = errors.New("bucket already exists") - ErrOperationTimeout = errors.New("operation timeout") - ErrContentSHA256Mismatch = errors.New("sha256 mismatch") - ErrBadDigest = errors.New("bad digest") + ErrBucketNotFound = errors.New("bucket not found") + ErrBucketeNotEmpty = errors.New("bucket not empty") + ErrObjectNotFound = errors.New("object not found") + ErrUploadNotFound = errors.New("upload not found") + ErrNotAllowed = errors.New("not allowed") + ErrBucketAlreadyExists = errors.New("bucket already exists") ) type Service interface { @@ -26,7 +24,6 @@ type Service interface { GetAllBuckets(ctx context.Context, user string) (list []*Bucket, err error) PutBucketACL(ctx context.Context, user, bucname, acl string) (err error) GetBucketACL(ctx context.Context, user, bucname string) (acl string, err error) - EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error) PutObject(ctx context.Context, user, bucname, objname string, body *hash.Reader, size int64, meta map[string]string) (object *Object, err error) CopyObject(ctx context.Context, user, srcBucname, srcObjname, dstBucname, dstObjname string, meta map[string]string) (dstObject *Object, err error) diff --git a/s3/services/object/service.go b/s3/services/object/service.go index 4c503a67c..be7b38c36 100644 --- a/s3/services/object/service.go +++ b/s3/services/object/service.go @@ -6,6 +6,7 @@ import ( "github.com/bittorrent/go-btfs/s3/action" "github.com/bittorrent/go-btfs/s3/ctxmu" "github.com/bittorrent/go-btfs/s3/policy" + "io" "strings" "time" @@ -14,7 +15,6 @@ import ( var _ Service = (*service)(nil) -// service captures all bucket metadata for a given cluster. type service struct { providers providers.Providerser lock ctxmu.MultiCtxRWLocker @@ -102,3 +102,91 @@ func (s *service) checkACL(owner, acl, user string, act action.Action) (allow bo allow = policy.IsAllowed(own, acl, act) return } + +func (s *service) addBodyRef(ctx context.Context, cid, tokey string) (err error) { + // Cid reference key + crfkey := s.getCidrefKey(cid, tokey) + + // Add cid reference + err = s.providers.StateStore().Put(crfkey, nil) + + return +} + +func (s *service) removeBodyRef(ctx context.Context, cid, tokey string) (err error) { + // This object cid reference key + crfkey := s.getCidrefKey(cid, tokey) + + // Delete cid ref of this object + err = s.providers.StateStore().Delete(crfkey) + + return +} + +func (s *service) storeBody(ctx context.Context, body io.Reader, tokey string) (cid string, err error) { + // RLock all cid refs to enable no cid will be deleted + err = s.lock.RLock(ctx, s.cidrefSpace) + if err != nil { + return + } + defer s.lock.RUnlock(s.cidrefSpace) + + // Store body and get the cid + cid, err = s.providers.FileStore().Store(body) + if err != nil { + return + } + + // Add cid reference + err = s.addBodyRef(ctx, cid, tokey) + + return +} + +func (s *service) removeBody(ctx context.Context, cid, tokey string) (err error) { + // Flag to mark cid be referenced by other object + otherRef := false + + // Log removing + defer func() { + fmt.Printf("s3-api: remove <%s>, ref <%s>, other-ref - %v, err: %v\n", cid, tokey, otherRef, err) + }() + + // Lock all cid refs to enable new cid reference can not be added when + // remove is executing + err = s.lock.Lock(ctx, s.cidrefSpace) + if err != nil { + return + } + defer s.lock.Unlock(s.cidrefSpace) + + // Remove cid ref of this object + err = s.removeBodyRef(ctx, cid, tokey) + if err != nil { + return + } + + // All this cid references prefix + allRefsPrefix := s.getAllCidrefsKeyPrefix(cid) + + // Iterate all this cid refs, if exists other object's ref, set + // the otherRef mark to true + err = s.providers.StateStore().Iterate(allRefsPrefix, func(key, _ []byte) (stop bool, err error) { + otherRef = true + stop = true + return + }) + if err != nil { + return + } + + // Exists other refs, cid body can not be removed + if otherRef { + return + } + + // No other refs to this cid, remove it + err = s.providers.FileStore().Remove(cid) + + return +} diff --git a/s3/services/object/service_bucket.go b/s3/services/object/service_bucket.go index 078e4b6f8..ff347d258 100644 --- a/s3/services/object/service_bucket.go +++ b/s3/services/object/service_bucket.go @@ -126,23 +126,18 @@ func (s *service) DeleteBucket(ctx context.Context, user, bucname string) (err e return } - // Delete bucket - err = s.providers.StateStore().Delete(buckey) + // Check if bucket is empty + empty, err := s.isBucketEmpty(bucname) if err != nil { return } + if !empty { + err = ErrBucketeNotEmpty + return + } - // All bucket objects prefix - objectsPrefix := s.getAllObjectsKeyPrefix(bucname) - - // Try to delete all bucket objects - _ = s.deleteObjectsByPrefix(ctx, objectsPrefix) - - // All bucket uploads prefix - uploadsPrefix := s.getAllUploadsKeyPrefix(bucname) - - // Try to delete all bucket uploads - _ = s.deleteUploadsByPrefix(ctx, uploadsPrefix) + // Delete bucket + err = s.providers.StateStore().Delete(buckey) return } @@ -278,37 +273,7 @@ func (s *service) GetBucketACL(ctx context.Context, user, bucname string) (acl s } // EmptyBucket check if the user specified bucked is empty -func (s *service) EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error) { - ctx, cancel := s.opctx(ctx) - defer cancel() - - // Bucket key - buckey := s.getBucketKey(bucname) - - // RLock bucket - err = s.lock.RLock(ctx, buckey) - if err != nil { - return - } - defer s.lock.RUnlock(buckey) - - // Get bucket - bucket, err := s.getBucket(buckey) - if err != nil { - return - } - if bucket == nil { - err = ErrBucketNotFound - return - } - - // Check action ACL - allow := s.checkACL(bucket.Owner, bucket.ACL, user, action.ListObjectsAction) - if !allow { - err = ErrNotAllowed - return - } - +func (s *service) isBucketEmpty(bucname string) (empty bool, err error) { // All bucket objects prefix objectsPrefix := s.getAllObjectsKeyPrefix(bucname) diff --git a/s3/services/object/service_multipart.go b/s3/services/object/service_multipart.go index 1ee203baf..3e0533558 100644 --- a/s3/services/object/service_multipart.go +++ b/s3/services/object/service_multipart.go @@ -2,9 +2,7 @@ package object import ( "context" - "encoding/hex" "errors" - "fmt" "github.com/bittorrent/go-btfs/s3/action" "github.com/bittorrent/go-btfs/s3/consts" "github.com/bittorrent/go-btfs/s3/etag" @@ -404,6 +402,13 @@ func (s *service) CompleteMultiPartUpload(ctx context.Context, user, bucname, ob } }() + // Calculate multipart etag + multiEtag, err := s.calcMultiETag(parts) + if err != nil { + return + } + + // Current time now := time.Now().UTC() // Object @@ -413,7 +418,7 @@ func (s *service) CompleteMultiPartUpload(ctx context.Context, user, bucname, ob ModTime: now, Size: size, IsDir: false, - ETag: s.computeMultipartMD5(parts), + ETag: multiEtag.String(), CID: cid, ACL: "", VersionID: "", @@ -500,39 +505,16 @@ func (s *service) canonicalizeETag(etag string) string { return etagRegex.ReplaceAllString(etag, "$1") } -func (s *service) computeMultipartMD5(parts []*CompletePart) (md5 string) { - var finalMD5Bytes []byte +func (s *service) calcMultiETag(parts []*CompletePart) (multiEtag etag.ETag, err error) { + var completeETags []etag.ETag for _, part := range parts { - md5Bytes, err := hex.DecodeString(s.canonicalizeETag(part.ETag)) + var etg etag.ETag + etg, err = etag.Parse(part.ETag) if err != nil { - finalMD5Bytes = append(finalMD5Bytes, []byte(part.ETag)...) - } else { - finalMD5Bytes = append(finalMD5Bytes, md5Bytes...) - } - } - md5 = fmt.Sprintf("%s-%d", etag.Multipart(finalMD5Bytes), len(parts)) - return -} - -// deleteUploadsByPrefix try to delete all multipart uploads with the specified common prefix -func (s *service) deleteUploadsByPrefix(ctx context.Context, uploadsPrefix string) (err error) { - err = s.providers.StateStore().Iterate(uploadsPrefix, func(key, _ []byte) (stop bool, er error) { - uplkey := string(key) - var multipart *Multipart - er = s.providers.StateStore().Get(uplkey, multipart) - if er != nil { - return - } - er = s.providers.StateStore().Delete(uplkey) - if er != nil { return } - for i, part := range multipart.Parts { - prtkey := s.getUploadPartKey(uplkey, i) - _ = s.removeBody(ctx, part.CID, prtkey) - } - return - }) - + completeETags = append(completeETags, etg) + } + multiEtag = etag.Multipart(completeETags...) return } diff --git a/s3/services/object/service_object.go b/s3/services/object/service_object.go index a7ff85ef1..2218a7ca4 100644 --- a/s3/services/object/service_object.go +++ b/s3/services/object/service_object.go @@ -3,7 +3,6 @@ package object import ( "context" "errors" - "fmt" "github.com/bittorrent/go-btfs/s3/action" "github.com/bittorrent/go-btfs/s3/consts" "github.com/bittorrent/go-btfs/s3/providers" @@ -479,8 +478,15 @@ func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimi return } + // Object list list = &ObjectsList{} + // MaxKeys is zero + if max == 0 { + list.IsTruncated = true + return + } + // All bucket objects key prefix allObjectsKeyPrefix := s.getAllObjectsKeyPrefix(bucname) @@ -608,118 +614,3 @@ func (s *service) getObject(objkey string) (object *Object, err error) { } return } - -// deleteObjectsByPrefix try to delete all objects with the specified common prefix -func (s *service) deleteObjectsByPrefix(ctx context.Context, objectsPrefix string) (err error) { - err = s.providers.StateStore().Iterate(objectsPrefix, func(key, _ []byte) (stop bool, er error) { - objkey := string(key) - var object *Object - er = s.providers.StateStore().Get(objkey, object) - if er != nil { - return - } - er = s.providers.StateStore().Delete(objkey) - if er != nil { - return - } - _ = s.removeBody(ctx, object.CID, objkey) - return - }) - - return -} - -func (s *service) addBodyRef(ctx context.Context, cid, toKey string) (err error) { - // Cid reference key - crfKey := s.getCidrefKey(cid, toKey) - - // Add cid reference - err = s.providers.StateStore().Put(crfKey, nil) - - return -} - -func (s *service) removeBodyRef(ctx context.Context, cid, toKey string) (err error) { - // This object cid reference key - crfKey := s.getCidrefKey(cid, toKey) - - // Delete cid ref of this object - err = s.providers.StateStore().Delete(crfKey) - - return -} - -func (s *service) storeBody(ctx context.Context, body io.Reader, toKey string) (cid string, err error) { - // RLock all cid refs to enable no cid will be deleted - err = s.lock.RLock(ctx, s.cidrefSpace) - if err != nil { - return - } - defer s.lock.RUnlock(s.cidrefSpace) - - // Store body and get the cid - cid, err = s.providers.FileStore().Store(body) - if err != nil { - return - } - - // Cid reference key - crfKey := s.getCidrefKey(cid, toKey) - - // Add cid reference - err = s.providers.StateStore().Put(crfKey, nil) - - return -} - -func (s *service) removeBody(ctx context.Context, cid, tokey string) (err error) { - // Flag to mark cid be referenced by other object - otherRef := false - - // Log removing - defer func() { - fmt.Printf("remove <%s>, ref <%s>, refered - %v, err: %v\n", cid, tokey, otherRef, err) - }() - - // Lock all cid refs to enable new cid reference can not be added when - // remove is executing - err = s.lock.Lock(ctx, s.cidrefSpace) - if err != nil { - return - } - defer s.lock.Unlock(s.cidrefSpace) - - // This object cid reference key - crfKey := s.getCidrefKey(cid, tokey) - - // Delete cid ref of this object - err = s.providers.StateStore().Delete(crfKey) - if err != nil { - return - } - - // All this cid references prefix - allRefsPrefix := s.getAllCidrefsKeyPrefix(cid) - - - // Iterate all this cid refs, if exists other object's ref, set - // the otherRef mark to true - err = s.providers.StateStore().Iterate(allRefsPrefix, func(key, _ []byte) (stop bool, err error) { - otherRef = true - stop = true - return - }) - if err != nil { - return - } - - // Exists other refs, cid body can not be removed - if otherRef { - return - } - - // No other refs to this cid, remove it - err = s.providers.FileStore().Remove(cid) - - return -}