Skip to content

Commit

Permalink
error handling & clean-up
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Nov 29, 2024
1 parent 1470d1c commit 6014283
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 65 deletions.
2 changes: 1 addition & 1 deletion waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
## libp2p DiscoveryManager
waku[].discoveryMngr = DiscoveryManager()
waku[].discoveryMngr.add(
RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes)
RendezVousInterface.new(rdv = waku[].node.wakuRendezvous, tta = 1.minutes)
)
if not isNil(waku[].node.wakuRelay):
for topic in waku[].node.wakuRelay.getSubscribedTopics():
Expand Down
18 changes: 6 additions & 12 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1217,26 +1217,20 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"

try:
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr)
except Exception as e:
error "failed to create rendezvous", error = getCurrentExceptionMsg()
return
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr)

# Always start discovering peers at startup
(await node.wakuRendezvous.initialRequestAll()).isOkOr:
error "rendezvous failed initial requests", error = error

if node.started:
try:
await node.wakuRendezvous.start()
except CatchableError:
error "failed to start rendezvous", error = getCurrentExceptionMsg()
await node.wakuRendezvous.start()

try:
node.switch.mount(node.wakuRendezvous)
except LPError:
error "failed to mount rendezvous", error = getCurrentExceptionMsg()

# Always start discovering peers at startup
await node.wakuRendezvous.initialRequestAll()

proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
let inputStr = $inputMultiAdd
if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"):
Expand Down
119 changes: 67 additions & 52 deletions waku/waku_rendezvous/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ proc batchAdvertise*(
namespace: string,
ttl: Duration = MinimumDuration,
peers: seq[PeerId],
): Future[Result[void, string]] {.async.} =
## Register with all rendez vous peers under a namespace
): Future[Result[void, string]] {.async: (raises: []).} =
## Register with all rendezvous peers under a namespace

let catchable = catch:
await procCall RendezVous(self).advertise(namespace, ttl, peers)
Expand All @@ -52,8 +52,8 @@ proc batchRequest*(
namespace: string,
count: int = DiscoverLimit,
peers: seq[PeerId],
): Future[Result[seq[PeerRecord], string]] {.async.} =
## Request all records from all rendez vous peers with matching a namespace
): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
## Request all records from all rendezvous peers matching a namespace

let catchable = catch:
await RendezVous(self).request(namespace, count, peers)
Expand All @@ -63,35 +63,9 @@ proc batchRequest*(

return ok(catchable.get())

proc getRelayShards(enr: enr.Record): Option[RelayShards] =
let typedRecord = enr.toTyped().valueOr:
return none(RelayShards)

return typedRecord.relaySharding()

proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): T =
let relayshard = getRelayShards(enr).valueOr:
warn "Using default cluster id 0"
RelayShards(clusterID: 0, shardIds: @[])

let capabilities = enr.getCapabilities()

let wrv = WakuRendezVous(
peerManager: peerManager, relayshard: relayshard, capabilities: capabilities
)

RendezVous(wrv).setup(switch)

debug "waku rendezvous initialized",
cluster = relayshard.clusterId,
shards = relayshard.shardIds,
capabilities = capabilities

return wrv

proc advertiseAll(self: WakuRendezVous) {.async.} =
proc advertiseAll(
self: WakuRendezVous
): Future[Result[void, string]] {.async: (raises: []).} =
let pubsubTopics = self.relayShard.topics()

let futs = collect(newSeq):
Expand All @@ -105,17 +79,23 @@ proc advertiseAll(self: WakuRendezVous) {.async.} =
# Advertise yourself on that peer
self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])

let handles = await allFinished(futs)
let catchable = catch:
await allFinished(futs)

if catchable.isErr():
return err(catchable.error.msg)

for fut in handles:
let res = fut.read
for fut in catchable.get():
if fut.failed():
warn "rendezvous advertisement failed", error = fut.error.msg

if res.isErr():
warn "rendezvous advertise failed", error = res.error
debug "waku rendezvous advertisements finished"

debug "waku rendezvous advertisements finished", adverCount = handles.len
return ok()

proc initialRequestAll*(self: WakuRendezVous) {.async.} =
proc initialRequestAll*(
self: WakuRendezVous
): Future[Result[void, string]] {.async: (raises: []).} =
let pubsubTopics = self.relayShard.topics()

let futs = collect(newSeq):
Expand All @@ -127,21 +107,27 @@ proc initialRequestAll*(self: WakuRendezVous) {.async.} =
continue

# Ask for 12 peer records for that shard
self.batchRequest(namespace, 12, @[rpi.peerId])
self.request(namespace, 12, @[rpi.peerId])

let handles = await allFinished(futs)
let catchable = catch:
await allFinished(futs)

for fut in handles:
let res = fut.read
if catchable.isErr():
return err(catchable.error.msg)

if res.isErr():
warn "rendezvous request failed", error = res.error
else:
for peer in res.get():
for fut in catchable.get():
if fut.failed():
warn "rendezvous request failed", error = fut.error.msg
elif fut.finished():
let peers = fut.value()

for peer in peers:
peerFoundTotal.inc()
self.peerManager.addPeer(peer)

debug "waku rendezvous requests finished", requestCount = handles.len
debug "waku rendezvous requests finished"

return ok()

proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started",
Expand All @@ -151,15 +137,44 @@ proc periodicRegistration(self: WakuRendezVous) {.async.} =
while true:
await sleepAsync(DefaultRegistrationInterval)

await self.advertiseAll()
(await self.advertiseAll()).isOkOr:
debug "waku rendezvous advertisements failed", error = error

proc getRelayShards(enr: enr.Record): Option[RelayShards] =
let typedRecord = enr.toTyped().valueOr:
return none(RelayShards)

return typedRecord.relaySharding()

proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): T {.raises: [].} =
let relayshard = getRelayShards(enr).valueOr:
warn "Using default cluster id 0"
RelayShards(clusterID: 0, shardIds: @[])

let capabilities = enr.getCapabilities()

let wrv = WakuRendezVous(
peerManager: peerManager, relayshard: relayshard, capabilities: capabilities
)

RendezVous(wrv).setup(switch)

debug "waku rendezvous initialized",
cluster = relayshard.clusterId,
shards = relayshard.shardIds,
capabilities = capabilities

return wrv

proc start*(self: WakuRendezVous) {.async.} =
proc start*(self: WakuRendezVous) {.async: (raises: []).} =
debug "starting waku rendezvous discovery"

# start registering forever
self.periodicRegistrationFut = self.periodicRegistration()

proc stopWait*(self: WakuRendezVous) {.async.} =
proc stopWait*(self: WakuRendezVous) {.async: (raises: []).} =
debug "stopping waku rendezvous discovery"

if not self.periodicRegistrationFut.isNil():
Expand Down

0 comments on commit 6014283

Please sign in to comment.