Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLC-426]: Show Overall Remaining Time and Completion Percentage #423

Merged
merged 10 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 124 additions & 38 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/sql"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors"
Expand All @@ -26,6 +28,12 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w

var migrationStatusNotFoundErr = fmt.Errorf("migration status not found")

var migrationReportNotFoundErr = "migration report cannot be found: %w"

var noDataStructuresFoundErr = errors.New("no datastructures found to migrate")

var progressMsg = "Migrating %s: %s"

func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) {
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand All @@ -40,7 +48,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
for i, d := range dss {
i := i
stages = append(stages, stage.Stage[any]{
ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name),
ProgressMsg: fmt.Sprintf(progressMsg, d.Type, d.Name),
SuccessMsg: fmt.Sprintf("Migrated %s: %s", d.Type, d.Name),
FailureMsg: fmt.Sprintf("Failed migrating %s: %s", d.Type, d.Name),
Func: func(ct context.Context, status stage.Statuser[any]) (any, error) {
Expand Down Expand Up @@ -76,6 +84,16 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
case StatusCanceled, StatusCanceling:
execErr = clcerrors.ErrUserCancelled
break statusReaderLoop
case StatusInProgress:
rt, cp, err := fetchOverallProgress(ctx, ci, migrationID)
if err != nil {
ec.Logger().Error(err)
status.SetText("Unable to calculate remaining duration and progress")
} else {
status.SetText(fmt.Sprintf(progressMsg, d.Type, d.Name))
status.SetProgress(cp)
status.SetRemainingDuration(rt)
}
}
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
Expand Down Expand Up @@ -104,7 +122,6 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
execErr = err
break statusReaderLoop
}
status.SetProgress(m.CompletionPercentage)
switch m.Status {
case StatusComplete:
return nil, nil
Expand Down Expand Up @@ -135,12 +152,19 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
rr, err := r.Get(0)
if err != nil {
return nil, err
}
if rr == nil {
return nil, noDataStructuresFoundErr
}
var status OverallMigrationStatus
if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil {
if err = json.Unmarshal(rr.(serialization.JSON), &status); err != nil {
return nil, err
}
if len(status.Migrations) == 0 {
return nil, errors.New("no datastructures found to migrate")
return nil, noDataStructuresFoundErr
}
for _, m := range status.Migrations {
dss = append(dss, DataStructureInfo{
Expand Down Expand Up @@ -208,6 +232,7 @@ type OverallMigrationStatus struct {
Errors []string `json:"errors"`
Report string `json:"report"`
CompletionPercentage float32 `json:"completionPercentage"`
RemainingTime float32 `json:"remainingTime"`
Migrations []DataStructureMigrationStatus `json:"migrations"`
}

Expand All @@ -230,28 +255,84 @@ func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, mig
if err != nil {
return "", migrationStatusNotFoundErr
}
return strings.TrimSuffix(strings.TrimPrefix(string(r.(serialization.JSON)), `"`), `"`), nil
rr, err := r.Get(0)
if err != nil {
return "", migrationStatusNotFoundErr
}
if rr == nil {
return "", migrationStatusNotFoundErr
}
return strings.TrimSuffix(strings.TrimPrefix(string(rr.(serialization.JSON)), `"`), `"`), nil
}

func fetchOverallProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (time.Duration, float32, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.remainingTime'), JSON_QUERY(this, '$.completionPercentage') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return 0, 0, err
}
if r == nil {
return 0, 0, errors.New("overall progress not found")
}
remainingTime, err := r.Get(0)
if err != nil {
return 0, 0, err
}
completionPercentage, err := r.Get(1)
if err != nil {
return 0, 0, err
}
if completionPercentage == nil {
return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName)
}
if remainingTime == nil {
return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName)
}
rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64)
if err != nil {
return 0, 0, err
}
cpStr := completionPercentage.(serialization.JSON).String()
cp, err := strconv.ParseFloat(cpStr, 32)
if err != nil {
return 0, 0, err
}
return time.Duration(rt) * time.Millisecond, float32(cp), nil
}

func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return "", fmt.Errorf("migration report cannot be found: %w", err)
return "", fmt.Errorf(migrationReportNotFoundErr, err)
}
if r == nil {
return "", errors.New("migration report not found")
}
rr, err := r.Get(0)
if err != nil {
return "", fmt.Errorf(migrationReportNotFoundErr, err)
}
var t string
json.Unmarshal(r.(serialization.JSON), &t)
json.Unmarshal(rr.(serialization.JSON), &t)
return t, nil
}

func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
row, err := querySingleRow(ctx, ci, q)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return "", err
}
if r == nil {
return "", errors.New("could not fetch migration errors")
}
rr, err := r.Get(0)
if err != nil {
return "", err
}
var errs []string
err = json.Unmarshal(row.(serialization.JSON), &errs)
err = json.Unmarshal(rr.(serialization.JSON), &errs)
if err != nil {
return "", err
}
Expand All @@ -271,27 +352,7 @@ func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.C
return nil
}

func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.warnings' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
row, err := querySingleRow(ctx, ci, q)
if err != nil {
ec.Logger().Error(err)
return
}
var warnings []string
err = json.Unmarshal(row.(serialization.JSON), &warnings)
if err != nil {
ec.Logger().Error(err)
return
}
if len(warnings) <= 5 {
ec.PrintlnUnnecessary("* " + strings.Join(warnings, "\n* "))
} else {
ec.PrintlnUnnecessary(fmt.Sprintf("You have %d warnings that you can find in your migration report.", len(warnings)))
}
}

func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (any, error) {
func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (sql.Row, error) {
res, err := ci.Client().SQL().Execute(ctx, query)
if err != nil {
return nil, err
Expand All @@ -306,14 +367,39 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str
if err != nil {
return nil, err
}
r, err := row.Get(0)
if err != nil {
return "", err
}
if r == nil {
return nil, errors.New("no rows found")
}
return r, nil
return row, nil
}
return nil, errors.New("no rows found")
}

func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.warnings' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
ec.Logger().Error(err)
return
}
if r == nil {
ec.Logger().Info("could not find any warnings")
return
}
rr, err := r.Get(0)
if err != nil {
ec.Logger().Error(err)
return
}
if rr == nil {
return
}
var warnings []string
err = json.Unmarshal(rr.(serialization.JSON), &warnings)
if err != nil {
ec.Logger().Error(err)
return
}
if len(warnings) <= 5 {
ec.PrintlnUnnecessary("* " + strings.Join(warnings, "\n* "))
} else {
ec.PrintlnUnnecessary(fmt.Sprintf("You have %d warnings that you can find in your migration report.", len(warnings)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"finishTimestamp": "2023-01-01T00:01:00Z",
"type": "MIGRATION",
"completionPercentage": 12.123,
"remainingTime": 1,
"migrations": [
{
"name": "imap5",
Expand Down
Loading