diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 0b53129277..07a091e864 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -404,6 +404,24 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = asyncSpawn(pm.switch.disconnect(peerId)) pm.wakuPeerStore.delete(peerId) +proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = + ## Returns the peerIds of physical connections (in and out) + ## containing at least one stream with the given protocol. + + var inPeers: seq[PeerId] + var outPeers: seq[PeerId] + + for peerId, muxers in pm.switch.connManager.getConnections(): + for peerConn in muxers: + let streams = peerConn.getStreams() + if streams.anyIt(it.protocol == protocol): + if peerConn.connection.transportDir == Direction.In: + inPeers.add(peerId) + elif peerConn.connection.transportDir == Direction.Out: + outPeers.add(peerId) + + return (inPeers, outPeers) + # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: @@ -417,6 +435,17 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = direction = if event.initiator: Outbound else: Inbound connectedness = Connected + ## Check max allowed in-relay peers + let inRelayPeers = pm.connectedPeers(WakuRelayCodec)[0] + if inRelayPeers.len > pm.inRelayPeersTarget and + pm.wakuPeerStore.hasPeer(peerId, WakuRelayCodec): + debug "disconnecting relay peer because reached max num in-relay peers", + peerId = peerId, + inRelayPeers = inRelayPeers.len, + inRelayPeersTarget = pm.inRelayPeersTarget + await pm.switch.disconnect(peerId) + + ## Apply max ip colocation limit if (let ip = pm.getPeerIp(peerId); ip.isSome()): pm.ipTable.mgetOrPut(ip.get, newSeq[PeerId]()).add(peerId) @@ -499,7 +528,7 @@ proc new*( error "Max backoff time can't be over 1 week", maxBackoff = backoff raise newException(Defect, "Max backoff time can't be over 1 week") - let outRelayPeersTarget = max(maxRelayPeersValue div 3, 10) + let outRelayPeersTarget = maxRelayPeersValue div 3 let pm = PeerManager( switch: switch, @@ -674,24 +703,6 @@ proc reconnectPeers*( await pm.connectToNodes(@[peerInfo]) -proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = - ## Returns the peerIds of physical connections (in and out) - ## containing at least one stream with the given protocol. - - var inPeers: seq[PeerId] - var outPeers: seq[PeerId] - - for peerId, muxers in pm.switch.connManager.getConnections(): - for peerConn in muxers: - let streams = peerConn.getStreams() - if streams.anyIt(it.protocol == protocol): - if peerConn.connection.transportDir == Direction.In: - inPeers.add(peerId) - elif peerConn.connection.transportDir == Direction.Out: - outPeers.add(peerId) - - return (inPeers, outPeers) - proc getNumStreams*(pm: PeerManager, protocol: string): (int, int) = var numStreamsIn = 0 @@ -719,9 +730,7 @@ proc pruneInRelayConns(pm: PeerManager, amount: int) {.async.} = proc connectToRelayPeers*(pm: PeerManager) {.async.} = var (inRelayPeers, outRelayPeers) = pm.connectedPeers(WakuRelayCodec) - let maxConnections = pm.switch.connManager.inSema.size let totalRelayPeers = inRelayPeers.len + outRelayPeers.len - let inPeersTarget = maxConnections - pm.outRelayPeersTarget if inRelayPeers.len > pm.inRelayPeersTarget: await pm.pruneInRelayConns(inRelayPeers.len - pm.inRelayPeersTarget)