From e12b1e9c6ab3af35d430c1339b9edc0b4c464c20 Mon Sep 17 00:00:00 2001 From: Kirill Date: Tue, 22 Oct 2024 14:24:16 +0300 Subject: [PATCH] vacuum garbage: abort dangling multipart uploads (#70) * abort dangling multipart uploads * check running multipart uploads in yproxy and s3 --- pkg/mock/storage.go | 63 ++++++++++++++++++++++++++++++++++++++ pkg/proc/delete_handler.go | 11 +++++++ pkg/storage/filestorage.go | 4 +++ pkg/storage/s3storage.go | 55 ++++++++++++++++++++++++++++++++- pkg/storage/storage.go | 2 ++ 5 files changed, 134 insertions(+), 1 deletion(-) diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index 6f2488b..f5b1b8d 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -22,6 +22,7 @@ import ( type MockStorageReader struct { ctrl *gomock.Controller recorder *MockStorageReaderMockRecorder + isgomock struct{} } // MockStorageReaderMockRecorder is the mock recorder for MockStorageReader. @@ -60,6 +61,7 @@ func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset, setts type MockStorageWriter struct { ctrl *gomock.Controller recorder *MockStorageWriterMockRecorder + isgomock struct{} } // MockStorageWriterMockRecorder is the mock recorder for MockStorageWriter. @@ -111,6 +113,7 @@ func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r, settings any) *g type MockStorageLister struct { ctrl *gomock.Controller recorder *MockStorageListerMockRecorder + isgomock struct{} } // MockStorageListerMockRecorder is the mock recorder for MockStorageLister. @@ -130,6 +133,21 @@ func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { return m.recorder } +// ListFailedMultipartUploads mocks base method. +func (m *MockStorageLister) ListFailedMultipartUploads() (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListFailedMultipartUploads") + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListFailedMultipartUploads indicates an expected call of ListFailedMultipartUploads. +func (mr *MockStorageListerMockRecorder) ListFailedMultipartUploads() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListFailedMultipartUploads", reflect.TypeOf((*MockStorageLister)(nil).ListFailedMultipartUploads)) +} + // ListPath mocks base method. func (m *MockStorageLister) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() @@ -149,6 +167,7 @@ func (mr *MockStorageListerMockRecorder) ListPath(prefix any) *gomock.Call { type MockStorageMover struct { ctrl *gomock.Controller recorder *MockStorageMoverMockRecorder + isgomock struct{} } // MockStorageMoverMockRecorder is the mock recorder for MockStorageMover. @@ -168,6 +187,20 @@ func (m *MockStorageMover) EXPECT() *MockStorageMoverMockRecorder { return m.recorder } +// AbortMultipartUpload mocks base method. +func (m *MockStorageMover) AbortMultipartUpload(key, uploadId string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AbortMultipartUpload", key, uploadId) + ret0, _ := ret[0].(error) + return ret0 +} + +// AbortMultipartUpload indicates an expected call of AbortMultipartUpload. +func (mr *MockStorageMoverMockRecorder) AbortMultipartUpload(key, uploadId any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUpload", reflect.TypeOf((*MockStorageMover)(nil).AbortMultipartUpload), key, uploadId) +} + // DeleteObject mocks base method. func (m *MockStorageMover) DeleteObject(key string) error { m.ctrl.T.Helper() @@ -200,6 +233,7 @@ func (mr *MockStorageMoverMockRecorder) MoveObject(from, to any) *gomock.Call { type MockStorageInteractor struct { ctrl *gomock.Controller recorder *MockStorageInteractorMockRecorder + isgomock struct{} } // MockStorageInteractorMockRecorder is the mock recorder for MockStorageInteractor. @@ -219,6 +253,20 @@ func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { return m.recorder } +// AbortMultipartUpload mocks base method. +func (m *MockStorageInteractor) AbortMultipartUpload(key, uploadId string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AbortMultipartUpload", key, uploadId) + ret0, _ := ret[0].(error) + return ret0 +} + +// AbortMultipartUpload indicates an expected call of AbortMultipartUpload. +func (mr *MockStorageInteractorMockRecorder) AbortMultipartUpload(key, uploadId any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUpload", reflect.TypeOf((*MockStorageInteractor)(nil).AbortMultipartUpload), key, uploadId) +} + // CatFileFromStorage mocks base method. func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) { m.ctrl.T.Helper() @@ -248,6 +296,21 @@ func (mr *MockStorageInteractorMockRecorder) DeleteObject(key any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) } +// ListFailedMultipartUploads mocks base method. +func (m *MockStorageInteractor) ListFailedMultipartUploads() (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListFailedMultipartUploads") + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListFailedMultipartUploads indicates an expected call of ListFailedMultipartUploads. +func (mr *MockStorageInteractorMockRecorder) ListFailedMultipartUploads() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListFailedMultipartUploads", reflect.TypeOf((*MockStorageInteractor)(nil).ListFailedMultipartUploads)) +} + // ListPath mocks base method. func (m *MockStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go index 5d01709..9115ed7 100644 --- a/pkg/proc/delete_handler.go +++ b/pkg/proc/delete_handler.go @@ -33,6 +33,11 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err if err != nil { return errors.Wrap(err, "failed to delete file") } + uploads, err := dh.StorageInterractor.ListFailedMultipartUploads() + if err != nil { + return err + } + ylogger.Zero.Info().Int("amount", len(uploads)).Msg("multipart uploads will be aborted") if !msg.Confirm { //do not delete files if no confirmation flag provided return nil @@ -76,6 +81,12 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err return errors.Wrap(err, "failed to move some files") } + for key, uploadId := range uploads { + if err := dh.StorageInterractor.AbortMultipartUpload(key, uploadId); err != nil { + return err + } + } + return nil } diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index 4515ab6..a05708f 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -85,3 +85,7 @@ func (s *FileStorageInteractor) MoveObject(from string, to string) error { func (s *FileStorageInteractor) DeleteObject(key string) error { return os.Remove(path.Join(s.cnf.StoragePrefix, key)) } + +func (s *FileStorageInteractor) AbortMultipartUploads() error { + return nil +} diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 3a6d14d..f6272b9 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -8,6 +8,7 @@ import ( "path" "strconv" "strings" + "sync" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -27,7 +28,8 @@ type S3StorageInteractor struct { cnf *config.Storage - bucketMap map[string]string + bucketMap map[string]string + multipartUploads sync.Map } func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) { @@ -96,6 +98,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ } if multipartUpload { + s.multipartUploads.Store(objectPath, true) _, err = up.Upload( &s3manager.UploadInput{ Bucket: aws.String(bucket), @@ -104,6 +107,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ StorageClass: aws.String(storageClass), }, ) + s.multipartUploads.Delete(objectPath) } else { var body []byte body, err = io.ReadAll(r) @@ -249,3 +253,52 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { } return s.DeleteObject(from) } + +func (s *S3StorageInteractor) AbortMultipartUpload(key, uploadId string) error { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + return err + } + + _, err = sess.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(s.cnf.StorageBucket), + UploadId: aws.String(uploadId), + Key: aws.String(key), + }) + return err +} + +func (s *S3StorageInteractor) ListFailedMultipartUploads() (map[string]string, error) { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + return nil, err + } + + uploads := make([]*s3.MultipartUpload, 0) + var keyMarker *string + for { + out, err := sess.ListMultipartUploads(&s3.ListMultipartUploadsInput{ + Bucket: aws.String(s.cnf.StorageBucket), + KeyMarker: keyMarker, + }) + if err != nil { + return nil, err + } + + uploads = append(uploads, out.Uploads...) + + if !*out.IsTruncated { + break + } + + keyMarker = out.NextKeyMarker + } + + out := make(map[string]string) + for _, upload := range uploads { + if _, ok := s.multipartUploads.Load(*upload.Key); !ok { + out[*upload.Key] = *upload.UploadId + } + } + return out, nil +} diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 3e00c4c..8de63a4 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -21,11 +21,13 @@ type StorageWriter interface { type StorageLister interface { ListPath(prefix string) ([]*object.ObjectInfo, error) + ListFailedMultipartUploads() (map[string]string, error) } type StorageMover interface { MoveObject(from string, to string) error DeleteObject(key string) error + AbortMultipartUpload(key, uploadId string) error } //go:generate mockgen -destination=pkg/mock/storage.go -package=mock