diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index ab3d9070e8..d61b187549 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -53,10 +53,16 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} + Ttlmessage* = object + msg*: seq[byte] + ttl*: Moment + RpcMessageQueue* = ref object sendPriorityQueue: Deque[Future[void]] - nonPriorityQueue: AsyncQueue[seq[byte]] + nonPriorityQueue: AsyncQueue[Ttlmessage] sendNonPriorityTask: Future[void] + # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. + maxDurationInNonPriorityQueue: Duration PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection @@ -289,7 +295,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now())) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -373,10 +379,12 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = while p.rpcmessagequeue.sendPriorityQueue.len > 0: await p.rpcmessagequeue.sendPriorityQueue[0] p.clearSendPriorityQueue() - let msg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() + let ttlMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst() + if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + continue when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - await p.sendMsg(msg) + await p.sendMsg(ttlMsg.msg) proc startSendNonPriorityTask(p: PubSubPeer) = debug "starting sendNonPriorityTask", p @@ -394,10 +402,11 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) -proc new(T: typedesc[RpcMessageQueue]): T = +proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[seq[byte]](), + nonPriorityQueue: newAsyncQueue[Ttlmessage](), + maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue, ) proc new*(