diff --git a/base/commands/migration/common.go b/base/commands/migration/common.go index 79a13ca3..218b076e 100644 --- a/base/commands/migration/common.go +++ b/base/commands/migration/common.go @@ -11,12 +11,11 @@ import ( ) type MigrationStatus struct { - Status Status `json:"status"` - Logs []string `json:"logs"` - Errors []string `json:"errors"` - Report string `json:"report"` - Migrations []Migration `json:"migrations"` - CompletionPercentage float32 `json:"completionPercentage"` + Status Status `json:"status"` + Logs []string `json:"logs"` + Errors []string `json:"errors"` + Report string `json:"report"` + CompletionPercentage float32 `json:"completionPercentage"` } type Migration struct { diff --git a/base/commands/migration/status_stages.go b/base/commands/migration/status_stages.go index 3e9c60da..c9ecad15 100644 --- a/base/commands/migration/status_stages.go +++ b/base/commands/migration/status_stages.go @@ -79,17 +79,22 @@ func (st *StatusStages) connectStage(ctx context.Context, ec plug.ExecContext) f return err } st.updateTopic, err = st.ci.Client().GetTopic(ctx, MakeUpdateTopicName(st.migrationID)) - if err != nil { - return err - } - st.updateMsgChan = make(chan UpdateMessage) - st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener) return err } } func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error { return func(stage.Statuser) error { + ms, err := readMigrationStatus(ctx, st.statusMap) + if err != nil { + return fmt.Errorf("reading status: %w", err) + } + if slices.Contains([]Status{StatusComplete, StatusFailed, StatusCanceled}, ms.Status) { + ec.PrintlnUnnecessary(ms.Report) + return nil + } + st.updateMsgChan = make(chan UpdateMessage) + st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener) defer st.updateTopic.RemoveListener(ctx, st.topicListenerID) for { select { diff --git a/base/commands/migration/status_stages_it_test.go b/base/commands/migration/status_stages_it_test.go index 8584c00b..c49de813 100644 --- a/base/commands/migration/status_stages_it_test.go +++ b/base/commands/migration/status_stages_it_test.go @@ -14,7 +14,6 @@ import ( "github.com/hazelcast/hazelcast-commandline-client/base/commands/migration" . "github.com/hazelcast/hazelcast-commandline-client/internal/check" "github.com/hazelcast/hazelcast-commandline-client/internal/it" - "github.com/hazelcast/hazelcast-go-client" "github.com/hazelcast/hazelcast-go-client/serialization" "github.com/stretchr/testify/require" ) @@ -67,9 +66,9 @@ Hazelcast Data Migration Tool v5.3.0 OK [1/2] Connected to the migration cluster. first message -Completion Percentage: 80.000000 +Completion Percentage: 60.000000 last message -Completion Percentage: 80.000000 +Completion Percentage: 100.000000 status report OK [2/2] Fetched migration status. @@ -78,6 +77,7 @@ OK`) } func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context) string { + // create a migration in the __datamigrations_in_progress list mID := migration.MakeMigrationID() l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) m := MustValue(json.Marshal(migration.MigrationInProgress{ @@ -85,14 +85,34 @@ func preStatusRunner(t *testing.T, tcx it.TestContext, ctx context.Context) stri })) ok := MustValue(l.Add(ctx, serialization.JSON(m))) require.Equal(t, true, ok) + // create a record in the status map + statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(mID))) + st := MustValue(json.Marshal(migration.MigrationStatus{ + Status: migration.StatusInProgress, + Report: "status report", + CompletionPercentage: 60, + })) + Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st))) return mID } func statusRunner(t *testing.T, migrationID string, tcx it.TestContext, ctx context.Context) { + // publish the first message in the update topic + updateTopic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID))) + msg := MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusInProgress, Message: "first message", CompletionPercentage: 60})) + Must(updateTopic.Publish(ctx, serialization.JSON(msg))) + // create a terminal record in status map statusMap := MustValue(tcx.Client.GetMap(ctx, migration.MakeStatusMapName(migrationID))) - topic := MustValue(tcx.Client.GetTopic(ctx, migration.MakeUpdateTopicName(migrationID))) - setState(ctx, topic, statusMap, migration.StatusInProgress, "first message") - setState(ctx, topic, statusMap, migration.StatusFailed, "last message") + st := MustValue(json.Marshal(migration.MigrationStatus{ + Status: migration.StatusComplete, + Report: "status report", + CompletionPercentage: 100, + })) + Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st))) + // publish the second message in the update topic + msg = MustValue(json.Marshal(migration.UpdateMessage{Status: migration.StatusComplete, Message: "last message", CompletionPercentage: 100})) + Must(updateTopic.Publish(ctx, serialization.JSON(msg))) + // remove the migration from the __datamigrations_in_progress list l := MustValue(tcx.Client.GetList(ctx, migration.MigrationsInProgressList)) m := MustValue(json.Marshal(migration.MigrationInProgress{ MigrationID: migrationID, @@ -100,26 +120,3 @@ func statusRunner(t *testing.T, migrationID string, tcx it.TestContext, ctx cont ok := MustValue(l.Remove(ctx, serialization.JSON(m))) require.Equal(t, true, ok) } - -func setState(ctx context.Context, updateTopic *hazelcast.Topic, statusMap *hazelcast.Map, status migration.Status, msg string) { - startTime := MustValue(time.Parse(time.RFC3339, "2023-01-01T00:00:00Z")) - st := MustValue(json.Marshal(migration.MigrationStatus{ - Status: status, - Report: "status report", - CompletionPercentage: 12.123, - Migrations: []migration.Migration{ - { - Name: "imap5", - Type: "IMap", - Status: status, - StartTimestamp: startTime, - EntriesMigrated: 141, - TotalEntries: 1000, - CompletionPercentage: 14.1, - }, - }, - })) - message := MustValue(json.Marshal(migration.UpdateMessage{Status: status, Message: msg, CompletionPercentage: 80})) - Must(statusMap.Set(ctx, migration.StatusMapEntryName, serialization.JSON(st))) - Must(updateTopic.Publish(ctx, serialization.JSON(message))) -}