Skip to content

Commit

Permalink
add cancel command to dmt
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 8, 2023
1 parent 9fad1d0 commit ea23864
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 4 deletions.
71 changes: 71 additions & 0 deletions base/commands/migration/cancel_it_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
//go:build migration

package migration_test

import (
"context"
"encoding/json"
"sync"
"testing"

_ "github.com/hazelcast/hazelcast-commandline-client/base"
_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/stretchr/testify/require"
)

func TestStatus(t *testing.T) {
testCases := []struct {
name string
f func(t *testing.T)
}{
{name: "status", f: statusTest},
{name: "noMigrationsStatus", f: noMigrationsStatusTest},
}
for _, tc := range testCases {
t.Run(tc.name, tc.f)
}
}

func noMigrationsStatusTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
var wg sync.WaitGroup
wg.Add(1)
go tcx.WithReset(func() {
defer wg.Done()
tcx.CLC().Execute(ctx, "cancel")
})
wg.Wait()
tcx.AssertStdoutContains("there are no migrations are in progress on migration cluster")
})
}

func statusTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
mID := migration.MakeMigrationID()
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: mID,
}))
ok := MustValue(l.Add(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
var wg sync.WaitGroup
wg.Add(1)
go tcx.WithReset(func() {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "cancel"))
})
wg.Wait()
tcx.AssertStdoutContains(`OK [1/2] Connected to the migration cluster.
OK [2/2] Canceled the migration.
OK Migration canceled successfully.`)
})
}
90 changes: 90 additions & 0 deletions base/commands/migration/cancel_stages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package migration

import (
"context"
"encoding/json"
"fmt"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type CancelStages struct {
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationsInProgressList *hazelcast.List
migrationID string
}

func NewCancelStages() *CancelStages {
return &CancelStages{}
}

func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage {
return []stage.Stage{
{
ProgressMsg: "Connecting to the migration cluster",
SuccessMsg: "Connected to the migration cluster",
FailureMsg: "Could not connect to the migration cluster",
Func: st.connectStage(ctx, ec),
},
{
ProgressMsg: "Canceling the migration",
SuccessMsg: "Canceled the migration",
FailureMsg: "Could not cancel the migration",
Func: st.cancelStage(ctx),
},
}
}

func (st *CancelStages) connectStage(ctx context.Context, ec plug.ExecContext) func(stage.Statuser) error {
return func(status stage.Statuser) error {
var err error
st.ci, err = ec.ClientInternal(ctx)
if err != nil {
return err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
if err != nil {
return err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return err
}
if len(all) == 0 {
return fmt.Errorf("there are no migrations are in progress on migration cluster")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
err = json.Unmarshal(m, &mip)
if err != nil {
return fmt.Errorf("parsing migration in progress: %w", err)
}
st.migrationID = mip.MigrationID
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return err
}
}

func (st *CancelStages) cancelStage(ctx context.Context) func(stage.Statuser) error {
return func(statuser stage.Statuser) error {
c := CancelItem{ID: st.migrationID}
b, err := json.Marshal(c)
if err != nil {
return err
}
st.cancelQueue.Put(ctx, serialization.JSON(b))
return err
}
}

type MigrationInProgress struct {
MigrationID string `json:"migrationId"`
}

type CancelItem struct {
ID string `json:"id"`
}
6 changes: 4 additions & 2 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package migration

const (
startQueueName = "__datamigration_start_queue"
statusMapEntryName = "status"
startQueueName = "__datamigration_start_queue"
statusMapEntryName = "status"
MigrationsInProgressList = "__datamigrations_in_progress"
CancelQueue = "__datamigration_cancel_queue"
)
39 changes: 39 additions & 0 deletions base/commands/migration/migration_cancel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//go:build migration

package migration

import (
"context"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
)

type CancelCmd struct{}

func (c CancelCmd) Unwrappable() {}

func (c CancelCmd) Init(cc plug.InitContext) error {
cc.SetCommandUsage("cancel")
cc.SetCommandGroup("migration")
help := "Cancel the data migration"
cc.SetCommandHelp(help, help)
cc.SetPositionalArgCount(0, 0)
return nil
}

func (c CancelCmd) Exec(ctx context.Context, ec plug.ExecContext) error {
sts := NewCancelStages()
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if err := stage.Execute(ctx, ec, sp); err != nil {
return err
}
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary("OK Migration canceled successfully.")
return nil
}

func init() {
check.Must(plug.Registry.RegisterCommand("cancel", &CancelCmd{}))
}
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
sts := NewStages(makeMigrationID(), ec.Args()[0])
sts := NewStages(MakeMigrationID(), ec.Args()[0])
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if err := stage.Execute(ctx, ec, sp); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,6 @@ func readPathAsString(path string) (string, error) {
return string(b), nil
}

func makeMigrationID() string {
func MakeMigrationID() string {
return types.NewUUID().String()
}

0 comments on commit ea23864

Please sign in to comment.