From e1c51f9749008219760489be597c11a5fa5bc53f Mon Sep 17 00:00:00 2001 From: Victor <87538976+visill@users.noreply.github.com> Date: Mon, 7 Oct 2024 19:57:16 +0500 Subject: [PATCH] Recude codepath nesting: move listing, deletion and copying into separate functions * I put the code of deletion and copying into separate functions * The uniform style of the name * The list code has also been made into a separate function --- pkg/proc/interaction.go | 401 +++++++++++++++++++++------------------- 1 file changed, 209 insertions(+), 192 deletions(-) diff --git a/pkg/proc/interaction.go b/pkg/proc/interaction.go index 3d24881..ab10212 100644 --- a/pkg/proc/interaction.go +++ b/pkg/proc/interaction.go @@ -172,7 +172,212 @@ func ProcessPutExtended( return nil } +func ProcessListExtended(msg message.ListMessage, s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient, cnf *config.Vacuum) error { + ycl.SetExternalFilePath(msg.Prefix) + objectMetas, err := s.ListPath(msg.Prefix) + if err != nil { + _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") + + return nil + } + + const chunkSize = 1000 + + for i := 0; i < len(objectMetas); i += chunkSize { + _, err = ycl.GetRW().Write(message.NewObjectMetaMessage(objectMetas[i:min(i+chunkSize, len(objectMetas))]).Encode()) + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return nil + } + + } + + _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) + + if err != nil { + _ = ycl.ReplyError(err, "failed to upload") + + return err + } + return nil +} +func ProcessCopyExtended(msg message.CopyMessage, s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient) error { + + ycl.SetExternalFilePath(msg.Name) + + //get config for old bucket + instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath) + if err != nil { + _ = ycl.ReplyError(fmt.Errorf("could not read old config: %s", err), "failed to compelete request") + return nil + } + config.EmbedDefaults(&instanceCnf) + oldStorage, err := storage.NewStorage(&instanceCnf.StorageCnf) + if err != nil { + return err + } + ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config") + + //list objects + objectMetas, err := oldStorage.ListPath(msg.Name) + if err != nil { + _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") + return nil + } + + var failed []*object.ObjectInfo + retryCount := 0 + for len(objectMetas) > 0 && retryCount < 10 { + retryCount++ + for i := 0; i < len(objectMetas); i++ { + path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) + //get reader + readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil)) + var fromReader io.Reader + fromReader = readerFromOldBucket + defer readerFromOldBucket.Close() + + if msg.Decrypt { + oldCr, err := crypt.NewCrypto(&instanceCnf.CryptoCnf) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to configure decrypter") + failed = append(failed, objectMetas[i]) + continue + } + fromReader, err = oldCr.Decrypt(readerFromOldBucket) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") + failed = append(failed, objectMetas[i]) + continue + } + } + + //reencrypt + readerEncrypt, writerEncrypt := io.Pipe() + + go func() { + defer func() { + if err := writerEncrypt.Close(); err != nil { + ylogger.Zero.Warn().Err(err).Msg("failed to close writer") + } + }() + + var writerToNewBucket io.WriteCloser = writerEncrypt + + if msg.Encrypt { + var err error + writerToNewBucket, err = cr.Encrypt(writerEncrypt) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to encrypt object") + failed = append(failed, objectMetas[i]) + return + } + } + + if _, err := io.Copy(writerToNewBucket, fromReader); err != nil { + ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to copy data") + failed = append(failed, objectMetas[i]) + return + } + + if err := writerToNewBucket.Close(); err != nil { + ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to close writer") + failed = append(failed, objectMetas[i]) + return + } + }() + + //write file + err = s.PutFileToDest(path, readerEncrypt, nil) + if err != nil { + ylogger.Zero.Error().Err(err).Msg("failed to upload file") + failed = append(failed, objectMetas[i]) + continue + } + } + objectMetas = failed + fmt.Printf("failed files count: %d\n", len(objectMetas)) + failed = make([]*object.ObjectInfo, 0) + } + + if len(objectMetas) > 0 { + fmt.Printf("failed files count: %d\n", len(objectMetas)) + fmt.Printf("failed files: %v\n", objectMetas) + ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("failed to upload some files") + ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to upload some files") + + // _ = ycl.ReplyError(err, "failed to copy some files") + // return nil + } + + if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return err + } + fmt.Println("Copy finished successfully") + ylogger.Zero.Info().Msg("Copy finished successfully") + return nil +} +func ProcessDeleteExtended(msg message.DeleteMessage, s storage.StorageInteractor, ycl client.YproxyClient, cnf *config.Vacuum) error { + ycl.SetExternalFilePath(msg.Name) + + dbInterractor := &database.DatabaseHandler{} + backupHandler := &backups.WalgBackupInterractor{} + + var dh = &BasicDeleteHandler{ + StorageInterractor: s, + DbInterractor: dbInterractor, + BackupInterractor: backupHandler, + Cnf: cnf, + } + + if msg.Garbage { + ylogger.Zero.Debug(). + Str("Name", msg.Name). + Uint64("port", msg.Port). + Uint64("segment", msg.Segnum). + Bool("confirm", msg.Confirm).Msg("requested to perform external storage VACUUM") + } else { + ylogger.Zero.Debug(). + Str("Name", msg.Name). + Uint64("port", msg.Port). + Uint64("segment", msg.Segnum). + Bool("confirm", msg.Confirm).Msg("requested to remove external chunk") + } + + if msg.Garbage { + err := dh.HandleDeleteGarbage(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return err + } + } else { + err := dh.HandleDeleteFile(msg) + if err != nil { + _ = ycl.ReplyError(err, "failed to finish operation") + return err + } + } + + if _, err := ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { + _ = ycl.ReplyError(err, "failed to upload") + return err + } + + if msg.Garbage { + if !msg.Confirm { + ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") + } else { + ylogger.Zero.Info().Msg("Deleted garbage successfully") + } + } else { + ylogger.Zero.Info().Msg("Deleted chunk successfully") + } + + return nil +} func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyClient, cnf *config.Vacuum) error { defer func() { @@ -234,214 +439,26 @@ func ProcConn(s storage.StorageInteractor, cr crypt.Crypter, ycl client.YproxyCl msg := message.ListMessage{} msg.Decode(body) - ycl.SetExternalFilePath(msg.Prefix) - - objectMetas, err := s.ListPath(msg.Prefix) - if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") - - return nil - } - - const chunkSize = 1000 - - for i := 0; i < len(objectMetas); i += chunkSize { - _, err = ycl.GetRW().Write(message.NewObjectMetaMessage(objectMetas[i:min(i+chunkSize, len(objectMetas))]).Encode()) - if err != nil { - _ = ycl.ReplyError(err, "failed to upload") - - return nil - } - - } - - _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()) - + err := ProcessListExtended(msg, s, cr, ycl, cnf) if err != nil { - _ = ycl.ReplyError(err, "failed to upload") - return err } - case message.MessageTypeCopy: msg := message.CopyMessage{} msg.Decode(body) - ycl.SetExternalFilePath(msg.Name) - - //get config for old bucket - instanceCnf, err := config.ReadInstanceConfig(msg.OldCfgPath) - if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not read old config: %s", err), "failed to compelete request") - return nil - } - config.EmbedDefaults(&instanceCnf) - oldStorage, err := storage.NewStorage(&instanceCnf.StorageCnf) + err := ProcessCopyExtended(msg, s, cr, ycl) if err != nil { return err } - ylogger.Zero.Info().Interface("cnf", instanceCnf).Msg("loaded new config") - - //list objects - objectMetas, err := oldStorage.ListPath(msg.Name) - if err != nil { - _ = ycl.ReplyError(fmt.Errorf("could not list objects: %s", err), "failed to compelete request") - return nil - } - - var failed []*object.ObjectInfo - retryCount := 0 - for len(objectMetas) > 0 && retryCount < 10 { - retryCount++ - for i := 0; i < len(objectMetas); i++ { - path := strings.TrimPrefix(objectMetas[i].Path, instanceCnf.StorageCnf.StoragePrefix) - //get reader - readerFromOldBucket := NewYRetryReader(NewRestartReader(oldStorage, path, nil)) - var fromReader io.Reader - fromReader = readerFromOldBucket - defer readerFromOldBucket.Close() - - if msg.Decrypt { - oldCr, err := crypt.NewCrypto(&instanceCnf.CryptoCnf) - if err != nil { - ylogger.Zero.Error().Err(err).Msg("failed to configure decrypter") - failed = append(failed, objectMetas[i]) - continue - } - fromReader, err = oldCr.Decrypt(readerFromOldBucket) - if err != nil { - ylogger.Zero.Error().Err(err).Msg("failed to decrypt object") - failed = append(failed, objectMetas[i]) - continue - } - } - - //reencrypt - readerEncrypt, writerEncrypt := io.Pipe() - - go func() { - defer func() { - if err := writerEncrypt.Close(); err != nil { - ylogger.Zero.Warn().Err(err).Msg("failed to close writer") - } - }() - - var writerToNewBucket io.WriteCloser = writerEncrypt - - if msg.Encrypt { - var err error - writerToNewBucket, err = cr.Encrypt(writerEncrypt) - if err != nil { - ylogger.Zero.Error().Err(err).Msg("failed to encrypt object") - failed = append(failed, objectMetas[i]) - return - } - } - - if _, err := io.Copy(writerToNewBucket, fromReader); err != nil { - ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to copy data") - failed = append(failed, objectMetas[i]) - return - } - - if err := writerToNewBucket.Close(); err != nil { - ylogger.Zero.Error().Str("path", path).Err(err).Msg("failed to close writer") - failed = append(failed, objectMetas[i]) - return - } - }() - - //write file - err = s.PutFileToDest(path, readerEncrypt, nil) - if err != nil { - ylogger.Zero.Error().Err(err).Msg("failed to upload file") - failed = append(failed, objectMetas[i]) - continue - } - } - objectMetas = failed - fmt.Printf("failed files count: %d\n", len(objectMetas)) - failed = make([]*object.ObjectInfo, 0) - } - - if len(objectMetas) > 0 { - fmt.Printf("failed files count: %d\n", len(objectMetas)) - fmt.Printf("failed files: %v\n", objectMetas) - ylogger.Zero.Error().Int("failed files count", len(objectMetas)).Msg("failed to upload some files") - ylogger.Zero.Error().Any("failed files", objectMetas).Msg("failed to upload some files") - - // _ = ycl.ReplyError(err, "failed to copy some files") - // return nil - } - - if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { - _ = ycl.ReplyError(err, "failed to upload") - return err - } - fmt.Println("Copy finished successfully") - ylogger.Zero.Info().Msg("Copy finished successfully") - case message.MessageTypeDelete: //recieve message msg := message.DeleteMessage{} msg.Decode(body) - - ycl.SetExternalFilePath(msg.Name) - - dbInterractor := &database.DatabaseHandler{} - backupHandler := &backups.WalgBackupInterractor{} - - var dh DeleteHandler - dh = &BasicDeleteHandler{ - StorageInterractor: s, - DbInterractor: dbInterractor, - BackupInterractor: backupHandler, - Cnf: cnf, - } - - if msg.Garbage { - ylogger.Zero.Debug(). - Str("Name", msg.Name). - Uint64("port", msg.Port). - Uint64("segment", msg.Segnum). - Bool("confirm", msg.Confirm).Msg("requested to perform external storage VACUUM") - } else { - ylogger.Zero.Debug(). - Str("Name", msg.Name). - Uint64("port", msg.Port). - Uint64("segment", msg.Segnum). - Bool("confirm", msg.Confirm).Msg("requested to remove external chunk") - } - - if msg.Garbage { - err = dh.HandleDeleteGarbage(msg) - if err != nil { - _ = ycl.ReplyError(err, "failed to finish operation") - return err - } - } else { - err = dh.HandleDeleteFile(msg) - if err != nil { - _ = ycl.ReplyError(err, "failed to finish operation") - return err - } - } - - if _, err = ycl.GetRW().Write(message.NewReadyForQueryMessage().Encode()); err != nil { - _ = ycl.ReplyError(err, "failed to upload") + err := ProcessDeleteExtended(msg, s, ycl, cnf) + if err != nil { return err } - - if msg.Garbage { - if !msg.Confirm { - ylogger.Zero.Warn().Msg("It was a dry-run, nothing was deleted") - } else { - ylogger.Zero.Info().Msg("Deleted garbage successfully") - } - } else { - ylogger.Zero.Info().Msg("Deleted chunk successfully") - } - case message.MessageTypeGool: return ProcMotion(s, cr, ycl)