Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLC-311, CLC-314, CLC-326] DMT Updates #398

Merged
merged 57 commits into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2f281d6
* add initial tests for DMT
Sep 6, 2023
f79befd
add errors and logs
Sep 6, 2023
8b68b51
[CLC-311]: Add status command to DMT
Sep 7, 2023
3f76b0f
refactor
Sep 7, 2023
98ea33f
add overall completion percentage to status output
Sep 7, 2023
ad4bc17
refactor
Sep 7, 2023
d0dfafa
fix PR comments
Sep 7, 2023
318fff5
fix dependency
Sep 7, 2023
b529b9a
fix PR comment: change error msg
Sep 7, 2023
d5591a0
fix PR comments
Sep 7, 2023
48a42ce
subscribe to topic if status is not terminal
Sep 7, 2023
999a3bf
write migration report to file
Sep 7, 2023
aed1d01
refactor remove listener
Sep 8, 2023
75654cd
add debug log files
Sep 8, 2023
dc45c7a
add progress
Sep 18, 2023
243f7b1
add flag for output dir of report file
Sep 18, 2023
db210e3
add flag for output dir of report file
Sep 18, 2023
e42e56c
save debug logs to clc log file
Sep 18, 2023
45c4933
fix tests
Sep 18, 2023
dfaddaf
fix PR comment
Sep 18, 2023
f874293
delete unnecessary struct
Sep 18, 2023
b5554a9
fix PR comments
Sep 18, 2023
e35e7b5
fix PR comments
Sep 19, 2023
b4fc17e
Merge branch 'dmt' into CLC-306
Sep 19, 2023
44260f5
merge
Sep 19, 2023
ae6c008
set status
Sep 19, 2023
476be4e
query with sql
Sep 21, 2023
f486385
fix status command
Sep 21, 2023
547cf14
fix status command
Sep 21, 2023
9c493b9
refactor
Sep 21, 2023
1b95eb8
add timeout
Sep 22, 2023
752972f
refactor start test code
Sep 22, 2023
2f224bc
refactor status test code
Sep 22, 2023
7b2d93e
remove member logs in test
Sep 22, 2023
6b7119b
comment log tests
Sep 22, 2023
eb122a1
yuce's fix for default log path in tests
Sep 22, 2023
01cd2f3
add sleep
Sep 22, 2023
7dbbcba
Merge branch 'main' into DMT-Updates
Sep 28, 2023
e0f9627
Merge branch 'dmt' into DMT-Updates
Sep 28, 2023
d6f2e4b
fix tests
Sep 28, 2023
df939b2
fix tests
Sep 28, 2023
cd61ae5
set text for individual migration
Sep 28, 2023
c2f15d2
add deleted build-dmt to Makefile again
Sep 28, 2023
9425bc7
fix wait logic
Oct 11, 2023
57e1df2
add migration id info to the logs
Oct 11, 2023
f57aa14
fix tests
Oct 11, 2023
677d455
fix tests
Oct 11, 2023
5ac0b8d
Merge remote-tracking branch 'upstream/dmt' into DMT-Updates
Oct 12, 2023
8cde4fd
fix PR comment
Oct 12, 2023
ccfb425
fix Serkan's comments
Oct 16, 2023
2d916cf
fix Serkan's comment
Oct 18, 2023
0c7028c
fix Yuce's comments 1
Oct 19, 2023
f50a1de
fix Yuce's comments 2
Oct 19, 2023
db6348f
try fixing tests
Oct 19, 2023
d8a8917
try fixing tests
Oct 19, 2023
4a9371d
fix comments
Oct 20, 2023
1037acb
fix comments
Oct 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 29 additions & 4 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,33 @@
//go:build std || migration

package migration

const (
startQueueName = "__datamigration_start_queue"
statusMapEntryName = "status"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
StartQueueName = "__datamigration_start_queue"
StatusMapEntryName = "status"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
MigrationsInProgressList = "__datamigrations_in_progress"
startQueueName = "__datamigration_start_queue"
statusMapEntryName = "status"
argDMTConfig = "dmtConfig"
argTitleDMTConfig = "DMT configuration"
)

type Status string

const (
StatusStarted Status = "STARTED"
StatusCanceling Status = "CANCELING"
StatusComplete Status = "COMPLETED"
StatusCanceled Status = "CANCELED"
StatusFailed Status = "FAILED"
StatusInProgress Status = "IN_PROGRESS"
)

const flagOutputDir = "output-dir"

const banner = `Hazelcast Data Migration Tool v5.3.0
(c) 2023 Hazelcast, Inc.
`
2 changes: 2 additions & 0 deletions base/commands/migration/dummy.go
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
package migration

// This file exists only for compilation
2 changes: 1 addition & 1 deletion base/commands/migration/migration.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//go:build migration
//go:build std || migration

package migration

Expand Down
302 changes: 302 additions & 0 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,302 @@
//go:build std || migration

package migration

import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strings"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
errors2 "github.com/hazelcast/hazelcast-commandline-client/errors"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout while reading status: "+
"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w",
context.DeadlineExceeded)

func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) {
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := waitForMigrationToBeCreated(childCtx, ci, migrationID); err != nil {
return nil, fmt.Errorf("waiting migration to be created: %w", err)
}
var stages []stage.Stage[any]
dss, err := dataStructuresToBeMigrated(ctx, ec, migrationID)
if err != nil {
return nil, err
}
for i, d := range dss {
i := i
stages = append(stages, stage.Stage[any]{
ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name),
SuccessMsg: fmt.Sprintf("Migrated %s: %s ...", d.Type, d.Name),
FailureMsg: fmt.Sprintf("Failed migrating %s: %s ...", d.Type, d.Name),
Func: func(ct context.Context, status stage.Statuser[any]) (any, error) {
for {
if ctx.Err() != nil {
if errors.Is(err, context.DeadlineExceeded) {
return nil, timeoutErr
}
return nil, fmt.Errorf("migration failed: %w", err)
}
generalStatus, err := fetchMigrationStatus(ctx, ci, migrationID)
if err != nil {
return nil, fmt.Errorf("reading migration status: %w", err)
}
switch Status(generalStatus) {
case StatusComplete:
return nil, nil
case StatusFailed:
errs, err := fetchMigrationErrors(ctx, ci, migrationID)
if err != nil {
return nil, fmt.Errorf("fetching migration errors: %w", err)
}
return nil, errors.New(errs)
case StatusCanceled, StatusCanceling:
return nil, errors2.ErrUserCancelled
}
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return nil, err
}
iter, err := res.Iterator()
if err != nil {
return nil, err
}
if iter.HasNext() {
row, err := iter.Next()
if err != nil {
return nil, err
}
rowStr, err := row.Get(0)
if err != nil {
return nil, err
}
var m DSMigrationStatus
if err = json.Unmarshal(rowStr.(serialization.JSON), &m); err != nil {
return nil, err
}
status.SetProgress(m.CompletionPercentage)
switch m.Status {
case StatusComplete:
return nil, nil
case StatusFailed:
return nil, stage.IgnoreError(errors.New(m.Error))
case StatusCanceled:
return nil, errors2.ErrUserCancelled
}
}
time.Sleep(1 * time.Second)
}
},
})
}
return stages, nil
}

func dataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, migrationID string) ([]DataStructureInfo, error) {
var dss []DataStructureInfo
ci, err := ec.ClientInternal(ctx)
if err != nil {
return nil, err
}
q := fmt.Sprintf(`SELECT this FROM %s WHERE __key= '%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return nil, err
}
it, err := res.Iterator()
if err != nil {
return nil, err
}
if it.HasNext() { // single iteration is enough that we are reading single result for a single migration
row, err := it.Next()
if err != nil {
return nil, err
}
r, err := row.Get(0)
if err != nil {
return nil, err
}
var status OverallMigrationStatus
if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil {
return nil, err
}
for _, m := range status.Migrations {
dss = append(dss, DataStructureInfo{
Name: m.Name,
Type: m.Type,
})
}
}
return dss, nil
}

func saveMemberLogs(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) error {
for _, m := range ci.OrderedMembers() {
l, err := ci.Client().GetList(ctx, DebugLogsListPrefix+m.UUID.String())
if err != nil {
return err
}
logs, err := l.GetAll(ctx)
if err != nil {
return err
}
for _, line := range logs {
ec.Logger().Info(fmt.Sprintf("[%s_%s] %s", migrationID, m.UUID.String(), line.(string)))
}
}
return nil
}

func saveReportToFile(ctx context.Context, ci *hazelcast.ClientInternal, migrationID, fileName string) error {
report, err := fetchMigrationReport(ctx, ci, migrationID)
if err != nil {
return err
}
f, err := os.Create(fmt.Sprintf(fileName))
if err != nil {
return err
}
defer f.Close()
return os.WriteFile(fileName, []byte(report), 0600)
}

func waitForMigrationToBeCreated(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) error {
for {
statusMap, err := ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return err
}
ok, err := statusMap.ContainsKey(ctx, migrationID)
if err != nil {
return err
}
if ok {
return nil
}
}
}

type OverallMigrationStatus struct {
Status Status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
CompletionPercentage float32 `json:"completionPercentage"`
Migrations []DSMigrationStatus `json:"migrations"`
}

type DataStructureInfo struct {
Name string
Type string
}

type DSMigrationStatus struct {
Name string `json:"name"`
Type string `json:"type"`
Status Status `json:"status"`
CompletionPercentage float32 `json:"completionPercentage"`
Error string `json:"error"`
}

func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.status') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return "", err
}
it, err := res.Iterator()
if err != nil {
return "", err
}
if it.HasNext() { // single iteration is enough that we are reading single result for a single migration
row, err := it.Next()
if err != nil {
return "", err
}
r, err := row.Get(0)
var m string
if err = json.Unmarshal(r.(serialization.JSON), &m); err != nil {
return "", err
}
return m, nil
}
return "", nil
}

func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return "", err
}
it, err := res.Iterator()
if err != nil {
return "", err
}
if it.HasNext() { // single iteration is enough that we are reading single result for a single migration
row, err := it.Next()
if err != nil {
return "", err
}
r, err := row.Get(0)
var m string
if err = json.Unmarshal(r.(serialization.JSON), &m); err != nil {
return "", err
}
return m, nil
}
return "", nil
}

func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return "", err
}
it, err := res.Iterator()
if err != nil {
return "", err
}
var errs []string
for it.HasNext() { // single iteration is enough that we are reading single result for a single migration
row, err := it.Next()
if err != nil {
return "", err
}
r, err := row.Get(0)
var m string
if err = json.Unmarshal(r.(serialization.JSON), &m); err != nil {
return "", err
}
errs = append(errs, m)
}
return strings.Join(errs, "\n"), nil
}

func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID, reportOutputDir string) error {
err := saveMemberLogs(ctx, ec, ci, migrationID)
if err != nil {
return err
}
var name string
if reportOutputDir == "" {
name = fmt.Sprintf("migration_report_%s.txt", migrationID)
}
err = saveReportToFile(ctx, ci, migrationID, name)
if err != nil {
return fmt.Errorf("saving report to file: %w", err)
}
return nil
}
Loading