From beb23e4d3954404f473734656a5fb0663de73c28 Mon Sep 17 00:00:00 2001 From: Kutluhan Metin Date: Tue, 7 Nov 2023 11:45:11 +0300 Subject: [PATCH] [CLC-427][CLC-428]: Remove Data Migrations in Progress List and Status Map Usages (#424) --- base/commands/migration/cancel_it_test.go | 39 ++++++++------ base/commands/migration/cancel_stages.go | 41 +++++++-------- base/commands/migration/common.go | 34 +++++++++++++ base/commands/migration/const.go | 16 +++--- base/commands/migration/estimate.go | 2 +- base/commands/migration/migration_stages.go | 13 ++--- base/commands/migration/migration_start.go | 2 +- base/commands/migration/start_stages.go | 51 +++++++++++++++++-- .../migration/start_stages_it_test.go | 14 +++-- base/commands/migration/status_stages.go | 43 ++++++---------- .../migration/status_stages_it_test.go | 48 +++++++---------- .../migration_cancelling.json | 0 .../migration_completed.json} | 0 .../migration_failed.json} | 0 .../migration_in_progress.json} | 0 .../testdata/stages/migration_started.json | 4 ++ base/commands/migration/utils.go | 4 +- 17 files changed, 186 insertions(+), 125 deletions(-) create mode 100644 base/commands/migration/common.go rename base/commands/migration/testdata/{cancel => stages}/migration_cancelling.json (100%) rename base/commands/migration/testdata/{start/migration_success_completed.json => stages/migration_completed.json} (100%) rename base/commands/migration/testdata/{start/migration_success_failure.json => stages/migration_failed.json} (100%) rename base/commands/migration/testdata/{start/migration_success_initial.json => stages/migration_in_progress.json} (100%) create mode 100644 base/commands/migration/testdata/stages/migration_started.json diff --git a/base/commands/migration/cancel_it_test.go b/base/commands/migration/cancel_it_test.go index ded42853e..48fae9e8c 100644 --- a/base/commands/migration/cancel_it_test.go +++ b/base/commands/migration/cancel_it_test.go @@ -4,8 +4,8 @@ package migration_test import ( "context" - "encoding/json" "os" + "sync" "testing" _ "github.com/hazelcast/hazelcast-commandline-client/base" @@ -13,6 +13,7 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" . "github.com/hazelcast/hazelcast-commandline-client/internal/check" "github.com/hazelcast/hazelcast-commandline-client/internal/it" + hz "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" "github.com/stretchr/testify/require" ) @@ -34,8 +35,9 @@ func noMigrationsCancelTest(t *testing.T) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { + createMapping(ctx, tcx) err := tcx.CLC().Execute(ctx, "cancel") - require.Contains(t, err.Error(), "there are no migrations in progress") + require.Contains(t, err.Error(), "finding migration in progress: no result found") }) } @@ -43,21 +45,28 @@ func cancelTest(t *testing.T) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { - mID := migration.MakeMigrationID() + ci := hz.NewClientInternal(tcx.Client) + mID := migrationIDFunc() createMapping(ctx, tcx) - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json")) - Must(statusMap.Set(ctx, mID, serialization.JSON(b))) - l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) - m := MustValue(json.Marshal(migration.MigrationInProgress{ - MigrationID: mID, - })) - ok := MustValue(l.Add(ctx, serialization.JSON(m))) - require.Equal(t, true, ok) - tcx.WithReset(func() { + setStatusInProgress(tcx, ctx) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() Must(tcx.CLC().Execute(ctx, "cancel")) - }) - MustValue(l.Remove(ctx, serialization.JSON(m))) + }() + cq := MustValue(ci.Client().GetQueue(ctx, migration.CancelQueue)) + MustValue(cq.Poll(ctx)) + setStatusCancelling(mID, tcx, ctx) + wg.Wait() + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + MustValue(statusMap.Remove(ctx, mID)) tcx.AssertStdoutContains(`Migration canceled successfully.`) }) } + +func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) { + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/stages/migration_cancelling.json")) + Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) +} diff --git a/base/commands/migration/cancel_stages.go b/base/commands/migration/cancel_stages.go index b3cd6cca7..d753d03c1 100644 --- a/base/commands/migration/cancel_stages.go +++ b/base/commands/migration/cancel_stages.go @@ -5,7 +5,6 @@ package migration import ( "context" "encoding/json" - "fmt" "time" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" @@ -15,10 +14,9 @@ import ( ) type CancelStages struct { - ci *hazelcast.ClientInternal - cancelQueue *hazelcast.Queue - migrationsInProgressList *hazelcast.List - migrationID string + ci *hazelcast.ClientInternal + cancelQueue *hazelcast.Queue + migrationID string } func NewCancelStages() *CancelStages { @@ -33,6 +31,12 @@ func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage. FailureMsg: "Could not connect to the migration cluster", Func: st.connectStage(ec), }, + { + ProgressMsg: "Finding migration in progress", + SuccessMsg: "Found migration in progress", + FailureMsg: "Could not find a migration in progress", + Func: st.findMigrationInProgress(ec), + }, { ProgressMsg: "Canceling the migration", SuccessMsg: "Canceled the migration", @@ -49,26 +53,19 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context, if err != nil { return nil, err } - st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList) - if err != nil { - return nil, err - } - all, err := st.migrationsInProgressList.GetAll(ctx) + st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue) + return nil, nil + } +} + +func (st *CancelStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + m, err := findMigrationInProgress(ctx, st.ci) if err != nil { return nil, err } - if len(all) == 0 { - return nil, fmt.Errorf("there are no migrations in progress") - } - var mip MigrationInProgress - m := all[0].(serialization.JSON) - err = json.Unmarshal(m, &mip) - if err != nil { - return nil, fmt.Errorf("parsing migration in progress: %w", err) - } - st.migrationID = mip.MigrationID - st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue) - return nil, err + st.migrationID = m.MigrationID + return nil, nil } } diff --git a/base/commands/migration/common.go b/base/commands/migration/common.go new file mode 100644 index 000000000..1fdd8701a --- /dev/null +++ b/base/commands/migration/common.go @@ -0,0 +1,34 @@ +//go:build std || migration + +package migration + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type MigrationInProgress struct { + MigrationID string `json:"id"` +} + +func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) { + var mip MigrationInProgress + q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELING')", StatusMapName) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + return mip, fmt.Errorf("finding migration in progress: %w", err) + } + rr, err := r.Get(0) + if err != nil { + return mip, fmt.Errorf("finding migration in progress: %w", err) + } + m := rr.(serialization.JSON) + if err = json.Unmarshal(m, &mip); err != nil { + return mip, fmt.Errorf("parsing migration in progress: %w", err) + } + return mip, nil +} diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index d522f0c03..c0d772ca1 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -3,15 +3,13 @@ package migration const ( - StartQueueName = "__datamigration_start_queue" - EstimateQueueName = "__datamigration_estimate_queue" - StatusMapName = "__datamigration_migrations" - UpdateTopicPrefix = "__datamigration_updates_" - DebugLogsListPrefix = "__datamigration_debug_logs_" - MigrationsInProgressList = "__datamigrations_in_progress" - CancelQueue = "__datamigration_cancel_queue" - argDMTConfig = "dmtConfig" - argTitleDMTConfig = "DMT configuration" + StartQueueName = "__datamigration_start_queue" + EstimateQueueName = "__datamigration_estimate_queue" + StatusMapName = "__datamigration_migrations" + DebugLogsListPrefix = "__datamigration_debug_logs_" + CancelQueue = "__datamigration_cancel_queue" + argDMTConfig = "dmtConfig" + argTitleDMTConfig = "DMT configuration" ) type Status string diff --git a/base/commands/migration/estimate.go b/base/commands/migration/estimate.go index 8dde2888b..790601c1d 100644 --- a/base/commands/migration/estimate.go +++ b/base/commands/migration/estimate.go @@ -32,7 +32,7 @@ Estimation usually ends within 15 seconds.`, banner)) if !paths.Exists(conf) { return fmt.Errorf("migration config does not exist: %s", conf) } - mID := MakeMigrationID() + mID := makeMigrationID() stages, err := NewEstimateStages(ec.Logger(), mID, conf) if err != nil { return err diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index 3e7a155ff..9d176e6f6 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -35,11 +35,6 @@ var noDataStructuresFoundErr = errors.New("no datastructures found to migrate") var progressMsg = "Migrating %s: %s" func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) { - childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - if err := WaitForMigrationToBeInProgress(childCtx, ci, migrationID); err != nil { - return nil, fmt.Errorf("waiting migration to be created: %w", err) - } var stages []stage.Stage[any] dss, err := getDataStructuresToBeMigrated(ctx, ec, migrationID) if err != nil { @@ -82,7 +77,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca execErr = errors.New(errs) break statusReaderLoop case StatusCanceled, StatusCanceling: - execErr = clcerrors.ErrUserCancelled + execErr = errors.New("migration canceled by the user") break statusReaderLoop case StatusInProgress: rt, cp, err := fetchOverallProgress(ctx, ci, migrationID) @@ -147,7 +142,7 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig if err != nil { return nil, err } - q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID) + q := fmt.Sprintf(`SELECT this FROM %s WHERE __key='%s'`, StatusMapName, migrationID) r, err := querySingleRow(ctx, ci, q) if err != nil { return nil, err @@ -220,7 +215,7 @@ func WaitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInt } return errors.New(errs) } - if Status(status) == StatusInProgress { + if Status(status) == StatusInProgress || Status(status) == StatusComplete { return nil } } @@ -369,7 +364,7 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str } return row, nil } - return nil, errors.New("no rows found") + return nil, errors.New("no result found") } func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) { diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index 66b034b40..c11ad5a04 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -58,7 +58,7 @@ Selected data structures in the source cluster will be migrated to the target cl } } ec.PrintlnUnnecessary("") - mID := MakeMigrationID() + mID := MigrationIDGeneratorFunc() defer func() { maybePrintWarnings(ctx, ec, ci, mID) finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir)) diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index 10de56af1..4b8179bf5 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -6,6 +6,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/log" @@ -18,7 +19,6 @@ type StartStages struct { configDir string ci *hazelcast.ClientInternal startQueue *hazelcast.Queue - statusMap *hazelcast.Map logger log.Logger } @@ -47,6 +47,12 @@ func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.S FailureMsg: "Could not start the migration", Func: st.startStage(), }, + { + ProgressMsg: "Doing pre-checks", + SuccessMsg: "Pre-checks complete", + FailureMsg: "Could not complete pre-checks", + Func: st.preCheckStage(), + }, } } @@ -61,10 +67,6 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s if err != nil { return nil, fmt.Errorf("retrieving the start Queue: %w", err) } - st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) - if err != nil { - return nil, fmt.Errorf("retrieving the status Map: %w", err) - } return nil, nil } } @@ -78,6 +80,45 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) ( if err = st.startQueue.Put(ctx, cb); err != nil { return nil, fmt.Errorf("updating start Queue: %w", err) } + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err = waitForMigrationToStart(childCtx, st.ci, st.migrationID); err != nil { + return nil, err + } return nil, nil } } + +func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := WaitForMigrationToBeInProgress(childCtx, st.ci, st.migrationID); err != nil { + return nil, fmt.Errorf("waiting for prechecks to complete: %w", err) + } + return nil, nil + } +} + +func waitForMigrationToStart(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { + for { + status, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + if errors.Is(err, migrationStatusNotFoundErr) { + // migration status will not be available for a while, so we should wait for it + continue + } + return err + } + if Status(status) == StatusFailed { + errs, err := fetchMigrationErrors(ctx, ci, migrationID) + if err != nil { + return fmt.Errorf("migration failed and dmt cannot fetch migration errors: %w", err) + } + return errors.New(errs) + } + if Status(status) == StatusStarted || Status(status) == StatusInProgress || Status(status) == StatusComplete { + return nil + } + } +} diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index 073341002..72eb8bb6d 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -33,15 +33,17 @@ func TestMigrationStages(t *testing.T) { { name: "successful", statusMapStateFiles: []string{ - "testdata/start/migration_success_initial.json", - "testdata/start/migration_success_completed.json", + "testdata/stages/migration_started.json", + "testdata/stages/migration_in_progress.json", + "testdata/stages/migration_completed.json", }, }, { name: "failure", statusMapStateFiles: []string{ - "testdata/start/migration_success_initial.json", - "testdata/start/migration_success_failure.json", + "testdata/stages/migration_started.json", + "testdata/stages/migration_in_progress.json", + "testdata/stages/migration_failed.json", }, expectedErr: errors.New("Failed migrating IMAP: imap5: * some error\n* another error"), }, @@ -110,6 +112,10 @@ func createMapping(ctx context.Context, tcx it.TestContext) { MustValue(tcx.Client.SQL().Execute(ctx, mSQL)) } +func migrationIDFunc() string { + return "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7" +} + func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) { q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName)) var b migration.ConfigBundle diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index fbd29ef0f..a8f7e3f5c 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -4,19 +4,14 @@ package migration import ( "context" - "encoding/json" - "fmt" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/serialization" ) type StatusStages struct { - ci *hazelcast.ClientInternal - migrationsInProgressList *hazelcast.List - statusMap *hazelcast.Map + ci *hazelcast.ClientInternal } func NewStatusStages() *StatusStages { @@ -31,13 +26,15 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage. FailureMsg: "Could not connect to the migration cluster", Func: st.connectStage(ec), }, + { + ProgressMsg: "Finding migration in progress", + SuccessMsg: "Found migration in progress", + FailureMsg: "Could not find a migration in progress", + Func: st.findMigrationInProgress(ec), + }, } } -type MigrationInProgress struct { - MigrationID string `json:"migrationId"` -} - func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { return func(ctx context.Context, status stage.Statuser[any]) (any, error) { var err error @@ -45,26 +42,16 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, if err != nil { return nil, err } - st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList) - if err != nil { - return nil, err - } - all, err := st.migrationsInProgressList.GetAll(ctx) - if err != nil { - return nil, err - } - if len(all) == 0 { - return nil, fmt.Errorf("there are no migrations in progress") - } - var mip MigrationInProgress - m := all[0].(serialization.JSON) - if err = json.Unmarshal(m, &mip); err != nil { - return nil, fmt.Errorf("parsing migration in progress: %w", err) - } - st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) + return nil, nil + } +} + +func (st *StatusStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + m, err := findMigrationInProgress(ctx, st.ci) if err != nil { return nil, err } - return mip.MigrationID, err + return m.MigrationID, err } } diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 8211364e3..38f1aae5f 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -4,7 +4,6 @@ package migration_test import ( "context" - "encoding/json" "fmt" "os" "sync" @@ -38,6 +37,7 @@ func noMigrationsStatusTest(t *testing.T) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { + createMapping(ctx, tcx) var wg sync.WaitGroup wg.Add(1) var execErr error @@ -46,7 +46,7 @@ func noMigrationsStatusTest(t *testing.T) { execErr = tcx.CLC().Execute(ctx, "status") }) wg.Wait() - require.Contains(t, execErr.Error(), "there are no migrations in progress") + require.Contains(t, execErr.Error(), "finding migration in progress: no result found") }) } @@ -55,23 +55,20 @@ func statusTest(t *testing.T) { ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { ci := hz.NewClientInternal(tcx.Client) - progressList := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) - mID := preStatusRunner(t, tcx, ctx, ci, progressList) - defer postStatusRunner(ctx, mID, progressList) + createMapping(ctx, tcx) + createMemberLogs(t, ctx, ci) defer removeMembersLogs(ctx, ci) outDir := MustValue(os.MkdirTemp("", "clc-")) + mID := setStatusInProgress(tcx, ctx) var wg sync.WaitGroup wg.Add(1) - go tcx.WithReset(func() { + go func() { defer wg.Done() Must(tcx.CLC().Execute(ctx, "status", "-o", outDir)) - }) - it.Eventually(t, func() bool { - return migration.WaitForMigrationToBeInProgress(ctx, ci, mID) == nil - }) - statusRunner(mID, tcx, ctx) + }() + tcx.AssertStdoutContains("Found migration in progress") + setStatusCompleted(mID, tcx, ctx) wg.Wait() - tcx.AssertStdoutContains("Connected to the migration cluster") tcx.WithReset(func() { f := paths.Join(outDir, fmt.Sprintf("migration_report_%s.txt", mID)) require.Equal(t, true, paths.Exists(f)) @@ -86,18 +83,20 @@ func statusTest(t *testing.T) { }) } -func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context, ci *hz.ClientInternal, progressList *hz.List) string { - createMapping(ctx, tcx) - createMemberLogs(t, ctx, ci) - mID := migration.MakeMigrationID() - m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: mID})) - require.Equal(t, true, MustValue(progressList.Add(ctx, serialization.JSON(m)))) +func setStatusInProgress(tcx it.TestContext, ctx context.Context) string { + mID := migrationIDFunc() statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json")) + b := MustValue(os.ReadFile("testdata/stages/migration_in_progress.json")) Must(statusMap.Set(ctx, mID, serialization.JSON(b))) return mID } +func setStatusCompleted(migrationID string, tcx it.TestContext, ctx context.Context) { + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/stages/migration_completed.json")) + Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) +} + func createMemberLogs(t *testing.T, ctx context.Context, ci *hz.ClientInternal) { for _, m := range ci.OrderedMembers() { l := MustValue(ci.Client().GetList(ctx, migration.DebugLogsListPrefix+m.UUID.String())) @@ -113,14 +112,3 @@ func removeMembersLogs(ctx context.Context, ci *hz.ClientInternal) { Must(l.Destroy(ctx)) } } - -func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) { - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json")) - Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) -} - -func postStatusRunner(ctx context.Context, migrationID string, progressList *hz.List) { - m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: migrationID})) - MustValue(progressList.Remove(ctx, serialization.JSON(m))) -} diff --git a/base/commands/migration/testdata/cancel/migration_cancelling.json b/base/commands/migration/testdata/stages/migration_cancelling.json similarity index 100% rename from base/commands/migration/testdata/cancel/migration_cancelling.json rename to base/commands/migration/testdata/stages/migration_cancelling.json diff --git a/base/commands/migration/testdata/start/migration_success_completed.json b/base/commands/migration/testdata/stages/migration_completed.json similarity index 100% rename from base/commands/migration/testdata/start/migration_success_completed.json rename to base/commands/migration/testdata/stages/migration_completed.json diff --git a/base/commands/migration/testdata/start/migration_success_failure.json b/base/commands/migration/testdata/stages/migration_failed.json similarity index 100% rename from base/commands/migration/testdata/start/migration_success_failure.json rename to base/commands/migration/testdata/stages/migration_failed.json diff --git a/base/commands/migration/testdata/start/migration_success_initial.json b/base/commands/migration/testdata/stages/migration_in_progress.json similarity index 100% rename from base/commands/migration/testdata/start/migration_success_initial.json rename to base/commands/migration/testdata/stages/migration_in_progress.json diff --git a/base/commands/migration/testdata/stages/migration_started.json b/base/commands/migration/testdata/stages/migration_started.json new file mode 100644 index 000000000..5020aea5b --- /dev/null +++ b/base/commands/migration/testdata/stages/migration_started.json @@ -0,0 +1,4 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "STARTED" +} \ No newline at end of file diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 06b36610f..1f1b3ec86 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -123,7 +123,9 @@ func readPathAsString(path string) (string, error) { return string(b), nil } -func MakeMigrationID() string { +var MigrationIDGeneratorFunc = makeMigrationID + +func makeMigrationID() string { return types.NewUUID().String() }