diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 66d2ff8660..97a8596fc3 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -352,7 +352,7 @@ proc initFullNode( blobQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] = + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach # taken in the sync/request managers - this is an architectural compromise @@ -361,27 +361,23 @@ proc initFullNode( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] = + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): - when typeof(forkyBlck).kind >= ConsensusFork.Deneb: + when consensusFork >= ConsensusFork.Deneb: if not blobQuarantine[].hasBlobs(forkyBlck): # We don't have all the blobs for this block, so we have # to put it in blobless quarantine. if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): - Future.completed( - Result[void, VerifierError].err(VerifierError.UnviableFork), - "rmanBlockVerifier") + err(VerifierError.UnviableFork) else: - Future.completed( - Result[void, VerifierError].err(VerifierError.MissingParent), - "rmanBlockVerifier") + err(VerifierError.MissingParent) else: let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.some(blobs), maybeFinalized = maybeFinalized) else: - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), maybeFinalized = maybeFinalized) diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 008bc5bd00..d03aac8739 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -39,7 +39,7 @@ const type BlockVerifierFn* = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.gcsafe, raises: [].} + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} InhibitFn* = proc: bool {.gcsafe, raises:[].} RequestManager* = object @@ -49,8 +49,8 @@ type quarantine: ref Quarantine blobQuarantine: ref BlobQuarantine blockVerifier: BlockVerifierFn - blockLoopFuture: Future[void] - blobLoopFuture: Future[void] + blockLoopFuture: Future[void].Raising([CancelledError]) + blobLoopFuture: Future[void].Raising([CancelledError]) func shortLog*(x: seq[Eth2Digest]): string = "[" & x.mapIt(shortLog(it)).join(", ") & "]" @@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier], return false true -proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} = +proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} = var peer: Peer try: peer = await rman.network.peerPool.acquire() @@ -171,19 +171,13 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} peer = peer, blocks = shortLog(items), err = blocks.error() peer.updateScore(PeerScoreNoValues) - except CancelledError as exc: - raise exc - except CatchableError as exc: - peer.updateScore(PeerScoreNoValues) - debug "Error while fetching blocks by root", exc = exc.msg, - items = shortLog(items), peer = peer, peer_score = peer.getScore() - raise exc finally: if not(isNil(peer)): rman.network.peerPool.release(peer) proc fetchBlobsFromNetwork(self: RequestManager, - idList: seq[BlobIdentifier]) {.async.} = + idList: seq[BlobIdentifier]) + {.async: (raises: [CancelledError]).} = var peer: Peer try: @@ -191,7 +185,7 @@ proc fetchBlobsFromNetwork(self: RequestManager, debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList), peer_score = peer.getScore() - let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList)) + let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList) if blobs.isOk: let ublobs = blobs.get() @@ -219,18 +213,11 @@ proc fetchBlobsFromNetwork(self: RequestManager, peer = peer, blobs = shortLog(idList), err = blobs.error() peer.updateScore(PeerScoreNoValues) - except CancelledError as exc: - raise exc - except CatchableError as exc: - peer.updateScore(PeerScoreNoValues) - debug "Error while fetching blobs by root", exc = exc.msg, - idList = shortLog(idList), peer = peer, peer_score = peer.getScore() - raise exc finally: if not(isNil(peer)): self.network.peerPool.release(peer) -proc requestManagerBlockLoop(rman: RequestManager) {.async.} = +proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: # TODO This polling could be replaced with an AsyncEvent that is fired # from the quarantine when there's work to do @@ -245,33 +232,19 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} = continue debug "Requesting detected missing blocks", blocks = shortLog(blocks) - try: - let start = SyncMoment.now(0) - - var workers: array[PARALLEL_REQUESTS, Future[void]] + let start = SyncMoment.now(0) - for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.requestBlocksByRoot(blocks) + var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] - await allFutures(workers) + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.requestBlocksByRoot(blocks) - let finish = SyncMoment.now(uint64(len(blocks))) + await allFutures(workers) - var succeed = 0 - for worker in workers: - if worker.completed(): - inc(succeed) - - debug "Request manager block tick", blocks = shortLog(blocks), - succeed = succeed, - failed = (len(workers) - succeed), - sync_speed = speed(start, finish) - - except CancelledError: - break - except CatchableError as exc: - warn "Unexpected error in request manager block loop", exc = exc.msg + let finish = SyncMoment.now(uint64(len(blocks))) + debug "Request manager block tick", blocks = shortLog(blocks), + sync_speed = speed(start, finish) proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = let @@ -308,11 +281,10 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = rman.quarantine[].removeBlobless(blobless) fetches - -proc requestManagerBlobLoop(rman: RequestManager) {.async.} = +proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = while true: - # TODO This polling could be replaced with an AsyncEvent that is fired - # from the quarantine when there's work to do + # TODO This polling could be replaced with an AsyncEvent that is fired + # from the quarantine when there's work to do await sleepAsync(POLL_INTERVAL) if rman.inhibit(): continue @@ -320,30 +292,17 @@ proc requestManagerBlobLoop(rman: RequestManager) {.async.} = let fetches = rman.getMissingBlobs() if fetches.len > 0: debug "Requesting detected missing blobs", blobs = shortLog(fetches) - try: - let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void]] - for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.fetchBlobsFromNetwork(fetches) - - await allFutures(workers) - let finish = SyncMoment.now(uint64(len(fetches))) - - var succeed = 0 - for worker in workers: - if worker.finished() and not(worker.failed()): - inc(succeed) - - debug "Request manager blob tick", - blobs_count = len(fetches), - succeed = succeed, - failed = (len(workers) - succeed), - sync_speed = speed(start, finish) - - except CancelledError: - break - except CatchableError as exc: - warn "Unexpected error in request manager blob loop", exc = exc.msg + let start = SyncMoment.now(0) + var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.fetchBlobsFromNetwork(fetches) + + await allFutures(workers) + let finish = SyncMoment.now(uint64(len(fetches))) + + debug "Request manager blob tick", + blobs_count = len(fetches), + sync_speed = speed(start, finish) proc start*(rman: var RequestManager) = ## Start Request Manager's loops. diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 4e231eb138..dc1f162053 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -43,7 +43,7 @@ type NoMonitor SyncWorker*[A, B] = object - future: Future[void] + future: Future[void].Raising([CancelledError]) status: SyncWorkerStatus SyncManager*[A, B] = ref object @@ -158,8 +158,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], res.initQueue() res -proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest): Future[BeaconBlocksRes] {.async.} = +proc getBlocks[A, B](man: SyncManager[A, B], peer: A, + req: SyncRequest): Future[BeaconBlocksRes] {. + async: (raises: [CancelledError], raw: true).} = mixin getScore, `==` logScope: @@ -171,21 +172,8 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blocks from peer", request = req - try: - let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) - - if res.isErr(): - debug "Error, while reading getBlocks response", request = req, - error = $res.error() - return - return res - except CancelledError: - debug "Interrupt, while waiting getBlocks response", request = req - return - except CatchableError as exc: - debug "Error, while waiting getBlocks response", request = req, - errName = exc.name, errMsg = exc.msg - return + + beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = let wallEpoch = man.getLocalWallSlot().epoch @@ -194,8 +182,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest - ): Future[BlobSidecarsRes] {.async.} = + req: SyncRequest): Future[BlobSidecarsRes] + {.async: (raises: [CancelledError], raw: true).} = mixin getScore, `==` logScope: @@ -207,21 +195,7 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blobs sidecars from peer", request = req - try: - let res = await blobSidecarsByRange(peer, req.slot, req.count) - - if res.isErr(): - debug "Error, while reading blobSidecarsByRange response", request = req, - error = $res.error() - return - return res - except CancelledError: - debug "Interrupt, while waiting blobSidecarsByRange response", request = req - return - except CatchableError as exc: - debug "Error, while waiting blobSidecarsByRange response", request = req, - errName = exc.name, errMsg = exc.msg - return + blobSidecarsByRange(peer, req.slot, req.count) proc remainingSlots(man: SyncManager): uint64 = let @@ -282,7 +256,8 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] = ? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ok() -proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = +proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) + {.async: (raises: [CancelledError]).} = logScope: peer_score = peer.getScore() peer_speed = peer.netKbps() @@ -322,17 +297,11 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = trace "Updating peer's status information", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot - try: - let res = await peer.updateStatus() - if not(res): - peer.updateScore(PeerScoreNoStatus) - debug "Failed to get remote peer's status, exiting", - peer_head_slot = peerSlot + if not(await peer.updateStatus()): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", + peer_head_slot = peerSlot - return - except CatchableError as exc: - debug "Unexpected exception while updating peer's status", - peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg return let newPeerSlot = peer.getHeadSlot() @@ -419,110 +388,103 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = man.workers[index].status = SyncWorkerStatus.Downloading - try: - let blocks = await man.getBlocks(peer, req) - if blocks.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blocks on request", request = req - return - let blockData = blocks.get().asSeq() - let blockSmap = getShortMap(req, blockData) - debug "Received blocks on request", blocks_count = len(blockData), - blocks_map = blockSmap, request = req - - let slots = mapIt(blockData, it[].slot) - if not(checkResponse(req, slots)): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blocks sequence is not in requested range", - blocks_count = len(blockData), blocks_map = blockSmap, - request = req - return - - func combine(acc: seq[Slot], cur: Slot): seq[Slot] = - var copy = acc - if copy[copy.len-1] != cur: - copy.add(cur) - copy - - let blobData = - if man.shouldGetBlobs(req.slot.epoch): - let blobs = await man.getBlobSidecars(peer, req) - if blobs.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blobs on request", request = req - return - let blobData = blobs.get().asSeq() - let blobSmap = getShortMap(req, blobData) - debug "Received blobs on request", blobs_count = len(blobData), - blobs_map = blobSmap, request = req - - if len(blobData) > 0: - let slots = mapIt(blobData, it[].signed_block_header.message.slot) - let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) - if not(checkResponse(req, uniqueSlots)): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blobs sequence is not in requested range", - blobs_count = len(blobData), blobs_map = getShortMap(req, blobData), - request = req - return - let groupedBlobs = groupBlobs(req, blockData, blobData) - if groupedBlobs.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - info "Received blobs sequence is inconsistent", - blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error() - return - if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): + let blocks = await man.getBlocks(peer, req) + if blocks.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive blocks on request", request = req + return + let blockData = blocks.get().asSeq() + let blockSmap = getShortMap(req, blockData) + debug "Received blocks on request", blocks_count = len(blockData), + blocks_map = blockSmap, request = req + + let slots = mapIt(blockData, it[].slot) + if not(checkResponse(req, slots)): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blocks sequence is not in requested range", + blocks_count = len(blockData), blocks_map = blockSmap, + request = req + return + + func combine(acc: seq[Slot], cur: Slot): seq[Slot] = + var copy = acc + if copy[copy.len-1] != cur: + copy.add(cur) + copy + + let blobData = + if man.shouldGetBlobs(req.slot.epoch): + let blobs = await man.getBlobSidecars(peer, req) + if blobs.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive blobs on request", request = req + return + let blobData = blobs.get().asSeq() + let blobSmap = getShortMap(req, blobData) + debug "Received blobs on request", blobs_count = len(blobData), + blobs_map = blobSmap, request = req + + if len(blobData) > 0: + let slots = mapIt(blobData, it[].signed_block_header.message.slot) + let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + if not(checkResponse(req, uniqueSlots)): peer.updateScore(PeerScoreBadResponse) man.queue.push(req) - warn "Received blobs sequence is invalid", - blobs_count = len(blobData), - blobs_map = getShortMap(req, blobData), - request = req, - msg = checkRes.error + warn "Received blobs sequence is not in requested range", + blobs_count = len(blobData), blobs_map = getShortMap(req, blobData), + request = req return - Opt.some(groupedBlobs.get()) - else: - Opt.none(seq[BlobSidecars]) - - if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and - req.contains(man.getSafeSlot()): - # The sync protocol does not distinguish between: - # - All requested slots are empty - # - Peer does not have data available about requested range - # - # However, we include the `backfill` slot in backward sync requests. - # If we receive an empty response to a request covering that slot, - # we know that the response is incomplete and can descore. - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Response does not include known-to-exist block", request = req - return - - # Scoring will happen in `syncUpdate`. - man.workers[index].status = SyncWorkerStatus.Queueing - let - peerFinalized = peer.getFinalizedEpoch().start_slot() - lastSlot = req.slot + req.count - # The peer claims the block is finalized - our own block processing will - # verify this point down the line - # TODO descore peers that lie - maybeFinalized = lastSlot < peerFinalized - - await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = - man.workers[index].status = SyncWorkerStatus.Processing) + let groupedBlobs = groupBlobs(req, blockData, blobData) + if groupedBlobs.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + info "Received blobs sequence is inconsistent", + blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error() + return + if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blobs sequence is invalid", + blobs_count = len(blobData), + blobs_map = getShortMap(req, blobData), + request = req, + msg = checkRes.error + return + Opt.some(groupedBlobs.get()) + else: + Opt.none(seq[BlobSidecars]) - except CatchableError as exc: + if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and + req.contains(man.getSafeSlot()): + # The sync protocol does not distinguish between: + # - All requested slots are empty + # - Peer does not have data available about requested range + # + # However, we include the `backfill` slot in backward sync requests. + # If we receive an empty response to a request covering that slot, + # we know that the response is incomplete and can descore. + peer.updateScore(PeerScoreNoValues) man.queue.push(req) - debug "Unexpected exception while receiving blocks", request = req, - errName = exc.name, errMsg = exc.msg + debug "Response does not include known-to-exist block", request = req return -proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = + # Scoring will happen in `syncUpdate`. + man.workers[index].status = SyncWorkerStatus.Queueing + let + peerFinalized = peer.getFinalizedEpoch().start_slot() + lastSlot = req.slot + req.count + # The peer claims the block is finalized - our own block processing will + # verify this point down the line + # TODO descore peers that lie + maybeFinalized = lastSlot < peerFinalized + + await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = + man.workers[index].status = SyncWorkerStatus.Processing) + +proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} = mixin getKey, getScore, getHeadSlot logScope: @@ -533,30 +495,21 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = debug "Starting syncing worker" - while true: - var peer: A = nil - let doBreak = - try: - man.workers[index].status = SyncWorkerStatus.Sleeping - # This event is going to be set until we are not in sync with network - await man.notInSyncEvent.wait() - man.workers[index].status = SyncWorkerStatus.WaitingPeer - peer = await man.pool.acquire() - await man.syncStep(index, peer) - man.pool.release(peer) - false - except CancelledError: - if not(isNil(peer)): - man.pool.release(peer) - true - except CatchableError as exc: - debug "Unexpected exception in sync worker", - peer = peer, peer_score = peer.getScore(), - peer_speed = peer.netKbps(), - errName = exc.name, errMsg = exc.msg - true - if doBreak: - break + var peer: A = nil + + try: + while true: + man.workers[index].status = SyncWorkerStatus.Sleeping + # This event is going to be set until we are not in sync with network + await man.notInSyncEvent.wait() + man.workers[index].status = SyncWorkerStatus.WaitingPeer + peer = await man.pool.acquire() + await man.syncStep(index, peer) + man.pool.release(peer) + peer = nil + finally: + if not(isNil(peer)): + man.pool.release(peer) debug "Sync worker stopped" @@ -593,34 +546,10 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, map[i] = ch (map, sleeping, waiting, pending) -proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = - logScope: - index = index - sync_ident = man.ident - direction = man.direction - topics = "syncman" - - var pending: array[SyncWorkersCount, Future[void]] - +proc startWorkers[A, B](man: SyncManager[A, B]) = # Starting all the synchronization workers. for i in 0 ..< len(man.workers): - let future = syncWorker[A, B](man, i) - man.workers[i].future = future - pending[i] = future - - # Wait for synchronization worker's failure and replace it with new one. - while true: - let failFuture = await one(pending) - let index = pending.find(failFuture) - if failFuture.failed(): - warn "Synchronization worker stopped working unexpectedly with an error", - errName = failFuture.error.name, errMsg = failFuture.error.msg - else: - warn "Synchronization worker stopped working unexpectedly without error" - - let future = syncWorker[A, B](man, index) - man.workers[index].future = future - pending[index] = future + man.workers[i].future = syncWorker[A, B](man, i) proc toTimeLeftString*(d: Duration): string = if d == InfiniteDuration: @@ -648,11 +577,9 @@ proc toTimeLeftString*(d: Duration): string = res = res & "00m" res -proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void], +proc syncClose[A, B](man: SyncManager[A, B], speedTaskFut: Future[void]) {.async.} = var pending: seq[FutureBase] - if not(guardTaskFut.finished()): - pending.add(guardTaskFut.cancelAndWait()) if not(speedTaskFut.finished()): pending.add(speedTaskFut.cancelAndWait()) for worker in man.workers: @@ -669,11 +596,11 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = mixin getKey, getScore var pauseTime = 0 - var guardTaskFut = man.guardTask() + man.startWorkers() debug "Synchronization loop started" - proc averageSpeedTask() {.async.} = + proc averageSpeedTask() {.async: (raises: [CancelledError]).} = while true: # Reset sync speeds between each loss-of-sync event man.avgSyncSpeed = 0 @@ -703,7 +630,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = stamp = newStamp - var averageSpeedTaskFut = averageSpeedTask() + let averageSpeedTaskFut = averageSpeedTask() while true: let wallSlot = man.getLocalWallSlot() @@ -788,7 +715,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Forward: if man.inProgress: if SyncManagerFlag.NoMonitor in man.flags: - await man.syncClose(guardTaskFut, averageSpeedTaskFut) + await man.syncClose(averageSpeedTaskFut) man.inProgress = false debug "Forward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot, @@ -809,10 +736,8 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Backward: # Backward syncing is going to be executed only once, so we exit loop # and stop all pending tasks which belongs to this instance (sync - # workers, guard task and speed calculation task). - # We first need to cancel and wait for guard task, because otherwise - # it will be able to restore cancelled workers. - await man.syncClose(guardTaskFut, averageSpeedTaskFut) + # workers, speed calculation task). + await man.syncClose(averageSpeedTaskFut) man.inProgress = false debug "Backward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot, diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 22d5100b86..66d7bbc1b1 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -27,7 +27,7 @@ type ProcessingCallback* = proc() {.gcsafe, raises: [].} BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] {.gcsafe, raises: [].} + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} SyncQueueKind* {.pure.} = enum Forward, Backward @@ -50,7 +50,7 @@ type item*: T SyncWaiter* = ref object - future: Future[void] + future: Future[void].Raising([CancelledError]) reset: bool RewindPoint = object @@ -311,9 +311,9 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) = if not(item.future.finished()): item.future.complete() -proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = +proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} = ## Create new waiter and wait for completion from `wakeupWaiters()`. - var waitfut = newFuture[void]("SyncQueue.waitForChanges") + let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges") let waititem = SyncWaiter(future: waitfut) sq.waiters.add(waititem) try: @@ -322,7 +322,7 @@ proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = finally: sq.waiters.delete(sq.waiters.find(waititem)) -proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = +proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} = ## This procedure will perform wakeupWaiters(true) and blocks until last ## waiter will be awakened. var waitChanges = sq.waitForChanges() @@ -333,7 +333,7 @@ proc clearAndWakeup*[T](sq: SyncQueue[T]) = sq.pending.clear() sq.wakeupWaiters(true) -proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = +proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} = ## Perform reset of all the blocked waiters in SyncQueue. ## ## We adding one more waiter to the waiters sequence and @@ -610,7 +610,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], data: seq[ref ForkedSignedBeaconBlock], blobs: Opt[seq[BlobSidecars]], maybeFinalized: bool = false, - processingCb: ProcessingCallback = nil) {.async.} = + processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = logScope: sync_ident = sq.ident topics = "syncman" diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index bdbde9f2b9..d68db308dc 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # the BlockProcessor and this test proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] = - let fut = newFuture[Result[void, VerifierError]]() + Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init() try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) except CatchableError as exc: raiseAssert exc.msg return fut