diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index 9d176e6f..03615e42 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -86,7 +86,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca status.SetText("Unable to calculate remaining duration and progress") } else { status.SetText(fmt.Sprintf(progressMsg, d.Type, d.Name)) - status.SetProgress(cp) + status.SetProgress(cp / 100.0) status.SetRemainingDuration(rt) } } @@ -200,6 +200,9 @@ func saveReportToFile(ctx context.Context, ci *hazelcast.ClientInternal, migrati func WaitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { for { + if ctx.Err() != nil { + return ctx.Err() + } status, err := fetchMigrationStatus(ctx, ci, migrationID) if err != nil { if errors.Is(err, migrationStatusNotFoundErr) { diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index 9e307fe7..b714641e 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -35,10 +35,6 @@ func (StartCmd) Init(cc plug.InitContext) error { func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { cmd.SetCancelMsg(" (Ctrl+C to exit) ") - ci, err := ec.ClientInternal(ctx) - if err != nil { - return err - } ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0 (c) 2023 Hazelcast, Inc. @@ -64,22 +60,22 @@ In order to cancel the migration, use the 'cancel' command. } ec.PrintlnUnnecessary("") mID := MigrationIDGeneratorFunc() + sts, err := NewStartStages(ec.Logger(), mID, conf) + if err != nil { + return err + } defer func() { - maybePrintWarnings(ctx, ec, ci, mID) - finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir)) + maybePrintWarnings(ctx, ec, sts.ci, mID) + finalizeErr := finalizeMigration(ctx, ec, sts.ci, mID, ec.Props().GetString(flagOutputDir)) if err == nil { err = finalizeErr } }() - sts, err := NewStartStages(ec.Logger(), mID, conf) - if err != nil { - return err - } sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) if _, err = stage.Execute(ctx, ec, any(nil), sp); err != nil { return err } - mStages, err := createMigrationStages(ctx, ec, ci, mID) + mStages, err := createMigrationStages(ctx, ec, sts.ci, mID) if err != nil { return err } diff --git a/base/commands/migration/migration_status.go b/base/commands/migration/migration_status.go index 40cf9e71..e1e092b9 100644 --- a/base/commands/migration/migration_status.go +++ b/base/commands/migration/migration_status.go @@ -17,17 +17,13 @@ func (s StatusCmd) Unwrappable() {} func (s StatusCmd) Init(cc plug.InitContext) error { cc.SetCommandUsage("status") cc.SetCommandGroup("migration") - help := "Get status of the data migration in progress" + help := "Get status of the data migration/estimation in progress" cc.AddStringFlag(flagOutputDir, "o", "", false, "output directory for the migration report, if not given current directory is used") cc.SetCommandHelp(help, help) return nil } func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { - ci, err := ec.ClientInternal(ctx) - if err != nil { - return err - } ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary(banner) sts := NewStatusStages() @@ -37,13 +33,13 @@ func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { return err } defer func() { - maybePrintWarnings(ctx, ec, ci, mID.(string)) - finalizeErr := finalizeMigration(ctx, ec, ci, mID.(string), ec.Props().GetString(flagOutputDir)) + maybePrintWarnings(ctx, ec, sts.ci, mID.(string)) + finalizeErr := finalizeMigration(ctx, ec, sts.ci, mID.(string), ec.Props().GetString(flagOutputDir)) if err == nil { err = finalizeErr } }() - mStages, err := createMigrationStages(ctx, ec, ci, mID.(string)) + mStages, err := createMigrationStages(ctx, ec, sts.ci, mID.(string)) if err != nil { return err } diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index 4b8179bf..87ed5f67 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -80,7 +80,7 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( if err = st.startQueue.Put(ctx, cb); err != nil { return nil, fmt.Errorf("updating start Queue: %w", err) } - childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + childCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if err = waitForMigrationToStart(childCtx, st.ci, st.migrationID); err != nil { return nil, err @@ -91,7 +91,7 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]) (any, error) { return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + childCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if err := WaitForMigrationToBeInProgress(childCtx, st.ci, st.migrationID); err != nil { return nil, fmt.Errorf("waiting for prechecks to complete: %w", err) @@ -102,6 +102,9 @@ func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any] func waitForMigrationToStart(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { for { + if ctx.Err() != nil { + return ctx.Err() + } status, err := fetchMigrationStatus(ctx, ci, migrationID) if err != nil { if errors.Is(err, migrationStatusNotFoundErr) { diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index a8f7e3f5..252608f8 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -27,9 +27,9 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage. Func: st.connectStage(ec), }, { - ProgressMsg: "Finding migration in progress", - SuccessMsg: "Found migration in progress", - FailureMsg: "Could not find a migration in progress", + ProgressMsg: "Finding migration/estimation in progress", + SuccessMsg: "Found migration/estimation in progress", + FailureMsg: "Could not find a migration/estimation in progress", Func: st.findMigrationInProgress(ec), }, } diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 38f1aae5..f2375d6e 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -66,7 +66,7 @@ func statusTest(t *testing.T) { defer wg.Done() Must(tcx.CLC().Execute(ctx, "status", "-o", outDir)) }() - tcx.AssertStdoutContains("Found migration in progress") + tcx.AssertStdoutContains("Found migration/estimation in progress") setStatusCompleted(mID, tcx, ctx) wg.Wait() tcx.WithReset(func() {