Skip to content

Commit

Permalink
fix: rejecting excess relay connections (#3065)
Browse files Browse the repository at this point in the history
  • Loading branch information
gabrielmer committed Sep 27, 2024
1 parent 368bb3c commit f8946b8
Showing 1 changed file with 50 additions and 57 deletions.
107 changes: 50 additions & 57 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
  ## Collects the peer ids behind the connections currently tracked by the
  ## switch's connection manager that carry at least one stream whose
  ## protocol equals `protocol`.
  ##
  ## Returns a tuple `(inbound, outbound)`: the first sequence holds peers
  ## whose matching connection has transport direction `In`, the second
  ## those with direction `Out`. A peer id is appended once per matching
  ## connection, so it can appear more than once.

  var
    inbound: seq[PeerId]
    outbound: seq[PeerId]

  for peerId, muxers in pm.switch.connManager.getConnections():
    for muxer in muxers:
      # Connections without any stream for the requested protocol are ignored.
      if not muxer.getStreams().anyIt(it.protocol == protocol):
        continue

      let dir = muxer.connection.transportDir
      if dir == Direction.In:
        inbound.add(peerId)
      elif dir == Direction.Out:
        outbound.add(peerId)

  return (inbound, outbound)

# Called when a peer (i) first connects to us, or (ii) disconnects all of its connections from us.
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
Expand All @@ -412,6 +430,17 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected

## Check max allowed in-relay peers
let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0]
if inRelayPeers.len > pm.inRelayPeersTarget and
pm.peerStore.hasPeer(peerId, WakuRelayCodec):
debug "disconnecting relay peer because reached max num in-relay peers",
peerId = peerId,
inRelayPeers = inRelayPeers.len,
inRelayPeersTarget = pm.inRelayPeersTarget
await pm.switch.disconnect(peerId)

## Apply max ip colocation limit
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)

Expand Down Expand Up @@ -494,7 +523,7 @@ proc new*(
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")

let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10)
let outRelayPeersTarget = maxRelayPeersValue div 3

let pm = PeerManager(
switch: switch,
Expand Down Expand Up @@ -560,46 +589,6 @@ proc addServicePeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, proto: str

pm.addPeer(remotePeerInfo)

proc reconnectPeers*(
    pm: PeerManager, proto: string, backoff: chronos.Duration = chronos.seconds(0)
) {.async.} =
  ## Re-dials every peer in the peer store that matches `proto`, via
  ## `connectRelay`. The result of each dial attempt is discarded.
  ##
  ## `backoff` is the minimum time that must have elapsed since a peer's
  ## recorded disconnect time before it is dialled again; the remaining part
  ## of that period (if any) is slept through before dialling.
  ##
  ## Peers whose connectedness is `CannotConnect` are logged and skipped.

  trace "Reconnecting peers", proto = proto

  # The protocol is not persisted, so every stored peer is run through the matcher.
  for storedPeer in pm.peerStore.peers(protocolMatcher(proto)):
    # Peers already known to be unreachable are not retried.
    if storedPeer.connectedness == CannotConnect:
      error "Not reconnecting to unreachable or non-existing peer",
        peerId = storedPeer.peerId
      continue

    # Honour the optional backoff period.
    # TODO: Add method to peerStore (eg isBackoffExpired())
    # The stored disconnect time and the wall clock are both converted to
    # `Moment`s with second resolution so they can be compared.
    let lastDisconnect = Moment.init(storedPeer.disconnectTime, Second)
    let now = Moment.init(getTime().toUnix, Second)
    # Portion of the backoff still outstanding, given the time elapsed since
    # the last disconnect.
    let remaining = lastDisconnect + backoff - now

    trace "Respecting backoff",
      backoff = backoff,
      disconnectTime = lastDisconnect,
      currentTime = now,
      backoffTime = remaining

    # TODO: This blocks the whole function. Try to connect to another peer in the meantime.
    if remaining > ZeroDuration:
      trace "Backing off before reconnect...",
        peerId = storedPeer.peerId, backoffTime = remaining
      # The peer disconnected recently; wait out the rest of the backoff first.
      await sleepAsync(remaining)

    discard await pm.connectRelay(storedPeer)

####################
# Dialer interface #
####################
Expand Down Expand Up @@ -685,23 +674,29 @@ proc connectToNodes*(
# later.
await sleepAsync(chronos.seconds(5))

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.
proc reconnectPeers*(
pm: PeerManager, proto: string, backoffTime: chronos.Duration = chronos.seconds(0)
) {.async.} =
## Reconnect to peers registered for this protocol. This will update connectedness.
## Especially useful to resume connections from persistent storage after a restart.

var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
debug "Reconnecting peers", proto = proto

for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)
# Proto is not persisted, we need to iterate over all peers.
for peerInfo in pm.peerStore.peers(protocolMatcher(proto)):
# Check that the peer can be connected
if peerInfo.connectedness == CannotConnect:
error "Not reconnecting to unreachable or non-existing peer",
peerId = peerInfo.peerId
continue

return (inPeers, outPeers)
if backoffTime > ZeroDuration:
debug "Backing off before reconnect",
peerId = peerInfo.peerId, backoffTime = backoffTime
# We disconnected recently and still need to wait for a backoff period before connecting
await sleepAsync(backoffTime)

await pm.connectToNodes(@[peerInfo])

proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var
Expand Down Expand Up @@ -730,9 +725,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} =

proc connectToRelayPeers*(pm: PeerManager) {.async.} =
var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec)
let maxConnections = pm.switch.connManager.inSema.size
let totalRelayPeers = inRelayPeers.len + outRelayPeers.len
let inPeersTarget = maxConnections - pm.outRelayPeersTarget

if inRelayPeers.len > pm.inRelayPeersTarget:
await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)
Expand Down

0 comments on commit f8946b8

Please sign in to comment.