diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 77cba5567b..3e28a7607e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -224,19 +224,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isSubscribed(topic): return - proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - notice "waku.relay received", - my_peer_id = node.peerId, - pubsubTopic = topic, - msg_hash = topic.computeMessageHash(msg).to0xHex(), - receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len - - let msgSizeKB = msg.payload.len / 1000 - - waku_node_messages.inc(labelValues = ["relay"]) - waku_histogram_message_size.observe(msgSizeKB) - proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = if node.wakuFilter.isNil(): return @@ -252,7 +239,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = - await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) @@ -389,6 +375,61 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" +proc generateRelayObserver(node: WakuNode): PubSubObserver = + proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) = + for msg in msgs.messages: + let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr: + warn "Error generating message id", + my_peer_id = node.peerId, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error + continue + + let msg_id_short = shortLog(msg_id) + + let wakuMessage = WakuMessage.decode(msg.data).valueOr: + warn "Error decoding to Waku Message", + my_peer_id = node.peerId, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error + continue + + let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() + + if onRecv: + notice "received relay message", + my_peer_id = node.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + topic = msg.topic, + receivedTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + let msgSizeKB = wakuMessage.payload.len / 1000 + waku_node_messages.inc(labelValues = ["relay"]) + waku_histogram_message_size.observe(msgSizeKB) + else: + notice "sent relay message", + my_peer_id = node.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + to_peer_id = peer.peerId, + topic = msg.topic, + sentTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + logMessageInfo(peer, msgs, onRecv = true) + + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = + discard + + return PubSubObserver(onRecv: onRecv, onSend: onSend) + proc mountRelay*( node: WakuNode, pubsubTopics: seq[string] = @[], @@ -409,6 +450,11 @@ proc mountRelay*( node.wakuRelay = initRes.value + # register relay observers for logging + debug "Registering Relay observers" + let observerLogger = node.generateRelayObserver() + node.wakuRelay.addObserver(observerLogger) + ## Add peer exchange handler if peerExchangeHandler.isSome(): node.wakuRelay.parameters.enablePX = true diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 03d5b596ec..ca59a68995 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -180,6 +180,9 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) +proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = + procCall GossipSub(w).addObserver(observer) + method start*(w: WakuRelay) {.async, base.} = debug "start" await procCall GossipSub(w).start()