Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: out connections leak #3077

Merged
merged 13 commits into from
Oct 3, 2024
10 changes: 6 additions & 4 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -404,17 +404,19 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
pm.wakuPeerStore.delete(peerId)

proc connectedPeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## containing at least one stream with the given protocol.
proc connectedPeers*(
pm: PeerManager, protocol: string = ""
): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## If a protocol is specified, only returns peers with at least one stream of that protocol

var inPeers: seq[PeerId]
var outPeers: seq[PeerId]

for peerId, muxers in pm.switch.connManager.getConnections():
for peerConn in muxers:
let streams = peerConn.getStreams()
if streams.anyIt(it.protocol == protocol):
if protocol.len == 0 or streams.anyIt(it.protocol == protocol):
if peerConn.connection.transportDir == Direction.In:
inPeers.add(peerId)
elif peerConn.connection.transportDir == Direction.Out:
Expand Down
13 changes: 8 additions & 5 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1240,16 +1240,19 @@ proc mountLibp2pPing*(node: WakuNode) {.async: (raises: []).} =
# TODO: Move this logic to PeerManager
proc keepaliveLoop(node: WakuNode, keepalive: chronos.Duration) {.async.} =
while node.started:
# Keep all connected peers alive while running
# Keep connected peers alive while running
# Each node is responsible of keeping its outgoing connections alive
trace "Running keepalive"

# First get a list of connected peer infos
let peers =
node.peerManager.wakuPeerStore.peers().filterIt(it.connectedness == Connected)
let outPeers = node.peerManager.connectedPeers()[1]

for peer in peers:
for peerId in outPeers:
try:
let conn = await node.switch.dial(peer.peerId, peer.addrs, PingCodec)
info "calling keepAlive dial", peerId = peerId
let conn = (await node.peerManager.dialPeer(peerId, PingCodec)).valueOr:
warn "Failed dialing peer for keep alive", peerId = peerId
continue
let pingDelay = await node.libp2pPing.ping(conn)
await conn.close()
except CatchableError as exc:
Expand Down
Loading