Skip to content

Commit

Permalink
chore: adding observers for message logging (#2800)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer authored Jun 13, 2024
1 parent 15d578a commit b522865
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 14 deletions.
74 changes: 60 additions & 14 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic):
return

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = topic.computeMessageHash(msg).to0xHex(),
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

let msgSizeKB = msg.payload.len / 1000

waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)

proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
Expand All @@ -252,7 +239,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)

Expand Down Expand Up @@ -389,6 +375,61 @@ proc startRelay*(node: WakuNode) {.async.} =

info "relay started successfully"

proc generateRelayObserver(node: WakuNode): PubSubObserver =
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
for msg in msgs.messages:
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
warn "Error generating message id",
my_peer_id = node.peerId,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue

let msg_id_short = shortLog(msg_id)

let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
my_peer_id = node.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue

let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()

if onRecv:
notice "received relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len

let msgSizeKB = wakuMessage.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
else:
notice "sent relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = msg.topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len

proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = true)

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
discard

return PubSubObserver(onRecv: onRecv, onSend: onSend)

proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
Expand All @@ -409,6 +450,11 @@ proc mountRelay*(

node.wakuRelay = initRes.value

# register relay observers for logging
debug "Registering Relay observers"
let observerLogger = node.generateRelayObserver()
node.wakuRelay.addObserver(observerLogger)

## Add peer exchange handler
if peerExchangeHandler.isSome():
node.wakuRelay.parameters.enablePX = true
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))

proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
procCall GossipSub(w).addObserver(observer)

method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
Expand Down

0 comments on commit b522865

Please sign in to comment.