Skip to content

Commit

Permalink
feat: option to emit event to bus upon message sent
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 3, 2024
1 parent 0c594b3 commit e42da3a
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
27 changes: 27 additions & 0 deletions waku/v2/api/publish/message_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
Expand Down Expand Up @@ -53,6 +55,12 @@ type MessageSender struct {
messageSentCheck ISentCheck
rateLimiter *PublishRateLimiter
logger *zap.Logger
evtMessageSent event.Emitter
}

type MessageSent struct {
Size uint32 // Size of payload in bytes
Timestamp int64
}

type Request struct {
Expand Down Expand Up @@ -96,6 +104,15 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess
return ms
}

func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender {
evtMessageSent, err := host.EventBus().Emitter(new(MessageSent))
if err != nil {
ms.logger.Error("failed to create message sent emitter", zap.Error(err))
}
ms.evtMessageSent = evtMessageSent
return ms
}

func (ms *MessageSender) Send(req *Request) error {
logger := ms.logger.With(
zap.Stringer("envelopeHash", req.envelope.Hash()),
Expand Down Expand Up @@ -149,6 +166,16 @@ func (ms *MessageSender) Send(req *Request) error {
)
}

if ms.evtMessageSent != nil {
err := ms.evtMessageSent.Emit(MessageSent{
Size: uint32(len(req.envelope.Message().Payload)),
Timestamp: req.envelope.Message().GetTimestamp(),
})
if err != nil {
logger.Error("failed to emit message sent event", zap.Error(err))
}
}

return nil
}

Expand Down
44 changes: 44 additions & 0 deletions waku/v2/api/publish/message_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package publish
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -129,3 +130,46 @@ func createRelayNode(t *testing.T) (host.Host, *relay.WakuRelay) {

return host, relay
}

func TestMessageSentEmitter(t *testing.T) {
host, relayNode := createRelayNode(t)
err := relayNode.Start(context.Background())
require.Nil(t, err)
defer relayNode.Stop()

_, err = relayNode.Subscribe(context.Background(), protocol.NewContentFilter("test-pubsub-topic"))
require.Nil(t, err)
publisher := NewDefaultPublisher(nil, relayNode)
sender, err := NewMessageSender(Relay, publisher, utils.Logger())
require.Nil(t, err)

check := &MockMessageSentCheck{Messages: make(map[string]map[common.Hash]uint32)}
sender.WithMessageSentCheck(check)
sender.WithMessageSentEmitter(host)

msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: utils.GetUnixEpoch(),
ContentTopic: "test-content-topic",
}
envelope := protocol.NewEnvelope(msg, *utils.GetUnixEpoch(), "test-pubsub-topic")
req := NewRequest(context.TODO(), envelope)

wg := sync.WaitGroup{}
wg.Add(1)
sub, err := host.EventBus().Subscribe(new(MessageSent))
require.Nil(t, err)
defer sub.Close()

go func() {
for msgSentEvt := range sub.Out() {
msgSent := msgSentEvt.(MessageSent)
require.Equal(t, uint32(len(msg.Payload)), msgSent.Size)
wg.Done()
}
}()

err = sender.Send(req)
require.Nil(t, err)
go wg.Wait()
}

0 comments on commit e42da3a

Please sign in to comment.