diff --git a/base/commands/migration/cancel_it_test.go b/base/commands/migration/cancel_it_test.go index f06ffb52..48fae9e8 100644 --- a/base/commands/migration/cancel_it_test.go +++ b/base/commands/migration/cancel_it_test.go @@ -67,6 +67,6 @@ func cancelTest(t *testing.T) { func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) { statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json")) + b := MustValue(os.ReadFile("testdata/stages/migration_cancelling.json")) Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) } diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index 7d699de4..06d11923 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -27,11 +27,6 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w var migrationStatusNotFoundErr = fmt.Errorf("migration status not found") func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) { - childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - if err := WaitForMigrationToBeInProgress(childCtx, ci, migrationID); err != nil { - return nil, fmt.Errorf("waiting migration to be created: %w", err) - } var stages []stage.Stage[any] dss, err := getDataStructuresToBeMigrated(ctx, ec, migrationID) if err != nil { diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index 970d0488..b13e3dfa 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -7,6 +7,7 @@ import ( "encoding/json" "errors" "fmt" + "time" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/log" @@ -48,6 +49,12 @@ func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.S FailureMsg: "Could not start the migration", Func: st.startStage(), }, + { + ProgressMsg: "Doing pre-checks", + SuccessMsg: "Pre-checks complete", + FailureMsg: "Could not complete pre-checks", + Func: st.preCheckStage(), + }, } } @@ -75,10 +82,49 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( if err = st.startQueue.Put(ctx, cb); err != nil { return nil, fmt.Errorf("updating start Queue: %w", err) } + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err = waitForMigrationToStart(childCtx, st.ci, st.migrationID); err != nil { + return nil, err + } return nil, nil } } +func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := WaitForMigrationToBeInProgress(childCtx, st.ci, st.migrationID); err != nil { + return nil, fmt.Errorf("waiting for prechecks to complete: %w", err) + } + return nil, nil + } +} + +func waitForMigrationToStart(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { + for { + status, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + if errors.Is(err, migrationStatusNotFoundErr) { + // migration status will not be available for a while, so we should wait for it + continue + } + return err + } + if Status(status) == StatusFailed { + errs, err := fetchMigrationErrors(ctx, ci, migrationID) + if err != nil { + return fmt.Errorf("migration failed and dmt cannot fetch migration errors: %w", err) + } + return errors.New(errs) + } + if Status(status) == StatusStarted || Status(status) == StatusInProgress || Status(status) == StatusComplete { + return nil + } + } +} + func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { var cb ConfigBundle cb.MigrationID = migrationID diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index 91ebf229..519804aa 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -33,15 +33,17 @@ func TestMigrationStages(t *testing.T) { { name: "successful", statusMapStateFiles: []string{ - "testdata/start/migration_success_initial.json", - "testdata/start/migration_success_completed.json", + "testdata/stages/migration_started.json", + "testdata/stages/migration_in_progress.json", + "testdata/stages/migration_completed.json", }, }, { name: "failure", statusMapStateFiles: []string{ - "testdata/start/migration_success_initial.json", - "testdata/start/migration_success_failure.json", + "testdata/stages/migration_started.json", + "testdata/stages/migration_in_progress.json", + "testdata/stages/migration_failed.json", }, expectedErr: errors.New("Failed migrating IMAP: imap5: * some error\n* another error"), }, diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 0efa54f1..38f1aae5 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -86,14 +86,14 @@ func statusTest(t *testing.T) { func setStatusInProgress(tcx it.TestContext, ctx context.Context) string { mID := migrationIDFunc() statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json")) + b := MustValue(os.ReadFile("testdata/stages/migration_in_progress.json")) Must(statusMap.Set(ctx, mID, serialization.JSON(b))) return mID } func setStatusCompleted(migrationID string, tcx it.TestContext, ctx context.Context) { statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json")) + b := MustValue(os.ReadFile("testdata/stages/migration_completed.json")) Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) } diff --git a/base/commands/migration/testdata/cancel/migration_cancelling.json b/base/commands/migration/testdata/stages/migration_cancelling.json similarity index 100% rename from base/commands/migration/testdata/cancel/migration_cancelling.json rename to base/commands/migration/testdata/stages/migration_cancelling.json diff --git a/base/commands/migration/testdata/start/migration_success_completed.json b/base/commands/migration/testdata/stages/migration_completed.json similarity index 100% rename from base/commands/migration/testdata/start/migration_success_completed.json rename to base/commands/migration/testdata/stages/migration_completed.json diff --git a/base/commands/migration/testdata/start/migration_success_failure.json b/base/commands/migration/testdata/stages/migration_failed.json similarity index 100% rename from base/commands/migration/testdata/start/migration_success_failure.json rename to base/commands/migration/testdata/stages/migration_failed.json diff --git a/base/commands/migration/testdata/start/migration_success_initial.json b/base/commands/migration/testdata/stages/migration_in_progress.json similarity index 100% rename from base/commands/migration/testdata/start/migration_success_initial.json rename to base/commands/migration/testdata/stages/migration_in_progress.json diff --git a/base/commands/migration/testdata/stages/migration_started.json b/base/commands/migration/testdata/stages/migration_started.json new file mode 100644 index 00000000..5020aea5 --- /dev/null +++ b/base/commands/migration/testdata/stages/migration_started.json @@ -0,0 +1,4 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "STARTED" +} \ No newline at end of file