From 4dd91f063df51958c72d113fc8374573666d1caa Mon Sep 17 00:00:00 2001 From: abyss Date: Fri, 24 Nov 2023 14:50:14 +0800 Subject: [PATCH] feat: Added MD5 check after migration --- migrate/object.go | 74 +++++++++++++++++++++++++++++++++++++---------- model/task.go | 1 + 2 files changed, 60 insertions(+), 15 deletions(-) diff --git a/migrate/object.go b/migrate/object.go index 617aebe..ce27042 100644 --- a/migrate/object.go +++ b/migrate/object.go @@ -4,13 +4,13 @@ import ( "context" "crypto/md5" "encoding/hex" + "fmt" + "github.com/sirupsen/logrus" "io" "strings" "sync" "time" - "github.com/sirupsen/logrus" - "github.com/yunify/qscamel/constants" "github.com/yunify/qscamel/endpoint" "github.com/yunify/qscamel/model" @@ -70,7 +70,7 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) { logrus.Infof("Start checking object %s.", o.Key) - so, err := statObject(ctx, src, o) + so, err := statObject(ctx, src, o, false) if err != nil { return } @@ -78,7 +78,7 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) { return true, nil } - do, err := statObject(ctx, dst, o) + do, err := statObject(ctx, dst, o, false) if err != nil { return } @@ -92,19 +92,17 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) { return } - // Check last modified - if t.IgnoreExisting == constants.TaskIgnoreExistingLastModified { - if so.LastModified > do.LastModified { - logrus.Infof("Object %s was modified, execute an operation on it.", o.Key) + if t.IgnoreBeforeTimestamp != 0 { + if so.LastModified > t.IgnoreBeforeTimestamp { + logrus.Infof("Object %s was modified after %s, execute an operation on it.", o.Key, time.Unix(t.IgnoreBeforeTimestamp, 0)) return } - logrus.Infof("Object %s check passed, ignore.", o.Key) - return true, nil } - if t.IgnoreBeforeTimestamp != 0 { - if so.LastModified > t.IgnoreBeforeTimestamp { - logrus.Infof("Object %s was modified after %s, execute an operation on it.", o.Key, time.Unix(t.IgnoreBeforeTimestamp, 0)) + // Check last modified + if t.IgnoreExisting == constants.TaskIgnoreExistingLastModified { + if so.LastModified > do.LastModified { + logrus.Infof("Object %s was modified, execute an operation on it.", o.Key) return } logrus.Infof("Object %s check passed, ignore.", o.Key) @@ -121,6 +119,34 @@ func checkObject(ctx context.Context, mo model.Object) (ok bool, err error) { return true, nil } +// checkObjectAfterMigrate will check whether the MD5 between the migrated src and dst is consistent. +func checkObjectAfterMigrate(ctx context.Context, mo model.Object) (err error) { + if t.Src.Type == "fs" || t.Dst.Type == "fs" { + return nil + } + + o := mo.(*model.SingleObject) + + rso, err := statObject(ctx, src, o, true) + if err != nil { + logrus.Errorf("Src stat %s failed for %v.", o.Key, err) + return err + } + + rdo, err := statObject(ctx, dst, o, true) + if err != nil { + logrus.Errorf("Dst stat %s failed for %v.", o.Key, err) + return err + } + + if rdo.MD5 != rso.MD5 { + logrus.Errorf("md5 mismatch between src and dst %s.", o.Key) + return fmt.Errorf("md5 not match") + } + + return +} + // copyObject will do a real copy. func copyObject(ctx context.Context, o model.Object) (err error) { so := o.(*model.SingleObject) @@ -140,6 +166,14 @@ func copyObject(ctx context.Context, o model.Object) (err error) { return err } + if t.CheckMD5 { + err = checkObjectAfterMigrate(ctx, o) + if err != nil { + _ = dst.Delete(ctx, so.Key) + return err + } + } + logrus.Infof("Single object %s copied.", so.Key) return nil } @@ -245,6 +279,16 @@ func copyObject(ctx context.Context, o model.Object) (err error) { return err } + if t.CheckMD5 { + if t.CheckMD5 { + err = checkObjectAfterMigrate(ctx, o) + if err != nil { + _ = dst.Delete(ctx, so.Key) + return err + } + } + } + logrus.Infof("Object %s copied.", so.Key) return @@ -299,7 +343,7 @@ func fetchObject(ctx context.Context, o model.Object) (err error) { // statObject will get an object metadata and try to get it's md5 if available. func statObject( - ctx context.Context, e endpoint.Base, o *model.SingleObject, + ctx context.Context, e endpoint.Base, o *model.SingleObject, isMD5 bool, ) (ro *model.SingleObject, err error) { ro, err = e.Stat(ctx, o.Key, o.IsDir) if err != nil { @@ -311,7 +355,7 @@ func statObject( return } - if t.IgnoreExisting != constants.TaskIgnoreExistingMD5Sum { + if t.IgnoreExisting != constants.TaskIgnoreExistingMD5Sum && !isMD5 { return } diff --git a/model/task.go b/model/task.go index a039836..a1505dc 100644 --- a/model/task.go +++ b/model/task.go @@ -25,6 +25,7 @@ type Task struct { Src *Endpoint `yaml:"source" msgpack:"src"` Dst *Endpoint `yaml:"destination" msgpack:"dst"` + CheckMD5 bool `yaml:"check_md5" msgpack:"cm"` IgnoreExisting string `yaml:"ignore_existing" msgpack:"ie"` MultipartBoundarySize int64 `yaml:"multipart_boundary_size" msgpack:"mbs"` // Format: 2006-01-02 15:04:05