Skip to content

Commit

Permalink
unrevert more of #5765 (#5834)
Browse files Browse the repository at this point in the history
  • Loading branch information
tersec authored Jan 29, 2024
1 parent 225ef5e commit 3d7f634
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
6 changes: 3 additions & 3 deletions beacon_chain/gossip_processing/block_processor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type
blobs*: Opt[BlobSidecars]
maybeFinalized*: bool
## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]]
resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
queueTick*: Moment # Moment when block was enqueued
validationDur*: Duration # Time it took to perform gossip validation
src*: MsgSource
Expand Down Expand Up @@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor,
proc enqueueBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
resfut: Future[Result[void, VerifierError]] = nil,
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
maybeFinalized = false,
validationDur = Duration()) =
withBlck(blck):
Expand Down Expand Up @@ -756,7 +756,7 @@ proc storeBlock(
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized = false,
validationDur = Duration()): Future[Result[void, VerifierError]] =
validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to
Expand Down
21 changes: 12 additions & 9 deletions beacon_chain/networking/peer_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent =
pool.outNotFullEvent

proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
filter: set[PeerType]) {.async.} =
filter: set[PeerType]) {.async: (raises: [CancelledError]).} =
if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
var fut1 = incomingEvent(eventType).wait()
var fut2 = outgoingEvent(eventType).wait()
try:
discard await one(fut1, fut2)
try:
discard await one(fut1, fut2)
except ValueError:
raiseAssert "one precondition satisfied"
if fut1.finished():
if not(fut2.finished()):
await fut2.cancelAndWait()
Expand All @@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
outgoingEvent(eventType).clear()

proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotEmptyEvent, filter)

proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
filter: set[PeerType]): Future[void] =
filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} =
pool.waitForEvent(EventType.NotFullEvent, filter)

proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
Expand Down Expand Up @@ -451,15 +454,15 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B],
{PeerType.Outgoing}

proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
peerType: PeerType) {.async.} =
peerType: PeerType) {.async: (raises: [CancelledError]).} =
## This procedure will block until ``pool`` will have an empty space for peer
## of type ``peerType``.
let mask = pool.getPeerSpaceMask(peerType)
while pool.lenSpace({peerType}) == 0:
await pool.waitNotFullEvent(mask)

proc addPeer*[A, B](pool: PeerPool[A, B],
peer: A, peerType: PeerType): Future[PeerStatus] {.async.} =
peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} =
## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
##
## This procedure will wait for an empty space in PeerPool ``pool``, if
Expand Down Expand Up @@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B],

proc acquire*[A, B](pool: PeerPool[A, B],
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[A] {.async.} =
PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} =
## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
mixin getKey
doAssert(filter != {}, "Filter must not be empty")
Expand Down Expand Up @@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} =
proc acquire*[A, B](pool: PeerPool[A, B],
number: int,
filter = {PeerType.Incoming,
PeerType.Outgoing}): Future[seq[A]] {.async.} =
PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} =
## Acquire ``number`` number of peers from PeerPool ``pool``, which match the
## filter ``filter``.
doAssert(filter != {}, "Filter must not be empty")
Expand Down Expand Up @@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) =
pool.acqIncPeersCount = 0
pool.acqOutPeersCount = 0

proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} =
## Performs "safe" clear. Safe means that it first acquires all the peers
## in PeerPool, and only after that it will reset storage.
var acquired = newSeq[A]()
Expand Down

0 comments on commit 3d7f634

Please sign in to comment.