Skip to content

Commit

Permalink
refactor remove listener
Browse files Browse the repository at this point in the history
  • Loading branch information
kmetin committed Sep 8, 2023
1 parent 999a3bf commit aed1d01
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 7 deletions.
9 changes: 8 additions & 1 deletion base/commands/migration/migration_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/internal/check"
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-commandline-client/internal/prompt"
"github.com/hazelcast/hazelcast-go-client"
)

type StartCmd struct{}
Expand Down Expand Up @@ -45,7 +46,13 @@ Selected data structures in the source cluster will be migrated to the target cl
}
}
ec.PrintlnUnnecessary("")
sts := NewStartStages(MakeMigrationID(), ec.Args()[0])
var updateTopic *hazelcast.Topic
sts := NewStartStages(updateTopic, MakeMigrationID(), ec.Args()[0])
if !sts.topicListenerID.Default() && sts.updateTopic != nil {
if err := sts.updateTopic.RemoveListener(ctx, sts.topicListenerID); err != nil {
return err
}
}
sp := stage.NewFixedProvider(sts.Build(ctx, ec)...)
if err := stage.Execute(ctx, ec, sp); err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions base/commands/migration/start_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ var timeoutErr = fmt.Errorf("migration could not be completed: reached timeout w
"please ensure that you are using Hazelcast's migration cluster distribution and your DMT config points to that cluster: %w",
context.DeadlineExceeded)

func NewStartStages(migrationID, configDir string) *StartStages {
func NewStartStages(updateTopic *hazelcast.Topic, migrationID, configDir string) *StartStages {
if migrationID == "" {
panic("migrationID is required")
}
return &StartStages{
updateTopic: updateTopic,
migrationID: migrationID,
configDir: configDir,
}
Expand Down Expand Up @@ -133,7 +134,6 @@ func makeConfigBundle(configDir, migrationID string) (serialization.JSON, error)

func (st *StartStages) migrateStage(ctx context.Context, ec plug.ExecContext) func(statuser stage.Statuser) error {
return func(stage.Statuser) error {
defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
for {
select {
case msg := <-st.updateMsgChan:
Expand Down
6 changes: 2 additions & 4 deletions base/commands/migration/status_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/hazelcast/hazelcast-commandline-client/internal/plug"
"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/serialization"
"github.com/hazelcast/hazelcast-go-client/types"
)

type StatusStages struct {
Expand All @@ -20,7 +19,6 @@ type StatusStages struct {
migrationsInProgressList *hazelcast.List
statusMap *hazelcast.Map
updateTopic *hazelcast.Topic
topicListenerID types.UUID
updateMsgChan chan UpdateMessage
}

Expand Down Expand Up @@ -94,8 +92,8 @@ func (st *StatusStages) fetchStage(ctx context.Context, ec plug.ExecContext) fun
return nil
}
st.updateMsgChan = make(chan UpdateMessage)
st.topicListenerID, err = st.updateTopic.AddMessageListener(ctx, st.topicListener)
defer st.updateTopic.RemoveListener(ctx, st.topicListenerID)
id, err := st.updateTopic.AddMessageListener(ctx, st.topicListener)
defer st.updateTopic.RemoveListener(ctx, id)
for {
select {
case msg := <-st.updateMsgChan:
Expand Down

0 comments on commit aed1d01

Please sign in to comment.