Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Prevent concurrent IWANT of the same message (#943)" #977

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions libp2p/protocols/pubsub/gossipsub.nim
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ proc init*(_: type[GossipSubParams]): GossipSubParams =
disconnectBadPeers: false,
enablePX: false,
bandwidthEstimatebps: 100_000_000, # 100 Mbps or 12.5 MBps
iwantTimeout: 3 * GossipSubHeartbeatInterval,
overheadRateLimit: Opt.none(tuple[bytes: int, interval: Duration]),
disconnectPeerAboveRateLimit: false
)
Expand Down Expand Up @@ -461,9 +460,6 @@ method rpcHandler*(g: GossipSub,
let
msgId = msgIdResult.get
msgIdSalted = msgId & g.seenSalt
g.outstandingIWANTs.withValue(msgId, iwantRequest):
if iwantRequest.peer.peerId == peer.peerId:
g.outstandingIWANTs.del(msgId)

# addSeen adds salt to msgId to avoid
# remote attacking the hash function
Expand Down
16 changes: 1 addition & 15 deletions libp2p/protocols/pubsub/gossipsub/behavior.nim
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,7 @@ proc handleIHave*(g: GossipSub,
if not g.hasSeen(msgId):
if peer.iHaveBudget <= 0:
break
elif msgId notin res.messageIds and msgId notin g.outstandingIWANTs:
g.outstandingIWANTs[msgId] = IWANTRequest(messageId: msgId, peer: peer, timestamp: Moment.now())
elif msgId notin res.messageIds:
res.messageIds.add(msgId)
dec peer.iHaveBudget
trace "requested message via ihave", messageID=msgId
Expand Down Expand Up @@ -301,17 +300,6 @@ proc handleIWant*(g: GossipSub,
messages.add(msg)
return messages

proc checkIWANTTimeouts(g: GossipSub, timeoutDuration: Duration) {.raises: [].} =
let currentTime = Moment.now()
var idsToRemove = newSeq[MessageId]()
for msgId, request in g.outstandingIWANTs.pairs():
if currentTime - request.timestamp > timeoutDuration:
trace "IWANT request timed out", messageID=msgId, peer=request.peer
request.peer.behaviourPenalty += 0.1
idsToRemove.add(msgId)
for msgId in idsToRemove:
g.outstandingIWANTs.del(msgId)

proc commitMetrics(metrics: var MeshMetrics) {.raises: [].} =
libp2p_gossipsub_low_peers_topics.set(metrics.lowPeersTopics)
libp2p_gossipsub_no_peers_topics.set(metrics.noPeersTopics)
Expand Down Expand Up @@ -717,5 +705,3 @@ proc heartbeat*(g: GossipSub) {.async.} =
for trigger in g.heartbeatEvents:
trace "firing heartbeat event", instance = cast[int](g)
trigger.fire()

checkIWANTTimeouts(g, g.parameters.iwantTimeout)
7 changes: 0 additions & 7 deletions libp2p/protocols/pubsub/gossipsub/types.nim
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ type
enablePX*: bool

bandwidthEstimatebps*: int # This is currently used only for limting flood publishing. 0 disables flood-limiting completely
iwantTimeout*: Duration

overheadRateLimit*: Opt[tuple[bytes: int, interval: Duration]]
disconnectPeerAboveRateLimit*: bool
Expand Down Expand Up @@ -181,7 +180,6 @@ type
routingRecordsHandler*: seq[RoutingRecordsHandler] # Callback for peer exchange

heartbeatEvents*: seq[AsyncEvent]
outstandingIWANTs*: Table[MessageId, IWANTRequest]

MeshMetrics* = object
# scratch buffers for metrics
Expand All @@ -192,8 +190,3 @@ type
lowPeersTopics*: int64 # npeers < dlow
healthyPeersTopics*: int64 # npeers >= dlow
underDoutTopics*: int64

IWANTRequest* = object
messageId*: MessageId
peer*: PubSubPeer
timestamp*: Moment
98 changes: 0 additions & 98 deletions tests/pubsub/testgossipinternal.nim
Original file line number Diff line number Diff line change
Expand Up @@ -718,104 +718,6 @@ suite "GossipSub internal":
await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "two IHAVEs should generate only one IWANT":
let gossipSub = TestGossipSub.init(newStandardSwitch())

var iwantCount = 0

proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} =
check false

proc handler2(topic: string, data: seq[byte]) {.async.} = discard

let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)

# Setup two connections and two peers
var ihaveMessageId: string
var firstPeer: PubSubPeer
let seqno = @[0'u8, 1, 2, 3]
for i in 0..<2:
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
if isNil(firstPeer):
firstPeer = peer
ihaveMessageId = byteutils.toHex(seqno) & $firstPeer.peerId
peer.handler = handler

# Simulate that each peer sends an IHAVE message to our node
let msg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId.toBytes()]
)
let iwants = gossipSub.handleIHave(peer, @[msg])
if iwants.messageIds.len > 0:
iwantCount += 1

# Verify that our node responds with only one IWANT message
check: iwantCount == 1
check: gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

# Simulate that our node receives the RPCMsg in response to the IWANT
let actualMessageData = "Hello, World!".toBytes
let rpcMsg = RPCMsg(
messages: @[Message(
fromPeer: firstPeer.peerId,
seqno: seqno,
data: actualMessageData
)]
)
await gossipSub.rpcHandler(firstPeer, encodeRpcMsg(rpcMsg, false))

check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId.toBytes())

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

asyncTest "handle unanswered IWANT messages":
let gossipSub = TestGossipSub.init(newStandardSwitch())
gossipSub.parameters.heartbeatInterval = 50.milliseconds
gossipSub.parameters.iwantTimeout = 10.milliseconds
await gossipSub.start()

proc handler(peer: PubSubPeer, data: seq[byte]) {.async.} = discard
proc handler2(topic: string, data: seq[byte]) {.async.} = discard

let topic = "foobar"
var conns = newSeq[Connection]()
gossipSub.subscribe(topic, handler2)

# Setup a connection and a peer
let conn = TestBufferStream.new(noop)
conns &= conn
let peerId = randomPeerId()
conn.peerId = peerId
let peer = gossipSub.getPubSubPeer(peerId)
peer.handler = handler

# Simulate that the peer sends an IHAVE message to our node
let ihaveMessageId = @[0'u8, 1, 2, 3]
let ihaveMsg = ControlIHave(
topicID: topic,
messageIDs: @[ihaveMessageId]
)
discard gossipSub.handleIHave(peer, @[ihaveMsg])

check: gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.0

await sleepAsync(60.milliseconds)

check: not gossipSub.outstandingIWANTs.contains(ihaveMessageId)
check: peer.behaviourPenalty == 0.1

await allFuturesThrowing(conns.mapIt(it.close()))
await gossipSub.switch.stop()

proc setupTest(): Future[tuple[gossip0: GossipSub, gossip1: GossipSub, receivedMessages: ref HashSet[seq[byte]]]] {.async.} =
let
nodes = generateNodes(2, gossip = true, verifySignature = false)
Expand Down
Loading