From 0fd24c6ac538e861f3d1b5ba7ef86a9d8e8635d3 Mon Sep 17 00:00:00 2001 From: Kutluhan Metin Date: Tue, 7 Nov 2023 11:22:14 +0300 Subject: [PATCH] [CLC-313]: Add Estimate Command to DMT (#422) --- base/commands/migration/const.go | 1 + base/commands/migration/estimate.go | 56 ++++++ base/commands/migration/estimate_it_test.go | 56 ++++++ base/commands/migration/estimate_stages.go | 179 ++++++++++++++++++ base/commands/migration/migration_start.go | 8 +- base/commands/migration/start_stages.go | 15 -- .../migration/start_stages_it_test.go | 2 +- .../testdata/estimate/estimate_completed.json | 5 + base/commands/migration/utils.go | 15 +- 9 files changed, 318 insertions(+), 19 deletions(-) create mode 100644 base/commands/migration/estimate.go create mode 100644 base/commands/migration/estimate_it_test.go create mode 100644 base/commands/migration/estimate_stages.go create mode 100644 base/commands/migration/testdata/estimate/estimate_completed.json diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index 97d53ba03..d522f0c03 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -4,6 +4,7 @@ package migration const ( StartQueueName = "__datamigration_start_queue" + EstimateQueueName = "__datamigration_estimate_queue" StatusMapName = "__datamigration_migrations" UpdateTopicPrefix = "__datamigration_updates_" DebugLogsListPrefix = "__datamigration_debug_logs_" diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go new file mode 100644 index 000000000..8dde2888b --- /dev/null +++ b/base/commands/migration/estimate.go @@ -0,0 +1,56 @@ +//go:build std || migration + +package migration + +import ( + "context" + "fmt" + + "github.com/hazelcast/hazelcast-commandline-client/clc/paths" + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" +) + +type EstimateCmd struct{} + +func (e EstimateCmd) Init(cc plug.InitContext) error { + cc.SetCommandUsage("estimate") + cc.SetCommandGroup("migration") + help := "Estimate migration" + cc.SetCommandHelp(help, help) + cc.AddStringArg(argDMTConfig, argTitleDMTConfig) + return nil +} + +func (e EstimateCmd) Exec(ctx context.Context, ec plug.ExecContext) error { + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(fmt.Sprintf(`%s + +Estimation usually ends within 15 seconds.`, banner)) + conf := ec.GetStringArg(argDMTConfig) + if !paths.Exists(conf) { + return fmt.Errorf("migration config does not exist: %s", conf) + } + mID := MakeMigrationID() + stages, err := NewEstimateStages(ec.Logger(), mID, conf) + if err != nil { + return err + } + sp := stage.NewFixedProvider(stages.Build(ctx, ec)...) + res, err := stage.Execute(ctx, ec, any(nil), sp) + if err != nil { + return err + } + resArr := res.([]string) + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(resArr[0]) + ec.PrintlnUnnecessary(resArr[1]) + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary("OK Estimation completed successfully.") + return nil +} + +func init() { + check.Must(plug.Registry.RegisterCommand("estimate", &EstimateCmd{})) +} diff --git a/base/commands/migration/estimate_it_test.go b/base/commands/migration/estimate_it_test.go new file mode 100644 index 000000000..5779197da --- /dev/null +++ b/base/commands/migration/estimate_it_test.go @@ -0,0 +1,56 @@ +//go:build std || migration + +package migration_test + +import ( + "context" + "encoding/json" + "os" + "sync" + "testing" + "time" + + "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/it" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +func TestEstimate(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + createMapping(ctx, tcx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + check.Must(tcx.CLC().Execute(ctx, "estimate", "testdata/dmt_config")) + }() + c := make(chan string) + go findEstimationID(ctx, tcx, c) + mID := <-c + go estimateRunner(ctx, tcx, mID) + wg.Wait() + tcx.AssertStdoutContains("OK Estimation completed successfully") + }) +} + +func estimateRunner(ctx context.Context, tcx it.TestContext, migrationID string) { + statusMap := check.MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := check.MustValue(os.ReadFile("testdata/estimate/estimate_completed.json")) + check.Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) +} + +func findEstimationID(ctx context.Context, tcx it.TestContext, c chan string) { + q := check.MustValue(tcx.Client.GetQueue(ctx, migration.EstimateQueueName)) + var b migration.ConfigBundle + for { + v := check.MustValue(q.PollWithTimeout(ctx, 100*time.Millisecond)) + if v != nil { + check.Must(json.Unmarshal(v.(serialization.JSON), &b)) + c <- b.MigrationID + break + } + } +} diff --git a/base/commands/migration/estimate_stages.go b/base/commands/migration/estimate_stages.go new file mode 100644 index 000000000..2c62a59f4 --- /dev/null +++ b/base/commands/migration/estimate_stages.go @@ -0,0 +1,179 @@ +//go:build std || migration + +package migration + +import ( + "context" + "errors" + "fmt" + "strconv" + "time" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/log" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type EstimateStages struct { + migrationID string + configDir string + ci *hazelcast.ClientInternal + estimateQueue *hazelcast.Queue + statusMap *hazelcast.Map + logger log.Logger +} + +func NewEstimateStages(logger log.Logger, migrationID, configDir string) (*EstimateStages, error) { + if migrationID == "" { + return nil, errors.New("migrationID is required") + } + return &EstimateStages{ + migrationID: migrationID, + configDir: configDir, + logger: logger, + }, nil +} + +func (es *EstimateStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] { + return []stage.Stage[any]{ + { + ProgressMsg: "Connecting to the migration cluster", + SuccessMsg: "Connected to the migration cluster", + FailureMsg: "Could not connect to the migration cluster", + Func: es.connectStage(ec), + }, + { + ProgressMsg: "Estimating the migration", + SuccessMsg: "Estimated the migration", + FailureMsg: "Could not estimate the migration", + Func: es.estimateStage(), + }, + } +} + +func (es *EstimateStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, s stage.Statuser[any]) (any, error) { + var err error + es.ci, err = ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + es.estimateQueue, err = es.ci.Client().GetQueue(ctx, EstimateQueueName) + if err != nil { + return nil, fmt.Errorf("retrieving the estimate Queue: %w", err) + } + es.statusMap, err = es.ci.Client().GetMap(ctx, StatusMapName) + if err != nil { + return nil, fmt.Errorf("retrieving the status Map: %w", err) + } + return nil, nil + } +} + +func (es *EstimateStages) estimateStage() func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + cb, err := makeConfigBundle(es.configDir, es.migrationID) + if err != nil { + return nil, fmt.Errorf("making configuration bundle: %w", err) + } + if err = es.estimateQueue.Put(ctx, cb); err != nil { + return nil, fmt.Errorf("updating estimate Queue: %w", err) + } + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err = waitForEstimationToComplete(childCtx, es.ci, es.migrationID, status); err != nil { + return nil, fmt.Errorf("waiting for estimation to complete: %w", err) + } + return fetchEstimationResults(ctx, es.ci, es.migrationID) + } +} + +func waitForEstimationToComplete(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string, stage stage.Statuser[any]) error { + duration := 16 * time.Second + interval := 1 * time.Second + for { + if duration > 0 { + stage.SetRemainingDuration(duration) + } else { + stage.SetText("Estimation took longer than expected.") + } + status, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + if errors.Is(err, migrationStatusNotFoundErr) { + // migration status will not be available for a while, so we should wait for it + continue + } + return err + } + if Status(status) == StatusFailed { + errs, err := fetchMigrationErrors(ctx, ci, migrationID) + if err != nil { + return fmt.Errorf("estimation failed and dmt cannot fetch estimation errors: %w", err) + } + return errors.New(errs) + } + if Status(status) == StatusComplete { + return nil + } + time.Sleep(interval) + duration = duration - interval + } +} + +func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) ([]string, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.estimatedTime'), JSON_QUERY(this, '$.estimatedSize') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return nil, err + } + it, err := res.Iterator() + if err != nil { + return nil, err + } + if it.HasNext() { + // single iteration is enough that we are reading single result for a single migration + row, err := it.Next() + if err != nil { + return nil, err + } + estimatedTime, err := row.Get(0) + if err != nil { + return nil, err + } + estimatedSize, err := row.Get(1) + if err != nil { + return nil, err + } + et, err := msToSecs(estimatedTime.(serialization.JSON).String()) + if err != nil { + return nil, err + } + es, err := bytesToMegabytes(estimatedSize.(serialization.JSON).String()) + if err != nil { + return nil, err + } + return []string{fmt.Sprintf("Estimated Size: %s ", es), fmt.Sprintf("Estimated Time: %s", et)}, nil + } + return nil, errors.New("no rows found") +} + +func bytesToMegabytes(bytesStr string) (string, error) { + bytes, err := strconv.ParseFloat(bytesStr, 64) + if err != nil { + return "", err + } + mb := bytes / (1024.0 * 1024.0) + return fmt.Sprintf("%.2f MBs", mb), nil +} + +func msToSecs(ms string) (string, error) { + milliseconds, err := strconv.ParseInt(ms, 10, 64) + if err != nil { + return "", err + } + seconds := float64(milliseconds) / 1000.0 + secondsStr := fmt.Sprintf("%.1f sec", seconds) + return secondsStr, nil +} diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index fdb7cfe52..66b034b40 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -4,8 +4,10 @@ package migration import ( "context" + "fmt" "github.com/hazelcast/hazelcast-commandline-client/clc" + "github.com/hazelcast/hazelcast-commandline-client/clc/paths" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" "github.com/hazelcast/hazelcast-commandline-client/internal/check" @@ -41,6 +43,10 @@ func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { Selected data structures in the source cluster will be migrated to the target cluster. `) + conf := ec.GetStringArg(argDMTConfig) + if !paths.Exists(conf) { + return fmt.Errorf("migration config does not exist: %s", conf) + } if !ec.Props().GetBool(clc.FlagAutoYes) { p := prompt.New(ec.Stdin(), ec.Stdout()) yes, err := p.YesNo("Proceed?") @@ -60,7 +66,7 @@ Selected data structures in the source cluster will be migrated to the target cl err = finalizeErr } }() - sts, err := NewStartStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) + sts, err := NewStartStages(ec.Logger(), mID, conf) if err != nil { return err } diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index d2d1842ad..10de56af1 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -4,7 +4,6 @@ package migration import ( "context" - "encoding/json" "errors" "fmt" @@ -12,7 +11,6 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/internal/log" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/serialization" ) type StartStages struct { @@ -83,16 +81,3 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( return nil, nil } } - -func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { - var cb ConfigBundle - cb.MigrationID = migrationID - if err := cb.Walk(configDir); err != nil { - return nil, err - } - b, err := json.Marshal(cb) - if err != nil { - return nil, err - } - return b, nil -} diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index d3b6ac7c7..073341002 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -67,7 +67,7 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s var execErr error go tcx.WithReset(func() { defer wg.Done() - execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir) + execErr = tcx.CLC().Execute(ctx, "start", "testdata/dmt_config", "--yes", "-o", outDir) }) c := make(chan string) go findMigrationID(ctx, tcx, c) diff --git a/base/commands/migration/testdata/estimate/estimate_completed.json b/base/commands/migration/testdata/estimate/estimate_completed.json new file mode 100644 index 000000000..e6ad6081f --- /dev/null +++ b/base/commands/migration/testdata/estimate/estimate_completed.json @@ -0,0 +1,5 @@ +{ + "status": "COMPLETED", + "estimatedTime": 1, + "estimatedSize": 1 +} \ No newline at end of file diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 5af0aa0ed..06b36610f 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -5,6 +5,7 @@ package migration import ( "bufio" "encoding/base64" + "encoding/json" "fmt" "io/fs" "os" @@ -12,6 +13,7 @@ import ( "sort" "strings" + "github.com/hazelcast/hazelcast-go-client/serialization" "github.com/hazelcast/hazelcast-go-client/types" "github.com/hazelcast/hazelcast-commandline-client/clc/paths" @@ -125,6 +127,15 @@ func MakeMigrationID() string { return types.NewUUID().String() } -func MakeUpdateTopicName(migrationID string) string { - return UpdateTopicPrefix + migrationID +func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { + var cb ConfigBundle + cb.MigrationID = migrationID + if err := cb.Walk(configDir); err != nil { + return nil, err + } + b, err := json.Marshal(cb) + if err != nil { + return nil, err + } + return b, nil }