Skip to content

Commit

Permalink
disconnecting from excess in peers
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status authored and gabrielmer committed Sep 27, 2024
1 parent 055b81b commit 9289a73
Showing 1 changed file with 32 additions and 19 deletions.
51 changes: 32 additions & 19 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
pm.peerStore.delete(peerId)

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.

var inPeers: seq[PeerId]
var outPeers: seq[PeerId]

for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)

return (inPeers, outPeers)

# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
Expand All @@ -416,6 +434,19 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected

## Check max allowed in-relay peers
let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0]
if inRelayPeers.len > pm.inRelayPeersTarget and
pm.peerStore.hasPeer(peerId, WakuRelayCodec):
debug "disconnecting relay peer because reached max num in-relay peers",
peerId,
inRelayPeers = inRelayPeers.len,
inRelayPeersTarget = pm.inRelayPeersTarget

await pm.switch.disconnect(peerId)
pm.peerStore.delete(peerId)

## Apply max ip colocation limit
if (let ip = pm.getPeerIp(peerId); ip.isSome()):
pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId)

Expand Down Expand Up @@ -498,7 +529,7 @@ proc new*(
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")

# let outRelayPeersTarget = 2 * maxRelayPeersValue div 3
let outRelayPeersTarget = maxRelayPeersValue div 3

let pm = PeerManager(
switch: switch,
Expand Down Expand Up @@ -673,24 +704,6 @@ proc reconnectPeers*(

await pm.connectToNodes(@[peerInfo])

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.

var inPeers: seq[PeerId]
var outPeers: seq[PeerId]

for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
outPeers.add(peerId)

return (inPeers, outPeers)

proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) =
var
numStreamsIn = 0
Expand Down

0 comments on commit 9289a73

Please sign in to comment.