Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLC-427][CLC-428]: Remove Data Migrations in Progress List and Status Map Usages #424

Merged
merged 14 commits into from
Nov 7, 2023
35 changes: 21 additions & 14 deletions base/commands/migration/cancel_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ package migration_test

import (
"context"
"encoding/json"
"os"
"sync"
"testing"
"time"

_ "github.com/hazelcast/hazelcast-commandline-client/base"
_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
Expand Down Expand Up @@ -34,30 +35,36 @@ func noMigrationsCancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
err := tcx.CLC().Execute(ctx, "cancel")
require.Contains(t, err.Error(), "there are no migrations in progress")
require.Contains(t, err.Error(), "finding migration in progress: no rows found")
})
}

func cancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
mID := migration.MakeMigrationID()
mID := migrationIDFunc()
createMapping(ctx, tcx)
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, mID, serialization.JSON(b)))
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: mID,
}))
ok := MustValue(l.Add(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
tcx.WithReset(func() {
setStatusInProgress(tcx, ctx)
var wg sync.WaitGroup
wg.Add(1)
go tcx.WithReset(func() {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "cancel"))
})
MustValue(l.Remove(ctx, serialization.JSON(m)))
time.Sleep(1 * time.Second)
yuce marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may have a problem if these tests don't work without sleeping.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh, these tests are not enough anyway. I'll build the migration cluster build from the "remove in progress list" PR and try basic things.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(also I don't get why we have sleep in this test, how mocking of migration cluster is done is complex for me. )

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setStatusCancelling(mID, tcx, ctx)
wg.Wait()
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
MustValue(statusMap.Remove(ctx, mID))
tcx.AssertStdoutContains(`Migration canceled successfully.`)
})
}

func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}
25 changes: 5 additions & 20 deletions base/commands/migration/cancel_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package migration
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
Expand All @@ -15,10 +14,9 @@ import (
)

type CancelStages struct {
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationsInProgressList *hazelcast.List
migrationID string
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationID string
}

func NewCancelStages() *CancelStages {
Expand Down Expand Up @@ -49,24 +47,11 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context,
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
err = json.Unmarshal(m, &mip)
if err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.migrationID = mip.MigrationID
st.migrationID = m.MigrationID
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return nil, err
}
Expand Down
14 changes: 6 additions & 8 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
package migration

const (
StartQueueName = "__datamigration_start_queue"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
MigrationsInProgressList = "__datamigrations_in_progress"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
StartQueueName = "__datamigration_start_queue"
StatusMapName = "__datamigration_migrations"
DebugLogsListPrefix = "__datamigration_debug_logs_"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
)

type Status string
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID)
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
mID := MakeMigrationID()
mID := MigrationIDGeneratorFunc()
defer func() {
finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir))
if err == nil {
Expand Down
5 changes: 0 additions & 5 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type StartStages struct {
configDir string
ci *hazelcast.ClientInternal
startQueue *hazelcast.Queue
statusMap *hazelcast.Map
logger log.Logger
}

Expand Down Expand Up @@ -63,10 +62,6 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s
if err != nil {
return nil, fmt.Errorf("retrieving the start Queue: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, fmt.Errorf("retrieving the status Map: %w", err)
}
return nil, nil
}
}
Expand Down
20 changes: 4 additions & 16 deletions base/commands/migration/start_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ package migration_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

hz "github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
Expand Down Expand Up @@ -57,6 +55,8 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
migration.MigrationIDGeneratorFunc = migrationIDFunc
mID := migrationIDFunc()
ci := hz.NewClientInternal(tcx.Client)
createMapping(ctx, tcx)
createMemberLogs(t, ctx, ci)
Expand All @@ -69,9 +69,6 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s
defer wg.Done()
execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir)
})
c := make(chan string)
go findMigrationID(ctx, tcx, c)
mID := <-c
wg.Add(1)
go migrationRunner(t, ctx, tcx, mID, &wg, statusMapStateFiles)
wg.Wait()
Expand Down Expand Up @@ -110,15 +107,6 @@ func createMapping(ctx context.Context, tcx it.TestContext) {
MustValue(tcx.Client.SQL().Execute(ctx, mSQL))
}

func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) {
q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName))
var b migration.ConfigBundle
for {
v := MustValue(q.PollWithTimeout(ctx, time.Second))
if v != nil {
Must(json.Unmarshal(v.(serialization.JSON), &b))
c <- b.MigrationID
break
}
}
func migrationIDFunc() string {
return "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7"
}
40 changes: 18 additions & 22 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
)

type StatusStages struct {
ci *hazelcast.ClientInternal
migrationsInProgressList *hazelcast.List
statusMap *hazelcast.Map
ci *hazelcast.ClientInternal
}

func NewStatusStages() *StatusStages {
Expand All @@ -35,7 +33,7 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.
}

type MigrationInProgress struct {
MigrationID string `json:"migrationId"`
MigrationID string `json:"id"`
}

func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
Expand All @@ -45,26 +43,24 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context,
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, err
}
return mip.MigrationID, err
return m.MigrationID, err
}
}

func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) {
var mip MigrationInProgress
q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELING')", StatusMapName)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return mip, fmt.Errorf("finding migration in progress: %w", err)
}
m := r.(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return mip, fmt.Errorf("parsing migration in progress: %w", err)
}
return mip, nil
}
42 changes: 16 additions & 26 deletions base/commands/migration/status_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ package migration_test

import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
"testing"
"time"

_ "github.com/hazelcast/hazelcast-commandline-client/base"
_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
Expand Down Expand Up @@ -38,6 +38,7 @@ func noMigrationsStatusTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
var wg sync.WaitGroup
wg.Add(1)
var execErr error
Expand All @@ -46,7 +47,7 @@ func noMigrationsStatusTest(t *testing.T) {
execErr = tcx.CLC().Execute(ctx, "status")
})
wg.Wait()
require.Contains(t, execErr.Error(), "there are no migrations in progress")
require.Contains(t, execErr.Error(), "finding migration in progress: no rows found")
})
}

Expand All @@ -55,21 +56,19 @@ func statusTest(t *testing.T) {
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
ci := hz.NewClientInternal(tcx.Client)
progressList := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
mID := preStatusRunner(t, tcx, ctx, ci, progressList)
defer postStatusRunner(ctx, mID, progressList)
createMapping(ctx, tcx)
createMemberLogs(t, ctx, ci)
defer removeMembersLogs(ctx, ci)
outDir := MustValue(os.MkdirTemp("", "clc-"))
mID := setStatusInProgress(tcx, ctx)
var wg sync.WaitGroup
wg.Add(1)
go tcx.WithReset(func() {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "status", "-o", outDir))
})
it.Eventually(t, func() bool {
return migration.WaitForMigrationToBeInProgress(ctx, ci, mID) == nil
})
statusRunner(mID, tcx, ctx)
time.Sleep(1 * time.Second)
setStatusCompleted(mID, tcx, ctx)
wg.Wait()
tcx.AssertStdoutContains("Connected to the migration cluster")
tcx.WithReset(func() {
Expand All @@ -86,18 +85,20 @@ func statusTest(t *testing.T) {
})
}

func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context, ci *hz.ClientInternal, progressList *hz.List) string {
createMapping(ctx, tcx)
createMemberLogs(t, ctx, ci)
mID := migration.MakeMigrationID()
m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: mID}))
require.Equal(t, true, MustValue(progressList.Add(ctx, serialization.JSON(m))))
func setStatusInProgress(tcx it.TestContext, ctx context.Context) string {
mID := migrationIDFunc()
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json"))
Must(statusMap.Set(ctx, mID, serialization.JSON(b)))
return mID
}

func setStatusCompleted(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}

func createMemberLogs(t *testing.T, ctx context.Context, ci *hz.ClientInternal) {
for _, m := range ci.OrderedMembers() {
l := MustValue(ci.Client().GetList(ctx, migration.DebugLogsListPrefix+m.UUID.String()))
Expand All @@ -113,14 +114,3 @@ func removeMembersLogs(ctx context.Context, ci *hz.ClientInternal) {
Must(l.Destroy(ctx))
}
}

func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}

func postStatusRunner(ctx context.Context, migrationID string, progressList *hz.List) {
m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: migrationID}))
MustValue(progressList.Remove(ctx, serialization.JSON(m)))
}
8 changes: 3 additions & 5 deletions base/commands/migration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,8 @@ func readPathAsString(path string) (string, error) {
return string(b), nil
}

func MakeMigrationID() string {
return types.NewUUID().String()
}
var MigrationIDGeneratorFunc = makeMigrationID

func MakeUpdateTopicName(migrationID string) string {
return UpdateTopicPrefix + migrationID
func makeMigrationID() string {
return types.NewUUID().String()
}
Loading