From b0cc27f56380ffd8fd647e80ed92bf8d7efe95d4 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Mon, 16 Dec 2024 13:01:49 +0700 Subject: [PATCH] fix(blockexchange): ensures futures are asyncSpawned (#1037) * fix(blockexchange): asyncSpawn advertising of local store blocks * fix(blockexchange): asyncSpawn discoveryQueueLoop - prevents silently swallowing async errors * fix(blockexchange): asyncSpawns block exchange tasks - prevents silently swallow future exceptions --- codex/blockexchange/engine/advertiser.nim | 51 ++++++++++++----------- codex/blockexchange/engine/discovery.nim | 44 ++++++++++--------- codex/blockexchange/engine/engine.nim | 29 +++++++------ 3 files changed, 67 insertions(+), 57 deletions(-) diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index e4a97db19..5ff82e489 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -18,6 +18,8 @@ import ../protobuf/presence import ../peers import ../../utils +import ../../utils/exceptions +import ../../utils/trackedfutures import ../../discovery import ../../stores/blockstore import ../../logutils @@ -42,7 +44,7 @@ type advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle advertiseQueue*: AsyncQueue[Cid] # Advertise queue - advertiseTasks*: seq[Future[void]] # Advertise tasks + trackedFutures*: TrackedFutures # Advertise tasks futures advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests @@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} = await b.addCidToQueue(cid) await b.addCidToQueue(manifest.treeCid) -proc advertiseLocalStoreLoop(b: Advertiser) {.async.} = +proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: - if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): - trace "Advertiser begins iterating blocks..." - for c in cids: - if cid =? await c: - await b.advertiseBlock(cid) - trace "Advertiser iterating blocks finished." + try: + if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cids: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." + + await sleepAsync(b.advertiseLocalStoreLoopSleep) - await sleepAsync(b.advertiseLocalStoreLoopSleep) + except CancelledError: + break # do not propagate as advertiseLocalStoreLoop was asyncSpawned + except CatchableError as e: + error "failed to advertise blocks in local store", error = e.msgDetail info "Exiting advertise task loop" -proc processQueueLoop(b: Advertiser) {.async.} = +proc processQueueLoop(b: Advertiser) {.async: (raises: []).} = while b.advertiserRunning: try: let @@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} = b.advertiserRunning = true for i in 0..