Skip to content

Commit

Permalink
logging received message info via onValidated observer
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer committed Aug 14, 2024
1 parent 225b5e7 commit 963d8f7
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 27 deletions.
8 changes: 0 additions & 8 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) =

proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} =
let msg_hash = topic.computeMessageHash(msg).to0xHex()

notice "waku.relay received",
my_peer_id = node.peerId,
pubsubTopic = topic,
msg_hash = msg_hash,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

let msgSizeKB = msg.payload.len / 1000

waku_node_messages.inc(labelValues = ["relay"])
Expand Down
6 changes: 6 additions & 0 deletions waku/waku_api/rest/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ proc installRelayApiHandlers*(
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)

# Log for message tracking purposes
logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true)

# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message",
pubSubTopic = pubSubTopic, rln = not node.wakuRlnRelay.isNil()
Expand Down Expand Up @@ -272,6 +275,9 @@ proc installRelayApiHandlers*(
(await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr:
return RestApiResponse.badRequest("Failed to publish: " & error)

# Log for message tracking purposes
logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true)

# if we reach here its either a non-RLN message or a RLN message with a valid proof
debug "Publishing message",
contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil()
Expand Down
73 changes: 54 additions & 19 deletions waku/waku_relay/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,36 @@ proc initProtocolHandler(w: WakuRelay) =
w.handler = handler
w.codec = WakuRelayCodec

proc initRelayMetricObserver(w: WakuRelay) =
proc logMessageInfo*(
w: WakuRelay,
peerId: string,
topic: string,
msg_id_short: string,
msg: WakuMessage,
onRecv: bool,
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()

if onRecv:
notice "received relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
from_peer_id = peerId,
topic = topic,
receivedTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len
else:
notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

proc initRelayObservers(w: WakuRelay) =
proc decodeRpcMessageInfo(
peer: PubSubPeer, msg: Message
): Result[
Expand Down Expand Up @@ -179,20 +208,6 @@ proc initRelayMetricObserver(w: WakuRelay) =
let msgSize = msg.data.len + msg.topic.len
return ok((msg_id_short, msg.topic, wakuMessage, msgSize))

proc logMessageInfo(
peer: PubSubPeer, topic: string, msg_id_short: string, msg: WakuMessage
) =
let msg_hash = computeMessageHash(topic, msg).to0xHex()

notice "sent relay message",
my_peer_id = w.switch.peerInfo.peerId,
msg_hash = msg_hash,
msg_id = msg_id_short,
to_peer_id = peer.peerId,
topic = topic,
sentTime = getNowInNanosecondTime(),
payloadSizeBytes = msg.payload.len

proc updateMetrics(
peer: PubSubPeer,
pubsub_topic: string,
Expand All @@ -208,18 +223,38 @@ proc initRelayMetricObserver(w: WakuRelay) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
continue
# message receive log happens in treaceHandler as this one is called before checks
# message receive log happens in onValidated observer as onRecv is called before checks
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true)
discard

proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) =
let msg_id_short = shortLog(msgId)
let wakuMessage = WakuMessage.decode(msg.data).valueOr:
warn "onValidated: failed decoding to Waku Message",
my_peer_id = w.switch.peerInfo.peerId,
msg_id = msg_id_short,
from_peer_id = peer.peerId,
pubsub_topic = msg.topic,
error = $error
return

logMessageInfo(
w, shortLog(peer.peerId), msg.topic, msg_id_short, wakuMessage, onRecv = true
)

proc onSend(peer: PubSubPeer, msgs: var RPCMsg) =
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
warn "onSend: failed decoding RPC info",
my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId
continue
logMessageInfo(peer, topic, msg_id_short, wakuMessage)
logMessageInfo(
w, shortLog(peer.peerId), topic, msg_id_short, wakuMessage, onRecv = false
)
updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false)

let administrativeObserver = PubSubObserver(onRecv: onRecv, onSend: onSend)
let administrativeObserver =
PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated)

w.addObserver(administrativeObserver)

Expand All @@ -243,7 +278,7 @@ proc new*(

procCall GossipSub(w).initPubSub()
w.initProtocolHandler()
w.initRelayMetricObserver()
w.initRelayObservers()
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())

Expand Down

0 comments on commit 963d8f7

Please sign in to comment.