Skip to content

Commit

Permalink
replace stale peer management with timestamp approach
Browse files Browse the repository at this point in the history
  • Loading branch information
darshankabariya committed Sep 23, 2024
1 parent d68b06f commit 731fe6c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 28 deletions.
48 changes: 25 additions & 23 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ const
# TODO: Make configurable
DefaultDialTimeout* = chronos.seconds(10)

# Max attempts before removing the peer
MaxFailedAttempts = 5

# Time to wait before attempting to dial again is calculated as:
# initialBackoffInSec*(backoffFactor^(failedAttempts-1))
# 120s, 480s, 1920s, 7680s
Expand All @@ -71,13 +68,14 @@ const
# Max peers that we allow from the same IP
DefaultColocationLimit* = 5

Threshold = chronos.hours(2)

type PeerManager* = ref object of RootObj
switch*: Switch
peerStore*: PeerStore
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
backoffFactor*: int
maxFailedAttempts*: int
storage*: PeerStorage
serviceSlots*: Table[string, RemotePeerInfo]
maxRelayPeers*: int
Expand Down Expand Up @@ -184,9 +182,8 @@ proc connectRelay*(
if not pm.peerStore.hasPeer(peerId, WakuRelayCodec):
pm.addPeer(peer)

let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
trace "Connecting to relay peer",
wireAddr = peer.addrs, peerId = peerId, failedAttempts = failedAttempts
wireAddr = peer.addrs, peerId = peerId

var deadline = sleepAsync(dialTimeout)
let workfut = pm.switch.connect(peerId, peer.addrs)
Expand All @@ -208,20 +205,21 @@ proc connectRelay*(
waku_peers_dials.inc(labelValues = ["successful"])
waku_node_conns_initiated.inc(labelValues = [source])

pm.peerStore[NumberFailedConnBook][peerId] = 0

if pm.peerStore[FirstFailedConnBook].contains(peerId):
discard pm.peerStore[FirstFailedConnBook].del(peerId)

return true

# Dial failed
pm.peerStore[NumberFailedConnBook][peerId] =
pm.peerStore[NumberFailedConnBook][peerId] + 1
pm.peerStore[LastFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)
pm.peerStore[ConnectionBook][peerId] = CannotConnect
if not pm.peerStore[FirstFailedConnBook].contains(peerId):
pm.peerStore[FirstFailedConnBook][peerId] = Moment.init(getTime().toUnix, Second)

trace "Connecting relay peer failed",
peerId = peerId,
reason = reasonFailed,
failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]
reason = reasonFailed

waku_peers_dials.inc(labelValues = [reasonFailed])

return false
Expand Down Expand Up @@ -311,22 +309,25 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
# Returns if we can try to connect to this peer, based on past failed attempts
# It uses an exponential backoff. Each connection attempt makes us
# wait more before trying again.
let failedAttempts = pm.peerStore[NumberFailedConnBook][peerId]

# if it never errored, we can try to connect
if failedAttempts == 0:
if not pm.peerStore[FirstFailedConnBook].contains(peerId):
return true

# if there are too many failed attempts, do not reconnect
if failedAttempts >= pm.maxFailedAttempts:

# if the first failed connection is older than Threshold, do not reconnect
let
disconnectTime = pm.peerStore[FirstFailedConnBook][peerId]
currentTime = Moment.init(getTime().toUnix, Second)

if (currentTime - disconnectTime) > Threshold:
return false

# If it errored we wait an exponential backoff from last connection
# the more failed attempts, the greater the backoff since last attempt
let now = Moment.init(getTime().toUnix, Second)
let lastFailed = pm.peerStore[LastFailedConnBook][peerId]
let backoff =
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, failedAttempts)
calculateBackoff(pm.initialBackoffInSec, pm.backoffFactor, 5)

return now >= (lastFailed + backoff)

Expand Down Expand Up @@ -457,7 +458,6 @@ proc new*(
storage: PeerStorage = nil,
initialBackoffInSec = InitialBackoffInSec,
backoffFactor = BackoffFactor,
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
): PeerManager {.gcsafe.} =
Expand Down Expand Up @@ -489,7 +489,7 @@ proc new*(
maxRelayPeersValue = maxConnections - (maxConnections div 5)

# attempt to calculate max backoff to prevent potential overflows or unreasonably high values
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, maxFailedAttempts)
let backoff = calculateBackoff(initialBackoffInSec, backoffFactor, 5)
if backoff.weeks() > 1:
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
Expand All @@ -506,7 +506,6 @@ proc new*(
outRelayPeersTarget: outRelayPeersTarget,
inRelayPeersTarget: maxRelayPeersValue - outRelayPeersTarget,
maxRelayPeers: maxRelayPeersValue,
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
)
Expand Down Expand Up @@ -849,8 +848,11 @@ proc prunePeerStore*(pm: PeerManager) =
var peersToPrune: HashSet[PeerId]

# prune failed connections
for peerId, count in pm.peerStore[NumberFailedConnBook].book.pairs:
if count < pm.maxFailedAttempts:
for peerId in pm.peerStore[FirstFailedConnBook].book.keys:
let
disconnectTime = pm.peerStore[FirstFailedConnBook][peerId]
currentTime = Moment.init(getTime().toUnix, Second)
if (currentTime - disconnectTime) < Threshold:
continue

if peersToPrune.len >= pruningCount:
Expand Down
6 changes: 3 additions & 3 deletions waku/node/peer_manager/waku_peer_store.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type
# Last failed connection attempt timestamp
LastFailedConnBook* = ref object of PeerBook[Moment]

# Failed connection attempts
NumberFailedConnBook* = ref object of PeerBook[int]
# First failed connection attempt timestamp
FirstFailedConnBook* = ref object of PeerBook[Moment]

# Keeps track of when peers were disconnected in Unix timestamps
DisconnectBook* = ref object of PeerBook[int64]
Expand Down Expand Up @@ -67,7 +67,7 @@ proc get*(peerStore: PeerStore, peerId: PeerID): RemotePeerInfo =
origin: peerStore[SourceBook][peerId],
direction: peerStore[DirectionBook][peerId],
lastFailedConn: peerStore[LastFailedConnBook][peerId],
numberFailedConn: peerStore[NumberFailedConnBook][peerId],
firstFailedConn: peerStore[FirstFailedConnBook][peerId],
)

proc getWakuProtos*(peerStore: PeerStore): seq[string] =
Expand Down
5 changes: 3 additions & 2 deletions waku/waku_core/peers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,10 @@ type RemotePeerInfo* = ref object
disconnectTime*: int64
origin*: PeerOrigin
direction*: PeerDirection
firstFailedConn*: Moment
lastFailedConn*: Moment
numberFailedConn*: int


func `$`*(remotePeerInfo: RemotePeerInfo): string =
  ## String form of a remote peer: the stringified `peerId` field.
  result = $(remotePeerInfo.peerId)

Expand Down

0 comments on commit 731fe6c

Please sign in to comment.