diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index 97d53ba03..d522f0c03 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -4,6 +4,7 @@ package migration const ( StartQueueName = "__datamigration_start_queue" + EstimateQueueName = "__datamigration_estimate_queue" StatusMapName = "__datamigration_migrations" UpdateTopicPrefix = "__datamigration_updates_" DebugLogsListPrefix = "__datamigration_debug_logs_" diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go new file mode 100644 index 000000000..72e2d9323 --- /dev/null +++ b/base/commands/migration/estimate.go @@ -0,0 +1,53 @@ +//go:build std || migration + +package migration + +import ( + "context" + "fmt" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" +) + +type EstimateCmd struct{} + +func (e EstimateCmd) Init(cc plug.InitContext) error { + cc.SetCommandUsage("estimate") + cc.SetCommandGroup("migration") + help := "Estimate migration" + cc.SetCommandHelp(help, help) + cc.AddStringArg(argDMTConfig, argTitleDMTConfig) + return nil +} + +func (e EstimateCmd) Exec(ctx context.Context, ec plug.ExecContext) error { + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0 +(c) 2023 Hazelcast, Inc. + +Estimation usually ends within 15 seconds. +`) + mID := MakeMigrationID() + stages, err := NewEstimateStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) + if err != nil { + return err + } + sp := stage.NewFixedProvider(stages.Build(ctx, ec)...) + res, err := stage.Execute(ctx, ec, any(nil), sp) + if err != nil { + return err + } + resArr := res.([]string) + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(fmt.Sprintf("OK %s", resArr[0])) + ec.PrintlnUnnecessary(fmt.Sprintf("OK %s", resArr[1])) + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary("OK Estimation completed successfully.") + return nil +} + +func init() { + check.Must(plug.Registry.RegisterCommand("estimate", &EstimateCmd{})) +} diff --git a/base/commands/migration/estimate_it_test.go b/base/commands/migration/estimate_it_test.go new file mode 100644 index 000000000..753000bce --- /dev/null +++ b/base/commands/migration/estimate_it_test.go @@ -0,0 +1 @@ +package migration diff --git a/base/commands/migration/estimate_stages.go b/base/commands/migration/estimate_stages.go new file mode 100644 index 000000000..1205155e6 --- /dev/null +++ b/base/commands/migration/estimate_stages.go @@ -0,0 +1,158 @@ +//go:build std || migration + +package migration + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/log" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-commandline-client/internal/str" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type EstimateStages struct { + migrationID string + configDir string + ci *hazelcast.ClientInternal + estimateQueue *hazelcast.Queue + statusMap *hazelcast.Map + logger log.Logger +} + +func NewEstimateStages(logger log.Logger, migrationID, configDir string) (*EstimateStages, error) { + if migrationID == "" { + return nil, errors.New("migrationID is required") + } + return &EstimateStages{ + migrationID: migrationID, + configDir: configDir, + logger: logger, + }, nil +} + +func (es *EstimateStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] { + return []stage.Stage[any]{ + { + ProgressMsg: "Connecting to the migration cluster", + SuccessMsg: "Connected to the migration cluster", + FailureMsg: "Could not connect to the migration cluster", + Func: es.connectStage(ec), + }, + { + ProgressMsg: "Estimating the migration", + SuccessMsg: "Estimated the migration", + FailureMsg: "Could not estimate the migration", + Func: es.estimateStage(), + }, + } +} + +func (es *EstimateStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, s stage.Statuser[any]) (any, error) { + var err error + es.ci, err = ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + es.estimateQueue, err = es.ci.Client().GetQueue(ctx, EstimateQueueName) + if err != nil { + return nil, fmt.Errorf("retrieving the estimate Queue: %w", err) + } + es.statusMap, err = es.ci.Client().GetMap(ctx, StatusMapName) + if err != nil { + return nil, fmt.Errorf("retrieving the status Map: %w", err) + } + return nil, nil + } +} + +func (es *EstimateStages) estimateStage() func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + cb, err := makeConfigBundle(es.configDir, es.migrationID) + if err != nil { + return nil, fmt.Errorf("making configuration bundle: %w", err) + } + if err = es.estimateQueue.Put(ctx, cb); err != nil { + return nil, fmt.Errorf("updating estimate Queue: %w", err) + } + if err = waitForEstimationToComplete(ctx, es.ci, es.migrationID, status); err != nil { + return nil, fmt.Errorf("waiting for estimation to complete: %w", err) + } + return fetchEstimationResults(ctx, es.ci, es.migrationID) + } +} + +func waitForEstimationToComplete(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string, stage stage.Statuser[any]) error { + duration := 16 * time.Second + interval := 1 * time.Second + for { + if duration > 0 { + stage.SetRemainingDuration(duration) + } else { + stage.SetText("It will take a bit longer than expected.") + } + status, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + if errors.Is(err, migrationStatusNotFoundErr) { + // migration status will not be available for a while, so we should wait for it + continue + } + return err + } + if Status(status) == StatusFailed { + errs, err := fetchMigrationErrors(ctx, ci, migrationID) + if err != nil { + return fmt.Errorf("estimation failed and dmt cannot fetch estimation errors: %w", err) + } + return errors.New(errs) + } + if Status(status) == StatusComplete { + return nil + } + time.Sleep(interval) + duration = duration - interval + } +} + +func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) ([]string, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.estimatedTime'), JSON_QUERY(this, '$.estimatedSize') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return nil, err + } + it, err := res.Iterator() + if err != nil { + return nil, err + } + if it.HasNext() { + // single iteration is enough that we are reading single result for a single migration + row, err := it.Next() + if err != nil { + return nil, err + } + estimatedTime, err := row.Get(0) + if err != nil { + return nil, err + } + estimatedSize, err := row.Get(1) + if err != nil { + return nil, err + } + et, err := str.MsToSecs(estimatedTime.(serialization.JSON).String()) + if err != nil { + return nil, err + } + es, err := str.BytesToMegabytes(estimatedSize.(serialization.JSON).String()) + if err != nil { + return nil, err + } + return []string{fmt.Sprintf("Estimated Size: %s ", es), fmt.Sprintf("Estimated Time: %s", et)}, nil + } + return nil, errors.New("no rows found") +} diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index d2d1842ad..10de56af1 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -4,7 +4,6 @@ package migration import ( "context" - "encoding/json" "errors" "fmt" @@ -12,7 +11,6 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/internal/log" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/serialization" ) type StartStages struct { @@ -83,16 +81,3 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( return nil, nil } } - -func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { - var cb ConfigBundle - cb.MigrationID = migrationID - if err := cb.Walk(configDir); err != nil { - return nil, err - } - b, err := json.Marshal(cb) - if err != nil { - return nil, err - } - return b, nil -} diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 5af0aa0ed..06b36610f 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -5,6 +5,7 @@ package migration import ( "bufio" "encoding/base64" + "encoding/json" "fmt" "io/fs" "os" @@ -12,6 +13,7 @@ import ( "sort" "strings" + "github.com/hazelcast/hazelcast-go-client/serialization" "github.com/hazelcast/hazelcast-go-client/types" "github.com/hazelcast/hazelcast-commandline-client/clc/paths" @@ -125,6 +127,15 @@ func MakeMigrationID() string { return types.NewUUID().String() } -func MakeUpdateTopicName(migrationID string) string { - return UpdateTopicPrefix + migrationID +func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { + var cb ConfigBundle + cb.MigrationID = migrationID + if err := cb.Walk(configDir); err != nil { + return nil, err + } + b, err := json.Marshal(cb) + if err != nil { + return nil, err + } + return b, nil } diff --git a/internal/str/str.go b/internal/str/str.go index 4dd34e761..7daf555de 100644 --- a/internal/str/str.go +++ b/internal/str/str.go @@ -59,3 +59,22 @@ func Colorize(text string) string { } return text } + +func BytesToMegabytes(bytesStr string) (string, error) { + bytes, err := strconv.ParseFloat(bytesStr, 64) + if err != nil { + return "", err + } + mb := bytes / (1024.0 * 1024.0) + return fmt.Sprintf("%.2f MBs", mb), nil +} + +func MsToSecs(ms string) (string, error) { + milliseconds, err := strconv.ParseInt(ms, 10, 64) + if err != nil { + return "", err + } + seconds := float64(milliseconds) / 1000.0 + secondsStr := fmt.Sprintf("%.1f sec", seconds) + return secondsStr, nil +}