diff --git a/base/commands/migration/cancel_it_test.go b/base/commands/migration/cancel_it_test.go index b8726606..f06ffb52 100644 --- a/base/commands/migration/cancel_it_test.go +++ b/base/commands/migration/cancel_it_test.go @@ -37,7 +37,7 @@ func noMigrationsCancelTest(t *testing.T) { tcx.Tester(func(tcx it.TestContext) { createMapping(ctx, tcx) err := tcx.CLC().Execute(ctx, "cancel") - require.Contains(t, err.Error(), "finding migration in progress: no rows found") + require.Contains(t, err.Error(), "finding migration in progress: no result found") }) } diff --git a/base/commands/migration/cancel_stages.go b/base/commands/migration/cancel_stages.go index ead55732..d753d03c 100644 --- a/base/commands/migration/cancel_stages.go +++ b/base/commands/migration/cancel_stages.go @@ -31,6 +31,12 @@ func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage. FailureMsg: "Could not connect to the migration cluster", Func: st.connectStage(ec), }, + { + ProgressMsg: "Finding migration in progress", + SuccessMsg: "Found migration in progress", + FailureMsg: "Could not find a migration in progress", + Func: st.findMigrationInProgress(ec), + }, { ProgressMsg: "Canceling the migration", SuccessMsg: "Canceled the migration", @@ -47,13 +53,19 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context, if err != nil { return nil, err } + st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue) + return nil, nil + } +} + +func (st *CancelStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { m, err := findMigrationInProgress(ctx, st.ci) if err != nil { return nil, err } st.migrationID = m.MigrationID - st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue) - return nil, err + return nil, nil } } diff --git a/base/commands/migration/common.go b/base/commands/migration/common.go new file mode 100644 index 00000000..c9f2cf79 --- /dev/null +++ b/base/commands/migration/common.go @@ -0,0 +1,30 @@ +//go:build std || migration + +package migration + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type MigrationInProgress struct { + MigrationID string `json:"id"` +} + +func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) { + var mip MigrationInProgress + q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELING')", StatusMapName) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + return mip, fmt.Errorf("finding migration in progress: %w", err) + } + m := r.(serialization.JSON) + if err = json.Unmarshal(m, &mip); err != nil { + return mip, fmt.Errorf("parsing migration in progress: %w", err) + } + return mip, nil +} diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index 50b814d0..4c945f82 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -290,5 +290,5 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str } return r, nil } - return nil, errors.New("no rows found") + return nil, errors.New("no result found") } diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index 333d8d28..a8f7e3f5 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -4,13 +4,10 @@ package migration import ( "context" - "encoding/json" - "fmt" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/serialization" ) type StatusStages struct { @@ -29,13 +26,15 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage. FailureMsg: "Could not connect to the migration cluster", Func: st.connectStage(ec), }, + { + ProgressMsg: "Finding migration in progress", + SuccessMsg: "Found migration in progress", + FailureMsg: "Could not find a migration in progress", + Func: st.findMigrationInProgress(ec), + }, } } -type MigrationInProgress struct { - MigrationID string `json:"id"` -} - func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { return func(ctx context.Context, status stage.Statuser[any]) (any, error) { var err error @@ -43,6 +42,12 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, if err != nil { return nil, err } + return nil, nil + } +} + +func (st *StatusStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { m, err := findMigrationInProgress(ctx, st.ci) if err != nil { return nil, err @@ -50,17 +55,3 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, return m.MigrationID, err } } - -func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) { - var mip MigrationInProgress - q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELING')", StatusMapName) - r, err := querySingleRow(ctx, ci, q) - if err != nil { - return mip, fmt.Errorf("finding migration in progress: %w", err) - } - m := r.(serialization.JSON) - if err = json.Unmarshal(m, &mip); err != nil { - return mip, fmt.Errorf("parsing migration in progress: %w", err) - } - return mip, nil -} diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index fc174d70..f9f3a45f 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -46,7 +46,7 @@ func noMigrationsStatusTest(t *testing.T) { execErr = tcx.CLC().Execute(ctx, "status") }) wg.Wait() - require.Contains(t, execErr.Error(), "finding migration in progress: no rows found") + require.Contains(t, execErr.Error(), "finding migration in progress: no result found") }) }