From e83fe7982f3e0610ddd8816ff3f26829d742b616 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Tue, 16 Jul 2024 16:27:28 +0300 Subject: [PATCH 01/10] chore: adding onRecvAndValidated observers --- libp2p/protocols/pubsub/gossipsub.nim | 3 +++ libp2p/protocols/pubsub/pubsubpeer.nim | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index 30e6163849..e3769ff8b1 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -453,6 +453,9 @@ proc validateAndRelay( g.rewardDelivered(peer, topic, true) + # trigger hooks + peer.recvAndValidatedObservers(msg, msgId) + # The send list typically matches the idontwant list from above, but # might differ if validation takes time var toSendPeers = HashSet[PubSubPeer]() diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 9dd00f66a7..cc4acaa1dc 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -67,6 +67,8 @@ type PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} onSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} + onRecvAndValidated*: + proc(peer: PubSubPeer, msg: Message, msgId: MessageId) {.gcsafe, raises: [].} PubSubPeerEventKind* {.pure.} = enum StreamOpened @@ -172,6 +174,13 @@ proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) = if not (isNil(obs)): # TODO: should never be nil, but... obs.onRecv(p, msg) +proc recvAndValidatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) = + # trigger hooks + if not (isNil(p.observers)) and p.observers[].len > 0: + for obs in p.observers[]: + if not (isNil(obs)): # TODO: should never be nil, but... + obs.onRecvAndValidated(p, msg, msgId) + proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not (isNil(p.observers)) and p.observers[].len > 0: From 096026e5ad060e3010f3905503be64b2d1a11506 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 19 Jul 2024 13:17:47 +0300 Subject: [PATCH 02/10] add extra nil check and modify test --- libp2p/protocols/pubsub/pubsubpeer.nim | 9 ++++++--- tests/pubsub/testgossipsub.nim | 4 +++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index cc4acaa1dc..6be4cbc476 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -172,21 +172,24 @@ proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) = if not (isNil(p.observers)) and p.observers[].len > 0: for obs in p.observers[]: if not (isNil(obs)): # TODO: should never be nil, but... - obs.onRecv(p, msg) + if not (isNil(obs.onRecv)): + obs.onRecv(p, msg) proc recvAndValidatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) = # trigger hooks if not (isNil(p.observers)) and p.observers[].len > 0: for obs in p.observers[]: if not (isNil(obs)): # TODO: should never be nil, but... - obs.onRecvAndValidated(p, msg, msgId) + if not (isNil(obs.onRecvAndValidated)): + obs.onRecvAndValidated(p, msg, msgId) proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks if not (isNil(p.observers)) and p.observers[].len > 0: for obs in p.observers[]: if not (isNil(obs)): # TODO: should never be nil, but... - obs.onSend(p, msg) + if not (isNil(obs.onSend)): + obs.onSend(p, msg) proc handle*(p: PubSubPeer, conn: Connection) {.async.} = debug "starting pubsub read loop", conn, peer = p, closed = conn.closed diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index caf41482c9..4e5763779a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -357,6 +357,8 @@ suite "GossipSub": let obs1 = PubSubObserver( onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) = + inc observed, + onRecvAndValidated: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) = inc observed ) obs2 = PubSubObserver( @@ -382,7 +384,7 @@ suite "GossipSub": await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) await allFuturesThrowing(nodesFut.concat()) - check observed == 2 + check observed == 3 asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": var passed = newFuture[void]() From 363dfddf305a6ebe79366b3cc64e086bd3daef14 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Fri, 19 Jul 2024 13:27:09 +0300 Subject: [PATCH 03/10] fix test --- tests/pubsub/testgossipsub.nim | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 4e5763779a..251e603862 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -357,17 +357,20 @@ suite "GossipSub": let obs1 = PubSubObserver( onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) = - inc observed, - onRecvAndValidated: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) = inc observed ) obs2 = PubSubObserver( + onRecvAndValidated: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) = + inc observed + ) + obs3 = PubSubObserver( onSend: proc(peer: PubSubPeer, msgs: var RPCMsg) = inc observed ) nodes[1].addObserver(obs1) - nodes[0].addObserver(obs2) + nodes[1].addObserver(obs2) + nodes[0].addObserver(obs3) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 From 4fc5918a876bc0cf7183fd844131bcaffd9f3286 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 31 Jul 2024 12:00:59 +0300 Subject: [PATCH 04/10] renaming observer to onValidated --- libp2p/protocols/pubsub/pubsubpeer.nim | 6 +++--- tests/pubsub/testgossipsub.nim | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 6be4cbc476..6b0d5e5615 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -67,7 +67,7 @@ type PubSubObserver* = ref object onRecv*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} onSend*: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} - onRecvAndValidated*: + onValidated*: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) {.gcsafe, raises: [].} PubSubPeerEventKind* {.pure.} = enum @@ -180,8 +180,8 @@ proc recvAndValidatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) = if not (isNil(p.observers)) and p.observers[].len > 0: for obs in p.observers[]: if not (isNil(obs)): # TODO: should never be nil, but... - if not (isNil(obs.onRecvAndValidated)): - obs.onRecvAndValidated(p, msg, msgId) + if not (isNil(obs.onValidated)): + obs.onValidated(p, msg, msgId) proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 251e603862..2fb2ca3e5a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -360,7 +360,7 @@ suite "GossipSub": inc observed ) obs2 = PubSubObserver( - onRecvAndValidated: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) = + onValidated: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) = inc observed ) obs3 = PubSubObserver( From 29c5f96be83058e0c176989b80e03cd7c2e03083 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 31 Jul 2024 15:25:25 +0300 Subject: [PATCH 05/10] resetting old test and creating a new observers one --- tests/pubsub/testgossipsub.nim | 78 +++++++++++++++++++++++++++++++--- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 2fb2ca3e5a..9095a93a03 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -220,6 +220,75 @@ suite "GossipSub": await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub's observers should run after message is sent, received and validated": + var + handlerFut = newFuture[bool]() + recvCounter = 0 + sendCounter = 0 + validatedCounter = 0 + + proc handler(topic: string, data: seq[byte]) {.async.} = + check topic == "foo" + handlerFut.complete(true) + + proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + inc recvCounter + + proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = + inc sendCounter + + proc onValidated(peer: PubSubPeer, msgs: var RPCMsg) = + inc validatedCounter + + let obs0 = PubSubObserver(onSend: onSend) + let obs1 = PubSubObserver(onRecv: onRecv, onValidated: onValidated) + + let + nodes = generateNodes(2, gossip = true) + # start switches + nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) + + await subscribeNodes(nodes) + + nodes[0].addObserver(obs0) + nodes[1].addObserver(obs1) + nodes[1].subscribe("foo", handler) + nodes[1].subscribe("bar", handler) + + proc validator( + topic: string, message: Message + ): Future[ValidationResult] {.async.} = + result = if topic == "foo": ValidationResult.Accept else: ValidationResult.Reject + + nodes[1].addValidator("foo", "bar", validator) + + # Send message that will be accepted by the receiver's validator + tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 + + check await handlerFut + + check: + recvCounter == 1 + validatedCounter == 1 + sendCounter == 1 + + # Reset future + handlerFut = newFuture[bool]() + + # Send message that will be rejected by the receiver's validator + tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 + + check await handlerFut + + check: + recvCounter == 2 + validatedCounter == 1 + sendCounter == 2 + + await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) + + await allFuturesThrowing(nodesFut.concat()) + asyncTest "GossipSub unsub - resub faster than backoff": var handlerFut = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async.} = @@ -360,17 +429,12 @@ suite "GossipSub": inc observed ) obs2 = PubSubObserver( - onValidated: proc(peer: PubSubPeer, msg: Message, msgId: MessageId) = - inc observed - ) - obs3 = PubSubObserver( onSend: proc(peer: PubSubPeer, msgs: var RPCMsg) = inc observed ) nodes[1].addObserver(obs1) - nodes[1].addObserver(obs2) - nodes[0].addObserver(obs3) + nodes[0].addObserver(obs2) tryPublish await nodes[0].publish("foobar", "Hello!".toBytes()), 1 @@ -387,7 +451,7 @@ suite "GossipSub": await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) await allFuturesThrowing(nodesFut.concat()) - check observed == 3 + check observed == 2 asyncTest "e2e - GossipSub send over fanout A -> B for subscribed topic": var passed = newFuture[void]() From bb0d0fb32d7923709d209c66d0cec3dcdcfb3999 Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 31 Jul 2024 15:42:24 +0300 Subject: [PATCH 06/10] fix compilation error --- tests/pubsub/testgossipsub.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 9095a93a03..5b87e97fb0 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -237,7 +237,7 @@ suite "GossipSub": proc onSend(peer: PubSubPeer, msgs: var RPCMsg) = inc sendCounter - proc onValidated(peer: PubSubPeer, msgs: var RPCMsg) = + proc onValidated(peer: PubSubPeer, msg: Message, msgId: MessageId) = inc validatedCounter let obs0 = PubSubObserver(onSend: onSend) From b6b1782fbb864c3c84aacc674c1726810b3a9b7d Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 31 Jul 2024 15:52:55 +0300 Subject: [PATCH 07/10] fixing test --- tests/pubsub/testgossipsub.nim | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 5b87e97fb0..614916abe7 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -265,21 +265,16 @@ suite "GossipSub": # Send message that will be accepted by the receiver's validator tryPublish await nodes[0].publish("foo", "Hello!".toBytes()), 1 - check await handlerFut - check: recvCounter == 1 validatedCounter == 1 sendCounter == 1 - # Reset future - handlerFut = newFuture[bool]() + check await handlerFut # Send message that will be rejected by the receiver's validator tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 - check await handlerFut - check: recvCounter == 2 validatedCounter == 1 From 82f7927c595dcc829143c01caea20e9a818b5d6e Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Wed, 31 Jul 2024 15:56:54 +0300 Subject: [PATCH 08/10] rename and delete unnecessary if condition --- libp2p/protocols/pubsub/gossipsub.nim | 2 +- libp2p/protocols/pubsub/pubsubpeer.nim | 7 +++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index e3769ff8b1..03a105ea02 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -454,7 +454,7 @@ proc validateAndRelay( g.rewardDelivered(peer, topic, true) # trigger hooks - peer.recvAndValidatedObservers(msg, msgId) + peer.validatedObservers(msg, msgId) # The send list typically matches the idontwant list from above, but # might differ if validation takes time diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 6b0d5e5615..79c5c90de4 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -175,13 +175,12 @@ proc recvObservers*(p: PubSubPeer, msg: var RPCMsg) = if not (isNil(obs.onRecv)): obs.onRecv(p, msg) -proc recvAndValidatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) = +proc validatedObservers*(p: PubSubPeer, msg: Message, msgId: MessageId) = # trigger hooks if not (isNil(p.observers)) and p.observers[].len > 0: for obs in p.observers[]: - if not (isNil(obs)): # TODO: should never be nil, but... - if not (isNil(obs.onValidated)): - obs.onValidated(p, msg, msgId) + if not (isNil(obs.onValidated)): + obs.onValidated(p, msg, msgId) proc sendObservers(p: PubSubPeer, msg: var RPCMsg) = # trigger hooks From ad548cff1aad5a9a22e26852806f7530fb8afd3c Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 1 Aug 2024 10:29:08 +0300 Subject: [PATCH 09/10] discarding handler --- tests/pubsub/testgossipsub.nim | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 614916abe7..9ba41b30a5 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -222,14 +222,12 @@ suite "GossipSub": asyncTest "GossipSub's observers should run after message is sent, received and validated": var - handlerFut = newFuture[bool]() recvCounter = 0 sendCounter = 0 validatedCounter = 0 proc handler(topic: string, data: seq[byte]) {.async.} = - check topic == "foo" - handlerFut.complete(true) + discard proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = inc recvCounter @@ -270,8 +268,6 @@ suite "GossipSub": validatedCounter == 1 sendCounter == 1 - check await handlerFut - # Send message that will be rejected by the receiver's validator tryPublish await nodes[0].publish("bar", "Hello!".toBytes()), 1 From 00fb97106507ce4c1fd82a9e5a4dbee4c3e6c68f Mon Sep 17 00:00:00 2001 From: Gabriel mermelstein Date: Thu, 1 Aug 2024 16:32:54 +0300 Subject: [PATCH 10/10] removing nodesFut --- tests/pubsub/testgossipsub.nim | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 9ba41b30a5..9f059b8f8a 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -241,10 +241,9 @@ suite "GossipSub": let obs0 = PubSubObserver(onSend: onSend) let obs1 = PubSubObserver(onRecv: onRecv, onValidated: onValidated) - let - nodes = generateNodes(2, gossip = true) - # start switches - nodesFut = await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) + let nodes = generateNodes(2, gossip = true) + # start switches + discard await allFinished(nodes[0].switch.start(), nodes[1].switch.start()) await subscribeNodes(nodes) @@ -278,8 +277,6 @@ suite "GossipSub": await allFuturesThrowing(nodes[0].switch.stop(), nodes[1].switch.stop()) - await allFuturesThrowing(nodesFut.concat()) - asyncTest "GossipSub unsub - resub faster than backoff": var handlerFut = newFuture[bool]() proc handler(topic: string, data: seq[byte]) {.async.} =