From 8f56a9b7df95a7a4b7db010a90002099352aec18 Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 08:03:14 +0000 Subject: [PATCH 1/6] Refactor client-server interaction facilities --- cmd/client/main.go | 5 +- pkg/message/object_meta_message.go | 10 +- pkg/mock/backups.go | 9 +- pkg/mock/database.go | 9 +- pkg/mock/proc/yrreader.go | 11 +- pkg/mock/storage.go | 54 ++++---- pkg/object/objectInfo.go | 6 + pkg/proc/delete_handler_test.go | 4 +- pkg/proc/interaction.go | 197 ++++++++++++++++------------- pkg/storage/filestorage.go | 10 +- pkg/storage/s3storage.go | 15 +-- pkg/storage/storage.go | 6 +- 12 files changed, 193 insertions(+), 143 deletions(-) create mode 100644 pkg/object/objectInfo.go diff --git a/cmd/client/main.go b/cmd/client/main.go index 151da04..9f1ed15 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -6,12 +6,11 @@ import ( "net" "os" - "github.com/yezzey-gp/yproxy/pkg/storage" - "github.com/spf13/cobra" "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/client" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -167,7 +166,7 @@ func listFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { r := proc.NewProtoReader(ycl) done := false - res := make([]*storage.ObjectInfo, 0) + res := make([]*object.ObjectInfo, 0) for { if done { break diff --git a/pkg/message/object_meta_message.go b/pkg/message/object_meta_message.go index 6f8bfc4..2c3cf2e 100644 --- a/pkg/message/object_meta_message.go +++ b/pkg/message/object_meta_message.go @@ -4,16 +4,16 @@ import ( "bytes" "encoding/binary" - "github.com/yezzey-gp/yproxy/pkg/storage" + "github.com/yezzey-gp/yproxy/pkg/object" ) type ObjectInfoMessage struct { - Content []*storage.ObjectInfo + Content []*object.ObjectInfo } var _ ProtoMessage = &ObjectInfoMessage{} -func NewObjectMetaMessage(content []*storage.ObjectInfo) *ObjectInfoMessage { +func NewObjectMetaMessage(content []*object.ObjectInfo) *ObjectInfoMessage { return &ObjectInfoMessage{ Content: content, } @@ -44,12 +44,12 @@ func (c *ObjectInfoMessage) Encode() []byte { func (c *ObjectInfoMessage) Decode(body []byte) { body = body[4:] - c.Content = make([]*storage.ObjectInfo, 0) + c.Content = make([]*object.ObjectInfo, 0) for len(body) > 0 { name, index := c.GetString(body) size := int64(binary.BigEndian.Uint64(body[index : index+8])) - c.Content = append(c.Content, &storage.ObjectInfo{ + c.Content = append(c.Content, &object.ObjectInfo{ Path: name, Size: size, }) diff --git a/pkg/mock/backups.go b/pkg/mock/backups.go index 530bb1b..8ec8647 100644 --- a/pkg/mock/backups.go +++ b/pkg/mock/backups.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/backups/backups.go +// +// Generated by this command: +// +// mockgen -source=pkg/backups/backups.go -destination=pkg/mock/backups.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -7,7 +12,7 @@ package mock import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" ) // MockBackupInterractor is a mock of BackupInterractor interface. @@ -43,7 +48,7 @@ func (m *MockBackupInterractor) GetFirstLSN(arg0 int) (uint64, error) { } // GetFirstLSN indicates an expected call of GetFirstLSN. -func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 interface{}) *gomock.Call { +func (mr *MockBackupInterractorMockRecorder) GetFirstLSN(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFirstLSN", reflect.TypeOf((*MockBackupInterractor)(nil).GetFirstLSN), arg0) } diff --git a/pkg/mock/database.go b/pkg/mock/database.go index d04c573..04408c2 100644 --- a/pkg/mock/database.go +++ b/pkg/mock/database.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/database/database.go +// +// Generated by this command: +// +// mockgen -source=pkg/database/database.go -destination=pkg/mock/database.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -7,7 +12,7 @@ package mock import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" ) // MockDatabaseInterractor is a mock of DatabaseInterractor interface. @@ -44,7 +49,7 @@ func (m *MockDatabaseInterractor) GetVirtualExpireIndexes(arg0 int) (map[string] } // GetVirtualExpireIndexes indicates an expected call of GetVirtualExpireIndexes. -func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 interface{}) *gomock.Call { +func (mr *MockDatabaseInterractorMockRecorder) GetVirtualExpireIndexes(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVirtualExpireIndexes", reflect.TypeOf((*MockDatabaseInterractor)(nil).GetVirtualExpireIndexes), arg0) } diff --git a/pkg/mock/proc/yrreader.go b/pkg/mock/proc/yrreader.go index d0f6975..f125507 100644 --- a/pkg/mock/proc/yrreader.go +++ b/pkg/mock/proc/yrreader.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/proc/yrreader.go +// +// Generated by this command: +// +// mockgen -source=pkg/proc/yrreader.go -destination=pkg/mock/proc/yrreader.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -7,7 +12,7 @@ package mock import ( reflect "reflect" - gomock "github.com/golang/mock/gomock" + gomock "go.uber.org/mock/gomock" ) // MockRestartReader is a mock of RestartReader interface. @@ -57,7 +62,7 @@ func (m *MockRestartReader) Read(p []byte) (int, error) { } // Read indicates an expected call of Read. -func (mr *MockRestartReaderMockRecorder) Read(p interface{}) *gomock.Call { +func (mr *MockRestartReaderMockRecorder) Read(p any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockRestartReader)(nil).Read), p) } @@ -71,7 +76,7 @@ func (m *MockRestartReader) Restart(offsetStart int64) error { } // Restart indicates an expected call of Restart. -func (mr *MockRestartReaderMockRecorder) Restart(offsetStart interface{}) *gomock.Call { +func (mr *MockRestartReaderMockRecorder) Restart(offsetStart any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Restart", reflect.TypeOf((*MockRestartReader)(nil).Restart), offsetStart) } diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index a733f0d..3dbe9cc 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -1,5 +1,10 @@ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/storage/storage.go +// +// Generated by this command: +// +// mockgen -source=pkg/storage/storage.go -destination=pkg/mock/storage.go -package=mock +// // Package mock is a generated GoMock package. package mock @@ -8,8 +13,9 @@ import ( io "io" reflect "reflect" - gomock "github.com/golang/mock/gomock" - storage "github.com/yezzey-gp/yproxy/pkg/storage" + message "github.com/yezzey-gp/yproxy/pkg/message" + object "github.com/yezzey-gp/yproxy/pkg/object" + gomock "go.uber.org/mock/gomock" ) // MockStorageReader is a mock of StorageReader interface. @@ -45,7 +51,7 @@ func (m *MockStorageReader) CatFileFromStorage(name string, offset int64) (io.Re } // CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { +func (mr *MockStorageReaderMockRecorder) CatFileFromStorage(name, offset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageReader)(nil).CatFileFromStorage), name, offset) } @@ -82,23 +88,23 @@ func (m *MockStorageWriter) PatchFile(name string, r io.ReadSeeker, startOffset } // PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { +func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageWriter)(nil).PatchFile), name, r, startOffset) } // PutFileToDest mocks base method. -func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader) error { +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) return ret0 } // PutFileToDest indicates an expected call of PutFileToDest. -func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { +func (mr *MockStorageWriterMockRecorder) PutFileToDest(name, r, settings any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageWriter)(nil).PutFileToDest), name, r, settings) } // MockStorageLister is a mock of StorageLister interface. @@ -125,16 +131,16 @@ func (m *MockStorageLister) EXPECT() *MockStorageListerMockRecorder { } // ListPath mocks base method. -func (m *MockStorageLister) ListPath(prefix string) ([]*storage.ObjectInfo, error) { +func (m *MockStorageLister) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPath", prefix) - ret0, _ := ret[0].([]*storage.ObjectInfo) + ret0, _ := ret[0].([]*object.ObjectInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // ListPath indicates an expected call of ListPath. -func (mr *MockStorageListerMockRecorder) ListPath(prefix interface{}) *gomock.Call { +func (mr *MockStorageListerMockRecorder) ListPath(prefix any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageLister)(nil).ListPath), prefix) } @@ -171,7 +177,7 @@ func (m *MockStorageMover) DeleteObject(key string) error { } // DeleteObject indicates an expected call of DeleteObject. -func (mr *MockStorageMoverMockRecorder) DeleteObject(key interface{}) *gomock.Call { +func (mr *MockStorageMoverMockRecorder) DeleteObject(key any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageMover)(nil).DeleteObject), key) } @@ -185,7 +191,7 @@ func (m *MockStorageMover) MoveObject(from, to string) error { } // MoveObject indicates an expected call of MoveObject. -func (mr *MockStorageMoverMockRecorder) MoveObject(from, to interface{}) *gomock.Call { +func (mr *MockStorageMoverMockRecorder) MoveObject(from, to any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageMover)(nil).MoveObject), from, to) } @@ -223,7 +229,7 @@ func (m *MockStorageInteractor) CatFileFromStorage(name string, offset int64) (i } // CatFileFromStorage indicates an expected call of CatFileFromStorage. -func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) CatFileFromStorage(name, offset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CatFileFromStorage", reflect.TypeOf((*MockStorageInteractor)(nil).CatFileFromStorage), name, offset) } @@ -237,22 +243,22 @@ func (m *MockStorageInteractor) DeleteObject(key string) error { } // DeleteObject indicates an expected call of DeleteObject. -func (mr *MockStorageInteractorMockRecorder) DeleteObject(key interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) DeleteObject(key any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockStorageInteractor)(nil).DeleteObject), key) } // ListPath mocks base method. -func (m *MockStorageInteractor) ListPath(prefix string) ([]*storage.ObjectInfo, error) { +func (m *MockStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ListPath", prefix) - ret0, _ := ret[0].([]*storage.ObjectInfo) + ret0, _ := ret[0].([]*object.ObjectInfo) ret1, _ := ret[1].(error) return ret0, ret1 } // ListPath indicates an expected call of ListPath. -func (mr *MockStorageInteractorMockRecorder) ListPath(prefix interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) ListPath(prefix any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListPath", reflect.TypeOf((*MockStorageInteractor)(nil).ListPath), prefix) } @@ -266,7 +272,7 @@ func (m *MockStorageInteractor) MoveObject(from, to string) error { } // MoveObject indicates an expected call of MoveObject. -func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) MoveObject(from, to any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MoveObject", reflect.TypeOf((*MockStorageInteractor)(nil).MoveObject), from, to) } @@ -280,21 +286,21 @@ func (m *MockStorageInteractor) PatchFile(name string, r io.ReadSeeker, startOff } // PatchFile indicates an expected call of PatchFile. -func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PatchFile", reflect.TypeOf((*MockStorageInteractor)(nil).PatchFile), name, r, startOffset) } // PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader) error { +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PutFileToDest", name, r) + ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) return ret0 } // PutFileToDest indicates an expected call of PutFileToDest. -func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r interface{}) *gomock.Call { +func (mr *MockStorageInteractorMockRecorder) PutFileToDest(name, r, settings any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PutFileToDest", reflect.TypeOf((*MockStorageInteractor)(nil).PutFileToDest), name, r, settings) } diff --git a/pkg/object/objectInfo.go b/pkg/object/objectInfo.go new file mode 100644 index 0000000..ae7f549 --- /dev/null +++ b/pkg/object/objectInfo.go @@ -0,0 +1,6 @@ +package object + +type ObjectInfo struct { + Path string + Size int64 +} diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go index a37995d..c0fabd2 100644 --- a/pkg/proc/delete_handler_test.go +++ b/pkg/proc/delete_handler_test.go @@ -7,8 +7,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/yezzey-gp/yproxy/pkg/message" mock "github.com/yezzey-gp/yproxy/pkg/mock" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" - "github.com/yezzey-gp/yproxy/pkg/storage" ) func TestReworkingName(t *testing.T) { @@ -60,7 +60,7 @@ func TestFilesToDeletion(t *testing.T) { Confirm: false, } - filesInStorage := []*storage.ObjectInfo{ + filesInStorage := []*object.ObjectInfo{ {Path: "1663_16530_not-deleted_18002_"}, {Path: "1663_16530_deleted-after-backup_18002_"}, {Path: "1663_16530_deleted-when-backup-start_18002_"}, diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index ba2207d..a37b3d9 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -12,10 +12,110 @@ import ( "github.com/yezzey-gp/yproxy/pkg/crypt" "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) +func ProcessPutExtended( + s storage.StorageInteractor, + pr *ProtoReader, + name string, + encrypt bool, settings []message.PutSetting, cr crypt.Crypter, ycl client.YproxyClient) error { + + ycl.SetExternalFilePath(name) + + var w io.WriteCloser + + r, w := io.Pipe() + + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + + var ww io.WriteCloser = w + if encrypt { + if cr == nil { + _ = ycl.ReplyError(fmt.Errorf("failed to encrypt, crypter not configured"), "connection aborted") + ycl.Close() + return + } + + var err error + ww, err = cr.Encrypt(w) + if err != nil { + _ = ycl.ReplyError(err, "failed to encrypt") + + ycl.Close() + return + } + } else { + ylogger.Zero.Debug().Str("path", name).Msg("omit encryption for chunk") + } + + defer w.Close() + defer wg.Done() + + for { + tp, body, err := pr.ReadPacket() + if err != nil { + _ = ycl.ReplyError(err, "failed to read chunk of data") + return + } + + ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") + + switch tp { + case message.MessageTypeCopyData: + msg := message.CopyDataMessage{} + msg.Decode(body) + if n, err := ww.Write(msg.Data); err != nil { + _ = ycl.ReplyError(err, "failed to write copy data") + + return + } else if n != int(msg.Sz) { + + _ = ycl.ReplyError(fmt.Errorf("unfull write"), "failed to compelete request") + + return + } + case message.MessageTypeCommandComplete: + msg := message.CommandCompleteMessage{} + msg.Decode(body) + + if err := ww.Close(); err != nil { + _ = ycl.ReplyError(err, "failed to close connection") + return + } + + ylogger.Zero.Debug().Msg("closing msg writer") + return + } + } + }() + + err := s.PutFileToDest(name, r, settings) + + wg.Wait() + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + + _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + + return nil +} + func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error { defer func() { @@ -77,94 +177,17 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl msg := message.PutMessage{} msg.Decode(body) - ycl.SetExternalFilePath(msg.Name) - - var w io.WriteCloser - - r, w := io.Pipe() - - wg := sync.WaitGroup{} - wg.Add(1) - - go func() { - - var ww io.WriteCloser = w - if msg.Encrypt { - if cr == nil { - _ = ycl.ReplyError(err, "failed to encrypt, crypter not configured") - ycl.Close() - return - } - - var err error - ww, err = cr.Encrypt(w) - if err != nil { - _ = ycl.ReplyError(err, "failed to encrypt") - - ycl.Close() - return - } - } else { - ylogger.Zero.Debug().Str("path", msg.Name).Msg("omit encryption for chunk") - } - - defer w.Close() - defer wg.Done() - - for { - tp, body, err := pr.ReadPacket() - if err != nil { - _ = ycl.ReplyError(err, "failed to read chunk of data") - return - } - - ylogger.Zero.Debug().Str("msg-type", tp.String()).Msg("recieved client request") - - switch tp { - case message.MessageTypeCopyData: - msg := message.CopyDataMessage{} - msg.Decode(body) - if n, err := ww.Write(msg.Data); err != nil { - _ = ycl.ReplyError(err, "failed to write copy data") - - return - } else if n != int(msg.Sz) { - - _ = ycl.ReplyError(fmt.Errorf("unfull write"), "failed to compelete request") - - return - } - case message.MessageTypeCommandComplete: - msg := message.CommandCompleteMessage{} - msg.Decode(body) - - if err := ww.Close(); err != nil { - _ = ycl.ReplyError(err, "failed to close connection") - return - } - - ylogger.Zero.Debug().Msg("closing msg writer") - return - } - } - }() - - err := s.PutFileToDest(msg.Name, r) - - wg.Wait() - - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") - - return nil + if err := ProcessPutExtended(s, pr, msg.Name, msg.Encrypt, nil, cr, ycl); err != nil { + return err } - _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) + case message.MessageTypePutV2: - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") + msg := message.PutMessageV2{} + msg.Decode(body) - return nil + if err := ProcessPutExtended(s, pr, msg.Name, msg.Encrypt, msg.Settings, cr, ycl); err != nil { + return err } case message.MessageTypeList: @@ -226,7 +249,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl return nil } - var failed []*storage.ObjectInfo + var failed []*object.ObjectInfo retryCount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ @@ -289,7 +312,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl }() //write file - err = s.PutFileToDest(path, readerEncrypt) + err = s.PutFileToDest(path, readerEncrypt, nil) if err != nil { ylogger.Zero.Error().Err(err).Msg("failed to upload file") failed = append(failed, objectMetas[i]) @@ -298,7 +321,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl } objectMetas = failed fmt.Printf("failed files count: %d\n", len(objectMetas)) - failed = make([]*storage.ObjectInfo, 0) + failed = make([]*object.ObjectInfo, 0) } if len(objectMetas) > 0 { diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index 4b4d970..d066220 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -9,6 +9,8 @@ import ( "path/filepath" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" ) // Storage prefix uses as path to folder. @@ -26,8 +28,8 @@ func (s *FileStorageInteractor) CatFileFromStorage(name string, offset int64) (i _, err = io.CopyN(io.Discard, file, offset) return file, err } -func (s *FileStorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { - var data []*ObjectInfo +func (s *FileStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { + var data []*object.ObjectInfo err := filepath.WalkDir(s.cnf.StoragePrefix+prefix, func(path string, d fs.DirEntry, err error) error { if err != nil { return err @@ -43,13 +45,13 @@ func (s *FileStorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { if err != nil { return err } - data = append(data, &ObjectInfo{fileinfo.Name(), fileinfo.Size()}) + data = append(data, &object.ObjectInfo{Path: fileinfo.Name(), Size: fileinfo.Size()}) return nil }) return data, err } -func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader) error { +func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSetting) error { fPath := path.Join(s.cnf.StoragePrefix, name) fDir := path.Dir(fPath) os.MkdirAll(fDir, 0700) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index bbf6e2b..4f078d4 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -11,6 +11,8 @@ import ( "github.com/yezzey-gp/aws-sdk-go/service/s3" "github.com/yezzey-gp/aws-sdk-go/service/s3/s3manager" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -44,7 +46,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io. return object.Body, err } -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader) error { +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -94,12 +96,7 @@ func (s *S3StorageInteractor) PatchFile(name string, r io.ReadSeeker, startOffse return err } -type ObjectInfo struct { - Path string - Size int64 -} - -func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { +func (s *S3StorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, error) { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -108,7 +105,7 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { var continuationToken *string prefix = path.Join(s.cnf.StoragePrefix, prefix) - metas := make([]*ObjectInfo, 0) + metas := make([]*object.ObjectInfo, 0) for { input := &s3.ListObjectsV2Input{ @@ -123,7 +120,7 @@ func (s *S3StorageInteractor) ListPath(prefix string) ([]*ObjectInfo, error) { } for _, obj := range out.Contents { - metas = append(metas, &ObjectInfo{ + metas = append(metas, &object.ObjectInfo{ Path: *obj.Key, Size: *obj.Size, }) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 6f02559..d5e3e98 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -5,6 +5,8 @@ import ( "io" "github.com/yezzey-gp/yproxy/config" + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/object" ) type StorageReader interface { @@ -12,12 +14,12 @@ type StorageReader interface { } type StorageWriter interface { - PutFileToDest(name string, r io.Reader) error + PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error PatchFile(name string, r io.ReadSeeker, startOffset int64) error } type StorageLister interface { - ListPath(prefix string) ([]*ObjectInfo, error) + ListPath(prefix string) ([]*object.ObjectInfo, error) } type StorageMover interface { From 0216e8864cd092aa7c5c712963b257e96a09fa7c Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 08:20:36 +0000 Subject: [PATCH 2/6] Fix for go sum --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index 02215d7..163b15a 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( ) require ( + go.uber.org/mock v0.4.0 // indirect golang.org/x/crypto v0.17.0 // indirect golang.org/x/sys v0.15.0 // indirect ) diff --git a/go.sum b/go.sum index ca60523..fa8e80d 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/yezzey-gp/aws-sdk-go v0.1.0 h1:as6ANEva14gKdhWPjZy6qaGR+/WhP0HN4UMzDH github.com/yezzey-gp/aws-sdk-go v0.1.0/go.mod h1:+gUq+WgyFOP6Eto+AcgJDDeM6tnXmOT4RJmfpuafV3Y= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= From 5d996593d9fe9d7337fa371043c1cfdf0d34437e Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 08:22:16 +0000 Subject: [PATCH 3/6] Fix unit --- pkg/proc/delete_handler_test.go | 2 +- pkg/proc/yrreader_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/proc/delete_handler_test.go b/pkg/proc/delete_handler_test.go index c0fabd2..9f44ede 100644 --- a/pkg/proc/delete_handler_test.go +++ b/pkg/proc/delete_handler_test.go @@ -3,12 +3,12 @@ package proc_test import ( "testing" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/yezzey-gp/yproxy/pkg/message" mock "github.com/yezzey-gp/yproxy/pkg/mock" "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" + "go.uber.org/mock/gomock" ) func TestReworkingName(t *testing.T) { diff --git a/pkg/proc/yrreader_test.go b/pkg/proc/yrreader_test.go index edd0a63..920542f 100644 --- a/pkg/proc/yrreader_test.go +++ b/pkg/proc/yrreader_test.go @@ -5,10 +5,10 @@ import ( "io" "testing" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" mock "github.com/yezzey-gp/yproxy/pkg/mock/proc" "github.com/yezzey-gp/yproxy/pkg/proc" + "go.uber.org/mock/gomock" ) func TestYproxyRetryReaderEmpty(t *testing.T) { From 2837dd93f0c24a1aebf33ff52f5d5c27cbc2d3db Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 08:36:18 +0000 Subject: [PATCH 4/6] Add put TableSpace and StorageClass settings --- config/storage.go | 2 ++ pkg/storage/s3storage.go | 18 ++++++++++-- pkg/storage/storage.go | 14 +++++++-- pkg/storage/utils.go | 14 +++++++++ pkg/storage/utils_test.go | 57 ++++++++++++++++++++++++++++++++++++ pkg/tablespace/tablespace.go | 3 ++ 6 files changed, 103 insertions(+), 5 deletions(-) create mode 100644 pkg/storage/utils.go create mode 100644 pkg/storage/utils_test.go create mode 100644 pkg/tablespace/tablespace.go diff --git a/config/storage.go b/config/storage.go index 0b54712..ae9bd04 100644 --- a/config/storage.go +++ b/config/storage.go @@ -12,6 +12,8 @@ type Storage struct { StoragePrefix string `json:"storage_prefix" toml:"storage_prefix" yaml:"storage_prefix"` StorageBucket string `json:"storage_bucket" toml:"storage_bucket" yaml:"storage_bucket"` + TablespaceMap map[string]string `json:"tablespace_map" toml:"tablespace_map" yaml:"tablespace_map"` + // how many concurrrent connection acquire allowed StorageConcurrency int64 `json:"storage_concurrency" toml:"storage_concurrency" yaml:"storage_concurrency"` diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 4f078d4..0415312 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -13,6 +13,7 @@ import ( "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/tablespace" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -22,6 +23,8 @@ type S3StorageInteractor struct { pool SessionPool cnf *config.Storage + + bucketMap map[string]string } func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.ReadCloser, error) { @@ -50,7 +53,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") - return nil + return err } objectPath := path.Join(s.cnf.StoragePrefix, name) @@ -60,12 +63,21 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ uploader.Concurrency = 1 }) + storageClass := ResolveStorageSetting(settings, "StorageClass", "STANDARD") + tableSpace := ResolveStorageSetting(settings, "Tablespace", tablespace.DefaultTableSpace) + + bucket, ok := s.bucketMap[tableSpace] + if !ok { + ylogger.Zero.Err(err).Msg(fmt.Sprintf("failed to match tablespace %s to s3 bucket.", tableSpace)) + return err + } + _, err = up.Upload( &s3manager.UploadInput{ - Bucket: aws.String(s.cnf.StorageBucket), + Bucket: aws.String(bucket), Key: aws.String(objectPath), Body: r, - StorageClass: aws.String("STANDARD"), + StorageClass: aws.String(storageClass), }, ) diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index d5e3e98..c010d08 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -7,6 +7,7 @@ import ( "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/tablespace" ) type StorageReader interface { @@ -43,10 +44,19 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { }, nil case "s3": return &S3StorageInteractor{ - pool: NewSessionPool(cnf), - cnf: cnf, + pool: NewSessionPool(cnf), + cnf: cnf, + bucketMap: buildBucketMapFromCnf(cnf), }, nil default: return nil, fmt.Errorf("wrong storage type " + cnf.StorageType) } } + +func buildBucketMapFromCnf(cnf *config.Storage) map[string]string { + mp := cnf.TablespaceMap + if _, ok := mp[tablespace.DefaultTableSpace]; !ok { + mp[tablespace.DefaultTableSpace] = cnf.StorageBucket + } + return mp +} diff --git a/pkg/storage/utils.go b/pkg/storage/utils.go new file mode 100644 index 0000000..7ea8943 --- /dev/null +++ b/pkg/storage/utils.go @@ -0,0 +1,14 @@ +package storage + +import "github.com/yezzey-gp/yproxy/pkg/message" + +func ResolveStorageSetting(settings []message.PutSetting, name, defaultVal string) string { + + for _, s := range settings { + if s.Name == name { + return s.Value + } + } + + return defaultVal +} diff --git a/pkg/storage/utils_test.go b/pkg/storage/utils_test.go new file mode 100644 index 0000000..9e28667 --- /dev/null +++ b/pkg/storage/utils_test.go @@ -0,0 +1,57 @@ +package storage_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/storage" +) + +func TestResolveSettings(t *testing.T) { + + assert := assert.New(t) + + type tcase struct { + name string + defaultV string + exp string + settings []message.PutSetting + } + + for _, tt := range []tcase{ + { + "ababa", + "aboba", + "aboba", + nil, + }, + { + "ababa", + "aboba", + "aboba", + []message.PutSetting{ + { + Name: "djewikdeowp", + Value: "jdoiwejoidew", + }, + }, + }, + + { + "ababa", + "aboba", + "valval", + []message.PutSetting{ + { + Name: "ababa", + Value: "valval", + }, + }, + }, + } { + + assert.Equal(tt.exp, storage.ResolveStorageSetting(tt.settings, tt.name, tt.defaultV)) + } +} diff --git a/pkg/tablespace/tablespace.go b/pkg/tablespace/tablespace.go new file mode 100644 index 0000000..4fe80c2 --- /dev/null +++ b/pkg/tablespace/tablespace.go @@ -0,0 +1,3 @@ +package tablespace + +const DefaultTableSpace = "BASE" From 546d37fb8d5b810beb6df25b5991ebee93875a4f Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 09:37:36 +0000 Subject: [PATCH 5/6] Client-side support to storage class and tablespace settings --- cmd/client/main.go | 42 ++++++++++++++++++++++++++--------- pkg/message/message_test.go | 4 ++-- pkg/message/put_message_v2.go | 11 +++++---- pkg/mock/storage.go | 4 ++-- pkg/proc/interaction.go | 8 +++++-- pkg/storage/filestorage.go | 2 +- pkg/storage/s3storage.go | 6 ++--- pkg/storage/storage.go | 6 ++++- pkg/storage/utils.go | 2 +- pkg/storage/utils_test.go | 6 ++--- 10 files changed, 61 insertions(+), 30 deletions(-) diff --git a/cmd/client/main.go b/cmd/client/main.go index 9f1ed15..afd3845 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -12,19 +12,28 @@ import ( "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" "github.com/yezzey-gp/yproxy/pkg/proc" + "github.com/yezzey-gp/yproxy/pkg/tablespace" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) -var cfgPath string -var oldCfgPath string -var logLevel string -var decrypt bool -var encrypt bool -var offset uint64 -var segmentPort int -var segmentNum int -var confirm bool -var garbage bool +var ( + cfgPath string + oldCfgPath string + logLevel string + + decrypt bool + /* Put command flags */ + encrypt bool + storageClass string + tableSpace string + + offset uint64 + + segmentPort int + segmentNum int + confirm bool + garbage bool +) // TODOV func Runner(f func(net.Conn, *config.Instance, []string) error) func(*cobra.Command, []string) error { @@ -96,7 +105,16 @@ func putFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { ycl := client.NewYClient(con) r := proc.NewProtoReader(ycl) - msg := message.NewPutMessage(args[0], encrypt).Encode() + msg := message.NewPutMessageV2(args[0], encrypt, []message.PutSettings{ + { + Name: message.StorageClassSetting, + Value: storageClass, + }, + { + Name: message.TableSpaceSetting, + Value: tableSpace, + }, + }).Encode() _, err := con.Write(msg) if err != nil { return err @@ -341,6 +359,8 @@ func init() { rootCmd.AddCommand(copyCmd) putCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put") + putCmd.PersistentFlags().StringVarP(&storageClass, "storage-class", "s", "STANDARD", "storage class for message upload") + putCmd.PersistentFlags().StringVarP(&tableSpace, "tablespace", "t", tablespace.DefaultTableSpace, "storage class for message upload") rootCmd.AddCommand(putCmd) rootCmd.AddCommand(listCmd) diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 98f7b8f..287972e 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -81,7 +81,7 @@ func TestPutV2Msg(t *testing.T) { name string encrypt bool err error - settings []message.PutSetting + settings []message.PutSettings } for _, tt := range []tcase{ @@ -89,7 +89,7 @@ func TestPutV2Msg(t *testing.T) { "nam1", true, nil, - []message.PutSetting{ + []message.PutSettings{ { Name: "a", Value: "b", diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 2554edd..1bc4186 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -5,7 +5,10 @@ import ( "encoding/binary" ) -type PutSetting struct { +const StorageClassSetting = "StorageClass" +const TableSpaceSetting = "TableSpace" + +type PutSettings struct { Name string Value string } @@ -14,12 +17,12 @@ type PutMessageV2 struct { Encrypt bool Name string - Settings []PutSetting + Settings []PutSettings } var _ ProtoMessage = &PutMessageV2{} -func NewPutMessageV2(name string, encrypt bool, settings []PutSetting) *PutMessageV2 { +func NewPutMessageV2(name string, encrypt bool, settings []PutSettings) *PutMessageV2 { return &PutMessageV2{ Name: name, Encrypt: encrypt, @@ -90,7 +93,7 @@ func (c *PutMessageV2) Decode(body []byte) { totalOff := 4 + off + 8 - c.Settings = make([]PutSetting, settLen) + c.Settings = make([]PutSettings, settLen) for i := 0; i < int(settLen); i++ { diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index 3dbe9cc..8048012 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -94,7 +94,7 @@ func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset any) *go } // PutFileToDest mocks base method. -func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) @@ -292,7 +292,7 @@ func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset any) } // PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index a37b3d9..fe538f9 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -21,7 +21,7 @@ func ProcessPutExtended( s storage.StorageInteractor, pr *ProtoReader, name string, - encrypt bool, settings []message.PutSetting, cr crypt.Crypter, ycl client.YproxyClient) error { + encrypt bool, settings []message.PutSettings, cr crypt.Crypter, ycl client.YproxyClient) error { ycl.SetExternalFilePath(name) @@ -51,7 +51,11 @@ func ProcessPutExtended( return } } else { - ylogger.Zero.Debug().Str("path", name).Msg("omit encryption for chunk") + ylogger.Zero.Debug().Str("path", name).Msg("omit encryption for upload chunks") + } + + for _, set := range settings { + ylogger.Zero.Debug().Str("setting name", set.Name).Str("value", set.Value).Msg("setting for chunk") } defer w.Close() diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index d066220..3e1da2d 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -51,7 +51,7 @@ func (s *FileStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, e return data, err } -func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSetting) error { +func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSettings) error { fPath := path.Join(s.cnf.StoragePrefix, name) fDir := path.Dir(fPath) os.MkdirAll(fDir, 0700) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 0415312..a644201 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -49,7 +49,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io. return object.Body, err } -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error { +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") @@ -63,8 +63,8 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ uploader.Concurrency = 1 }) - storageClass := ResolveStorageSetting(settings, "StorageClass", "STANDARD") - tableSpace := ResolveStorageSetting(settings, "Tablespace", tablespace.DefaultTableSpace) + storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD") + tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace) bucket, ok := s.bucketMap[tableSpace] if !ok { diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index c010d08..86270b4 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -15,7 +15,7 @@ type StorageReader interface { } type StorageWriter interface { - PutFileToDest(name string, r io.Reader, settings []message.PutSetting) error + PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error PatchFile(name string, r io.ReadSeeker, startOffset int64) error } @@ -55,6 +55,10 @@ func NewStorage(cnf *config.Storage) (StorageInteractor, error) { func buildBucketMapFromCnf(cnf *config.Storage) map[string]string { mp := cnf.TablespaceMap + if mp == nil { + /* fallback for backward-compatibilty if to TableSpace map configured */ + mp = map[string]string{} + } if _, ok := mp[tablespace.DefaultTableSpace]; !ok { mp[tablespace.DefaultTableSpace] = cnf.StorageBucket } diff --git a/pkg/storage/utils.go b/pkg/storage/utils.go index 7ea8943..80dd1b8 100644 --- a/pkg/storage/utils.go +++ b/pkg/storage/utils.go @@ -2,7 +2,7 @@ package storage import "github.com/yezzey-gp/yproxy/pkg/message" -func ResolveStorageSetting(settings []message.PutSetting, name, defaultVal string) string { +func ResolveStorageSetting(settings []message.PutSettings, name, defaultVal string) string { for _, s := range settings { if s.Name == name { diff --git a/pkg/storage/utils_test.go b/pkg/storage/utils_test.go index 9e28667..2e67cc2 100644 --- a/pkg/storage/utils_test.go +++ b/pkg/storage/utils_test.go @@ -17,7 +17,7 @@ func TestResolveSettings(t *testing.T) { name string defaultV string exp string - settings []message.PutSetting + settings []message.PutSettings } for _, tt := range []tcase{ @@ -31,7 +31,7 @@ func TestResolveSettings(t *testing.T) { "ababa", "aboba", "aboba", - []message.PutSetting{ + []message.PutSettings{ { Name: "djewikdeowp", Value: "jdoiwejoidew", @@ -43,7 +43,7 @@ func TestResolveSettings(t *testing.T) { "ababa", "aboba", "valval", - []message.PutSetting{ + []message.PutSettings{ { Name: "ababa", Value: "valval", From 879a50ac899dae04df3614a98d8aa5266c2219ee Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Sat, 24 Aug 2024 10:14:00 +0000 Subject: [PATCH 6/6] Fix error handling --- pkg/proc/interaction.go | 38 +++++++++++++++++--------------------- pkg/storage/s3storage.go | 3 ++- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index fe538f9..adb6968 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -26,13 +26,24 @@ func ProcessPutExtended( ycl.SetExternalFilePath(name) var w io.WriteCloser - r, w := io.Pipe() + err := s.PutFileToDest(name, r, settings) + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + + defer r.Close() + defer w.Close() + wg := sync.WaitGroup{} wg.Add(1) go func() { + defer wg.Done() var ww io.WriteCloser = w if encrypt { @@ -54,13 +65,6 @@ func ProcessPutExtended( ylogger.Zero.Debug().Str("path", name).Msg("omit encryption for upload chunks") } - for _, set := range settings { - ylogger.Zero.Debug().Str("setting name", set.Name).Str("value", set.Value).Msg("setting for chunk") - } - - defer w.Close() - defer wg.Done() - for { tp, body, err := pr.ReadPacket() if err != nil { @@ -99,16 +103,8 @@ func ProcessPutExtended( } }() - err := s.PutFileToDest(name, r, settings) - wg.Wait() - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") - - return err - } - _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) if err != nil { @@ -224,7 +220,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if err != nil { _ = ycl.ReplyError(err, "failed to upload") - return nil + return err } case message.MessageTypeCopy: @@ -340,7 +336,7 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { _ = ycl.ReplyError(err, "failed to upload") - return nil + return err } fmt.Println("Copy finished successfully") ylogger.Zero.Info().Msg("Copy finished successfully") @@ -366,19 +362,19 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl err = dh.HandleDeleteGarbage(msg) if err != nil { _ = ycl.ReplyError(err, "failed to finish operation") - return nil + return err } } else { err = dh.HandleDeleteFile(msg) if err != nil { _ = ycl.ReplyError(err, "failed to finish operation") - return nil + return err } } if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { _ = ycl.ReplyError(err, "failed to upload") - return nil + return err } ylogger.Zero.Info().Msg("Deleted garbage successfully") if !msg.Confirm { diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index a644201..38d3196 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -68,7 +68,8 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ bucket, ok := s.bucketMap[tableSpace] if !ok { - ylogger.Zero.Err(err).Msg(fmt.Sprintf("failed to match tablespace %s to s3 bucket.", tableSpace)) + err := fmt.Errorf("failed to match tablespace %s to s3 bucket.", tableSpace) + ylogger.Zero.Err(err) return err }