From b93f1ff4ff1d65e24fadf336040cb893d6bb00ff Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 11 Jun 2024 18:03:05 +0200 Subject: [PATCH 01/11] initial implementation of observer loggers --- waku/node/waku_node.nim | 14 ++++++++++++++ waku/waku_relay/protocol.nim | 3 +++ 2 files changed, 17 insertions(+) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 77cba5567b..7c5a06b3f8 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -389,6 +389,15 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" +proc generateRelayObserver(): PubSubObserver = + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = + echo "-------------- Received message --------------" + + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = + echo "-------------- Sent message --------------" + + return PubSubObserver(onRecv: onRecv, onSend: onSend) + proc mountRelay*( node: WakuNode, pubsubTopics: seq[string] = @[], @@ -409,6 +418,11 @@ proc mountRelay*( node.wakuRelay = initRes.value + # register rln validator as default validator + debug "Registering Relay observers" + let observerLogger = generateRelayObserver() + node.wakuRelay.addObserver(observerLogger) + ## Add peer exchange handler if peerExchangeHandler.isSome(): node.wakuRelay.parameters.enablePX = true diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 03d5b596ec..ca59a68995 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -180,6 +180,9 @@ proc addValidator*( ) {.gcsafe.} = w.wakuValidators.add((handler, errorMessage)) +proc addObserver*(w: WakuRelay, observer: PubSubObserver) {.gcsafe.} = + procCall GossipSub(w).addObserver(observer) + method start*(w: WakuRelay) {.async, base.} = debug "start" await procCall GossipSub(w).start() From ed6a75a9d605c5786136e8c4e7c81a7f172058dd Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 11 Jun 2024 19:15:29 +0200 Subject: [PATCH 02/11] logging more info --- waku/node/waku_node.nim | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 7c5a06b3f8..1656c0f3f8 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -389,9 +389,31 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" -proc generateRelayObserver(): PubSubObserver = - proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = - echo "-------------- Received message --------------" +proc generateRelayObserver(w: WakuRelay): PubSubObserver = + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + for msg in msgs.messages: + let msg_id = w.msgIdProvider(msg).valueOr: + warn "Error generating message id", + from_peer = msg.fromPeer, topic = msg.topic, peer_id = peer.peerId + continue + + let wakuMessage = WakuMessage.decode(msg.data).valueOr: + error "Error decoding to Waku Message", + msg_id = msg_id, + from_peer = msg.fromPeer, + topic = msg.topic, + peer_id = peer.peerId, + error = $error + continue + + let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() + + info "received message", + msg_hash = msg_hash, + msg_id = msg_id, + from_peer = msg.fromPeer, + topic = msg.topic, + peer_id = peer.peerId proc onSend(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = echo "-------------- Sent message --------------" @@ -420,7 +442,7 @@ proc mountRelay*( # register rln validator as default validator debug "Registering Relay observers" - let observerLogger = generateRelayObserver() + let observerLogger = node.wakuRelay.generateRelayObserver() node.wakuRelay.addObserver(observerLogger) ## Add peer exchange handler From 358d7573b8710d9e1da381d2384c8aaa3ef585c7 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 11 Jun 2024 19:18:59 +0200 Subject: [PATCH 03/11] error improvements --- waku/node/waku_node.nim | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 1656c0f3f8..e0c673792e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -394,11 +394,14 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = for msg in msgs.messages: let msg_id = w.msgIdProvider(msg).valueOr: warn "Error generating message id", - from_peer = msg.fromPeer, topic = msg.topic, peer_id = peer.peerId + from_peer = msg.fromPeer, + topic = msg.topic, + peer_id = peer.peerId, + error = $error continue let wakuMessage = WakuMessage.decode(msg.data).valueOr: - error "Error decoding to Waku Message", + warn "Error decoding to Waku Message", msg_id = msg_id, from_peer = msg.fromPeer, topic = msg.topic, From eca230a7e233317443747d57b81f7f192bf2619a Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 11 Jun 2024 19:30:34 +0200 Subject: [PATCH 04/11] using msg_id shortlog --- waku/node/waku_node.nim | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e0c673792e..17c067ffe8 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -400,9 +400,11 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = error = $error continue + let msg_id_short = shortLog(msg_id) + let wakuMessage = WakuMessage.decode(msg.data).valueOr: warn "Error decoding to Waku Message", - msg_id = msg_id, + msg_id = msg_id_short, from_peer = msg.fromPeer, topic = msg.topic, peer_id = peer.peerId, @@ -413,7 +415,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = info "received message", msg_hash = msg_hash, - msg_id = msg_id, + msg_id = msg_id_short, from_peer = msg.fromPeer, topic = msg.topic, peer_id = peer.peerId From a12b3a48b15c4b8b5bf7098b9edcaab01b6e7048 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 11 Jun 2024 19:52:24 +0200 Subject: [PATCH 05/11] removing msg_fromPeer reference (always empty) --- waku/node/waku_node.nim | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 17c067ffe8..290ede0481 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -394,10 +394,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = for msg in msgs.messages: let msg_id = w.msgIdProvider(msg).valueOr: warn "Error generating message id", - from_peer = msg.fromPeer, - topic = msg.topic, - peer_id = peer.peerId, - error = $error + from_peer = peer.peerId, topic = msg.topic, error = $error continue let msg_id_short = shortLog(msg_id) @@ -405,9 +402,8 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = let wakuMessage = WakuMessage.decode(msg.data).valueOr: warn "Error decoding to Waku Message", msg_id = msg_id_short, - from_peer = msg.fromPeer, + from_peer = peer.peerId, topic = msg.topic, - peer_id = peer.peerId, error = $error continue @@ -416,9 +412,8 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = info "received message", msg_hash = msg_hash, msg_id = msg_id_short, - from_peer = msg.fromPeer, - topic = msg.topic, - peer_id = peer.peerId + from_peer = peer.peerId, + topic = msg.topic proc onSend(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = echo "-------------- Sent message --------------" From 73dc27b4109fc9b7df783469491ecf50a18b1198 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 12 Jun 2024 10:17:26 +0200 Subject: [PATCH 06/11] refactor and adding logs when messages are sent --- waku/node/waku_node.nim | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 290ede0481..0a56093da3 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -390,7 +390,7 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" proc generateRelayObserver(w: WakuRelay): PubSubObserver = - proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) = for msg in msgs.messages: let msg_id = w.msgIdProvider(msg).valueOr: warn "Error generating message id", @@ -409,14 +409,24 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() - info "received message", - msg_hash = msg_hash, - msg_id = msg_id_short, - from_peer = peer.peerId, - topic = msg.topic + if onRecv: + info "received message", + msg_hash = msg_hash, + msg_id = msg_id_short, + from_peer = peer.peerId, + topic = msg.topic + else: + info "sent message", + msg_hash = msg_hash, + msg_id = msg_id_short, + from_peer = peer.peerId, + topic = msg.topic + + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + logMessageInfo(peer, msgs, onRecv = true) - proc onSend(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = - echo "-------------- Sent message --------------" + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = + logMessageInfo(peer, msgs, onRecv = false) return PubSubObserver(onRecv: onRecv, onSend: onSend) From b48e3e33eaf14c2e09b985ffe18dda21931647b9 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 12 Jun 2024 10:21:09 +0200 Subject: [PATCH 07/11] fixing log --- waku/node/waku_node.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 0a56093da3..c74f8de5a0 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -419,7 +419,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = info "sent message", msg_hash = msg_hash, msg_id = msg_id_short, - from_peer = peer.peerId, + to_peer = peer.peerId, topic = msg.topic proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = From 399758c18d8828766fead51fea4b9fdbfcb787c7 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 13 Jun 2024 12:18:34 +0200 Subject: [PATCH 08/11] fix comment --- waku/node/waku_node.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index c74f8de5a0..66110c7415 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -450,7 +450,7 @@ proc mountRelay*( node.wakuRelay = initRes.value - # register rln validator as default validator + # register relay observers for logging debug "Registering Relay observers" let observerLogger = node.wakuRelay.generateRelayObserver() node.wakuRelay.addObserver(observerLogger) From 91295408a21bb72108e9bb8d76291ecbe3ddaaa8 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 13 Jun 2024 14:18:40 +0200 Subject: [PATCH 09/11] improved logging --- waku/node/waku_node.nim | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 66110c7415..d9d55be565 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -394,7 +394,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = for msg in msgs.messages: let msg_id = w.msgIdProvider(msg).valueOr: warn "Error generating message id", - from_peer = peer.peerId, topic = msg.topic, error = $error + from_peer_id = peer.peerId, topic = msg.topic, error = $error continue let msg_id_short = shortLog(msg_id) @@ -402,7 +402,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = let wakuMessage = WakuMessage.decode(msg.data).valueOr: warn "Error decoding to Waku Message", msg_id = msg_id_short, - from_peer = peer.peerId, + from_peer_id = peer.peerId, topic = msg.topic, error = $error continue @@ -410,16 +410,16 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = let msg_hash = computeMessageHash(msg.topic, wakuMessage).to0xHex() if onRecv: - info "received message", + notice "received relay message", msg_hash = msg_hash, msg_id = msg_id_short, - from_peer = peer.peerId, + from_peer_id = peer.peerId, topic = msg.topic else: - info "sent message", + notice "sent relay message", msg_hash = msg_hash, msg_id = msg_id_short, - to_peer = peer.peerId, + to_peer_id = peer.peerId, topic = msg.topic proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = From 4932c4da0cc731fb75a77cda3d0de68d0c78edef Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 13 Jun 2024 14:28:54 +0200 Subject: [PATCH 10/11] replacin traceHandler --- waku/node/waku_node.nim | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index d9d55be565..8e88b9880d 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -224,19 +224,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = if node.wakuRelay.isSubscribed(topic): return - proc traceHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - notice "waku.relay received", - my_peer_id = node.peerId, - pubsubTopic = topic, - msg_hash = topic.computeMessageHash(msg).to0xHex(), - receivedTime = getNowInNanosecondTime(), - payloadSizeBytes = msg.payload.len - - let msgSizeKB = msg.payload.len / 1000 - - waku_node_messages.inc(labelValues = ["relay"]) - waku_histogram_message_size.observe(msgSizeKB) - proc filterHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = if node.wakuFilter.isNil(): return @@ -252,7 +239,6 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = let defaultHandler = proc( topic: PubsubTopic, msg: WakuMessage ): Future[void] {.async, gcsafe.} = - await traceHandler(topic, msg) await filterHandler(topic, msg) await archiveHandler(topic, msg) @@ -414,13 +400,21 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = msg_hash = msg_hash, msg_id = msg_id_short, from_peer_id = peer.peerId, - topic = msg.topic + topic = msg.topic, + receivedTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len + + let msgSizeKB = wakuMessage.payload.len / 1000 + waku_node_messages.inc(labelValues = ["relay"]) + waku_histogram_message_size.observe(msgSizeKB) else: notice "sent relay message", msg_hash = msg_hash, msg_id = msg_id_short, to_peer_id = peer.peerId, - topic = msg.topic + topic = msg.topic, + sentTime = getNowInNanosecondTime(), + payloadSizeBytes = wakuMessage.payload.len proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = logMessageInfo(peer, msgs, onRecv = true) From 14ade799b592cce06bf37b3269baa2eda4ca16fd Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 13 Jun 2024 15:03:23 +0200 Subject: [PATCH 11/11] adding my_peer_id and not logging on sent message --- waku/node/waku_node.nim | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 8e88b9880d..3e28a7607e 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -375,18 +375,22 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" -proc generateRelayObserver(w: WakuRelay): PubSubObserver = +proc generateRelayObserver(node: WakuNode): PubSubObserver = proc logMessageInfo(peer: PubSubPeer, msgs: var RPCMsg, onRecv: bool) = for msg in msgs.messages: - let msg_id = w.msgIdProvider(msg).valueOr: + let msg_id = node.wakuRelay.msgIdProvider(msg).valueOr: warn "Error generating message id", - from_peer_id = peer.peerId, topic = msg.topic, error = $error + my_peer_id = node.peerId, + from_peer_id = peer.peerId, + topic = msg.topic, + error = $error continue let msg_id_short = shortLog(msg_id) let wakuMessage = WakuMessage.decode(msg.data).valueOr: warn "Error decoding to Waku Message", + my_peer_id = node.peerId, msg_id = msg_id_short, from_peer_id = peer.peerId, topic = msg.topic, @@ -397,6 +401,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = if onRecv: notice "received relay message", + my_peer_id = node.peerId, msg_hash = msg_hash, msg_id = msg_id_short, from_peer_id = peer.peerId, @@ -409,6 +414,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = waku_histogram_message_size.observe(msgSizeKB) else: notice "sent relay message", + my_peer_id = node.peerId, msg_hash = msg_hash, msg_id = msg_id_short, to_peer_id = peer.peerId, @@ -420,7 +426,7 @@ proc generateRelayObserver(w: WakuRelay): PubSubObserver = logMessageInfo(peer, msgs, onRecv = true) proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = - logMessageInfo(peer, msgs, onRecv = false) + discard return PubSubObserver(onRecv: onRecv, onSend: onSend) @@ -446,7 +452,7 @@ proc mountRelay*( # register relay observers for logging debug "Registering Relay observers" - let observerLogger = node.wakuRelay.generateRelayObserver() + let observerLogger = node.generateRelayObserver() node.wakuRelay.addObserver(observerLogger) ## Add peer exchange handler