Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: waku rendezvous wrapper #2962

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ with the drawback of consuming some more bandwitdh.""",
name: "peer-exchange-node"
.}: string

## Rendez vous
rendezvous* {.
desc: "Enable waku rendezvous discovery server",
defaultValue: false,
name: "rendezvous"
.}: bool

## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
Expand Down
13 changes: 7 additions & 6 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,13 @@ proc setupProtocols(
protectedShard = shardKey.shard, publicKey = shardKey.key
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)

# Enable Rendezvous Discovery protocol when Relay is enabled
try:
await mountRendezvous(node)
except CatchableError:
return
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())
# Only relay nodes can be rendezvous points.
if conf.rendezvous:
try:
await mountRendezvous(node)
except CatchableError:
return
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())

# Keepalive mounted on all nodes
try:
Expand Down
21 changes: 16 additions & 5 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/rendezvous,
libp2p/builders,
libp2p/transports/transport,
libp2p/transports/tcptransport,
Expand All @@ -39,6 +38,7 @@ import
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_rendezvous/protocol,
../waku_sync,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
Expand Down Expand Up @@ -109,7 +109,7 @@ type
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
rendezvous*: RendezVous
wakuRendezvous*: WakuRendezVous
announcedAddresses*: seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
Expand Down Expand Up @@ -1269,19 +1269,22 @@ proc startKeepalive*(node: WakuNode) =
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"

node.rendezvous = RendezVous.new(node.switch)
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr)

if node.started:
try:
await node.rendezvous.start()
await node.wakuRendezvous.start()
except CatchableError:
error "failed to start rendezvous", error = getCurrentExceptionMsg()

try:
node.switch.mount(node.rendezvous)
node.switch.mount(node.wakuRendezvous)
except LPError:
error "failed to mount rendezvous", error = getCurrentExceptionMsg()

# Always start discovering peers at startup
await node.wakuRendezvous.initialRequestAll()

proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
let inputStr = $inputMultiAdd
if inputStr.contains("0.0.0.0/tcp/0") or inputStr.contains("127.0.0.1/tcp/0"):
Expand Down Expand Up @@ -1338,6 +1341,11 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()

if not node.wakuRendezvous.isNil():
try:
await node.wakuRendezvous.start()
except CatchableError:
error "failed to start rendezvous", error = getCurrentExceptionMsg()
if not node.wakuSync.isNil():
node.wakuSync.start()

Expand Down Expand Up @@ -1379,6 +1387,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.stopWait()

if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.stop()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_rendezvous.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import ./waku_rendezvous/protocol

export protocol
29 changes: 29 additions & 0 deletions waku/waku_rendezvous/common.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{.push raises: [].}

import chronos

import ../waku_enr/capabilities

const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds

proc computeNamespace*(clusterId: uint16, shard: uint16): string =
var namespace = "rs/"

namespace &= $clusterId
namespace &= '/'
namespace &= $shard

return namespace

proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string =
var namespace = "rs/"

namespace &= $clusterId
namespace &= '/'
namespace &= $shard
namespace &= '/'
namespace &= $cap

return namespace
166 changes: 166 additions & 0 deletions waku/waku_rendezvous/protocol.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
{.push raises: [].}

import
std/[sugar, options],
results,
chronos,
chronicles,
metrics,
libp2p/protocols/rendezvous,
libp2p/switch,
libp2p/utility

import
../node/peer_manager,
../common/enr,
../waku_enr/capabilities,
../waku_enr/sharding,
../waku_core/peers,
../waku_core/topics,
./common

logScope:
topics = "waku rendez vous"

declarePublicCounter peerFoundTotal, "total number of peers found via rendezvous"

type WakuRendezVous* = ref object of RendezVous
peerManager: PeerManager
relayShard: RelayShards
capabilities: seq[Capabilities]

periodicRegistrationFut: Future[void]

proc batchAdvertise*(
self: WakuRendezVous,
namespace: string,
ttl: Duration = MinimumDuration,
peers: seq[PeerId],
): Future[Result[void, string]] {.async.} =
## Register with all rendez vous peers under a namespace

let catchable = catch:
await procCall RendezVous(self).advertise(namespace, ttl, peers)

if catchable.isErr():
return err(catchable.error.msg)

return ok()

proc batchRequest*(
self: WakuRendezVous,
namespace: string,
count: int = DiscoverLimit,
peers: seq[PeerId],
): Future[Result[seq[PeerRecord], string]] {.async.} =
## Request all records from all rendez vous peers with matching a namespace

let catchable = catch:
await RendezVous(self).request(namespace, count, peers)

if catchable.isErr():
return err(catchable.error.msg)

return ok(catchable.get())

proc getRelayShards(enr: enr.Record): Option[RelayShards] =
let typedRecord = enr.toTyped().valueOr:
return none(RelayShards)

return typedRecord.relaySharding()

proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): T =
let relayshard = getRelayShards(enr).valueOr:
warn "Using default cluster id 0"
RelayShards(clusterID: 0, shardIds: @[])

let capabilities = enr.getCapabilities()

let wrv = WakuRendezVous(
peerManager: peerManager, relayshard: relayshard, capabilities: capabilities
)

RendezVous(wrv).setup(switch)

debug "waku rendezvous initialized",
cluster = relayshard.clusterId,
shards = relayshard.shardIds,
capabilities = capabilities

return wrv

proc advertiseAll(self: WakuRendezVous) {.async.} =
let pubsubTopics = self.relayShard.topics()

let futs = collect(newSeq):
for pubsubTopic in pubsubTopics:
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)

# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
continue

# Advertise yourself on that peer
self.advertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "rendezvous advertise failed", error = res.error

debug "waku rendezvous advertisements finished", adverCount = handles.len

proc initialRequestAll*(self: WakuRendezVous) {.async.} =
let pubsubTopics = self.relayShard.topics()

let futs = collect(newSeq):
for pubsubTopic in pubsubTopics:
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)

# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
continue

# Ask for 12 peer records for that shard
self.batchRequest(namespace, 12, @[rpi.peerId])

let handles = await allFinished(futs)

for fut in handles:
let res = fut.read

if res.isErr():
warn "rendezvous request failed", error = res.error
else:
for peer in res.get():
peerFoundTotal.inc()
self.peerManager.addPeer(peer)

debug "waku rendezvous requests finished", requestCount = handles.len

proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started",
interval = DefaultRegistrationInterval

# infinite loop
while true:
await sleepAsync(DefaultRegistrationInterval)

await self.advertiseAll()

proc start*(self: WakuRendezVous) {.async.} =
debug "starting waku rendezvous discovery"

# start registering forever
self.periodicRegistrationFut = self.periodicRegistration()

proc stopWait*(self: WakuRendezVous) {.async.} =
debug "stopping waku rendezvous discovery"

if not self.periodicRegistrationFut.isNil():
await self.periodicRegistrationFut.cancelAndWait()
Loading