diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 53dd088e81..3cc34f1929 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -61,7 +61,7 @@ type blobs*: Opt[BlobSidecars] maybeFinalized*: bool ## The block source claims the block has been finalized already - resfut*: Future[Result[void, VerifierError]] + resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) queueTick*: Moment # Moment when block was enqueued validationDur*: Duration # Time it took to perform gossip validation src*: MsgSource @@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor, proc enqueueBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], - resfut: Future[Result[void, VerifierError]] = nil, + resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, maybeFinalized = false, validationDur = Duration()) = withBlck(blck): @@ -756,7 +756,7 @@ proc storeBlock( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] = + validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: # There is no backpressure here - producers must wait for `resfut` to diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 6bedebb892..cc21f05e1a 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent = pool.outNotFullEvent proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, - filter: set[PeerType]) {.async.} = + filter: set[PeerType]) {.async: (raises: [CancelledError]).} = if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: var fut1 = incomingEvent(eventType).wait() var fut2 = outgoingEvent(eventType).wait() try: - discard await one(fut1, fut2) + try: + discard await one(fut1, fut2) + except ValueError: + raiseAssert "one precondition satisfied" if fut1.finished(): if not(fut2.finished()): await fut2.cancelAndWait() @@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, outgoingEvent(eventType).clear() proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): Future[void] = + filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = pool.waitForEvent(EventType.NotEmptyEvent, filter) proc waitNotFullEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): Future[void] = + filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = pool.waitForEvent(EventType.NotFullEvent, filter) proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, @@ -451,7 +454,7 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], {PeerType.Outgoing} proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], - peerType: PeerType) {.async.} = + peerType: PeerType) {.async: (raises: [CancelledError]).} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. let mask = pool.getPeerSpaceMask(peerType) @@ -459,7 +462,7 @@ proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], await pool.waitNotFullEvent(mask) proc addPeer*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): Future[PeerStatus] {.async.} = + peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B], proc acquire*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[A] {.async.} = + PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. mixin getKey doAssert(filter != {}, "Filter must not be empty") @@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = proc acquire*[A, B](pool: PeerPool[A, B], number: int, filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[seq[A]] {.async.} = + PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") @@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) = pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} = +proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. var acquired = newSeq[A]()