Skip to content

Commit

Permalink
[CLC-426]: Show Overall Remaining Time and Completion Percentage (#423)
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin authored Nov 7, 2023
1 parent 0fd24c6 commit ead4607
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 38 deletions.
162 changes: 124 additions & 38 deletions base/commands/migration/migration_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/sql"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors"
Expand All @@ -26,6 +28,12 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w

var migrationStatusNotFoundErr = fmt.Errorf("migration status not found")

var migrationReportNotFoundErr = "migration report cannot be found: %w"

var noDataStructuresFoundErr = errors.New("no datastructures found to migrate")

var progressMsg = "Migrating %s: %s"

func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) {
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
Expand All @@ -40,7 +48,7 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
for i, d := range dss {
i := i
stages = append(stages, stage.Stage[any]{
ProgressMsg: fmt.Sprintf("Migrating %s: %s", d.Type, d.Name),
ProgressMsg: fmt.Sprintf(progressMsg, d.Type, d.Name),
SuccessMsg: fmt.Sprintf("Migrated %s: %s", d.Type, d.Name),
FailureMsg: fmt.Sprintf("Failed migrating %s: %s", d.Type, d.Name),
Func: func(ct context.Context, status stage.Statuser[any]) (any, error) {
Expand Down Expand Up @@ -76,6 +84,16 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
case StatusCanceled, StatusCanceling:
execErr = clcerrors.ErrUserCancelled
break statusReaderLoop
case StatusInProgress:
rt, cp, err := fetchOverallProgress(ctx, ci, migrationID)
if err != nil {
ec.Logger().Error(err)
status.SetText("Unable to calculate remaining duration and progress")
} else {
status.SetText(fmt.Sprintf(progressMsg, d.Type, d.Name))
status.SetProgress(cp)
status.SetRemainingDuration(rt)
}
}
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.migrations[%d]') FROM %s WHERE __key= '%s'`, i, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
Expand Down Expand Up @@ -104,7 +122,6 @@ func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelca
execErr = err
break statusReaderLoop
}
status.SetProgress(m.CompletionPercentage)
switch m.Status {
case StatusComplete:
return nil, nil
Expand Down Expand Up @@ -135,12 +152,19 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig
if err != nil {
return nil, err
}
rr, err := r.Get(0)
if err != nil {
return nil, err
}
if rr == nil {
return nil, noDataStructuresFoundErr
}
var status OverallMigrationStatus
if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil {
if err = json.Unmarshal(rr.(serialization.JSON), &status); err != nil {
return nil, err
}
if len(status.Migrations) == 0 {
return nil, errors.New("no datastructures found to migrate")
return nil, noDataStructuresFoundErr
}
for _, m := range status.Migrations {
dss = append(dss, DataStructureInfo{
Expand Down Expand Up @@ -208,6 +232,7 @@ type OverallMigrationStatus struct {
Errors []string `json:"errors"`
Report string `json:"report"`
CompletionPercentage float32 `json:"completionPercentage"`
RemainingTime float32 `json:"remainingTime"`
Migrations []DataStructureMigrationStatus `json:"migrations"`
}

Expand All @@ -230,28 +255,84 @@ func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, mig
if err != nil {
return "", migrationStatusNotFoundErr
}
return strings.TrimSuffix(strings.TrimPrefix(string(r.(serialization.JSON)), `"`), `"`), nil
rr, err := r.Get(0)
if err != nil {
return "", migrationStatusNotFoundErr
}
if rr == nil {
return "", migrationStatusNotFoundErr
}
return strings.TrimSuffix(strings.TrimPrefix(string(rr.(serialization.JSON)), `"`), `"`), nil
}

func fetchOverallProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (time.Duration, float32, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.remainingTime'), JSON_QUERY(this, '$.completionPercentage') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return 0, 0, err
}
if r == nil {
return 0, 0, errors.New("overall progress not found")
}
remainingTime, err := r.Get(0)
if err != nil {
return 0, 0, err
}
completionPercentage, err := r.Get(1)
if err != nil {
return 0, 0, err
}
if completionPercentage == nil {
return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName)
}
if remainingTime == nil {
return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName)
}
rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64)
if err != nil {
return 0, 0, err
}
cpStr := completionPercentage.(serialization.JSON).String()
cp, err := strconv.ParseFloat(cpStr, 32)
if err != nil {
return 0, 0, err
}
return time.Duration(rt) * time.Millisecond, float32(cp), nil
}

func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return "", fmt.Errorf("migration report cannot be found: %w", err)
return "", fmt.Errorf(migrationReportNotFoundErr, err)
}
if r == nil {
return "", errors.New("migration report not found")
}
rr, err := r.Get(0)
if err != nil {
return "", fmt.Errorf(migrationReportNotFoundErr, err)
}
var t string
json.Unmarshal(r.(serialization.JSON), &t)
json.Unmarshal(rr.(serialization.JSON), &t)
return t, nil
}

func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
row, err := querySingleRow(ctx, ci, q)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
return "", err
}
if r == nil {
return "", errors.New("could not fetch migration errors")
}
rr, err := r.Get(0)
if err != nil {
return "", err
}
var errs []string
err = json.Unmarshal(row.(serialization.JSON), &errs)
err = json.Unmarshal(rr.(serialization.JSON), &errs)
if err != nil {
return "", err
}
Expand All @@ -271,27 +352,7 @@ func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.C
return nil
}

func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.warnings' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
row, err := querySingleRow(ctx, ci, q)
if err != nil {
ec.Logger().Error(err)
return
}
var warnings []string
err = json.Unmarshal(row.(serialization.JSON), &warnings)
if err != nil {
ec.Logger().Error(err)
return
}
if len(warnings) <= 5 {
ec.PrintlnUnnecessary("* " + strings.Join(warnings, "\n* "))
} else {
ec.PrintlnUnnecessary(fmt.Sprintf("You have %d warnings that you can find in your migration report.", len(warnings)))
}
}

func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (any, error) {
func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (sql.Row, error) {
res, err := ci.Client().SQL().Execute(ctx, query)
if err != nil {
return nil, err
Expand All @@ -306,14 +367,39 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str
if err != nil {
return nil, err
}
r, err := row.Get(0)
if err != nil {
return "", err
}
if r == nil {
return nil, errors.New("no rows found")
}
return r, nil
return row, nil
}
return nil, errors.New("no rows found")
}

func maybePrintWarnings(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.warnings' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
r, err := querySingleRow(ctx, ci, q)
if err != nil {
ec.Logger().Error(err)
return
}
if r == nil {
ec.Logger().Info("could not find any warnings")
return
}
rr, err := r.Get(0)
if err != nil {
ec.Logger().Error(err)
return
}
if rr == nil {
return
}
var warnings []string
err = json.Unmarshal(rr.(serialization.JSON), &warnings)
if err != nil {
ec.Logger().Error(err)
return
}
if len(warnings) <= 5 {
ec.PrintlnUnnecessary("* " + strings.Join(warnings, "\n* "))
} else {
ec.PrintlnUnnecessary(fmt.Sprintf("You have %d warnings that you can find in your migration report.", len(warnings)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"finishTimestamp": "2023-01-01T00:01:00Z",
"type": "MIGRATION",
"completionPercentage": 12.123,
"remainingTime": 1,
"migrations": [
{
"name": "imap5",
Expand Down

0 comments on commit ead4607

Please sign in to comment.