From daf25292251d3902ff19430bb8c4d257436d3d6a Mon Sep 17 00:00:00 2001 From: kmetin Date: Thu, 2 Nov 2023 14:01:17 +0300 Subject: [PATCH] remove status map and data migrations in progress list usages --- base/commands/migration/cancel_it_test.go | 36 ++++++++++------- base/commands/migration/cancel_stages.go | 25 +++--------- base/commands/migration/const.go | 14 +++---- base/commands/migration/migration_stages.go | 2 +- base/commands/migration/migration_start.go | 2 +- base/commands/migration/start_stages.go | 5 --- .../migration/start_stages_it_test.go | 20 ++-------- base/commands/migration/status_stages.go | 40 +++++++++---------- .../migration/status_stages_it_test.go | 37 +++++++---------- base/commands/migration/utils.go | 8 ++-- 10 files changed, 74 insertions(+), 115 deletions(-) diff --git a/base/commands/migration/cancel_it_test.go b/base/commands/migration/cancel_it_test.go index ded42853e..2b6caf999 100644 --- a/base/commands/migration/cancel_it_test.go +++ b/base/commands/migration/cancel_it_test.go @@ -4,8 +4,8 @@ package migration_test import ( "context" - "encoding/json" "os" + "sync" "testing" _ "github.com/hazelcast/hazelcast-commandline-client/base" @@ -13,6 +13,7 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" . "github.com/hazelcast/hazelcast-commandline-client/internal/check" "github.com/hazelcast/hazelcast-commandline-client/internal/it" + hz "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" "github.com/stretchr/testify/require" ) @@ -34,8 +35,9 @@ func noMigrationsCancelTest(t *testing.T) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { + createMapping(ctx, tcx) err := tcx.CLC().Execute(ctx, "cancel") - require.Contains(t, err.Error(), "there are no migrations in progress") + require.Contains(t, err.Error(), "finding migration in progress: no rows found") }) } @@ -43,21 +45,27 @@ func cancelTest(t *testing.T) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { - mID := migration.MakeMigrationID() + mID := "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7" + ci := hz.NewClientInternal(tcx.Client) createMapping(ctx, tcx) - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json")) - Must(statusMap.Set(ctx, mID, serialization.JSON(b))) - l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) - m := MustValue(json.Marshal(migration.MigrationInProgress{ - MigrationID: mID, - })) - ok := MustValue(l.Add(ctx, serialization.JSON(m))) - require.Equal(t, true, ok) - tcx.WithReset(func() { + setStatusInProgress(tcx, ctx) + var wg sync.WaitGroup + wg.Add(1) + go tcx.WithReset(func() { + defer wg.Done() Must(tcx.CLC().Execute(ctx, "cancel")) }) - MustValue(l.Remove(ctx, serialization.JSON(m))) + it.Eventually(t, func() bool { + return migration.WaitForMigrationToBeInProgress(ctx, ci, mID) == nil + }) + setStatusCancelling(mID, tcx, ctx) + wg.Wait() tcx.AssertStdoutContains(`Migration canceled successfully.`) }) } + +func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) { + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json")) + Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) +} diff --git a/base/commands/migration/cancel_stages.go b/base/commands/migration/cancel_stages.go index b3cd6cca7..ead557320 100644 --- a/base/commands/migration/cancel_stages.go +++ b/base/commands/migration/cancel_stages.go @@ -5,7 +5,6 @@ package migration import ( "context" "encoding/json" - "fmt" "time" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" @@ -15,10 +14,9 @@ import ( ) type CancelStages struct { - ci *hazelcast.ClientInternal - cancelQueue *hazelcast.Queue - migrationsInProgressList *hazelcast.List - migrationID string + ci *hazelcast.ClientInternal + cancelQueue *hazelcast.Queue + migrationID string } func NewCancelStages() *CancelStages { @@ -49,24 +47,11 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context, if err != nil { return nil, err } - st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList) + m, err := findMigrationInProgress(ctx, st.ci) if err != nil { return nil, err } - all, err := st.migrationsInProgressList.GetAll(ctx) - if err != nil { - return nil, err - } - if len(all) == 0 { - return nil, fmt.Errorf("there are no migrations in progress") - } - var mip MigrationInProgress - m := all[0].(serialization.JSON) - err = json.Unmarshal(m, &mip) - if err != nil { - return nil, fmt.Errorf("parsing migration in progress: %w", err) - } - st.migrationID = mip.MigrationID + st.migrationID = m.MigrationID st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue) return nil, err } diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index 97d53ba03..724b32dfe 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -3,14 +3,12 @@ package migration const ( - StartQueueName = "__datamigration_start_queue" - StatusMapName = "__datamigration_migrations" - UpdateTopicPrefix = "__datamigration_updates_" - DebugLogsListPrefix = "__datamigration_debug_logs_" - MigrationsInProgressList = "__datamigrations_in_progress" - CancelQueue = "__datamigration_cancel_queue" - argDMTConfig = "dmtConfig" - argTitleDMTConfig = "DMT configuration" + StartQueueName = "__datamigration_start_queue" + StatusMapName = "__datamigration_migrations" + DebugLogsListPrefix = "__datamigration_debug_logs_" + CancelQueue = "__datamigration_cancel_queue" + argDMTConfig = "dmtConfig" + argTitleDMTConfig = "DMT configuration" ) type Status string diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index 1d24c133c..7ccd9e49b 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -130,7 +130,7 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig if err != nil { return nil, err } - q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID) + q := fmt.Sprintf(`SELECT this FROM %s WHERE __key='%s'`, StatusMapName, migrationID) r, err := querySingleRow(ctx, ci, q) if err != nil { return nil, err diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index 843681998..b9df75e0e 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -52,7 +52,7 @@ Selected data structures in the source cluster will be migrated to the target cl } } ec.PrintlnUnnecessary("") - mID := MakeMigrationID() + mID := MigrationIDGeneratorFunc() defer func() { finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir)) if err == nil { diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go index d2d1842ad..970d04880 100644 --- a/base/commands/migration/start_stages.go +++ b/base/commands/migration/start_stages.go @@ -20,7 +20,6 @@ type StartStages struct { configDir string ci *hazelcast.ClientInternal startQueue *hazelcast.Queue - statusMap *hazelcast.Map logger log.Logger } @@ -63,10 +62,6 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s if err != nil { return nil, fmt.Errorf("retrieving the start Queue: %w", err) } - st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) - if err != nil { - return nil, fmt.Errorf("retrieving the status Map: %w", err) - } return nil, nil } } diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go index d3b6ac7c7..8930a737d 100644 --- a/base/commands/migration/start_stages_it_test.go +++ b/base/commands/migration/start_stages_it_test.go @@ -4,13 +4,11 @@ package migration_test import ( "context" - "encoding/json" "errors" "fmt" "os" "sync" "testing" - "time" hz "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" @@ -57,6 +55,8 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { + migration.MigrationIDGeneratorFunc = migrationIDFunc + mID := migrationIDFunc() ci := hz.NewClientInternal(tcx.Client) createMapping(ctx, tcx) createMemberLogs(t, ctx, ci) @@ -69,9 +69,6 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s defer wg.Done() execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir) }) - c := make(chan string) - go findMigrationID(ctx, tcx, c) - mID := <-c wg.Add(1) go migrationRunner(t, ctx, tcx, mID, &wg, statusMapStateFiles) wg.Wait() @@ -110,15 +107,6 @@ func createMapping(ctx context.Context, tcx it.TestContext) { MustValue(tcx.Client.SQL().Execute(ctx, mSQL)) } -func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) { - q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName)) - var b migration.ConfigBundle - for { - v := MustValue(q.PollWithTimeout(ctx, time.Second)) - if v != nil { - Must(json.Unmarshal(v.(serialization.JSON), &b)) - c <- b.MigrationID - break - } - } +func migrationIDFunc() string { + return "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7" } diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index fbd29ef0f..c2236052d 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -14,9 +14,7 @@ import ( ) type StatusStages struct { - ci *hazelcast.ClientInternal - migrationsInProgressList *hazelcast.List - statusMap *hazelcast.Map + ci *hazelcast.ClientInternal } func NewStatusStages() *StatusStages { @@ -35,7 +33,7 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage. } type MigrationInProgress struct { - MigrationID string `json:"migrationId"` + MigrationID string `json:"id"` } func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { @@ -45,26 +43,24 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, if err != nil { return nil, err } - st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList) + m, err := findMigrationInProgress(ctx, st.ci) if err != nil { return nil, err } - all, err := st.migrationsInProgressList.GetAll(ctx) - if err != nil { - return nil, err - } - if len(all) == 0 { - return nil, fmt.Errorf("there are no migrations in progress") - } - var mip MigrationInProgress - m := all[0].(serialization.JSON) - if err = json.Unmarshal(m, &mip); err != nil { - return nil, fmt.Errorf("parsing migration in progress: %w", err) - } - st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) - if err != nil { - return nil, err - } - return mip.MigrationID, err + return m.MigrationID, err + } +} + +func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) { + var mip MigrationInProgress + q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELLING')", StatusMapName) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + return mip, fmt.Errorf("finding migration in progress: %w", err) + } + m := r.(serialization.JSON) + if err = json.Unmarshal(m, &mip); err != nil { + return mip, fmt.Errorf("parsing migration in progress: %w", err) } + return mip, nil } diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 8211364e3..81ca2cfe5 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -4,7 +4,6 @@ package migration_test import ( "context" - "encoding/json" "fmt" "os" "sync" @@ -38,6 +37,7 @@ func noMigrationsStatusTest(t *testing.T) { tcx := it.TestContext{T: t} ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { + createMapping(ctx, tcx) var wg sync.WaitGroup wg.Add(1) var execErr error @@ -46,7 +46,7 @@ func noMigrationsStatusTest(t *testing.T) { execErr = tcx.CLC().Execute(ctx, "status") }) wg.Wait() - require.Contains(t, execErr.Error(), "there are no migrations in progress") + require.Contains(t, execErr.Error(), "finding migration in progress: no rows found") }) } @@ -55,11 +55,11 @@ func statusTest(t *testing.T) { ctx := context.Background() tcx.Tester(func(tcx it.TestContext) { ci := hz.NewClientInternal(tcx.Client) - progressList := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) - mID := preStatusRunner(t, tcx, ctx, ci, progressList) - defer postStatusRunner(ctx, mID, progressList) + createMapping(ctx, tcx) + createMemberLogs(t, ctx, ci) defer removeMembersLogs(ctx, ci) outDir := MustValue(os.MkdirTemp("", "clc-")) + mID := setStatusInProgress(tcx, ctx) var wg sync.WaitGroup wg.Add(1) go tcx.WithReset(func() { @@ -69,7 +69,7 @@ func statusTest(t *testing.T) { it.Eventually(t, func() bool { return migration.WaitForMigrationToBeInProgress(ctx, ci, mID) == nil }) - statusRunner(mID, tcx, ctx) + setStatusCompleted(mID, tcx, ctx) wg.Wait() tcx.AssertStdoutContains("Connected to the migration cluster") tcx.WithReset(func() { @@ -86,18 +86,20 @@ func statusTest(t *testing.T) { }) } -func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context, ci *hz.ClientInternal, progressList *hz.List) string { - createMapping(ctx, tcx) - createMemberLogs(t, ctx, ci) - mID := migration.MakeMigrationID() - m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: mID})) - require.Equal(t, true, MustValue(progressList.Add(ctx, serialization.JSON(m)))) +func setStatusInProgress(tcx it.TestContext, ctx context.Context) string { + mID := "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7" statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json")) Must(statusMap.Set(ctx, mID, serialization.JSON(b))) return mID } +func setStatusCompleted(migrationID string, tcx it.TestContext, ctx context.Context) { + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json")) + Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) +} + func createMemberLogs(t *testing.T, ctx context.Context, ci *hz.ClientInternal) { for _, m := range ci.OrderedMembers() { l := MustValue(ci.Client().GetList(ctx, migration.DebugLogsListPrefix+m.UUID.String())) @@ -113,14 +115,3 @@ func removeMembersLogs(ctx context.Context, ci *hz.ClientInternal) { Must(l.Destroy(ctx)) } } - -func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) { - statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) - b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json")) - Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) -} - -func postStatusRunner(ctx context.Context, migrationID string, progressList *hz.List) { - m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: migrationID})) - MustValue(progressList.Remove(ctx, serialization.JSON(m))) -} diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 5af0aa0ed..43b9d8fc0 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -121,10 +121,8 @@ func readPathAsString(path string) (string, error) { return string(b), nil } -func MakeMigrationID() string { - return types.NewUUID().String() -} +var MigrationIDGeneratorFunc = makeMigrationID -func MakeUpdateTopicName(migrationID string) string { - return UpdateTopicPrefix + migrationID +func makeMigrationID() string { + return types.NewUUID().String() }