Skip to content

Commit

Permalink
rename ttl property and message type
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Feb 13, 2024
1 parent f6775d2 commit 5d9478b
Showing 1 changed file with 6 additions and 6 deletions.
12 changes: 6 additions & 6 deletions libp2p/protocols/pubsub/pubsubpeer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

Ttlmessage* = object
QueuedMessage* = object
msg*: seq[byte]
ttl*: Moment
addedAt*: Moment

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[seq[byte]]
nonPriorityQueue: AsyncQueue[QueuedMessage]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
Expand Down Expand Up @@ -309,7 +309,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now()))
await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now()))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
Expand Down Expand Up @@ -395,7 +395,7 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
if Moment.now() - ttlMsg.addedAt >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
continue
Expand All @@ -420,7 +420,7 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 1.seconds): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[Ttlmessage](),
nonPriorityQueue: newAsyncQueue[QueuedMessage](),
maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
)

Expand Down

0 comments on commit 5d9478b

Please sign in to comment.