Skip to content

Commit

Permalink
refractor: refract object service
Browse files Browse the repository at this point in the history
  • Loading branch information
imstevez committed Sep 1, 2023
1 parent d0d92c9 commit 714aa30
Show file tree
Hide file tree
Showing 7 changed files with 718 additions and 362 deletions.
2 changes: 1 addition & 1 deletion s3/services/object/clean_read_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func WrapCleanReadCloser(rc io.ReadCloser, timeout time.Duration, afterCloseHook
go func() {
<-ctx.Done()
_ = rc.Close()
// call after hooks stack
// call after hooks by stack order
for len(afterCloseHooks) > 0 {
idx := len(afterCloseHooks) - 1
f := afterCloseHooks[idx]
Expand Down
58 changes: 58 additions & 0 deletions s3/services/object/options.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,61 @@
package object

import (
"github.com/bittorrent/go-btfs/s3/ctxmu"
"time"
)

const (
defaultKeySeparator = "/"
defaultBucketSpace = "bkt"
defaultObjectSpace = "obj"
defaultUploadSpace = "upl"
defaultOperationTimeout = 5 * time.Minute
defaultCloseBodyTimeout = 10 * time.Minute
)

var defaultLock = ctxmu.NewDefaultMultiCtxRWMutex()

type Option func(svc *service)

func WithKeySeparator(separator string) Option {
return func(svc *service) {
svc.keySeparator = separator
}
}

func WithBucketSpace(space string) Option {
return func(svc *service) {
svc.bucketSpace = space
}
}

func WithObjectSpace(space string) Option {
return func(svc *service) {
svc.objectSpace = space
}
}

func WithUploadSpace(space string) Option {
return func(svc *service) {
svc.uploadSpace = space
}
}

func WithOperationTimeout(timeout time.Duration) Option {
return func(svc *service) {
svc.operationTimeout = timeout
}
}

func WithCloseBodyTimeout(timeout time.Duration) Option {
return func(svc *service) {
svc.closeBodyTimeout = timeout
}
}

func WithLock(lock ctxmu.MultiCtxRWLocker) Option {
return func(svc *service) {
svc.lock = lock
}
}
47 changes: 20 additions & 27 deletions s3/services/object/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,25 @@ var (
)

type Service interface {
// bucket
CreateBucket(ctx context.Context, bucket, region, accessKey, acl string) error
GetBucketMeta(ctx context.Context, bucket string) (meta Bucket, err error)
HasBucket(ctx context.Context, bucket string) bool
SetEmptyBucket(emptyBucket func(ctx context.Context, bucket string) (bool, error))
DeleteBucket(ctx context.Context, bucket string) error
GetAllBucketsOfUser(username string) (list []*Bucket, err error)
UpdateBucketAcl(ctx context.Context, bucket, acl string) error
GetBucketAcl(ctx context.Context, bucket string) (string, error)
EmptyBucket(emptyBucket func(ctx context.Context, bucket string) (bool, error))

// object
PutObject(ctx context.Context, bucname, objname string, reader *hash.Reader, size int64, meta map[string]string) (obj Object, err error)
CopyObject(ctx context.Context, bucket, object string, info Object, size int64, meta map[string]string) (Object, error)
GetObject(ctx context.Context, bucket, object string) (Object, io.ReadCloser, error)
GetObjectInfo(ctx context.Context, bucket, object string) (Object, error)
DeleteObject(ctx context.Context, bucket, object string) error
ListObjects(ctx context.Context, bucket string, prefix string, marker string, delimiter string, maxKeys int) (loi Object, err error)
ListObjectsV2(ctx context.Context, bucket string, prefix string, continuationToken string, delimiter string, maxKeys int, owner bool, startAfter string) (ListObjectsV2Info, error)

// martipart
CreateMultipartUpload(ctx context.Context, bucname string, objname string, meta map[string]string) (mtp Multipart, err error)
AbortMultipartUpload(ctx context.Context, bucname string, objname string, uploadID string) (err error)
UploadPart(ctx context.Context, bucname string, objname string, uploadID string, partID int, reader *hash.Reader, size int64, meta map[string]string) (part ObjectPart, err error)
CompleteMultiPartUpload(ctx context.Context, bucname string, objname string, uploadID string, parts []CompletePart) (obj Object, err error)
GetMultipart(ctx context.Context, bucname string, objname string, uploadID string) (mtp Multipart, err error)
CreateBucket(ctx context.Context, user, bucname, region, acl string) (bucket *Bucket, err error)
GetBucket(ctx context.Context, user, bucname string) (bucket *Bucket, err error)
DeleteBucket(ctx context.Context, user, bucname string) (err error)
GetAllBuckets(ctx context.Context, user string) (list []*Bucket, err error)
PutBucketAcl(ctx context.Context, user, bucname, acl string) (err error)
GetBucketAcl(ctx context.Context, user, bucname string) (acl string, err error)
EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error)

PutObject(ctx context.Context, user string, bucname, objname string, body *hash.Reader, size int64, meta map[string]string) (object *Object, err error)
CopyObject(ctx context.Context, user string, srcBucname, srcObjname, dstBucname, dstObjname string, meta map[string]string) (dstObject *Object, err error)
GetObject(ctx context.Context, user, bucname, objname string) (object *Object, body io.ReadCloser, err error)
DeleteObject(ctx context.Context, user, bucname, objname string) (err error)
// todo: DeleteObjects
ListObjects(ctx context.Context, user, bucname, prefix, delimiter, marker string, max int) (list *ObjectsList, err error)

CreateMultipartUpload(ctx context.Context, user, bucname, objname string, meta map[string]string) (multipart *Multipart, err error)
UploadPart(ctx context.Context, user, bucname, objname, uplid string, partId int, reader *hash.Reader, size int64, meta map[string]string) (part *ObjectPart, err error)
AbortMultipartUpload(ctx context.Context, user, bucname, objname, uplid string) (err error)
CompleteMultiPartUpload(ctx context.Context, user string, bucname, objname, uplid string, parts []*CompletePart) (object *Object, err error)
}

// Bucket contains bucket metadata.
Expand Down Expand Up @@ -79,7 +72,7 @@ type Multipart struct {
UploadID string
Initiated time.Time
MetaData map[string]string
Parts []ObjectPart
Parts []*ObjectPart
}

type ObjectPart struct {
Expand Down
80 changes: 25 additions & 55 deletions s3/services/object/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,36 @@ import (
"github.com/bittorrent/go-btfs/s3/action"
"github.com/bittorrent/go-btfs/s3/ctxmu"
"github.com/bittorrent/go-btfs/s3/policy"
"regexp"
"strings"
"time"

"github.com/bittorrent/go-btfs/s3/providers"
)

const (
defaultKeySeparator = "/"
defaultBucketSpace = "bkt"
defaultObjectSpace = "obj"
defaultUploadSpace = "upl"
defaultOperationTimeout = 5 * time.Minute
defaultReadObjectTimeout = 1 * time.Hour

bucketPrefix = "bkt/"
objectKeyFormat = "obj/%s/%s"
objectPrefix = "obj/%s/"
allObjectPrefixFormat = "obj/%s/%s"
allObjectSeekKeyFormat = "obj/%s/%s"

uploadKeyFormat = "uploadObj/%s/%s/%s"
allUploadPrefixFormat = "uploadObj/%s/%s"
allUploadSeekKeyFormat = "uploadObj/%s/%s/%s"

deleteKeyFormat = "delObj/%s"
allDeletePrefixFormat = "delObj/"

globalOperationTimeout = 5 * time.Minute
deleteOperationTimeout = 1 * time.Minute

maxCpuPercent = 60
maxUsedMemoryPercent = 80
)

var etagRegex = regexp.MustCompile("\"*?([^\"]*?)\"*?$")

var _ Service = (*service)(nil)

// service captures all bucket metadata for a given cluster.
type service struct {
providers providers.Providerser
lock ctxmu.MultiCtxRWLocker
keySeparator string
bucketSpace string
objectSpace string
uploadSpace string
operationTimeout time.Duration
readObjectTimeout time.Duration
providers providers.Providerser
lock ctxmu.MultiCtxRWLocker
keySeparator string
bucketSpace string
objectSpace string
uploadSpace string
operationTimeout time.Duration
closeBodyTimeout time.Duration
}

func NewService(providers providers.Providerser, options ...Option) Service {
s := &service{
providers: providers,
lock: ctxmu.NewDefaultMultiCtxRWMutex(),
lock: defaultLock,
keySeparator: defaultKeySeparator,
bucketSpace: defaultBucketSpace,
objectSpace: defaultObjectSpace,
uploadSpace: defaultUploadSpace,
operationTimeout: defaultOperationTimeout,
closeBodyTimeout: defaultCloseBodyTimeout,
}
for _, option := range options {
option(s)
Expand All @@ -74,43 +44,43 @@ func NewService(providers providers.Providerser, options ...Option) Service {

// common helper methods

func (s *service) getBucketKeyPrefix() (prefix string) {
func (s *service) getAllBucketsKeyPrefix() (prefix string) {
prefix = strings.Join([]string{s.bucketSpace, ""}, s.keySeparator)
return
}

func (s *service) getObjectKeyPrefix(bucname string) (prefix string) {
prefix = strings.Join([]string{s.objectSpace, bucname, ""}, s.keySeparator)
func (s *service) getBucketKey(bucname string) (key string) {
key = s.getAllBucketsKeyPrefix() + bucname
return
}

func (s *service) getUploadKeyPrefix(bucname, objname string) (prefix string) {
prefix = strings.Join([]string{s.uploadSpace, bucname, objname, ""}, s.keySeparator)
func (s *service) getAllObjectsKeyPrefix(bucname string) (prefix string) {
prefix = strings.Join([]string{s.objectSpace, bucname, ""}, s.keySeparator)
return
}

func (s *service) getBucketKey(bucname string) (key string) {
key = s.getBucketKeyPrefix() + bucname
func (s *service) getObjectKey(bucname, objname string) (key string) {
key = s.getAllObjectsKeyPrefix(bucname) + objname
return
}

func (s *service) getObjectKey(bucname, objname string) (key string) {
key = s.getObjectKeyPrefix(bucname) + objname
func (s *service) getAllUploadsKeyPrefix(bucname string) (prefix string) {
prefix = strings.Join([]string{s.uploadSpace, bucname, ""}, s.keySeparator)
return
}

func (s *service) getUploadKey(bucname, objname, uploadid string) (key string) {
key = s.getUploadKeyPrefix(bucname, objname) + uploadid
key = strings.Join([]string{s.getAllUploadsKeyPrefix(bucname), objname, uploadid}, s.keySeparator)
return
}

func (s *service) checkAcl(owner, acl, user string, act action.Action) (allow bool) {
own := user != "" && user == owner
allow = policy.IsAllowed(own, acl, act)
func (s *service) opctx(parent context.Context) (ctx context.Context, cancel context.CancelFunc) {
ctx, cancel = context.WithTimeout(parent, s.operationTimeout)
return
}

func (s *service) opctx(parent context.Context) (ctx context.Context, cancel context.CancelFunc) {
ctx, cancel = context.WithTimeout(parent, s.operationTimeout)
func (s *service) checkAcl(owner, acl, user string, act action.Action) (allow bool) {
own := user != "" && user == owner
allow = policy.IsAllowed(own, acl, act)
return
}
Loading

0 comments on commit 714aa30

Please sign in to comment.