Skip to content

Commit

Permalink
feat: waku rendezvous wrapper (#2962)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Dec 9, 2024
1 parent 8f2bd39 commit 650a948
Show file tree
Hide file tree
Showing 11 changed files with 391 additions and 78 deletions.
2 changes: 1 addition & 1 deletion tests/factory/test_node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ suite "Node Factory":
node.wakuStore.isNil()
node.wakuFilter.isNil()
not node.wakuStoreClient.isNil()
not node.rendezvous.isNil()
not node.wakuRendezvous.isNil()

test "Set up a node with Store enabled":
var conf = defaultTestWakuNodeConf()
Expand Down
100 changes: 56 additions & 44 deletions tests/test_waku_rendezvous.nim
Original file line number Diff line number Diff line change
@@ -1,51 +1,63 @@
{.used.}

import chronos, testutils/unittests, libp2p/builders, libp2p/protocols/rendezvous
import std/options, chronos, testutils/unittests, libp2p/builders

import waku/node/waku_switch, ./testlib/common, ./testlib/wakucore

proc newRendezvousClientSwitch(rdv: RendezVous): Switch =
SwitchBuilder
.new()
.withRng(rng())
.withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()])
.withTcpTransport()
.withMplex()
.withNoise()
.withRendezVous(rdv)
.build()
import
waku/waku_core/peers,
waku/node/waku_node,
waku/node/peer_manager/peer_manager,
waku/waku_rendezvous/protocol,
./testlib/[wakucore, wakunode]

procSuite "Waku Rendezvous":
asyncTest "Waku Switch uses Rendezvous":
## Setup

asyncTest "Simple remote test":
let
wakuClient = RendezVous.new()
sourceClient = RendezVous.new()
destClient = RendezVous.new()
wakuSwitch = newRendezvousClientSwitch(wakuClient) #rendezvous point
sourceSwitch = newRendezvousClientSwitch(sourceClient) #client
destSwitch = newRendezvousClientSwitch(destClient) #client

# Setup client rendezvous
wakuClient.setup(wakuSwitch)
sourceClient.setup(sourceSwitch)
destClient.setup(destSwitch)

await allFutures(wakuSwitch.start(), sourceSwitch.start(), destSwitch.start())

# Connect clients to the rendezvous point
await sourceSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs)
await destSwitch.connect(wakuSwitch.peerInfo.peerId, wakuSwitch.peerInfo.addrs)

let res0 = await sourceClient.request("empty")
check res0.len == 0

# Check that source client gets peer info of dest client from rendezvous point
await sourceClient.advertise("foo")
let res1 = await destClient.request("foo")
check:
res1.len == 1
res1[0] == sourceSwitch.peerInfo.signedPeerRecord.data
clusterId = 10.uint16
node1 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
)
node2 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
)
node3 = newTestWakuNode(
generateSecp256k1Key(),
parseIpAddress("0.0.0.0"),
Port(0),
clusterId = clusterId,
)

await allFutures(
[node1.mountRendezvous(), node2.mountRendezvous(), node3.mountRendezvous()]
)
await allFutures([node1.start(), node2.start(), node3.start()])

let peerInfo1 = node1.switch.peerInfo.toRemotePeerInfo()
let peerInfo2 = node2.switch.peerInfo.toRemotePeerInfo()
let peerInfo3 = node3.switch.peerInfo.toRemotePeerInfo()

node1.peerManager.addPeer(peerInfo2)
node2.peerManager.addPeer(peerInfo1)
node2.peerManager.addPeer(peerInfo3)
node3.peerManager.addPeer(peerInfo2)

let namespace = "test/name/space"

let res = await node1.wakuRendezvous.batchAdvertise(
namespace, 60.seconds, @[peerInfo2.peerId]
)
assert res.isOk(), $res.error

let response =
await node3.wakuRendezvous.batchRequest(namespace, 1, @[peerInfo2.peerId])
assert response.isOk(), $response.error
let records = response.get()

await allFutures(wakuSwitch.stop(), sourceSwitch.stop(), destSwitch.stop())
check:
records.len == 1
records[0].peerId == peerInfo1.peerId
1 change: 1 addition & 0 deletions tests/testlib/wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ proc defaultTestWakuNodeConf*(): WakuNodeConf =
clusterId: DefaultClusterId,
shards: @[DefaultShardId],
relay: true,
rendezvous: true,
storeMessageDbUrl: "sqlite://store.sqlite3",
)

Expand Down
4 changes: 2 additions & 2 deletions tests/wakunode2/test_app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ suite "Wakunode2 - Waku initialization":
node.wakuArchive.isNil()
node.wakuStore.isNil()
not node.wakuStoreClient.isNil()
not node.rendezvous.isNil()
not node.wakuRendezvous.isNil()

## Cleanup
waitFor waku.stop()
Expand Down Expand Up @@ -92,7 +92,7 @@ suite "Wakunode2 - Waku initialization":
node.wakuArchive.isNil()
node.wakuStore.isNil()
not node.wakuStoreClient.isNil()
not node.rendezvous.isNil()
not node.wakuRendezvous.isNil()

# DS structures are updated with dynamic ports
typedNodeEnr.get().tcp.get() != 0
Expand Down
7 changes: 7 additions & 0 deletions waku/factory/external_config.nim
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,13 @@ with the drawback of consuming some more bandwidth.""",
name: "peer-exchange-node"
.}: string

## Rendez vous
rendezvous* {.
desc: "Enable waku rendezvous discovery server",
defaultValue: true,
name: "rendezvous"
.}: bool

## websocket config
websocketSupport* {.
desc: "Enable websocket: true|false",
Expand Down
9 changes: 3 additions & 6 deletions waku/factory/node_factory.nim
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,9 @@ proc setupProtocols(
protectedShard = shardKey.shard, publicKey = shardKey.key
node.wakuRelay.addSignedShardsValidator(subscribedProtectedShards, conf.clusterId)

# Enable Rendezvous Discovery protocol when Relay is enabled
try:
await mountRendezvous(node)
except CatchableError:
return
err("failed to mount waku rendezvous protocol: " & getCurrentExceptionMsg())
# Only relay nodes should be rendezvous points.
if conf.rendezvous:
await node.mountRendezvous()

# Keepalive mounted on all nodes
try:
Expand Down
10 changes: 0 additions & 10 deletions waku/factory/waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,6 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()

## libp2p DiscoveryManager
waku[].discoveryMngr = DiscoveryManager()
waku[].discoveryMngr.add(
RendezVousInterface.new(rdv = waku[].node.rendezvous, tta = 1.minutes)
)
if not isNil(waku[].node.wakuRelay):
for topic in waku[].node.wakuRelay.getSubscribedTopics():
debug "advertise rendezvous namespace", topic
waku[].discoveryMngr.advertise(RdvNamespace(topic))

return ok()

# Waku shutdown
Expand Down
30 changes: 15 additions & 15 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/protocols/connectivity/autonat/client,
libp2p/protocols/connectivity/autonat/service,
libp2p/protocols/rendezvous,
libp2p/builders,
libp2p/transports/transport,
libp2p/transports/tcptransport,
Expand All @@ -39,6 +38,7 @@ import
../waku_filter_v2/client as filter_client,
../waku_filter_v2/subscriptions as filter_subscriptions,
../waku_metadata,
../waku_rendezvous/protocol,
../waku_lightpush/client as lightpush_client,
../waku_lightpush/common,
../waku_lightpush/protocol,
Expand Down Expand Up @@ -110,7 +110,7 @@ type
enr*: enr.Record
libp2pPing*: Ping
rng*: ref rand.HmacDrbgContext
rendezvous*: RendezVous
wakuRendezvous*: WakuRendezVous
announcedAddresses*: seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
Expand Down Expand Up @@ -1217,22 +1217,16 @@ proc startKeepalive*(node: WakuNode, keepalive = 2.minutes) =
proc mountRendezvous*(node: WakuNode) {.async: (raises: []).} =
info "mounting rendezvous discovery protocol"

try:
node.rendezvous = RendezVous.new(node.switch)
except Exception as e:
error "failed to create rendezvous", error = getCurrentExceptionMsg()
node.wakuRendezvous = WakuRendezVous.new(node.switch, node.peerManager, node.enr).valueOr:
error "initializing waku rendezvous failed", error = error
return

if node.started:
try:
await node.rendezvous.start()
except CatchableError:
error "failed to start rendezvous", error = getCurrentExceptionMsg()
# Always start discovering peers at startup
(await node.wakuRendezvous.initialRequestAll()).isOkOr:
error "rendezvous failed initial requests", error = error

try:
node.switch.mount(node.rendezvous)
except LPError:
error "failed to mount rendezvous", error = getCurrentExceptionMsg()
if node.started:
await node.wakuRendezvous.start()

proc isBindIpWithZeroPort(inputMultiAdd: MultiAddress): bool =
let inputStr = $inputMultiAdd
Expand Down Expand Up @@ -1304,6 +1298,9 @@ proc start*(node: WakuNode) {.async.} =
if not node.wakuStoreResume.isNil():
await node.wakuStoreResume.start()

if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.start()

## The switch uses this mapper to update peer info addrs
## with announced addrs after start
let addressMapper = proc(
Expand Down Expand Up @@ -1346,6 +1343,9 @@ proc stop*(node: WakuNode) {.async.} =
if not node.wakuPeerExchange.isNil() and not node.wakuPeerExchange.pxLoopHandle.isNil():
await node.wakuPeerExchange.pxLoopHandle.cancelAndWait()

if not node.wakuRendezvous.isNil():
await node.wakuRendezvous.stopWait()

node.started = false

proc isReady*(node: WakuNode): Future[bool] {.async: (raises: [Exception]).} =
Expand Down
3 changes: 3 additions & 0 deletions waku/waku_rendezvous.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import ./waku_rendezvous/protocol

export protocol
36 changes: 36 additions & 0 deletions waku/waku_rendezvous/common.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{.push raises: [].}

import std/options, chronos

import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding

const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds
const PeersRequestedCount* = 12

proc computeNamespace*(clusterId: uint16, shard: uint16): string =
var namespace = "rs/"

namespace &= $clusterId
namespace &= '/'
namespace &= $shard

return namespace

proc computeNamespace*(clusterId: uint16, shard: uint16, cap: Capabilities): string =
var namespace = "rs/"

namespace &= $clusterId
namespace &= '/'
namespace &= $shard
namespace &= '/'
namespace &= $cap

return namespace

proc getRelayShards*(enr: enr.Record): Option[RelayShards] =
let typedRecord = enr.toTyped().valueOr:
return none(RelayShards)

return typedRecord.relaySharding()
Loading

0 comments on commit 650a948

Please sign in to comment.