diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3895f52312..b3354c7c28 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -266,14 +266,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = let msg_hash = topic.computeMessageHash(msg).to0xHex() - - notice "waku.relay received", - my_peer_id = node.peerId, - pubsubTopic = topic, - msg_hash = msg_hash, - receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len - let msgSizeKB = msg.payload.len / 1000 waku_node_messages.inc(labelValues = ["relay"]) diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index bbe9d94802..cbd65d653d 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -161,6 +161,9 @@ proc installRelayApiHandlers*( (await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr: return RestApiResponse.badRequest("Failed to publish: " & error) + # Log for message tracking purposes + logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true) + # if we reach here its either a non-RLN message or a RLN message with a valid proof debug "Publishing message", pubSubTopic = pubSubTopic, rln = not node.wakuRlnRelay.isNil() @@ -272,6 +275,9 @@ proc installRelayApiHandlers*( (await node.wakuRelay.validateMessage(pubsubTopic, message)).isOkOr: return RestApiResponse.badRequest("Failed to publish: " & error) + # Log for message tracking purposes + logMessageInfo(node.wakuRelay, "rest", pubsubTopic, "none", message, onRecv = true) + # if we reach here its either a non-RLN message or a RLN message with a valid proof debug "Publishing message", contentTopic = message.contentTopic, rln = not node.wakuRlnRelay.isNil() diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index b922f69d3b..7c6c58351f 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -151,7 +151,36 @@ proc initProtocolHandler(w: WakuRelay) = w.handler = handler w.codec = WakuRelayCodec -proc initRelayMetricObserver(w: WakuRelay) = +proc logMessageInfo*( + w: WakuRelay, + remotePeerId: string, + topic: string, + msg_id_short: string, + msg: WakuMessage, + onRecv: bool, +) = + let msg_hash = computeMessageHash(topic, msg).to0xHex() + + if onRecv: + notice "received relay message", + my_peer_id = w.switch.peerInfo.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + from_peer_id = remotePeerId, + topic = topic, + receivedTime = getNowInNanosecondTime(), + payloadSizeBytes = msg.payload.len + else: + notice "sent relay message", + my_peer_id = w.switch.peerInfo.peerId, + msg_hash = msg_hash, + msg_id = msg_id_short, + to_peer_id = remotePeerId, + topic = topic, + sentTime = getNowInNanosecondTime(), + payloadSizeBytes = msg.payload.len + +proc initRelayObservers(w: WakuRelay) = proc decodeRpcMessageInfo( peer: PubSubPeer, msg: Message ): Result[ @@ -179,20 +208,6 @@ proc initRelayMetricObserver(w: WakuRelay) = let msgSize = msg.data.len + msg.topic.len return ok((msg_id_short, msg.topic, wakuMessage, msgSize)) - proc logMessageInfo( - peer: PubSubPeer, topic: string, msg_id_short: string, msg: WakuMessage - ) = - let msg_hash = computeMessageHash(topic, msg).to0xHex() - - notice "sent relay message", - my_peer_id = w.switch.peerInfo.peerId, - msg_hash = msg_hash, - msg_id = msg_id_short, - to_peer_id = peer.peerId, - topic = topic, - sentTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len - proc updateMetrics( peer: PubSubPeer, pubsub_topic: string, @@ -208,18 +223,38 @@ proc initRelayMetricObserver(w: WakuRelay) = for msg in msgs.messages: let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr: continue - # message receive log happens in treaceHandler as this one is called before checks + # message receive log happens in onValidated observer as onRecv is called before checks updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = true) discard + proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) = + let msg_id_short = shortLog(msgId) + let wakuMessage = WakuMessage.decode(msg.data).valueOr: + warn "onValidated: failed decoding to Waku Message", + my_peer_id = w.switch.peerInfo.peerId, + msg_id = msg_id_short, + from_peer_id = peer.peerId, + pubsub_topic = msg.topic, + error = $error + return + + logMessageInfo( + w, shortLog(peer.peerId), msg.topic, msg_id_short, wakuMessage, onRecv = true + ) + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = for msg in msgs.messages: let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr: + warn "onSend: failed decoding RPC info", + my_peer_id = w.switch.peerInfo.peerId, to_peer_id = peer.peerId continue - logMessageInfo(peer, topic, msg_id_short, wakuMessage) + logMessageInfo( + w, shortLog(peer.peerId), topic, msg_id_short, wakuMessage, onRecv = false + ) updateMetrics(peer, topic, wakuMessage, msgSize, onRecv = false) - let administrativeObserver = PubSubObserver(onRecv: onRecv, onSend: onSend) + let administrativeObserver = + PubSubObserver(onRecv: onRecv, onSend: onSend, onValidated: onValidated) w.addObserver(administrativeObserver) @@ -243,7 +278,7 @@ proc new*( procCall GossipSub(w).initPubSub() w.initProtocolHandler() - w.initRelayMetricObserver() + w.initRelayObservers() except InitializationError: return err("initialization error: " & getCurrentExceptionMsg())