Skip to content

Commit

Permalink
vacuum garbage: abort dangling multipart uploads (#70)
Browse files Browse the repository at this point in the history
* abort dangling multipart uploads

* check running multipart uploads in yproxy and s3
  • Loading branch information
diPhantxm authored Oct 22, 2024
1 parent 1bb40c4 commit e12b1e9
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 1 deletion.
63 changes: 63 additions & 0 deletions pkg/mock/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions pkg/proc/delete_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err
if err != nil {
return errors.Wrap(err, "failed to delete file")
}
uploads, err := dh.StorageInterractor.ListFailedMultipartUploads()
if err != nil {
return err
}
ylogger.Zero.Info().Int("amount", len(uploads)).Msg("multipart uploads will be aborted")

if !msg.Confirm { //do not delete files if no confirmation flag provided
return nil
Expand Down Expand Up @@ -76,6 +81,12 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err
return errors.Wrap(err, "failed to move some files")
}

for key, uploadId := range uploads {
if err := dh.StorageInterractor.AbortMultipartUpload(key, uploadId); err != nil {
return err
}
}

return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/filestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,7 @@ func (s *FileStorageInteractor) MoveObject(from string, to string) error {
func (s *FileStorageInteractor) DeleteObject(key string) error {
return os.Remove(path.Join(s.cnf.StoragePrefix, key))
}

func (s *FileStorageInteractor) AbortMultipartUploads() error {
return nil
}
55 changes: 54 additions & 1 deletion pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path"
"strconv"
"strings"
"sync"

"github.com/yezzey-gp/aws-sdk-go/aws"
"github.com/yezzey-gp/aws-sdk-go/service/s3"
Expand All @@ -27,7 +28,8 @@ type S3StorageInteractor struct {

cnf *config.Storage

bucketMap map[string]string
bucketMap map[string]string
multipartUploads sync.Map
}

func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) {
Expand Down Expand Up @@ -96,6 +98,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [
}

if multipartUpload {
s.multipartUploads.Store(objectPath, true)
_, err = up.Upload(
&s3manager.UploadInput{
Bucket: aws.String(bucket),
Expand All @@ -104,6 +107,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [
StorageClass: aws.String(storageClass),
},
)
s.multipartUploads.Delete(objectPath)
} else {
var body []byte
body, err = io.ReadAll(r)
Expand Down Expand Up @@ -249,3 +253,52 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error {
}
return s.DeleteObject(from)
}

func (s *S3StorageInteractor) AbortMultipartUpload(key, uploadId string) error {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
return err
}

_, err = sess.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(s.cnf.StorageBucket),
UploadId: aws.String(uploadId),
Key: aws.String(key),
})
return err
}

func (s *S3StorageInteractor) ListFailedMultipartUploads() (map[string]string, error) {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
return nil, err
}

uploads := make([]*s3.MultipartUpload, 0)
var keyMarker *string
for {
out, err := sess.ListMultipartUploads(&s3.ListMultipartUploadsInput{
Bucket: aws.String(s.cnf.StorageBucket),
KeyMarker: keyMarker,
})
if err != nil {
return nil, err
}

uploads = append(uploads, out.Uploads...)

if !*out.IsTruncated {
break
}

keyMarker = out.NextKeyMarker
}

out := make(map[string]string)
for _, upload := range uploads {
if _, ok := s.multipartUploads.Load(*upload.Key); !ok {
out[*upload.Key] = *upload.UploadId
}
}
return out, nil
}
2 changes: 2 additions & 0 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ type StorageWriter interface {

type StorageLister interface {
ListPath(prefix string) ([]*object.ObjectInfo, error)
ListFailedMultipartUploads() (map[string]string, error)
}

type StorageMover interface {
MoveObject(from string, to string) error
DeleteObject(key string) error
AbortMultipartUpload(key, uploadId string) error
}

//go:generate mockgen -destination=pkg/mock/storage.go -package=mock
Expand Down

0 comments on commit e12b1e9

Please sign in to comment.