Skip to content

Commit

Permalink
subscribe to topic if status is not terminal
Browse files Browse the repository at this point in the history
  • Loading branch information
kutluhanmetin committed Sep 7, 2023
1 parent d5591a0 commit 48a42ce
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 40 deletions.
11 changes: 5 additions & 6 deletions base/commands/migration/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,11 @@ import (
)

type MigrationStatus struct {
Status Status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
Migrations []Migration `json:"migrations"`
CompletionPercentage float32 `json:"completionPercentage"`
Status Status `json:"status"`
Logs []string `json:"logs"`
Errors []string `json:"errors"`
Report string `json:"report"`
CompletionPercentage float32 `json:"completionPercentage"`
}

type Migration struct {
Expand Down
15 changes: 10 additions & 5 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,22 @@ func (st *StatusStages) connectStage(ctx context.Context, ec plug.ExecContext) f
return err
}
st.updateTopic, err = st.ci.Client().GetTopic(ctx, MakeUpdateTopicName(st.migrationID))
if err != nil {
return err
}
st.updateMsgChan = make(chan UpdateMessage)
st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener)
return err
}
}

func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
return func(stage.Statuser) error {
ms, err := readMigrationStatus(ctx, st.statusMap)
if err != nil {
return fmt.Errorf("reading status: %w", err)
}
if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, ms.Status) {
ec.PrintlnUnnecessary(ms.Report)
return nil
}
st.updateMsgChan = make(chan UpdateMessage)
st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener)
defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
for {
select {
Expand Down
55 changes: 26 additions & 29 deletions base/commands/migration/status_stages_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/base/commands/migration"
. "github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/it"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -67,9 +66,9 @@ Hazelcast Data Migration Tool v5.3.0
OK [1/2] Connected to the migration cluster.
first message
Completion Percentage: 80.000000
Completion Percentage: 60.000000
last message
Completion Percentage: 80.000000
Completion Percentage: 100.000000
status report
OK [2/2] Fetched migration status.
Expand All @@ -78,48 +77,46 @@ OK`)
}

func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context) string {
// create a migration in the __datamigrations_in_progress list
mID := migration.MakeMigrationID()
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: mID,
}))
ok := MustValue(l.Add(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
// create a record in the status map
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(mID)))
st := MustValue(json.Marshal(migration.MigrationStatus{
Status: migration.StatusInProgress,
Report: "status report",
CompletionPercentage: 60,
}))
Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st)))
return mID
}

func statusRunner(t *testing.T, migrationID string, tcx it.TestContext, ctx context.Context) {
// publish the first message in the update topic
updateTopic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID)))
msg := MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message", CompletionPercentage: 60}))
Must(updateTopic.Publish(ctx, serialization.JSON(msg)))
// create a terminal record in status map
statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID)))
topic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID)))
setState(ctx, topic, statusMap, migration.StatusInProgress, "first message")
setState(ctx, topic, statusMap, migration.StatusFailed, "last message")
st := MustValue(json.Marshal(migration.MigrationStatus{
Status: migration.StatusComplete,
Report: "status report",
CompletionPercentage: 100,
}))
Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st)))
// publish the second message in the update topic
msg = MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusComplete, Message: "last message", CompletionPercentage: 100}))
Must(updateTopic.Publish(ctx, serialization.JSON(msg)))
// remove the migration from the __datamigrations_in_progress list
l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList))
m := MustValue(json.Marshal(migration.MigrationInProgress{
MigrationID: migrationID,
}))
ok := MustValue(l.Remove(ctx, serialization.JSON(m)))
require.Equal(t, true, ok)
}

func setState(ctx context.Context, updateTopic *hazelcast.Topic, statusMap *hazelcast.Map, status migration.Status, msg string) {
startTime := MustValue(time.Parse(time.RFC3339, "2023-01-01T00:00:00Z"))
st := MustValue(json.Marshal(migration.MigrationStatus{
Status: status,
Report: "status report",
CompletionPercentage: 12.123,
Migrations: []migration.Migration{
{
Name: "imap5",
Type: "IMap",
Status: status,
StartTimestamp: startTime,
EntriesMigrated: 141,
TotalEntries: 1000,
CompletionPercentage: 14.1,
},
},
}))
message := MustValue(json.Marshal(migration.UpdateMessage{Status: status, Message: msg, CompletionPercentage: 80}))
Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st)))
Must(updateTopic.Publish(ctx, serialization.JSON(message)))
}

0 comments on commit 48a42ce

Please sign in to comment.