diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 48d75cfd15..e9fbb48955 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -377,7 +377,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ## libp2p DiscoveryManager waku[].discoveryMngr = DiscoveryManager() waku[].discoveryMngr.add( - RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes) + RendezVousInterface.new(rdv = waku[].node.wakuRendezvous, tta = 1.minutes) ) if not isNil(waku[].node.wakuRelay): for topic in waku[].node.wakuRelay.getSubscribedTopics(): diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 4d985f3db6..fe279fad11 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -1217,26 +1217,20 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) = proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} = info "mounting rendezvous discovery protocol" - try: - node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) - except Exception as e: - error "failed to create rendezvous", error = getCurrentExceptionMsg() - return + node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr) + + # Always start discovering peers at startup + (await node.wakuRendezvous.initialRequestAll()).isOkOr: + error "rendezvous failed initial requests", error = error if node.started: - try: - await node.wakuRendezvous.start() - except CatchableError: - error "failed to start rendezvous", error = getCurrentExceptionMsg() + await node.wakuRendezvous.start() try: node.switch.mount(node.wakuRendezvous) except LPError: error "failed to mount rendezvous", error = getCurrentExceptionMsg() - # Always start discovering peers at startup - await node.wakuRendezvous.initialRequestAll() - proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool = let inputStr = $inputMultiAdd if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"): diff --git a/waku/waku_rendezvous/protocol.nim b/waku/waku_rendezvous/protocol.nim index 163ceb1f65..1e7bba8a68 100644 --- a/waku/waku_rendezvous/protocol.nim +++ b/waku/waku_rendezvous/protocol.nim @@ -36,8 +36,8 @@ proc batchAdvertise*( namespace: string, ttl: Duration = MinimumDuration, peers: seq[PeerId], -): Future[Result[void, string]] {.async.} = - ## Register with all rendez vous peers under a namespace +): Future[Result[void, string]] {.async: (raises: []).} = + ## Register with all rendezvous peers under a namespace let catchable = catch: await procCall RendezVous(self).advertise(namespace, ttl, peers) @@ -52,8 +52,8 @@ proc batchRequest*( namespace: string, count: int = DiscoverLimit, peers: seq[PeerId], -): Future[Result[seq[PeerRecord], string]] {.async.} = - ## Request all records from all rendez vous peers with matching a namespace +): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} = + ## Request all records from all rendezvous peers matching a namespace let catchable = catch: await RendezVous(self).request(namespace, count, peers) @@ -63,35 +63,9 @@ proc batchRequest*( return ok(catchable.get()) -proc getRelayShards(enr: enr.Record): Option[RelayShards] = - let typedRecord = enr.toTyped().valueOr: - return none(RelayShards) - - return typedRecord.relaySharding() - -proc new*( - T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record -): T = - let relayshard = getRelayShards(enr).valueOr: - warn "Using default cluster id 0" - RelayShards(clusterID: 0, shardIds: @[]) - - let capabilities = enr.getCapabilities() - - let wrv = WakuRendezVous( - peerManager: peerManager, relayshard: relayshard, capabilities: capabilities - ) - - RendezVous(wrv).setup(switch) - - debug "waku rendezvous initialized", - cluster = relayshard.clusterId, - shards = relayshard.shardIds, - capabilities = capabilities - - return wrv - -proc advertiseAll(self: WakuRendezVous) {.async.} = +proc advertiseAll( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): @@ -105,17 +79,23 @@ proc advertiseAll(self: WakuRendezVous) {.async.} = # Advertise yourself on that peer self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId]) - let handles = await allFinished(futs) + let catchable = catch: + await allFinished(futs) + + if catchable.isErr(): + return err(catchable.error.msg) - for fut in handles: - let res = fut.read + for fut in catchable.get(): + if fut.failed(): + warn "rendezvous advertisement failed", error = fut.error.msg - if res.isErr(): - warn "rendezvous advertise failed", error = res.error + debug "waku rendezvous advertisements finished" - debug "waku rendezvous advertisements finished", adverCount = handles.len + return ok() -proc initialRequestAll*(self: WakuRendezVous) {.async.} = +proc initialRequestAll*( + self: WakuRendezVous +): Future[Result[void, string]] {.async: (raises: []).} = let pubsubTopics = self.relayShard.topics() let futs = collect(newSeq): @@ -127,21 +107,27 @@ proc initialRequestAll*(self: WakuRendezVous) {.async.} = continue # Ask for 12 peer records for that shard - self.batchRequest(namespace, 12, @[rpi.peerId]) + self.request(namespace, 12, @[rpi.peerId]) - let handles = await allFinished(futs) + let catchable = catch: + await allFinished(futs) - for fut in handles: - let res = fut.read + if catchable.isErr(): + return err(catchable.error.msg) - if res.isErr(): - warn "rendezvous request failed", error = res.error - else: - for peer in res.get(): + for fut in catchable.get(): + if fut.failed(): + warn "rendezvous request failed", error = fut.error.msg + elif fut.finished(): + let peers = fut.value() + + for peer in peers: peerFoundTotal.inc() self.peerManager.addPeer(peer) - debug "waku rendezvous requests finished", requestCount = handles.len + debug "waku rendezvous requests finished" + + return ok() proc periodicRegistration(self: WakuRendezVous) {.async.} = debug "waku rendezvous periodic registration started", @@ -151,15 +137,44 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} = while true: await sleepAsync(DefaultRegistrationInterval) - await self.advertiseAll() + (await self.advertiseAll()).isOkOr: + debug "waku rendezvous advertisements failed", error = error + +proc getRelayShards(enr: enr.Record): Option[RelayShards] = + let typedRecord = enr.toTyped().valueOr: + return none(RelayShards) + + return typedRecord.relaySharding() + +proc new*( + T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record +): T {.raises: [].} = + let relayshard = getRelayShards(enr).valueOr: + warn "Using default cluster id 0" + RelayShards(clusterID: 0, shardIds: @[]) + + let capabilities = enr.getCapabilities() + + let wrv = WakuRendezVous( + peerManager: peerManager, relayshard: relayshard, capabilities: capabilities + ) + + RendezVous(wrv).setup(switch) + + debug "waku rendezvous initialized", + cluster = relayshard.clusterId, + shards = relayshard.shardIds, + capabilities = capabilities + + return wrv -proc start*(self: WakuRendezVous) {.async.} = +proc start*(self: WakuRendezVous) {.async: (raises: []).} = debug "starting waku rendezvous discovery" # start registering forever self.periodicRegistrationFut = self.periodicRegistration() -proc stopWait*(self: WakuRendezVous) {.async.} = +proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} = debug "stopping waku rendezvous discovery" if not self.periodicRegistrationFut.isNil():