From 9289a730e31fbb4d7ed774620e73f2aea7e29612 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Thu, 5 Sep 2024 16:09:59 +0200 Subject: [PATCH] disconnecting from excess in peers --- waku/node/peer_manager/peer_manager.nim | 51 ++++++++++++++++--------- 1 file changed, 32 insertions(+), 19 deletions(-) diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index cc56daa54..63480ac52 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -403,6 +403,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = asyncSpawn(pm.switch.disconnect(peerId)) pm.peerStore.delete(peerId) +proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = + ## Returns the peerIds of physical connections (in and out) + ## containing at least one stream with the given protocol. + + var inPeers: seq[PeerId] + var outPeers: seq[PeerId] + + for peerId, muxers in pm.switch.connManager.getConnections(): + for peerConn in muxers: + let streams = peerConn.getStreams() + if streams.anyIt(it.protocol == protocol): + if peerConn.connection.transportDir == Direction.In: + inPeers.add(peerId) + elif peerConn.connection.transportDir == Direction.Out: + outPeers.add(peerId) + + return (inPeers, outPeers) + # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: @@ -416,6 +434,19 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = direction = if event.initiator: Outbound else: Inbound connectedness = Connected + ## Check max allowed in-relay peers + let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0] + if inRelayPeers.len > pm.inRelayPeersTarget and + pm.peerStore.hasPeer(peerId, WakuRelayCodec): + debug "disconnecting relay peer because reached max num in-relay peers", + peerId, + inRelayPeers = inRelayPeers.len, + inRelayPeersTarget = pm.inRelayPeersTarget + + await pm.switch.disconnect(peerId) + pm.peerStore.delete(peerId) + + ## Apply max ip colocation limit if (let ip = pm.getPeerIp(peerId); ip.isSome()): pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) @@ -498,7 +529,7 @@ proc new*( error "Max backoff time can't be over 1 week", maxBackoff = backoff raise newException(Defect, "Max backoff time can't be over 1 week") - # let outRelayPeersTarget = 2 * maxRelayPeersValue div 3 + let outRelayPeersTarget = maxRelayPeersValue div 3 let pm = PeerManager( switch: switch, @@ -673,24 +704,6 @@ proc reconnectPeers*( await pm.connectToNodes(@[peerInfo]) -proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = - ## Returns the peerIds of physical connections (in and out) - ## containing at least one stream with the given protocol. - - var inPeers: seq[PeerId] - var outPeers: seq[PeerId] - - for peerId, muxers in pm.switch.connManager.getConnections(): - for peerConn in muxers: - let streams = peerConn.getStreams() - if streams.anyIt(it.protocol == protocol): - if peerConn.connection.transportDir == Direction.In: - inPeers.add(peerId) - elif peerConn.connection.transportDir == Direction.Out: - outPeers.add(peerId) - - return (inPeers, outPeers) - proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = var numStreamsIn = 0