From ea238642a0d4ac01da680b9f79a6e3a8756a3df3 Mon Sep 17 00:00:00 2001 From: kmetin Date: Fri, 8 Sep 2023 10:55:08 +0300 Subject: [PATCH] add cancel command to dmt --- base/commands/migration/cancel_it_test.go | 71 ++++++++++++++++ base/commands/migration/cancel_stages.go | 90 +++++++++++++++++++++ base/commands/migration/const.go | 6 +- base/commands/migration/migration_cancel.go | 39 +++++++++ base/commands/migration/migration_start.go | 2 +- base/commands/migration/utils.go | 2 +- 6 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 base/commands/migration/cancel_it_test.go create mode 100644 base/commands/migration/cancel_stages.go create mode 100644 base/commands/migration/migration_cancel.go diff --git a/base/commands/migration/cancel_it_test.go b/base/commands/migration/cancel_it_test.go new file mode 100644 index 00000000..e6f3b388 --- /dev/null +++ b/base/commands/migration/cancel_it_test.go @@ -0,0 +1,71 @@ +//go:build migration + +package migration_test + +import ( + "context" + "encoding/json" + "sync" + "testing" + + _ "github.com/hazelcast/hazelcast-commandline-client/base" + _ "github.com/hazelcast/hazelcast-commandline-client/base/commands" + "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" + . "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/it" + "github.com/hazelcast/hazelcast-go-client/serialization" + "github.com/stretchr/testify/require" +) + +func TestStatus(t *testing.T) { + testCases := []struct { + name string + f func(t *testing.T) + }{ + {name: "status", f: statusTest}, + {name: "noMigrationsStatus", f: noMigrationsStatusTest}, + } + for _, tc := range testCases { + t.Run(tc.name, tc.f) + } +} + +func noMigrationsStatusTest(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + var wg sync.WaitGroup + wg.Add(1) + go tcx.WithReset(func() { + defer wg.Done() + tcx.CLC().Execute(ctx, "cancel") + }) + wg.Wait() + tcx.AssertStdoutContains("there are no migrations are in progress on migration cluster") + }) +} + +func statusTest(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + mID := migration.MakeMigrationID() + l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) + m := MustValue(json.Marshal(migration.MigrationInProgress{ + MigrationID: mID, + })) + ok := MustValue(l.Add(ctx, serialization.JSON(m))) + require.Equal(t, true, ok) + var wg sync.WaitGroup + wg.Add(1) + go tcx.WithReset(func() { + defer wg.Done() + Must(tcx.CLC().Execute(ctx, "cancel")) + }) + wg.Wait() + tcx.AssertStdoutContains(`OK [1/2] Connected to the migration cluster. + OK [2/2] Canceled the migration. + + OK Migration canceled successfully.`) + }) +} diff --git a/base/commands/migration/cancel_stages.go b/base/commands/migration/cancel_stages.go new file mode 100644 index 00000000..79189a21 --- /dev/null +++ b/base/commands/migration/cancel_stages.go @@ -0,0 +1,90 @@ +package migration + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type CancelStages struct { + ci *hazelcast.ClientInternal + cancelQueue *hazelcast.Queue + migrationsInProgressList *hazelcast.List + migrationID string +} + +func NewCancelStages() *CancelStages { + return &CancelStages{} +} + +func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage { + return []stage.Stage{ + { + ProgressMsg: "Connecting to the migration cluster", + SuccessMsg: "Connected to the migration cluster", + FailureMsg: "Could not connect to the migration cluster", + Func: st.connectStage(ctx, ec), + }, + { + ProgressMsg: "Canceling the migration", + SuccessMsg: "Canceled the migration", + FailureMsg: "Could not cancel the migration", + Func: st.cancelStage(ctx), + }, + } +} + +func (st *CancelStages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error { + return func(status stage.Statuser) error { + var err error + st.ci, err = ec.ClientInternal(ctx) + if err != nil { + return err + } + st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList) + if err != nil { + return err + } + all, err := st.migrationsInProgressList.GetAll(ctx) + if err != nil { + return err + } + if len(all) == 0 { + return fmt.Errorf("there are no migrations are in progress on migration cluster") + } + var mip MigrationInProgress + m := all[0].(serialization.JSON) + err = json.Unmarshal(m, &mip) + if err != nil { + return fmt.Errorf("parsing migration in progress: %w", err) + } + st.migrationID = mip.MigrationID + st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue) + return err + } +} + +func (st *CancelStages) cancelStage(ctx context.Context) func(stage.Statuser) error { + return func(statuser stage.Statuser) error { + c := CancelItem{ID: st.migrationID} + b, err := json.Marshal(c) + if err != nil { + return err + } + st.cancelQueue.Put(ctx, serialization.JSON(b)) + return err + } +} + +type MigrationInProgress struct { + MigrationID string `json:"migrationId"` +} + +type CancelItem struct { + ID string `json:"id"` +} diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go index 3888287c..61583cc3 100644 --- a/base/commands/migration/const.go +++ b/base/commands/migration/const.go @@ -1,6 +1,8 @@ package migration const ( - startQueueName = "__datamigration_start_queue" - statusMapEntryName = "status" + startQueueName = "__datamigration_start_queue" + statusMapEntryName = "status" + MigrationsInProgressList = "__datamigrations_in_progress" + CancelQueue = "__datamigration_cancel_queue" ) diff --git a/base/commands/migration/migration_cancel.go b/base/commands/migration/migration_cancel.go new file mode 100644 index 00000000..2f53ea3e --- /dev/null +++ b/base/commands/migration/migration_cancel.go @@ -0,0 +1,39 @@ +//go:build migration + +package migration + +import ( + "context" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" +) + +type CancelCmd struct{} + +func (c CancelCmd) Unwrappable() {} + +func (c CancelCmd) Init(cc plug.InitContext) error { + cc.SetCommandUsage("cancel") + cc.SetCommandGroup("migration") + help := "Cancel the data migration" + cc.SetCommandHelp(help, help) + cc.SetPositionalArgCount(0, 0) + return nil +} + +func (c CancelCmd) Exec(ctx context.Context, ec plug.ExecContext) error { + sts := NewCancelStages() + sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) + if err := stage.Execute(ctx, ec, sp); err != nil { + return err + } + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary("OK Migration canceled successfully.") + return nil +} + +func init() { + check.Must(plug.Registry.RegisterCommand("cancel", &CancelCmd{})) +} diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index 95136cb9..de12e82e 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -45,7 +45,7 @@ Selected data structures in the source cluster will be migrated to the target cl } } ec.PrintlnUnnecessary("") - sts := NewStages(makeMigrationID(), ec.Args()[0]) + sts := NewStages(MakeMigrationID(), ec.Args()[0]) sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) if err := stage.Execute(ctx, ec, sp); err != nil { return err diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 61b364cc..9ef3caf6 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -119,6 +119,6 @@ func readPathAsString(path string) (string, error) { return string(b), nil } -func makeMigrationID() string { +func MakeMigrationID() string { return types.NewUUID().String() }