Skip to content

Commit

Permalink
check running multipart uploads in yproxy and s3
Browse files Browse the repository at this point in the history
  • Loading branch information
diPhantxm committed Oct 22, 2024
1 parent 2ad084a commit fa8bad8
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 28 deletions.
54 changes: 42 additions & 12 deletions pkg/mock/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err
if err != nil {
return errors.Wrap(err, "failed to delete file")
}
uploads, err := dh.StorageInterractor.ListFailedMultipartUploads()
if err != nil {
return err
}
ylogger.Zero.Info().Int("amount", len(uploads)).Msg("multipart uploads will be aborted")

if !msg.Confirm { //do not delete files if no confirmation flag provided
return nil
Expand Down Expand Up @@ -76,8 +81,10 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err
return errors.Wrap(err, "failed to move some files")
}

if err := dh.StorageInterractor.AbortMultipartUploads(); err != nil {
return err
for key, uploadId := range uploads {
if err := dh.StorageInterractor.AbortMultipartUpload(key, uploadId); err != nil {
return err
}
}

return nil
Expand Down
39 changes: 26 additions & 13 deletions pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"strconv"
"strings"
"sync"

"github.com/yezzey-gp/aws-sdk-go/aws"
"github.com/yezzey-gp/aws-sdk-go/service/s3"
Expand All @@ -27,7 +28,8 @@ type S3StorageInteractor struct {

cnf *config.Storage

bucketMap map[string]string
bucketMap map[string]string
multipartUploads sync.Map
}

func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) {
Expand Down Expand Up @@ -96,6 +98,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [
}

if multipartUpload {
s.multipartUploads.Store(objectPath, true)
_, err = up.Upload(
&s3manager.UploadInput{
Bucket: aws.String(bucket),
Expand All @@ -104,6 +107,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [
StorageClass: aws.String(storageClass),
},
)
s.multipartUploads.Delete(objectPath)
} else {
var body []byte
body, err = io.ReadAll(r)
Expand Down Expand Up @@ -250,12 +254,26 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error {
return s.DeleteObject(from)
}

func (s *S3StorageInteractor) AbortMultipartUploads() error {
func (s *S3StorageInteractor) AbortMultipartUpload(key, uploadId string) error {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
return err
}

_, err = sess.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(s.cnf.StorageBucket),
UploadId: aws.String(uploadId),
Key: aws.String(key),
})
return err
}

func (s *S3StorageInteractor) ListFailedMultipartUploads() (map[string]string, error) {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
return nil, err
}

uploads := make([]*s3.MultipartUpload, 0)
var keyMarker *string
for {
Expand All @@ -264,7 +282,7 @@ func (s *S3StorageInteractor) AbortMultipartUploads() error {
KeyMarker: keyMarker,
})
if err != nil {
return err
return nil, err
}

uploads = append(uploads, out.Uploads...)
Expand All @@ -276,16 +294,11 @@ func (s *S3StorageInteractor) AbortMultipartUploads() error {
keyMarker = out.NextKeyMarker
}

for i := 0; i < len(uploads); i++ {
_, err := sess.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(s.cnf.StorageBucket),
UploadId: uploads[i].UploadId,
Key: uploads[i].Key,
})
if err != nil {
return err
out := make(map[string]string)
for _, upload := range uploads {
if _, ok := s.multipartUploads.Load(*upload.Key); !ok {
out[*upload.Key] = *upload.UploadId
}
}

return nil
return out, nil
}
3 changes: 2 additions & 1 deletion pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ type StorageWriter interface {

type StorageLister interface {
ListPath(prefix string) ([]*object.ObjectInfo, error)
ListFailedMultipartUploads() (map[string]string, error)
}

type StorageMover interface {
MoveObject(from string, to string) error
DeleteObject(key string) error
AbortMultipartUploads() error
AbortMultipartUpload(key, uploadId string) error
}

//go:generate mockgen -destination=pkg/mock/storage.go -package=mock
Expand Down

0 comments on commit fa8bad8

Please sign in to comment.