diff --git a/base/commands/migration/migration_stages.go b/base/commands/migration/migration_stages.go index bf55734c..44702f77 100644 --- a/base/commands/migration/migration_stages.go +++ b/base/commands/migration/migration_stages.go @@ -15,6 +15,7 @@ import ( "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" + "github.com/hazelcast/hazelcast-go-client/sql" "github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage" clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors" @@ -27,6 +28,8 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w var migrationStatusNotFoundErr = fmt.Errorf("migration status not found") +var migrationReportNotFoundErr = "migration report cannot be found: %w" + var progressMsg = "Migrating %s: %s" func createMigrationStages(ctx context.Context, ec plug.ExecContext, ci *hazelcast.ClientInternal, migrationID string) ([]stage.Stage[any], error) { @@ -147,8 +150,12 @@ func getDataStructuresToBeMigrated(ctx context.Context, ec plug.ExecContext, mig if err != nil { return nil, err } + rr, err := r.Get(0) + if err != nil { + return nil, err + } var status OverallMigrationStatus - if err = json.Unmarshal(r.(serialization.JSON), &status); err != nil { + if err = json.Unmarshal(rr.(serialization.JSON), &status); err != nil { return nil, err } if len(status.Migrations) == 0 { @@ -243,70 +250,70 @@ func fetchMigrationStatus(ctx context.Context, ci *hazelcast.ClientInternal, mig if err != nil { return "", migrationStatusNotFoundErr } - return strings.TrimSuffix(strings.TrimPrefix(string(r.(serialization.JSON)), `"`), `"`), nil + rr, err := r.Get(0) + if err != nil { + return "", migrationStatusNotFoundErr + } + return strings.TrimSuffix(strings.TrimPrefix(string(rr.(serialization.JSON)), `"`), `"`), nil } func fetchOverallProgress(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (time.Duration, float32, error) { q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.remainingTime'), JSON_QUERY(this, '$.completionPercentage') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) - res, err := ci.Client().SQL().Execute(ctx, q) + r, err := querySingleRow(ctx, ci, q) if err != nil { return 0, 0, err } - it, err := res.Iterator() + remainingTime, err := r.Get(0) if err != nil { return 0, 0, err } - if it.HasNext() { - // single iteration is enough that we are reading single result for a single migration - row, err := it.Next() - if err != nil { - return 0, 0, err - } - remainingTime, err := row.Get(0) - if err != nil { - return 0, 0, err - } - completionPercentage, err := row.Get(1) - if err != nil { - return 0, 0, err - } - if completionPercentage == nil { - return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName) - } - if remainingTime == nil { - return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName) - } - rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64) - if err != nil { - return 0, 0, err - } - cpStr := completionPercentage.(serialization.JSON).String() - cp, err := strconv.ParseFloat(cpStr, 32) - if err != nil { - return 0, 0, err - } - return time.Duration(rt) * time.Millisecond, float32(cp), nil + completionPercentage, err := r.Get(1) + if err != nil { + return 0, 0, err + } + if completionPercentage == nil { + return 0, 0, fmt.Errorf("completionPercentage is not available in %s", StatusMapName) } - return 0, 0, errors.New("no rows found") + if remainingTime == nil { + return 0, 0, fmt.Errorf("remainingTime is not available in %s", StatusMapName) + } + rt, err := strconv.ParseInt(remainingTime.(serialization.JSON).String(), 10, 64) + if err != nil { + return 0, 0, err + } + cpStr := completionPercentage.(serialization.JSON).String() + cp, err := strconv.ParseFloat(cpStr, 32) + if err != nil { + return 0, 0, err + } + return time.Duration(rt) * time.Millisecond, float32(cp), nil } func fetchMigrationReport(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.report') FROM %s WHERE __key='%s'`, StatusMapName, migrationID) r, err := querySingleRow(ctx, ci, q) if err != nil { - return "", fmt.Errorf("migration report cannot be found: %w", err) + return "", fmt.Errorf(migrationReportNotFoundErr, err) } - return strings.ReplaceAll(string(r.(serialization.JSON)), `\"`, ``), nil + rr, err := r.Get(0) + if err != nil { + return "", fmt.Errorf(migrationReportNotFoundErr, err) + } + return strings.ReplaceAll(string(rr.(serialization.JSON)), `\"`, ``), nil } func fetchMigrationErrors(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) (string, error) { q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.errors' WITH WRAPPER) FROM %s WHERE __key='%s'`, StatusMapName, migrationID) - row, err := querySingleRow(ctx, ci, q) + r, err := querySingleRow(ctx, ci, q) + if err != nil { + return "", err + } + rr, err := r.Get(0) if err != nil { return "", err } var errs []string - err = json.Unmarshal(row.(serialization.JSON), &errs) + err = json.Unmarshal(rr.(serialization.JSON), &errs) if err != nil { return "", err } @@ -326,7 +333,7 @@ func finalizeMigration(ctx context.Context, ec plug.ExecContext, ci *hazelcast.C return nil } -func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (any, error) { +func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query string) (sql.Row, error) { res, err := ci.Client().SQL().Execute(ctx, query) if err != nil { return nil, err @@ -341,11 +348,7 @@ func querySingleRow(ctx context.Context, ci *hazelcast.ClientInternal, query str if err != nil { return nil, err } - r, err := row.Get(0) - if err != nil { - return "", err - } - return r, nil + return row, nil } return nil, errors.New("no rows found") }