From 1165cb66e281c0fd3543f0336945ec7cd1b2b494 Mon Sep 17 00:00:00 2001 From: tersec Date: Mon, 29 Jan 2024 05:09:45 +0100 Subject: [PATCH 1/2] unrevert more of https://github.com/status-im/nimbus-eth2/pull/5765 --- .../gossip_processing/block_processor.nim | 6 +++--- beacon_chain/networking/peer_pool.nim | 21 +++++++++++-------- tests/test_sync_manager.nim | 4 ++-- 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 53dd088e81..3cc34f1929 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -61,7 +61,7 @@ type blobs*: Opt[BlobSidecars] maybeFinalized*: bool ## The block source claims the block has been finalized already - resfut*: Future[Result[void, VerifierError]] + resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) queueTick*: Moment # Moment when block was enqueued validationDur*: Duration # Time it took to perform gossip validation src*: MsgSource @@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor, proc enqueueBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], - resfut: Future[Result[void, VerifierError]] = nil, + resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, maybeFinalized = false, validationDur = Duration()) = withBlck(blck): @@ -756,7 +756,7 @@ proc storeBlock( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] = + validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: # There is no backpressure here - producers must wait for `resfut` to diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index 6bedebb892..cc21f05e1a 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -108,12 +108,15 @@ template outgoingEvent(eventType: EventType): AsyncEvent = pool.outNotFullEvent proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, - filter: set[PeerType]) {.async.} = + filter: set[PeerType]) {.async: (raises: [CancelledError]).} = if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: var fut1 = incomingEvent(eventType).wait() var fut2 = outgoingEvent(eventType).wait() try: - discard await one(fut1, fut2) + try: + discard await one(fut1, fut2) + except ValueError: + raiseAssert "one precondition satisfied" if fut1.finished(): if not(fut2.finished()): await fut2.cancelAndWait() @@ -138,11 +141,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, outgoingEvent(eventType).clear() proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): Future[void] = + filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = pool.waitForEvent(EventType.NotEmptyEvent, filter) proc waitNotFullEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]): Future[void] = + filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = pool.waitForEvent(EventType.NotFullEvent, filter) proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, @@ -451,7 +454,7 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], {PeerType.Outgoing} proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], - peerType: PeerType) {.async.} = + peerType: PeerType) {.async: (raises: [CancelledError]).} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. let mask = pool.getPeerSpaceMask(peerType) @@ -459,7 +462,7 @@ proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], await pool.waitNotFullEvent(mask) proc addPeer*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): Future[PeerStatus] {.async.} = + peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -533,7 +536,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B], proc acquire*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[A] {.async.} = + PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. mixin getKey doAssert(filter != {}, "Filter must not be empty") @@ -586,7 +589,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = proc acquire*[A, B](pool: PeerPool[A, B], number: int, filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[seq[A]] {.async.} = + PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") @@ -735,7 +738,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) = pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} = +proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. var acquired = newSeq[A]() diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index bdbde9f2b9..d68db308dc 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # the BlockProcessor and this test proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] = - let fut = newFuture[Result[void, VerifierError]]() + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init() try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) except CatchableError as exc: raiseAssert exc.msg return fut From 6366112d120f087e47b41bed6eae863cad8b9114 Mon Sep 17 00:00:00 2001 From: tersec Date: Mon, 29 Jan 2024 06:40:30 +0100 Subject: [PATCH 2/2] re-revert tests/test_sync_manager.nim --- tests/test_sync_manager.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index d68db308dc..bdbde9f2b9 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # the BlockProcessor and this test proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = - let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init() + Future[Result[void, VerifierError]] = + let fut = newFuture[Result[void, VerifierError]]() try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) except CatchableError as exc: raiseAssert exc.msg return fut