diff --git a/waku/factory/external_config.nim b/waku/factory/external_config.nim index 2446352c10..dcc71d54df 100644 --- a/waku/factory/external_config.nim +++ b/waku/factory/external_config.nim @@ -644,6 +644,13 @@ with the drawback of consuming some more bandwitdh.""", name: "peer-exchange-node" .}: string + ## Rendez vous + rendezvous* {. + desc: "Enable waku rendezvous discovery server", + defaultValue: false, + name: "rendezvous" + .}: bool + ## websocket config websocketSupport* {. desc: "Enable websocket: true|false", diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index dcd2179043..a6140ad143 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -204,12 +204,13 @@ proc setupProtocols( protectedShard = shardKey.shard, publicKey = shardKey.key node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId) - # Enable Rendezvous Discovery protocol when Relay is enabled - try: - await mountRendezvous(node) - except CatchableError: - return - err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) + # Only relay nodes can be rendezvous points. + if conf.rendezvous: + try: + await mountRendezvous(node) + except CatchableError: + return + err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg()) # Keepalive mounted on all nodes try: diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index ca10ca799b..b57f49681c 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -17,7 +17,6 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/connectivity/autonat/client, libp2p/protocols/connectivity/autonat/service, - libp2p/protocols/rendezvous, libp2p/builders, libp2p/transports/transport, libp2p/transports/tcptransport, @@ -39,6 +38,7 @@ import ../waku_filter_v2/client as filter_client, ../waku_filter_v2/subscriptions as filter_subscriptions, ../waku_metadata, + ../waku_rendezvous/protocol, ../waku_sync, ../waku_lightpush/client as lightpush_client, ../waku_lightpush/common, @@ -109,7 +109,7 @@ type enr*: enr.Record libp2pPing*: Ping rng*: ref rand.HmacDrbgContext - rendezvous*: RendezVous + wakuRendezvous*: WakuRendezVous announcedAddresses*: seq[MultiAddress] started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] @@ -1269,19 +1269,22 @@ proc startKeepalive*(node: WakuNode) = proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - node.rendezvous = RendezVous.new(node.switch) + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) if node.started: try: - await node.rendezvous.start() + await node.wakuRendezvous.start() except CatchableError: error "failed to start rendezvous", error = getCurrentExceptionMsg() try: - node.switch.mount(node.rendezvous) + node.switch.mount(node.wakuRendezvous) except LPError: error "failed to mount rendezvous", error = getCurrentExceptionMsg() + # Always start discovering peers at startup + await node.wakuRendezvous.initialRequestAll() + proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): @@ -1338,6 +1341,11 @@ proc start*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.start() + if not node.wakuRendezvous.isNil(): + try: + await node.wakuRendezvous.start() + except CatchableError: + error "failed to start rendezvous", error = getCurrentExceptionMsg() if not node.wakuSync.isNil(): node.wakuSync.start() @@ -1379,6 +1387,9 @@ proc stop*(node: WakuNode) {.async.} = if not node.wakuStoreResume.isNil(): await node.wakuStoreResume.stopWait() + if not node.wakuRendezvous.isNil(): + await node.wakuRendezvous.stop() + node.started = false proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} = diff --git a/waku/waku_rendezvous.nim b/waku/waku_rendezvous.nim new file mode 100644 index 0000000000..b07f1f7274 --- /dev/null +++ b/waku/waku_rendezvous.nim @@ -0,0 +1,3 @@ +import ./waku_rendezvous/protocol + +export protocol diff --git a/waku/waku_rendezvous/common.nim b/waku/waku_rendezvous/common.nim new file mode 100644 index 0000000000..06e1e494c7 --- /dev/null +++ b/waku/waku_rendezvous/common.nim @@ -0,0 +1,29 @@ +{.push raises: [].} + +import chronos + +import ../waku_enr/capabilities + +const DiscoverLimit* = 1000 +const DefaultRegistrationTTL* = 60.seconds +const DefaultRegistrationInterval* = 10.seconds + +proc computeNamespace*(clusterId: uint16, shard: uint16): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + + return namespace + +proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string = + var namespace = "rs/" + + namespace &= $clusterId + namespace &= '/' + namespace &= $shard + namespace &= '/' + namespace &= $cap + + return namespace diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim new file mode 100644 index 0000000000..163ceb1f65 --- /dev/null +++ b/waku/waku_rendezvous/protocol.nim @@ -0,0 +1,166 @@ +{.push raises: [].} + +import + std/[sugar, options], + results, + chronos, + chronicles, + metrics, + libp2p/protocols/rendezvous, + libp2p/switch, + libp2p/utility + +import + ../node/peer_manager, + ../common/enr, + ../waku_enr/capabilities, + ../waku_enr/sharding, + ../waku_core/peers, + ../waku_core/topics, + ./common + +logScope: + topics = "waku rendez vous" + +declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous" + +type WakuRendezVous* = ref object of RendezVous + peerManager: PeerManager + relayShard: RelayShards + capabilities: seq[Capabilities] + + periodicRegistrationFut: Future[void] + +proc batchAdvertise*( + self: WakuRendezVous, + namespace: string, + ttl: Duration = MinimumDuration, + peers: seq[PeerId], +): Future[Result[void, string]] {.async.} = + ## Register with all rendez vous peers under a namespace + + let catchable = catch: + await procCall RendezVous(self).advertise(namespace, ttl, peers) + + if catchable.isErr(): + return err(catchable.error.msg) + + return ok() + +proc batchRequest*( + self: WakuRendezVous, + namespace: string, + count: int = DiscoverLimit, + peers: seq[PeerId], +): Future[Result[seq[PeerRecord], string]] {.async.} = + ## Request all records from all rendez vous peers with matching a namespace + + let catchable = catch: + await RendezVous(self).request(namespace, count, peers) + + if catchable.isErr(): + return err(catchable.error.msg) + + return ok(catchable.get()) + +proc getRelayShards(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() + +proc new*( + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record +): T = + let relayshard = getRelayShards(enr).valueOr: + warn "Using default cluster id 0" + RelayShards(clusterID: 0, shardIds: @[]) + + let capabilities = enr.getCapabilities() + + let wrv = WakuRendezVous( + peerManager: peerManager, relayshard: relayshard, capabilities: capabilities + ) + + RendezVous(wrv).setup(switch) + + debug "waku rendezvous initialized", + cluster = relayshard.clusterId, + shards = relayshard.shardIds, + capabilities = capabilities + + return wrv + +proc advertiseAll(self: WakuRendezVous) {.async.} = + let pubsubTopics = self.relayShard.topics() + + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + continue + + # Advertise yourself on that peer + self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) + + let handles = await allFinished(futs) + + for fut in handles: + let res = fut.read + + if res.isErr(): + warn "rendezvous advertise failed", error = res.error + + debug "waku rendezvous advertisements finished", adverCount = handles.len + +proc initialRequestAll*(self: WakuRendezVous) {.async.} = + let pubsubTopics = self.relayShard.topics() + + let futs = collect(newSeq): + for pubsubTopic in pubsubTopics: + let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId) + + # Get a random RDV peer for that shard + let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr: + continue + + # Ask for 12 peer records for that shard + self.batchRequest(namespace, 12, @[rpi.peerId]) + + let handles = await allFinished(futs) + + for fut in handles: + let res = fut.read + + if res.isErr(): + warn "rendezvous request failed", error = res.error + else: + for peer in res.get(): + peerFoundTotal.inc() + self.peerManager.addPeer(peer) + + debug "waku rendezvous requests finished", requestCount = handles.len + +proc periodicRegistration(self: WakuRendezVous) {.async.} = + debug "waku rendezvous periodic registration started", + interval = DefaultRegistrationInterval + + # infinite loop + while true: + await sleepAsync(DefaultRegistrationInterval) + + await self.advertiseAll() + +proc start*(self: WakuRendezVous) {.async.} = + debug "starting waku rendezvous discovery" + + # start registering forever + self.periodicRegistrationFut = self.periodicRegistration() + +proc stopWait*(self: WakuRendezVous) {.async.} = + debug "stopping waku rendezvous discovery" + + if not self.periodicRegistrationFut.isNil(): + await self.periodicRegistrationFut.cancelAndWait()