Skip to content

Commit

Permalink
fix(blockexchange): ensures futures are asyncSpawned (#1037)
Browse files Browse the repository at this point in the history
* fix(blockexchange): asyncSpawn advertising of local store blocks

* fix(blockexchange): asyncSpawn discoveryQueueLoop

- prevents silently swallowing async errors

* fix(blockexchange): asyncSpawns block exchange tasks

- prevents silently swallow future exceptions
  • Loading branch information
emizzle authored Dec 16, 2024
1 parent 5f2ba14 commit b0cc27f
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 57 deletions.
51 changes: 26 additions & 25 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import ../protobuf/presence
import ../peers

import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
Expand All @@ -42,7 +44,7 @@ type

advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
trackedFutures*: TrackedFutures # Advertise tasks futures

advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
Expand Down Expand Up @@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)

proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)

await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
error "failed to advertise blocks in local store", error = e.msgDetail

info "Exiting advertise task loop"

proc processQueueLoop(b: Advertiser) {.async.} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
Expand Down Expand Up @@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} =

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
let fut = b.processQueueLoop().track(b)
asyncSpawn fut

b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
asyncSpawn b.advertiseLocalStoreLoop

proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
Expand All @@ -145,19 +155,9 @@ proc stop*(b: Advertiser) {.async.} =
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"

# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"

trace "Advertiser stopped"
trace "Stopping advertise loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Advertiser loop and tasks stopped"

proc new*(
T: type Advertiser,
Expand All @@ -173,5 +173,6 @@ proc new*(
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)
44 changes: 24 additions & 20 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import ../network
import ../peers

import ../../utils
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
Expand Down Expand Up @@ -50,12 +51,12 @@ type
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests

proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
Expand All @@ -66,13 +67,15 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg

logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len

await sleepAsync(b.discoveryLoopSleep)
try:
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
await sleepAsync(b.discoveryLoopSleep)
except CancelledError:
discard # do not propagate as discoveryQueueLoop was asyncSpawned

proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
## Run discovery tasks
##

Expand Down Expand Up @@ -116,6 +119,11 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg
except Exception as e:
# Raised by b.discovery.removeProvider somehow...
# This should not be catchable, and we should never get here. Therefore,
# raise a Defect.
raiseAssert "Exception when removing provider"

info "Exiting discovery task runner"

Expand All @@ -139,9 +147,11 @@ proc start*(b: DiscoveryEngine) {.async.} =

b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))
let fut = b.discoveryTaskLoop().track(b)
asyncSpawn fut

b.discoveryLoop = discoveryQueueLoop(b)
b.discoveryLoop = b.discoveryQueueLoop().track(b)
asyncSpawn b.discoveryLoop

proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
Expand All @@ -153,16 +163,9 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return

b.discEngineRunning = false
for task in b.discoveryTasks:
if not task.finished:
trace "Awaiting discovery task to stop"
await task.cancelAndWait()
trace "Discovery task stopped"

if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "Stopping discovery loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Discovery loop and tasks stopped"

trace "Discovery engine stopped"

Expand All @@ -187,6 +190,7 @@ proc new*(
pendingBlocks: pendingBlocks,
concurrentDiscReqs: concurrentDiscReqs,
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)
29 changes: 17 additions & 12 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import pkg/questionable
import ../../stores/blockstore
import ../../blocktype
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../merkletree
import ../../logutils
import ../../manifest
Expand Down Expand Up @@ -70,7 +72,7 @@ type
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
Expand All @@ -88,7 +90,7 @@ type
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()

proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).}

proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
Expand All @@ -104,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =

b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
let fut = b.blockexcTaskRunner().track(b)
asyncSpawn fut

proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
Expand All @@ -119,11 +122,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
return

b.blockexcRunning = false
for task in b.blockexcTasks:
if not task.finished:
trace "Awaiting task to stop"
await task.cancelAndWait()
trace "Task stopped"
await b.trackedFutures.cancelTracked()

trace "NetworkStore stopped"

Expand Down Expand Up @@ -565,16 +564,21 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =

task.peerWants.keepItIf(it.address notin successAddresses)

proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##

trace "Starting blockexc task runner"
while b.blockexcRunning:
let
peerCtx = await b.taskQueue.pop()
try:
let
peerCtx = await b.taskQueue.pop()

await b.taskHandler(peerCtx)
await b.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail

info "Exiting blockexc task runner"

Expand Down Expand Up @@ -603,6 +607,7 @@ proc new*(
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,
Expand Down

0 comments on commit b0cc27f

Please sign in to comment.