From 16da1df0652f3cffb779ab5990ab1a59ed176389 Mon Sep 17 00:00:00 2001 From: Kutluhan Metin Date: Tue, 14 Nov 2023 14:58:53 +0300 Subject: [PATCH 1/4] fix hanging problem when source and target clusters is not running (#428) --- base/commands/migration/migration_stages.go | 3 +++ base/commands/migration/start_stages.go | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index 9d176e6f..a5a191e2 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -200,6 +200,9 @@ func saveReportToFile(ctx context.Context, ci *hazelcast.ClientInternal, migrati func WaitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { for { + if ctx.Err() != nil { + return ctx.Err() + } status, err := fetchMigrationStatus(ctx, ci, migrationID) if err != nil { if errors.Is(err, migrationStatusNotFoundErr) { diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index 4b8179bf..87ed5f67 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -80,7 +80,7 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( if err = st.startQueue.Put(ctx, cb); err != nil { return nil, fmt.Errorf("updating start Queue: %w", err) } - childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + childCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if err = waitForMigrationToStart(childCtx, st.ci, st.migrationID); err != nil { return nil, err @@ -91,7 +91,7 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]) (any, error) { return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + childCtx, cancel := context.WithTimeout(ctx, time.Minute) defer cancel() if err := WaitForMigrationToBeInProgress(childCtx, st.ci, st.migrationID); err != nil { return nil, fmt.Errorf("waiting for prechecks to complete: %w", err) @@ -102,6 +102,9 @@ func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any] func waitForMigrationToStart(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { for { + if ctx.Err() != nil { + return ctx.Err() + } status, err := fetchMigrationStatus(ctx, ci, migrationID) if err != nil { if errors.Is(err, migrationStatusNotFoundErr) { From 2973d5b38e2887ec7d42d06ebeb87c91229d8ba5 Mon Sep 17 00:00:00 2001 From: Kutluhan Metin Date: Tue, 14 Nov 2023 15:00:51 +0300 Subject: [PATCH 2/4] divide status percentage by 100 (#433) --- base/commands/migration/migration_stages.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index a5a191e2..03615e42 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -86,7 +86,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca status.SetText("Unable to calculate remaining duration and progress") } else { status.SetText(fmt.Sprintf(progressMsg, d.Type, d.Name)) - status.SetProgress(cp) + status.SetProgress(cp / 100.0) status.SetRemainingDuration(rt) } } From 404e09a815d1eca046149110c28bfdb067c40c8a Mon Sep 17 00:00:00 2001 From: Kutluhan Metin Date: Tue, 14 Nov 2023 15:21:05 +0300 Subject: [PATCH 3/4] fix hanging problem without any output (#429) --- base/commands/migration/migration_start.go | 18 +++++++----------- base/commands/migration/migration_status.go | 10 +++------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index c11ad5a0..60c3043f 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -33,10 +33,6 @@ func (StartCmd) Init(cc plug.InitContext) error { } func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { - ci, err := ec.ClientInternal(ctx) - if err != nil { - return err - } ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0 (c) 2023 Hazelcast, Inc. @@ -59,22 +55,22 @@ Selected data structures in the source cluster will be migrated to the target cl } ec.PrintlnUnnecessary("") mID := MigrationIDGeneratorFunc() + sts, err := NewStartStages(ec.Logger(), mID, conf) + if err != nil { + return err + } defer func() { - maybePrintWarnings(ctx, ec, ci, mID) - finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir)) + maybePrintWarnings(ctx, ec, sts.ci, mID) + finalizeErr := finalizeMigration(ctx, ec, sts.ci, mID, ec.Props().GetString(flagOutputDir)) if err == nil { err = finalizeErr } }() - sts, err := NewStartStages(ec.Logger(), mID, conf) - if err != nil { - return err - } sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) if _, err = stage.Execute(ctx, ec, any(nil), sp); err != nil { return err } - mStages, err := createMigrationStages(ctx, ec, ci, mID) + mStages, err := createMigrationStages(ctx, ec, sts.ci, mID) if err != nil { return err } diff --git a/base/commands/migration/migration_status.go b/base/commands/migration/migration_status.go index 40cf9e71..5d9b75df 100644 --- a/base/commands/migration/migration_status.go +++ b/base/commands/migration/migration_status.go @@ -24,10 +24,6 @@ func (s StatusCmd) Init(cc plug.InitContext) error { } func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { - ci, err := ec.ClientInternal(ctx) - if err != nil { - return err - } ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary(banner) sts := NewStatusStages() @@ -37,13 +33,13 @@ func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { return err } defer func() { - maybePrintWarnings(ctx, ec, ci, mID.(string)) - finalizeErr := finalizeMigration(ctx, ec, ci, mID.(string), ec.Props().GetString(flagOutputDir)) + maybePrintWarnings(ctx, ec, sts.ci, mID.(string)) + finalizeErr := finalizeMigration(ctx, ec, sts.ci, mID.(string), ec.Props().GetString(flagOutputDir)) if err == nil { err = finalizeErr } }() - mStages, err := createMigrationStages(ctx, ec, ci, mID.(string)) + mStages, err := createMigrationStages(ctx, ec, sts.ci, mID.(string)) if err != nil { return err } From 1af0ff216e6045bbe47272b8083f8d3eeac6c03c Mon Sep 17 00:00:00 2001 From: Kutluhan Metin Date: Tue, 14 Nov 2023 15:21:31 +0300 Subject: [PATCH 4/4] [CLC-434]: migration -> migration/estimation (#432) --- base/commands/migration/migration_status.go | 2 +- base/commands/migration/status_stages.go | 6 +++--- base/commands/migration/status_stages_it_test.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/base/commands/migration/migration_status.go b/base/commands/migration/migration_status.go index 5d9b75df..e1e092b9 100644 --- a/base/commands/migration/migration_status.go +++ b/base/commands/migration/migration_status.go @@ -17,7 +17,7 @@ func (s StatusCmd) Unwrappable() {} func (s StatusCmd) Init(cc plug.InitContext) error { cc.SetCommandUsage("status") cc.SetCommandGroup("migration") - help := "Get status of the data migration in progress" + help := "Get status of the data migration/estimation in progress" cc.AddStringFlag(flagOutputDir, "o", "", false, "output directory for the migration report, if not given current directory is used") cc.SetCommandHelp(help, help) return nil diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index a8f7e3f5..252608f8 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -27,9 +27,9 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage. Func: st.connectStage(ec), }, { - ProgressMsg: "Finding migration in progress", - SuccessMsg: "Found migration in progress", - FailureMsg: "Could not find a migration in progress", + ProgressMsg: "Finding migration/estimation in progress", + SuccessMsg: "Found migration/estimation in progress", + FailureMsg: "Could not find a migration/estimation in progress", Func: st.findMigrationInProgress(ec), }, } diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 38f1aae5..f2375d6e 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -66,7 +66,7 @@ func statusTest(t *testing.T) { defer wg.Done() Must(tcx.CLC().Execute(ctx, "status", "-o", outDir)) }() - tcx.AssertStdoutContains("Found migration in progress") + tcx.AssertStdoutContains("Found migration/estimation in progress") setStatusCompleted(mID, tcx, ctx) wg.Wait() tcx.WithReset(func() {