Skip to content

Commit

Permalink
fix: multipart etag calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
imstevez committed Sep 8, 2023
1 parent 4d3d6b7 commit df7cb3d
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 208 deletions.
2 changes: 2 additions & 0 deletions s3/handlers/handlers_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ func (h *Handlers) respErr(err error) (rerr *responses.Error) {
switch err {
case object.ErrBucketNotFound:
rerr = responses.ErrNoSuchBucket
case object.ErrBucketeNotEmpty:
rerr = responses.ErrBucketNotEmpty
case object.ErrObjectNotFound:
rerr = responses.ErrNoSuchKey
case object.ErrUploadNotFound:
Expand Down
1 change: 0 additions & 1 deletion s3/handlers/handlers_multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,4 +247,3 @@ func (h *Handlers) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http

responses.WriteSuccessResponse(w, output, "CompleteMultipartUploadResult")
}

2 changes: 1 addition & 1 deletion s3/handlers/handlers_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (h *Handlers) DeleteObjectsHandler(w http.ResponseWriter, r *http.Request)

if input.Delete == nil ||
len(input.Delete.Objects) == 0 ||
len(input.Delete.Objects) > consts.MaxObjectList {
len(input.Delete.Objects) > consts.MaxDeleteList {
rerr := responses.ErrMalformedXML
err = rerr
responses.WriteErrorResponse(w, r, rerr)
Expand Down
1 change: 0 additions & 1 deletion s3/protocol/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ func parseLocation(r *http.Request, inv reflect.Value) (err error) {
fv = fv.Convert(byteSliceType)
}


switch ft.Tag.Get("location") {
case "headers":
prefix := ft.Tag.Get("locationName")
Expand Down
2 changes: 1 addition & 1 deletion s3/requests/parsers.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ func ParsePutBucketAclRequest(r *http.Request) (req *PutBucketACLRequest, rerr *
}
req.ACL, rerr = ParseBucketACL(r)
return
}
}
1 change: 0 additions & 1 deletion s3/routers/routers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func (routers *Routers) Register() http.Handler {

hs := routers.handlers


root.Use(
hs.Cors,
hs.Log,
Expand Down
15 changes: 6 additions & 9 deletions s3/services/object/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@ import (
)

var (
ErrBucketNotFound = errors.New("bucket not found")
ErrObjectNotFound = errors.New("object not found")
ErrUploadNotFound = errors.New("upload not found")
ErrNotAllowed = errors.New("not allowed")
ErrBucketAlreadyExists = errors.New("bucket already exists")
ErrOperationTimeout = errors.New("operation timeout")
ErrContentSHA256Mismatch = errors.New("sha256 mismatch")
ErrBadDigest = errors.New("bad digest")
ErrBucketNotFound = errors.New("bucket not found")
ErrBucketeNotEmpty = errors.New("bucket not empty")
ErrObjectNotFound = errors.New("object not found")
ErrUploadNotFound = errors.New("upload not found")
ErrNotAllowed = errors.New("not allowed")
ErrBucketAlreadyExists = errors.New("bucket already exists")
)

type Service interface {
Expand All @@ -26,7 +24,6 @@ type Service interface {
GetAllBuckets(ctx context.Context, user string) (list []*Bucket, err error)
PutBucketACL(ctx context.Context, user, bucname, acl string) (err error)
GetBucketACL(ctx context.Context, user, bucname string) (acl string, err error)
EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error)

PutObject(ctx context.Context, user, bucname, objname string, body *hash.Reader, size int64, meta map[string]string) (object *Object, err error)
CopyObject(ctx context.Context, user, srcBucname, srcObjname, dstBucname, dstObjname string, meta map[string]string) (dstObject *Object, err error)
Expand Down
90 changes: 89 additions & 1 deletion s3/services/object/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/bittorrent/go-btfs/s3/action"
"github.com/bittorrent/go-btfs/s3/ctxmu"
"github.com/bittorrent/go-btfs/s3/policy"
"io"
"strings"
"time"

Expand All @@ -14,7 +15,6 @@ import (

var _ Service = (*service)(nil)

// service captures all bucket metadata for a given cluster.
type service struct {
providers providers.Providerser
lock ctxmu.MultiCtxRWLocker
Expand Down Expand Up @@ -102,3 +102,91 @@ func (s *service) checkACL(owner, acl, user string, act action.Action) (allow bo
allow = policy.IsAllowed(own, acl, act)
return
}

func (s *service) addBodyRef(ctx context.Context, cid, tokey string) (err error) {
// Cid reference key
crfkey := s.getCidrefKey(cid, tokey)

// Add cid reference
err = s.providers.StateStore().Put(crfkey, nil)

return
}

func (s *service) removeBodyRef(ctx context.Context, cid, tokey string) (err error) {
// This object cid reference key
crfkey := s.getCidrefKey(cid, tokey)

// Delete cid ref of this object
err = s.providers.StateStore().Delete(crfkey)

return
}

func (s *service) storeBody(ctx context.Context, body io.Reader, tokey string) (cid string, err error) {
// RLock all cid refs to enable no cid will be deleted
err = s.lock.RLock(ctx, s.cidrefSpace)
if err != nil {
return
}
defer s.lock.RUnlock(s.cidrefSpace)

// Store body and get the cid
cid, err = s.providers.FileStore().Store(body)
if err != nil {
return
}

// Add cid reference
err = s.addBodyRef(ctx, cid, tokey)

return
}

func (s *service) removeBody(ctx context.Context, cid, tokey string) (err error) {
// Flag to mark cid be referenced by other object
otherRef := false

// Log removing
defer func() {
fmt.Printf("s3-api: remove <%s>, ref <%s>, other-ref - %v, err: %v\n", cid, tokey, otherRef, err)
}()

// Lock all cid refs to enable new cid reference can not be added when
// remove is executing
err = s.lock.Lock(ctx, s.cidrefSpace)
if err != nil {
return
}
defer s.lock.Unlock(s.cidrefSpace)

// Remove cid ref of this object
err = s.removeBodyRef(ctx, cid, tokey)
if err != nil {
return
}

// All this cid references prefix
allRefsPrefix := s.getAllCidrefsKeyPrefix(cid)

// Iterate all this cid refs, if exists other object's ref, set
// the otherRef mark to true
err = s.providers.StateStore().Iterate(allRefsPrefix, func(key, _ []byte) (stop bool, err error) {
otherRef = true
stop = true
return
})
if err != nil {
return
}

// Exists other refs, cid body can not be removed
if otherRef {
return
}

// No other refs to this cid, remove it
err = s.providers.FileStore().Remove(cid)

return
}
53 changes: 9 additions & 44 deletions s3/services/object/service_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,23 +126,18 @@ func (s *service) DeleteBucket(ctx context.Context, user, bucname string) (err e
return
}

// Delete bucket
err = s.providers.StateStore().Delete(buckey)
// Check if bucket is empty
empty, err := s.isBucketEmpty(bucname)
if err != nil {
return
}
if !empty {
err = ErrBucketeNotEmpty
return
}

// All bucket objects prefix
objectsPrefix := s.getAllObjectsKeyPrefix(bucname)

// Try to delete all bucket objects
_ = s.deleteObjectsByPrefix(ctx, objectsPrefix)

// All bucket uploads prefix
uploadsPrefix := s.getAllUploadsKeyPrefix(bucname)

// Try to delete all bucket uploads
_ = s.deleteUploadsByPrefix(ctx, uploadsPrefix)
// Delete bucket
err = s.providers.StateStore().Delete(buckey)

return
}
Expand Down Expand Up @@ -278,37 +273,7 @@ func (s *service) GetBucketACL(ctx context.Context, user, bucname string) (acl s
}

// EmptyBucket check if the user specified bucked is empty
func (s *service) EmptyBucket(ctx context.Context, user, bucname string) (empty bool, err error) {
ctx, cancel := s.opctx(ctx)
defer cancel()

// Bucket key
buckey := s.getBucketKey(bucname)

// RLock bucket
err = s.lock.RLock(ctx, buckey)
if err != nil {
return
}
defer s.lock.RUnlock(buckey)

// Get bucket
bucket, err := s.getBucket(buckey)
if err != nil {
return
}
if bucket == nil {
err = ErrBucketNotFound
return
}

// Check action ACL
allow := s.checkACL(bucket.Owner, bucket.ACL, user, action.ListObjectsAction)
if !allow {
err = ErrNotAllowed
return
}

func (s *service) isBucketEmpty(bucname string) (empty bool, err error) {
// All bucket objects prefix
objectsPrefix := s.getAllObjectsKeyPrefix(bucname)

Expand Down
48 changes: 15 additions & 33 deletions s3/services/object/service_multipart.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package object

import (
"context"
"encoding/hex"
"errors"
"fmt"
"github.com/bittorrent/go-btfs/s3/action"
"github.com/bittorrent/go-btfs/s3/consts"
"github.com/bittorrent/go-btfs/s3/etag"
Expand Down Expand Up @@ -404,6 +402,13 @@ func (s *service) CompleteMultiPartUpload(ctx context.Context, user, bucname, ob
}
}()

// Calculate multipart etag
multiEtag, err := s.calcMultiETag(parts)
if err != nil {
return
}

// Current time
now := time.Now().UTC()

// Object
Expand All @@ -413,7 +418,7 @@ func (s *service) CompleteMultiPartUpload(ctx context.Context, user, bucname, ob
ModTime: now,
Size: size,
IsDir: false,
ETag: s.computeMultipartMD5(parts),
ETag: multiEtag.String(),
CID: cid,
ACL: "",
VersionID: "",
Expand Down Expand Up @@ -500,39 +505,16 @@ func (s *service) canonicalizeETag(etag string) string {
return etagRegex.ReplaceAllString(etag, "$1")
}

func (s *service) computeMultipartMD5(parts []*CompletePart) (md5 string) {
var finalMD5Bytes []byte
func (s *service) calcMultiETag(parts []*CompletePart) (multiEtag etag.ETag, err error) {
var completeETags []etag.ETag
for _, part := range parts {
md5Bytes, err := hex.DecodeString(s.canonicalizeETag(part.ETag))
var etg etag.ETag
etg, err = etag.Parse(part.ETag)
if err != nil {
finalMD5Bytes = append(finalMD5Bytes, []byte(part.ETag)...)
} else {
finalMD5Bytes = append(finalMD5Bytes, md5Bytes...)
}
}
md5 = fmt.Sprintf("%s-%d", etag.Multipart(finalMD5Bytes), len(parts))
return
}

// deleteUploadsByPrefix try to delete all multipart uploads with the specified common prefix
func (s *service) deleteUploadsByPrefix(ctx context.Context, uploadsPrefix string) (err error) {
err = s.providers.StateStore().Iterate(uploadsPrefix, func(key, _ []byte) (stop bool, er error) {
uplkey := string(key)
var multipart *Multipart
er = s.providers.StateStore().Get(uplkey, multipart)
if er != nil {
return
}
er = s.providers.StateStore().Delete(uplkey)
if er != nil {
return
}
for i, part := range multipart.Parts {
prtkey := s.getUploadPartKey(uplkey, i)
_ = s.removeBody(ctx, part.CID, prtkey)
}
return
})

completeETags = append(completeETags, etg)
}
multiEtag = etag.Multipart(completeETags...)
return
}
Loading

0 comments on commit df7cb3d

Please sign in to comment.