From f2b1d9eef7d6f45481df34920bacab2ba9c0cb01 Mon Sep 17 00:00:00 2001 From: kmetin Date: Wed, 1 Nov 2023 10:33:22 +0300 Subject: [PATCH 1/7] add estimate command to DMT --- base/commands/migration/const.go | 1 + base/commands/migration/estimate.go | 53 +++++++ base/commands/migration/estimate_it_test.go | 1 + base/commands/migration/estimate_stages.go | 158 ++++++++++++++++++++ base/commands/migration/start_stages.go | 15 -- base/commands/migration/utils.go | 15 +- internal/str/str.go | 19 +++ 7 files changed, 245 insertions(+), 17 deletions(-) create mode 100644 base/commands/migration/estimate.go create mode 100644 base/commands/migration/estimate_it_test.go create mode 100644 base/commands/migration/estimate_stages.go diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index 97d53ba03..d522f0c03 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -4,6 +4,7 @@ package migration const ( StartQueueName = "__datamigration_start_queue" + EstimateQueueName = "__datamigration_estimate_queue" StatusMapName = "__datamigration_migrations" UpdateTopicPrefix = "__datamigration_updates_" DebugLogsListPrefix = "__datamigration_debug_logs_" diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go new file mode 100644 index 000000000..72e2d9323 --- /dev/null +++ b/base/commands/migration/estimate.go @@ -0,0 +1,53 @@ +//go:build std || migration + +package migration + +import ( + "context" + "fmt" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" +) + +type EstimateCmd struct{} + +func (e EstimateCmd) Init(cc plug.InitContext) error { + cc.SetCommandUsage("estimate") + cc.SetCommandGroup("migration") + help := "Estimate migration" + cc.SetCommandHelp(help, help) + cc.AddStringArg(argDMTConfig, argTitleDMTConfig) + return nil +} + +func (e EstimateCmd) Exec(ctx context.Context, ec plug.ExecContext) error { + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0 +(c) 2023 Hazelcast, Inc. + +Estimation usually ends within 15 seconds. +`) + mID := MakeMigrationID() + stages, err := NewEstimateStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) + if err != nil { + return err + } + sp := stage.NewFixedProvider(stages.Build(ctx, ec)...) + res, err := stage.Execute(ctx, ec, any(nil), sp) + if err != nil { + return err + } + resArr := res.([]string) + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(fmt.Sprintf("OK %s", resArr[0])) + ec.PrintlnUnnecessary(fmt.Sprintf("OK %s", resArr[1])) + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary("OK Estimation completed successfully.") + return nil +} + +func init() { + check.Must(plug.Registry.RegisterCommand("estimate", &EstimateCmd{})) +} diff --git a/base/commands/migration/estimate_it_test.go b/base/commands/migration/estimate_it_test.go new file mode 100644 index 000000000..753000bce --- /dev/null +++ b/base/commands/migration/estimate_it_test.go @@ -0,0 +1 @@ +package migration diff --git a/base/commands/migration/estimate_stages.go b/base/commands/migration/estimate_stages.go new file mode 100644 index 000000000..1205155e6 --- /dev/null +++ b/base/commands/migration/estimate_stages.go @@ -0,0 +1,158 @@ +//go:build std || migration + +package migration + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/log" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-commandline-client/internal/str" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type EstimateStages struct { + migrationID string + configDir string + ci *hazelcast.ClientInternal + estimateQueue *hazelcast.Queue + statusMap *hazelcast.Map + logger log.Logger +} + +func NewEstimateStages(logger log.Logger, migrationID, configDir string) (*EstimateStages, error) { + if migrationID == "" { + return nil, errors.New("migrationID is required") + } + return &EstimateStages{ + migrationID: migrationID, + configDir: configDir, + logger: logger, + }, nil +} + +func (es *EstimateStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] { + return []stage.Stage[any]{ + { + ProgressMsg: "Connecting to the migration cluster", + SuccessMsg: "Connected to the migration cluster", + FailureMsg: "Could not connect to the migration cluster", + Func: es.connectStage(ec), + }, + { + ProgressMsg: "Estimating the migration", + SuccessMsg: "Estimated the migration", + FailureMsg: "Could not estimate the migration", + Func: es.estimateStage(), + }, + } +} + +func (es *EstimateStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, s stage.Statuser[any]) (any, error) { + var err error + es.ci, err = ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + es.estimateQueue, err = es.ci.Client().GetQueue(ctx, EstimateQueueName) + if err != nil { + return nil, fmt.Errorf("retrieving the estimate Queue: %w", err) + } + es.statusMap, err = es.ci.Client().GetMap(ctx, StatusMapName) + if err != nil { + return nil, fmt.Errorf("retrieving the status Map: %w", err) + } + return nil, nil + } +} + +func (es *EstimateStages) estimateStage() func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + cb, err := makeConfigBundle(es.configDir, es.migrationID) + if err != nil { + return nil, fmt.Errorf("making configuration bundle: %w", err) + } + if err = es.estimateQueue.Put(ctx, cb); err != nil { + return nil, fmt.Errorf("updating estimate Queue: %w", err) + } + if err = waitForEstimationToComplete(ctx, es.ci, es.migrationID, status); err != nil { + return nil, fmt.Errorf("waiting for estimation to complete: %w", err) + } + return fetchEstimationResults(ctx, es.ci, es.migrationID) + } +} + +func waitForEstimationToComplete(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string, stage stage.Statuser[any]) error { + duration := 16 * time.Second + interval := 1 * time.Second + for { + if duration > 0 { + stage.SetRemainingDuration(duration) + } else { + stage.SetText("It will take a bit longer than expected.") + } + status, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + if errors.Is(err, migrationStatusNotFoundErr) { + // migration status will not be available for a while, so we should wait for it + continue + } + return err + } + if Status(status) == StatusFailed { + errs, err := fetchMigrationErrors(ctx, ci, migrationID) + if err != nil { + return fmt.Errorf("estimation failed and dmt cannot fetch estimation errors: %w", err) + } + return errors.New(errs) + } + if Status(status) == StatusComplete { + return nil + } + time.Sleep(interval) + duration = duration - interval + } +} + +func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) ([]string, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.estimatedTime'), JSON_QUERY(this, '$.estimatedSize') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return nil, err + } + it, err := res.Iterator() + if err != nil { + return nil, err + } + if it.HasNext() { + // single iteration is enough that we are reading single result for a single migration + row, err := it.Next() + if err != nil { + return nil, err + } + estimatedTime, err := row.Get(0) + if err != nil { + return nil, err + } + estimatedSize, err := row.Get(1) + if err != nil { + return nil, err + } + et, err := str.MsToSecs(estimatedTime.(serialization.JSON).String()) + if err != nil { + return nil, err + } + es, err := str.BytesToMegabytes(estimatedSize.(serialization.JSON).String()) + if err != nil { + return nil, err + } + return []string{fmt.Sprintf("Estimated Size: %s ", es), fmt.Sprintf("Estimated Time: %s", et)}, nil + } + return nil, errors.New("no rows found") +} diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index d2d1842ad..10de56af1 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -4,7 +4,6 @@ package migration import ( "context" - "encoding/json" "errors" "fmt" @@ -12,7 +11,6 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/internal/log" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/serialization" ) type StartStages struct { @@ -83,16 +81,3 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( return nil, nil } } - -func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { - var cb ConfigBundle - cb.MigrationID = migrationID - if err := cb.Walk(configDir); err != nil { - return nil, err - } - b, err := json.Marshal(cb) - if err != nil { - return nil, err - } - return b, nil -} diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 5af0aa0ed..06b36610f 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -5,6 +5,7 @@ package migration import ( "bufio" "encoding/base64" + "encoding/json" "fmt" "io/fs" "os" @@ -12,6 +13,7 @@ import ( "sort" "strings" + "github.com/hazelcast/hazelcast-go-client/serialization" "github.com/hazelcast/hazelcast-go-client/types" "github.com/hazelcast/hazelcast-commandline-client/clc/paths" @@ -125,6 +127,15 @@ func MakeMigrationID() string { return types.NewUUID().String() } -func MakeUpdateTopicName(migrationID string) string { - return UpdateTopicPrefix + migrationID +func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { + var cb ConfigBundle + cb.MigrationID = migrationID + if err := cb.Walk(configDir); err != nil { + return nil, err + } + b, err := json.Marshal(cb) + if err != nil { + return nil, err + } + return b, nil } diff --git a/internal/str/str.go b/internal/str/str.go index 4dd34e761..7daf555de 100644 --- a/internal/str/str.go +++ b/internal/str/str.go @@ -59,3 +59,22 @@ func Colorize(text string) string { } return text } + +func BytesToMegabytes(bytesStr string) (string, error) { + bytes, err := strconv.ParseFloat(bytesStr, 64) + if err != nil { + return "", err + } + mb := bytes / (1024.0 * 1024.0) + return fmt.Sprintf("%.2f MBs", mb), nil +} + +func MsToSecs(ms string) (string, error) { + milliseconds, err := strconv.ParseInt(ms, 10, 64) + if err != nil { + return "", err + } + seconds := float64(milliseconds) / 1000.0 + secondsStr := fmt.Sprintf("%.1f sec", seconds) + return secondsStr, nil +} From cddaad42d6bee10f9446fad179848ce3c32489d8 Mon Sep 17 00:00:00 2001 From: kmetin Date: Wed, 1 Nov 2023 11:00:49 +0300 Subject: [PATCH 2/7] add tests for estimate command --- base/commands/migration/estimate_it_test.go | 59 ++++++++++++++++++- .../testdata/estimate/estimate_completed.json | 5 ++ 2 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 base/commands/migration/testdata/estimate/estimate_completed.json diff --git a/base/commands/migration/estimate_it_test.go b/base/commands/migration/estimate_it_test.go index 753000bce..e9c1cc780 100644 --- a/base/commands/migration/estimate_it_test.go +++ b/base/commands/migration/estimate_it_test.go @@ -1 +1,58 @@ -package migration +//go:build std || migration + +package migration_test + +import ( + "context" + "encoding/json" + "os" + "sync" + "testing" + "time" + + "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/it" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +func TestEstimate(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + createMapping(ctx, tcx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + check.Must(tcx.CLC().Execute(ctx, "estimate", "dmt-config")) + }() + c := make(chan string) + go findEstimationID(ctx, tcx, c) + mID := <-c + go estimateRunner(t, ctx, tcx, mID) + wg.Wait() + tcx.AssertStdoutContains("OK Estimation completed successfully") + }) +} + +func estimateRunner(t *testing.T, ctx context.Context, tcx it.TestContext, migrationID string) { + statusMap := check.MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := check.MustValue(os.ReadFile("testdata/estimate/estimate_completed.json")) + it.Eventually(t, func() bool { + return statusMap.Set(ctx, migrationID, serialization.JSON(b)) == nil + }) +} + +func findEstimationID(ctx context.Context, tcx it.TestContext, c chan string) { + q := check.MustValue(tcx.Client.GetQueue(ctx, migration.EstimateQueueName)) + var b migration.ConfigBundle + for { + v := check.MustValue(q.PollWithTimeout(ctx, time.Second)) + if v != nil { + check.Must(json.Unmarshal(v.(serialization.JSON), &b)) + c <- b.MigrationID + break + } + } +} diff --git a/base/commands/migration/testdata/estimate/estimate_completed.json b/base/commands/migration/testdata/estimate/estimate_completed.json new file mode 100644 index 000000000..e6ad6081f --- /dev/null +++ b/base/commands/migration/testdata/estimate/estimate_completed.json @@ -0,0 +1,5 @@ +{ + "status": "COMPLETED", + "estimatedTime": 1, + "estimatedSize": 1 +} \ No newline at end of file From f47341fb85f065c9043d02bc7769586d74a7b092 Mon Sep 17 00:00:00 2001 From: kmetin Date: Thu, 2 Nov 2023 14:11:04 +0300 Subject: [PATCH 3/7] fix PR comments --- base/commands/migration/estimate_stages.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/base/commands/migration/estimate_stages.go b/base/commands/migration/estimate_stages.go index 1205155e6..709b179f4 100644 --- a/base/commands/migration/estimate_stages.go +++ b/base/commands/migration/estimate_stages.go @@ -81,7 +81,9 @@ func (es *EstimateStages) estimateStage() func(context.Context, stage.Statuser[a if err = es.estimateQueue.Put(ctx, cb); err != nil { return nil, fmt.Errorf("updating estimate Queue: %w", err) } - if err = waitForEstimationToComplete(ctx, es.ci, es.migrationID, status); err != nil { + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err = waitForEstimationToComplete(childCtx, es.ci, es.migrationID, status); err != nil { return nil, fmt.Errorf("waiting for estimation to complete: %w", err) } return fetchEstimationResults(ctx, es.ci, es.migrationID) @@ -95,7 +97,7 @@ func waitForEstimationToComplete(ctx context.Context, ci *hazelcast.ClientIntern if duration > 0 { stage.SetRemainingDuration(duration) } else { - stage.SetText("It will take a bit longer than expected.") + stage.SetText("Estimation took longer than expected.") } status, err := fetchMigrationStatus(ctx, ci, migrationID) if err != nil { From e4a7c2c5b9988275f6c4aaa15cce0883faf837bf Mon Sep 17 00:00:00 2001 From: kmetin Date: Fri, 3 Nov 2023 11:19:49 +0300 Subject: [PATCH 4/7] fix Yuce's PR comments --- base/commands/migration/estimate.go | 6 ++---- base/commands/migration/estimate_it_test.go | 12 +++++------- base/commands/migration/start_stages_it_test.go | 2 +- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go index 72e2d9323..bb40d47f4 100644 --- a/base/commands/migration/estimate.go +++ b/base/commands/migration/estimate.go @@ -24,11 +24,9 @@ func (e EstimateCmd) Init(cc plug.InitContext) error { func (e EstimateCmd) Exec(ctx context.Context, ec plug.ExecContext) error { ec.PrintlnUnnecessary("") - ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0 -(c) 2023 Hazelcast, Inc. + ec.PrintlnUnnecessary(fmt.Sprintf(`%s -Estimation usually ends within 15 seconds. -`) +Estimation usually ends within 15 seconds.`, banner)) mID := MakeMigrationID() stages, err := NewEstimateStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) if err != nil { diff --git a/base/commands/migration/estimate_it_test.go b/base/commands/migration/estimate_it_test.go index e9c1cc780..b21da4fc4 100644 --- a/base/commands/migration/estimate_it_test.go +++ b/base/commands/migration/estimate_it_test.go @@ -25,30 +25,28 @@ func TestEstimate(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - check.Must(tcx.CLC().Execute(ctx, "estimate", "dmt-config")) + check.Must(tcx.CLC().Execute(ctx, "estimate", "testdata/dmt-config")) }() c := make(chan string) go findEstimationID(ctx, tcx, c) mID := <-c - go estimateRunner(t, ctx, tcx, mID) + go estimateRunner(ctx, tcx, mID) wg.Wait() tcx.AssertStdoutContains("OK Estimation completed successfully") }) } -func estimateRunner(t *testing.T, ctx context.Context, tcx it.TestContext, migrationID string) { +func estimateRunner(ctx context.Context, tcx it.TestContext, migrationID string) { statusMap := check.MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) b := check.MustValue(os.ReadFile("testdata/estimate/estimate_completed.json")) - it.Eventually(t, func() bool { - return statusMap.Set(ctx, migrationID, serialization.JSON(b)) == nil - }) + check.Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) } func findEstimationID(ctx context.Context, tcx it.TestContext, c chan string) { q := check.MustValue(tcx.Client.GetQueue(ctx, migration.EstimateQueueName)) var b migration.ConfigBundle for { - v := check.MustValue(q.PollWithTimeout(ctx, time.Second)) + v := check.MustValue(q.PollWithTimeout(ctx, 100*time.Millisecond)) if v != nil { check.Must(json.Unmarshal(v.(serialization.JSON), &b)) c <- b.MigrationID diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index d3b6ac7c7..a1634de49 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -67,7 +67,7 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s var execErr error go tcx.WithReset(func() { defer wg.Done() - execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir) + execErr = tcx.CLC().Execute(ctx, "start", "testdata/dmt-config", "--yes", "-o", outDir) }) c := make(chan string) go findMigrationID(ctx, tcx, c) From 0433098cea6f8d20645c6af77b458abd55f76fcc Mon Sep 17 00:00:00 2001 From: kmetin Date: Sat, 4 Nov 2023 19:58:45 +0300 Subject: [PATCH 5/7] fix Yuce's PR comments --- base/commands/migration/estimate.go | 4 ++-- base/commands/migration/estimate_stages.go | 25 +++++++++++++++++++--- internal/str/str.go | 19 ---------------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go index bb40d47f4..ecd4a5a07 100644 --- a/base/commands/migration/estimate.go +++ b/base/commands/migration/estimate.go @@ -39,8 +39,8 @@ Estimation usually ends within 15 seconds.`, banner)) } resArr := res.([]string) ec.PrintlnUnnecessary("") - ec.PrintlnUnnecessary(fmt.Sprintf("OK %s", resArr[0])) - ec.PrintlnUnnecessary(fmt.Sprintf("OK %s", resArr[1])) + ec.PrintlnUnnecessary(resArr[0]) + ec.PrintlnUnnecessary(resArr[1]) ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary("OK Estimation completed successfully.") return nil diff --git a/base/commands/migration/estimate_stages.go b/base/commands/migration/estimate_stages.go index 709b179f4..b1a340ec4 100644 --- a/base/commands/migration/estimate_stages.go +++ b/base/commands/migration/estimate_stages.go @@ -6,12 +6,12 @@ import ( "context" "errors" "fmt" + "strconv" "time" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/log" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" - "github.com/hazelcast/hazelcast-commandline-client/internal/str" "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" ) @@ -146,11 +146,11 @@ func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, m if err != nil { return nil, err } - et, err := str.MsToSecs(estimatedTime.(serialization.JSON).String()) + et, err := MsToSecs(estimatedTime.(serialization.JSON).String()) if err != nil { return nil, err } - es, err := str.BytesToMegabytes(estimatedSize.(serialization.JSON).String()) + es, err := BytesToMegabytes(estimatedSize.(serialization.JSON).String()) if err != nil { return nil, err } @@ -158,3 +158,22 @@ func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, m } return nil, errors.New("no rows found") } + +func BytesToMegabytes(bytesStr string) (string, error) { + bytes, err := strconv.ParseFloat(bytesStr, 64) + if err != nil { + return "", err + } + mb := bytes / (1024.0 * 1024.0) + return fmt.Sprintf("%.2f MBs", mb), nil +} + +func MsToSecs(ms string) (string, error) { + milliseconds, err := strconv.ParseInt(ms, 10, 64) + if err != nil { + return "", err + } + seconds := float64(milliseconds) / 1000.0 + secondsStr := fmt.Sprintf("%.1f sec", seconds) + return secondsStr, nil +} diff --git a/internal/str/str.go b/internal/str/str.go index 7daf555de..4dd34e761 100644 --- a/internal/str/str.go +++ b/internal/str/str.go @@ -59,22 +59,3 @@ func Colorize(text string) string { } return text } - -func BytesToMegabytes(bytesStr string) (string, error) { - bytes, err := strconv.ParseFloat(bytesStr, 64) - if err != nil { - return "", err - } - mb := bytes / (1024.0 * 1024.0) - return fmt.Sprintf("%.2f MBs", mb), nil -} - -func MsToSecs(ms string) (string, error) { - milliseconds, err := strconv.ParseInt(ms, 10, 64) - if err != nil { - return "", err - } - seconds := float64(milliseconds) / 1000.0 - secondsStr := fmt.Sprintf("%.1f sec", seconds) - return secondsStr, nil -} From 1cd2f8c2fc7d42b74fc743c48b025caede9c0f06 Mon Sep 17 00:00:00 2001 From: kmetin Date: Mon, 6 Nov 2023 12:08:48 +0300 Subject: [PATCH 6/7] fix Yuce's PR comment --- base/commands/migration/estimate_stages.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/base/commands/migration/estimate_stages.go b/base/commands/migration/estimate_stages.go index b1a340ec4..2c62a59f4 100644 --- a/base/commands/migration/estimate_stages.go +++ b/base/commands/migration/estimate_stages.go @@ -146,11 +146,11 @@ func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, m if err != nil { return nil, err } - et, err := MsToSecs(estimatedTime.(serialization.JSON).String()) + et, err := msToSecs(estimatedTime.(serialization.JSON).String()) if err != nil { return nil, err } - es, err := BytesToMegabytes(estimatedSize.(serialization.JSON).String()) + es, err := bytesToMegabytes(estimatedSize.(serialization.JSON).String()) if err != nil { return nil, err } @@ -159,7 +159,7 @@ func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, m return nil, errors.New("no rows found") } -func BytesToMegabytes(bytesStr string) (string, error) { +func bytesToMegabytes(bytesStr string) (string, error) { bytes, err := strconv.ParseFloat(bytesStr, 64) if err != nil { return "", err @@ -168,7 +168,7 @@ func BytesToMegabytes(bytesStr string) (string, error) { return fmt.Sprintf("%.2f MBs", mb), nil } -func MsToSecs(ms string) (string, error) { +func msToSecs(ms string) (string, error) { milliseconds, err := strconv.ParseInt(ms, 10, 64) if err != nil { return "", err From 527e73cb23bd583f83927f3c268ddf2ff232efa4 Mon Sep 17 00:00:00 2001 From: kmetin Date: Mon, 6 Nov 2023 14:00:29 +0300 Subject: [PATCH 7/7] fixes --- base/commands/migration/estimate.go | 7 ++++++- base/commands/migration/estimate_it_test.go | 2 +- base/commands/migration/migration_start.go | 8 +++++++- base/commands/migration/start_stages_it_test.go | 2 +- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go index ecd4a5a07..8dde2888b 100644 --- a/base/commands/migration/estimate.go +++ b/base/commands/migration/estimate.go @@ -6,6 +6,7 @@ import ( "context" "fmt" + "github.com/hazelcast/hazelcast-commandline-client/clc/paths" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/check" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" @@ -27,8 +28,12 @@ func (e EstimateCmd) Exec(ctx context.Context, ec plug.ExecContext) error { ec.PrintlnUnnecessary(fmt.Sprintf(`%s Estimation usually ends within 15 seconds.`, banner)) + conf := ec.GetStringArg(argDMTConfig) + if !paths.Exists(conf) { + return fmt.Errorf("migration config does not exist: %s", conf) + } mID := MakeMigrationID() - stages, err := NewEstimateStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) + stages, err := NewEstimateStages(ec.Logger(), mID, conf) if err != nil { return err } diff --git a/base/commands/migration/estimate_it_test.go b/base/commands/migration/estimate_it_test.go index b21da4fc4..5779197da 100644 --- a/base/commands/migration/estimate_it_test.go +++ b/base/commands/migration/estimate_it_test.go @@ -25,7 +25,7 @@ func TestEstimate(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - check.Must(tcx.CLC().Execute(ctx, "estimate", "testdata/dmt-config")) + check.Must(tcx.CLC().Execute(ctx, "estimate", "testdata/dmt_config")) }() c := make(chan string) go findEstimationID(ctx, tcx, c) diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index 843681998..499b1eae9 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -4,8 +4,10 @@ package migration import ( "context" + "fmt" "github.com/hazelcast/hazelcast-commandline-client/clc" + "github.com/hazelcast/hazelcast-commandline-client/clc/paths" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" "github.com/hazelcast/hazelcast-commandline-client/internal/check" @@ -41,6 +43,10 @@ func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { Selected data structures in the source cluster will be migrated to the target cluster. `) + conf := ec.GetStringArg(argDMTConfig) + if !paths.Exists(conf) { + return fmt.Errorf("migration config does not exist: %s", conf) + } if !ec.Props().GetBool(clc.FlagAutoYes) { p := prompt.New(ec.Stdin(), ec.Stdout()) yes, err := p.YesNo("Proceed?") @@ -59,7 +65,7 @@ Selected data structures in the source cluster will be migrated to the target cl err = finalizeErr } }() - sts, err := NewStartStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) + sts, err := NewStartStages(ec.Logger(), mID, conf) if err != nil { return err } diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index a1634de49..073341002 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -67,7 +67,7 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s var execErr error go tcx.WithReset(func() { defer wg.Done() - execErr = tcx.CLC().Execute(ctx, "start", "testdata/dmt-config", "--yes", "-o", outDir) + execErr = tcx.CLC().Execute(ctx, "start", "testdata/dmt_config", "--yes", "-o", outDir) }) c := make(chan string) go findMigrationID(ctx, tcx, c)