Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLC-427][CLC-428]: Remove Data Migrations in Progress List and Status Map Usages #424

Merged
merged 14 commits into from
Nov 7, 2023
39 changes: 24 additions & 15 deletions base/commands/migration/cancel_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package migration_test

import (
"context"
"encoding/json"
"os"
"sync"
"testing"

_ "github.com/hazelcast/hazelcast-commandline-client/base"
_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
hz "github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,30 +35,38 @@ func noMigrationsCancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
err := tcx.CLC().Execute(ctx, "cancel")
require.Contains(t, err.Error(), "there are no migrations in progress")
require.Contains(t, err.Error(), "finding migration in progress: no result found")
})
}

func cancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
mID := migration.MakeMigrationID()
ci := hz.NewClientInternal(tcx.Client)
mID := migrationIDFunc()
createMapping(ctx, tcx)
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, mID, serialization.JSON(b)))
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: mID,
}))
ok := MustValue(l.Add(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
tcx.WithReset(func() {
setStatusInProgress(tcx, ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "cancel"))
})
MustValue(l.Remove(ctx, serialization.JSON(m)))
}()
cq := MustValue(ci.Client().GetQueue(ctx, migration.CancelQueue))
MustValue(cq.Poll(ctx))
setStatusCancelling(mID, tcx, ctx)
wg.Wait()
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
MustValue(statusMap.Remove(ctx, mID))
tcx.AssertStdoutContains(`Migration canceled successfully.`)
})
}

func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}
41 changes: 19 additions & 22 deletions base/commands/migration/cancel_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package migration
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
Expand All @@ -15,10 +14,9 @@ import (
)

type CancelStages struct {
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationsInProgressList *hazelcast.List
migrationID string
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationID string
}

func NewCancelStages() *CancelStages {
Expand All @@ -33,6 +31,12 @@ func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage.
FailureMsg: "Could not connect to the migration cluster",
Func: st.connectStage(ec),
},
{
ProgressMsg: "Finding migration in progress",
SuccessMsg: "Found migration in progress",
FailureMsg: "Could not find a migration in progress",
Func: st.findMigrationInProgress(ec),
},
{
ProgressMsg: "Canceling the migration",
SuccessMsg: "Canceled the migration",
Expand All @@ -49,26 +53,19 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context,
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return nil, nil
}
}

func (st *CancelStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
err = json.Unmarshal(m, &mip)
if err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.migrationID = mip.MigrationID
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return nil, err
st.migrationID = m.MigrationID
return nil, nil
}
}

Expand Down
30 changes: 30 additions & 0 deletions base/commands/migration/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//go:build std || migration

package migration

import (
"context"
"encoding/json"
"fmt"

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type MigrationInProgress struct {
MigrationID string `json:"id"`
}

func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) {
var mip MigrationInProgress
q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELING')", StatusMapName)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return mip, fmt.Errorf("finding migration in progress: %w", err)
}
m := r.(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return mip, fmt.Errorf("parsing migration in progress: %w", err)
}
return mip, nil
}
14 changes: 6 additions & 8 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
package migration

const (
StartQueueName = "__datamigration_start_queue"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
MigrationsInProgressList = "__datamigrations_in_progress"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
StartQueueName = "__datamigration_start_queue"
StatusMapName = "__datamigration_migrations"
DebugLogsListPrefix = "__datamigration_debug_logs_"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
)

type Status string
Expand Down
8 changes: 4 additions & 4 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
execErr = errors.New(errs)
break statusReaderLoop
case StatusCanceled, StatusCanceling:
execErr = clcerrors.ErrUserCancelled
execErr = errors.New("migration canceled by the user")
break statusReaderLoop
}
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID)
Expand Down Expand Up @@ -130,7 +130,7 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID)
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return nil, err
Expand Down Expand Up @@ -196,7 +196,7 @@ func WaitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInt
}
return errors.New(errs)
}
if Status(status) == StatusInProgress {
if Status(status) == StatusInProgress || Status(status) == StatusComplete {
return nil
}
}
Expand Down Expand Up @@ -290,5 +290,5 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str
}
return r, nil
}
return nil, errors.New("no rows found")
return nil, errors.New("no result found")
}
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
mID := MakeMigrationID()
mID := MigrationIDGeneratorFunc()
defer func() {
finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir))
if err == nil {
Expand Down
5 changes: 0 additions & 5 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type StartStages struct {
configDir string
ci *hazelcast.ClientInternal
startQueue *hazelcast.Queue
statusMap *hazelcast.Map
logger log.Logger
}

Expand Down Expand Up @@ -63,10 +62,6 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s
if err != nil {
return nil, fmt.Errorf("retrieving the start Queue: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, fmt.Errorf("retrieving the status Map: %w", err)
}
return nil, nil
}
}
Expand Down
4 changes: 4 additions & 0 deletions base/commands/migration/start_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ func createMapping(ctx context.Context, tcx it.TestContext) {
MustValue(tcx.Client.SQL().Execute(ctx, mSQL))
}

func migrationIDFunc() string {
return "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7"
}

func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) {
q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName))
var b migration.ConfigBundle
Expand Down
43 changes: 15 additions & 28 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,14 @@ package migration

import (
"context"
"encoding/json"
"fmt"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type StatusStages struct {
ci *hazelcast.ClientInternal
migrationsInProgressList *hazelcast.List
statusMap *hazelcast.Map
ci *hazelcast.ClientInternal
}

func NewStatusStages() *StatusStages {
Expand All @@ -31,40 +26,32 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.
FailureMsg: "Could not connect to the migration cluster",
Func: st.connectStage(ec),
},
{
ProgressMsg: "Finding migration in progress",
SuccessMsg: "Found migration in progress",
FailureMsg: "Could not find a migration in progress",
Func: st.findMigrationInProgress(ec),
},
}
}

type MigrationInProgress struct {
MigrationID string `json:"migrationId"`
}

func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
var err error
st.ci, err = ec.ClientInternal(ctx)
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
return nil, nil
}
}

func (st *StatusStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
return mip.MigrationID, err
return m.MigrationID, err
}
}
Loading
Loading