Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unrevert rest of https://github.com/status-im/nimbus-eth2/pull/5765 #5867

Merged
merged 4 commits into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ proc initFullNode(
blobQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] =
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
Expand All @@ -361,27 +361,23 @@ proc initFullNode(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool):
Future[Result[void, VerifierError]] =
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
withBlck(signedBlock):
when typeof(forkyBlck).kind >= ConsensusFork.Deneb:
when consensusFork >= ConsensusFork.Deneb:
if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have
# to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
Future.completed(
Result[void, VerifierError].err(VerifierError.UnviableFork),
"rmanBlockVerifier")
err(VerifierError.UnviableFork)
else:
Future.completed(
Result[void, VerifierError].err(VerifierError.MissingParent),
"rmanBlockVerifier")
err(VerifierError.MissingParent)
else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)
else:
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(BlobSidecars),
maybeFinalized = maybeFinalized)

Expand Down
101 changes: 30 additions & 71 deletions beacon_chain/sync/request_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const
type
BlockVerifierFn* =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
InhibitFn* = proc: bool {.gcsafe, raises:[].}

RequestManager* = object
Expand All @@ -49,8 +49,8 @@ type
quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine
blockVerifier: BlockVerifierFn
blockLoopFuture: Future[void]
blobLoopFuture: Future[void]
blockLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void].Raising([CancelledError])

func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
Expand Down Expand Up @@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier],
return false
true

proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} =
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
var peer: Peer
try:
peer = await rman.network.peerPool.acquire()
Expand Down Expand Up @@ -171,27 +171,21 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.}
peer = peer, blocks = shortLog(items), err = blocks.error()
peer.updateScore(PeerScoreNoValues)

except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blocks by root", exc = exc.msg,
items = shortLog(items), peer = peer, peer_score = peer.getScore()
raise exc
finally:
if not(isNil(peer)):
rman.network.peerPool.release(peer)

proc fetchBlobsFromNetwork(self: RequestManager,
idList: seq[BlobIdentifier]) {.async.} =
idList: seq[BlobIdentifier])
{.async: (raises: [CancelledError]).} =
var peer: Peer

try:
peer = await self.network.peerPool.acquire()
debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList),
peer_score = peer.getScore()

let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList))
let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList)

if blobs.isOk:
let ublobs = blobs.get()
Expand Down Expand Up @@ -219,18 +213,11 @@ proc fetchBlobsFromNetwork(self: RequestManager,
peer = peer, blobs = shortLog(idList), err = blobs.error()
peer.updateScore(PeerScoreNoValues)

except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blobs by root", exc = exc.msg,
idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
raise exc
finally:
if not(isNil(peer)):
self.network.peerPool.release(peer)

proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
Expand All @@ -245,33 +232,19 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
continue

debug "Requesting detected missing blocks", blocks = shortLog(blocks)
try:
let start = SyncMoment.now(0)

var workers: array[PARALLEL_REQUESTS, Future[void]]
let start = SyncMoment.now(0)

for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.requestBlocksByRoot(blocks)
var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]

await allFutures(workers)
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.requestBlocksByRoot(blocks)

let finish = SyncMoment.now(uint64(len(blocks)))
await allFutures(workers)

var succeed = 0
for worker in workers:
if worker.completed():
inc(succeed)

debug "Request manager block tick", blocks = shortLog(blocks),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)

except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager block loop", exc = exc.msg
let finish = SyncMoment.now(uint64(len(blocks)))

debug "Request manager block tick", blocks = shortLog(blocks),
sync_speed = speed(start, finish)

proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
let
Expand Down Expand Up @@ -308,42 +281,28 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
rman.quarantine[].removeBlobless(blobless)
fetches


proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue

let fetches = rman.getMissingBlobs()
if fetches.len > 0:
debug "Requesting detected missing blobs", blobs = shortLog(fetches)
try:
let start = SyncMoment.now(0)
var workers: array[PARALLEL_REQUESTS, Future[void]]
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchBlobsFromNetwork(fetches)

await allFutures(workers)
let finish = SyncMoment.now(uint64(len(fetches)))

var succeed = 0
for worker in workers:
if worker.finished() and not(worker.failed()):
inc(succeed)

debug "Request manager blob tick",
blobs_count = len(fetches),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)

except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager blob loop", exc = exc.msg
let start = SyncMoment.now(0)
var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchBlobsFromNetwork(fetches)

await allFutures(workers)
let finish = SyncMoment.now(uint64(len(fetches)))

debug "Request manager blob tick",
blobs_count = len(fetches),
sync_speed = speed(start, finish)

proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
Expand Down
Loading
Loading