Skip to content

Commit

Permalink
[CLC-313]: Add Estimate Command to DMT (#422)
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin authored Nov 7, 2023
1 parent b9e7c18 commit 0fd24c6
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 19 deletions.
1 change: 1 addition & 0 deletions base/commands/migration/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package migration

const (
StartQueueName = "__datamigration_start_queue"
EstimateQueueName = "__datamigration_estimate_queue"
StatusMapName = "__datamigration_migrations"
UpdateTopicPrefix = "__datamigration_updates_"
DebugLogsListPrefix = "__datamigration_debug_logs_"
Expand Down
56 changes: 56 additions & 0 deletions base/commands/migration/estimate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//go:build std || migration

package migration

import (
"context"
"fmt"

"github.com/hazelcast/hazelcast-commandline-client/clc/paths"
"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
)

type EstimateCmd struct{}

func (e EstimateCmd) Init(cc plug.InitContext) error {
cc.SetCommandUsage("estimate")
cc.SetCommandGroup("migration")
help := "Estimate migration"
cc.SetCommandHelp(help, help)
cc.AddStringArg(argDMTConfig, argTitleDMTConfig)
return nil
}

func (e EstimateCmd) Exec(ctx context.Context, ec plug.ExecContext) error {
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary(fmt.Sprintf(`%s
Estimation usually ends within 15 seconds.`, banner))
conf := ec.GetStringArg(argDMTConfig)
if !paths.Exists(conf) {
return fmt.Errorf("migration config does not exist: %s", conf)
}
mID := MakeMigrationID()
stages, err := NewEstimateStages(ec.Logger(), mID, conf)
if err != nil {
return err
}
sp := stage.NewFixedProvider(stages.Build(ctx, ec)...)
res, err := stage.Execute(ctx, ec, any(nil), sp)
if err != nil {
return err
}
resArr := res.([]string)
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary(resArr[0])
ec.PrintlnUnnecessary(resArr[1])
ec.PrintlnUnnecessary("")
ec.PrintlnUnnecessary("OK Estimation completed successfully.")
return nil
}

func init() {
check.Must(plug.Registry.RegisterCommand("estimate", &EstimateCmd{}))
}
56 changes: 56 additions & 0 deletions base/commands/migration/estimate_it_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//go:build std || migration

package migration_test

import (
"context"
"encoding/json"
"os"
"sync"
"testing"
"time"

"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

func TestEstimate(t *testing.T) {
tcx := it.TestContext{T: t}
ctx := context.Background()
tcx.Tester(func(tcx it.TestContext) {
createMapping(ctx, tcx)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
check.Must(tcx.CLC().Execute(ctx, "estimate", "testdata/dmt_config"))
}()
c := make(chan string)
go findEstimationID(ctx, tcx, c)
mID := <-c
go estimateRunner(ctx, tcx, mID)
wg.Wait()
tcx.AssertStdoutContains("OK Estimation completed successfully")
})
}

func estimateRunner(ctx context.Context, tcx it.TestContext, migrationID string) {
statusMap := check.MustValue(tcx.Client.GetMap(ctx, migration.StatusMapName))
b := check.MustValue(os.ReadFile("testdata/estimate/estimate_completed.json"))
check.Must(statusMap.Set(ctx, migrationID, serialization.JSON(b)))
}

func findEstimationID(ctx context.Context, tcx it.TestContext, c chan string) {
q := check.MustValue(tcx.Client.GetQueue(ctx, migration.EstimateQueueName))
var b migration.ConfigBundle
for {
v := check.MustValue(q.PollWithTimeout(ctx, 100*time.Millisecond))
if v != nil {
check.Must(json.Unmarshal(v.(serialization.JSON), &b))
c <- b.MigrationID
break
}
}
}
179 changes: 179 additions & 0 deletions base/commands/migration/estimate_stages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
//go:build std || migration

package migration

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/log"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type EstimateStages struct {
migrationID string
configDir string
ci *hazelcast.ClientInternal
estimateQueue *hazelcast.Queue
statusMap *hazelcast.Map
logger log.Logger
}

func NewEstimateStages(logger log.Logger, migrationID, configDir string) (*EstimateStages, error) {
if migrationID == "" {
return nil, errors.New("migrationID is required")
}
return &EstimateStages{
migrationID: migrationID,
configDir: configDir,
logger: logger,
}, nil
}

func (es *EstimateStages) Build(ctx context.Context, ec plug.ExecContext) []stage.Stage[any] {
return []stage.Stage[any]{
{
ProgressMsg: "Connecting to the migration cluster",
SuccessMsg: "Connected to the migration cluster",
FailureMsg: "Could not connect to the migration cluster",
Func: es.connectStage(ec),
},
{
ProgressMsg: "Estimating the migration",
SuccessMsg: "Estimated the migration",
FailureMsg: "Could not estimate the migration",
Func: es.estimateStage(),
},
}
}

func (es *EstimateStages) connectStage(ec plug.ExecContext) func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, s stage.Statuser[any]) (any, error) {
var err error
es.ci, err = ec.ClientInternal(ctx)
if err != nil {
return nil, err
}
es.estimateQueue, err = es.ci.Client().GetQueue(ctx, EstimateQueueName)
if err != nil {
return nil, fmt.Errorf("retrieving the estimate Queue: %w", err)
}
es.statusMap, err = es.ci.Client().GetMap(ctx, StatusMapName)
if err != nil {
return nil, fmt.Errorf("retrieving the status Map: %w", err)
}
return nil, nil
}
}

func (es *EstimateStages) estimateStage() func(context.Context, stage.Statuser[any]) (any, error) {
return func(ctx context.Context, status stage.Statuser[any]) (any, error) {
cb, err := makeConfigBundle(es.configDir, es.migrationID)
if err != nil {
return nil, fmt.Errorf("making configuration bundle: %w", err)
}
if err = es.estimateQueue.Put(ctx, cb); err != nil {
return nil, fmt.Errorf("updating estimate Queue: %w", err)
}
childCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err = waitForEstimationToComplete(childCtx, es.ci, es.migrationID, status); err != nil {
return nil, fmt.Errorf("waiting for estimation to complete: %w", err)
}
return fetchEstimationResults(ctx, es.ci, es.migrationID)
}
}

func waitForEstimationToComplete(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string, stage stage.Statuser[any]) error {
duration := 16 * time.Second
interval := 1 * time.Second
for {
if duration > 0 {
stage.SetRemainingDuration(duration)
} else {
stage.SetText("Estimation took longer than expected.")
}
status, err := fetchMigrationStatus(ctx, ci, migrationID)
if err != nil {
if errors.Is(err, migrationStatusNotFoundErr) {
// migration status will not be available for a while, so we should wait for it
continue
}
return err
}
if Status(status) == StatusFailed {
errs, err := fetchMigrationErrors(ctx, ci, migrationID)
if err != nil {
return fmt.Errorf("estimation failed and dmt cannot fetch estimation errors: %w", err)
}
return errors.New(errs)
}
if Status(status) == StatusComplete {
return nil
}
time.Sleep(interval)
duration = duration - interval
}
}

func fetchEstimationResults(ctx context.Context, ci *hazelcast.ClientInternal, migrationID string) ([]string, error) {
q := fmt.Sprintf(`SELECT JSON_QUERY(this, '$.estimatedTime'), JSON_QUERY(this, '$.estimatedSize') FROM %s WHERE __key='%s'`, StatusMapName, migrationID)
res, err := ci.Client().SQL().Execute(ctx, q)
if err != nil {
return nil, err
}
it, err := res.Iterator()
if err != nil {
return nil, err
}
if it.HasNext() {
// single iteration is enough that we are reading single result for a single migration
row, err := it.Next()
if err != nil {
return nil, err
}
estimatedTime, err := row.Get(0)
if err != nil {
return nil, err
}
estimatedSize, err := row.Get(1)
if err != nil {
return nil, err
}
et, err := msToSecs(estimatedTime.(serialization.JSON).String())
if err != nil {
return nil, err
}
es, err := bytesToMegabytes(estimatedSize.(serialization.JSON).String())
if err != nil {
return nil, err
}
return []string{fmt.Sprintf("Estimated Size: %s ", es), fmt.Sprintf("Estimated Time: %s", et)}, nil
}
return nil, errors.New("no rows found")
}

func bytesToMegabytes(bytesStr string) (string, error) {
bytes, err := strconv.ParseFloat(bytesStr, 64)
if err != nil {
return "", err
}
mb := bytes / (1024.0 * 1024.0)
return fmt.Sprintf("%.2f MBs", mb), nil
}

func msToSecs(ms string) (string, error) {
milliseconds, err := strconv.ParseInt(ms, 10, 64)
if err != nil {
return "", err
}
seconds := float64(milliseconds) / 1000.0
secondsStr := fmt.Sprintf("%.1f sec", seconds)
return secondsStr, nil
}
8 changes: 7 additions & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package migration

import (
"context"
"fmt"

"github.com/hazelcast/hazelcast-commandline-client/clc"
"github.com/hazelcast/hazelcast-commandline-client/clc/paths"
"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
clcerrors "github.com/hazelcast/hazelcast-commandline-client/errors"
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
Expand Down Expand Up @@ -41,6 +43,10 @@ func (StartCmd) Exec(ctx context.Context, ec plug.ExecContext) (err error) {
Selected data structures in the source cluster will be migrated to the target cluster.
`)
conf := ec.GetStringArg(argDMTConfig)
if !paths.Exists(conf) {
return fmt.Errorf("migration config does not exist: %s", conf)
}
if !ec.Props().GetBool(clc.FlagAutoYes) {
p := prompt.New(ec.Stdin(), ec.Stdout())
yes, err := p.YesNo("Proceed?")
Expand All @@ -60,7 +66,7 @@ Selected data structures in the source cluster will be migrated to the target cl
err = finalizeErr
}
}()
sts, err := NewStartStages(ec.Logger(), mID, ec.GetStringArg(argDMTConfig))
sts, err := NewStartStages(ec.Logger(), mID, conf)
if err != nil {
return err
}
Expand Down
15 changes: 0 additions & 15 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ package migration

import (
"context"
"encoding/json"
"errors"
"fmt"

"github.com/hazelcast/hazelcast-commandline-client/clc/ux/stage"
"github.com/hazelcast/hazelcast-commandline-client/internal/log"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
)

type StartStages struct {
Expand Down Expand Up @@ -83,16 +81,3 @@ func (st *StartStages) startStage() func(context.Context, stage.Statuser[any]) (
return nil, nil
}
}

func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error) {
var cb ConfigBundle
cb.MigrationID = migrationID
if err := cb.Walk(configDir); err != nil {
return nil, err
}
b, err := json.Marshal(cb)
if err != nil {
return nil, err
}
return b, nil
}
2 changes: 1 addition & 1 deletion base/commands/migration/start_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func startMigrationTest(t *testing.T, expectedErr error, statusMapStateFiles []s
var execErr error
go tcx.WithReset(func() {
defer wg.Done()
execErr = tcx.CLC().Execute(ctx, "start", "dmt-config", "--yes", "-o", outDir)
execErr = tcx.CLC().Execute(ctx, "start", "testdata/dmt_config", "--yes", "-o", outDir)
})
c := make(chan string)
go findMigrationID(ctx, tcx, c)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"status": "COMPLETED",
"estimatedTime": 1,
"estimatedSize": 1
}
Loading

0 comments on commit 0fd24c6

Please sign in to comment.