Skip to content

Commit

Permalink
only copy files that exists in virtual index (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
diPhantxm authored Oct 18, 2024
1 parent 5905cda commit a0c5a2c
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 4 deletions.
3 changes: 2 additions & 1 deletion cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func catFunc(con net.Conn, instanceCnf *config.Instance, args []string) error {
func copyFunc(con net.Conn, instanceCnf *config.Instance, args []string) error {
ylogger.Zero.Info().Msg("Execute copy command")
ylogger.Zero.Info().Str("name", args[0]).Msg("copy")
msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt).Encode()
msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt, segmentPort).Encode()
_, err := con.Write(msg)
if err != nil {
return err
Expand Down Expand Up @@ -343,6 +343,7 @@ func init() {
copyCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not")
copyCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put")
copyCmd.PersistentFlags().StringVarP(&oldCfgPath, "old-config", "", "/etc/yproxy/yproxy.yaml", "path to old yproxy config file")
copyCmd.PersistentFlags().Uint64VarP(&segmentPort, "port", "p", 6000, "port that segment is listening on")
rootCmd.AddCommand(copyCmd)

putCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put")
Expand Down
9 changes: 8 additions & 1 deletion pkg/message/copy_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,18 @@ type CopyMessage struct {
Encrypt bool
Name string
OldCfgPath string
Port uint64
}

var _ ProtoMessage = &CopyMessage{}

func NewCopyMessage(name, oldCfgPath string, encrypt, decrypt bool) *CopyMessage {
func NewCopyMessage(name, oldCfgPath string, encrypt, decrypt bool, port uint64) *CopyMessage {
return &CopyMessage{
Name: name,
Encrypt: encrypt,
Decrypt: decrypt,
OldCfgPath: oldCfgPath,
Port: port,
}
}

Expand Down Expand Up @@ -52,6 +54,10 @@ func (message *CopyMessage) Encode() []byte {
encodedMessage = append(encodedMessage, byteLen...)
encodedMessage = append(encodedMessage, byteOldCfg...)

port := make([]byte, 8)
binary.BigEndian.PutUint64(port, uint64(message.Port))
encodedMessage = append(encodedMessage, port...)

binary.BigEndian.PutUint64(byteLen, uint64(len(encodedMessage)+8))
fmt.Printf("send: %v\n", MessageType(encodedMessage[0]))
ylogger.Zero.Debug().Str("object-path", MessageType(encodedMessage[0]).String()).Msg("decrypt object")
Expand All @@ -70,4 +76,5 @@ func (encodedMessage *CopyMessage) Decode(data []byte) {
encodedMessage.Name = string(data[12 : 12+nameLen])
oldConfLen := binary.BigEndian.Uint64(data[12+nameLen : 12+nameLen+8])
encodedMessage.OldCfgPath = string(data[12+nameLen+8 : 12+nameLen+8+oldConfLen])
encodedMessage.Port = binary.BigEndian.Uint64(data[12+nameLen+8+oldConfLen : 12+nameLen+8+oldConfLen+8])
}
3 changes: 2 additions & 1 deletion pkg/message/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestListMsg(t *testing.T) {
func TestCopyMsg(t *testing.T) {
assert := assert.New(t)

msg := message.NewCopyMessage("myname/mynextname", "myoldcfg/path", true, true)
msg := message.NewCopyMessage("myname/mynextname", "myoldcfg/path", true, true, 5432)
body := msg.Encode()

assert.Equal(body[8], byte(message.MessageTypeCopy))
Expand All @@ -283,6 +283,7 @@ func TestCopyMsg(t *testing.T) {
assert.Equal("myoldcfg/path", msg2.OldCfgPath)
assert.True(msg2.Decrypt)
assert.True(msg2.Encrypt)
assert.Equal(uint64(5432), msg2.Port)
}

func TestDeleteMsg(t *testing.T) {
Expand Down
11 changes: 11 additions & 0 deletions pkg/proc/interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,23 @@ func ProcessCopyExtended(msg message.CopyMessage, s storage.StorageInteractor, c
return nil
}

dbInterractor := &database.DatabaseHandler{}
vi, _, err := dbInterractor.GetVirtualExpireIndexes(msg.Port)
if err != nil {
return err
}

var failed []*object.ObjectInfo
retryCount := 0
for len(objectMetas) > 0 && retryCount < 10 {
retryCount++
for i := 0; i < len(objectMetas); i++ {
path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix)
reworked := ReworkFileName(path)
if _, ok := vi[reworked]; !ok {
continue
}

//get reader
readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil))
var fromReader io.Reader
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/s3storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [

storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD")
tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace)
multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "")
multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "16777216")
multipartChunksize, err := strconv.ParseInt(multipartChunksizeStr, 10, 64)
if err != nil {
return err
Expand Down

0 comments on commit a0c5a2c

Please sign in to comment.