diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index b2fbc090..3e7a155f 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -9,11 +9,13 @@ import ( "fmt" "os" "path/filepath" + "strconv" "strings" "time" "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" + "github.com/hazelcast/hazelcast-go-client/sql" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" @@ -26,6 +28,12 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w var migrationStatusNotFoundErr = fmt.Errorf("migration status not found") +var migrationReportNotFoundErr = "migration report cannot be found: %w" + +var noDataStructuresFoundErr = errors.New("no datastructures found to migrate") + +var progressMsg = "Migrating %s: %s" + func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) { childCtx, cancel := context.WithTimeout(ctx, 30*time.Second) defer cancel() @@ -40,7 +48,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca for i, d := range dss { i := i stages = append(stages, stage.Stage[any]{ - ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name), + ProgressMsg: fmt.Sprintf(progressMsg, d.Type, d.Name), SuccessMsg: fmt.Sprintf("Migrated %s: %s", d.Type, d.Name), FailureMsg: fmt.Sprintf("Failed migrating %s: %s", d.Type, d.Name), Func: func(ct context.Context, status stage.Statuser[any]) (any, error) { @@ -76,6 +84,16 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca case StatusCanceled, StatusCanceling: execErr = clcerrors.ErrUserCancelled break statusReaderLoop + case StatusInProgress: + rt, cp, err := fetchOverallProgress(ctx, ci, migrationID) + if err != nil { + ec.Logger().Error(err) + status.SetText("Unable to calculate remaining duration and progress") + } else { + status.SetText(fmt.Sprintf(progressMsg, d.Type, d.Name)) + status.SetProgress(cp) + status.SetRemainingDuration(rt) + } } q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID) res, err := ci.Client().SQL().Execute(ctx, q) @@ -104,7 +122,6 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca execErr = err break statusReaderLoop } - status.SetProgress(m.CompletionPercentage) switch m.Status { case StatusComplete: return nil, nil @@ -135,12 +152,19 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig if err != nil { return nil, err } + rr, err := r.Get(0) + if err != nil { + return nil, err + } + if rr == nil { + return nil, noDataStructuresFoundErr + } var status OverallMigrationStatus - if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil { + if err = json.Unmarshal(rr.(serialization.JSON), &status); err != nil { return nil, err } if len(status.Migrations) == 0 { - return nil, errors.New("no datastructures found to migrate") + return nil, noDataStructuresFoundErr } for _, m := range status.Migrations { dss = append(dss, DataStructureInfo{ @@ -208,6 +232,7 @@ type OverallMigrationStatus struct { Errors []string `json:"errors"` Report string `json:"report"` CompletionPercentage float32 `json:"completionPercentage"` + RemainingTime float32 `json:"remainingTime"` Migrations []DataStructureMigrationStatus `json:"migrations"` } @@ -230,28 +255,84 @@ func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, mig if err != nil { return "", migrationStatusNotFoundErr } - return strings.TrimSuffix(strings.TrimPrefix(string(r.(serialization.JSON)), `"`), `"`), nil + rr, err := r.Get(0) + if err != nil { + return "", migrationStatusNotFoundErr + } + if rr == nil { + return "", migrationStatusNotFoundErr + } + return strings.TrimSuffix(strings.TrimPrefix(string(rr.(serialization.JSON)), `"`), `"`), nil +} + +func fetchOverallProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (time.Duration, float32, error) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.remainingTime'), JSON_QUERY(this, '$.completionPercentage') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + return 0, 0, err + } + if r == nil { + return 0, 0, errors.New("overall progress not found") + } + remainingTime, err := r.Get(0) + if err != nil { + return 0, 0, err + } + completionPercentage, err := r.Get(1) + if err != nil { + return 0, 0, err + } + if completionPercentage == nil { + return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName) + } + if remainingTime == nil { + return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName) + } + rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64) + if err != nil { + return 0, 0, err + } + cpStr := completionPercentage.(serialization.JSON).String() + cp, err := strconv.ParseFloat(cpStr, 32) + if err != nil { + return 0, 0, err + } + return time.Duration(rt) * time.Millisecond, float32(cp), nil } func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) r, err := querySingleRow(ctx, ci, q) if err != nil { - return "", fmt.Errorf("migration report cannot be found: %w", err) + return "", fmt.Errorf(migrationReportNotFoundErr, err) + } + if r == nil { + return "", errors.New("migration report not found") + } + rr, err := r.Get(0) + if err != nil { + return "", fmt.Errorf(migrationReportNotFoundErr, err) } var t string - json.Unmarshal(r.(serialization.JSON), &t) + json.Unmarshal(rr.(serialization.JSON), &t) return t, nil } func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID) - row, err := querySingleRow(ctx, ci, q) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + return "", err + } + if r == nil { + return "", errors.New("could not fetch migration errors") + } + rr, err := r.Get(0) if err != nil { return "", err } var errs []string - err = json.Unmarshal(row.(serialization.JSON), &errs) + err = json.Unmarshal(rr.(serialization.JSON), &errs) if err != nil { return "", err } @@ -271,27 +352,7 @@ func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.C return nil } -func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) { - q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.warnings' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID) - row, err := querySingleRow(ctx, ci, q) - if err != nil { - ec.Logger().Error(err) - return - } - var warnings []string - err = json.Unmarshal(row.(serialization.JSON), &warnings) - if err != nil { - ec.Logger().Error(err) - return - } - if len(warnings) <= 5 { - ec.PrintlnUnnecessary("* " + strings.Join(warnings, "\n* ")) - } else { - ec.PrintlnUnnecessary(fmt.Sprintf("You have %d warnings that you can find in your migration report.", len(warnings))) - } -} - -func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (any, error) { +func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (sql.Row, error) { res, err := ci.Client().SQL().Execute(ctx, query) if err != nil { return nil, err @@ -306,14 +367,39 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str if err != nil { return nil, err } - r, err := row.Get(0) - if err != nil { - return "", err - } - if r == nil { - return nil, errors.New("no rows found") - } - return r, nil + return row, nil } return nil, errors.New("no rows found") } + +func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) { + q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.warnings' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + ec.Logger().Error(err) + return + } + if r == nil { + ec.Logger().Info("could not find any warnings") + return + } + rr, err := r.Get(0) + if err != nil { + ec.Logger().Error(err) + return + } + if rr == nil { + return + } + var warnings []string + err = json.Unmarshal(rr.(serialization.JSON), &warnings) + if err != nil { + ec.Logger().Error(err) + return + } + if len(warnings) <= 5 { + ec.PrintlnUnnecessary("* " + strings.Join(warnings, "\n* ")) + } else { + ec.PrintlnUnnecessary(fmt.Sprintf("You have %d warnings that you can find in your migration report.", len(warnings))) + } +} diff --git a/base/commands/migration/testdata/start/migration_success_initial.json b/base/commands/migration/testdata/start/migration_success_initial.json index 5f90ba74..6be6e6bb 100644 --- a/base/commands/migration/testdata/start/migration_success_initial.json +++ b/base/commands/migration/testdata/start/migration_success_initial.json @@ -5,6 +5,7 @@ "finishTimestamp": "2023-01-01T00:01:00Z", "type": "MIGRATION", "completionPercentage": 12.123, + "remainingTime": 1, "migrations": [ { "name": "imap5",