Skip to content

Commit

Permalink
[CLC-427][CLC-428]: Remove Data Migrations in Progress List and Statu…
Browse files Browse the repository at this point in the history
…s Map Usages (#424)
  • Loading branch information
Kutluhan Metin authored Nov 7, 2023
1 parent ead4607 commit beb23e4
Show file tree
Hide file tree
Showing 17 changed files with 186 additions and 125 deletions.
39 changes: 24 additions & 15 deletions base/commands/migration/cancel_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package migration_test

import (
"context"
"encoding/json"
"os"
"sync"
"testing"

_ "github.com/hazelcast/hazelcast-commandline-client/base"
_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
hz "github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,30 +35,38 @@ func noMigrationsCancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
err := tcx.CLC().Execute(ctx, "cancel")
require.Contains(t, err.Error(), "there are no migrations in progress")
require.Contains(t, err.Error(), "finding migration in progress: no result found")
})
}

func cancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
mID := migration.MakeMigrationID()
ci := hz.NewClientInternal(tcx.Client)
mID := migrationIDFunc()
createMapping(ctx, tcx)
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, mID, serialization.JSON(b)))
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: mID,
}))
ok := MustValue(l.Add(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
tcx.WithReset(func() {
setStatusInProgress(tcx, ctx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "cancel"))
})
MustValue(l.Remove(ctx, serialization.JSON(m)))
}()
cq := MustValue(ci.Client().GetQueue(ctx, migration.CancelQueue))
MustValue(cq.Poll(ctx))
setStatusCancelling(mID, tcx, ctx)
wg.Wait()
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
MustValue(statusMap.Remove(ctx, mID))
tcx.AssertStdoutContains(`Migration canceled successfully.`)
})
}

func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/stages/migration_cancelling.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}
41 changes: 19 additions & 22 deletions base/commands/migration/cancel_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package migration
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
Expand All @@ -15,10 +14,9 @@ import (
)

type CancelStages struct {
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationsInProgressList *hazelcast.List
migrationID string
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationID string
}

func NewCancelStages() *CancelStages {
Expand All @@ -33,6 +31,12 @@ func (st *CancelStages) Build(ctx context.Context, ec plug.ExecContext) []stage.
FailureMsg: "Could not connect to the migration cluster",
Func: st.connectStage(ec),
},
{
ProgressMsg: "Finding migration in progress",
SuccessMsg: "Found migration in progress",
FailureMsg: "Could not find a migration in progress",
Func: st.findMigrationInProgress(ec),
},
{
ProgressMsg: "Canceling the migration",
SuccessMsg: "Canceled the migration",
Expand All @@ -49,26 +53,19 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context,
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return nil, nil
}
}

func (st *CancelStages) findMigrationInProgress(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
err = json.Unmarshal(m, &mip)
if err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.migrationID = mip.MigrationID
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return nil, err
st.migrationID = m.MigrationID
return nil, nil
}
}

Expand Down
34 changes: 34 additions & 0 deletions base/commands/migration/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build std || migration

package migration

import (
"context"
"encoding/json"
"fmt"

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type MigrationInProgress struct {
MigrationID string `json:"id"`
}

func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) {
var mip MigrationInProgress
q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELING')", StatusMapName)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return mip, fmt.Errorf("finding migration in progress: %w", err)
}
rr, err := r.Get(0)
if err != nil {
return mip, fmt.Errorf("finding migration in progress: %w", err)
}
m := rr.(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return mip, fmt.Errorf("parsing migration in progress: %w", err)
}
return mip, nil
}
16 changes: 7 additions & 9 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
package migration

const (
StartQueueName = "__datamigration_start_queue"
EstimateQueueName = "__datamigration_estimate_queue"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
MigrationsInProgressList = "__datamigrations_in_progress"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
StartQueueName = "__datamigration_start_queue"
EstimateQueueName = "__datamigration_estimate_queue"
StatusMapName = "__datamigration_migrations"
DebugLogsListPrefix = "__datamigration_debug_logs_"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
)

type Status string
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/estimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Estimation usually ends within 15 seconds.`, banner))
if !paths.Exists(conf) {
return fmt.Errorf("migration config does not exist: %s", conf)
}
mID := MakeMigrationID()
mID := makeMigrationID()
stages, err := NewEstimateStages(ec.Logger(), mID, conf)
if err != nil {
return err
Expand Down
13 changes: 4 additions & 9 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ var noDataStructuresFoundErr = errors.New("no datastructures found to migrate")
var progressMsg = "Migrating %s: %s"

func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) {
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := WaitForMigrationToBeInProgress(childCtx, ci, migrationID); err != nil {
return nil, fmt.Errorf("waiting migration to be created: %w", err)
}
var stages []stage.Stage[any]
dss, err := getDataStructuresToBeMigrated(ctx, ec, migrationID)
if err != nil {
Expand Down Expand Up @@ -82,7 +77,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
execErr = errors.New(errs)
break statusReaderLoop
case StatusCanceled, StatusCanceling:
execErr = clcerrors.ErrUserCancelled
execErr = errors.New("migration canceled by the user")
break statusReaderLoop
case StatusInProgress:
rt, cp, err := fetchOverallProgress(ctx, ci, migrationID)
Expand Down Expand Up @@ -147,7 +142,7 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID)
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return nil, err
Expand Down Expand Up @@ -220,7 +215,7 @@ func WaitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInt
}
return errors.New(errs)
}
if Status(status) == StatusInProgress {
if Status(status) == StatusInProgress || Status(status) == StatusComplete {
return nil
}
}
Expand Down Expand Up @@ -369,7 +364,7 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str
}
return row, nil
}
return nil, errors.New("no rows found")
return nil, errors.New("no result found")
}

func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) {
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
mID := MakeMigrationID()
mID := MigrationIDGeneratorFunc()
defer func() {
maybePrintWarnings(ctx, ec, ci, mID)
finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir))
Expand Down
51 changes: 46 additions & 5 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/log"
Expand All @@ -18,7 +19,6 @@ type StartStages struct {
configDir string
ci *hazelcast.ClientInternal
startQueue *hazelcast.Queue
statusMap *hazelcast.Map
logger log.Logger
}

Expand Down Expand Up @@ -47,6 +47,12 @@ func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.S
FailureMsg: "Could not start the migration",
Func: st.startStage(),
},
{
ProgressMsg: "Doing pre-checks",
SuccessMsg: "Pre-checks complete",
FailureMsg: "Could not complete pre-checks",
Func: st.preCheckStage(),
},
}
}

Expand All @@ -61,10 +67,6 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s
if err != nil {
return nil, fmt.Errorf("retrieving the start Queue: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, fmt.Errorf("retrieving the status Map: %w", err)
}
return nil, nil
}
}
Expand All @@ -78,6 +80,45 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) (
if err = st.startQueue.Put(ctx, cb); err != nil {
return nil, fmt.Errorf("updating start Queue: %w", err)
}
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err = waitForMigrationToStart(childCtx, st.ci, st.migrationID); err != nil {
return nil, err
}
return nil, nil
}
}

func (st *StartStages) preCheckStage() func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := WaitForMigrationToBeInProgress(childCtx, st.ci, st.migrationID); err != nil {
return nil, fmt.Errorf("waiting for prechecks to complete: %w", err)
}
return nil, nil
}
}

func waitForMigrationToStart(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error {
for {
status, err := fetchMigrationStatus(ctx, ci, migrationID)
if err != nil {
if errors.Is(err, migrationStatusNotFoundErr) {
// migration status will not be available for a while, so we should wait for it
continue
}
return err
}
if Status(status) == StatusFailed {
errs, err := fetchMigrationErrors(ctx, ci, migrationID)
if err != nil {
return fmt.Errorf("migration failed and dmt cannot fetch migration errors: %w", err)
}
return errors.New(errs)
}
if Status(status) == StatusStarted || Status(status) == StatusInProgress || Status(status) == StatusComplete {
return nil
}
}
}
14 changes: 10 additions & 4 deletions base/commands/migration/start_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,17 @@ func TestMigrationStages(t *testing.T) {
{
name: "successful",
statusMapStateFiles: []string{
"testdata/start/migration_success_initial.json",
"testdata/start/migration_success_completed.json",
"testdata/stages/migration_started.json",
"testdata/stages/migration_in_progress.json",
"testdata/stages/migration_completed.json",
},
},
{
name: "failure",
statusMapStateFiles: []string{
"testdata/start/migration_success_initial.json",
"testdata/start/migration_success_failure.json",
"testdata/stages/migration_started.json",
"testdata/stages/migration_in_progress.json",
"testdata/stages/migration_failed.json",
},
expectedErr: errors.New("Failed migrating IMAP: imap5: * some error\n* another error"),
},
Expand Down Expand Up @@ -110,6 +112,10 @@ func createMapping(ctx context.Context, tcx it.TestContext) {
MustValue(tcx.Client.SQL().Execute(ctx, mSQL))
}

func migrationIDFunc() string {
return "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7"
}

func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) {
q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName))
var b migration.ConfigBundle
Expand Down
Loading

0 comments on commit beb23e4

Please sign in to comment.