Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adding observers for message logging #2800

Merged
merged 11 commits into from
Jun 13, 2024
46 changes: 46 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,47 @@ proc startRelay*(node: WakuNode) {.async.} =

info "relay started successfully"

proc generateRelayObserver(w: WakuRelay): PubSubObserver =
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
for msg in msgs.messages:
let msg_id = w.msgIdProvider(msg).valueOr:
warn "Error generating message id",
from_peer = peer.peerId, topic = msg.topic, error = $error
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
continue

let msg_id_short = shortLog(msg_id)

let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
msg_id = msg_id_short,
from_peer = peer.peerId,
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
topic = msg.topic,
error = $error
continue

let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()

if onRecv:
info "received message",
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer = peer.peerId,
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
topic = msg.topic
else:
info "sent message",
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer = peer.peerId,
gabrielmer marked this conversation as resolved.
Show resolved Hide resolved
topic = msg.topic

proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = true)

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = false)

return PubSubObserver(onRecv: onRecv, onSend: onSend)

proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
Expand All @@ -409,6 +450,11 @@ proc mountRelay*(

node.wakuRelay = initRes.value

# register relay observers for logging
debug "Registering Relay observers"
let observerLogger = node.wakuRelay.generateRelayObserver()
node.wakuRelay.addObserver(observerLogger)

## Add peer exchange handler
if peerExchangeHandler.isSome():
node.wakuRelay.parameters.enablePX = true
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))

proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
procCall GossipSub(w).addObserver(observer)

method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
Expand Down
Loading