diff --git a/s3/services/object/clean_read_closer.go b/s3/services/object/clean_read_closer.go index ffb5a9fd8..fd767da8d 100644 --- a/s3/services/object/clean_read_closer.go +++ b/s3/services/object/clean_read_closer.go @@ -15,7 +15,7 @@ func WrapCleanReadCloser(rc io.ReadCloser, timeout time.Duration, afterCloseHook go func() { <-ctx.Done() _ = rc.Close() - // call after hooks stack + // call after hooks by stack order for len(afterCloseHooks) > 0 { idx := len(afterCloseHooks) - 1 f := afterCloseHooks[idx] diff --git a/s3/services/object/options.go b/s3/services/object/options.go index 9109d3188..05c4dd9a3 100644 --- a/s3/services/object/options.go +++ b/s3/services/object/options.go @@ -1,3 +1,61 @@ package object +import ( + "github.com/bittorrent/go-btfs/s3/ctxmu" + "time" +) + +const ( + defaultKeySeparator = "/" + defaultBucketSpace = "bkt" + defaultObjectSpace = "obj" + defaultUploadSpace = "upl" + defaultOperationTimeout = 5 * time.Minute + defaultCloseBodyTimeout = 10 * time.Minute +) + +var defaultLock = ctxmu.NewDefaultMultiCtxRWMutex() + type Option func(svc *service) + +func WithKeySeparator(separator string) Option { + return func(svc *service) { + svc.keySeparator = separator + } +} + +func WithBucketSpace(space string) Option { + return func(svc *service) { + svc.bucketSpace = space + } +} + +func WithObjectSpace(space string) Option { + return func(svc *service) { + svc.objectSpace = space + } +} + +func WithUploadSpace(space string) Option { + return func(svc *service) { + svc.uploadSpace = space + } +} + +func WithOperationTimeout(timeout time.Duration) Option { + return func(svc *service) { + svc.operationTimeout = timeout + } +} + +func WithCloseBodyTimeout(timeout time.Duration) Option { + return func(svc *service) { + svc.closeBodyTimeout = timeout + } +} + +func WithLock(lock ctxmu.MultiCtxRWLocker) Option { + return func(svc *service) { + svc.lock = lock + } +} diff --git a/s3/services/object/proto.go b/s3/services/object/proto.go index f62a6035a..eb4a03263 100644 --- a/s3/services/object/proto.go +++ b/s3/services/object/proto.go @@ -17,32 +17,25 @@ var ( ) type Service interface { - // bucket - CreateBucket(ctx context.Context, bucket, region, accessKey, acl string) error - GetBucketMeta(ctx context.Context, bucket string) (meta Bucket, err error) - HasBucket(ctx context.Context, bucket string) bool - SetEmptyBucket(emptyBucket func(ctx context.Context, bucket string) (bool, error)) - DeleteBucket(ctx context.Context, bucket string) error - GetAllBucketsOfUser(username string) (list []*Bucket, err error) - UpdateBucketAcl(ctx context.Context, bucket, acl string) error - GetBucketAcl(ctx context.Context, bucket string) (string, error) - EmptyBucket(emptyBucket func(ctx context.Context, bucket string) (bool, error)) - - // object - PutObject(ctx context.Context, bucname, objname string, reader *hash.Reader, size int64, meta map[string]string) (obj Object, err error) - CopyObject(ctx context.Context, bucket, object string, info Object, size int64, meta map[string]string) (Object, error) - GetObject(ctx context.Context, bucket, object string) (Object, io.ReadCloser, error) - GetObjectInfo(ctx context.Context, bucket, object string) (Object, error) - DeleteObject(ctx context.Context, bucket, object string) error - ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi Object, err error) - ListObjectsV2(ctx context.Context, bucket string, prefix string, continuationToken string, delimiter string, maxKeys int, 
owner bool, startAfter string) (ListObjectsV2Info, error) - - // martipart - CreateMultipartUpload(ctx context.Context, bucname string, objname string, meta map[string]string) (mtp Multipart, err error) - AbortMultipartUpload(ctx context.Context, bucname string, objname string, uploadID string) (err error) - UploadPart(ctx context.Context, bucname string, objname string, uploadID string, partID int, reader *hash.Reader, size int64, meta map[string]string) (part ObjectPart, err error) - CompleteMultiPartUpload(ctx context.Context, bucname string, objname string, uploadID string, parts []CompletePart) (obj Object, err error) - GetMultipart(ctx context.Context, bucname string, objname string, uploadID string) (mtp Multipart, err error) + CreateBucket(ctx context.Context, user, bucname, region, acl string) (bucket *Bucket, err error) + GetBucket(ctx context.Context, user, bucname string) (bucket *Bucket, err error) + DeleteBucket(ctx context.Context, user, bucname string) (err error) + GetAllBuckets(ctx context.Context, user string) (list []*Bucket, err error) + PutBucketAcl(ctx context.Context, user, bucname, acl string) (err error) + GetBucketAcl(ctx context.Context, user, bucname string) (acl string, err error) + EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error) + + PutObject(ctx context.Context, user string, bucname, objname string, body *hash.Reader, size int64, meta map[string]string) (object *Object, err error) + CopyObject(ctx context.Context, user string, srcBucname, srcObjname, dstBucname, dstObjname string, meta map[string]string) (dstObject *Object, err error) + GetObject(ctx context.Context, user, bucname, objname string) (object *Object, body io.ReadCloser, err error) + DeleteObject(ctx context.Context, user, bucname, objname string) (err error) + // todo: DeleteObjects + ListObjects(ctx context.Context, user, bucname, prefix, delimiter, marker string, max int) (list *ObjectsList, err error) + + CreateMultipartUpload(ctx context.Context, user, bucname, objname string, meta map[string]string) (multipart *Multipart, err error) + UploadPart(ctx context.Context, user, bucname, objname, uplid string, partId int, reader *hash.Reader, size int64, meta map[string]string) (part *ObjectPart, err error) + AbortMultipartUpload(ctx context.Context, user, bucname, objname, uplid string) (err error) + CompleteMultiPartUpload(ctx context.Context, user string, bucname, objname, uplid string, parts []*CompletePart) (object *Object, err error) } // Bucket contains bucket metadata. 
@@ -79,7 +72,7 @@ type Multipart struct { UploadID string Initiated time.Time MetaData map[string]string - Parts []ObjectPart + Parts []*ObjectPart } type ObjectPart struct { diff --git a/s3/services/object/service.go b/s3/services/object/service.go index 8ae0d2ae3..0b45d90b4 100644 --- a/s3/services/object/service.go +++ b/s3/services/object/service.go @@ -5,66 +5,36 @@ import ( "github.com/bittorrent/go-btfs/s3/action" "github.com/bittorrent/go-btfs/s3/ctxmu" "github.com/bittorrent/go-btfs/s3/policy" - "regexp" "strings" "time" "github.com/bittorrent/go-btfs/s3/providers" ) -const ( - defaultKeySeparator = "/" - defaultBucketSpace = "bkt" - defaultObjectSpace = "obj" - defaultUploadSpace = "upl" - defaultOperationTimeout = 5 * time.Minute - defaultReadObjectTimeout = 1 * time.Hour - - bucketPrefix = "bkt/" - objectKeyFormat = "obj/%s/%s" - objectPrefix = "obj/%s/" - allObjectPrefixFormat = "obj/%s/%s" - allObjectSeekKeyFormat = "obj/%s/%s" - - uploadKeyFormat = "uploadObj/%s/%s/%s" - allUploadPrefixFormat = "uploadObj/%s/%s" - allUploadSeekKeyFormat = "uploadObj/%s/%s/%s" - - deleteKeyFormat = "delObj/%s" - allDeletePrefixFormat = "delObj/" - - globalOperationTimeout = 5 * time.Minute - deleteOperationTimeout = 1 * time.Minute - - maxCpuPercent = 60 - maxUsedMemoryPercent = 80 -) - -var etagRegex = regexp.MustCompile("\"*?([^\"]*?)\"*?$") - var _ Service = (*service)(nil) // service captures all bucket metadata for a given cluster. type service struct { - providers providers.Providerser - lock ctxmu.MultiCtxRWLocker - keySeparator string - bucketSpace string - objectSpace string - uploadSpace string - operationTimeout time.Duration - readObjectTimeout time.Duration + providers providers.Providerser + lock ctxmu.MultiCtxRWLocker + keySeparator string + bucketSpace string + objectSpace string + uploadSpace string + operationTimeout time.Duration + closeBodyTimeout time.Duration } func NewService(providers providers.Providerser, options ...Option) Service { s := &service{ providers: providers, - lock: ctxmu.NewDefaultMultiCtxRWMutex(), + lock: defaultLock, keySeparator: defaultKeySeparator, bucketSpace: defaultBucketSpace, objectSpace: defaultObjectSpace, uploadSpace: defaultUploadSpace, operationTimeout: defaultOperationTimeout, + closeBodyTimeout: defaultCloseBodyTimeout, } for _, option := range options { option(s) @@ -74,43 +44,43 @@ func NewService(providers providers.Providerser, options ...Option) Service { // common helper methods -func (s *service) getBucketKeyPrefix() (prefix string) { +func (s *service) getAllBucketsKeyPrefix() (prefix string) { prefix = strings.Join([]string{s.bucketSpace, ""}, s.keySeparator) return } -func (s *service) getObjectKeyPrefix(bucname string) (prefix string) { - prefix = strings.Join([]string{s.objectSpace, bucname, ""}, s.keySeparator) +func (s *service) getBucketKey(bucname string) (key string) { + key = s.getAllBucketsKeyPrefix() + bucname return } -func (s *service) getUploadKeyPrefix(bucname, objname string) (prefix string) { - prefix = strings.Join([]string{s.uploadSpace, bucname, objname, ""}, s.keySeparator) +func (s *service) getAllObjectsKeyPrefix(bucname string) (prefix string) { + prefix = strings.Join([]string{s.objectSpace, bucname, ""}, s.keySeparator) return } -func (s *service) getBucketKey(bucname string) (key string) { - key = s.getBucketKeyPrefix() + bucname +func (s *service) getObjectKey(bucname, objname string) (key string) { + key = s.getAllObjectsKeyPrefix(bucname) + objname return } -func (s *service) getObjectKey(bucname, 
objname string) (key string) { - key = s.getObjectKeyPrefix(bucname) + objname +func (s *service) getAllUploadsKeyPrefix(bucname string) (prefix string) { + prefix = strings.Join([]string{s.uploadSpace, bucname, ""}, s.keySeparator) return } func (s *service) getUploadKey(bucname, objname, uploadid string) (key string) { - key = s.getUploadKeyPrefix(bucname, objname) + uploadid + key = strings.Join([]string{s.getAllUploadsKeyPrefix(bucname), objname, uploadid}, s.keySeparator) return } -func (s *service) checkAcl(owner, acl, user string, act action.Action) (allow bool) { - own := user != "" && user == owner - allow = policy.IsAllowed(own, acl, act) +func (s *service) opctx(parent context.Context) (ctx context.Context, cancel context.CancelFunc) { + ctx, cancel = context.WithTimeout(parent, s.operationTimeout) return } -func (s *service) opctx(parent context.Context) (ctx context.Context, cancel context.CancelFunc) { - ctx, cancel = context.WithTimeout(parent, s.operationTimeout) +func (s *service) checkAcl(owner, acl, user string, act action.Action) (allow bool) { + own := user != "" && user == owner + allow = policy.IsAllowed(own, acl, act) return } diff --git a/s3/services/object/service_bucket.go b/s3/services/object/service_bucket.go index caed601c5..13625aa4b 100644 --- a/s3/services/object/service_bucket.go +++ b/s3/services/object/service_bucket.go @@ -11,73 +11,80 @@ import ( ) // CreateBucket create a new bucket for the specified user -func (s *service) CreateBucket(ctx context.Context, user, bucname, region, acl string) (err error) { - buckey := s.getBucketKey(bucname) - +func (s *service) CreateBucket(ctx context.Context, user, bucname, region, acl string) (bucket *Bucket, err error) { + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() + // Bucket key + buckey := s.getBucketKey(bucname) + + // Lock bucket err = s.lock.Lock(ctx, buckey) if err != nil { return } - defer s.lock.Unlock(buckey) - allow := s.checkAcl(user, acl, user, action.CreateBucketAction) - if !allow { - err = ErrNotAllowed + // Get old bucket + bucketOld, err := s.getBucket(buckey) + if err == nil { return } - - bucket, err := s.getBucket(buckey) - if err == nil { + if bucketOld != nil { + err = ErrBucketAlreadyExists return } - if bucket != nil { - err = ErrBucketAlreadyExists + // Check action acl + allow := s.checkAcl(user, acl, user, action.CreateBucketAction) + if !allow { + err = ErrNotAllowed return } - err = s.providers.StateStore().Put( - buckey, - &Bucket{ - Name: bucname, - Region: region, - Owner: user, - Acl: acl, - Created: time.Now().UTC(), - }, - ) + // Bucket + bucket = &Bucket{ + Name: bucname, + Region: region, + Owner: user, + Acl: acl, + Created: time.Now().UTC(), + } + + // Put bucket + err = s.providers.StateStore().Put(buckey, bucket) return } -// GetBucket get a bucket for the specified user +// GetBucket get a user specified bucket func (s *service) GetBucket(ctx context.Context, user, bucname string) (bucket *Bucket, err error) { - buckey := s.getBucketKey(bucname) - + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket err = s.lock.RLock(ctx, buckey) if err != nil { return } - defer s.lock.RUnlock(buckey) + // Get bucket bucket, err = s.getBucket(buckey) if err != nil { return } - if bucket == nil { err = ErrBucketNotFound return } + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.HeadBucketAction) if !allow { err = ErrNotAllowed @@ -86,82 +93,99 @@ func (s 
*service) GetBucket(ctx context.Context, user, bucname string) (bucket * return } -// DeleteBucket delete the specified user bucket and all the bucket's objects +// DeleteBucket delete a user specified bucket and clear all bucket objects and uploads func (s *service) DeleteBucket(ctx context.Context, user, bucname string) (err error) { - buckey := s.getBucketKey(bucname) - + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() + // Bucket key + buckey := s.getBucketKey(bucname) + + // Lock bucket err = s.lock.Lock(ctx, buckey) if err != nil { return } - defer s.lock.Unlock(buckey) + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return } - if bucket == nil { err = ErrBucketNotFound return } + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.DeleteBucketAction) if !allow { err = ErrNotAllowed return } + // Delete bucket err = s.providers.StateStore().Delete(buckey) if err != nil { return } - // bucket objects prefix - objectsPrefix := s.getObjectKeyPrefix(bucname) + // All bucket objects prefix + objectsPrefix := s.getAllObjectsKeyPrefix(bucname) + + // Try to delete all bucket objects + _ = s.deleteObjectsByPrefix(objectsPrefix) + + // All bucket uploads prefix + uploadsPrefix := s.getAllUploadsKeyPrefix(bucname) - // delete all objects of the bucket - err = s.deleteObjectsByPrefix(objectsPrefix) + // Try to delete all bucket uploads + _ = s.deleteUploadsByPrefix(uploadsPrefix) return } // GetAllBuckets get all buckets of the specified user func (s *service) GetAllBuckets(ctx context.Context, user string) (list []*Bucket, err error) { - bucprefix := s.getBucketKeyPrefix() - + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() + // Check action acl allow := s.checkAcl(user, policy.Private, user, action.ListBucketAction) if !allow { err = ErrNotAllowed return } + // All buckets prefix + bucketsPrefix := s.getAllBucketsKeyPrefix() - err = s.providers.StateStore().Iterate(bucprefix, func(key, _ []byte) (stop bool, er error) { + // Collect user's buckets from all buckets + err = s.providers.StateStore().Iterate(bucketsPrefix, func(key, _ []byte) (stop bool, er error) { + // Stop the iteration if error occurred defer func() { if er != nil { stop = true } }() - er = ctx.Err() + // Bucket key + buckey := string(key) + + // Get Bucket + bucket, er := s.getBucket(buckey) if er != nil { return } - var bucket *Bucket - - er = s.providers.StateStore().Get(string(key), bucket) - if er != nil { + // Bucket has been deleted + if bucket == nil { return } + // Collect user's bucket if bucket.Owner == user { list = append(list, bucket) } @@ -172,78 +196,152 @@ func (s *service) GetAllBuckets(ctx context.Context, user string) (list []*Bucke return } -// PutBucketAcl update the acl field value of the specified user's bucket +// PutBucketAcl update user specified bucket's acl field value func (s *service) PutBucketAcl(ctx context.Context, user, bucname, acl string) (err error) { - buckey := s.getBucketKey(bucname) - + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() + // Bucket key + buckey := s.getBucketKey(bucname) + + // Lock bucket err = s.lock.Lock(ctx, buckey) if err != nil { return } - defer s.lock.Unlock(buckey) + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return } - if bucket == nil { err = ErrBucketNotFound return } + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.PutBucketAclAction) if !allow { err = ErrNotAllowed return } + // Update bucket acl bucket.Acl = acl 
+ // Put bucket err = s.providers.StateStore().Put(buckey, bucket) return } -// GetBucketAcl get the acl field value of the specified user's bucket +// GetBucketAcl get user specified bucket acl field value func (s *service) GetBucketAcl(ctx context.Context, user, bucname string) (acl string, err error) { - buckey := s.getBucketKey(bucname) - + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket err = s.lock.RLock(ctx, buckey) if err != nil { return } - defer s.lock.RUnlock(buckey) + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return } - if bucket == nil { err = ErrBucketNotFound return } + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.GetBucketAclAction) if !allow { err = ErrNotAllowed return } + // Get acl field value acl = bucket.Acl return } +// EmptyBucket check if the user specified bucket is empty +func (s *service) EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error) { + ctx, cancel := s.opctx(ctx) + defer cancel() + + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket + err = s.lock.RLock(ctx, buckey) + if err != nil { + return + } + defer s.lock.RUnlock(buckey) + + // Get bucket + bucket, err := s.getBucket(buckey) + if err != nil { + return + } + if bucket == nil { + err = ErrBucketNotFound + return + } + + // Check action acl + allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.ListObjectsAction) + if !allow { + err = ErrNotAllowed + return + } + + // All bucket objects prefix + objectsPrefix := s.getAllObjectsKeyPrefix(bucname) + + // Initially set empty to true + empty = true + + // Iterate the bucket objects; if there is no item, empty stays true, + // if there is at least one, set empty to false and stop the iteration + err = s.providers.StateStore().Iterate(objectsPrefix, func(_, _ []byte) (stop bool, er error) { + empty = false + stop = true + return + }) + + // If the bucket has at least one object, return not empty, otherwise check whether the bucket + // has at least one upload + if !empty { + return + } + + // All bucket uploads prefix + uploadsPrefix := s.getAllUploadsKeyPrefix(bucname) + + // Set empty to false if bucket has at least one upload + err = s.providers.StateStore().Iterate(uploadsPrefix, func(_, _ []byte) (stop bool, er error) { + empty = false + stop = true + return + }) + + return +} + func (s *service) getBucket(buckey string) (bucket *Bucket, err error) { err = s.providers.StateStore().Get(buckey, bucket) if errors.Is(err, providers.ErrStateStoreNotFound) { diff --git a/s3/services/object/service_multipart.go b/s3/services/object/service_multipart.go index 455d5bced..592d45c4e 100644 --- a/s3/services/object/service_multipart.go +++ b/s3/services/object/service_multipart.go @@ -1,106 +1,338 @@ package object import ( + "context" "encoding/hex" + "errors" "fmt" + "github.com/bittorrent/go-btfs/s3/action" "github.com/bittorrent/go-btfs/s3/consts" "github.com/bittorrent/go-btfs/s3/etag" + "github.com/bittorrent/go-btfs/s3/providers" "github.com/bittorrent/go-btfs/s3/s3utils" "github.com/bittorrent/go-btfs/s3/utils/hash" "github.com/google/uuid" "io" "net/http" + "regexp" "strings" "time" ) -func (s *service) CreateMultipartUpload(ctx context.Context, bucname string, objname string, meta map[string]string) (mtp Multipart, err error) { - uploadId := uuid.NewString() - mtp = Multipart{ +// CreateMultipartUpload create user specified multipart upload +func (s *service) CreateMultipartUpload(ctx
context.Context, user, bucname, objname string, meta map[string]string) (multipart *Multipart, err error) { + // Operation context + ctx, cancel := s.opctx(ctx) + defer cancel() + + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket + err = s.lock.RLock(ctx, buckey) + if err != nil { + return + } + defer s.lock.RUnlock(buckey) + + // Get bucket + bucket, err := s.getBucket(buckey) + if err != nil { + return + } + if bucket == nil { + err = ErrBucketNotFound + return + } + + // Check action acl + allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.CreateMultipartUploadAction) + if !allow { + err = ErrNotAllowed + return + } + + // Upload id + uplid := uuid.NewString() + + // upload key + uplkey := s.getUploadKey(bucname, objname, uplid) + + // Lock upload + err = s.lock.Lock(ctx, uplkey) + if err != nil { + return + } + defer s.lock.Unlock(uplkey) + + // Multipart upload + multipart = &Multipart{ Bucket: bucname, Object: objname, - UploadID: uploadId, + UploadID: uplid, MetaData: meta, Initiated: time.Now().UTC(), } - err = s.providers.StateStore().Put(getUploadKey(bucname, objname, uploadId), mtp) + // Put multipart upload + err = s.providers.StateStore().Put(uplkey, multipart) + + return +} + +// UploadPart upload user specified multipart part +func (s *service) UploadPart(ctx context.Context, user, bucname, objname, uplid string, partId int, body *hash.Reader, size int64, meta map[string]string) (part *ObjectPart, err error) { + // Operation context + ctx, cancel := s.opctx(ctx) + defer cancel() + + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket + err = s.lock.RLock(ctx, buckey) if err != nil { return } + defer s.lock.RUnlock(buckey) - return -} + // Get bucket + bucket, err := s.getBucket(buckey) + if err != nil { + return + } + if bucket == nil { + err = ErrBucketNotFound + return + } -func (s *service) UploadPart(ctx context.Context, bucname string, objname string, uploadID string, partID int, reader *hash.Reader, size int64, meta map[string]string) (part ObjectPart, err error) { - cid, err := s.providers.FileStore().Store(reader) + // Check acl + allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.UploadPartAction) + if !allow { + err = ErrNotAllowed + return + } + + // Upload key + uplkey := s.getUploadKey(bucname, objname, uplid) + + // Lock upload + err = s.lock.Lock(ctx, uplkey) if err != nil { return } + defer s.lock.Unlock(uplkey) - part = ObjectPart{ - Number: partID, - ETag: reader.ETag().String(), + // Get multipart upload + multipart, err := s.getMultipart(uplkey) + if err != nil { + return + } + if multipart == nil { + err = ErrUploadNotFound + return + } + + // Store part body + cid, err := s.providers.FileStore().Store(body) + if err != nil { + return + } + + // Init a flag to mark if the part body should be removed, this + // flag will be set to false if the multipart has been successfully put + var removePartBody = true + + // Try to remove the part body + defer func() { + if removePartBody { + _ = s.providers.FileStore().Remove(cid) + } + }() + + // Part + part = &ObjectPart{ + Number: partId, + ETag: body.ETag().String(), Cid: cid, Size: size, ModTime: time.Now().UTC(), } - mtp, err := s.getMultipart(ctx, bucname, objname, uploadID) + // Append part + multipart.Parts = append(multipart.Parts, part) + + // Put multipart upload + err = s.providers.StateStore().Put(uplkey, multipart) if err != nil { return } - mtp.Parts = append(mtp.Parts, part) - err = s.providers.StateStore().Put(getUploadKey(bucname, objname, 
uploadID), mtp) - if err != nil { - return part, err - } + // Set remove part body flag to false, because this part body has been referenced by the upload + removePartBody = false return } -func (s *service) AbortMultipartUpload(ctx context.Context, bucname string, objname string, uploadID string) (err error) { - mtp, err := s.getMultipart(ctx, bucname, objname, uploadID) +// AbortMultipartUpload abort user specified multipart upload +func (s *service) AbortMultipartUpload(ctx context.Context, user, bucname, objname, uplid string) (err error) { + // Operation context + ctx, cancel := s.opctx(ctx) + defer cancel() + + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket + err = s.lock.RLock(ctx, buckey) if err != nil { return } + defer s.lock.RUnlock(buckey) - for _, part := range mtp.Parts { - err = s.providers.FileStore().Remove(part.Cid) - if err != nil { - return - } + // Get bucket + bucket, err := s.getBucket(buckey) + if err != nil { + return + } + if bucket == nil { + err = ErrBucketNotFound + return } - err = s.removeMultipart(ctx, bucname, objname, uploadID) + // Check action acl + allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.AbortMultipartUploadAction) + if !allow { + err = ErrNotAllowed + return + } + + // Multipart upload key + uplkey := s.getUploadKey(bucname, objname, uplid) + + // Lock upload + err = s.lock.Lock(ctx, uplkey) if err != nil { return } + defer s.lock.Unlock(uplkey) + + // Get multipart upload + multipart, err := s.getMultipart(uplkey) + if err != nil { + return + } + if multipart == nil { + err = ErrUploadNotFound + return + } + + // Delete multipart upload + err = s.providers.StateStore().Delete(uplkey) + if err != nil { + return + } + + // Try to remove all parts body + for _, part := range multipart.Parts { + _ = s.providers.FileStore().Remove(part.Cid) + } return } -func (s *service) CompleteMultiPartUpload(ctx context.Context, bucname string, objname string, uploadID string, parts []CompletePart) (obj Object, err error) { - mi, err := s.getMultipart(ctx, bucname, objname, uploadID) +// CompleteMultiPartUpload complete user specified multipart upload +func (s *service) CompleteMultiPartUpload(ctx context.Context, user string, bucname, objname, uplid string, parts []*CompletePart) (object *Object, err error) { + // Operation context + ctx, cancel := s.opctx(ctx) + defer cancel() + + // Bucket key + buckey := s.getBucketKey(bucname) + + // RLock bucket + err = s.lock.RLock(ctx, buckey) + if err != nil { + return + } + defer s.lock.RUnlock(buckey) + + // Get bucket + bucket, err := s.getBucket(buckey) + if err != nil { + return + } + if bucket == nil { + err = ErrBucketNotFound + return + } + + // Check acl + allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.CompleteMultipartUploadAction) + if !allow { + err = ErrNotAllowed + return + } + + // Object key + objkey := s.getObjectKey(bucname, objname) + + // Lock object + err = s.lock.Lock(ctx, objkey) + if err != nil { + return + } + defer s.lock.Unlock(objkey) + + // Get old object for try to remove the old body + objectOld, err := s.getObject(objkey) if err != nil { return } - var ( - readers []io.Reader - objectSize int64 - ) + // Upload key + uplkey := s.getUploadKey(bucname, objname, uplid) + + // Lock upload + err = s.lock.Lock(ctx, uplkey) + if err != nil { + return + } + defer s.lock.Unlock(uplkey) + + // Get multipart upload + multipart, err := s.getMultipart(uplkey) + if err != nil { + return + } + if multipart == nil { + err = ErrUploadNotFound + return + } + + 
// All parts body readers + var readers []io.Reader + // Try to close all parts body readers, because some or all of + // these body may not be used defer func() { for _, rdr := range readers { _ = rdr.(io.ReadCloser).Close() } }() - idxMap := objectPartIndexMap(mi.Parts) + // Total object size + var size int64 + + // Mapping of part number to part index in multipart.Parts + idxmp := s.partIdxMap(multipart.Parts) + + // Iterate all parts to collect all body readers for i, part := range parts { - partIndex, ok := idxMap[part.PartNumber] + // Index in multipart.Parts + partIndex, ok := idxmp[part.PartNumber] + + // Part not exists in multipart if !ok { err = s3utils.InvalidPart{ PartNumber: part.PartNumber, @@ -109,9 +341,13 @@ func (s *service) CompleteMultiPartUpload(ctx context.Context, bucname string, o return } - gotPart := mi.Parts[partIndex] + // Got part in multipart.Parts + gotPart := multipart.Parts[partIndex] - part.ETag = canonicalizeETag(part.ETag) + // Canonicalize part etag + part.ETag = s.canonicalizeETag(part.ETag) + + // Check got part etag with part etag if gotPart.ETag != part.ETag { err = s3utils.InvalidPart{ PartNumber: part.PartNumber, @@ -131,90 +367,100 @@ func (s *service) CompleteMultiPartUpload(ctx context.Context, bucname string, o return } - // Save for total objname size. - objectSize += gotPart.Size + // Save for total object size. + size += gotPart.Size + // Get part body reader var rdr io.ReadCloser rdr, err = s.providers.FileStore().Cat(gotPart.Cid) if err != nil { return } + // Collect part body reader readers = append(readers, rdr) } - cid, err := s.providers.FileStore().Store(io.MultiReader(readers...)) + // Concat all parts body to one + body := io.MultiReader(readers...) + + // Store object body + cid, err := s.providers.FileStore().Store(body) if err != nil { return } - obj = Object{ + // Init a flag to mark if the object body should be removed, this + // flag will be set to false if the object has been successfully put + var removeObjectBody = true + + // Try to remove stored body if put object failed + defer func() { + if removeObjectBody { + _ = s.providers.FileStore().Remove(cid) + } + }() + + // Object + object = &Object{ Bucket: bucname, Name: objname, ModTime: time.Now().UTC(), - Size: objectSize, + Size: size, IsDir: false, - ETag: computeCompleteMultipartMD5(parts), + ETag: s.computeMultipartMD5(parts), Cid: cid, VersionID: "", IsLatest: true, DeleteMarker: false, - ContentType: mi.MetaData[strings.ToLower(consts.ContentType)], - ContentEncoding: mi.MetaData[strings.ToLower(consts.ContentEncoding)], + ContentType: multipart.MetaData[strings.ToLower(consts.ContentType)], + ContentEncoding: multipart.MetaData[strings.ToLower(consts.ContentEncoding)], SuccessorModTime: time.Now().UTC(), } - if exp, ok := mi.MetaData[strings.ToLower(consts.Expires)]; ok { - if t, e := time.Parse(http.TimeFormat, exp); e == nil { - obj.Expires = t.UTC() - } + // Set object expires + exp, e := time.Parse(http.TimeFormat, multipart.MetaData[strings.ToLower(consts.Expires)]) + if e == nil { + object.Expires = exp.UTC() } - err = s.providers.StateStore().Put(getObjectKey(bucname, objname), obj) + // Put object + err = s.providers.StateStore().Put(objkey, object) if err != nil { return } - err = s.removeMultipartInfo(ctx, bucname, objname, uploadID) - if err != nil { - return - } + // Set remove object body flag to false, because it has been referenced by the object + removeObjectBody = false - return -} - -func (s *service) GetMultipart(ctx context.Context, bucname 
string, objname string, uploadID string) (mtp Multipart, err error) { - return s.getMultipart(ctx, bucname, objname, uploadID) -} + // Try to remove old object body if exists, because it has been covered by new one + if objectOld != nil { + _ = s.providers.FileStore().Remove(objectOld.Cid) + } -func (s *service) getMultipart(ctx context.Context, bucname string, objname string, uploadID string) (mtp Multipart, err error) { - err = s.providers.StateStore().Get(getUploadKey(bucname, objname, uploadID), &mtp) - if errors.Is(err, providers.ErrStateStoreNotFound) { - err = ErrUploadNotFound + // Remove multipart upload + err = s.providers.StateStore().Delete(uplkey) + if err != nil { return } - return -} -func (s *service) removeMultipart(ctx context.Context, bucname string, objname string, uploadID string) (err error) { - err = s.providers.StateStore().Delete(getUploadKey(bucname, objname, uploadID)) - if errors.Is(err, providers.ErrStateStoreNotFound) { - err = ErrUploadNotFound - return + // Try to remove all parts body, because they are no longer be referenced + for _, part := range multipart.Parts { + _ = s.providers.FileStore().Remove(part.Cid) } + return } -func (s *service) removeMultipartInfo(ctx context.Context, bucname string, objname string, uploadID string) (err error) { - err = s.providers.StateStore().Delete(getUploadKey(bucname, objname, uploadID)) +func (s *service) getMultipart(uplkey string) (multipart *Multipart, err error) { + err = s.providers.StateStore().Get(uplkey, multipart) if errors.Is(err, providers.ErrStateStoreNotFound) { - err = ErrUploadNotFound - return + err = nil } return } -func objectPartIndexMap(parts []ObjectPart) map[int]int { +func (s *service) partIdxMap(parts []*ObjectPart) map[int]int { mp := make(map[int]int) for i, part := range parts { mp[part.Number] = i @@ -222,49 +468,44 @@ func objectPartIndexMap(parts []ObjectPart) map[int]int { return mp } +var etagRegex = regexp.MustCompile("\"*?([^\"]*?)\"*?$") + // canonicalizeETag returns ETag with leading and trailing double-quotes removed, // if any present -func canonicalizeETag(etag string) string { +func (s *service) canonicalizeETag(etag string) string { return etagRegex.ReplaceAllString(etag, "$1") } -func computeCompleteMultipartMD5(parts []CompletePart) string { +func (s *service) computeMultipartMD5(parts []*CompletePart) (md5 string) { var finalMD5Bytes []byte for _, part := range parts { - md5Bytes, err := hex.DecodeString(canonicalizeETag(part.ETag)) + md5Bytes, err := hex.DecodeString(s.canonicalizeETag(part.ETag)) if err != nil { finalMD5Bytes = append(finalMD5Bytes, []byte(part.ETag)...) } else { finalMD5Bytes = append(finalMD5Bytes, md5Bytes...) 
} } - s3MD5 := fmt.Sprintf("%s-%d", etag.Multipart(finalMD5Bytes), len(parts)) - return s3MD5 -} - -func (s *service) getObject(objkey string) (object *Object, err error) { - err = s.providers.StateStore().Get(objkey, object) - if errors.Is(err, providers.ErrStateStoreNotFound) { - err = nil - } + md5 = fmt.Sprintf("%s-%d", etag.Multipart(finalMD5Bytes), len(parts)) return } -// deleteObjectsByPrefix delete all objects have common prefix -// it will continue even if one of the objects be deleted fail -func (s *service) deleteObjectsByPrefix(objectPrefix string) (err error) { - err = s.providers.StateStore().Iterate(objectPrefix, func(key, _ []byte) (stop bool, er error) { - keyStr := string(key) - var object *Object - er = s.providers.StateStore().Get(keyStr, object) +// deleteUploadsByPrefix try to delete all multipart uploads with the specified common prefix +func (s *service) deleteUploadsByPrefix(uploadsPrefix string) (err error) { + err = s.providers.StateStore().Iterate(uploadsPrefix, func(key, _ []byte) (stop bool, er error) { + uplkey := string(key) + var multipart *Multipart + er = s.providers.StateStore().Get(uplkey, multipart) if er != nil { return } - er = s.providers.FileStore().Remove(object.Cid) + er = s.providers.StateStore().Delete(uplkey) if er != nil { return } - er = s.providers.StateStore().Delete(keyStr) + for _, part := range multipart.Parts { + _ = s.providers.FileStore().Remove(part.Cid) + } return }) diff --git a/s3/services/object/service_object.go b/s3/services/object/service_object.go index 78f4d17d0..a0bb6be73 100644 --- a/s3/services/object/service_object.go +++ b/s3/services/object/service_object.go @@ -2,8 +2,10 @@ package object import ( "context" + "errors" "github.com/bittorrent/go-btfs/s3/action" "github.com/bittorrent/go-btfs/s3/consts" + "github.com/bittorrent/go-btfs/s3/providers" "github.com/bittorrent/go-btfs/s3/utils/hash" "io" "net/http" @@ -12,22 +14,22 @@ import ( ) // PutObject put a user specified object -func (s *service) PutObject(ctx context.Context, user string, bucname, objname string, reader *hash.Reader, size int64, meta map[string]string) (object *Object, err error) { - // operation context +func (s *service) PutObject(ctx context.Context, user string, bucname, objname string, body *hash.Reader, size int64, meta map[string]string) (object *Object, err error) { + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() - // bucket key + // Bucket key buckey := s.getBucketKey(bucname) - // rlock bucket + // RLock bucket err = s.lock.RLock(ctx, buckey) if err != nil { return } defer s.lock.RUnlock(buckey) - // get bucket + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return @@ -37,43 +39,46 @@ func (s *service) PutObject(ctx context.Context, user string, bucname, objname s return } - // check acl + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.PutObjectAction) if !allow { err = ErrNotAllowed return } - // object key + // Object key objkey := s.getObjectKey(bucname, objname) - // lock object + // Lock object err = s.lock.Lock(ctx, objkey) if err != nil { return } defer s.lock.Unlock(objkey) - // get old object - oldObject, err := s.getObject(objkey) + // Get old object + objectOld, err := s.getObject(objkey) if err != nil { return } - // remove old file, if old object exists and put new object successfully - defer func() { - if oldObject != nil && err == nil { - _ = s.providers.FileStore().Remove(oldObject.Cid) - // todo: log this remove error - } - }() - - // store file - cid, 
err := s.providers.FileStore().Store(reader) + // Store object body + cid, err := s.providers.FileStore().Store(body) if err != nil { return } + // Init a flag to mark if the object body should be removed, this + // flag will be set to false if the object has been successfully put + var removeObjectBody = true + + // Try to remove stored body if put object failed + defer func() { + if removeObjectBody { + _ = s.providers.FileStore().Remove(cid) + } + }() + // now now := time.Now() @@ -84,7 +89,7 @@ func (s *service) PutObject(ctx context.Context, user string, bucname, objname s ModTime: now.UTC(), Size: size, IsDir: false, - ETag: reader.ETag().String(), + ETag: body.ETag().String(), Cid: cid, VersionID: "", IsLatest: true, @@ -103,27 +108,38 @@ func (s *service) PutObject(ctx context.Context, user string, bucname, objname s // put object err = s.providers.StateStore().Put(objkey, object) + if err != nil { + return + } + + // Set remove object body flag to false, because it has been referenced by the object + removeObjectBody = false + + // Try to remove old object body if exists, because it has been covered by new one + if objectOld != nil { + _ = s.providers.FileStore().Remove(objectOld.Cid) + } return } // CopyObject copy from a user specified source object to a desert object func (s *service) CopyObject(ctx context.Context, user string, srcBucname, srcObjname, dstBucname, dstObjname string, meta map[string]string) (dstObject *Object, err error) { - // operation context + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() - // source bucket key + // Source bucket key srcBuckey := s.getBucketKey(srcBucname) - // rlock source bucket + // RLock source bucket err = s.lock.RLock(ctx, srcBuckey) if err != nil { return } defer s.lock.RUnlock(srcBuckey) - // get source bucket + // Get source bucket srcBucket, err := s.getBucket(srcBuckey) if err != nil { return @@ -133,24 +149,24 @@ func (s *service) CopyObject(ctx context.Context, user string, srcBucname, srcOb return } - // check source acl + // Check source action acl srcAllow := s.checkAcl(srcBucket.Owner, srcBucket.Acl, user, action.GetObjectAction) if !srcAllow { err = ErrNotAllowed return } - // source object key + // Source object key srcObjkey := s.getObjectKey(srcBucname, srcObjname) - // rlock source object + // RLock source object err = s.lock.RLock(ctx, srcObjkey) if err != nil { return } defer s.lock.RUnlock(srcObjkey) - // get source object + // Get source object srcObject, err := s.getObject(srcObjkey) if err != nil { return @@ -160,17 +176,17 @@ func (s *service) CopyObject(ctx context.Context, user string, srcBucname, srcOb return } - // desert bucket key + // Desert bucket key dstBuckey := s.getBucketKey(dstBucname) - // rlock desert bucket + // RLock destination bucket err = s.lock.RLock(ctx, dstBuckey) if err != nil { return } defer s.lock.RUnlock(dstBuckey) - // get desert bucket + // Get destination bucket dstBucket, err := s.getBucket(dstBuckey) if err != nil { return @@ -180,17 +196,17 @@ func (s *service) CopyObject(ctx context.Context, user string, srcBucname, srcOb return } - // check desert acl + // Check destination action acl dstAllow := s.checkAcl(dstBucket.Owner, dstBucket.Acl, user, action.PutObjectAction) if !dstAllow { err = ErrNotAllowed return } - // desert object key + // Destination object key dstObjkey := s.getObjectKey(dstBucname, dstObjname) - // lock desert object + // Lock Destination object err = s.lock.Lock(ctx, dstObjkey) if err != nil { return @@ -200,7 +216,7 @@ func (s *service) 
CopyObject(ctx context.Context, user string, srcBucname, srcOb // now now := time.Now() - // desert object + // Destination object dstObject = &Object{ Bucket: dstBucname, Name: dstObjname, @@ -217,40 +233,40 @@ func (s *service) CopyObject(ctx context.Context, user string, srcBucname, srcOb SuccessorModTime: now.UTC(), } - // set object desert expires + // Set destination object expires exp, er := time.Parse(http.TimeFormat, strings.ToLower(consts.Expires)) if er != nil { dstObject.Expires = exp.UTC() } - // put desert object + // Put destination object err = s.providers.StateStore().Put(dstObjkey, dstObject) return } -// GetObject get an object for the specified user +// GetObject get a user specified object func (s *service) GetObject(ctx context.Context, user, bucname, objname string) (object *Object, body io.ReadCloser, err error) { - // operation context + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() // bucket key buckey := s.getBucketKey(bucname) - // rlock bucket + // RLock bucket err = s.lock.RLock(ctx, buckey) if err != nil { return } defer func() { - // rUnlock bucket just if getting failed + // RUnlock bucket just if getting failed if err != nil { s.lock.RUnlock(buckey) } }() - // get bucket + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return @@ -260,29 +276,29 @@ func (s *service) GetObject(ctx context.Context, user, bucname, objname string) return } - // check acl + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.GetObjectAction) if !allow { err = ErrNotAllowed return } - // object key + // Object key objkey := s.getObjectKey(bucname, objname) - // rlock object + // RLock object err = s.lock.RLock(ctx, objkey) if err != nil { return } defer func() { - // rUnlock object just if getting failed + // RUnlock object just if getting failed if err != nil { s.lock.RUnlock(objkey) } }() - // get object + // Get object object, err = s.getObject(objkey) if err != nil { return @@ -292,21 +308,21 @@ func (s *service) GetObject(ctx context.Context, user, bucname, objname string) return } - // get object body + // Get object body body, err = s.providers.FileStore().Cat(object.Cid) if err != nil { return } - // wrap the body with timeout and unlock hooks + // Wrap the body with timeout and unlock hooks, // this will enable the bucket and object keep rlocked until // read timout or read closed. 
Normally, these locks will // be released as soon as leave from the call body = WrapCleanReadCloser( body, - s.readObjectTimeout, + s.closeBodyTimeout, func() { - s.lock.RUnlock(objkey) // note: release object first + s.lock.RUnlock(objkey) // Note: Release object first s.lock.RUnlock(buckey) }, ) @@ -316,21 +332,21 @@ func (s *service) GetObject(ctx context.Context, user, bucname, objname string) // DeleteObject delete a user specified object func (s *service) DeleteObject(ctx context.Context, user, bucname, objname string) (err error) { - // operation context + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() - // bucket key + // Bucket key buckey := s.getBucketKey(bucname) - // rlock bucket + // RLock bucket err = s.lock.RLock(ctx, buckey) if err != nil { return } defer s.lock.RUnlock(buckey) - // get bucket + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return @@ -340,24 +356,24 @@ func (s *service) DeleteObject(ctx context.Context, user, bucname, objname strin return } - // check acl + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.DeleteObjectAction) if !allow { err = ErrNotAllowed return } - // object key + // Object key objkey := s.getObjectKey(bucname, objname) - // lock object + // Lock object err = s.lock.Lock(ctx, objkey) if err != nil { return } defer s.lock.Unlock(objkey) - // get object + // Get object object, err := s.getObject(objkey) if err != nil { return @@ -367,35 +383,35 @@ func (s *service) DeleteObject(ctx context.Context, user, bucname, objname strin return } - // delete object body - err = s.providers.FileStore().Remove(object.Cid) + // Delete object + err = s.providers.StateStore().Delete(objkey) if err != nil { return } - // delete object - err = s.providers.StateStore().Delete(objkey) + // Try to delete object body + _ = s.providers.FileStore().Remove(object.Cid) return } // ListObjects list user specified objects func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimiter, marker string, max int) (list *ObjectsList, err error) { - // operation context + // Operation context ctx, cancel := s.opctx(ctx) defer cancel() - // bucket key + // Bucket key buckey := s.getBucketKey(bucname) - // rlock bucket + // RLock bucket err = s.lock.RLock(ctx, buckey) if err != nil { return } defer s.lock.RUnlock(buckey) - // get bucket + // Get bucket bucket, err := s.getBucket(buckey) if err != nil { return @@ -405,37 +421,38 @@ func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimi return } - // check acl + // Check action acl allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.ListObjectsAction) if !allow { err = ErrNotAllowed return } - // object key prefix - objkeyPrefix := s.getObjectKeyPrefix(bucname) + // All bucket objects key prefix + allObjectsKeyPrefix := s.getAllObjectsKeyPrefix(bucname) - // objects key prefix - objskeyPrefix := objkeyPrefix + prefix + // List objects key prefix + listObjectsKeyPrefix := allObjectsKeyPrefix + prefix - // accumulate count + // Accumulate count count := 0 - // begin collect + // Flag mark if begin collect, it initialized to true if + // marker is "" begin := marker == "" - // seen keys + // Seen keys, used to group common keys seen := make(map[string]bool) - // iterate all objects with the specified prefix to collect and group specified range items - err = s.providers.StateStore().Iterate(objskeyPrefix, func(key, _ []byte) (stop bool, er error) { - // object key + // Iterate all objects with the specified prefix 
to collect and group specified range items + err = s.providers.StateStore().Iterate(listObjectsKeyPrefix, func(key, _ []byte) (stop bool, er error) { + // Object key objkey := string(key) - // object name - objname := objkey[len(objkeyPrefix):] + // Object name + objname := strings.TrimPrefix(objkey, allObjectsKeyPrefix) - // common prefix: if the part of object name without prefix include delimiter + // Common prefix: if the part of object name without prefix include delimiter // it is the string truncated object name after the delimiter, else // it is the bucket name itself commonPrefix := objname @@ -448,7 +465,7 @@ func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimi } } - // if collect not begin, check the marker, if it is matched + // If collect not begin, check the marker, if it is matched // with the common prefix, then begin collection from next iterate turn // and mark this common prefix as seen // note: common prefix also can be object name, so when marker is @@ -459,19 +476,19 @@ func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimi return } - // no begin, jump the item + // Not begin, jump the item if !begin { return } - // objects with same common prefix will be grouped into one + // Objects with same common prefix will be grouped into one // note: the objects without common prefix will present only once, so // it is not necessary to add these objects names in the seen map if seen[commonPrefix] { return } - // objects with common prefix grouped int one + // Objects with common prefix grouped int one if commonPrefix != objname { list.Prefixes = append(list.Prefixes, commonPrefix) list.NextMarker = commonPrefix @@ -487,10 +504,10 @@ func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimi list.NextMarker = objname } - // increment collection count + // Increment collection count count++ - // check the count, if it matched the max, means + // Check the count, if it matched the max, means // the collect is complete, but the items may remain, so stop the // iteration, and mark the list was truncated if count == max { @@ -504,49 +521,28 @@ func (s *service) ListObjects(ctx context.Context, user, bucname, prefix, delimi return } -// EmptyBucket check if the user specified bucked is empty -func (s *service) EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error) { - ctx, cancel := s.opctx(ctx) - defer cancel() - - // bucket key - buckey := s.getBucketKey(bucname) - - // rlock bucket - err = s.lock.RLock(ctx, buckey) - if err != nil { - return - } - defer s.lock.RUnlock(buckey) - - // get bucket - bucket, err := s.getBucket(buckey) - if err != nil { - return - } - if bucket == nil { - err = ErrBucketNotFound - return - } - - // check acl - allow := s.checkAcl(bucket.Owner, bucket.Acl, user, action.HeadBucketAction) - if !allow { - err = ErrNotAllowed - return +func (s *service) getObject(objkey string) (object *Object, err error) { + err = s.providers.StateStore().Get(objkey, object) + if errors.Is(err, providers.ErrStateStoreNotFound) { + err = nil } + return +} - // object key prefix - objkeyPrefix := s.getObjectKeyPrefix(bucname) - - // initially set empty to true - empty = true - - // iterate the bucket objects, if no item, empty keep true - // if at least one, set empty to false, and stop iterate - err = s.providers.StateStore().Iterate(objkeyPrefix, func(_, _ []byte) (stop bool, er error) { - empty = false - stop = true +// deleteObjectsByPrefix try to delete all objects with 
the specified common prefix +func (s *service) deleteObjectsByPrefix(objectsPrefix string) (err error) { + err = s.providers.StateStore().Iterate(objectsPrefix, func(key, _ []byte) (stop bool, er error) { + objkey := string(key) + var object *Object + er = s.providers.StateStore().Get(objkey, object) + if er != nil { + return + } + er = s.providers.StateStore().Delete(objkey) + if er != nil { + return + } + _ = s.providers.FileStore().Remove(object.Cid) return })
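A minimal usage sketch of the reworked object.Service API introduced above, written against the new interface and functional options. It assumes an already constructed providers.Providerser (pvs here) and a request body wrapped in a *hash.Reader; the user, bucket, and object names are illustrative placeholders rather than values from this change, and error handling is abbreviated.

package example

import (
	"context"
	"io"
	"time"

	"github.com/bittorrent/go-btfs/s3/policy"
	"github.com/bittorrent/go-btfs/s3/providers"
	"github.com/bittorrent/go-btfs/s3/services/object"
	"github.com/bittorrent/go-btfs/s3/utils/hash"
)

func exampleObjectFlow(ctx context.Context, pvs providers.Providerser, body *hash.Reader, size int64) error {
	// Build the service; omitted options fall back to the package defaults
	// (operation timeout 5m, close-body timeout 10m, key separator "/").
	svc := object.NewService(pvs,
		object.WithOperationTimeout(2*time.Minute),
		object.WithCloseBodyTimeout(30*time.Minute),
	)

	// Illustrative owner / access key.
	user := "example-user"

	// Create a bucket owned by user.
	if _, err := svc.CreateBucket(ctx, user, "photos", "us-east-1", policy.Private); err != nil {
		return err
	}

	// Put an object; the body is stored first, and the stored blob is removed
	// again if writing the object metadata fails.
	if _, err := svc.PutObject(ctx, user, "photos", "cat.png", body, size, nil); err != nil {
		return err
	}

	// Get the object back. The returned ReadCloser keeps the bucket and object
	// read-locked until it is closed (or the close-body timeout fires), so it
	// must always be closed.
	_, rc, err := svc.GetObject(ctx, user, "photos", "cat.png")
	if err != nil {
		return err
	}
	defer rc.Close()

	_, err = io.Copy(io.Discard, rc)
	return err
}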
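Continuing the same sketch (same assumed package and imports), the multipart path introduced in service_multipart.go: the part readers, sizes, and names are placeholders, and the CompletePart entries echo the part numbers and ETags returned by UploadPart, since CompleteMultiPartUpload validates them against the stored parts.

func exampleMultipartFlow(ctx context.Context, svc object.Service, user string, part1, part2 *hash.Reader, size1, size2 int64) (*object.Object, error) {
	// Start the upload; the returned Multipart carries the generated UploadID.
	mp, err := svc.CreateMultipartUpload(ctx, user, "videos", "movie.mp4", nil)
	if err != nil {
		return nil, err
	}

	// Upload two parts; each part body is stored first and kept only once the
	// upload record referencing it has been written.
	p1, err := svc.UploadPart(ctx, user, "videos", "movie.mp4", mp.UploadID, 1, part1, size1, nil)
	if err != nil {
		return nil, err
	}
	p2, err := svc.UploadPart(ctx, user, "videos", "movie.mp4", mp.UploadID, 2, part2, size2, nil)
	if err != nil {
		return nil, err
	}

	// Complete the upload; on success the upload record and the individual
	// part bodies are removed, leaving only the assembled object.
	return svc.CompleteMultiPartUpload(ctx, user, "videos", "movie.mp4", mp.UploadID, []*object.CompletePart{
		{PartNumber: p1.Number, ETag: p1.ETag},
		{PartNumber: p2.Number, ETag: p2.ETag},
	})
}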