Skip to content

Commit

Permalink
remove status map and data migrations in progress list usages
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Nov 2, 2023
1 parent cd3e201 commit daf2529
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 115 deletions.
36 changes: 22 additions & 14 deletions base/commands/migration/cancel_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ package migration_test

import (
"context"
"encoding/json"
"os"
"sync"
"testing"

_ "github.com/hazelcast/hazelcast-commandline-client/base"
_ "github.com/hazelcast/hazelcast-commandline-client/base/commands"
"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
hz "github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/stretchr/testify/require"
)
Expand All @@ -34,30 +35,37 @@ func noMigrationsCancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
err := tcx.CLC().Execute(ctx, "cancel")
require.Contains(t, err.Error(), "there are no migrations in progress")
require.Contains(t, err.Error(), "finding migration in progress: no rows found")
})
}

func cancelTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
mID := migration.MakeMigrationID()
mID := "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7"
ci := hz.NewClientInternal(tcx.Client)
createMapping(ctx, tcx)
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, mID, serialization.JSON(b)))
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: mID,
}))
ok := MustValue(l.Add(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
tcx.WithReset(func() {
setStatusInProgress(tcx, ctx)
var wg sync.WaitGroup
wg.Add(1)
go tcx.WithReset(func() {
defer wg.Done()
Must(tcx.CLC().Execute(ctx, "cancel"))
})
MustValue(l.Remove(ctx, serialization.JSON(m)))
it.Eventually(t, func() bool {
return migration.WaitForMigrationToBeInProgress(ctx, ci, mID) == nil
})
setStatusCancelling(mID, tcx, ctx)
wg.Wait()
tcx.AssertStdoutContains(`Migration canceled successfully.`)
})
}

func setStatusCancelling(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/cancel/migration_cancelling.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}
25 changes: 5 additions & 20 deletions base/commands/migration/cancel_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package migration
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
Expand All @@ -15,10 +14,9 @@ import (
)

type CancelStages struct {
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationsInProgressList *hazelcast.List
migrationID string
ci *hazelcast.ClientInternal
cancelQueue *hazelcast.Queue
migrationID string
}

func NewCancelStages() *CancelStages {
Expand Down Expand Up @@ -49,24 +47,11 @@ func (st *CancelStages) connectStage(ec plug.ExecContext) func(context.Context,
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
err = json.Unmarshal(m, &mip)
if err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.migrationID = mip.MigrationID
st.migrationID = m.MigrationID
st.cancelQueue, err = st.ci.Client().GetQueue(ctx, CancelQueue)
return nil, err
}
Expand Down
14 changes: 6 additions & 8 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,12 @@
package migration

const (
StartQueueName = "__datamigration_start_queue"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
MigrationsInProgressList = "__datamigrations_in_progress"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
StartQueueName = "__datamigration_start_queue"
StatusMapName = "__datamigration_migrations"
DebugLogsListPrefix = "__datamigration_debug_logs_"
CancelQueue = "__datamigration_cancel_queue"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
)

type Status string
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID)
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
mID := MakeMigrationID()
mID := MigrationIDGeneratorFunc()
defer func() {
finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir))
if err == nil {
Expand Down
5 changes: 0 additions & 5 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ type StartStages struct {
configDir string
ci *hazelcast.ClientInternal
startQueue *hazelcast.Queue
statusMap *hazelcast.Map
logger log.Logger
}

Expand Down Expand Up @@ -63,10 +62,6 @@ func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, s
if err != nil {
return nil, fmt.Errorf("retrieving the start Queue: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, fmt.Errorf("retrieving the status Map: %w", err)
}
return nil, nil
}
}
Expand Down
20 changes: 4 additions & 16 deletions base/commands/migration/start_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ package migration_test

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"sync"
"testing"
"time"

hz "github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
Expand Down Expand Up @@ -57,6 +55,8 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
migration.MigrationIDGeneratorFunc = migrationIDFunc
mID := migrationIDFunc()
ci := hz.NewClientInternal(tcx.Client)
createMapping(ctx, tcx)
createMemberLogs(t, ctx, ci)
Expand All @@ -69,9 +69,6 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s
defer wg.Done()
execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir)
})
c := make(chan string)
go findMigrationID(ctx, tcx, c)
mID := <-c
wg.Add(1)
go migrationRunner(t, ctx, tcx, mID, &wg, statusMapStateFiles)
wg.Wait()
Expand Down Expand Up @@ -110,15 +107,6 @@ func createMapping(ctx context.Context, tcx it.TestContext) {
MustValue(tcx.Client.SQL().Execute(ctx, mSQL))
}

func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) {
q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName))
var b migration.ConfigBundle
for {
v := MustValue(q.PollWithTimeout(ctx, time.Second))
if v != nil {
Must(json.Unmarshal(v.(serialization.JSON), &b))
c <- b.MigrationID
break
}
}
func migrationIDFunc() string {
return "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7"
}
40 changes: 18 additions & 22 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
)

type StatusStages struct {
ci *hazelcast.ClientInternal
migrationsInProgressList *hazelcast.List
statusMap *hazelcast.Map
ci *hazelcast.ClientInternal
}

func NewStatusStages() *StatusStages {
Expand All @@ -35,7 +33,7 @@ func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.
}

type MigrationInProgress struct {
MigrationID string `json:"migrationId"`
MigrationID string `json:"id"`
}

func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
Expand All @@ -45,26 +43,24 @@ func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context,
if err != nil {
return nil, err
}
st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList)
m, err := findMigrationInProgress(ctx, st.ci)
if err != nil {
return nil, err
}
all, err := st.migrationsInProgressList.GetAll(ctx)
if err != nil {
return nil, err
}
if len(all) == 0 {
return nil, fmt.Errorf("there are no migrations in progress")
}
var mip MigrationInProgress
m := all[0].(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return nil, fmt.Errorf("parsing migration in progress: %w", err)
}
st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, err
}
return mip.MigrationID, err
return m.MigrationID, err
}
}

func findMigrationInProgress(ctx context.Context, ci *hazelcast.ClientInternal) (MigrationInProgress, error) {
var mip MigrationInProgress
q := fmt.Sprintf("SELECT this FROM %s WHERE JSON_VALUE(this, '$.status') IN('STARTED', 'IN_PROGRESS', 'CANCELLING')", StatusMapName)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return mip, fmt.Errorf("finding migration in progress: %w", err)
}
m := r.(serialization.JSON)
if err = json.Unmarshal(m, &mip); err != nil {
return mip, fmt.Errorf("parsing migration in progress: %w", err)
}
return mip, nil
}
37 changes: 14 additions & 23 deletions base/commands/migration/status_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package migration_test

import (
"context"
"encoding/json"
"fmt"
"os"
"sync"
Expand Down Expand Up @@ -38,6 +37,7 @@ func noMigrationsStatusTest(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
var wg sync.WaitGroup
wg.Add(1)
var execErr error
Expand All @@ -46,7 +46,7 @@ func noMigrationsStatusTest(t *testing.T) {
execErr = tcx.CLC().Execute(ctx, "status")
})
wg.Wait()
require.Contains(t, execErr.Error(), "there are no migrations in progress")
require.Contains(t, execErr.Error(), "finding migration in progress: no rows found")
})
}

Expand All @@ -55,11 +55,11 @@ func statusTest(t *testing.T) {
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
ci := hz.NewClientInternal(tcx.Client)
progressList := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
mID := preStatusRunner(t, tcx, ctx, ci, progressList)
defer postStatusRunner(ctx, mID, progressList)
createMapping(ctx, tcx)
createMemberLogs(t, ctx, ci)
defer removeMembersLogs(ctx, ci)
outDir := MustValue(os.MkdirTemp("", "clc-"))
mID := setStatusInProgress(tcx, ctx)
var wg sync.WaitGroup
wg.Add(1)
go tcx.WithReset(func() {
Expand All @@ -69,7 +69,7 @@ func statusTest(t *testing.T) {
it.Eventually(t, func() bool {
return migration.WaitForMigrationToBeInProgress(ctx, ci, mID) == nil
})
statusRunner(mID, tcx, ctx)
setStatusCompleted(mID, tcx, ctx)
wg.Wait()
tcx.AssertStdoutContains("Connected to the migration cluster")
tcx.WithReset(func() {
Expand All @@ -86,18 +86,20 @@ func statusTest(t *testing.T) {
})
}

func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context, ci *hz.ClientInternal, progressList *hz.List) string {
createMapping(ctx, tcx)
createMemberLogs(t, ctx, ci)
mID := migration.MakeMigrationID()
m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: mID}))
require.Equal(t, true, MustValue(progressList.Add(ctx, serialization.JSON(m))))
func setStatusInProgress(tcx it.TestContext, ctx context.Context) string {
mID := "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7"
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json"))
Must(statusMap.Set(ctx, mID, serialization.JSON(b)))
return mID
}

func setStatusCompleted(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}

func createMemberLogs(t *testing.T, ctx context.Context, ci *hz.ClientInternal) {
for _, m := range ci.OrderedMembers() {
l := MustValue(ci.Client().GetList(ctx, migration.DebugLogsListPrefix+m.UUID.String()))
Expand All @@ -113,14 +115,3 @@ func removeMembersLogs(ctx context.Context, ci *hz.ClientInternal) {
Must(l.Destroy(ctx))
}
}

func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) {
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json"))
Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}

func postStatusRunner(ctx context.Context, migrationID string, progressList *hz.List) {
m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: migrationID}))
MustValue(progressList.Remove(ctx, serialization.JSON(m)))
}
8 changes: 3 additions & 5 deletions base/commands/migration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,8 @@ func readPathAsString(path string) (string, error) {
return string(b), nil
}

func MakeMigrationID() string {
return types.NewUUID().String()
}
var MigrationIDGeneratorFunc = makeMigrationID

func MakeUpdateTopicName(migrationID string) string {
return UpdateTopicPrefix + migrationID
func makeMigrationID() string {
return types.NewUUID().String()
}

0 comments on commit daf2529

Please sign in to comment.