From a310d9545838529892563cbd9fa9533699d57e3e Mon Sep 17 00:00:00 2001 From: reshke kirill Date: Mon, 26 Aug 2024 12:21:04 +0000 Subject: [PATCH] Support cat msg v2 --- pkg/message/cat_message_v2.go | 97 +++++++++++++++++++++++++++++++++++ pkg/message/message_test.go | 64 ++++++++++++++++++++++- pkg/message/put_message_v2.go | 35 +++---------- pkg/message/utils.go | 18 +++++++ pkg/mock/storage.go | 6 +-- pkg/proc/interaction.go | 3 +- pkg/settings/settings.go | 6 +++ pkg/storage/filestorage.go | 4 +- pkg/storage/s3storage.go | 3 +- pkg/storage/storage.go | 4 +- pkg/storage/utils.go | 4 +- pkg/storage/utils_test.go | 8 +-- 12 files changed, 208 insertions(+), 44 deletions(-) create mode 100644 pkg/message/cat_message_v2.go create mode 100644 pkg/message/utils.go create mode 100644 pkg/settings/settings.go diff --git a/pkg/message/cat_message_v2.go b/pkg/message/cat_message_v2.go new file mode 100644 index 0000000..1018459 --- /dev/null +++ b/pkg/message/cat_message_v2.go @@ -0,0 +1,97 @@ +package message + +import ( + "encoding/binary" + + "github.com/yezzey-gp/yproxy/pkg/settings" +) + +type CatMessageV2 struct { + Decrypt bool + Name string + StartOffset uint64 + + Settings []settings.StorageSettings +} + +var _ ProtoMessage = &CatMessage{} + +func NewCatMessageV2(name string, decrypt bool, StartOffset uint64) *CatMessageV2 { + return &CatMessageV2{ + Name: name, + Decrypt: decrypt, + StartOffset: StartOffset, + } +} + +func (c *CatMessageV2) Encode() []byte { + bt := []byte{ + byte(MessageTypeCat), + 0, + 0, + 0, + } + + if c.Decrypt { + bt[1] = byte(DecryptMessage) + } else { + bt[1] = byte(NoDecryptMessage) + } + + if c.StartOffset != 0 { + bt[2] = byte(ExtendedMesssage) + } + + bt = append(bt, []byte(c.Name)...) + bt = append(bt, 0) + if c.StartOffset != 0 { + bt = binary.BigEndian.AppendUint64(bt, c.StartOffset) + } + + slen := make([]byte, 8) + binary.BigEndian.PutUint64(slen, uint64(len(c.Settings))) + bt = append(bt, slen...) + + for _, s := range c.Settings { + + bt = append(bt, []byte(s.Name)...) + bt = append(bt, 0) + + bt = append(bt, []byte(s.Value)...) + bt = append(bt, 0) + } + + ln := len(bt) + 8 + + bs := make([]byte, 8) + binary.BigEndian.PutUint64(bs, uint64(ln)) + return append(bs, bt...) +} + +func (c *CatMessageV2) Decode(body []byte) { + var off uint64 + c.Name, off = GetCstring(body[4:]) + if body[1] == byte(DecryptMessage) { + c.Decrypt = true + } + if body[2] == byte(ExtendedMesssage) { + c.StartOffset = binary.BigEndian.Uint64(body[4+len(c.Name)+1:]) + } + + settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8]) + + totalOff := 4 + off + 8 + + c.Settings = make([]settings.StorageSettings, settLen) + + for i := 0; i < int(settLen); i++ { + + var currOff uint64 + + c.Settings[i].Name, currOff = GetCstring(body[totalOff:]) + totalOff += currOff + + c.Settings[i].Value, currOff = GetCstring(body[totalOff:]) + totalOff += currOff + } +} diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 287972e..35898e0 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/settings" ) func TestCatMsg(t *testing.T) { @@ -81,7 +82,7 @@ func TestPutV2Msg(t *testing.T) { name string encrypt bool err error - settings []message.PutSettings + settings []settings.StorageSettings } for _, tt := range []tcase{ @@ -89,7 +90,7 @@ func TestPutV2Msg(t *testing.T) { "nam1", true, nil, - []message.PutSettings{ + []settings.StorageSettings{ { Name: "a", Value: "b", @@ -114,6 +115,65 @@ func TestPutV2Msg(t *testing.T) { } } +func TestCatMsgV2(t *testing.T) { + assert := assert.New(t) + + type tcase struct { + name string + decrypt bool + off uint64 + + settings []settings.StorageSettings + err error + } + + for _, tt := range []tcase{ + { + "nam1", + true, + 0, + []settings.StorageSettings{ + { + Name: "a", + Value: "b", + }, + { + Name: "cdsdsd", + Value: "ds", + }, + }, + nil, + }, + { + "nam1", + true, + 10, + []settings.StorageSettings{ + { + Name: "a", + Value: "b", + }, + { + Name: "cdsdsd", + Value: "ds", + }, + }, + nil, + }, + } { + + msg := message.NewCatMessage(tt.name, tt.decrypt, tt.off) + body := msg.Encode() + + msg2 := message.CatMessage{} + + msg2.Decode(body[8:]) + + assert.Equal(msg.Name, msg2.Name) + assert.Equal(msg.Decrypt, msg2.Decrypt) + } +} + func TestPatchMsg(t *testing.T) { assert := assert.New(t) diff --git a/pkg/message/put_message_v2.go b/pkg/message/put_message_v2.go index 1bc4186..28fc247 100644 --- a/pkg/message/put_message_v2.go +++ b/pkg/message/put_message_v2.go @@ -1,28 +1,24 @@ package message import ( - "bytes" "encoding/binary" + + "github.com/yezzey-gp/yproxy/pkg/settings" ) const StorageClassSetting = "StorageClass" const TableSpaceSetting = "TableSpace" -type PutSettings struct { - Name string - Value string -} - type PutMessageV2 struct { Encrypt bool Name string - Settings []PutSettings + Settings []settings.StorageSettings } var _ ProtoMessage = &PutMessageV2{} -func NewPutMessageV2(name string, encrypt bool, settings []PutSettings) *PutMessageV2 { +func NewPutMessageV2(name string, encrypt bool, settings []settings.StorageSettings) *PutMessageV2 { return &PutMessageV2{ Name: name, Encrypt: encrypt, @@ -67,42 +63,27 @@ func (c *PutMessageV2) Encode() []byte { return append(bs, bt...) } -func (c *PutMessageV2) GetCstring(b []byte) (string, uint64) { - offset := uint64(0) - buff := bytes.NewBufferString("") - - for i := 0; i < len(b); i++ { - offset++ - if b[i] == 0 { - break - } - buff.WriteByte(b[i]) - } - - return buff.String(), offset -} - func (c *PutMessageV2) Decode(body []byte) { if body[1] == byte(EncryptMessage) { c.Encrypt = true } var off uint64 - c.Name, off = c.GetCstring(body[4:]) + c.Name, off = GetCstring(body[4:]) settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8]) totalOff := 4 + off + 8 - c.Settings = make([]PutSettings, settLen) + c.Settings = make([]settings.StorageSettings, settLen) for i := 0; i < int(settLen); i++ { var currOff uint64 - c.Settings[i].Name, currOff = c.GetCstring(body[totalOff:]) + c.Settings[i].Name, currOff = GetCstring(body[totalOff:]) totalOff += currOff - c.Settings[i].Value, currOff = c.GetCstring(body[totalOff:]) + c.Settings[i].Value, currOff = GetCstring(body[totalOff:]) totalOff += currOff } } diff --git a/pkg/message/utils.go b/pkg/message/utils.go new file mode 100644 index 0000000..05431a5 --- /dev/null +++ b/pkg/message/utils.go @@ -0,0 +1,18 @@ +package message + +import "bytes" + +func GetCstring(b []byte) (string, uint64) { + offset := uint64(0) + buff := bytes.NewBufferString("") + + for i := 0; i < len(b); i++ { + offset++ + if b[i] == 0 { + break + } + buff.WriteByte(b[i]) + } + + return buff.String(), offset +} diff --git a/pkg/mock/storage.go b/pkg/mock/storage.go index 8048012..47902f3 100644 --- a/pkg/mock/storage.go +++ b/pkg/mock/storage.go @@ -13,8 +13,8 @@ import ( io "io" reflect "reflect" - message "github.com/yezzey-gp/yproxy/pkg/message" object "github.com/yezzey-gp/yproxy/pkg/object" + settings "github.com/yezzey-gp/yproxy/pkg/settings" gomock "go.uber.org/mock/gomock" ) @@ -94,7 +94,7 @@ func (mr *MockStorageWriterMockRecorder) PatchFile(name, r, startOffset any) *go } // PutFileToDest mocks base method. -func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { +func (m *MockStorageWriter) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) @@ -292,7 +292,7 @@ func (mr *MockStorageInteractorMockRecorder) PatchFile(name, r, startOffset any) } // PutFileToDest mocks base method. -func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { +func (m *MockStorageInteractor) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "PutFileToDest", name, r, settings) ret0, _ := ret[0].(error) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 591ec49..a733054 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -13,6 +13,7 @@ import ( "github.com/yezzey-gp/yproxy/pkg/database" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/storage" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -21,7 +22,7 @@ func ProcessPutExtended( s storage.StorageInteractor, pr *ProtoReader, name string, - encrypt bool, settings []message.PutSettings, cr crypt.Crypter, ycl client.YproxyClient) error { + encrypt bool, settings []settings.StorageSettings, cr crypt.Crypter, ycl client.YproxyClient) error { ycl.SetExternalFilePath(name) diff --git a/pkg/settings/settings.go b/pkg/settings/settings.go new file mode 100644 index 0000000..16b29da --- /dev/null +++ b/pkg/settings/settings.go @@ -0,0 +1,6 @@ +package settings + +type StorageSettings struct { + Name string + Value string +} diff --git a/pkg/storage/filestorage.go b/pkg/storage/filestorage.go index 3e1da2d..4f64436 100644 --- a/pkg/storage/filestorage.go +++ b/pkg/storage/filestorage.go @@ -9,8 +9,8 @@ import ( "path/filepath" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" ) // Storage prefix uses as path to folder. @@ -51,7 +51,7 @@ func (s *FileStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, e return data, err } -func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSettings) error { +func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []settings.StorageSettings) error { fPath := path.Join(s.cnf.StoragePrefix, name) fDir := path.Dir(fPath) os.MkdirAll(fDir, 0700) diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index 38d3196..3c472f1 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -13,6 +13,7 @@ import ( "github.com/yezzey-gp/yproxy/config" "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/tablespace" "github.com/yezzey-gp/yproxy/pkg/ylogger" ) @@ -49,7 +50,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io. return object.Body, err } -func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error { +func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error { sess, err := s.pool.GetSession(context.TODO()) if err != nil { ylogger.Zero.Err(err).Msg("failed to acquire s3 session") diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 86270b4..bb5cd0d 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -5,8 +5,8 @@ import ( "io" "github.com/yezzey-gp/yproxy/config" - "github.com/yezzey-gp/yproxy/pkg/message" "github.com/yezzey-gp/yproxy/pkg/object" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/tablespace" ) @@ -15,7 +15,7 @@ type StorageReader interface { } type StorageWriter interface { - PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error + PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error PatchFile(name string, r io.ReadSeeker, startOffset int64) error } diff --git a/pkg/storage/utils.go b/pkg/storage/utils.go index 80dd1b8..961ac15 100644 --- a/pkg/storage/utils.go +++ b/pkg/storage/utils.go @@ -1,8 +1,8 @@ package storage -import "github.com/yezzey-gp/yproxy/pkg/message" +import "github.com/yezzey-gp/yproxy/pkg/settings" -func ResolveStorageSetting(settings []message.PutSettings, name, defaultVal string) string { +func ResolveStorageSetting(settings []settings.StorageSettings, name, defaultVal string) string { for _, s := range settings { if s.Name == name { diff --git a/pkg/storage/utils_test.go b/pkg/storage/utils_test.go index 2e67cc2..bca91ca 100644 --- a/pkg/storage/utils_test.go +++ b/pkg/storage/utils_test.go @@ -5,7 +5,7 @@ import ( "github.com/stretchr/testify/assert" - "github.com/yezzey-gp/yproxy/pkg/message" + "github.com/yezzey-gp/yproxy/pkg/settings" "github.com/yezzey-gp/yproxy/pkg/storage" ) @@ -17,7 +17,7 @@ func TestResolveSettings(t *testing.T) { name string defaultV string exp string - settings []message.PutSettings + settings []settings.StorageSettings } for _, tt := range []tcase{ @@ -31,7 +31,7 @@ func TestResolveSettings(t *testing.T) { "ababa", "aboba", "aboba", - []message.PutSettings{ + []settings.StorageSettings{ { Name: "djewikdeowp", Value: "jdoiwejoidew", @@ -43,7 +43,7 @@ func TestResolveSettings(t *testing.T) { "ababa", "aboba", "valval", - []message.PutSettings{ + []settings.StorageSettings{ { Name: "ababa", Value: "valval",