diff --git a/.github/workflows/test-all-386.yaml b/.github/workflows/test-all-386.yaml
index f839be0b3..c20513006 100644
--- a/.github/workflows/test-all-386.yaml
+++ b/.github/workflows/test-all-386.yaml
@@ -57,4 +57,4 @@ jobs:
 
       - name: "Run All Tests"
         run: |
-          GOARCH=386 make test TEST_FLAGS="-v -count 1 -timeout 30m"
+          GOARCH=386 make test-dmt TEST_FLAGS="-v -count 1 -timeout 30m"
diff --git a/.github/workflows/test-all.yaml b/.github/workflows/test-all.yaml
index 1ecb75df5..d66fd8428 100644
--- a/.github/workflows/test-all.yaml
+++ b/.github/workflows/test-all.yaml
@@ -70,4 +70,4 @@ jobs:
 
       - name: "Run All Tests"
         run: |
-          make test
+          make test-dmt
diff --git a/Makefile b/Makefile
index 28b53c01d..71aaa40ee 100644
--- a/Makefile
+++ b/Makefile
@@ -19,9 +19,15 @@ TARGZ ?= true
 
 build:
 	CGO_ENABLED=0 go build -tags base,std,hazelcastinternal,hazelcastinternaltest -ldflags "$(LDFLAGS)" -o build/$(BINARY_NAME) ./cmd/clc
 
+build-dmt:
+	CGO_ENABLED=0 go build -tags base,migration,config,home,version,hazelcastinternal,hazelcastinternaltest -ldflags "$(LDFLAGS)" -o build/$(BINARY_NAME) ./cmd/clc
+
 test:
 	go test -tags base,std,hazelcastinternal,hazelcastinternaltest -p 1 $(TEST_FLAGS) ./...
 
+test-dmt:
+	go test -tags base,std,migration,config,home,version,hazelcastinternal,hazelcastinternaltest -p 1 $(TEST_FLAGS) ./...
+
 test-cover:
 	go test -tags base,std,hazelcastinternal,hazelcastinternaltest -p 1 $(TEST_FLAGS) -coverprofile=coverage.out -coverpkg $(PACKAGES) -coverprofile=$(COVERAGE_OUT) ./...
diff --git a/base/commands/migration/const.go b/base/commands/migration/const.go
index 98f7abba6..5d9f684df 100644
--- a/base/commands/migration/const.go
+++ b/base/commands/migration/const.go
@@ -1,8 +1,33 @@
+//go:build std || migration
+
 package migration
 
 const (
-	startQueueName     = "__datamigration_start_queue"
-	statusMapEntryName = "status"
-	argDMTConfig       = "dmtConfig"
-	argTitleDMTConfig  = "DMT configuration"
+	StartQueueName           = "__datamigration_start_queue"
+	StatusMapEntryName       = "status"
+	StatusMapName            = "__datamigration_migrations"
+	UpdateTopicPrefix        = "__datamigration_updates_"
+	DebugLogsListPrefix      = "__datamigration_debug_logs_"
+	MigrationsInProgressList = "__datamigrations_in_progress"
+	startQueueName           = "__datamigration_start_queue"
+	statusMapEntryName       = "status"
+	argDMTConfig             = "dmtConfig"
+	argTitleDMTConfig        = "DMT configuration"
 )
+
+type Status string
+
+const (
+	StatusStarted    Status = "STARTED"
+	StatusCanceling  Status = "CANCELING"
+	StatusComplete   Status = "COMPLETED"
+	StatusCanceled   Status = "CANCELED"
+	StatusFailed     Status = "FAILED"
+	StatusInProgress Status = "IN_PROGRESS"
+)
+
+const flagOutputDir = "output-dir"
+
+const banner = `Hazelcast Data Migration Tool v5.3.0
+(c) 2023 Hazelcast, Inc.
+` diff --git a/base/commands/migration/dummy.go b/base/commands/migration/dummy.go index 753000bce..b17283ab8 100644 --- a/base/commands/migration/dummy.go +++ b/base/commands/migration/dummy.go @@ -1 +1,3 @@ package migration + +// This file exists only for compilation diff --git a/base/commands/migration/migration.go b/base/commands/migration/migration.go index ad87d21a5..27df52448 100644 --- a/base/commands/migration/migration.go +++ b/base/commands/migration/migration.go @@ -1,4 +1,4 @@ -//go:build migration +//go:build std || migration package migration diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go new file mode 100644 index 000000000..72f045977 --- /dev/null +++ b/base/commands/migration/migration_stages.go @@ -0,0 +1,328 @@ +//go:build std || migration + +package migration + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + errors2 "github.com/hazelcast/hazelcast-commandline-client/errors" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout while reading status: "+ + "please ensure that you are using Hazelcast's migration cluster distribution and your DMT configuration points to that cluster: %w", + context.DeadlineExceeded) + +var migrationStatusNotFoundErr = fmt.Errorf("migration status not found") + +func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) { + childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := waitForMigrationToBeInProgress(childCtx, ci, migrationID); err != nil { + return nil, fmt.Errorf("waiting migration to be created: %w", err) + } + var stages []stage.Stage[any] + dss, err := dataStructuresToBeMigrated(ctx, ec, migrationID) + if err != nil { + return nil, err + } + for i, d := range dss { + i := i + stages = append(stages, stage.Stage[any]{ + ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name), + SuccessMsg: fmt.Sprintf("Migrated %s: %s", d.Type, d.Name), + FailureMsg: fmt.Sprintf("Failed migrating %s: %s", d.Type, d.Name), + Func: func(ct context.Context, status stage.Statuser[any]) (any, error) { + var execErr error + StatusReaderLoop: + for { + if ctx.Err() != nil { + if errors.Is(err, context.DeadlineExceeded) { + execErr = timeoutErr + break StatusReaderLoop + } + execErr = fmt.Errorf("migration failed: %w", err) + break StatusReaderLoop + } + generalStatus, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + execErr = fmt.Errorf("reading migration status: %w", err) + break StatusReaderLoop + } + switch Status(generalStatus) { + case StatusComplete: + return nil, nil + case StatusFailed: + errs, err := fetchMigrationErrors(ctx, ci, migrationID) + if err != nil { + execErr = fmt.Errorf("fetching migration errors: %w", err) + break StatusReaderLoop + } + execErr = errors.New(errs) + break StatusReaderLoop + case StatusCanceled, StatusCanceling: + execErr = errors2.ErrUserCancelled + break StatusReaderLoop + } + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + execErr = err + break 
StatusReaderLoop + } + iter, err := res.Iterator() + if err != nil { + execErr = err + break StatusReaderLoop + } + if iter.HasNext() { + row, err := iter.Next() + if err != nil { + execErr = err + break StatusReaderLoop + } + rowStr, err := row.Get(0) + if err != nil { + execErr = err + break StatusReaderLoop + } + var m DSMigrationStatus + if err = json.Unmarshal(rowStr.(serialization.JSON), &m); err != nil { + execErr = err + break StatusReaderLoop + } + status.SetProgress(m.CompletionPercentage) + switch m.Status { + case StatusComplete: + return nil, nil + case StatusFailed: + return nil, stage.IgnoreError(errors.New(m.Error)) + case StatusCanceled: + execErr = errors2.ErrUserCancelled + break StatusReaderLoop + } + } + time.Sleep(1 * time.Second) + } + if execErr != nil { + status.SetText(execErr.Error()) + } + return nil, execErr + }, + }) + } + return stages, nil +} + +func dataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, migrationID string) ([]DataStructureInfo, error) { + var dss []DataStructureInfo + ci, err := ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return nil, err + } + it, err := res.Iterator() + if err != nil { + return nil, err + } + if it.HasNext() { // single iteration is enough that we are reading single result for a single migration + row, err := it.Next() + if err != nil { + return nil, err + } + r, err := row.Get(0) + if err != nil { + return nil, err + } + var status OverallMigrationStatus + if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil { + return nil, err + } + for _, m := range status.Migrations { + dss = append(dss, DataStructureInfo{ + Name: m.Name, + Type: m.Type, + }) + } + } else { + return nil, fmt.Errorf("no datastructures found to migrate") + } + return dss, nil +} + +func saveMemberLogs(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) error { + for _, m := range ci.OrderedMembers() { + l, err := ci.Client().GetList(ctx, DebugLogsListPrefix+m.UUID.String()) + if err != nil { + return err + } + logs, err := l.GetAll(ctx) + if err != nil { + return err + } + for _, line := range logs { + ec.Logger().Info(fmt.Sprintf("[%s_%s] %s", migrationID, m.UUID.String(), line.(string))) + } + } + return nil +} + +func saveReportToFile(ctx context.Context, ci *hazelcast.ClientInternal, migrationID, fileName string) error { + report, err := fetchMigrationReport(ctx, ci, migrationID) + if err != nil { + return err + } + if report == "" { + return nil + } + f, err := os.Create(fmt.Sprintf(fileName)) + if err != nil { + return err + } + defer f.Close() + return os.WriteFile(fileName, []byte(report), 0600) +} + +func waitForMigrationToBeInProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error { + for { + status, err := fetchMigrationStatus(ctx, ci, migrationID) + if err != nil { + if errors.Is(err, migrationStatusNotFoundErr) { + continue // migration status will not be available for a while, so we should wait for it + } + return err + } + if Status(status) == StatusInProgress { + return nil + } + } +} + +type OverallMigrationStatus struct { + Status Status `json:"status"` + Logs []string `json:"logs"` + Errors []string `json:"errors"` + Report string `json:"report"` + CompletionPercentage float32 `json:"completionPercentage"` + Migrations []DSMigrationStatus `json:"migrations"` +} + 
+type DataStructureInfo struct { + Name string + Type string +} + +type DSMigrationStatus struct { + Name string `json:"name"` + Type string `json:"type"` + Status Status `json:"status"` + CompletionPercentage float32 `json:"completionPercentage"` + Error string `json:"error"` +} + +func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.status') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return "", err + } + it, err := res.Iterator() + if err != nil { + return "", err + } + if it.HasNext() { // single iteration is enough that we are reading single result for a single migration + row, err := it.Next() + if err != nil { + return "", err + } + r, err := row.Get(0) + var m string + if err = json.Unmarshal(r.(serialization.JSON), &m); err != nil { + return "", err + } + return m, nil + } else { + return "", migrationStatusNotFoundErr + } + return "", nil +} + +func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return "", err + } + it, err := res.Iterator() + if err != nil { + return "", err + } + if it.HasNext() { // single iteration is enough that we are reading single result for a single migration + row, err := it.Next() + if err != nil { + return "", err + } + r, err := row.Get(0) + var m string + if err = json.Unmarshal(r.(serialization.JSON), &m); err != nil { + return "", err + } + return m, nil + } else { + return "", fmt.Errorf("migration report cannot be found") + } + return "", nil +} + +func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + res, err := ci.Client().SQL().Execute(ctx, q) + if err != nil { + return "", err + } + it, err := res.Iterator() + if err != nil { + return "", err + } + var errs []string + for it.HasNext() { + row, err := it.Next() + if err != nil { + return "", err + } + r, err := row.Get(0) + var m string + if err = json.Unmarshal(r.(serialization.JSON), &m); err != nil { + return "", err + } + errs = append(errs, m) + } + return strings.Join(errs, "\n"), nil +} + +func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID, reportOutputDir string) error { + err := saveMemberLogs(ctx, ec, ci, migrationID) + if err != nil { + return err + } + outFile := filepath.Join(reportOutputDir, fmt.Sprintf("migration_report_%s.txt", migrationID)) + err = saveReportToFile(ctx, ci, migrationID, outFile) + if err != nil { + return fmt.Errorf("saving report to file: %w", err) + } + return nil +} diff --git a/base/commands/migration/migration_start.go b/base/commands/migration/migration_start.go index f19125e8d..843681998 100644 --- a/base/commands/migration/migration_start.go +++ b/base/commands/migration/migration_start.go @@ -1,4 +1,4 @@ -//go:build migration +//go:build std || migration package migration @@ -9,8 +9,10 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" "github.com/hazelcast/hazelcast-commandline-client/internal/check" + 
"github.com/hazelcast/hazelcast-commandline-client/internal/output" "github.com/hazelcast/hazelcast-commandline-client/internal/plug" "github.com/hazelcast/hazelcast-commandline-client/internal/prompt" + "github.com/hazelcast/hazelcast-commandline-client/internal/serialization" ) type StartCmd struct{} @@ -24,10 +26,15 @@ func (StartCmd) Init(cc plug.InitContext) error { cc.SetCommandHelp(help, help) cc.AddBoolFlag(clc.FlagAutoYes, "", false, false, "start the migration without confirmation") cc.AddStringArg(argDMTConfig, argTitleDMTConfig) + cc.AddStringFlag(flagOutputDir, "o", "", false, "output directory for the migration report, if not given current directory is used") return nil } -func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) error { +func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { + ci, err := ec.ClientInternal(ctx) + if err != nil { + return err + } ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary(`Hazelcast Data Migration Tool v5.3.0 (c) 2023 Hazelcast, Inc. @@ -45,14 +52,39 @@ Selected data structures in the source cluster will be migrated to the target cl } } ec.PrintlnUnnecessary("") - sts := NewStages(makeMigrationID(), ec.GetStringArg(argDMTConfig)) + mID := MakeMigrationID() + defer func() { + finalizeErr := finalizeMigration(ctx, ec, ci, mID, ec.Props().GetString(flagOutputDir)) + if err == nil { + err = finalizeErr + } + }() + sts, err := NewStartStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig)) + if err != nil { + return err + } sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) - if _, err := stage.Execute(ctx, ec, any(nil), sp); err != nil { + if _, err = stage.Execute(ctx, ec, any(nil), sp); err != nil { + return err + } + mStages, err := createMigrationStages(ctx, ec, ci, mID) + if err != nil { + return err + } + mp := stage.NewFixedProvider(mStages...) + _, err = stage.Execute(ctx, ec, any(nil), mp) + if err != nil { return err } ec.PrintlnUnnecessary("") ec.PrintlnUnnecessary("OK Migration completed successfully.") - return nil + return ec.AddOutputRows(ctx, output.Row{ + output.Column{ + Name: "Migration ID", + Type: serialization.TypeString, + Value: mID, + }, + }) } func init() { diff --git a/base/commands/migration/migration_status.go b/base/commands/migration/migration_status.go new file mode 100644 index 000000000..c9484a0b2 --- /dev/null +++ b/base/commands/migration/migration_status.go @@ -0,0 +1,61 @@ +//go:build std || migration + +package migration + +import ( + "context" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" +) + +type StatusCmd struct{} + +func (s StatusCmd) Unwrappable() {} + +func (s StatusCmd) Init(cc plug.InitContext) error { + cc.SetCommandUsage("status") + cc.SetCommandGroup("migration") + help := "Get status of the data migration in progress" + cc.AddStringFlag(flagOutputDir, "o", "", false, "output directory for the migration report, if not given current directory is used") + cc.SetCommandHelp(help, help) + return nil +} + +func (s StatusCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) { + ci, err := ec.ClientInternal(ctx) + if err != nil { + return err + } + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary(banner) + sts := NewStatusStages() + sp := stage.NewFixedProvider(sts.Build(ctx, ec)...) 
+ mID, err := stage.Execute(ctx, ec, any(nil), sp) + if err != nil { + return err + } + defer func() { + finalizeErr := finalizeMigration(ctx, ec, ci, mID.(string), ec.Props().GetString(flagOutputDir)) + if err == nil { + err = finalizeErr + } + }() + mStages, err := createMigrationStages(ctx, ec, ci, mID.(string)) + if err != nil { + return err + } + mp := stage.NewFixedProvider(mStages...) + _, err = stage.Execute(ctx, ec, any(nil), mp) + if err != nil { + return err + } + ec.PrintlnUnnecessary("") + ec.PrintlnUnnecessary("OK") + return nil +} + +func init() { + check.Must(plug.Registry.RegisterCommand("status", &StatusCmd{})) +} diff --git a/base/commands/migration/stages.go b/base/commands/migration/stages.go deleted file mode 100644 index e5f196553..000000000 --- a/base/commands/migration/stages.go +++ /dev/null @@ -1,184 +0,0 @@ -package migration - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/hazelcast/hazelcast-go-client" - "github.com/hazelcast/hazelcast-go-client/serialization" - "golang.org/x/exp/slices" - - clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" - - "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" - "github.com/hazelcast/hazelcast-commandline-client/internal/plug" -) - -type Stages struct { - migrationID string - configDir string - ci *hazelcast.ClientInternal - startQueue *hazelcast.Queue - statusMap *hazelcast.Map -} - -func NewStages(migrationID, configDir string) *Stages { - if migrationID == "" { - panic("migrationID is required") - } - return &Stages{ - migrationID: migrationID, - configDir: configDir, - } -} - -func (st *Stages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] { - return []stage.Stage[any]{ - { - ProgressMsg: "Connecting to the migration cluster", - SuccessMsg: "Connected to the migration cluster", - FailureMsg: "Could not connect to the migration cluster", - Func: st.connectStage(ec), - }, - { - ProgressMsg: "Starting the migration", - SuccessMsg: "Started the migration", - FailureMsg: "Could not start the migration", - Func: st.startStage(), - }, - { - ProgressMsg: "Migrating the cluster", - SuccessMsg: "Migrated the cluster", - FailureMsg: "Could not migrate the cluster", - Func: st.migrateStage(), - }, - } -} - -func (st *Stages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { - return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - var err error - st.ci, err = ec.ClientInternal(ctx) - if err != nil { - return nil, err - } - st.startQueue, err = st.ci.Client().GetQueue(ctx, startQueueName) - if err != nil { - return nil, err - } - st.statusMap, err = st.ci.Client().GetMap(ctx, makeStatusMapName(st.migrationID)) - if err != nil { - return nil, err - } - return nil, nil - } -} - -func (st *Stages) startStage() func(context.Context, stage.Statuser[any]) (any, error) { - return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - if err := st.statusMap.Delete(ctx, statusMapEntryName); err != nil { - return nil, err - } - var cb configBundle - cb.MigrationID = st.migrationID - if err := cb.Walk(st.configDir); err != nil { - return nil, err - } - b, err := json.Marshal(cb) - if err != nil { - return nil, err - } - if err = st.startQueue.Put(ctx, serialization.JSON(b)); err != nil { - return nil, err - } - ctx, cancel := context.WithTimeout(ctx, 30*time.Second) - defer cancel() - if err = st.waitForStatus(ctx, time.Second, statusInProgress, statusComplete); err != nil { - 
return nil, err - } - return nil, nil - } -} - -func (st *Stages) migrateStage() func(context.Context, stage.Statuser[any]) (any, error) { - return func(ctx context.Context, status stage.Statuser[any]) (any, error) { - return st.waitForStatus(ctx, 5*time.Second, statusComplete), nil - } -} - -func (st *Stages) waitForStatus(ctx context.Context, waitInterval time.Duration, targetStatuses ...status) error { - timeoutErr := fmt.Errorf("migration could not be completed: reached timeout while reading status: "+ - "please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w", - context.DeadlineExceeded) - for { - if err := ctx.Err(); err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return timeoutErr - } - return fmt.Errorf("migration failed: %w", err) - } - s, err := st.readStatus(ctx) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - return timeoutErr - } - return fmt.Errorf("reading status: %w", err) - } - switch s { - case statusComplete: - return nil - case statusCanceled: - return clcerrors.ErrUserCancelled - case statusFailed: - return errors.New("migration failed") - } - if slices.Contains(targetStatuses, s) { - return nil - } - time.Sleep(waitInterval) - } -} - -func (st *Stages) readStatus(ctx context.Context) (status, error) { - v, err := st.statusMap.Get(ctx, statusMapEntryName) - if err != nil { - return statusNone, err - } - if v == nil { - return statusNone, nil - } - var b []byte - if vv, ok := v.(string); ok { - b = []byte(vv) - } else if vv, ok := v.(serialization.JSON); ok { - b = vv - } else { - return statusNone, fmt.Errorf("invalid status value") - } - var ms migrationStatus - if err := json.Unmarshal(b, &ms); err != nil { - return statusNone, fmt.Errorf("unmarshaling status: %w", err) - } - return ms.Status, nil -} - -func makeStatusMapName(migrationID string) string { - return "__datamigration_" + migrationID -} - -type status string - -const ( - statusNone status = "" - statusComplete status = "COMPLETED" - statusCanceled status = "CANCELED" - statusFailed status = "FAILED" - statusInProgress status = "IN_PROGRESS" -) - -type migrationStatus struct { - Status status `json:"status"` -} diff --git a/base/commands/migration/start_stages.go b/base/commands/migration/start_stages.go new file mode 100644 index 000000000..d2d1842ad --- /dev/null +++ b/base/commands/migration/start_stages.go @@ -0,0 +1,98 @@ +//go:build std || migration + +package migration + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/log" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type StartStages struct { + migrationID string + configDir string + ci *hazelcast.ClientInternal + startQueue *hazelcast.Queue + statusMap *hazelcast.Map + logger log.Logger +} + +func NewStartStages(logger log.Logger, migrationID, configDir string) (*StartStages, error) { + if migrationID == "" { + return nil, errors.New("migrationID is required") + } + return &StartStages{ + migrationID: migrationID, + configDir: configDir, + logger: logger, + }, nil +} + +func (st *StartStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] { + return []stage.Stage[any]{ + { + ProgressMsg: "Connecting to the migration cluster", + SuccessMsg: "Connected to the 
migration cluster", + FailureMsg: "Could not connect to the migration cluster", + Func: st.connectStage(ec), + }, + { + ProgressMsg: "Starting the migration", + SuccessMsg: fmt.Sprintf("Started the migration with ID: %s", st.migrationID), + FailureMsg: "Could not start the migration", + Func: st.startStage(), + }, + } +} + +func (st *StartStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + var err error + st.ci, err = ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + st.startQueue, err = st.ci.Client().GetQueue(ctx, StartQueueName) + if err != nil { + return nil, fmt.Errorf("retrieving the start Queue: %w", err) + } + st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) + if err != nil { + return nil, fmt.Errorf("retrieving the status Map: %w", err) + } + return nil, nil + } +} + +func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + cb, err := makeConfigBundle(st.configDir, st.migrationID) + if err != nil { + return nil, fmt.Errorf("making configuration bundle: %w", err) + } + if err = st.startQueue.Put(ctx, cb); err != nil { + return nil, fmt.Errorf("updating start Queue: %w", err) + } + return nil, nil + } +} + +func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) { + var cb ConfigBundle + cb.MigrationID = migrationID + if err := cb.Walk(configDir); err != nil { + return nil, err + } + b, err := json.Marshal(cb) + if err != nil { + return nil, err + } + return b, nil +} diff --git a/base/commands/migration/start_stages_it_test.go b/base/commands/migration/start_stages_it_test.go new file mode 100644 index 000000000..568172ecb --- /dev/null +++ b/base/commands/migration/start_stages_it_test.go @@ -0,0 +1,123 @@ +//go:build std || migration + +package migration_test + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "os" + "sync" + "testing" + "time" + + _ "github.com/hazelcast/hazelcast-commandline-client/base" + _ "github.com/hazelcast/hazelcast-commandline-client/base/commands" + "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" + "github.com/hazelcast/hazelcast-commandline-client/clc/paths" + . 
"github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/it" + hz "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" + "github.com/stretchr/testify/require" +) + +func TestMigrationStages(t *testing.T) { + testCases := []struct { + name string + statusMapStateFiles []string + expectedErr error + }{ + { + name: "successful", + statusMapStateFiles: []string{ + "testdata/start/migration_success_initial.json", + "testdata/start/migration_success_completed.json", + }, + }, + { + name: "failure", + statusMapStateFiles: []string{ + "testdata/start/migration_success_initial.json", + "testdata/start/migration_success_failure.json", + }, + expectedErr: errors.New("Failed migrating IMAP: imap5: some error"), + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + startMigrationTest(t, tc.expectedErr, tc.statusMapStateFiles) + }) + } +} + +func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []string) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + ci := hz.NewClientInternal(tcx.Client) + createMapping(ctx, tcx) + createMemberLogs(t, ctx, ci) + defer removeMembersLogs(ctx, ci) + outDir := MustValue(os.MkdirTemp("", "clc-")) + var wg sync.WaitGroup + wg.Add(1) + var execErr error + go tcx.WithReset(func() { + defer wg.Done() + execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir) + }) + c := make(chan string) + go findMigrationID(ctx, tcx, c) + mID := <-c + wg.Add(1) + go migrationRunner(t, ctx, tcx, mID, &wg, statusMapStateFiles) + wg.Wait() + if expectedErr == nil { + require.Equal(t, nil, execErr) + } else { + require.Contains(t, execErr.Error(), expectedErr.Error()) + } + tcx.WithReset(func() { + f := paths.Join(outDir, fmt.Sprintf("migration_report_%s.txt", mID)) + require.Equal(t, true, paths.Exists(f)) + Must(os.Remove(f)) + b := MustValue(os.ReadFile(paths.ResolveLogPath("test"))) + for _, m := range ci.OrderedMembers() { + require.Contains(t, string(b), fmt.Sprintf("[%s_%s] log1", mID, m.UUID.String())) + require.Contains(t, string(b), fmt.Sprintf("[%s_%s] log2", mID, m.UUID.String())) + require.Contains(t, string(b), fmt.Sprintf("[%s_%s] log3", mID, m.UUID.String())) + } + }) + }) +} + +func migrationRunner(t *testing.T, ctx context.Context, tcx it.TestContext, migrationID string, wg *sync.WaitGroup, statusMapStateFiles []string) { + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + for _, f := range statusMapStateFiles { + b := MustValue(os.ReadFile(f)) + it.Eventually(t, func() bool { + return statusMap.Set(ctx, migrationID, serialization.JSON(b)) == nil + }) + } + wg.Done() +} + +func createMapping(ctx context.Context, tcx it.TestContext) { + mSQL := fmt.Sprintf(`CREATE MAPPING IF NOT EXISTS %s TYPE IMap OPTIONS('keyFormat'='varchar', 'valueFormat'='json')`, migration.StatusMapName) + MustValue(tcx.Client.SQL().Execute(ctx, mSQL)) +} + +func findMigrationID(ctx context.Context, tcx it.TestContext, c chan string) { + q := MustValue(tcx.Client.GetQueue(ctx, migration.StartQueueName)) + var b migration.ConfigBundle + for { + v := MustValue(q.PollWithTimeout(ctx, time.Second)) + if v != nil { + Must(json.Unmarshal(v.(serialization.JSON), &b)) + c <- b.MigrationID + break + } + } +} diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go new file mode 100644 index 
000000000..fbd29ef0f --- /dev/null +++ b/base/commands/migration/status_stages.go @@ -0,0 +1,70 @@ +//go:build std || migration + +package migration + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" + "github.com/hazelcast/hazelcast-commandline-client/internal/plug" + "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" +) + +type StatusStages struct { + ci *hazelcast.ClientInternal + migrationsInProgressList *hazelcast.List + statusMap *hazelcast.Map +} + +func NewStatusStages() *StatusStages { + return &StatusStages{} +} + +func (st *StatusStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] { + return []stage.Stage[any]{ + { + ProgressMsg: "Connecting to the migration cluster", + SuccessMsg: "Connected to the migration cluster", + FailureMsg: "Could not connect to the migration cluster", + Func: st.connectStage(ec), + }, + } +} + +type MigrationInProgress struct { + MigrationID string `json:"migrationId"` +} + +func (st *StatusStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) { + return func(ctx context.Context, status stage.Statuser[any]) (any, error) { + var err error + st.ci, err = ec.ClientInternal(ctx) + if err != nil { + return nil, err + } + st.migrationsInProgressList, err = st.ci.Client().GetList(ctx, MigrationsInProgressList) + if err != nil { + return nil, err + } + all, err := st.migrationsInProgressList.GetAll(ctx) + if err != nil { + return nil, err + } + if len(all) == 0 { + return nil, fmt.Errorf("there are no migrations in progress") + } + var mip MigrationInProgress + m := all[0].(serialization.JSON) + if err = json.Unmarshal(m, &mip); err != nil { + return nil, fmt.Errorf("parsing migration in progress: %w", err) + } + st.statusMap, err = st.ci.Client().GetMap(ctx, StatusMapName) + if err != nil { + return nil, err + } + return mip.MigrationID, err + } +} diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go new file mode 100644 index 000000000..0a780c661 --- /dev/null +++ b/base/commands/migration/status_stages_it_test.go @@ -0,0 +1,126 @@ +//go:build std || migration + +package migration_test + +import ( + "context" + "encoding/json" + "fmt" + "os" + "sync" + "testing" + "time" + + _ "github.com/hazelcast/hazelcast-commandline-client/base" + _ "github.com/hazelcast/hazelcast-commandline-client/base/commands" + "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" + "github.com/hazelcast/hazelcast-commandline-client/clc/paths" + . 
"github.com/hazelcast/hazelcast-commandline-client/internal/check" + "github.com/hazelcast/hazelcast-commandline-client/internal/it" + hz "github.com/hazelcast/hazelcast-go-client" + "github.com/hazelcast/hazelcast-go-client/serialization" + "github.com/stretchr/testify/require" +) + +func TestStatus(t *testing.T) { + testCases := []struct { + name string + f func(t *testing.T) + }{ + {name: "status", f: statusTest}, + {name: "noMigrationsStatus", f: noMigrationsStatusTest}, + } + for _, tc := range testCases { + t.Run(tc.name, tc.f) + } +} + +func noMigrationsStatusTest(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + var wg sync.WaitGroup + wg.Add(1) + var execErr error + go tcx.WithReset(func() { + defer wg.Done() + execErr = tcx.CLC().Execute(ctx, "status") + }) + wg.Wait() + require.Contains(t, execErr.Error(), "there are no migrations in progress") + }) +} + +func statusTest(t *testing.T) { + tcx := it.TestContext{T: t} + ctx := context.Background() + tcx.Tester(func(tcx it.TestContext) { + ci := hz.NewClientInternal(tcx.Client) + progressList := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) + mID := preStatusRunner(t, tcx, ctx, ci, progressList) + defer postStatusRunner(ctx, mID, progressList) + defer removeMembersLogs(ctx, ci) + outDir := MustValue(os.MkdirTemp("", "clc-")) + var wg sync.WaitGroup + wg.Add(1) + go tcx.WithReset(func() { + defer wg.Done() + Must(tcx.CLC().Execute(ctx, "status", "-o", outDir)) + }) + // statusRunner removes __datamigrations_in_progress list, so we should give some time to command to read it first + time.Sleep(1 * time.Second) + statusRunner(mID, tcx, ctx) + wg.Wait() + tcx.AssertStdoutContains("Connected to the migration cluster") + tcx.WithReset(func() { + f := paths.Join(outDir, fmt.Sprintf("migration_report_%s.txt", mID)) + require.Equal(t, true, paths.Exists(f)) + Must(os.Remove(f)) + b := MustValue(os.ReadFile(paths.ResolveLogPath("test"))) + for _, m := range ci.OrderedMembers() { + require.Contains(t, string(b), fmt.Sprintf("[%s_%s] log1", mID, m.UUID.String())) + require.Contains(t, string(b), fmt.Sprintf("[%s_%s] log2", mID, m.UUID.String())) + require.Contains(t, string(b), fmt.Sprintf("[%s_%s] log3", mID, m.UUID.String())) + } + }) + }) +} + +func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context, ci *hz.ClientInternal, progressList *hz.List) string { + createMapping(ctx, tcx) + createMemberLogs(t, ctx, ci) + mID := migration.MakeMigrationID() + m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: mID})) + require.Equal(t, true, MustValue(progressList.Add(ctx, serialization.JSON(m)))) + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/start/migration_success_initial.json")) + Must(statusMap.Set(ctx, mID, serialization.JSON(b))) + return mID +} + +func createMemberLogs(t *testing.T, ctx context.Context, ci *hz.ClientInternal) { + for _, m := range ci.OrderedMembers() { + l := MustValue(ci.Client().GetList(ctx, migration.DebugLogsListPrefix+m.UUID.String())) + require.Equal(t, true, MustValue(l.Add(ctx, "log1"))) + require.Equal(t, true, MustValue(l.Add(ctx, "log2"))) + require.Equal(t, true, MustValue(l.Add(ctx, "log3"))) + } +} + +func removeMembersLogs(ctx context.Context, ci *hz.ClientInternal) { + for _, m := range ci.OrderedMembers() { + l := MustValue(ci.Client().GetList(ctx, migration.DebugLogsListPrefix+m.UUID.String())) + 
Must(l.Destroy(ctx)) + } +} + +func statusRunner(migrationID string, tcx it.TestContext, ctx context.Context) { + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName)) + b := MustValue(os.ReadFile("testdata/start/migration_success_completed.json")) + Must(statusMap.Set(ctx, migrationID, serialization.JSON(b))) +} + +func postStatusRunner(ctx context.Context, migrationID string, progressList *hz.List) { + m := MustValue(json.Marshal(migration.MigrationInProgress{MigrationID: migrationID})) + MustValue(progressList.Remove(ctx, serialization.JSON(m))) +} diff --git a/base/commands/migration/testdata/start/migration_success_completed.json b/base/commands/migration/testdata/start/migration_success_completed.json new file mode 100644 index 000000000..132eb7460 --- /dev/null +++ b/base/commands/migration/testdata/start/migration_success_completed.json @@ -0,0 +1,70 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "type": "MIGRATION", + "completionPercentage": 12.123, + "migrations": [ + { + "name": "imap5", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "entriesMigrated": 1000, + "totalEntries": 1000, + "completionPercentage": 100 + }, + { + "name": "rmap4", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap3", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap2", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap1", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap12", + "type": "IMAP", + "status": "COMPLETED", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + } + ], + "logs": ["some user friendly log message", "another user friendly log message"], + "errors": [], + "report" : "completed migration report" +} \ No newline at end of file diff --git a/base/commands/migration/testdata/start/migration_success_failure.json b/base/commands/migration/testdata/start/migration_success_failure.json new file mode 100644 index 000000000..e4ae91f55 --- /dev/null +++ b/base/commands/migration/testdata/start/migration_success_failure.json @@ -0,0 +1,71 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "FAILED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "type": "MIGRATION", + "completionPercentage": 12.123, + "migrations": [ + { + "name": "imap5", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "entriesMigrated": 1000, + "totalEntries": 1000, + "completionPercentage": 100 + }, + { + "name": "rmap4", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": 
"2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap3", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "rmap2", + "type": "REPLICATED_MAP", + "status": "COMPLETED", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap1", + "type": "IMAP", + "status": "COMPLETED", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100 + }, + { + "name": "imap12", + "type": "IMAP", + "status": "FAILED", + "entriesMigrated": 10000, + "totalEntries": 10000, + "completionPercentage": 100, + "error": "some error" + } + ], + "logs": ["some user friendly log message", "another user friendly log message"], + "errors": ["some error"], + "report": "failed migration report" +} \ No newline at end of file diff --git a/base/commands/migration/testdata/start/migration_success_initial.json b/base/commands/migration/testdata/start/migration_success_initial.json new file mode 100644 index 000000000..5f90ba748 --- /dev/null +++ b/base/commands/migration/testdata/start/migration_success_initial.json @@ -0,0 +1,70 @@ +{ + "id": "e6e928d3-63af-4e72-8c42-0bfcf0ab6cf7", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "type": "MIGRATION", + "completionPercentage": 12.123, + "migrations": [ + { + "name": "imap5", + "type": "IMAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "entriesMigrated": 121, + "totalEntries": 1000, + "completionPercentage": 12.1 + }, + { + "name": "rmap4", + "type": "REPLICATED_MAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12 + }, + { + "name": "rmap3", + "type": "REPLICATED_MAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12 + }, + { + "name": "rmap2", + "type": "REPLICATED_MAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-09-01T12:11:48+0200", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12 + }, + { + "name": "imap1", + "type": "IMAP", + "status": "IN_PROGRESS", + "startTimestamp": "2023-01-01T00:00:00Z", + "finishTimestamp": "2023-01-01T00:01:00Z", + "entriesMigrated": 1212, + "totalEntries": 10000, + "completionPercentage": 12.12, + "error": "some error" + }, + { + "name": "imap12", + "type": "IMAP", + "status": "NOT_STARTED", + "entriesMigrated": 0, + "totalEntries": 10000, + "completionPercentage": 0 + } + ], + "logs": ["some user friendly log message", "another user friendly log message"], + "errors": [] +} \ No newline at end of file diff --git a/base/commands/migration/utils.go b/base/commands/migration/utils.go index 61b364cc8..5af0aa0ed 100644 --- a/base/commands/migration/utils.go +++ b/base/commands/migration/utils.go @@ -1,3 +1,5 @@ +//go:build std || migration + 
 package migration
 
 import (
@@ -21,7 +23,7 @@ type bundleFile struct {
 	Content string `json:"content"`
 }
 
-type configBundle struct {
+type ConfigBundle struct {
 	MigrationID    string       `json:"migrationId"`
 	ConfigPath     string       `json:"configPath"`
 	Source         []bundleFile `json:"source"`
@@ -30,7 +32,7 @@ type configBundle struct {
 	ReplicatedMaps []string     `json:"replicatedMaps"`
 }
 
-func (cb *configBundle) Walk(root string) error {
+func (cb *ConfigBundle) Walk(root string) error {
 	var err error
 	cb.IMaps, err = readItems(filepath.Join(root, "data", "imap_names.txt"))
 	if err != nil {
@@ -119,6 +121,10 @@ func readPathAsString(path string) (string, error) {
 	return string(b), nil
 }
 
-func makeMigrationID() string {
+func MakeMigrationID() string {
 	return types.NewUUID().String()
 }
+
+func MakeUpdateTopicName(migrationID string) string {
+	return UpdateTopicPrefix + migrationID
+}
diff --git a/base/maps/maps.go b/base/maps/maps.go
index e85aeba63..7d065516a 100644
--- a/base/maps/maps.go
+++ b/base/maps/maps.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/hazelcast/hazelcast-commandline-client/base/commands/object"
 	"github.com/hazelcast/hazelcast-commandline-client/base/objects"
 	"github.com/hazelcast/hazelcast-commandline-client/clc"
 	"github.com/hazelcast/hazelcast-commandline-client/internal/output"
@@ -19,7 +18,7 @@ func Indexes(ctx context.Context, ec plug.ExecContext, mapName string) error {
 	if mapName != "" {
 		mapNames = append(mapNames, mapName)
 	} else {
-		maps, err := objects.GetAll(ctx, ec, object.Map, false)
+		maps, err := objects.GetAll(ctx, ec, "map", false)
 		if err != nil {
 			return err
 		}
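
Appendix (not part of the changeset): a minimal, self-contained Go sketch of how the status document kept in the __datamigration_migrations map decodes into the OverallMigrationStatus and DSMigrationStatus structs introduced in migration_stages.go. The struct fields mirror the diff above; the sample JSON is a trimmed-down version of testdata/start/migration_success_completed.json, and the variable names are illustrative only.

// statusdecode_sketch.go - standalone illustration, not CLC code.
package main

import (
	"encoding/json"
	"fmt"
)

type Status string

// DSMigrationStatus mirrors the per-data-structure entries under "migrations".
type DSMigrationStatus struct {
	Name                 string  `json:"name"`
	Type                 string  `json:"type"`
	Status               Status  `json:"status"`
	CompletionPercentage float32 `json:"completionPercentage"`
	Error                string  `json:"error"`
}

// OverallMigrationStatus mirrors the top-level status document.
type OverallMigrationStatus struct {
	Status               Status              `json:"status"`
	Logs                 []string            `json:"logs"`
	Errors               []string            `json:"errors"`
	Report               string              `json:"report"`
	CompletionPercentage float32             `json:"completionPercentage"`
	Migrations           []DSMigrationStatus `json:"migrations"`
}

func main() {
	// Trimmed sample of the JSON a migration cluster member is expected to write.
	sample := []byte(`{
		"status": "COMPLETED",
		"completionPercentage": 100,
		"report": "completed migration report",
		"migrations": [
			{"name": "imap5", "type": "IMAP", "status": "COMPLETED", "completionPercentage": 100},
			{"name": "rmap4", "type": "REPLICATED_MAP", "status": "COMPLETED", "completionPercentage": 100}
		]
	}`)
	var s OverallMigrationStatus
	if err := json.Unmarshal(sample, &s); err != nil {
		panic(err)
	}
	// The overall status drives the polling loop in createMigrationStages; the
	// per-data-structure entries drive the progress shown for each stage.
	fmt.Printf("overall: %s (%.1f%%)\n", s.Status, s.CompletionPercentage)
	for _, m := range s.Migrations {
		fmt.Printf("%s %s: %.1f%% (%s)\n", m.Type, m.Name, m.CompletionPercentage, m.Status)
	}
}

Running the sketch prints one line per data structure, matching the per-stage progress and success messages built in createMigrationStages.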