Skip to content

Commit

Permalink
Support cat msg v2
Browse files Browse the repository at this point in the history
  • Loading branch information
reshke committed Aug 26, 2024
1 parent 183dd69 commit a310d95
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 44 deletions.
97 changes: 97 additions & 0 deletions pkg/message/cat_message_v2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package message

import (
"encoding/binary"

"github.com/yezzey-gp/yproxy/pkg/settings"
)

type CatMessageV2 struct {
Decrypt bool
Name string
StartOffset uint64

Settings []settings.StorageSettings
}

var _ ProtoMessage = &CatMessage{}

func NewCatMessageV2(name string, decrypt bool, StartOffset uint64) *CatMessageV2 {
return &CatMessageV2{
Name: name,
Decrypt: decrypt,
StartOffset: StartOffset,
}
}

func (c *CatMessageV2) Encode() []byte {
bt := []byte{
byte(MessageTypeCat),
0,
0,
0,
}

if c.Decrypt {
bt[1] = byte(DecryptMessage)
} else {
bt[1] = byte(NoDecryptMessage)
}

if c.StartOffset != 0 {
bt[2] = byte(ExtendedMesssage)
}

bt = append(bt, []byte(c.Name)...)
bt = append(bt, 0)
if c.StartOffset != 0 {
bt = binary.BigEndian.AppendUint64(bt, c.StartOffset)
}

slen := make([]byte, 8)
binary.BigEndian.PutUint64(slen, uint64(len(c.Settings)))
bt = append(bt, slen...)

for _, s := range c.Settings {

bt = append(bt, []byte(s.Name)...)
bt = append(bt, 0)

bt = append(bt, []byte(s.Value)...)
bt = append(bt, 0)
}

ln := len(bt) + 8

bs := make([]byte, 8)
binary.BigEndian.PutUint64(bs, uint64(ln))
return append(bs, bt...)
}

func (c *CatMessageV2) Decode(body []byte) {
var off uint64
c.Name, off = GetCstring(body[4:])
if body[1] == byte(DecryptMessage) {
c.Decrypt = true
}
if body[2] == byte(ExtendedMesssage) {
c.StartOffset = binary.BigEndian.Uint64(body[4+len(c.Name)+1:])
}

settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8])

totalOff := 4 + off + 8

c.Settings = make([]settings.StorageSettings, settLen)

for i := 0; i < int(settLen); i++ {

var currOff uint64

c.Settings[i].Name, currOff = GetCstring(body[totalOff:])
totalOff += currOff

c.Settings[i].Value, currOff = GetCstring(body[totalOff:])
totalOff += currOff
}
}
64 changes: 62 additions & 2 deletions pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/settings"
)

func TestCatMsg(t *testing.T) {
Expand Down Expand Up @@ -81,15 +82,15 @@ func TestPutV2Msg(t *testing.T) {
name string
encrypt bool
err error
settings []message.PutSettings
settings []settings.StorageSettings
}

for _, tt := range []tcase{
{
"nam1",
true,
nil,
[]message.PutSettings{
[]settings.StorageSettings{
{
Name: "a",
Value: "b",
Expand All @@ -114,6 +115,65 @@ func TestPutV2Msg(t *testing.T) {
}
}

func TestCatMsgV2(t *testing.T) {
assert := assert.New(t)

type tcase struct {
name string
decrypt bool
off uint64

settings []settings.StorageSettings
err error
}

for _, tt := range []tcase{
{
"nam1",
true,
0,
[]settings.StorageSettings{
{
Name: "a",
Value: "b",
},
{
Name: "cdsdsd",
Value: "ds",
},
},
nil,
},
{
"nam1",
true,
10,
[]settings.StorageSettings{
{
Name: "a",
Value: "b",
},
{
Name: "cdsdsd",
Value: "ds",
},
},
nil,
},
} {

msg := message.NewCatMessage(tt.name, tt.decrypt, tt.off)
body := msg.Encode()

msg2 := message.CatMessage{}

msg2.Decode(body[8:])

assert.Equal(msg.Name, msg2.Name)
assert.Equal(msg.Decrypt, msg2.Decrypt)
}
}

func TestPatchMsg(t *testing.T) {
assert := assert.New(t)

Expand Down
35 changes: 8 additions & 27 deletions pkg/message/put_message_v2.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
package message

import (
"bytes"
"encoding/binary"

"github.com/yezzey-gp/yproxy/pkg/settings"
)

const StorageClassSetting = "StorageClass"
const TableSpaceSetting = "TableSpace"

type PutSettings struct {
Name string
Value string
}

type PutMessageV2 struct {
Encrypt bool
Name string

Settings []PutSettings
Settings []settings.StorageSettings
}

var _ ProtoMessage = &PutMessageV2{}

func NewPutMessageV2(name string, encrypt bool, settings []PutSettings) *PutMessageV2 {
func NewPutMessageV2(name string, encrypt bool, settings []settings.StorageSettings) *PutMessageV2 {
return &PutMessageV2{
Name: name,
Encrypt: encrypt,
Expand Down Expand Up @@ -67,42 +63,27 @@ func (c *PutMessageV2) Encode() []byte {
return append(bs, bt...)
}

func (c *PutMessageV2) GetCstring(b []byte) (string, uint64) {
offset := uint64(0)
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
offset++
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String(), offset
}

func (c *PutMessageV2) Decode(body []byte) {
if body[1] == byte(EncryptMessage) {
c.Encrypt = true
}
var off uint64
c.Name, off = c.GetCstring(body[4:])
c.Name, off = GetCstring(body[4:])

settLen := binary.BigEndian.Uint64(body[4+off : 4+off+8])

totalOff := 4 + off + 8

c.Settings = make([]PutSettings, settLen)
c.Settings = make([]settings.StorageSettings, settLen)

for i := 0; i < int(settLen); i++ {

var currOff uint64

c.Settings[i].Name, currOff = c.GetCstring(body[totalOff:])
c.Settings[i].Name, currOff = GetCstring(body[totalOff:])
totalOff += currOff

c.Settings[i].Value, currOff = c.GetCstring(body[totalOff:])
c.Settings[i].Value, currOff = GetCstring(body[totalOff:])
totalOff += currOff
}
}
18 changes: 18 additions & 0 deletions pkg/message/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package message

import "bytes"

func GetCstring(b []byte) (string, uint64) {
offset := uint64(0)
buff := bytes.NewBufferString("")

for i := 0; i < len(b); i++ {
offset++
if b[i] == 0 {
break
}
buff.WriteByte(b[i])
}

return buff.String(), offset
}
6 changes: 3 additions & 3 deletions pkg/mock/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/yezzey-gp/yproxy/pkg/database"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/object"
"github.com/yezzey-gp/yproxy/pkg/settings"
"github.com/yezzey-gp/yproxy/pkg/storage"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand All @@ -21,7 +22,7 @@ func ProcessPutExtended(
s storage.StorageInteractor,
pr *ProtoReader,
name string,
encrypt bool, settings []message.PutSettings, cr crypt.Crypter, ycl client.YproxyClient) error {
encrypt bool, settings []settings.StorageSettings, cr crypt.Crypter, ycl client.YproxyClient) error {

ycl.SetExternalFilePath(name)

Expand Down
6 changes: 6 additions & 0 deletions pkg/settings/settings.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package settings

type StorageSettings struct {
Name string
Value string
}
4 changes: 2 additions & 2 deletions pkg/storage/filestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"path/filepath"

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/object"
"github.com/yezzey-gp/yproxy/pkg/settings"
)

// Storage prefix uses as path to folder.
Expand Down Expand Up @@ -51,7 +51,7 @@ func (s *FileStorageInteractor) ListPath(prefix string) ([]*object.ObjectInfo, e
return data, err
}

func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []message.PutSettings) error {
func (s *FileStorageInteractor) PutFileToDest(name string, r io.Reader, _ []settings.StorageSettings) error {
fPath := path.Join(s.cnf.StoragePrefix, name)
fDir := path.Dir(fPath)
os.MkdirAll(fDir, 0700)
Expand Down
3 changes: 2 additions & 1 deletion pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/object"
"github.com/yezzey-gp/yproxy/pkg/settings"
"github.com/yezzey-gp/yproxy/pkg/tablespace"
"github.com/yezzey-gp/yproxy/pkg/ylogger"
)
Expand Down Expand Up @@ -49,7 +50,7 @@ func (s *S3StorageInteractor) CatFileFromStorage(name string, offset int64) (io.
return object.Body, err
}

func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error {
func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error {
sess, err := s.pool.GetSession(context.TODO())
if err != nil {
ylogger.Zero.Err(err).Msg("failed to acquire s3 session")
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"io"

"github.com/yezzey-gp/yproxy/config"
"github.com/yezzey-gp/yproxy/pkg/message"
"github.com/yezzey-gp/yproxy/pkg/object"
"github.com/yezzey-gp/yproxy/pkg/settings"
"github.com/yezzey-gp/yproxy/pkg/tablespace"
)

Expand All @@ -15,7 +15,7 @@ type StorageReader interface {
}

type StorageWriter interface {
PutFileToDest(name string, r io.Reader, settings []message.PutSettings) error
PutFileToDest(name string, r io.Reader, settings []settings.StorageSettings) error
PatchFile(name string, r io.ReadSeeker, startOffset int64) error
}

Expand Down
Loading

0 comments on commit a310d95

Please sign in to comment.