Skip to content

Commit

Permalink
chore: test peer connection management (#3049)
Browse files Browse the repository at this point in the history
* Make some useful consts public, add some utils.
* Implement various utilities.
* peer_manager reconnectPeers enhancements

---------

Co-authored-by: Álex Cabeza Romero <[email protected]>
  • Loading branch information
Ivansete-status and AlejandroCabeza authored Sep 24, 2024
1 parent 7c4a971 commit 711e7db
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 97 deletions.
83 changes: 28 additions & 55 deletions tests/node/test_wakunode_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import
chronos,
# chronos/timer,
chronicles,
times,
libp2p/[peerstore, crypto/crypto, multiaddress]

from times import getTime, toUnix
Expand Down Expand Up @@ -62,9 +63,9 @@ suite "Peer Manager":
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, listenIp, listenPort)
server = newTestWakuNode(serverKey, listenIp, Port(3000))
serverPeerStore = server.peerManager.peerStore
client = newTestWakuNode(clientKey, listenIp, listenPort)
client = newTestWakuNode(clientKey, listenIp, Port(3001))
clientPeerStore = client.peerManager.peerStore

await allFutures(server.start(), client.start())
Expand Down Expand Up @@ -577,77 +578,49 @@ suite "Peer Manager":
Connectedness.CannotConnect

suite "Automatic Reconnection":
xasyncTest "Automatic Reconnection Implementation":
asyncTest "Automatic Reconnection Implementation":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
await server.switch.stop()
await client.switch.stop()
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected

await client.disconnectNode(serverRemotePeerInfo)

waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When triggering the reconnection
await client.peerManager.reconnectPeers(WakuRelayCodec)

# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected

xasyncTest "Automatic Reconnection Implementation (With Backoff)":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
waitFor allFutures(server.switch.stop(), client.switch.stop())
waitFor allFutures(server.switch.start(), client.switch.start())
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect
## Now let's do the same but with backoff period
await client.disconnectNode(serverRemotePeerInfo)

# When triggering a reconnection with a backoff period
let
backoffPeriod = 10.seconds
halfBackoffPeriod = 5.seconds
waitActive:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect and
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When triggering a reconnection with a backoff period
let backoffPeriod = chronos.seconds(1)
let beforeReconnect = getTime().toUnixFloat()
await client.peerManager.reconnectPeers(WakuRelayCodec, backoffPeriod)
await sleepAsync(halfBackoffPeriod)

# If the backoff period is not over, then the peers should still be marked as CanConnect
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When waiting for the backoff period to be over
await sleepAsync(halfBackoffPeriod)

# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected

xasyncTest "Automatic Reconnection Implementation (After client restart)":
# Given two correctly initialised nodes, that are available for reconnection
await server.mountRelay()
await client.mountRelay()
await client.connectToNodes(@[serverRemotePeerInfo])
await server.switch.stop()
await client.switch.stop()
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.CanConnect
serverPeerStore.get(clientPeerId).connectedness == Connectedness.CanConnect

# When triggering the reconnection, and some time for the reconnection to happen
waitFor allFutures(client.stop(), server.stop())
await allFutures(server.start(), client.start())
await sleepAsync(FUTURE_TIMEOUT_LONG)
let reconnectDurationWithBackoffPeriod =
getTime().toUnixFloat() - beforeReconnect

# Then both peers should be marked as Connected
check:
clientPeerStore.get(serverPeerId).connectedness == Connectedness.Connected
serverPeerStore.get(clientPeerId).connectedness == Connectedness.Connected
reconnectDurationWithBackoffPeriod > backoffPeriod.seconds.float

suite "Handling Connections on Different Networks":
# TODO: Implement after discv5 and peer manager's interaction is understood
Expand Down
10 changes: 9 additions & 1 deletion tests/testlib/testutils.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import testutils/unittests
import testutils/unittests, chronos

template xsuite*(name: string, body: untyped) =
discard
Expand Down Expand Up @@ -27,3 +27,11 @@ template xasyncTest*(name: string, body: untyped) =
template asyncTestx*(name: string, body: untyped) =
test name:
skip()

template waitActive*(condition: bool) =
for i in 0 ..< 200:
if condition:
break
await sleepAsync(10)

assert condition
70 changes: 29 additions & 41 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ proc connectRelay*(

return false

proc disconnectNode*(pm: PeerManager, peer: RemotePeerInfo) {.async.} =
let peerId = peer.peerId
await pm.switch.disconnect(peerId)

# Dialing should be used for just protocols that require a stream to write and read
# This shall not be used to dial Relay protocols, since that would create
# unneccesary unused streams.
Expand Down Expand Up @@ -560,46 +564,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str

pm.addPeer(remotePeerInfo)

proc reconnectPeers*(
pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.

trace "Reconnecting peers", proto = proto

# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue

# Respect optional backoff period where applicable.
let
# TODO: Add method to peerStore (eg isBackoffExpired())
disconnectTime = Moment.init(peerInfo.disconnectTime, Second) # Convert
currentTime = Moment.init(getTime().toUnix, Second)
# Current time comparable to persisted value
backoffTime = disconnectTime + backoff - currentTime
# Consider time elapsed since last disconnect

trace "Respecting backoff",
backoff = backoff,
disconnectTime = disconnectTime,
currentTime = currentTime,
backoffTime = backoffTime

# TODO: This blocks the whole function. Try to connect to another peer in the meantime.
if backoffTime > ZeroDuration:
trace "Backing off before reconnect...",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)

discard await pm.connectRelay(peerInfo)

####################
# Dialer interface #
####################
Expand Down Expand Up @@ -647,7 +611,7 @@ proc connectToNodes*(
if nodes.len == 0:
return

info "Dialing multiple peers", numOfPeers = nodes.len
info "Dialing multiple peers", numOfPeers = nodes.len, nodes = $nodes

var futConns: seq[Future[bool]]
var connectedPeers: seq[RemotePeerInfo]
Expand Down Expand Up @@ -685,6 +649,30 @@ proc connectToNodes*(
# later.
await sleepAsync(chronos.seconds(5))

proc reconnectPeers*(
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.

debug "Reconnecting peers", proto = proto

# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue

if backoffTime > ZeroDuration:
debug "Backing off before reconnect",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)

await pm.connectToNodes(@[peerInfo])

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.
Expand Down
3 changes: 3 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ proc connectToNodes*(
# NOTE Connects to the node without a give protocol, which automatically creates streams for relay
await peer_manager.connectToNodes(node.peerManager, nodes, source = source)

proc disconnectNode*(node: WakuNode, remotePeer: RemotePeerInfo) {.async.} =
await peer_manager.disconnectNode(node.peerManager, remotePeer)

## Waku Sync

proc mountWakuSync*(
Expand Down

0 comments on commit 711e7db

Please sign in to comment.