Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unrevert more of https://github.com/status-im/nimbus-eth2/pull/5765 #5834

Merged
merged 2 commits into from
Jan 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type
blobs*: Opt[BlobSidecars]
maybeFinalized*: bool
## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]]
resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource
Expand Down Expand Up @@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor,
proc enqueueBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
resfut: Future[Result[void, VerifierError]] = nil,
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
maybeFinalized = false,
validationDur = Duration()) =
withBlck(blck):
Expand Down Expand Up @@ -756,7 +756,7 @@ proc storeBlock(
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized = false,
validationDur = Duration()): Future[Result[void, VerifierError]] =
validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to
Expand Down
21 changes: 12 additions & 9 deletions beacon_chain/networking/peer_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent =
pool.outNotFullEvent

proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
filter: set[PeerType]) {.async.} =
filter: set[PeerType]) {.async: (raises: [CancelledError]).} =
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
var fut1 = incomingEvent(eventType).wait()
var fut2 = outgoingEvent(eventType).wait()
try:
discard await one(fut1, fut2)
try:
discard await one(fut1, fut2)
except ValueError:
raiseAssert "one precondition satisfied"
if fut1.finished():
if not(fut2.finished()):
await fut2.cancelAndWait()
Expand All @@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
outgoingEvent(eventType).clear()

proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotEmptyEvent, filter)

proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotFullEvent, filter)

proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
Expand Down Expand Up @@ -451,15 +454,15 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B],
{PeerType.Outgoing}

proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
peerType: PeerType) {.async.} =
peerType: PeerType) {.async: (raises: [CancelledError]).} =
## This procedure will block until ``pool`` will have an empty space for peer
## of type ``peerType``.
let mask = pool.getPeerSpaceMask(peerType)
while pool.lenSpace({peerType}) == 0:
await pool.waitNotFullEvent(mask)

proc addPeer*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): Future[PeerStatus] {.async.} =
peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} =
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
##
## This procedure will wait for an empty space in PeerPool ``pool``, if
Expand Down Expand Up @@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B],

proc acquire*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[A] {.async.} =
PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} =
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
mixin getKey
doAssert(filter != {}, "Filter must not be empty")
Expand Down Expand Up @@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} =
proc acquire*[A, B](pool: PeerPool[A, B],
number: int,
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[seq[A]] {.async.} =
PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} =
## Acquire ``number`` number of peers from PeerPool ``pool``, which match the
## filter ``filter``.
doAssert(filter != {}, "Filter must not be empty")
Expand Down Expand Up @@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) =
pool.acqIncPeersCount = 0
pool.acqOutPeersCount = 0

proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} =
## Performs "safe" clear. Safe means that it first acquires all the peers
## in PeerPool, and only after that it will reset storage.
var acquired = newSeq[A]()
Expand Down
Loading