diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index 3cc34f1929..53dd088e81 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -61,7 +61,7 @@ type blobs*: Opt[BlobSidecars] maybeFinalized*: bool ## The block source claims the block has been finalized already - resfut*: Future[Result[void, VerifierError]].Raising([CancelledError]) + resfut*: Future[Result[void, VerifierError]] queueTick*: Moment # Moment when block was enqueued validationDur*: Duration # Time it took to perform gossip validation src*: MsgSource @@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor, proc enqueueBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], - resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, + resfut: Future[Result[void, VerifierError]] = nil, maybeFinalized = false, validationDur = Duration()) = withBlck(blck): @@ -756,7 +756,7 @@ proc storeBlock( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized = false, - validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + validationDur = Duration()): Future[Result[void, VerifierError]] = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: # There is no backpressure here - producers must wait for `resfut` to diff --git a/beacon_chain/networking/peer_pool.nim b/beacon_chain/networking/peer_pool.nim index cc21f05e1a..6bedebb892 100644 --- a/beacon_chain/networking/peer_pool.nim +++ b/beacon_chain/networking/peer_pool.nim @@ -108,15 +108,12 @@ template outgoingEvent(eventType: EventType): AsyncEvent = pool.outNotFullEvent proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, - filter: set[PeerType]) {.async: (raises: [CancelledError]).} = + filter: set[PeerType]) {.async.} = if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}: var fut1 = incomingEvent(eventType).wait() var fut2 = outgoingEvent(eventType).wait() try: - try: - discard await one(fut1, fut2) - except ValueError: - raiseAssert "one precondition satisfied" + discard await one(fut1, fut2) if fut1.finished(): if not(fut2.finished()): await fut2.cancelAndWait() @@ -141,11 +138,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType, outgoingEvent(eventType).clear() proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} = + filter: set[PeerType]): Future[void] = pool.waitForEvent(EventType.NotEmptyEvent, filter) proc waitNotFullEvent[A, B](pool: PeerPool[A, B], - filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} = + filter: set[PeerType]): Future[void] = pool.waitForEvent(EventType.NotFullEvent, filter) proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1, @@ -454,7 +451,7 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B], {PeerType.Outgoing} proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], - peerType: PeerType) {.async: (raises: [CancelledError]).} = + peerType: PeerType) {.async.} = ## This procedure will block until ``pool`` will have an empty space for peer ## of type ``peerType``. let mask = pool.getPeerSpaceMask(peerType) @@ -462,7 +459,7 @@ proc waitForEmptySpace*[A, B](pool: PeerPool[A, B], await pool.waitNotFullEvent(mask) proc addPeer*[A, B](pool: PeerPool[A, B], - peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} = + peer: A, peerType: PeerType): Future[PeerStatus] {.async.} = ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``. ## ## This procedure will wait for an empty space in PeerPool ``pool``, if @@ -536,7 +533,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B], proc acquire*[A, B](pool: PeerPool[A, B], filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} = + PeerType.Outgoing}): Future[A] {.async.} = ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``. mixin getKey doAssert(filter != {}, "Filter must not be empty") @@ -589,7 +586,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} = proc acquire*[A, B](pool: PeerPool[A, B], number: int, filter = {PeerType.Incoming, - PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} = + PeerType.Outgoing}): Future[seq[A]] {.async.} = ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the ## filter ``filter``. doAssert(filter != {}, "Filter must not be empty") @@ -738,7 +735,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) = pool.acqIncPeersCount = 0 pool.acqOutPeersCount = 0 -proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} = +proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} = ## Performs "safe" clear. Safe means that it first acquires all the peers ## in PeerPool, and only after that it will reset storage. var acquired = newSeq[A]() diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index a82d4faecf..a2b668d427 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -351,7 +351,7 @@ proc initFullNode( blobQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = + Future[Result[void, VerifierError]] = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach # taken in the sync/request managers - this is an architectural compromise @@ -360,23 +360,27 @@ proc initFullNode( MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = + Future[Result[void, VerifierError]] = withBlck(signedBlock): - when consensusFork >= ConsensusFork.Deneb: + when typeof(forkyBlck).kind >= ConsensusFork.Deneb: if not blobQuarantine[].hasBlobs(forkyBlck): # We don't have all the blobs for this block, so we have # to put it in blobless quarantine. if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): - err(VerifierError.UnviableFork) + Future.completed( + Result[void, VerifierError].err(VerifierError.UnviableFork), + "rmanBlockVerifier") else: - err(VerifierError.MissingParent) + Future.completed( + Result[void, VerifierError].err(VerifierError.MissingParent), + "rmanBlockVerifier") else: let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.some(blobs), maybeFinalized = maybeFinalized) else: - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), maybeFinalized = maybeFinalized) diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 1e57e47988..008bc5bd00 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -39,7 +39,7 @@ const type BlockVerifierFn* = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + Future[Result[void, VerifierError]] {.gcsafe, raises: [].} InhibitFn* = proc: bool {.gcsafe, raises:[].} RequestManager* = object @@ -49,8 +49,8 @@ type quarantine: ref Quarantine blobQuarantine: ref BlobQuarantine blockVerifier: BlockVerifierFn - blockLoopFuture: Future[void].Raising([CancelledError]) - blobLoopFuture: Future[void].Raising([CancelledError]) + blockLoopFuture: Future[void] + blobLoopFuture: Future[void] func shortLog*(x: seq[Eth2Digest]): string = "[" & x.mapIt(shortLog(it)).join(", ") & "]" @@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier], return false true -proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} = +proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} = var peer: Peer try: peer = await rman.network.peerPool.acquire() @@ -171,13 +171,19 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: peer = peer, blocks = shortLog(items), err = blocks.error() peer.updateScore(PeerScoreNoValues) + except CancelledError as exc: + raise exc + except CatchableError as exc: + peer.updateScore(PeerScoreNoValues) + debug "Error while fetching blocks by root", exc = exc.msg, + items = shortLog(items), peer = peer, peer_score = peer.getScore() + raise exc finally: if not(isNil(peer)): rman.network.peerPool.release(peer) proc fetchBlobsFromNetwork(self: RequestManager, - idList: seq[BlobIdentifier]) - {.async: (raises: [CancelledError]).} = + idList: seq[BlobIdentifier]) {.async.} = var peer: Peer try: @@ -185,7 +191,7 @@ proc fetchBlobsFromNetwork(self: RequestManager, debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList), peer_score = peer.getScore() - let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList) + let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList)) if blobs.isOk: let ublobs = blobs.get() @@ -213,11 +219,18 @@ proc fetchBlobsFromNetwork(self: RequestManager, peer = peer, blobs = shortLog(idList), err = blobs.error() peer.updateScore(PeerScoreNoValues) + except CancelledError as exc: + raise exc + except CatchableError as exc: + peer.updateScore(PeerScoreNoValues) + debug "Error while fetching blobs by root", exc = exc.msg, + idList = shortLog(idList), peer = peer, peer_score = peer.getScore() + raise exc finally: if not(isNil(peer)): self.network.peerPool.release(peer) -proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = +proc requestManagerBlockLoop(rman: RequestManager) {.async.} = while true: # TODO This polling could be replaced with an AsyncEvent that is fired # from the quarantine when there's work to do @@ -232,19 +245,33 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledE continue debug "Requesting detected missing blocks", blocks = shortLog(blocks) - let start = SyncMoment.now(0) + try: + let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] + var workers: array[PARALLEL_REQUESTS, Future[void]] - for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.requestBlocksByRoot(blocks) + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.requestBlocksByRoot(blocks) - await allFutures(workers) + await allFutures(workers) - let finish = SyncMoment.now(uint64(len(blocks))) + let finish = SyncMoment.now(uint64(len(blocks))) + + var succeed = 0 + for worker in workers: + if worker.completed(): + inc(succeed) + + debug "Request manager block tick", blocks = shortLog(blocks), + succeed = succeed, + failed = (len(workers) - succeed), + sync_speed = speed(start, finish) + + except CancelledError: + break + except CatchableError as exc: + warn "Unexpected error in request manager block loop", exc = exc.msg - debug "Request manager block tick", blocks = shortLog(blocks), - sync_speed = speed(start, finish) proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = let @@ -281,28 +308,42 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = rman.quarantine[].removeBlobless(blobless) fetches -proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} = + +proc requestManagerBlobLoop(rman: RequestManager) {.async.} = while true: - # TODO This polling could be replaced with an AsyncEvent that is fired - # from the quarantine when there's work to do + # TODO This polling could be replaced with an AsyncEvent that is fired + # from the quarantine when there's work to do await sleepAsync(POLL_INTERVAL) if rman.inhibit(): continue - let fetches = rman.getMissingBlobs() - if fetches.len > 0: - debug "Requesting detected missing blobs", blobs = shortLog(fetches) - let start = SyncMoment.now(0) - var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])] - for i in 0 ..< PARALLEL_REQUESTS: - workers[i] = rman.fetchBlobsFromNetwork(fetches) - - await allFutures(workers) - let finish = SyncMoment.now(uint64(len(fetches))) - - debug "Request manager blob tick", - blobs_count = len(fetches), - sync_speed = speed(start, finish) + let fetches = rman.getMissingBlobs() + if fetches.len > 0: + debug "Requesting detected missing blobs", blobs = shortLog(fetches) + try: + let start = SyncMoment.now(0) + var workers: array[PARALLEL_REQUESTS, Future[void]] + for i in 0 ..< PARALLEL_REQUESTS: + workers[i] = rman.fetchBlobsFromNetwork(fetches) + + await allFutures(workers) + let finish = SyncMoment.now(uint64(len(fetches))) + + var succeed = 0 + for worker in workers: + if worker.finished() and not(worker.failed()): + inc(succeed) + + debug "Request manager blob tick", + blobs_count = len(fetches), + succeed = succeed, + failed = (len(workers) - succeed), + sync_speed = speed(start, finish) + + except CancelledError: + break + except CatchableError as exc: + warn "Unexpected error in request manager blob loop", exc = exc.msg proc start*(rman: var RequestManager) = ## Start Request Manager's loops. diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 6e66b99da9..4e231eb138 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -43,7 +43,7 @@ type NoMonitor SyncWorker*[A, B] = object - future: Future[void].Raising([CancelledError]) + future: Future[void] status: SyncWorkerStatus SyncManager*[A, B] = ref object @@ -158,9 +158,8 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], res.initQueue() res -proc getBlocks[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest): Future[BeaconBlocksRes] {. - async: (raises: [CancelledError], raw: true).} = +proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, + req: SyncRequest): Future[BeaconBlocksRes] {.async.} = mixin getScore, `==` logScope: @@ -172,8 +171,21 @@ proc getBlocks[A, B](man: SyncManager[A, B], peer: A, doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blocks from peer", request = req - - beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) + try: + let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64) + + if res.isErr(): + debug "Error, while reading getBlocks response", request = req, + error = $res.error() + return + return res + except CancelledError: + debug "Interrupt, while waiting getBlocks response", request = req + return + except CatchableError as exc: + debug "Error, while waiting getBlocks response", request = req, + errName = exc.name, errMsg = exc.msg + return proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = let wallEpoch = man.getLocalWallSlot().epoch @@ -182,8 +194,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, - req: SyncRequest): Future[BlobSidecarsRes] - {.async: (raises: [CancelledError], raw: true).} = + req: SyncRequest + ): Future[BlobSidecarsRes] {.async.} = mixin getScore, `==` logScope: @@ -195,7 +207,21 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, doAssert(not(req.isEmpty()), "Request must not be empty!") debug "Requesting blobs sidecars from peer", request = req - blobSidecarsByRange(peer, req.slot, req.count) + try: + let res = await blobSidecarsByRange(peer, req.slot, req.count) + + if res.isErr(): + debug "Error, while reading blobSidecarsByRange response", request = req, + error = $res.error() + return + return res + except CancelledError: + debug "Interrupt, while waiting blobSidecarsByRange response", request = req + return + except CatchableError as exc: + debug "Error, while waiting blobSidecarsByRange response", request = req, + errName = exc.name, errMsg = exc.msg + return proc remainingSlots(man: SyncManager): uint64 = let @@ -256,8 +282,7 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] = ? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ok() -proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) - {.async: (raises: [CancelledError]).} = +proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = logScope: peer_score = peer.getScore() peer_speed = peer.netKbps() @@ -297,11 +322,17 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) trace "Updating peer's status information", wall_clock_slot = wallSlot, remote_head_slot = peerSlot, local_head_slot = headSlot - if not await peer.updateStatus(): - peer.updateScore(PeerScoreNoStatus) - debug "Failed to get remote peer's status, exiting", - peer_head_slot = peerSlot + try: + let res = await peer.updateStatus() + if not(res): + peer.updateScore(PeerScoreNoStatus) + debug "Failed to get remote peer's status, exiting", + peer_head_slot = peerSlot + return + except CatchableError as exc: + debug "Unexpected exception while updating peer's status", + peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg return let newPeerSlot = peer.getHeadSlot() @@ -388,97 +419,110 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) man.workers[index].status = SyncWorkerStatus.Downloading - let blocks = (await man.getBlocks(peer, req)).valueOr: - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blocks on request", request = req - return - - let blockSmap = getShortMap(req, blocks.asSeq()) - debug "Received blocks on request", blocks_count = len(blocks), - blocks_map = blockSmap, request = req - - let slots = mapIt(blocks, it[].slot) - if not(checkResponse(req, slots)): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blocks sequence is not in requested range", - blocks_count = len(blocks), blocks_map = blockSmap, - request = req - return - - func combine(acc: seq[Slot], cur: Slot): seq[Slot] = - var copy = acc - if copy[copy.len-1] != cur: - copy.add(cur) - copy - - let blobData = - if man.shouldGetBlobs(req.slot.epoch): - let blobs = (await man.getBlobSidecars(peer, req)).valueOr: - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - debug "Failed to receive blobs on request", request = req - return - let blobSmap = getShortMap(req, blobs.asSeq()) - debug "Received blobs on request", blobs_count = len(blobs), - blobs_map = blobSmap, request = req - - if len(blobs) > 0: - let slots = mapIt(blobs, it[].signed_block_header.message.slot) - let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) - if not(checkResponse(req, uniqueSlots)): + try: + let blocks = await man.getBlocks(peer, req) + if blocks.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive blocks on request", request = req + return + let blockData = blocks.get().asSeq() + let blockSmap = getShortMap(req, blockData) + debug "Received blocks on request", blocks_count = len(blockData), + blocks_map = blockSmap, request = req + + let slots = mapIt(blockData, it[].slot) + if not(checkResponse(req, slots)): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blocks sequence is not in requested range", + blocks_count = len(blockData), blocks_map = blockSmap, + request = req + return + + func combine(acc: seq[Slot], cur: Slot): seq[Slot] = + var copy = acc + if copy[copy.len-1] != cur: + copy.add(cur) + copy + + let blobData = + if man.shouldGetBlobs(req.slot.epoch): + let blobs = await man.getBlobSidecars(peer, req) + if blobs.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive blobs on request", request = req + return + let blobData = blobs.get().asSeq() + let blobSmap = getShortMap(req, blobData) + debug "Received blobs on request", blobs_count = len(blobData), + blobs_map = blobSmap, request = req + + if len(blobData) > 0: + let slots = mapIt(blobData, it[].signed_block_header.message.slot) + let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]]) + if not(checkResponse(req, uniqueSlots)): + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received blobs sequence is not in requested range", + blobs_count = len(blobData), blobs_map = getShortMap(req, blobData), + request = req + return + let groupedBlobs = groupBlobs(req, blockData, blobData) + if groupedBlobs.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + info "Received blobs sequence is inconsistent", + blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error() + return + if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): peer.updateScore(PeerScoreBadResponse) man.queue.push(req) - warn "Received blobs sequence is not in requested range", - blobs_count = len(blobs), blobs_map = blobSmap, - request = req + warn "Received blobs sequence is invalid", + blobs_count = len(blobData), + blobs_map = getShortMap(req, blobData), + request = req, + msg = checkRes.error return - let groupedBlobs = groupBlobs(req, blocks.asSeq(), blobs.asSeq()) - if groupedBlobs.isErr(): - peer.updateScore(PeerScoreNoValues) - man.queue.push(req) - info "Received blobs sequence is inconsistent", - blobs_map = blobSmap, request = req, msg=groupedBlobs.error() - return - if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr): - peer.updateScore(PeerScoreBadResponse) - man.queue.push(req) - warn "Received blobs sequence is invalid", - blobs_map = blobSmap, request = req, msg=groupedBlobs.error() - return - Opt.some(groupedBlobs.get()) - else: - Opt.none(seq[BlobSidecars]) + Opt.some(groupedBlobs.get()) + else: + Opt.none(seq[BlobSidecars]) + + if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and + req.contains(man.getSafeSlot()): + # The sync protocol does not distinguish between: + # - All requested slots are empty + # - Peer does not have data available about requested range + # + # However, we include the `backfill` slot in backward sync requests. + # If we receive an empty response to a request covering that slot, + # we know that the response is incomplete and can descore. + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Response does not include known-to-exist block", request = req + return + + # Scoring will happen in `syncUpdate`. + man.workers[index].status = SyncWorkerStatus.Queueing + let + peerFinalized = peer.getFinalizedEpoch().start_slot() + lastSlot = req.slot + req.count + # The peer claims the block is finalized - our own block processing will + # verify this point down the line + # TODO descore peers that lie + maybeFinalized = lastSlot < peerFinalized - if len(blocks) == 0 and man.direction == SyncQueueKind.Backward and - req.contains(man.getSafeSlot()): - # The sync protocol does not distinguish between: - # - All requested slots are empty - # - Peer does not have data available about requested range - # - # However, we include the `backfill` slot in backward sync requests. - # If we receive an empty response to a request covering that slot, - # we know that the response is incomplete and can descore. - peer.updateScore(PeerScoreNoValues) + await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = + man.workers[index].status = SyncWorkerStatus.Processing) + + except CatchableError as exc: man.queue.push(req) - debug "Response does not include known-to-exist block", request = req + debug "Unexpected exception while receiving blocks", request = req, + errName = exc.name, errMsg = exc.msg return - # Scoring will happen in `syncUpdate`. - man.workers[index].status = SyncWorkerStatus.Queueing - let - peerFinalized = peer.getFinalizedEpoch().start_slot() - lastSlot = req.slot + req.count - # The peer claims the block is finalized - our own block processing will - # verify this point down the line - # TODO descore peers that lie - maybeFinalized = lastSlot < peerFinalized - - await man.queue.push(req, blocks.asSeq(), blobData, maybeFinalized, proc() = - man.workers[index].status = SyncWorkerStatus.Processing) - -proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} = +proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = mixin getKey, getScore, getHeadSlot logScope: @@ -489,21 +533,30 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [Can debug "Starting syncing worker" - var peer: A = nil - - try: - while true: - man.workers[index].status = SyncWorkerStatus.Sleeping - # This event is going to be set until we are not in sync with network - await man.notInSyncEvent.wait() - man.workers[index].status = SyncWorkerStatus.WaitingPeer - peer = await man.pool.acquire() - await man.syncStep(index, peer) - man.pool.release(peer) - peer = nil - finally: - if not(isNil(peer)): - man.pool.release(peer) + while true: + var peer: A = nil + let doBreak = + try: + man.workers[index].status = SyncWorkerStatus.Sleeping + # This event is going to be set until we are not in sync with network + await man.notInSyncEvent.wait() + man.workers[index].status = SyncWorkerStatus.WaitingPeer + peer = await man.pool.acquire() + await man.syncStep(index, peer) + man.pool.release(peer) + false + except CancelledError: + if not(isNil(peer)): + man.pool.release(peer) + true + except CatchableError as exc: + debug "Unexpected exception in sync worker", + peer = peer, peer_score = peer.getScore(), + peer_speed = peer.netKbps(), + errName = exc.name, errMsg = exc.msg + true + if doBreak: + break debug "Sync worker stopped" @@ -540,10 +593,34 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string, map[i] = ch (map, sleeping, waiting, pending) -proc startWorkers[A, B](man: SyncManager[A, B]) = +proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = + logScope: + index = index + sync_ident = man.ident + direction = man.direction + topics = "syncman" + + var pending: array[SyncWorkersCount, Future[void]] + # Starting all the synchronization workers. for i in 0 ..< len(man.workers): - man.workers[i].future = syncWorker[A, B](man, i) + let future = syncWorker[A, B](man, i) + man.workers[i].future = future + pending[i] = future + + # Wait for synchronization worker's failure and replace it with new one. + while true: + let failFuture = await one(pending) + let index = pending.find(failFuture) + if failFuture.failed(): + warn "Synchronization worker stopped working unexpectedly with an error", + errName = failFuture.error.name, errMsg = failFuture.error.msg + else: + warn "Synchronization worker stopped working unexpectedly without error" + + let future = syncWorker[A, B](man, index) + man.workers[index].future = future + pending[index] = future proc toTimeLeftString*(d: Duration): string = if d == InfiniteDuration: @@ -571,9 +648,11 @@ proc toTimeLeftString*(d: Duration): string = res = res & "00m" res -proc syncClose[A, B](man: SyncManager[A, B], +proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void], speedTaskFut: Future[void]) {.async.} = var pending: seq[FutureBase] + if not(guardTaskFut.finished()): + pending.add(guardTaskFut.cancelAndWait()) if not(speedTaskFut.finished()): pending.add(speedTaskFut.cancelAndWait()) for worker in man.workers: @@ -590,11 +669,11 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = mixin getKey, getScore var pauseTime = 0 - man.startWorkers() + var guardTaskFut = man.guardTask() debug "Synchronization loop started" - proc averageSpeedTask() {.async: (raises: [CancelledError]).} = + proc averageSpeedTask() {.async.} = while true: # Reset sync speeds between each loss-of-sync event man.avgSyncSpeed = 0 @@ -624,7 +703,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = stamp = newStamp - let averageSpeedTaskFut = averageSpeedTask() + var averageSpeedTaskFut = averageSpeedTask() while true: let wallSlot = man.getLocalWallSlot() @@ -709,7 +788,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Forward: if man.inProgress: if SyncManagerFlag.NoMonitor in man.flags: - await man.syncClose(averageSpeedTaskFut) + await man.syncClose(guardTaskFut, averageSpeedTaskFut) man.inProgress = false debug "Forward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot, @@ -730,8 +809,10 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} = of SyncQueueKind.Backward: # Backward syncing is going to be executed only once, so we exit loop # and stop all pending tasks which belongs to this instance (sync - # workers, speed calculation task). - await man.syncClose(averageSpeedTaskFut) + # workers, guard task and speed calculation task). + # We first need to cancel and wait for guard task, because otherwise + # it will be able to restore cancelled workers. + await man.syncClose(guardTaskFut, averageSpeedTaskFut) man.inProgress = false debug "Backward synchronization process finished, exiting", wall_head_slot = wallSlot, local_head_slot = headSlot, diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 66d7bbc1b1..22d5100b86 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -27,7 +27,7 @@ type ProcessingCallback* = proc() {.gcsafe, raises: [].} BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} + Future[Result[void, VerifierError]] {.gcsafe, raises: [].} SyncQueueKind* {.pure.} = enum Forward, Backward @@ -50,7 +50,7 @@ type item*: T SyncWaiter* = ref object - future: Future[void].Raising([CancelledError]) + future: Future[void] reset: bool RewindPoint = object @@ -311,9 +311,9 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) = if not(item.future.finished()): item.future.complete() -proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} = +proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = ## Create new waiter and wait for completion from `wakeupWaiters()`. - let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges") + var waitfut = newFuture[void]("SyncQueue.waitForChanges") let waititem = SyncWaiter(future: waitfut) sq.waiters.add(waititem) try: @@ -322,7 +322,7 @@ proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [Cancel finally: sq.waiters.delete(sq.waiters.find(waititem)) -proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} = +proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = ## This procedure will perform wakeupWaiters(true) and blocks until last ## waiter will be awakened. var waitChanges = sq.waitForChanges() @@ -333,7 +333,7 @@ proc clearAndWakeup*[T](sq: SyncQueue[T]) = sq.pending.clear() sq.wakeupWaiters(true) -proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} = +proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = ## Perform reset of all the blocked waiters in SyncQueue. ## ## We adding one more waiter to the waiters sequence and @@ -610,7 +610,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], data: seq[ref ForkedSignedBeaconBlock], blobs: Opt[seq[BlobSidecars]], maybeFinalized: bool = false, - processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = + processingCb: ProcessingCallback = nil) {.async.} = logScope: sync_ident = sq.ident topics = "syncman" diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index d68db308dc..bdbde9f2b9 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # the BlockProcessor and this test proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], maybeFinalized: bool): - Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = - let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init() + Future[Result[void, VerifierError]] = + let fut = newFuture[Result[void, VerifierError]]() try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) except CatchableError as exc: raiseAssert exc.msg return fut