unrevert rest of #5765 (#5867)
* unrevert rest of #5765

* rm stray e2store docs changes

* reduce diff

* fix indent

---------

Co-authored-by: Jacek Sieka <[email protected]>
tersec and arnetheduck authored Feb 9, 2024
1 parent dca444b commit 642774e
Showing 5 changed files with 175 additions and 295 deletions.
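The common thread of the diff below is chronos's explicit exception tracking: async procs gain {.async: (raises: [CancelledError]).} annotations and stored futures become Future[void].Raising([CancelledError]). As a hedged, self-contained sketch of that pattern, assuming a chronos release with exception-effect support (the names tick, runTicks and the 4-worker array are invented for illustration and are not part of the commit):

import chronos

proc tick(n: int) {.async: (raises: [CancelledError]).} =
  # Only CancelledError may escape this proc; the compiler rejects anything else.
  await sleepAsync(10.milliseconds)
  echo "tick ", n

proc runTicks() {.async: (raises: [CancelledError]).} =
  # Futures with a tracked exception set are written Future[T].Raising([...]),
  # the same spelling the RequestManager fields and worker arrays use below.
  var workers: array[4, Future[void].Raising([CancelledError])]
  for i in 0 ..< workers.len:
    workers[i] = tick(i)
  await allFutures(workers)

when isMainModule:
  waitFor runTicks()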
18 changes: 7 additions & 11 deletions beacon_chain/nimbus_beacon_node.nim
@@ -352,7 +352,7 @@ proc initFullNode(
         blobQuarantine, getBeaconTime)
     blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
         blobs: Opt[BlobSidecars], maybeFinalized: bool):
-        Future[Result[void, VerifierError]] =
+        Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
       # The design with a callback for block verification is unusual compared
       # to the rest of the application, but fits with the general approach
       # taken in the sync/request managers - this is an architectural compromise
@@ -361,27 +361,23 @@ proc initFullNode(
         MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
     rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
         maybeFinalized: bool):
-        Future[Result[void, VerifierError]] =
+        Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
       withBlck(signedBlock):
-        when typeof(forkyBlck).kind >= ConsensusFork.Deneb:
+        when consensusFork >= ConsensusFork.Deneb:
           if not blobQuarantine[].hasBlobs(forkyBlck):
             # We don't have all the blobs for this block, so we have
             # to put it in blobless quarantine.
             if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
-              Future.completed(
-                Result[void, VerifierError].err(VerifierError.UnviableFork),
-                "rmanBlockVerifier")
+              err(VerifierError.UnviableFork)
             else:
-              Future.completed(
-                Result[void, VerifierError].err(VerifierError.MissingParent),
-                "rmanBlockVerifier")
+              err(VerifierError.MissingParent)
           else:
             let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
-            blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+            await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                       Opt.some(blobs),
                                       maybeFinalized = maybeFinalized)
         else:
-          blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+          await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                     Opt.none(BlobSidecars),
                                     maybeFinalized = maybeFinalized)

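The blockVerifier callback above additionally carries raw: true. As far as I can tell from chronos's async options, raw: true leaves the proc body untransformed, so the body must itself produce the Future (and cannot await), which suits a thin forwarding callback. A hedged sketch with invented names (produce, forward):

import chronos

proc produce(x: int): Future[int] {.async: (raises: [CancelledError]).} =
  await sleepAsync(1.milliseconds)
  x * 2

proc forward(x: int): Future[int] {.async: (raises: [CancelledError], raw: true).} =
  # No await and no implicit result here: the body returns the existing future,
  # much like blockVerifier forwards blockProcessor[].addBlock(...) above.
  produce(x)

when isMainModule:
  echo waitFor(forward(21))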
101 changes: 30 additions & 71 deletions beacon_chain/sync/request_manager.nim
@@ -39,7 +39,7 @@ const
 type
   BlockVerifierFn* =
     proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
-      Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
+      Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
   InhibitFn* = proc: bool {.gcsafe, raises:[].}

   RequestManager* = object
@@ -49,8 +49,8 @@ type
     quarantine: ref Quarantine
     blobQuarantine: ref BlobQuarantine
     blockVerifier: BlockVerifierFn
-    blockLoopFuture: Future[void]
-    blobLoopFuture: Future[void]
+    blockLoopFuture: Future[void].Raising([CancelledError])
+    blobLoopFuture: Future[void].Raising([CancelledError])

 func shortLog*(x: seq[Eth2Digest]): string =
   "[" & x.mapIt(shortLog(it)).join(", ") & "]"
@@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier],
       return false
   true

-proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} =
+proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
   var peer: Peer
   try:
     peer = await rman.network.peerPool.acquire()
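With the hunks above, the exception contract now lives in the callback type and the request procs themselves, so only procs whose tracked raises set is [CancelledError] can be plugged in. A hedged sketch of that idea with invented names (DemoVerifierFn and the anonymous verifier), mirroring how blockVerifier and rmanBlockVerifier are handed to the sync and request managers:

import chronos

type
  DemoVerifierFn = proc(x: int): Future[bool] {.async: (raises: [CancelledError]).}

# An anonymous async proc with the matching raises set can be stored in the
# callback type; a mismatching raises set would be rejected at compile time.
let verifier: DemoVerifierFn = proc(x: int): Future[bool]
    {.async: (raises: [CancelledError]).} =
  await sleepAsync(1.milliseconds)
  x > 0

when isMainModule:
  echo waitFor(verifier(3))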
@@ -171,27 +171,21 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.}
             peer = peer, blocks = shortLog(items), err = blocks.error()
       peer.updateScore(PeerScoreNoValues)

-  except CancelledError as exc:
-    raise exc
-  except CatchableError as exc:
-    peer.updateScore(PeerScoreNoValues)
-    debug "Error while fetching blocks by root", exc = exc.msg,
-          items = shortLog(items), peer = peer, peer_score = peer.getScore()
-    raise exc
   finally:
     if not(isNil(peer)):
       rman.network.peerPool.release(peer)

 proc fetchBlobsFromNetwork(self: RequestManager,
-                           idList: seq[BlobIdentifier]) {.async.} =
+                           idList: seq[BlobIdentifier])
+                           {.async: (raises: [CancelledError]).} =
   var peer: Peer

   try:
     peer = await self.network.peerPool.acquire()
     debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList),
           peer_score = peer.getScore()

-    let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList))
+    let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList)

     if blobs.isOk:
       let ublobs = blobs.get()
@@ -219,18 +213,11 @@ proc fetchBlobsFromNetwork(self: RequestManager,
             peer = peer, blobs = shortLog(idList), err = blobs.error()
       peer.updateScore(PeerScoreNoValues)

-  except CancelledError as exc:
-    raise exc
-  except CatchableError as exc:
-    peer.updateScore(PeerScoreNoValues)
-    debug "Error while fetching blobs by root", exc = exc.msg,
-          idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
-    raise exc
   finally:
     if not(isNil(peer)):
       self.network.peerPool.release(peer)

-proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
+proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
   while true:
     # TODO This polling could be replaced with an AsyncEvent that is fired
     # from the quarantine when there's work to do
@@ -245,33 +232,19 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
       continue

     debug "Requesting detected missing blocks", blocks = shortLog(blocks)
-    try:
-      let start = SyncMoment.now(0)
-
-      var workers: array[PARALLEL_REQUESTS, Future[void]]
+    let start = SyncMoment.now(0)

-      for i in 0 ..< PARALLEL_REQUESTS:
-        workers[i] = rman.requestBlocksByRoot(blocks)
+    var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]

-      await allFutures(workers)
+    for i in 0 ..< PARALLEL_REQUESTS:
+      workers[i] = rman.requestBlocksByRoot(blocks)

-      let finish = SyncMoment.now(uint64(len(blocks)))
+    await allFutures(workers)

-      var succeed = 0
-      for worker in workers:
-        if worker.completed():
-          inc(succeed)
-
-      debug "Request manager block tick", blocks = shortLog(blocks),
-            succeed = succeed,
-            failed = (len(workers) - succeed),
-            sync_speed = speed(start, finish)
-
-    except CancelledError:
-      break
-    except CatchableError as exc:
-      warn "Unexpected error in request manager block loop", exc = exc.msg
+    let finish = SyncMoment.now(uint64(len(blocks)))

+    debug "Request manager block tick", blocks = shortLog(blocks),
+          sync_speed = speed(start, finish)

 proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
   let
@@ -308,42 +281,28 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
       rman.quarantine[].removeBlobless(blobless)
   fetches

-
-proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
+proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
   while true:
-      # TODO This polling could be replaced with an AsyncEvent that is fired
-      # from the quarantine when there's work to do
+    # TODO This polling could be replaced with an AsyncEvent that is fired
+    # from the quarantine when there's work to do
     await sleepAsync(POLL_INTERVAL)
     if rman.inhibit():
       continue

     let fetches = rman.getMissingBlobs()
     if fetches.len > 0:
       debug "Requesting detected missing blobs", blobs = shortLog(fetches)
-      try:
-        let start = SyncMoment.now(0)
-        var workers: array[PARALLEL_REQUESTS, Future[void]]
-        for i in 0 ..< PARALLEL_REQUESTS:
-          workers[i] = rman.fetchBlobsFromNetwork(fetches)
-
-        await allFutures(workers)
-        let finish = SyncMoment.now(uint64(len(fetches)))
-
-        var succeed = 0
-        for worker in workers:
-          if worker.finished() and not(worker.failed()):
-            inc(succeed)
-
-        debug "Request manager blob tick",
-              blobs_count = len(fetches),
-              succeed = succeed,
-              failed = (len(workers) - succeed),
-              sync_speed = speed(start, finish)
-
-      except CancelledError:
-        break
-      except CatchableError as exc:
-        warn "Unexpected error in request manager blob loop", exc = exc.msg
+      let start = SyncMoment.now(0)
+      var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
+      for i in 0 ..< PARALLEL_REQUESTS:
+        workers[i] = rman.fetchBlobsFromNetwork(fetches)
+
+      await allFutures(workers)
+      let finish = SyncMoment.now(uint64(len(fetches)))
+
+      debug "Request manager blob tick",
+            blobs_count = len(fetches),
+            sync_speed = speed(start, finish)

 proc start*(rman: var RequestManager) =
   ## Start Request Manager's loops.
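With every proc in this file now declared as raising only CancelledError, the blanket except CatchableError branches in the loops above become dead code, which is why the diff can drop them: stopping a loop is a cancellation, and that is the one exception the annotation still permits. A hedged, self-contained sketch of that lifecycle (pollLoop is an invented stand-in for the request-manager loops, not the real start/stop API):

import chronos

proc pollLoop() {.async: (raises: [CancelledError]).} =
  while true:
    # One polling step; real work would go here and would itself be declared
    # as raising only CancelledError.
    await sleepAsync(10.milliseconds)

when isMainModule:
  let loopFut = pollLoop()          # Future[void].Raising([CancelledError])
  waitFor sleepAsync(50.milliseconds)
  waitFor loopFut.cancelAndWait()   # stopping the loop is just a cancellation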