feat: drop msgs to be relayed waiting for too long in the queue #1017

Draft: wants to merge 17 commits into base: master
51 changes: 32 additions & 19 deletions libp2p/protocols/pubsub/gossipsub.nim
@@ -83,7 +83,8 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false
disconnectPeerAboveRateLimit: false,
maxDurationInNonPriorityQueue: Opt.none(Duration),
)

proc validateParameters*(parameters: GossipSubParams): Result[void, cstring] =
@@ -220,6 +221,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
for topic, info in stats[].topicInfos.mpairs:
info.firstMessageDeliveries = 0

pubSubPeer.stopSendNonPriorityTask()

procCall FloodSub(g).unsubscribePeer(peer)

proc handleSubscribe*(g: GossipSub,
@@ -279,31 +282,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
respControl.prune.add(g.handleGraft(peer, control.graft))
let messages = g.handleIWant(peer, control.iwant)

if
respControl.prune.len > 0 or
respControl.iwant.len > 0 or
messages.len > 0:
# iwant and prunes from here, also messages
let
isPruneNotEmpty = respControl.prune.len > 0
isIWantNotEmpty = respControl.iwant.len > 0

if isPruneNotEmpty or isIWantNotEmpty:

if isIWantNotEmpty:
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

if isPruneNotEmpty:
for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
g.send(
peer,
RPCMsg(control: some(respControl)), true)

if messages.len > 0:
for smsg in messages:
for topic in smsg.topicIds:
if g.knownTopics.contains(topic):
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
else:
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])

libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)

for prune in respControl.prune:
if g.knownTopics.contains(prune.topicId):
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
else:
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])

trace "sending control message", msg = shortLog(respControl), peer
# iwant replies have lower priority
trace "sending iwant reply messages", peer
g.send(
peer,
RPCMsg(control: some(respControl), messages: messages))
RPCMsg(messages: messages), false)

proc validateAndRelay(g: GossipSub,
msg: Message,
@@ -370,7 +382,7 @@ proc validateAndRelay(g: GossipSub,

# In theory, if topics are the same in all messages, we could batch - we'd
# also have to be careful to only include validated messages
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
for topic in msg.topicIds:
if topic notin g.topics: continue
@@ -441,7 +453,7 @@ method rpcHandler*(g: GossipSub,
peer.recvObservers(rpcMsg)

if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
g.send(peer, RPCMsg(pong: rpcMsg.ping))
g.send(peer, RPCMsg(pong: rpcMsg.ping), true)
peer.pingBudget.dec
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
template sub: untyped = rpcMsg.subscriptions[i]
@@ -655,7 +667,7 @@ method publish*(g: GossipSub,

g.mcache.put(msgId, msg)

g.broadcast(peers, RPCMsg(messages: @[msg]))
g.broadcast(peers, RPCMsg(messages: @[msg]), true)

if g.knownTopics.contains(topic):
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])
@@ -740,4 +752,5 @@ method getOrCreatePeer*(
let peer = procCall PubSub(g).getOrCreatePeer(peerId, protos)
g.parameters.overheadRateLimit.withValue(overheadRateLimit):
peer.overheadRateLimitOpt = Opt.some(TokenBucket.new(overheadRateLimit.bytes, overheadRateLimit.interval))
peer.rpcmessagequeue.maxDurationInNonPriorityQueue = g.parameters.maxDurationInNonPriorityQueue
return peer
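
Note: taken together, the gossipsub.nim changes above classify outgoing traffic by priority. A condensed sketch of the call sites, taken directly from this diff rather than new code:

g.send(peer, RPCMsg(control: some(respControl)), true)     # control replies: high priority
g.send(peer, RPCMsg(messages: messages), false)            # IWANT reply messages: low priority
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), false)  # relayed messages: low priority
g.send(peer, RPCMsg(pong: rpcMsg.ping), true)              # pongs: high priority
g.broadcast(peers, RPCMsg(messages: @[msg]), true)         # locally published messages: high priority
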
4 changes: 4 additions & 0 deletions libp2p/protocols/pubsub/gossipsub/types.nim
@@ -147,6 +147,10 @@ type
overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool

# The maximum duration a message can stay in the non-priority queue. If it exceeds this duration, it will be discarded
# as soon as it is dequeued, instead of being sent to the remote peer. The default value is none, i.e., no maximum duration.
maxDurationInNonPriorityQueue*: Opt[Duration]

BackoffTable* = Table[string, Table[PeerId, Moment]]
ValidationSeenTable* = Table[MessageId, HashSet[PubSubPeer]]
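
Note: a minimal configuration sketch for the new parameter, assuming the usual GossipSubParams initialization path; the 500 ms value is illustrative only, not a default from this PR:

var params = GossipSubParams.init()
params.maxDurationInNonPriorityQueue = Opt.some(500.milliseconds)
# Any relayed or IWANT-reply message that has waited longer than 500 ms in the
# non-priority queue is dropped when dequeued instead of being sent.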

11 changes: 6 additions & 5 deletions libp2p/protocols/pubsub/pubsub.nim
@@ -138,17 +138,18 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =

libp2p_pubsub_peers.set(p.peers.len.int64)

proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool = false) {.raises: [].} =
## Attempt to send `msg` to remote peer
##

trace "sending pubsub message to peer", peer, msg = shortLog(msg)
peer.send(msg, p.anonymize)
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)

proc broadcast*(
p: PubSub,
sendPeers: auto, # Iterable[PubSubPeer]
msg: RPCMsg) {.raises: [].} =
msg: RPCMsg,
isHighPriority: bool = false) {.raises: [].} =
## Attempt to send `msg` to the given peers

let npeers = sendPeers.len.int64
@@ -195,12 +196,12 @@ proc broadcast*(

if anyIt(sendPeers, it.hasObservers):
for peer in sendPeers:
p.send(peer, msg)
p.send(peer, msg, isHighPriority)
else:
# Fast path that only encodes message once
let encoded = encodeRpcMsg(msg, p.anonymize)
for peer in sendPeers:
asyncSpawn peer.sendEncoded(encoded)
asyncSpawn peer.sendEncoded(encoded, isHighPriority)

proc sendSubs*(p: PubSub,
peer: PubSubPeer,
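Note: both helpers now take an `isHighPriority` flag that defaults to false, so existing call sites keep the low-priority behaviour unless they opt in. A hedged usage sketch; identifiers other than `send` and `broadcast` are placeholders:

p.send(peer, RPCMsg(control: some(control)), isHighPriority = true)  # jumps ahead of queued relay traffic
p.broadcast(somePeers, RPCMsg(messages: @[msg]))                      # defaults to isHighPriority = false
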
138 changes: 113 additions & 25 deletions libp2p/protocols/pubsub/pubsubpeer.nim
@@ -31,6 +31,12 @@
declareCounter(libp2p_pubsub_skipped_received_messages, "number of received skipped messages", labels = ["id"])
declareCounter(libp2p_pubsub_skipped_sent_messages, "number of sent skipped messages", labels = ["id"])

declareGauge(libp2p_gossipsub_priority_queue_size, "the number of messages in the priority queue", labels = ["id"])
declareGauge(libp2p_gossipsub_non_priority_queue_size, "the number of messages in the non-priority queue", labels = ["id"])

declareCounter(libp2p_gossipsub_non_priority_msgs_dropped, "the number of dropped messages in the non-priority queue", labels = ["id"])


type
PeerRateLimitError* = object of CatchableError

@@ -49,6 +55,20 @@
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}

QueuedMessage* = object
msg*: seq[byte]
addedAt*: Moment

RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
# Queue for lower-priority messages, like "IWANT" replies and relay messages.
nonPriorityQueue: AsyncQueue[QueuedMessage]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]
# The max duration a message to be relayed can wait in the queue before it is dropped. Defaults to none, i.e. no limit.
maxDurationInNonPriorityQueue*: Opt[Duration]

PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
onEvent*: OnEvent # Connectivity updates for peer
@@ -70,6 +90,8 @@
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]

rpcmessagequeue*: RpcMessageQueue

RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}

@@ -82,6 +104,16 @@
#so we have to read the parents short agent..
p.sendConn.getWrapped().shortAgent

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"

func hash*(p: PubSubPeer): Hash =
p.peerId.hash

@@ -227,17 +259,13 @@
# metrics
libp2p_pubsub_sent_messages.inc(labelValues = [$p.peerId, t])

proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return
proc clearSendPriorityQueue(p: PubSubPeer) =
while p.rpcmessagequeue.sendPriorityQueue.len > 0 and p.rpcmessagequeue.sendPriorityQueue[0].finished:
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.dec(labelValues = [$p.peerId])
discard p.rpcmessagequeue.sendPriorityQueue.popFirst()

proc sendMsg(p: PubSubPeer, msg: seq[byte]) {.async.} =
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
# complete this even if the sendConn setup failed
@@ -262,6 +290,30 @@

await conn.close() # This will clean up the send connection

proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {.async.} =
doAssert(not isNil(p), "pubsubpeer nil!")

if msg.len <= 0:
debug "empty message, skipping", p, msg = shortLog(msg)
return

if msg.len > p.maxMessageSize:
info "trying to send a msg too big for pubsub", maxSize=p.maxMessageSize, msgSize=msg.len
return

if isHighPriority:
p.clearSendPriorityQueue()
let f = p.sendMsg(msg)
if not f.finished:
p.rpcmessagequeue.sendPriorityQueue.addLast(f)
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(QueuedMessage(msg: msg, addedAt: Moment.now()))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
## This iterator takes an `RPCMsg` and sequentially repackages its Messages into new `RPCMsg` instances.
## Each new `RPCMsg` accumulates Messages until reaching the specified `maxSize`. If a single Message
@@ -297,7 +349,7 @@
else:
trace "message too big to sent", peer, rpcMsg = shortLog(currentRPCMsg)

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool, isHighPriority: bool = false) {.async.} =
# When sending messages, we take care to re-encode them with the right
# anonymization flag to ensure that we're not penalized for sending invalid
# or malicious data on the wire - in particular, re-encoding protects against
@@ -317,11 +369,11 @@

if encoded.len > p.maxMessageSize and msg.messages.len > 1:
for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
asyncSpawn p.sendEncoded(encodedSplitMsg)
await p.sendEncoded(encodedSplitMsg, isHighPriority)
else:
# If the message size is within limits, send it as is
trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
asyncSpawn p.sendEncoded(encoded)
await p.sendEncoded(encoded, isHighPriority)

proc canAskIWant*(p: PubSubPeer, msgId: MessageId): bool =
for sentIHave in p.sentIHaves.mitems():
@@ -330,14 +382,58 @@
return true
return false

proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
while true:
# we send non-priority messages only if there are no pending priority messages
let queuedMsg = await p.rpcmessagequeue.nonPriorityQueue.popFirst()
while p.rpcmessagequeue.sendPriorityQueue.len > 0:
p.clearSendPriorityQueue()
# This minimizes how often we have to await something (each await has a performance cost):
# we never wait on an already-finished future, and by awaiting the last one, every future
# queued before it is guaranteed to have finished as well (sends are processed in order).
if p.rpcmessagequeue.sendPriorityQueue.len > 0:
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
p.rpcmessagequeue.maxDurationInNonPriorityQueue.withValue(maxDurationInNonPriorityQueue):
if Moment.now() - queuedMsg.addedAt >= maxDurationInNonPriorityQueue:
when defined(libp2p_expensive_metrics):

libp2p_gossipsub_non_priority_msgs_dropped.inc(labelValues = [$p.peerId])
continue
await p.sendMsg(queuedMsg.msg)
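
Note: the drop rule above compares queue residency against maxDurationInNonPriorityQueue. A tiny illustrative check; the values are assumed, not from the diff:

let
  maxWait = 500.milliseconds
  addedAt = Moment.now() - 600.milliseconds   # message has been queued for ~600 ms
assert Moment.now() - addedAt >= maxWait      # so it would be dropped instead of sent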

proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
if p.rpcmessagequeue.sendNonPriorityTask.isNil:
p.rpcmessagequeue.sendNonPriorityTask = p.sendNonPriorityTask()

proc stopSendNonPriorityTask*(p: PubSubPeer) =
if not p.rpcmessagequeue.sendNonPriorityTask.isNil:
debug "stopping sendNonPriorityTask", p
p.rpcmessagequeue.sendNonPriorityTask.cancel()
p.rpcmessagequeue.sendNonPriorityTask = nil
p.rpcmessagequeue.sendPriorityQueue.clear()
p.rpcmessagequeue.nonPriorityQueue.clear()
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)

proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = Opt.none(Duration)): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[QueuedMessage](),
maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
)

proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
getConn: GetConn,
onEvent: OnEvent,
codec: string,
maxMessageSize: int,
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket)): T =
overheadRateLimitOpt: Opt[TokenBucket] = Opt.none(TokenBucket),
maxDurationInNonPriorityQueue = Opt.none(Duration)): T =

result = T(
getConn: getConn,
@@ -346,17 +442,9 @@
peerId: peerId,
connectedFut: newFuture[void](),
maxMessageSize: maxMessageSize,
overheadRateLimitOpt: overheadRateLimitOpt
overheadRateLimitOpt: overheadRateLimitOpt,
rpcmessagequeue: RpcMessageQueue.new(maxDurationInNonPriorityQueue),
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))

proc getAgent*(peer: PubSubPeer): string =
return
when defined(libp2p_agents_metrics):
if peer.shortAgent.len > 0:
peer.shortAgent
else:
"unknown"
else:
"unknown"
result.startSendNonPriorityTask()
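
Note: an end-to-end construction sketch under the new signature; `peerId`, `getConn`, `onEvent`, `codec` and `maxMessageSize` are assumed to be in scope, and the 500 ms bound is illustrative:

let peer = PubSubPeer.new(
  peerId, getConn, onEvent, codec, maxMessageSize,
  maxDurationInNonPriorityQueue = Opt.some(500.milliseconds))
# `new` starts the non-priority send task; `stopSendNonPriorityTask` (called from
# unsubscribePeer in gossipsub.nim) cancels it and clears both queues.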