Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: adding observers for message logging #2800

Merged
merged 11 commits into from
Jun 13, 2024
74 changes: 60 additions & 14 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
if node.wakuRelay.isSubscribed(topic):
return

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = topic.computeMessageHash(msg).to0xHex(),
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

let msgSizeKB = msg.payload.len / 1000

waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)

proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
if node.wakuFilter.isNil():
return
Expand All @@ -252,7 +239,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =
let defaultHandler = proc(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
await traceHandler(topic, msg)
await filterHandler(topic, msg)
await archiveHandler(topic, msg)

Expand Down Expand Up @@ -389,6 +375,61 @@ proc startRelay*(node: WakuNode) {.async.} =

info "relay started successfully"

proc generateRelayObserver(node: WakuNode): PubSubObserver =
proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) =
for msg in msgs.messages:
let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr:
warn "Error generating message id",
my_peer_id = node.peerId,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue

let msg_id_short = shortLog(msg_id)

let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "Error decoding to Waku Message",
my_peer_id = node.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
error = $error
continue

let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex()

if onRecv:
notice "received relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
topic = msg.topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len

let msgSizeKB = wakuMessage.payload.len / 1000
waku_node_messages.inc(labelValues = ["relay"])
waku_histogram_message_size.observe(msgSizeKB)
else:
notice "sent relay message",
my_peer_id = node.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = msg.topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = wakuMessage.payload.len

proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
logMessageInfo(peer, msgs, onRecv = true)

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
discard

return PubSubObserver(onRecv: onRecv, onSend: onSend)

proc mountRelay*(
node: WakuNode,
pubsubTopics: seq[string] = @[],
Expand All @@ -409,6 +450,11 @@ proc mountRelay*(

node.wakuRelay = initRes.value

# register relay observers for logging
debug "Registering Relay observers"
let observerLogger = node.generateRelayObserver()
node.wakuRelay.addObserver(observerLogger)

## Add peer exchange handler
if peerExchangeHandler.isSome():
node.wakuRelay.parameters.enablePX = true
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ proc addValidator*(
) {.gcsafe.} =
w.wakuValidators.add((handler, errorMessage))

proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} =
procCall GossipSub(w).addObserver(observer)

method start*(w: WakuRelay) {.async, base.} =
debug "start"
await procCall GossipSub(w).start()
Expand Down
Loading