partially revert #5765 (#5833)
tersec authored Jan 28, 2024
1 parent 5c54eb5 commit 225ef5e
Showing 7 changed files with 310 additions and 187 deletions.
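
This commit trades chronos's exception-tracked futures, introduced in #5765, back for plain {.async.} procs: the explicit raises annotations and Future[...].Raising([...]) types disappear, and defensive except CatchableError handlers return in their place. A minimal sketch of the two styles being exchanged, assuming chronos v4 with exception tracking (both proc names are illustrative, not from this commit):

import chronos

# Style removed by this commit: the compiler proves that nothing but
# CancelledError can escape, and callers see that guarantee in the type
# (Future[int].Raising([CancelledError])).
proc tracked(): Future[int] {.async: (raises: [CancelledError]).} =
  return 42

# Style restored by this commit: any CatchableError may escape, so callers
# have to catch defensively, as the re-added handlers in the hunks below do.
proc untracked(): Future[int] {.async.} =
  return 42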
6 changes: 3 additions & 3 deletions beacon_chain/gossip_processing/block_processor.nim
@@ -61,7 +61,7 @@ type
     blobs*: Opt[BlobSidecars]
     maybeFinalized*: bool
       ## The block source claims the block has been finalized already
-    resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
+    resfut*: Future[Result[void, VerifierError]]
     queueTick*: Moment # Moment when block was enqueued
     validationDur*: Duration # Time it took to perform gossip validation
     src*: MsgSource
@@ -385,7 +385,7 @@ proc checkBloblessSignature(self: BlockProcessor,
 proc enqueueBlock*(
     self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
     blobs: Opt[BlobSidecars],
-    resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
+    resfut: Future[Result[void, VerifierError]] = nil,
     maybeFinalized = false,
     validationDur = Duration()) =
   withBlck(blck):
@@ -756,7 +756,7 @@ proc storeBlock(
 proc addBlock*(
     self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
     blobs: Opt[BlobSidecars], maybeFinalized = false,
-    validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
+    validationDur = Duration()): Future[Result[void, VerifierError]] =
   ## Enqueue a Gossip-validated block for consensus verification
   # Backpressure:
   #   There is no backpressure here - producers must wait for `resfut` to
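
The resfut field that loses its Raising type above is the raw-future handshake between addBlock and the processing loop: addBlock creates the future, enqueueBlock stores it alongside the block, and the consumer completes it after verification, which is the backpressure the comment describes. A hedged sketch of that shape with simplified types (the enum values and the queue are stand-ins; assumes chronos and nim-results):

import chronos
import results

type VerifierError = enum
  Invalid, MissingParent, UnviableFork, Duplicate

proc addBlockSketch(): Future[Result[void, VerifierError]] =
  # Create the future the caller will await; the block-processing loop
  # completes it once consensus verification finishes.
  let resfut = newFuture[Result[void, VerifierError]]("addBlockSketch")
  # enqueueBlock(..., resfut = resfut, ...) would hand it to the queue here.
  resfut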
21 changes: 9 additions & 12 deletions beacon_chain/networking/peer_pool.nim
@@ -108,15 +108,12 @@ template outgoingEvent(eventType: EventType): AsyncEvent =
     pool.outNotFullEvent

 proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
-                        filter: set[PeerType]) {.async: (raises: [CancelledError]).} =
+                        filter: set[PeerType]) {.async.} =
   if filter == {PeerType.Incoming, PeerType.Outgoing} or filter == {}:
     var fut1 = incomingEvent(eventType).wait()
     var fut2 = outgoingEvent(eventType).wait()
     try:
-      try:
-        discard await one(fut1, fut2)
-      except ValueError:
-        raiseAssert "one precondition satisfied"
+      discard await one(fut1, fut2)
       if fut1.finished():
         if not(fut2.finished()):
           await fut2.cancelAndWait()
@@ -141,11 +138,11 @@ proc waitForEvent[A, B](pool: PeerPool[A, B], eventType: EventType,
     outgoingEvent(eventType).clear()

 proc waitNotEmptyEvent[A, B](pool: PeerPool[A, B],
-                             filter: set[PeerType]) {.async: (raises: [CancelledError], raw: true).} =
+                             filter: set[PeerType]): Future[void] =
   pool.waitForEvent(EventType.NotEmptyEvent, filter)

 proc waitNotFullEvent[A, B](pool: PeerPool[A, B],
-                            filter: set[PeerType]){.async: (raises: [CancelledError], raw: true).} =
+                            filter: set[PeerType]): Future[void] =
   pool.waitForEvent(EventType.NotFullEvent, filter)

 proc newPeerPool*[A, B](maxPeers = -1, maxIncomingPeers = -1,
@@ -454,15 +451,15 @@ proc getPeerSpaceMask[A, B](pool: PeerPool[A, B],
     {PeerType.Outgoing}

 proc waitForEmptySpace*[A, B](pool: PeerPool[A, B],
-                              peerType: PeerType) {.async: (raises: [CancelledError]).} =
+                              peerType: PeerType) {.async.} =
   ## This procedure will block until ``pool`` will have an empty space for peer
   ## of type ``peerType``.
   let mask = pool.getPeerSpaceMask(peerType)
   while pool.lenSpace({peerType}) == 0:
     await pool.waitNotFullEvent(mask)

 proc addPeer*[A, B](pool: PeerPool[A, B],
-                    peer: A, peerType: PeerType): Future[PeerStatus] {.async: (raises: [CancelledError]).} =
+                    peer: A, peerType: PeerType): Future[PeerStatus] {.async.} =
   ## Add peer ``peer`` of type ``peerType`` to PeerPool ``pool``.
   ##
   ## This procedure will wait for an empty space in PeerPool ``pool``, if
@@ -536,7 +533,7 @@ proc acquireItemImpl[A, B](pool: PeerPool[A, B],

 proc acquire*[A, B](pool: PeerPool[A, B],
                     filter = {PeerType.Incoming,
-                              PeerType.Outgoing}): Future[A] {.async: (raises: [CancelledError]).} =
+                              PeerType.Outgoing}): Future[A] {.async.} =
   ## Acquire peer from PeerPool ``pool``, which match the filter ``filter``.
   mixin getKey
   doAssert(filter != {}, "Filter must not be empty")
@@ -589,7 +586,7 @@ proc release*[A, B](pool: PeerPool[A, B], peers: openArray[A]) {.inline.} =
 proc acquire*[A, B](pool: PeerPool[A, B],
                     number: int,
                     filter = {PeerType.Incoming,
-                              PeerType.Outgoing}): Future[seq[A]] {.async: (raises: [CancelledError]).} =
+                              PeerType.Outgoing}): Future[seq[A]] {.async.} =
   ## Acquire ``number`` number of peers from PeerPool ``pool``, which match the
   ## filter ``filter``.
   doAssert(filter != {}, "Filter must not be empty")
@@ -738,7 +735,7 @@ proc clear*[A, B](pool: PeerPool[A, B]) =
   pool.acqIncPeersCount = 0
   pool.acqOutPeersCount = 0

-proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async: (raises: [CancelledError]).} =
+proc clearSafe*[A, B](pool: PeerPool[A, B]) {.async.} =
   ## Performs "safe" clear. Safe means that it first acquires all the peers
   ## in PeerPool, and only after that it will reset storage.
   var acquired = newSeq[A]()
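
The waitForEvent hunk above also drops an inner try/except ValueError: the raises-annotated variant had to account for one() being declared to raise ValueError on an empty future list, which cannot happen with exactly two futures, hence the raiseAssert; the plain {.async.} version calls one() directly. A hedged sketch of the underlying race-two-events pattern (waitEither is an illustrative name; assumes chronos):

import chronos

proc waitEither(a, b: AsyncEvent) {.async.} =
  var fut1 = a.wait()
  var fut2 = b.wait()
  try:
    # one() completes as soon as either future does
    discard await one(fut1, fut2)
    if fut1.finished():
      if not(fut2.finished()):
        await fut2.cancelAndWait()  # don't leak the losing future
    else:
      if not(fut1.finished()):
        await fut1.cancelAndWait()
  except CancelledError as exc:
    # if the caller cancels the wait, clean up both futures before re-raising
    await allFutures(fut1.cancelAndWait(), fut2.cancelAndWait())
    raise exc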
18 changes: 11 additions & 7 deletions beacon_chain/nimbus_beacon_node.nim
@@ -351,7 +351,7 @@ proc initFullNode(
         blobQuarantine, getBeaconTime)
     blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
                          blobs: Opt[BlobSidecars], maybeFinalized: bool):
-        Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
+        Future[Result[void, VerifierError]] =
       # The design with a callback for block verification is unusual compared
       # to the rest of the application, but fits with the general approach
       # taken in the sync/request managers - this is an architectural compromise
@@ -360,23 +360,27 @@ proc initFullNode(
         MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
     rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
                              maybeFinalized: bool):
-        Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
+        Future[Result[void, VerifierError]] =
       withBlck(signedBlock):
-        when consensusFork >= ConsensusFork.Deneb:
+        when typeof(forkyBlck).kind >= ConsensusFork.Deneb:
           if not blobQuarantine[].hasBlobs(forkyBlck):
             # We don't have all the blobs for this block, so we have
             # to put it in blobless quarantine.
             if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
-              err(VerifierError.UnviableFork)
+              Future.completed(
+                Result[void, VerifierError].err(VerifierError.UnviableFork),
+                "rmanBlockVerifier")
             else:
-              err(VerifierError.MissingParent)
+              Future.completed(
+                Result[void, VerifierError].err(VerifierError.MissingParent),
+                "rmanBlockVerifier")
           else:
             let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
-            await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+            blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                       Opt.some(blobs),
                                       maybeFinalized = maybeFinalized)
         else:
-          await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+          blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                     Opt.none(BlobSidecars),
                                     maybeFinalized = maybeFinalized)

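
Because rmanBlockVerifier is no longer an async closure, its error branches cannot simply evaluate to err(...); they must produce an already-completed Future, which is what the Future.completed calls above construct. A hedged sketch of that pattern with simplified types (verifySketch and the enum values are stand-ins; assumes chronos and nim-results):

import chronos
import results

type VerifierError = enum
  Invalid, MissingParent, UnviableFork, Duplicate

proc verifySketch(ok: bool): Future[Result[void, VerifierError]] =
  # Both branches return futures that are already finished; the real code
  # returns the pending future of an async proc on its happy path instead.
  if ok:
    Future.completed(Result[void, VerifierError].ok(), "verifySketch")
  else:
    # error paths complete eagerly, without ever suspending
    Future.completed(
      Result[void, VerifierError].err(VerifierError.MissingParent),
      "verifySketch")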
107 changes: 74 additions & 33 deletions beacon_chain/sync/request_manager.nim
@@ -39,7 +39,7 @@ const
 type
   BlockVerifierFn* =
     proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
-      Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
+      Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
   InhibitFn* = proc: bool {.gcsafe, raises:[].}

   RequestManager* = object
@@ -49,8 +49,8 @@ type
     quarantine: ref Quarantine
     blobQuarantine: ref BlobQuarantine
     blockVerifier: BlockVerifierFn
-    blockLoopFuture: Future[void].Raising([CancelledError])
-    blobLoopFuture: Future[void].Raising([CancelledError])
+    blockLoopFuture: Future[void]
+    blobLoopFuture: Future[void]

 func shortLog*(x: seq[Eth2Digest]): string =
   "[" & x.mapIt(shortLog(it)).join(", ") & "]"
@@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier],
       return false
   true

-proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
+proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} =
   var peer: Peer
   try:
     peer = await rman.network.peerPool.acquire()
@@ -171,21 +171,27 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async:
         peer = peer, blocks = shortLog(items), err = blocks.error()
       peer.updateScore(PeerScoreNoValues)

+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    peer.updateScore(PeerScoreNoValues)
+    debug "Error while fetching blocks by root", exc = exc.msg,
+          items = shortLog(items), peer = peer, peer_score = peer.getScore()
+    raise exc
   finally:
     if not(isNil(peer)):
       rman.network.peerPool.release(peer)

 proc fetchBlobsFromNetwork(self: RequestManager,
-                           idList: seq[BlobIdentifier])
-                           {.async: (raises: [CancelledError]).} =
+                           idList: seq[BlobIdentifier]) {.async.} =
   var peer: Peer

   try:
     peer = await self.network.peerPool.acquire()
     debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList),
           peer_score = peer.getScore()

-    let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList)
+    let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList))

     if blobs.isOk:
       let ublobs = blobs.get()
@@ -213,11 +219,18 @@ proc fetchBlobsFromNetwork(self: RequestManager,
         peer = peer, blobs = shortLog(idList), err = blobs.error()
       peer.updateScore(PeerScoreNoValues)

+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    peer.updateScore(PeerScoreNoValues)
+    debug "Error while fetching blobs by root", exc = exc.msg,
+          idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
+    raise exc
   finally:
     if not(isNil(peer)):
       self.network.peerPool.release(peer)

-proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
+proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
   while true:
     # TODO This polling could be replaced with an AsyncEvent that is fired
     # from the quarantine when there's work to do
@@ -232,19 +245,33 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledE
       continue

     debug "Requesting detected missing blocks", blocks = shortLog(blocks)
-    let start = SyncMoment.now(0)
+    try:
+      let start = SyncMoment.now(0)

-    var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
+      var workers: array[PARALLEL_REQUESTS, Future[void]]

-    for i in 0 ..< PARALLEL_REQUESTS:
-      workers[i] = rman.requestBlocksByRoot(blocks)
+      for i in 0 ..< PARALLEL_REQUESTS:
+        workers[i] = rman.requestBlocksByRoot(blocks)

-    await allFutures(workers)
+      await allFutures(workers)

-    let finish = SyncMoment.now(uint64(len(blocks)))
+      let finish = SyncMoment.now(uint64(len(blocks)))

+      var succeed = 0
+      for worker in workers:
+        if worker.completed():
+          inc(succeed)

+      debug "Request manager block tick", blocks = shortLog(blocks),
+            succeed = succeed,
+            failed = (len(workers) - succeed),
+            sync_speed = speed(start, finish)

+    except CancelledError:
+      break
+    except CatchableError as exc:
+      warn "Unexpected error in request manager block loop", exc = exc.msg

-    debug "Request manager block tick", blocks = shortLog(blocks),
-      sync_speed = speed(start, finish)

 proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
   let
@@ -281,28 +308,42 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
       rman.quarantine[].removeBlobless(blobless)
   fetches

-proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
+
+proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
   while true:
-    # TODO This polling could be replaced with an AsyncEvent that is fired
-    # from the quarantine when there's work to do
+    # TODO This polling could be replaced with an AsyncEvent that is fired
+    # from the quarantine when there's work to do
     await sleepAsync(POLL_INTERVAL)
     if rman.inhibit():
       continue

-    let fetches = rman.getMissingBlobs()
-    if fetches.len > 0:
-      debug "Requesting detected missing blobs", blobs = shortLog(fetches)
-      let start = SyncMoment.now(0)
-      var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
-      for i in 0 ..< PARALLEL_REQUESTS:
-        workers[i] = rman.fetchBlobsFromNetwork(fetches)
-
-      await allFutures(workers)
-      let finish = SyncMoment.now(uint64(len(fetches)))
-
-      debug "Request manager blob tick",
-            blobs_count = len(fetches),
-            sync_speed = speed(start, finish)
+    let fetches = rman.getMissingBlobs()
+    if fetches.len > 0:
+      debug "Requesting detected missing blobs", blobs = shortLog(fetches)
+      try:
+        let start = SyncMoment.now(0)
+        var workers: array[PARALLEL_REQUESTS, Future[void]]
+        for i in 0 ..< PARALLEL_REQUESTS:
+          workers[i] = rman.fetchBlobsFromNetwork(fetches)
+
+        await allFutures(workers)
+        let finish = SyncMoment.now(uint64(len(fetches)))
+
+        var succeed = 0
+        for worker in workers:
+          if worker.finished() and not(worker.failed()):
+            inc(succeed)
+
+        debug "Request manager blob tick",
+              blobs_count = len(fetches),
+              succeed = succeed,
+              failed = (len(workers) - succeed),
+              sync_speed = speed(start, finish)
+
+      except CancelledError:
+        break
+      except CatchableError as exc:
+        warn "Unexpected error in request manager blob loop", exc = exc.msg

 proc start*(rman: var RequestManager) =
   ## Start Request Manager's loops.
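
Both restored loops share one fan-out shape: start PARALLEL_REQUESTS workers over the same request list, await them all, then count successes for the tick log. A hedged, self-contained sketch of that shape (fetchSketch stands in for requestBlocksByRoot/fetchBlobsFromNetwork; assumes chronos):

import chronos

const PARALLEL_REQUESTS = 2

proc fetchSketch(i: int) {.async.} =
  # stand-in for a network request worker
  await sleepAsync(10.milliseconds)

proc tickSketch() {.async.} =
  var workers: array[PARALLEL_REQUESTS, Future[void]]
  for i in 0 ..< PARALLEL_REQUESTS:
    workers[i] = fetchSketch(i)

  # allFutures waits for every worker but does not propagate their failures,
  # which is why each future is inspected afterwards
  await allFutures(workers)

  var succeed = 0
  for worker in workers:
    if worker.finished() and not(worker.failed()):
      inc(succeed)
  echo "workers succeeded: ", succeed, "/", len(workers)

waitFor tickSketch()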
(3 more changed files not shown)
