Skip to content

Commit

Permalink
Node: p2p.Run not always subscribing to heartbeat channel when it should
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Nov 4, 2024
1 parent 02f468f commit daf8d3a
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 2 deletions.
5 changes: 5 additions & 0 deletions node/pkg/common/guardianset.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,3 +214,8 @@ func (st *GuardianSetState) Cleanup() {
}
}
}

// IsSubscribedToHeartbeats returns true if the heartbeat update channel is set.
func (st *GuardianSetState) IsSubscribedToHeartbeats() bool {
return st.updateC != nil
}
10 changes: 10 additions & 0 deletions node/pkg/common/guardianset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"reflect"
"testing"

gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
Expand Down Expand Up @@ -122,3 +123,12 @@ func TestGet(t *testing.T) {
gss.Set(&gs)
assert.Equal(t, gss.Get(), &gs)
}

func TestIsSubscribedToHeartbeats(t *testing.T) {
heartbeatC := make(chan *gossipv1.Heartbeat, 20000)
gst1 := NewGuardianSetState(heartbeatC)
assert.True(t, gst1.IsSubscribedToHeartbeats())

gst2 := NewGuardianSetState(nil)
assert.False(t, gst2.IsSubscribedToHeartbeats())
}
4 changes: 2 additions & 2 deletions node/pkg/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
var controlSubscription, attestationSubscription, vaaSubscription *pubsub.Subscription

// Set up the control channel. ////////////////////////////////////////////////////////////////////
if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil {
if params.nodeName != "" || params.gossipControlSendC != nil || params.obsvReqSendC != nil || params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil || params.gst.IsSubscribedToHeartbeats() {
controlTopic := fmt.Sprintf("%s/%s", params.networkID, "control")
logger.Info("joining the control topic", zap.String("topic", controlTopic))
controlPubsubTopic, err = ps.Join(controlTopic)
Expand All @@ -376,7 +376,7 @@ func Run(params *RunParams) func(ctx context.Context) error {
}
}()

if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil {
if params.obsvReqRecvC != nil || params.signedGovCfgRecvC != nil || params.signedGovStatusRecvC != nil || params.gst.IsSubscribedToHeartbeats() {
logger.Info("subscribing to the control topic", zap.String("topic", controlTopic))
controlSubscription, err = controlPubsubTopic.Subscribe(pubsub.WithBufferSize(P2P_SUBSCRIPTION_BUFFER_SIZE))
if err != nil {
Expand Down

0 comments on commit daf8d3a

Please sign in to comment.