diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 31f571e8af..8d1ff8d790 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -225,6 +225,21 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isSubscribed(topic): return + proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + let msg_hash = topic.computeMessageHash(msg).to0xHex() + + notice "waku.relay received", + my_peer_id = node.peerId, + pubsubTopic = topic, + msg_hash = msg_hash, + receivedTime = getNowInNanosecondTime(), + payloadSizeBytes = msg.payload.len + + let msgSizeKB = msg.payload.len / 1000 + + waku_node_messages.inc(labelValues = ["relay"]) + waku_histogram_message_size.observe(msgSizeKB) + proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = if node.wakuFilter.isNil(): return @@ -240,6 +255,7 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = + await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) @@ -376,61 +392,6 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" -proc generateRelayObserver(node: WakuNode): PubSubObserver = - proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) = - for msg in msgs.messages: - let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr: - warn "Error generating message id", - my_peer_id = node.peerId, - from_peer_id = peer.peerId, - topic = msg.topic, - error = $error - continue - - let msg_id_short = shortLog(msg_id) - - let wakuMessage = WakuMessage.decode(msg.data).valueOr: - warn "Error decoding to Waku Message", - my_peer_id = node.peerId, - msg_id = msg_id_short, - from_peer_id = peer.peerId, - topic = msg.topic, - error = $error - continue - - let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() - - if onRecv: - notice "received relay message", - my_peer_id = node.peerId, - msg_hash = msg_hash, - msg_id = msg_id_short, - from_peer_id = peer.peerId, - topic = msg.topic, - receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = wakuMessage.payload.len - - let msgSizeKB = wakuMessage.payload.len / 1000 - waku_node_messages.inc(labelValues = ["relay"]) - waku_histogram_message_size.observe(msgSizeKB) - else: - notice "sent relay message", - my_peer_id = node.peerId, - msg_hash = msg_hash, - msg_id = msg_id_short, - to_peer_id = peer.peerId, - topic = msg.topic, - sentTime = getNowInNanosecondTime(), - payloadSizeBytes = wakuMessage.payload.len - - proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = - logMessageInfo(peer, msgs, onRecv = true) - - proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = - discard - - return PubSubObserver(onRecv: onRecv, onSend: onSend) - proc mountRelay*( node: WakuNode, pubsubTopics: seq[string] = @[], @@ -451,11 +412,6 @@ proc mountRelay*( node.wakuRelay = initRes.value - # register relay observers for logging - debug "Registering Relay observers" - let observerLogger = node.generateRelayObserver() - node.wakuRelay.addObserver(observerLogger) - ## Add peer exchange handler if peerExchangeHandler.isSome(): node.wakuRelay.parameters.enablePX = true @@ -977,14 +933,14 @@ proc mountLightPush*( node: WakuNode, rateLimit: RateLimitSetting = DefaultGlobalNonRelayRateLimit ) {.async.} = info "mounting light push" - - var pushHandler = + + var pushHandler = if node.wakuRelay.isNil: debug "mounting lightpush without relay (nil)" getNilPushHandler() else: debug "mounting lightpush with relay" - let rlnPeer = + let rlnPeer = if isNil(node.wakuRlnRelay): debug "mounting lightpush without rln-relay" none(WakuRLNRelay) diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index ca59a68995..03d5b596ec 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -180,9 +180,6 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) -proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = - procCall GossipSub(w).addObserver(observer) - method start*(w: WakuRelay) {.async, base.} = debug "start" await procCall GossipSub(w).start()