From fa8bad8bbe414f57a3d0bd18ff5e806f71cf66ab Mon Sep 17 00:00:00 2001 From: Kirill Gavrilov Date: Tue, 22 Oct 2024 10:47:35 +0000 Subject: [PATCH] check running multipart uploads in yproxy and s3 --- pkg/mock/storage.go | 54 +++++++++++++++++++++++++++++--------- pkg/proc/delete_handler.go | 11 ++++++-- pkg/storage/s3storage.go | 39 ++++++++++++++++++--------- pkg/storage/storage.go | 3 ++- 4 files changed, 79 insertions(+), 28 deletions(-) diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index 11df57f..f5b1b8d 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -133,6 +133,21 @@ func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { return m.recorder } +// ListFailedMultipartUploads mocks base method. +func (m *MockStorageLister) ListFailedMultipartUploads() (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListFailedMultipartUploads") + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListFailedMultipartUploads indicates an expected call of ListFailedMultipartUploads. +func (mr *MockStorageListerMockRecorder) ListFailedMultipartUploads() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListFailedMultipartUploads", reflect.TypeOf((*MockStorageLister)(nil).ListFailedMultipartUploads)) +} + // ListPath mocks base method. func (m *MockStorageLister) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() @@ -172,18 +187,18 @@ func (m *MockStorageMover) EXPECT() *MockStorageMoverMockRecorder { return m.recorder } -// AbortMultipartUploads mocks base method. -func (m *MockStorageMover) AbortMultipartUploads() error { +// AbortMultipartUpload mocks base method. +func (m *MockStorageMover) AbortMultipartUpload(key, uploadId string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AbortMultipartUploads") + ret := m.ctrl.Call(m, "AbortMultipartUpload", key, uploadId) ret0, _ := ret[0].(error) return ret0 } -// AbortMultipartUploads indicates an expected call of AbortMultipartUploads. -func (mr *MockStorageMoverMockRecorder) AbortMultipartUploads() *gomock.Call { +// AbortMultipartUpload indicates an expected call of AbortMultipartUpload. +func (mr *MockStorageMoverMockRecorder) AbortMultipartUpload(key, uploadId any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUploads", reflect.TypeOf((*MockStorageMover)(nil).AbortMultipartUploads)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUpload", reflect.TypeOf((*MockStorageMover)(nil).AbortMultipartUpload), key, uploadId) } // DeleteObject mocks base method. @@ -238,18 +253,18 @@ func (m *MockStorageInteractor) EXPECT() *MockStorageInteractorMockRecorder { return m.recorder } -// AbortMultipartUploads mocks base method. -func (m *MockStorageInteractor) AbortMultipartUploads() error { +// AbortMultipartUpload mocks base method. +func (m *MockStorageInteractor) AbortMultipartUpload(key, uploadId string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AbortMultipartUploads") + ret := m.ctrl.Call(m, "AbortMultipartUpload", key, uploadId) ret0, _ := ret[0].(error) return ret0 } -// AbortMultipartUploads indicates an expected call of AbortMultipartUploads. -func (mr *MockStorageInteractorMockRecorder) AbortMultipartUploads() *gomock.Call { +// AbortMultipartUpload indicates an expected call of AbortMultipartUpload. +func (mr *MockStorageInteractorMockRecorder) AbortMultipartUpload(key, uploadId any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUploads", reflect.TypeOf((*MockStorageInteractor)(nil).AbortMultipartUploads)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AbortMultipartUpload", reflect.TypeOf((*MockStorageInteractor)(nil).AbortMultipartUpload), key, uploadId) } // CatFileFromStorage mocks base method. @@ -281,6 +296,21 @@ func (mr *MockStorageInteractorMockRecorder) DeleteObject(key any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) } +// ListFailedMultipartUploads mocks base method. +func (m *MockStorageInteractor) ListFailedMultipartUploads() (map[string]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListFailedMultipartUploads") + ret0, _ := ret[0].(map[string]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListFailedMultipartUploads indicates an expected call of ListFailedMultipartUploads. +func (mr *MockStorageInteractorMockRecorder) ListFailedMultipartUploads() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListFailedMultipartUploads", reflect.TypeOf((*MockStorageInteractor)(nil).ListFailedMultipartUploads)) +} + // ListPath mocks base method. func (m *MockStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() diff --git a/pkg/proc/delete_handler.go b/pkg/proc/delete_handler.go index e7b9dc3..9115ed7 100644 --- a/pkg/proc/delete_handler.go +++ b/pkg/proc/delete_handler.go @@ -33,6 +33,11 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err if err != nil { return errors.Wrap(err, "failed to delete file") } + uploads, err := dh.StorageInterractor.ListFailedMultipartUploads() + if err != nil { + return err + } + ylogger.Zero.Info().Int("amount", len(uploads)).Msg("multipart uploads will be aborted") if !msg.Confirm { //do not delete files if no confirmation flag provided return nil @@ -76,8 +81,10 @@ func (dh *BasicDeleteHandler) HandleDeleteGarbage(msg message.DeleteMessage) err return errors.Wrap(err, "failed to move some files") } - if err := dh.StorageInterractor.AbortMultipartUploads(); err != nil { - return err + for key, uploadId := range uploads { + if err := dh.StorageInterractor.AbortMultipartUpload(key, uploadId); err != nil { + return err + } } return nil diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index ab45c82..f6272b9 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -8,6 +8,7 @@ import ( "path" "strconv" "strings" + "sync" "github.com/yezzey-gp/aws-sdk-go/aws" "github.com/yezzey-gp/aws-sdk-go/service/s3" @@ -27,7 +28,8 @@ type S3StorageInteractor struct { cnf *config.Storage - bucketMap map[string]string + bucketMap map[string]string + multipartUploads sync.Map } func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64, setts []settings.StorageSettings) (io.ReadCloser, error) { @@ -96,6 +98,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ } if multipartUpload { + s.multipartUploads.Store(objectPath, true) _, err = up.Upload( &s3manager.UploadInput{ Bucket: aws.String(bucket), @@ -104,6 +107,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ StorageClass: aws.String(storageClass), }, ) + s.multipartUploads.Delete(objectPath) } else { var body []byte body, err = io.ReadAll(r) @@ -250,12 +254,26 @@ func (s *S3StorageInteractor) MoveObject(from string, to string) error { return s.DeleteObject(from) } -func (s *S3StorageInteractor) AbortMultipartUploads() error { +func (s *S3StorageInteractor) AbortMultipartUpload(key, uploadId string) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { return err } + _, err = sess.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ + Bucket: aws.String(s.cnf.StorageBucket), + UploadId: aws.String(uploadId), + Key: aws.String(key), + }) + return err +} + +func (s *S3StorageInteractor) ListFailedMultipartUploads() (map[string]string, error) { + sess, err := s.pool.GetSession(context.TODO()) + if err != nil { + return nil, err + } + uploads := make([]*s3.MultipartUpload, 0) var keyMarker *string for { @@ -264,7 +282,7 @@ func (s *S3StorageInteractor) AbortMultipartUploads() error { KeyMarker: keyMarker, }) if err != nil { - return err + return nil, err } uploads = append(uploads, out.Uploads...) @@ -276,16 +294,11 @@ func (s *S3StorageInteractor) AbortMultipartUploads() error { keyMarker = out.NextKeyMarker } - for i := 0; i < len(uploads); i++ { - _, err := sess.AbortMultipartUpload(&s3.AbortMultipartUploadInput{ - Bucket: aws.String(s.cnf.StorageBucket), - UploadId: uploads[i].UploadId, - Key: uploads[i].Key, - }) - if err != nil { - return err + out := make(map[string]string) + for _, upload := range uploads { + if _, ok := s.multipartUploads.Load(*upload.Key); !ok { + out[*upload.Key] = *upload.UploadId } } - - return nil + return out, nil } diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 22e35c3..8de63a4 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -21,12 +21,13 @@ type StorageWriter interface { type StorageLister interface { ListPath(prefix string) ([]*object.ObjectInfo, error) + ListFailedMultipartUploads() (map[string]string, error) } type StorageMover interface { MoveObject(from string, to string) error DeleteObject(key string) error - AbortMultipartUploads() error + AbortMultipartUpload(key, uploadId string) error } //go:generate mockgen -destination=pkg/mock/storage.go -package=mock