From 66774dcaca44425538702e871a041835c240e8bc Mon Sep 17 00:00:00 2001 From: shashankshampi Date: Mon, 30 Sep 2024 15:22:42 +0530 Subject: [PATCH] added test for membership for join and leave topic --- tests/pubsub/testgossipmembership.nim | 84 +++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) diff --git a/tests/pubsub/testgossipmembership.nim b/tests/pubsub/testgossipmembership.nim index 82157f9db4..108e79e25c 100644 --- a/tests/pubsub/testgossipmembership.nim +++ b/tests/pubsub/testgossipmembership.nim @@ -22,7 +22,16 @@ import ../../libp2p/muxers/muxer import ../../libp2p/protocols/pubsub/rpc/protobuf import utils import chronos +import unittest2, chronos, stew/byteutils, ../../libp2p/protocols/pubsub/gossipsub +import ../helpers + +import sequtils, options, tables, sets, sugar +import chronos, chronicles # Added chronicles for logging (trace) +import stew/byteutils +import chronos/ratelimit +import metrics +import ../../libp2p/protocols/pubsub/errors as pubsub_errors import ../helpers proc noop(data: seq[byte]) {.async: (raises: [CancelledError, LPStreamError]).} = @@ -182,3 +191,78 @@ suite "GossipSub Topic Membership Tests": await allFuturesThrowing(conns.mapIt(it.close())) await gossipSub.switch.stop() + + # Test for verifying peers joining a topic using `JOIN(topic)` + asyncTest "handle JOIN event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topic = "test-join-topic" + + # Initialize relevant data structures + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + + # Simulate the peer joining the topic + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + , + ) + + check gossipSub.mesh[topic].len > 0 # Ensure the peer is added to the mesh + check gossipSub.topics.contains(topic) # Ensure the topic is in `topics` + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop() + + # Test for verifying peers leaving a topic using `LEAVE(topic)` + asyncTest "handle LEAVE event": + let gossipSub = TestGossipSub.init(newStandardSwitch()) + + let topic = "test-leave-topic" + + # Initialize relevant data structures + gossipSub.mesh[topic] = initHashSet[PubSubPeer]() + gossipSub.topicParams[topic] = TopicParams.init() + gossipSub.gossipsub[topic] = initHashSet[PubSubPeer]() + + var conns = newSeq[Connection]() + + for i in 0 ..< 5: + let conn = TestBufferStream.new(noop) + conns &= conn + let peerId = randomPeerId() + conn.peerId = peerId + let peer = gossipSub.getPubSubPeer(peerId) + peer.sendConn = conn + gossipSub.gossipsub[topic].incl(peer) + + # Simulate peer joining the topic first + gossipSub.PubSub.subscribe( + topic, + proc(topic: string, data: seq[byte]): Future[void] {.async.} = + discard + , + ) + + # Now simulate peer leaving the topic + gossipSub.PubSub.unsubscribeAll(topic) + + check topic notin gossipSub.mesh # Ensure the peer is removed from the mesh + check topic in gossipSub.gossipsub # Ensure the topic remains in `gossipsub` + + await allFuturesThrowing(conns.mapIt(it.close())) + await gossipSub.switch.stop()