diff --git a/cmd/client/main.go b/cmd/client/main.go index 2015943..ae38a06 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -81,7 +81,7 @@ func catFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { func copyFunc(con net.Conn, instanceCnf *config.Instance, args []string) error { ylogger.Zero.Info().Msg("Execute copy command") ylogger.Zero.Info().Str("name", args[0]).Msg("copy") - msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt).Encode() + msg := message.NewCopyMessage(args[0], oldCfgPath, encrypt, decrypt, segmentPort).Encode() _, err := con.Write(msg) if err != nil { return err @@ -343,6 +343,7 @@ func init() { copyCmd.PersistentFlags().BoolVarP(&decrypt, "decrypt", "d", false, "decrypt external object or not") copyCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put") copyCmd.PersistentFlags().StringVarP(&oldCfgPath, "old-config", "", "/etc/yproxy/yproxy.yaml", "path to old yproxy config file") + copyCmd.PersistentFlags().Uint64VarP(&segmentPort, "port", "p", 6000, "port that segment is listening on") rootCmd.AddCommand(copyCmd) putCmd.PersistentFlags().BoolVarP(&encrypt, "encrypt", "e", false, "encrypt external object before put") diff --git a/pkg/message/copy_message.go b/pkg/message/copy_message.go index 3a1e1cc..4198f09 100644 --- a/pkg/message/copy_message.go +++ b/pkg/message/copy_message.go @@ -12,16 +12,18 @@ type CopyMessage struct { Encrypt bool Name string OldCfgPath string + Port uint64 } var _ ProtoMessage = &CopyMessage{} -func NewCopyMessage(name, oldCfgPath string, encrypt, decrypt bool) *CopyMessage { +func NewCopyMessage(name, oldCfgPath string, encrypt, decrypt bool, port uint64) *CopyMessage { return &CopyMessage{ Name: name, Encrypt: encrypt, Decrypt: decrypt, OldCfgPath: oldCfgPath, + Port: port, } } @@ -52,6 +54,10 @@ func (message *CopyMessage) Encode() []byte { encodedMessage = append(encodedMessage, byteLen...) encodedMessage = append(encodedMessage, byteOldCfg...) + port := make([]byte, 8) + binary.BigEndian.PutUint64(port, uint64(message.Port)) + encodedMessage = append(encodedMessage, port...) + binary.BigEndian.PutUint64(byteLen, uint64(len(encodedMessage)+8)) fmt.Printf("send: %v\n", MessageType(encodedMessage[0])) ylogger.Zero.Debug().Str("object-path", MessageType(encodedMessage[0]).String()).Msg("decrypt object") @@ -70,4 +76,5 @@ func (encodedMessage *CopyMessage) Decode(data []byte) { encodedMessage.Name = string(data[12 : 12+nameLen]) oldConfLen := binary.BigEndian.Uint64(data[12+nameLen : 12+nameLen+8]) encodedMessage.OldCfgPath = string(data[12+nameLen+8 : 12+nameLen+8+oldConfLen]) + encodedMessage.Port = binary.BigEndian.Uint64(data[12+nameLen+8+oldConfLen : 12+nameLen+8+oldConfLen+8]) } diff --git a/pkg/message/message_test.go b/pkg/message/message_test.go index 1461bfd..2ea1c85 100644 --- a/pkg/message/message_test.go +++ b/pkg/message/message_test.go @@ -271,7 +271,7 @@ func TestListMsg(t *testing.T) { func TestCopyMsg(t *testing.T) { assert := assert.New(t) - msg := message.NewCopyMessage("myname/mynextname", "myoldcfg/path", true, true) + msg := message.NewCopyMessage("myname/mynextname", "myoldcfg/path", true, true, 5432) body := msg.Encode() assert.Equal(body[8], byte(message.MessageTypeCopy)) @@ -283,6 +283,7 @@ func TestCopyMsg(t *testing.T) { assert.Equal("myoldcfg/path", msg2.OldCfgPath) assert.True(msg2.Decrypt) assert.True(msg2.Encrypt) + assert.Equal(uint64(5432), msg2.Port) } func TestDeleteMsg(t *testing.T) { diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 9ff4cac..0828da5 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -227,12 +227,23 @@ func ProcessCopyExtended(msg message.CopyMessage, s storage.StorageInteractor, c return nil } + dbInterractor := &database.DatabaseHandler{} + vi, _, err := dbInterractor.GetVirtualExpireIndexes(msg.Port) + if err != nil { + return err + } + var failed []*object.ObjectInfo retryCount := 0 for len(objectMetas) > 0 && retryCount < 10 { retryCount++ for i := 0; i < len(objectMetas); i++ { path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) + reworked := ReworkFileName(path) + if _, ok := vi[reworked]; !ok { + continue + } + //get reader readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil)) var fromReader io.Reader diff --git a/pkg/storage/s3storage.go b/pkg/storage/s3storage.go index dd8894e..3a6d14d 100644 --- a/pkg/storage/s3storage.go +++ b/pkg/storage/s3storage.go @@ -73,7 +73,7 @@ func (s *S3StorageInteractor) PutFileToDest(name string, r io.Reader, settings [ storageClass := ResolveStorageSetting(settings, message.StorageClassSetting, "STANDARD") tableSpace := ResolveStorageSetting(settings, message.TableSpaceSetting, tablespace.DefaultTableSpace) - multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "") + multipartChunksizeStr := ResolveStorageSetting(settings, message.MultipartChunksize, "16777216") multipartChunksize, err := strconv.ParseInt(multipartChunksizeStr, 10, 64) if err != nil { return err