diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index d094c454a..ef1465f9a 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import pkg/chronos import pkg/libp2p/cid import pkg/libp2p/multicodec @@ -81,16 +83,12 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async: (raises: [CancelledError]) proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} = try: while b.advertiserRunning: - try: - if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest): - trace "Advertiser begins iterating blocks..." - for c in cids: - if cid =? await c: - await b.advertiseBlock(cid) - trace "Advertiser iterating blocks finished." - except CatchableError as e: - error "Error in advertise local store loop", error = e.msgDetail - raiseAssert("Unexpected exception in advertiseLocalStoreLoop") + if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Manifest): + trace "Advertiser begins iterating blocks..." + for c in cidsIter: + if cid =? await c: + await b.advertiseBlock(cid) + trace "Advertiser iterating blocks finished." await sleepAsync(b.advertiseLocalStoreLoopSleep) except CancelledError: @@ -126,8 +124,11 @@ proc start*(b: Advertiser) {.async: (raises: []).} = trace "Advertiser start" - proc onBlock(cid: Cid) {.async.} = - await b.advertiseBlock(cid) + proc onBlock(cid: Cid) {.async: (raises: []).} = + try: + await b.advertiseBlock(cid) + except CancelledError: + trace "Cancelled advertise block", cid doAssert(b.localStore.onBlockStored.isNone()) b.localStore.onBlockStored = onBlock.some diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 35785cfe0..bd7dd9640 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -202,8 +202,8 @@ proc downloadInternal( trace "Block download cancelled" if not handle.finished: await handle.cancelAndWait() - except CatchableError as exc: - warn "Error downloadloading block", exc = exc.msg + except RetriesExhaustedError as exc: + warn "Retries exhausted for block", address, exc = exc.msg if not handle.finished: handle.fail(exc) finally: diff --git a/codex/errors.nim b/codex/errors.nim index 1a571e0f3..10baa434e 100644 --- a/codex/errors.nim +++ b/codex/errors.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import std/options import std/sugar import std/sequtils @@ -63,7 +65,7 @@ proc allFinishedFailed*[T]( return res proc allFinishedValues*[T]( - futs: seq[Future[T]] + futs: auto ): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = ## If all futures have finished, return corresponding values, ## otherwise return failure diff --git a/codex/node.nim b/codex/node.nim index fb653c0d7..aa0838fc6 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -78,7 +78,9 @@ type CodexNodeRef* = ref CodexNode OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].} - BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].} + BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {. + gcsafe, async: (raises: [CancelledError]) + .} func switch*(self: CodexNodeRef): Switch = return self.switch @@ -109,7 +111,9 @@ proc storeManifest*( success blk -proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} = +proc fetchManifest*( + self: CodexNodeRef, cid: Cid +): Future[?!Manifest] {.async: (raises: [CancelledError]).} = ## Fetch and decode a manifest block ## @@ -144,7 +148,7 @@ proc connect*( proc updateExpiry*( self: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without manifest =? await self.fetchManifest(manifestCid), error: trace "Unable to fetch manifest for cid", manifestCid return failure(error) @@ -154,7 +158,7 @@ proc updateExpiry*( self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry) ) - let res = await allFinishedFailed(ensuringFutures) + let res = await allFinishedFailed(cast[seq[Future[?!void]]](ensuringFutures)) if res.failure.len > 0: trace "Some blocks failed to update expiry", len = res.failure.len return failure("Some blocks failed to update expiry (" & $res.failure.len & " )") @@ -172,7 +176,7 @@ proc fetchBatched*( batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, fetchLocal = true, -): Future[?!void] {.async, gcsafe.} = +): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.} = ## Fetch blocks in batches of `batchSize` ## @@ -190,7 +194,10 @@ proc fetchBatched*( if not (await address in self.networkStore) or fetchLocal: self.networkStore.getBlock(address) - without blockResults =? await allFinishedValues(blockFutures), err: + if blockFutures.len == 0: + continue + + without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err: trace "Some blocks failed to fetch", err = err.msg return failure(err) @@ -215,7 +222,7 @@ proc fetchBatched*( batchSize = DefaultFetchBatch, onBatch: BatchProc = nil, fetchLocal = true, -): Future[?!void] = +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = ## Fetch manifest in batches of `batchSize` ## @@ -240,8 +247,6 @@ proc fetchDatasetAsync*( error "Unable to fetch blocks", err = err.msg except CancelledError as exc: trace "Cancelled fetching blocks", exc = exc.msg - except CatchableError as exc: - error "Error fetching blocks", exc = exc.msg proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) = ## Start fetching a dataset in the background. @@ -249,7 +254,9 @@ proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) = ## self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false)) -proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} = +proc streamSingleBlock( + self: CodexNodeRef, cid: Cid +): Future[?!LPStream] {.async: (raises: [CancelledError]).} = ## Streams the contents of a single block. ## trace "Streaming single block", cid = cid @@ -264,7 +271,9 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async defer: await stream.pushEof() await stream.pushData(blk.data) - except CatchableError as exc: + except CancelledError as exc: + trace "Streaming block cancelled", cid, exc = exc.msg + except LPStreamError as exc: trace "Unable to send block", cid, exc = exc.msg self.trackedFutures.track(streamOneBlock()) @@ -272,7 +281,7 @@ proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async proc streamEntireDataset( self: CodexNodeRef, manifest: Manifest, manifestCid: Cid -): Future[?!LPStream] {.async.} = +): Future[?!LPStream] {.async: (raises: [CancelledError]).} = ## Streams the contents of the entire dataset described by the manifest. ## trace "Retrieving blocks from manifest", manifestCid @@ -294,14 +303,14 @@ proc streamEntireDataset( jobs.add(erasureJob()) - jobs.add(self.fetchDatasetAsync(manifest)) + jobs.add(self.fetchDatasetAsync(manifest, fetchLocal = false)) # Monitor stream completion and cancel background jobs when done proc monitorStream() {.async: (raises: []).} = try: await stream.join() - except CatchableError as exc: - warn "Stream failed", exc = exc.msg + except CancelledError as exc: + warn "Stream cancelled", exc = exc.msg finally: await noCancel allFutures(jobs.mapIt(it.cancelAndWait)) @@ -314,7 +323,7 @@ proc streamEntireDataset( proc retrieve*( self: CodexNodeRef, cid: Cid, local: bool = true -): Future[?!LPStream] {.async.} = +): Future[?!LPStream] {.async: (raises: [CancelledError]).} = ## Retrieve by Cid a single block or an entire dataset described by manifest ## @@ -470,11 +479,11 @@ proc store*( return manifestBlk.cid.success proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} = - without cids =? await self.networkStore.listBlocks(BlockType.Manifest): + without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest): warn "Failed to listBlocks" return - for c in cids: + for c in cidsIter: if cid =? await c: without blk =? await self.networkStore.getBlock(cid): warn "Failed to get manifest block by cid", cid @@ -617,7 +626,7 @@ proc onStore( slotIdx: uint64, blocksCb: BlocksCb, isRepairing: bool = false, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## store data in local storage ## @@ -648,13 +657,15 @@ proc onStore( trace "Slot index not in manifest", slotIdx return failure(newException(CodexError, "Slot index not in manifest")) - proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} = + proc updateExpiry( + blocks: seq[bt.Block] + ): Future[?!void] {.async: (raises: [CancelledError]).} = trace "Updating expiry for blocks", blocks = blocks.len let ensureExpiryFutures = blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970)) - let res = await allFinishedFailed(ensureExpiryFutures) + let res = await allFinishedFailed(cast[seq[Future[?!void]]](ensureExpiryFutures)) if res.failure.len > 0: trace "Some blocks failed to update expiry", len = res.failure.len return failure("Some blocks failed to update expiry (" & $res.failure.len & " )") @@ -702,7 +713,7 @@ proc onStore( proc onProve( self: CodexNodeRef, slot: Slot, challenge: ProofChallenge -): Future[?!Groth16Proof] {.async.} = +): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = ## Generats a proof for a given slot and challenge ## @@ -758,7 +769,7 @@ proc onProve( proc onExpiryUpdate( self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = return await self.updateExpiry(rootCid, expiry) proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: uint64) = @@ -781,12 +792,12 @@ proc start*(self: CodexNodeRef) {.async.} = slot: uint64, onBatch: BatchProc, isRepairing: bool = false, - ): Future[?!void] = + ): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = self.onStore(request, slot, onBatch, isRepairing) hostContracts.sales.onExpiryUpdate = proc( rootCid: Cid, expiry: SecondsSince1970 - ): Future[?!void] = + ): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = self.onExpiryUpdate(rootCid, expiry) hostContracts.sales.onClear = proc(request: StorageRequest, slotIndex: uint64) = @@ -795,7 +806,7 @@ proc start*(self: CodexNodeRef) {.async.} = hostContracts.sales.onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] = + ): Future[?!Groth16Proof] {.async: (raw: true, raises: [CancelledError]).} = # TODO: generate proof self.onProve(slot, challenge) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index ee493e035..9ef72a383 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -78,7 +78,7 @@ proc retrieveCid( ## manner ## - var stream: LPStream + var lpStream: LPStream var bytes = 0 try: @@ -94,6 +94,8 @@ proc retrieveCid( await resp.sendBody(error.msg) return + lpStream = stream + # It is ok to fetch again the manifest because it will hit the cache without manifest =? (await node.fetchManifest(cid)), err: error "Failed to fetch manifest", err = err.msg @@ -139,15 +141,15 @@ proc retrieveCid( codex_api_downloads.inc() except CancelledError as exc: raise exc - except CatchableError as exc: + except LPStreamError as exc: warn "Error streaming blocks", exc = exc.msg resp.status = Http500 if resp.isPending(): await resp.sendBody(exc.msg) finally: info "Sent bytes", cid = cid, bytes - if not stream.isNil: - await stream.close() + if not lpStream.isNil: + await lpStream.close() proc buildCorsHeaders( httpMethod: string, allowedOrigin: Option[string] diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index af940a4b1..40eded7d1 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -24,15 +24,17 @@ type slotQueue*: SlotQueue simulateProofFailures*: int - BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].} + BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {. + gcsafe, async: (raises: [CancelledError]) + .} OnStore* = proc( request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool - ): Future[?!void] {.gcsafe, upraises: [].} + ): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {. - gcsafe, upraises: [] + gcsafe, async: (raises: [CancelledError]) .} OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {. - gcsafe, upraises: [] + gcsafe, async: (raises: [CancelledError]) .} - OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].} - OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].} + OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].} + OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].} diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 7cf304d31..0c39b0a56 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -55,7 +55,9 @@ method run*( reservationId = reservation.id availabilityId = reservation.availabilityId - proc onBlocks(blocks: seq[bt.Block]): Future[?!void] {.async.} = + proc onBlocks( + blocks: seq[bt.Block] + ): Future[?!void] {.async: (raises: [CancelledError]).} = # release batches of blocks as they are written to disk and # update availability size var bytes: uint = 0 diff --git a/codex/slots/builder/builder.nim b/codex/slots/builder/builder.nim index 1ea57a0f2..a26fc04e0 100644 --- a/codex/slots/builder/builder.nim +++ b/codex/slots/builder/builder.nim @@ -134,7 +134,7 @@ func manifest*[T, H](self: SlotsBuilder[T, H]): Manifest = proc buildBlockTree*[T, H]( self: SlotsBuilder[T, H], blkIdx: Natural, slotPos: Natural -): Future[?!(seq[byte], T)] {.async.} = +): Future[?!(seq[byte], T)] {.async: (raises: [CancelledError]).} = ## Build the block digest tree and return a tuple with the ## block data and the tree. ## @@ -167,7 +167,7 @@ proc buildBlockTree*[T, H]( proc getCellHashes*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural -): Future[?!seq[H]] {.async.} = +): Future[?!seq[H]] {.async: (raises: [CancelledError, IndexingError]).} = ## Collect all the cells from a block and return ## their hashes. ## @@ -202,19 +202,23 @@ proc getCellHashes*[T, H]( proc buildSlotTree*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural -): Future[?!T] {.async.} = +): Future[?!T] {.async: (raises: [CancelledError]).} = ## Build the slot tree from the block digest hashes ## and return the tree. - without cellHashes =? (await self.getCellHashes(slotIndex)), err: - error "Failed to select slot blocks", err = err.msg - return failure(err) + try: + without cellHashes =? (await self.getCellHashes(slotIndex)), err: + error "Failed to select slot blocks", err = err.msg + return failure(err) - T.init(cellHashes) + T.init(cellHashes) + except IndexingError as err: + error "Failed to build slot tree", err = err.msg + return failure(err) proc buildSlot*[T, H]( self: SlotsBuilder[T, H], slotIndex: Natural -): Future[?!H] {.async.} = +): Future[?!H] {.async: (raises: [CancelledError]).} = ## Build a slot tree and store the proofs in ## the block store. ## @@ -250,7 +254,9 @@ proc buildSlot*[T, H]( func buildVerifyTree*[T, H](self: SlotsBuilder[T, H], slotRoots: openArray[H]): ?!T = T.init(@slotRoots) -proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} = +proc buildSlots*[T, H]( + self: SlotsBuilder[T, H] +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Build all slot trees and store them in the block store. ## @@ -280,7 +286,9 @@ proc buildSlots*[T, H](self: SlotsBuilder[T, H]): Future[?!void] {.async.} = success() -proc buildManifest*[T, H](self: SlotsBuilder[T, H]): Future[?!Manifest] {.async.} = +proc buildManifest*[T, H]( + self: SlotsBuilder[T, H] +): Future[?!Manifest] {.async: (raises: [CancelledError]).} = if err =? (await self.buildSlots()).errorOption: error "Failed to build slot roots", err = err.msg return failure(err) diff --git a/codex/slots/proofs/prover.nim b/codex/slots/proofs/prover.nim index b1aa77c06..b3707282d 100644 --- a/codex/slots/proofs/prover.nim +++ b/codex/slots/proofs/prover.nim @@ -50,7 +50,7 @@ type proc prove*( self: Prover, slotIdx: int, manifest: Manifest, challenge: ProofChallenge -): Future[?!(AnyProofInputs, AnyProof)] {.async.} = +): Future[?!(AnyProofInputs, AnyProof)] {.async: (raises: [CancelledError]).} = ## Prove a statement using backend. ## Returns a future that resolves to a proof. diff --git a/codex/slots/sampler/sampler.nim b/codex/slots/sampler/sampler.nim index bccdaff2f..6ea41ee33 100644 --- a/codex/slots/sampler/sampler.nim +++ b/codex/slots/sampler/sampler.nim @@ -48,7 +48,7 @@ func getCell*[T, H]( proc getSample*[T, H]( self: DataSampler[T, H], cellIdx: int, slotTreeCid: Cid, slotRoot: H -): Future[?!Sample[H]] {.async.} = +): Future[?!Sample[H]] {.async: (raises: [CancelledError]).} = let cellsPerBlock = self.builder.numBlockCells blkCellIdx = cellIdx.toCellInBlk(cellsPerBlock) # block cell index @@ -81,7 +81,7 @@ proc getSample*[T, H]( proc getProofInput*[T, H]( self: DataSampler[T, H], entropy: ProofChallenge, nSamples: Natural -): Future[?!ProofInputs[H]] {.async.} = +): Future[?!ProofInputs[H]] {.async: (raises: [CancelledError]).} = ## Generate proofs as input to the proving circuit. ## diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 78fab0da7..bbe0bef8f 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -7,10 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import pkg/upraises - -push: - {.upraises: [].} +{.push raises: [].} import pkg/chronos import pkg/libp2p @@ -32,11 +29,13 @@ type Block Both - CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, raises: [].} + CidCallback* = proc(cid: Cid): Future[void] {.gcsafe, async: (raises: []).} BlockStore* = ref object of RootObj onBlockStored*: ?CidCallback -method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base, gcsafe.} = +method getBlock*( + self: BlockStore, cid: Cid +): Future[?!Block] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get a block from the blockstore ## @@ -44,20 +43,23 @@ method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base, gcsafe.} = method getBlock*( self: BlockStore, treeCid: Cid, index: Natural -): Future[?!Block] {.base, gcsafe.} = +): Future[?!Block] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get a block from the blockstore ## raiseAssert("getBlock by treecid not implemented!") -method getCid*(self: BlockStore, treeCid: Cid, index: Natural): Future[?!Cid] {.base.} = +method getCid*( + self: BlockStore, treeCid: Cid, index: Natural +): Future[?!Cid] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get a cid given a tree and index ## + raiseAssert("getCid by treecid not implemented!") method getBlock*( self: BlockStore, address: BlockAddress -): Future[?!Block] {.base, gcsafe.} = +): Future[?!Block] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get a block from the blockstore ## @@ -65,7 +67,7 @@ method getBlock*( method getBlockAndProof*( self: BlockStore, treeCid: Cid, index: Natural -): Future[?!(Block, CodexProof)] {.base, gcsafe.} = +): Future[?!(Block, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get a block and associated inclusion proof by Cid of a merkle tree and an index of a leaf in a tree ## @@ -73,7 +75,7 @@ method getBlockAndProof*( method putBlock*( self: BlockStore, blk: Block, ttl = Duration.none -): Future[?!void] {.base, gcsafe.} = +): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Put a block to the blockstore ## @@ -81,7 +83,7 @@ method putBlock*( method putCidAndProof*( self: BlockStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof -): Future[?!void] {.base, gcsafe.} = +): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Put a block proof to the blockstore ## @@ -89,7 +91,7 @@ method putCidAndProof*( method getCidAndProof*( self: BlockStore, treeCid: Cid, index: Natural -): Future[?!(Cid, CodexProof)] {.base, gcsafe.} = +): Future[?!(Cid, CodexProof)] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get a block proof from the blockstore ## @@ -97,7 +99,7 @@ method getCidAndProof*( method ensureExpiry*( self: BlockStore, cid: Cid, expiry: SecondsSince1970 -): Future[?!void] {.base, gcsafe.} = +): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Ensure that block's assosicated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -106,14 +108,16 @@ method ensureExpiry*( method ensureExpiry*( self: BlockStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970 -): Future[?!void] {.base, gcsafe.} = +): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Ensure that block's associated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## raiseAssert("Not implemented!") -method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base, gcsafe.} = +method delBlock*( + self: BlockStore, cid: Cid +): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Delete a block from the blockstore ## @@ -121,13 +125,15 @@ method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base, gcsafe.} = method delBlock*( self: BlockStore, treeCid: Cid, index: Natural -): Future[?!void] {.base, gcsafe.} = +): Future[?!void] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Delete a block from the blockstore ## raiseAssert("delBlock not implemented!") -method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base, gcsafe.} = +method hasBlock*( + self: BlockStore, cid: Cid +): Future[?!bool] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Check if the block exists in the blockstore ## @@ -135,7 +141,7 @@ method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base, gcsafe.} = method hasBlock*( self: BlockStore, tree: Cid, index: Natural -): Future[?!bool] {.base, gcsafe.} = +): Future[?!bool] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Check if the block exists in the blockstore ## @@ -143,27 +149,31 @@ method hasBlock*( method listBlocks*( self: BlockStore, blockType = BlockType.Manifest -): Future[?!AsyncIter[?Cid]] {.base, gcsafe.} = +): Future[?!SafeAsyncIter[Cid]] {.base, async: (raises: [CancelledError]), gcsafe.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## raiseAssert("listBlocks not implemented!") -method close*(self: BlockStore): Future[void] {.base, gcsafe.} = +method close*(self: BlockStore): Future[void] {.base, async: (raises: []), gcsafe.} = ## Close the blockstore, cleaning up resources managed by it. ## For some implementations this may be a no-op ## raiseAssert("close not implemented!") -proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = +proc contains*( + self: BlockStore, blk: Cid +): Future[bool] {.async: (raises: [CancelledError]), gcsafe.} = ## Check if the block exists in the blockstore. ## Return false if error encountered ## return (await self.hasBlock(blk)) |? false -proc contains*(self: BlockStore, address: BlockAddress): Future[bool] {.async.} = +proc contains*( + self: BlockStore, address: BlockAddress +): Future[bool] {.async: (raises: [CancelledError]), gcsafe.} = return if address.leaf: (await self.hasBlock(address.treeCid, address.index)) |? false diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 6235c9c61..631547845 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -7,10 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import pkg/upraises - -push: - {.upraises: [].} +{.push raises: [].} import std/options @@ -46,7 +43,9 @@ type const DefaultCacheSize*: NBytes = 5.MiBs -method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = +method getBlock*( + self: CacheStore, cid: Cid +): Future[?!Block] {.async: (raises: [CancelledError]).} = ## Get a block from the stores ## @@ -69,7 +68,7 @@ method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = method getCidAndProof*( self: CacheStore, treeCid: Cid, index: Natural -): Future[?!(Cid, CodexProof)] {.async.} = +): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} = if cidAndProof =? self.cidAndProofCache.getOption((treeCid, index)): success(cidAndProof) else: @@ -81,7 +80,7 @@ method getCidAndProof*( method getBlock*( self: CacheStore, treeCid: Cid, index: Natural -): Future[?!Block] {.async.} = +): Future[?!Block] {.async: (raises: [CancelledError]).} = without cidAndProof =? (await self.getCidAndProof(treeCid, index)), err: return failure(err) @@ -89,7 +88,7 @@ method getBlock*( method getBlockAndProof*( self: CacheStore, treeCid: Cid, index: Natural -): Future[?!(Block, CodexProof)] {.async.} = +): Future[?!(Block, CodexProof)] {.async: (raises: [CancelledError]).} = without cidAndProof =? (await self.getCidAndProof(treeCid, index)), err: return failure(err) @@ -100,13 +99,17 @@ method getBlockAndProof*( success((blk, proof)) -method getBlock*(self: CacheStore, address: BlockAddress): Future[?!Block] = +method getBlock*( + self: CacheStore, address: BlockAddress +): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} = if address.leaf: self.getBlock(address.treeCid, address.index) else: self.getBlock(address.cid) -method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = +method hasBlock*( + self: CacheStore, cid: Cid +): Future[?!bool] {.async: (raises: [CancelledError]).} = ## Check if the block exists in the blockstore ## @@ -119,7 +122,7 @@ method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = method hasBlock*( self: CacheStore, treeCid: Cid, index: Natural -): Future[?!bool] {.async.} = +): Future[?!bool] {.async: (raises: [CancelledError]).} = without cidAndProof =? (await self.getCidAndProof(treeCid, index)), err: if err of BlockNotFoundError: return success(false) @@ -136,7 +139,7 @@ func cids(self: CacheStore): (iterator (): Cid {.gcsafe.}) = method listBlocks*( self: CacheStore, blockType = BlockType.Manifest -): Future[?!AsyncIter[?Cid]] {.async.} = +): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## @@ -145,12 +148,19 @@ method listBlocks*( proc isFinished(): bool = return finished(cids) - proc genNext(): Future[Cid] {.async.} = - cids() + proc genNext(): Future[?!Cid] {.async: (raises: [CancelledError]).} = + try: + let cid = cids() + success(cid) + except Exception as err: + failure(err.msg) let iter = await ( - AsyncIter[Cid].new(genNext, isFinished).filter( - proc(cid: Cid): Future[bool] {.async.} = + SafeAsyncIter[Cid].new(genNext, isFinished).filter( + proc(cid: ?!Cid): Future[bool] {.async: (raises: [CancelledError]).} = + without cid =? cid, err: + trace "Cannot get Cid from the iterator", err = err.msg + return false without isManifest =? cid.isManifest, err: trace "Error checking if cid is a manifest", err = err.msg return false @@ -164,14 +174,7 @@ method listBlocks*( return not isManifest ) ) - - return success( - map[Cid, ?Cid]( - iter, - proc(cid: Cid): Future[?Cid] {.async.} = - some(cid), - ) - ) + success(iter) func putBlockSync(self: CacheStore, blk: Block): bool = let blkSize = blk.data.len.NBytes # in bytes @@ -196,7 +199,7 @@ func putBlockSync(self: CacheStore, blk: Block): bool = method putBlock*( self: CacheStore, blk: Block, ttl = Duration.none -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Put a block to the blockstore ## @@ -213,13 +216,13 @@ method putBlock*( method putCidAndProof*( self: CacheStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = self.cidAndProofCache[(treeCid, index)] = (blockCid, proof) success() method ensureExpiry*( self: CacheStore, cid: Cid, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Updates block's assosicated TTL in store - not applicable for CacheStore ## @@ -227,13 +230,15 @@ method ensureExpiry*( method ensureExpiry*( self: CacheStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Updates block's associated TTL in store - not applicable for CacheStore ## discard # CacheStore does not have notion of TTL -method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = +method delBlock*( + self: CacheStore, cid: Cid +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Delete a block from the blockstore ## @@ -250,7 +255,7 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = method delBlock*( self: CacheStore, treeCid: Cid, index: Natural -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = let maybeRemoved = self.cidAndProofCache.del((treeCid, index)) if removed =? maybeRemoved: @@ -258,7 +263,7 @@ method delBlock*( return success() -method close*(self: CacheStore): Future[void] {.async.} = +method close*(self: CacheStore): Future[void] {.async: (raises: []).} = ## Close the blockstore, a no-op for this implementation ## diff --git a/codex/stores/maintenance.nim b/codex/stores/maintenance.nim index cced5da90..7568a5d78 100644 --- a/codex/stores/maintenance.nim +++ b/codex/stores/maintenance.nim @@ -10,13 +10,15 @@ ## Store maintenance module ## Looks for and removes expired blocks from blockstores. +{.push raises: [].} + import pkg/chronos import pkg/questionable import pkg/questionable/results import ./repostore import ../utils/timer -import ../utils/asynciter +import ../utils/safeasynciter import ../clock import ../logutils import ../systemclock @@ -54,19 +56,23 @@ proc new*( offset: 0, ) -proc deleteExpiredBlock(self: BlockMaintainer, cid: Cid): Future[void] {.async.} = +proc deleteExpiredBlock( + self: BlockMaintainer, cid: Cid +): Future[void] {.async: (raises: [CancelledError]).} = if isErr (await self.repoStore.delBlock(cid)): trace "Unable to delete block from repoStore" proc processBlockExpiration( self: BlockMaintainer, be: BlockExpiration -): Future[void] {.async.} = +): Future[void] {.async: (raises: [CancelledError]).} = if be.expiry < self.clock.now: await self.deleteExpiredBlock(be.cid) else: inc self.offset -proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = +proc runBlockCheck( + self: BlockMaintainer +): Future[void] {.async: (raises: [CancelledError]).} = let expirations = await self.repoStore.getBlockExpirations( maxNumber = self.numberOfBlocksPerInterval, offset = self.offset ) @@ -77,7 +83,9 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = var numberReceived = 0 for beFut in iter: - let be = await beFut + without be =? (await beFut), err: + trace "Unable to obtain blockExpiration from iterator" + continue inc numberReceived await self.processBlockExpiration(be) await sleepAsync(1.millis) # cooperative scheduling @@ -88,15 +96,14 @@ proc runBlockCheck(self: BlockMaintainer): Future[void] {.async.} = self.offset = 0 proc start*(self: BlockMaintainer) = - proc onTimer(): Future[void] {.async.} = + proc onTimer(): Future[void] {.async: (raises: []).} = try: await self.runBlockCheck() - except CancelledError as error: - raise error - except CatchableError as exc: - error "Unexpected exception in BlockMaintainer.onTimer(): ", msg = exc.msg + except CancelledError as err: + trace "Running block check in block maintenance timer callback cancelled: ", + err = err.msg self.timer.start(onTimer, self.interval) -proc stop*(self: BlockMaintainer): Future[void] {.async.} = +proc stop*(self: BlockMaintainer): Future[void] {.async: (raises: []).} = await self.timer.stop() diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index f94bca330..64410ce0b 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -19,7 +19,7 @@ import ../blockexchange import ../logutils import ../merkletree import ../utils/asyncheapqueue -import ../utils/asynciter +import ../utils/safeasynciter import ./blockstore export blockstore, blockexchange, asyncheapqueue @@ -31,7 +31,9 @@ type NetworkStore* = ref object of BlockStore engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.async.} = +method getBlock*( + self: NetworkStore, address: BlockAddress +): Future[?!Block] {.async: (raises: [CancelledError]).} = without blk =? (await self.localStore.getBlock(address)), err: if not (err of BlockNotFoundError): error "Error getting block from local store", address, err = err.msg @@ -45,13 +47,17 @@ method getBlock*(self: NetworkStore, address: BlockAddress): Future[?!Block] {.a return success blk -method getBlock*(self: NetworkStore, cid: Cid): Future[?!Block] = +method getBlock*( + self: NetworkStore, cid: Cid +): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} = ## Get a block from the blockstore ## self.getBlock(BlockAddress.init(cid)) -method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Block] = +method getBlock*( + self: NetworkStore, treeCid: Cid, index: Natural +): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} = ## Get a block from the blockstore ## @@ -59,7 +65,7 @@ method getBlock*(self: NetworkStore, treeCid: Cid, index: Natural): Future[?!Blo method putBlock*( self: NetworkStore, blk: Block, ttl = Duration.none -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Store block locally and notify the network ## let res = await self.localStore.putBlock(blk, ttl) @@ -71,12 +77,12 @@ method putBlock*( method putCidAndProof*( self: NetworkStore, treeCid: Cid, index: Natural, blockCid: Cid, proof: CodexProof -): Future[?!void] = +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = self.localStore.putCidAndProof(treeCid, index, blockCid, proof) method getCidAndProof*( self: NetworkStore, treeCid: Cid, index: Natural -): Future[?!(Cid, CodexProof)] = +): Future[?!(Cid, CodexProof)] {.async: (raw: true, raises: [CancelledError]).} = ## Get a block proof from the blockstore ## @@ -84,7 +90,7 @@ method getCidAndProof*( method ensureExpiry*( self: NetworkStore, cid: Cid, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Ensure that block's assosicated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -101,7 +107,7 @@ method ensureExpiry*( method ensureExpiry*( self: NetworkStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Ensure that block's associated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -118,10 +124,12 @@ method ensureExpiry*( method listBlocks*( self: NetworkStore, blockType = BlockType.Manifest -): Future[?!AsyncIter[?Cid]] = +): Future[?!SafeAsyncIter[Cid]] {.async: (raw: true, raises: [CancelledError]).} = self.localStore.listBlocks(blockType) -method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = +method delBlock*( + self: NetworkStore, cid: Cid +): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} = ## Delete a block from the blockstore ## @@ -130,7 +138,9 @@ method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = {.pop.} -method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = +method hasBlock*( + self: NetworkStore, cid: Cid +): Future[?!bool] {.async: (raises: [CancelledError]).} = ## Check if the block exists in the blockstore ## @@ -139,13 +149,13 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = method hasBlock*( self: NetworkStore, tree: Cid, index: Natural -): Future[?!bool] {.async.} = +): Future[?!bool] {.async: (raises: [CancelledError]).} = ## Check if the block exists in the blockstore ## trace "Checking network store for block existence", tree, index return await self.localStore.hasBlock(tree, index) -method close*(self: NetworkStore): Future[void] {.async.} = +method close*(self: NetworkStore): Future[void] {.async: (raises: []).} = ## Close the underlying local blockstore ## diff --git a/codex/stores/queryiterhelper.nim b/codex/stores/queryiterhelper.nim index 6bf3090bf..bbc3be698 100644 --- a/codex/stores/queryiterhelper.nim +++ b/codex/stores/queryiterhelper.nim @@ -5,12 +5,15 @@ import pkg/chronicles import pkg/datastore/typedds import ../utils/asynciter +import ../utils/safeasynciter + +{.push raises: [].} type KeyVal*[T] = tuple[key: Key, value: T] proc toAsyncIter*[T]( queryIter: QueryIter[T], finishOnErr: bool = true -): Future[?!AsyncIter[?!QueryResponse[T]]] {.async.} = +): Future[?!AsyncIter[?!QueryResponse[T]]] {.async: (raises: [CancelledError]).} = ## Converts `QueryIter[T]` to `AsyncIter[?!QueryResponse[T]]` and automatically ## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only ## if the flag finishOnErr is set to true) @@ -42,9 +45,43 @@ proc toAsyncIter*[T]( AsyncIter[?!QueryResponse[T]].new(genNext, isFinished).success +proc toSafeAsyncIter*[T]( + queryIter: QueryIter[T], finishOnErr: bool = true +): Future[?!SafeAsyncIter[QueryResponse[T]]] {.async: (raises: [CancelledError]).} = + ## Converts `QueryIter[T]` to `SafeAsyncIter[QueryResponse[T]]` and automatically + ## runs dispose whenever `QueryIter` finishes or whenever an error occurs (only + ## if the flag finishOnErr is set to true) + ## + + if queryIter.finished: + trace "Disposing iterator" + if error =? (await queryIter.dispose()).errorOption: + return failure(error) + return success(SafeAsyncIter[QueryResponse[T]].empty()) + + var errOccurred = false + + proc genNext(): Future[?!QueryResponse[T]] {.async: (raises: [CancelledError]).} = + let queryResOrErr = await queryIter.next() + + if queryResOrErr.isErr: + errOccurred = true + + if queryIter.finished or (errOccurred and finishOnErr): + trace "Disposing iterator" + if error =? (await queryIter.dispose()).errorOption: + return failure(error) + + return queryResOrErr + + proc isFinished(): bool = + queryIter.finished + + SafeAsyncIter[QueryResponse[T]].new(genNext, isFinished).success + proc filterSuccess*[T]( iter: AsyncIter[?!QueryResponse[T]] -): Future[AsyncIter[tuple[key: Key, value: T]]] {.async.} = +): Future[AsyncIter[tuple[key: Key, value: T]]] {.async: (raises: [CancelledError]).} = ## Filters out any items that are not success proc mapping(resOrErr: ?!QueryResponse[T]): Future[?KeyVal[T]] {.async.} = @@ -63,3 +100,29 @@ proc filterSuccess*[T]( (key: key, value: value).some await mapFilter[?!QueryResponse[T], KeyVal[T]](iter, mapping) + +proc filterSuccess*[T]( + iter: SafeAsyncIter[QueryResponse[T]] +): Future[SafeAsyncIter[tuple[key: Key, value: T]]] {. + async: (raises: [CancelledError]) +.} = + ## Filters out any items that are not success + + proc mapping( + resOrErr: ?!QueryResponse[T] + ): Future[Option[?!KeyVal[T]]] {.async: (raises: [CancelledError]).} = + without res =? resOrErr, error: + error "Error occurred when getting QueryResponse", msg = error.msg + return Result[KeyVal[T], ref CatchableError].none + + without key =? res.key: + warn "No key for a QueryResponse" + return Result[KeyVal[T], ref CatchableError].none + + without value =? res.value, error: + error "Error occurred when getting a value from QueryResponse", msg = error.msg + return Result[KeyVal[T], ref CatchableError].none + + some(success((key: key, value: value))) + + await mapFilter[QueryResponse[T], KeyVal[T]](iter, mapping) diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index cc488240f..ddbfdfb02 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -34,7 +34,7 @@ declareGauge(codex_repostore_bytes_reserved, "codex repostore bytes reserved") proc putLeafMetadata*( self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof -): Future[?!StoreResultKind] {.async.} = +): Future[?!StoreResultKind] {.async: (raises: [CancelledError]).} = without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: return failure(err) @@ -59,7 +59,7 @@ proc putLeafMetadata*( proc delLeafMetadata*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: return failure(err) @@ -70,7 +70,7 @@ proc delLeafMetadata*( proc getLeafMetadata*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!LeafMetadata] {.async.} = +): Future[?!LeafMetadata] {.async: (raises: [CancelledError]).} = without key =? createBlockCidAndProofMetadataKey(treeCid, index), err: return failure(err) @@ -84,7 +84,7 @@ proc getLeafMetadata*( proc updateTotalBlocksCount*( self: RepoStore, plusCount: Natural = 0, minusCount: Natural = 0 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = await self.metaDs.modify( CodexTotalBlocksKey, proc(maybeCurrCount: ?Natural): Future[?Natural] {.async.} = @@ -139,7 +139,7 @@ proc updateBlockMetadata*( plusRefCount: Natural = 0, minusRefCount: Natural = 0, minExpiry: SecondsSince1970 = 0, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = if cid.isEmpty: return success() @@ -163,7 +163,7 @@ proc updateBlockMetadata*( proc storeBlock*( self: RepoStore, blk: Block, minExpiry: SecondsSince1970 -): Future[?!StoreResult] {.async.} = +): Future[?!StoreResult] {.async: (raises: [CancelledError]).} = if blk.isEmpty: return success(StoreResult(kind: AlreadyInStore)) @@ -189,7 +189,7 @@ proc storeBlock*( ) res = StoreResult(kind: AlreadyInStore) - # making sure that the block acutally is stored in the repoDs + # making sure that the block actually is stored in the repoDs without hasBlock =? await self.repoDs.has(blkKey), err: raise err @@ -215,7 +215,7 @@ proc storeBlock*( proc tryDeleteBlock*( self: RepoStore, cid: Cid, expiryLimit = SecondsSince1970.low -): Future[?!DeleteResult] {.async.} = +): Future[?!DeleteResult] {.async: (raises: [CancelledError]).} = without metaKey =? createBlockExpirationMetadataKey(cid), err: return failure(err) diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index 130ab15ea..bea2971c7 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +{.push raises: [].} + import pkg/chronos import pkg/chronos/futures import pkg/datastore @@ -36,7 +38,9 @@ logScope: # BlockStore API ########################################################### -method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = +method getBlock*( + self: RepoStore, cid: Cid +): Future[?!Block] {.async: (raises: [CancelledError]).} = ## Get a block from the blockstore ## @@ -63,7 +67,7 @@ method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = method getBlockAndProof*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!(Block, CodexProof)] {.async.} = +): Future[?!(Block, CodexProof)] {.async: (raises: [CancelledError]).} = without leafMd =? await self.getLeafMetadata(treeCid, index), err: return failure(err) @@ -74,13 +78,15 @@ method getBlockAndProof*( method getBlock*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!Block] {.async.} = +): Future[?!Block] {.async: (raises: [CancelledError]).} = without leafMd =? await self.getLeafMetadata(treeCid, index), err: return failure(err) await self.getBlock(leafMd.blkCid) -method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] = +method getBlock*( + self: RepoStore, address: BlockAddress +): Future[?!Block] {.async: (raw: true, raises: [CancelledError]).} = ## Get a block from the blockstore ## @@ -91,7 +97,7 @@ method getBlock*(self: RepoStore, address: BlockAddress): Future[?!Block] = method ensureExpiry*( self: RepoStore, cid: Cid, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Ensure that block's associated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -104,7 +110,7 @@ method ensureExpiry*( method ensureExpiry*( self: RepoStore, treeCid: Cid, index: Natural, expiry: SecondsSince1970 -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Ensure that block's associated expiry is at least given timestamp ## If the current expiry is lower then it is updated to the given one, otherwise it is left intact ## @@ -116,7 +122,7 @@ method ensureExpiry*( method putCidAndProof*( self: RepoStore, treeCid: Cid, index: Natural, blkCid: Cid, proof: CodexProof -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Put a block to the blockstore ## @@ -142,13 +148,15 @@ method putCidAndProof*( method getCidAndProof*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!(Cid, CodexProof)] {.async.} = +): Future[?!(Cid, CodexProof)] {.async: (raises: [CancelledError]).} = without leafMd =? await self.getLeafMetadata(treeCid, index), err: return failure(err) success((leafMd.blkCid, leafMd.proof)) -method getCid*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Cid] {.async.} = +method getCid*( + self: RepoStore, treeCid: Cid, index: Natural +): Future[?!Cid] {.async: (raises: [CancelledError]).} = without leafMd =? await self.getLeafMetadata(treeCid, index), err: return failure(err) @@ -156,7 +164,7 @@ method getCid*(self: RepoStore, treeCid: Cid, index: Natural): Future[?!Cid] {.a method putBlock*( self: RepoStore, blk: Block, ttl = Duration.none -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Put a block to the blockstore ## @@ -186,7 +194,9 @@ method putBlock*( return success() -proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.async.} = +proc delBlockInternal( + self: RepoStore, cid: Cid +): Future[?!DeleteResultKind] {.async: (raises: [CancelledError]).} = logScope: cid = cid @@ -208,7 +218,9 @@ proc delBlockInternal(self: RepoStore, cid: Cid): Future[?!DeleteResultKind] {.a success(res.kind) -method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = +method delBlock*( + self: RepoStore, cid: Cid +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Delete a block from the blockstore when block refCount is 0 or block is expired ## @@ -230,7 +242,7 @@ method delBlock*(self: RepoStore, cid: Cid): Future[?!void] {.async.} = method delBlock*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without leafMd =? await self.getLeafMetadata(treeCid, index), err: if err of BlockNotFoundError: return success() @@ -251,7 +263,9 @@ method delBlock*( success() -method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = +method hasBlock*( + self: RepoStore, cid: Cid +): Future[?!bool] {.async: (raises: [CancelledError]).} = ## Check if the block exists in the blockstore ## @@ -270,7 +284,7 @@ method hasBlock*(self: RepoStore, cid: Cid): Future[?!bool] {.async.} = method hasBlock*( self: RepoStore, treeCid: Cid, index: Natural -): Future[?!bool] {.async.} = +): Future[?!bool] {.async: (raises: [CancelledError]).} = without leafMd =? await self.getLeafMetadata(treeCid, index), err: if err of BlockNotFoundError: return success(false) @@ -281,12 +295,12 @@ method hasBlock*( method listBlocks*( self: RepoStore, blockType = BlockType.Manifest -): Future[?!AsyncIter[?Cid]] {.async.} = +): Future[?!SafeAsyncIter[Cid]] {.async: (raises: [CancelledError]).} = ## Get the list of blocks in the RepoStore. ## This is an intensive operation ## - var iter = AsyncIter[?Cid]() + var iter = SafeAsyncIter[Cid]() let key = case blockType @@ -299,7 +313,7 @@ method listBlocks*( trace "Error querying cids in repo", blockType, err = err.msg return failure(err) - proc next(): Future[?Cid] {.async.} = + proc next(): Future[?!Cid] {.async: (raises: [CancelledError]).} = await idleAsync() if queryIter.finished: iter.finish @@ -307,9 +321,9 @@ method listBlocks*( if pair =? (await queryIter.next()) and cid =? pair.key: doAssert pair.data.len == 0 trace "Retrieved record from repo", cid - return Cid.init(cid.value).option + return Cid.init(cid.value).mapFailure else: - return Cid.none + return Cid.failure("No or invalid Cid") iter.next = next return success iter @@ -332,7 +346,9 @@ proc blockRefCount*(self: RepoStore, cid: Cid): Future[?!Natural] {.async.} = method getBlockExpirations*( self: RepoStore, maxNumber: int, offset: int -): Future[?!AsyncIter[BlockExpiration]] {.async, base.} = +): Future[?!SafeAsyncIter[BlockExpiration]] {. + async: (raises: [CancelledError]), base, gcsafe +.} = ## Get iterator with block expirations ## @@ -344,26 +360,30 @@ method getBlockExpirations*( error "Unable to execute block expirations query", err = err.msg return failure(err) - without asyncQueryIter =? await queryIter.toAsyncIter(), err: + without asyncQueryIter =? (await queryIter.toSafeAsyncIter()), err: error "Unable to convert QueryIter to AsyncIter", err = err.msg return failure(err) - let filteredIter: AsyncIter[KeyVal[BlockMetadata]] = + let filteredIter: SafeAsyncIter[KeyVal[BlockMetadata]] = await asyncQueryIter.filterSuccess() - proc mapping(kv: KeyVal[BlockMetadata]): Future[?BlockExpiration] {.async.} = + proc mapping( + kvRes: ?!KeyVal[BlockMetadata] + ): Future[Option[?!BlockExpiration]] {.async: (raises: [CancelledError]).} = + without kv =? kvRes, err: + error "Error occurred when getting KeyVal", err = err.msg + return Result[BlockExpiration, ref CatchableError].none without cid =? Cid.init(kv.key.value).mapFailure, err: error "Failed decoding cid", err = err.msg - return BlockExpiration.none + return Result[BlockExpiration, ref CatchableError].none - BlockExpiration(cid: cid, expiry: kv.value.expiry).some + some(success(BlockExpiration(cid: cid, expiry: kv.value.expiry))) let blockExpIter = await mapFilter[KeyVal[BlockMetadata], BlockExpiration](filteredIter, mapping) - success(blockExpIter) -method close*(self: RepoStore): Future[void] {.async.} = +method close*(self: RepoStore): Future[void] {.async: (raises: []).} = ## Close the blockstore, cleaning up resources managed by it. ## For some implementations this may be a no-op ## @@ -371,10 +391,13 @@ method close*(self: RepoStore): Future[void] {.async.} = trace "Closing repostore" if not self.metaDs.isNil: - (await self.metaDs.close()).expect("Should meta datastore") + try: + (await noCancel self.metaDs.close()).expect("Should meta datastore") + except CatchableError as err: + error "Failed to close meta datastore", err = err.msg if not self.repoDs.isNil: - (await self.repoDs.close()).expect("Should repo datastore") + (await noCancel self.repoDs.close()).expect("Should repo datastore") ########################################################### # RepoStore procs @@ -400,7 +423,9 @@ proc release*( await self.updateQuotaUsage(minusReserved = bytes) -proc start*(self: RepoStore): Future[void] {.async.} = +proc start*( + self: RepoStore +): Future[void] {.async: (raises: [CancelledError, CodexError]).} = ## Start repo ## @@ -417,7 +442,7 @@ proc start*(self: RepoStore): Future[void] {.async.} = self.started = true -proc stop*(self: RepoStore): Future[void] {.async.} = +proc stop*(self: RepoStore): Future[void] {.async: (raises: []).} = ## Stop repo ## if not self.started: diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 64a356de5..2e06d39dc 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -125,7 +125,7 @@ method readOnce*( return read -method closeImpl*(self: StoreStream) {.async.} = +method closeImpl*(self: StoreStream) {.async: (raises: []).} = trace "Closing StoreStream" self.offset = self.size # set Eof await procCall LPStream(self).closeImpl() diff --git a/codex/utils.nim b/codex/utils.nim index 30d84e74b..9cea427e0 100644 --- a/codex/utils.nim +++ b/codex/utils.nim @@ -8,6 +8,8 @@ ## those terms. ## +{.push raises: [].} + import std/enumerate import std/parseutils import std/options @@ -17,8 +19,9 @@ import pkg/chronos import ./utils/asyncheapqueue import ./utils/fileutils import ./utils/asynciter +import ./utils/safeasynciter -export asyncheapqueue, fileutils, asynciter, chronos +export asyncheapqueue, fileutils, asynciter, safeasynciter, chronos when defined(posix): import os, posix diff --git a/codex/utils/asynciter.nim b/codex/utils/asynciter.nim index b5371d240..d87ff67f3 100644 --- a/codex/utils/asynciter.nim +++ b/codex/utils/asynciter.nim @@ -123,10 +123,10 @@ proc map*[T, U](iter: AsyncIter[T], fn: Function[T, Future[U]]): AsyncIter[U] = proc mapFilter*[T, U]( iter: AsyncIter[T], mapPredicate: Function[T, Future[Option[U]]] -): Future[AsyncIter[U]] {.async.} = +): Future[AsyncIter[U]] {.async: (raises: [CancelledError]).} = var nextFutU: Option[Future[U]] - proc tryFetch(): Future[void] {.async.} = + proc tryFetch(): Future[void] {.async: (raises: [CancelledError]).} = nextFutU = Future[U].none while not iter.finished: let futT = iter.next() @@ -157,7 +157,7 @@ proc mapFilter*[T, U]( proc filter*[T]( iter: AsyncIter[T], predicate: Function[T, Future[bool]] -): Future[AsyncIter[T]] {.async.} = +): Future[AsyncIter[T]] {.async: (raises: [CancelledError]).} = proc wrappedPredicate(t: T): Future[Option[T]] {.async.} = if await predicate(t): some(t) diff --git a/codex/utils/safeasynciter.nim b/codex/utils/safeasynciter.nim new file mode 100644 index 000000000..56e66f510 --- /dev/null +++ b/codex/utils/safeasynciter.nim @@ -0,0 +1,233 @@ +## Nim-Codex +## Copyright (c) 2025 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +{.push raises: [].} + +import std/sugar + +import pkg/questionable +import pkg/questionable/results +import pkg/chronos + +import ./iter + +## SafeAsyncIter[T] is similar to `AsyncIter[Future[T]]` +## but does not throw exceptions others than CancelledError. +## It is thus way easier to use with checked exceptions +## +## +## Public interface: +## +## Attributes +## - next - allows to set a custom function to be called when the next item is requested +## +## Operations: +## - new - to create a new async iterator (SafeAsyncIter) +## - finish - to finish the async iterator +## - finished - to check if the async iterator is finished +## - next - to get the next item from the async iterator +## - items - to iterate over the async iterator +## - pairs - to iterate over the async iterator and return the index of each item +## - mapAsync - to convert a regular sync iterator (Iter) to an async iter (SafeAsyncIter) +## - map - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) +## - mapFilter - to convert one async iterator (SafeAsyncIter) to another async iter (SafeAsyncIter) and apply filtering at the same time +## - filter - to filter an async iterator (SafeAsyncIter) returning another async iterator (SafeAsyncIter) +## - delayBy - to delay each item returned by async iter by a given duration +## - empty - to create an empty async iterator (SafeAsyncIter) + +type + SafeFunction[T, U] = + proc(fut: T): Future[U] {.async: (raises: [CancelledError]), gcsafe, closure.} + SafeIsFinished = proc(): bool {.raises: [], gcsafe, closure.} + SafeGenNext[T] = proc(): Future[T] {.async: (raises: [CancelledError]), gcsafe.} + + SafeAsyncIter*[T] = ref object + finished: bool + next*: SafeGenNext[?!T] + +proc flatMap[T, U]( + fut: auto, fn: SafeFunction[?!T, ?!U] +): Future[?!U] {.async: (raises: [CancelledError]).} = + let t = await fut + await fn(t) + +proc flatMap[T, U]( + fut: auto, fn: SafeFunction[?!T, Option[?!U]] +): Future[Option[?!U]] {.async: (raises: [CancelledError]).} = + let t = await fut + await fn(t) + +######################################################################## +## SafeAsyncIter public interface methods +######################################################################## + +proc new*[T]( + _: type SafeAsyncIter[T], + genNext: SafeGenNext[?!T], + isFinished: IsFinished, + finishOnErr: bool = true, +): SafeAsyncIter[T] = + ## Creates a new Iter using elements returned by supplier function `genNext`. + ## Iter is finished whenever `isFinished` returns true. + ## + + var iter = SafeAsyncIter[T]() + + proc next(): Future[?!T] {.async: (raises: [CancelledError]).} = + try: + if not iter.finished: + let item = await genNext() + if finishOnErr and err =? item.errorOption: + iter.finished = true + return failure(err) + if isFinished(): + iter.finished = true + return item + else: + return failure("SafeAsyncIter is finished but next item was requested") + except CancelledError as err: + iter.finished = true + raise err + + if isFinished(): + iter.finished = true + + iter.next = next + return iter + +# forward declaration +proc mapAsync*[T, U]( + iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true +): SafeAsyncIter[U] + +proc new*[U, V: Ordinal]( + _: type SafeAsyncIter[U], slice: HSlice[U, V], finishOnErr: bool = true +): SafeAsyncIter[U] = + ## Creates new Iter from a slice + ## + + let iter = Iter[U].new(slice) + mapAsync[U, U]( + iter, + proc(i: U): Future[?!U] {.async: (raises: [CancelledError]).} = + success[U](i), + ) + +proc new*[U, V, S: Ordinal]( + _: type SafeAsyncIter[U], a: U, b: V, step: S = 1, finishOnErr: bool = true +): SafeAsyncIter[U] = + ## Creates new Iter in range a..b with specified step (default 1) + ## + + let iter = Iter[U].new(a, b, step) + mapAsync[U, U]( + iter, + proc(i: U): Future[?!U] {.async: (raises: [CancelledError]).} = + U.success(i), + finishOnErr = finishOnErr, + ) + +proc finish*[T](self: SafeAsyncIter[T]): void = + self.finished = true + +proc finished*[T](self: SafeAsyncIter[T]): bool = + self.finished + +iterator items*[T](self: SafeAsyncIter[T]): auto {.inline.} = + while not self.finished: + yield self.next() + +iterator pairs*[T](self: SafeAsyncIter[T]): auto {.inline.} = + var i = 0 + while not self.finished: + yield (i, self.next()) + inc(i) + +proc mapAsync*[T, U]( + iter: Iter[T], fn: SafeFunction[T, ?!U], finishOnErr: bool = true +): SafeAsyncIter[U] = + SafeAsyncIter[U].new( + genNext = () => fn(iter.next()), + isFinished = () => iter.finished(), + finishOnErr = finishOnErr, + ) + +proc map*[T, U]( + iter: SafeAsyncIter[T], fn: SafeFunction[?!T, ?!U], finishOnErr: bool = true +): SafeAsyncIter[U] = + SafeAsyncIter[U].new( + genNext = () => iter.next().flatMap(fn), + isFinished = () => iter.finished, + finishOnErr = finishOnErr, + ) + +proc mapFilter*[T, U]( + iter: SafeAsyncIter[T], + mapPredicate: SafeFunction[?!T, Option[?!U]], + finishOnErr: bool = true, +): Future[SafeAsyncIter[U]] {.async: (raises: [CancelledError]).} = + var nextU: Option[?!U] + + proc filter(): Future[void] {.async: (raises: [CancelledError]).} = + nextU = none(?!U) + while not iter.finished: + let futT = iter.next() + if mappedValue =? await futT.flatMap(mapPredicate): + nextU = some(mappedValue) + break + + proc genNext(): Future[?!U] {.async: (raises: [CancelledError]).} = + let u = nextU.unsafeGet + await filter() + u + + proc isFinished(): bool = + nextU.isNone + + await filter() + SafeAsyncIter[U].new(genNext, isFinished, finishOnErr = finishOnErr) + +proc filter*[T]( + iter: SafeAsyncIter[T], predicate: SafeFunction[?!T, bool], finishOnErr: bool = true +): Future[SafeAsyncIter[T]] {.async: (raises: [CancelledError]).} = + proc wrappedPredicate( + t: ?!T + ): Future[Option[?!T]] {.async: (raises: [CancelledError]).} = + if await predicate(t): + some(t) + else: + none(?!T) + + await mapFilter[T, T](iter, wrappedPredicate, finishOnErr = finishOnErr) + +proc delayBy*[T]( + iter: SafeAsyncIter[T], d: Duration, finishOnErr: bool = true +): SafeAsyncIter[T] = + ## Delays emitting each item by given duration + ## + + map[T, T]( + iter, + proc(t: ?!T): Future[?!T] {.async: (raises: [CancelledError]).} = + await sleepAsync(d) + return t, + finishOnErr = finishOnErr, + ) + +proc empty*[T](_: type SafeAsyncIter[T]): SafeAsyncIter[T] = + ## Creates an empty SafeAsyncIter + ## + + proc genNext(): Future[?!T] {.async: (raises: [CancelledError]).} = + T.failure("Next item requested from an empty SafeAsyncIter") + + proc isFinished(): bool = + true + + SafeAsyncIter[T].new(genNext, isFinished) diff --git a/codex/utils/timer.nim b/codex/utils/timer.nim index 5a9537cf2..b6aa9e7e8 100644 --- a/codex/utils/timer.nim +++ b/codex/utils/timer.nim @@ -10,17 +10,14 @@ ## Timer ## Used to execute a callback in a loop -import pkg/upraises - -push: - {.upraises: [].} +{.push raises: [].} import pkg/chronos import ../logutils type - TimerCallback* = proc(): Future[void] {.gcsafe, upraises: [].} + TimerCallback* = proc(): Future[void] {.gcsafe, async: (raises: []).} Timer* = ref object of RootObj callback: TimerCallback interval: Duration @@ -38,8 +35,6 @@ proc timerLoop(timer: Timer) {.async: (raises: []).} = await sleepAsync(timer.interval) except CancelledError: discard # do not propagate as timerLoop is asyncSpawned - except CatchableError as exc: - error "Timer caught unhandled exception: ", name = timer.name, msg = exc.msg method start*( timer: Timer, callback: TimerCallback, interval: Duration @@ -51,7 +46,7 @@ method start*( timer.interval = interval timer.loopFuture = timerLoop(timer) -method stop*(timer: Timer) {.async, base.} = +method stop*(timer: Timer) {.base, async: (raises: []).} = if timer.loopFuture != nil and not timer.loopFuture.finished: trace "Timer stopping: ", name = timer.name await timer.loopFuture.cancelAndWait() diff --git a/tests/codex/helpers/mockrepostore.nim b/tests/codex/helpers/mockrepostore.nim index 50b47f026..52e598d96 100644 --- a/tests/codex/helpers/mockrepostore.nim +++ b/tests/codex/helpers/mockrepostore.nim @@ -15,6 +15,7 @@ import pkg/questionable/results import pkg/codex/stores/repostore import pkg/codex/utils/asynciter +import pkg/codex/utils/safeasynciter type MockRepoStore* = ref object of RepoStore delBlockCids*: seq[Cid] @@ -22,19 +23,17 @@ type MockRepoStore* = ref object of RepoStore getBeOffset*: int testBlockExpirations*: seq[BlockExpiration] - getBlockExpirationsThrows*: bool -method delBlock*(self: MockRepoStore, cid: Cid): Future[?!void] {.async.} = +method delBlock*( + self: MockRepoStore, cid: Cid +): Future[?!void] {.async: (raises: [CancelledError]).} = self.delBlockCids.add(cid) self.testBlockExpirations = self.testBlockExpirations.filterIt(it.cid != cid) return success() method getBlockExpirations*( self: MockRepoStore, maxNumber: int, offset: int -): Future[?!AsyncIter[BlockExpiration]] {.async.} = - if self.getBlockExpirationsThrows: - raise new CatchableError - +): Future[?!SafeAsyncIter[BlockExpiration]] {.async: (raises: [CancelledError]).} = self.getBeMaxNumber = maxNumber self.getBeOffset = offset @@ -43,11 +42,13 @@ method getBlockExpirations*( limit = min(offset + maxNumber, len(testBlockExpirationsCpy)) let - iter1 = AsyncIter[int].new(offset ..< limit) + iter1 = SafeAsyncIter[int].new(offset ..< limit) iter2 = map[int, BlockExpiration]( iter1, - proc(i: int): Future[BlockExpiration] {.async.} = - testBlockExpirationsCpy[i], + proc(i: ?!int): Future[?!BlockExpiration] {.async: (raises: [CancelledError]).} = + if i =? i: + return success(testBlockExpirationsCpy[i]) + return failure("Unexpected error!"), ) success(iter2) diff --git a/tests/codex/helpers/mocktimer.nim b/tests/codex/helpers/mocktimer.nim index 60ec9c87a..8d7a59115 100644 --- a/tests/codex/helpers/mocktimer.nim +++ b/tests/codex/helpers/mocktimer.nim @@ -26,7 +26,7 @@ method start*(mockTimer: MockTimer, callback: timer.TimerCallback, interval: Dur mockTimer.mockInterval = interval inc mockTimer.startCalled -method stop*(mockTimer: MockTimer) {.async.} = +method stop*(mockTimer: MockTimer) {.async: (raises: []).} = inc mockTimer.stopCalled method invokeCallback*(mockTimer: MockTimer) {.async, base.} = diff --git a/tests/codex/node/testcontracts.nim b/tests/codex/node/testcontracts.nim index 73dd8dafe..4fe4b94fa 100644 --- a/tests/codex/node/testcontracts.nim +++ b/tests/codex/node/testcontracts.nim @@ -120,7 +120,9 @@ asyncchecksuite "Test Node - Host contracts": (getTime() + DefaultBlockTtl.toTimesDuration + 1.hours).toUnix.uint64 var fetchedBytes: uint = 0 - let onBlocks = proc(blocks: seq[bt.Block]): Future[?!void] {.async.} = + let onBlocks = proc( + blocks: seq[bt.Block] + ): Future[?!void] {.async: (raises: [CancelledError]).} = for blk in blocks: fetchedBytes += blk.data.len.uint return success() diff --git a/tests/codex/node/testnode.nim b/tests/codex/node/testnode.nim index bd5353368..78298ad75 100644 --- a/tests/codex/node/testnode.nim +++ b/tests/codex/node/testnode.nim @@ -73,7 +73,9 @@ asyncchecksuite "Test Node - Basic": await node.fetchBatched( manifest, batchSize = batchSize, - proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} = + proc( + blocks: seq[bt.Block] + ): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} = check blocks.len > 0 and blocks.len <= batchSize return success(), ) @@ -96,7 +98,9 @@ asyncchecksuite "Test Node - Basic": await node.fetchBatched( manifest, batchSize = batchSize, - proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, async.} = + proc( + blocks: seq[bt.Block] + ): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} = return failure("Should not be called"), ) ) diff --git a/tests/codex/sales/states/testfilled.nim b/tests/codex/sales/states/testfilled.nim index f077b780e..caf22ee6b 100644 --- a/tests/codex/sales/states/testfilled.nim +++ b/tests/codex/sales/states/testfilled.nim @@ -37,7 +37,7 @@ suite "sales state 'filled'": onExpiryUpdatePassedExpiry = -1 let onExpiryUpdate = proc( rootCid: Cid, expiry: SecondsSince1970 - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = onExpiryUpdatePassedExpiry = expiry return success() let context = SalesContext(market: market, onExpiryUpdate: some onExpiryUpdate) diff --git a/tests/codex/sales/states/testinitialproving.nim b/tests/codex/sales/states/testinitialproving.nim index cae0a0698..0e0058d5f 100644 --- a/tests/codex/sales/states/testinitialproving.nim +++ b/tests/codex/sales/states/testinitialproving.nim @@ -31,7 +31,7 @@ asyncchecksuite "sales state 'initialproving'": setup: let onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = receivedChallenge = challenge return success(proof) let context = SalesContext(onProve: onProve.some, market: market, clock: clock) @@ -88,7 +88,7 @@ asyncchecksuite "sales state 'initialproving'": test "switches to errored state when onProve callback fails": let onProveFailed: OnProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = return failure("oh no!") let proofFailedContext = diff --git a/tests/codex/sales/states/testproving.nim b/tests/codex/sales/states/testproving.nim index 6b7e7bd40..ce7b8ba56 100644 --- a/tests/codex/sales/states/testproving.nim +++ b/tests/codex/sales/states/testproving.nim @@ -31,7 +31,7 @@ asyncchecksuite "sales state 'proving'": market = MockMarket.new() let onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = receivedChallenge = challenge return success(proof) let context = SalesContext(market: market, clock: clock, onProve: onProve.some) diff --git a/tests/codex/sales/states/testsimulatedproving.nim b/tests/codex/sales/states/testsimulatedproving.nim index c8f4ae1db..304933fe0 100644 --- a/tests/codex/sales/states/testsimulatedproving.nim +++ b/tests/codex/sales/states/testsimulatedproving.nim @@ -44,7 +44,7 @@ asyncchecksuite "sales state 'simulated-proving'": let onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = return success(proof) let context = SalesContext(market: market, clock: clock, onProve: onProve.some) agent = newSalesAgent(context, request.id, slot.slotIndex, request.some) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index f4d9cbaee..300abc7cd 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -64,18 +64,18 @@ asyncchecksuite "Sales - start": reservations = sales.context.reservations sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = return success() sales.onExpiryUpdate = proc( rootCid: Cid, expiry: SecondsSince1970 - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = return success() queue = sales.context.slotQueue sales.onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = return success(proof) itemsProcessed = @[] expiry = (clock.now() + 42) @@ -185,18 +185,18 @@ asyncchecksuite "Sales": reservations = sales.context.reservations sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = return success() sales.onExpiryUpdate = proc( rootCid: Cid, expiry: SecondsSince1970 - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = return success() queue = sales.context.slotQueue sales.onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = return success(proof) await sales.start() itemsProcessed = @[] @@ -375,7 +375,7 @@ asyncchecksuite "Sales": test "availability size is reduced by request slot size when fully downloaded": sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = let blk = bt.Block.new(@[1.byte]).get await onBatch(blk.repeat(request.ask.slotSize.int)) @@ -388,7 +388,7 @@ asyncchecksuite "Sales": var slotIndex = 0.uint64 sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = slotIndex = slot let blk = bt.Block.new(@[1.byte]).get await onBatch(blk.repeat(request.ask.slotSize)) @@ -463,7 +463,7 @@ asyncchecksuite "Sales": var storingRequest: StorageRequest sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = storingRequest = request return success() @@ -476,7 +476,7 @@ asyncchecksuite "Sales": var storingSlot: uint64 sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = storingRequest = request storingSlot = slot return success() @@ -489,7 +489,7 @@ asyncchecksuite "Sales": let error = newException(IOError, "data retrieval failed") sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = return failure(error) createAvailability() await market.requestStorage(request) @@ -500,7 +500,7 @@ asyncchecksuite "Sales": var provingSlot: uint64 sales.onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = provingRequest = slot.request provingSlot = slot.slotIndex return success(Groth16Proof.example) @@ -540,8 +540,8 @@ asyncchecksuite "Sales": # which then calls the onClear callback sales.onProve = proc( slot: Slot, challenge: ProofChallenge - ): Future[?!Groth16Proof] {.async.} = - raise newException(IOError, "proof failed") + ): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} = + return failure("proof failed") var clearedRequest: StorageRequest var clearedSlotIndex: uint64 sales.onClear = proc(request: StorageRequest, slotIndex: uint64) = @@ -558,7 +558,7 @@ asyncchecksuite "Sales": let otherHost = Address.example sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = await sleepAsync(chronos.hours(1)) return success() createAvailability() @@ -574,7 +574,7 @@ asyncchecksuite "Sales": let origSize = availability.freeSize sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = await sleepAsync(chronos.hours(1)) return success() createAvailability() @@ -601,7 +601,7 @@ asyncchecksuite "Sales": let origSize = availability.freeSize sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false - ): Future[?!void] {.async.} = + ): Future[?!void] {.async: (raises: [CancelledError]).} = await sleepAsync(chronos.hours(1)) return success() createAvailability() diff --git a/tests/codex/stores/commonstoretests.nim b/tests/codex/stores/commonstoretests.nim index 5e722a8aa..e4287dd23 100644 --- a/tests/codex/stores/commonstoretests.nim +++ b/tests/codex/stores/commonstoretests.nim @@ -58,7 +58,7 @@ proc commonBlockStoreTests*( test "putBlock raises onBlockStored": var storedCid = Cid.example - proc onStored(cid: Cid) {.async.} = + proc onStored(cid: Cid) {.async: (raises: []).} = storedCid = cid store.onBlockStored = onStored.some() @@ -106,10 +106,10 @@ proc commonBlockStoreTests*( check not handle.failed check handle.read.isOk - let cids = (await store.listBlocks(blockType = BlockType.Block)).tryGet() + let cidsIter = (await store.listBlocks(blockType = BlockType.Block)).tryGet() var count = 0 - for c in cids: + for c in cidsIter: if cid =? await c: check (await store.hasBlock(cid)).tryGet() count.inc @@ -130,11 +130,11 @@ proc commonBlockStoreTests*( check not handle.failed check handle.read.isOk - let cids = (await store.listBlocks(blockType = BlockType.Manifest)).tryGet() + let cidsIter = (await store.listBlocks(blockType = BlockType.Manifest)).tryGet() var count = 0 - for c in cids: - if cid =? (await c): + for c in cidsIter: + if cid =? await c: check manifestBlock.cid == cid check (await store.hasBlock(cid)).tryGet() count.inc @@ -155,11 +155,11 @@ proc commonBlockStoreTests*( check not handle.failed check handle.read.isOk - let cids = (await store.listBlocks(blockType = BlockType.Both)).tryGet() + let cidsIter = (await store.listBlocks(blockType = BlockType.Both)).tryGet() var count = 0 - for c in cids: - if cid =? (await c): + for c in cidsIter: + if cid =? await c: check (await store.hasBlock(cid)).tryGet() count.inc diff --git a/tests/codex/stores/testmaintenance.nim b/tests/codex/stores/testmaintenance.nim index 89e757001..f56adcc3d 100644 --- a/tests/codex/stores/testmaintenance.nim +++ b/tests/codex/stores/testmaintenance.nim @@ -73,11 +73,6 @@ suite "BlockMaintainer": mockRepoStore.getBeMaxNumber == 2 mockRepoStore.getBeOffset == 0 - test "Timer callback should handle Catachable errors": - mockRepoStore.getBlockExpirationsThrows = true - blockMaintainer.start() - await mockTimer.invokeCallback() - test "Subsequent timer callback should call getBlockExpirations on RepoStore with offset": blockMaintainer.start() await mockTimer.invokeCallback() diff --git a/tests/codex/stores/testrepostore.nim b/tests/codex/stores/testrepostore.nim index 5274d0461..1666e44a5 100644 --- a/tests/codex/stores/testrepostore.nim +++ b/tests/codex/stores/testrepostore.nim @@ -15,7 +15,7 @@ import pkg/codex/stores import pkg/codex/stores/repostore/operations import pkg/codex/blocktype as bt import pkg/codex/clock -import pkg/codex/utils/asynciter +import pkg/codex/utils/safeasynciter import pkg/codex/merkletree/codex import ../../asynctest @@ -181,9 +181,8 @@ asyncchecksuite "RepoStore": var res = newSeq[BlockExpiration]() for fut in iter: - let be = await fut - res.add(be) - + if be =? (await fut): + res.add(be) res test "Should store block expiration timestamp": @@ -294,14 +293,14 @@ asyncchecksuite "RepoStore": test "Should retrieve block expiration information": proc unpack( - beIter: Future[?!AsyncIter[BlockExpiration]] - ): Future[seq[BlockExpiration]] {.async.} = + beIter: auto + ): Future[seq[BlockExpiration]] {.async: (raises: [CancelledError]).} = var expirations = newSeq[BlockExpiration](0) without iter =? (await beIter), err: return expirations for beFut in toSeq(iter): - let value = await beFut - expirations.add(value) + if value =? (await beFut): + expirations.add(value) return expirations let diff --git a/tests/codex/testutils.nim b/tests/codex/testutils.nim index 8e13f3991..a5346d481 100644 --- a/tests/codex/testutils.nim +++ b/tests/codex/testutils.nim @@ -2,6 +2,7 @@ import ./utils/testoptions import ./utils/testkeyutils import ./utils/testasyncstatemachine import ./utils/testasynciter +import ./utils/testsafeasynciter import ./utils/testtimer import ./utils/testtrackedfutures diff --git a/tests/codex/utils/testsafeasynciter.nim b/tests/codex/utils/testsafeasynciter.nim new file mode 100644 index 000000000..8ffbd9aab --- /dev/null +++ b/tests/codex/utils/testsafeasynciter.nim @@ -0,0 +1,389 @@ +import std/sugar +import pkg/questionable +import pkg/chronos +import pkg/codex/utils/iter +import pkg/codex/utils/safeasynciter + +import ../../asynctest +import ../helpers + +asyncchecksuite "Test SafeAsyncIter": + test "Should be finished": + let iter = SafeAsyncIter[int].empty() + + check: + iter.finished == true + + test "using with async generator": + let value = 1 + var intIter = Iter.new(0 ..< 5) + let expectedSeq = newSeqWith(5, intIter.next()) + intIter = Iter.new(0 ..< 5) + proc asyncGen(): Future[?!int] {.async: (raw: true, raises: [CancelledError]).} = + let fut = newFuture[?!int]() + fut.complete(success(intIter.next())) + return fut + + let iter = SafeAsyncIter[int].new(asyncGen, () => intIter.finished) + + var collected: seq[int] + for iFut in iter: + let iRes = await iFut + if i =? iRes: + collected.add(i) + else: + fail() + + check collected == expectedSeq + let nextRes = await iter.next() + assert nextRes.isFailure + check nextRes.error.msg == "SafeAsyncIter is finished but next item was requested" + + test "getting async iter for simple sync range iterator": + let iter1 = SafeAsyncIter[int].new(0 ..< 5) + + var collected: seq[int] + for iFut in iter1: + let iRes = await iFut + if i =? iRes: + collected.add(i) + else: + fail() + check: + collected == @[0, 1, 2, 3, 4] + + test "Should map each item using `map`": + let iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + + let iter2 = map[int, string]( + iter1, + proc(iRes: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? iRes: + return success($i) + else: + return failure("Some error"), + ) + + var collected: seq[string] + + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @["0", "1", "2", "3", "4"] + + test "Should leave only odd items using `filter`": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await filter[int]( + iter1, + proc(i: ?!int): Future[bool] {.async: (raises: [CancelledError]).} = + if i =? i: + return (i mod 2) == 1 + else: + return false, + ) + + var collected: seq[int] + + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @[1, 3] + + test "Should leave only odd items using `mapFilter`": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if (i mod 2) == 1: + return some(success($i)) + Result[system.string, ref CatchableError].none, + ) + + var collected: seq[string] + + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @["1", "3"] + + test "Collecting errors on `map` when finish on error is true": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i < 3: + return success($i) + else: + return failure("Error on item: " & $i) + return failure("Unexpected error"), + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0", "1", "2"] + collectedFailure == @["Error on item: 3"] + iter2.finished + + test "Collecting errors on `map` when finish on error is false": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i < 3: + return success($i) + else: + return failure("Error on item: " & $i) + return failure("Unexpected error"), + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0", "1", "2"] + collectedFailure == @["Error on item: 3", "Error on item: 4"] + iter2.finished + + test "Collecting errors on `map` when errors are mixed with successes": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1 or i == 3: + return success($i) + else: + return failure("Error on item: " & $i) + return failure("Unexpected error"), + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["1", "3"] + collectedFailure == @["Error on item: 0", "Error on item: 2", "Error on item: 4"] + iter2.finished + + test "Collecting errors on `mapFilter` when finish on error is true": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1: + return some(string.failure("Error on item: " & $i)) + elif i < 3: + return some(success($i)) + else: + return Result[system.string, ref CatchableError].none + return some(string.failure("Unexpected error")), + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0"] + collectedFailure == @["Error on item: 1"] + iter2.finished + + test "Collecting errors on `mapFilter` when finish on error is false": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1: + return some(string.failure("Error on item: " & $i)) + elif i < 3: + return some(success($i)) + else: + return Result[system.string, ref CatchableError].none + return some(string.failure("Unexpected error")), + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter2: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["0", "2"] + collectedFailure == @["Error on item: 1"] + iter2.finished + + test "Collecting errors on `filter` when finish on error is false": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 1 or i == 2: + return failure("Error on item: " & $i) + elif i < 4: + return success($i) + return failure("Unexpected error"), + finishOnErr = false, + ) + iter3 = await filter[string]( + iter2, + proc(i: ?!string): Future[bool] {.async: (raises: [CancelledError]).} = + without i =? i, err: + if err.msg == "Error on item: 1": + return false + else: + return true + if i == "0": + return false + else: + return true, + finishOnErr = false, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter3: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["3"] + collectedFailure == @["Error on item: 2", "Unexpected error"] + iter3.finished + + test "Collecting errors on `filter` when finish on error is true": + let + iter1 = SafeAsyncIter[int].new(0 ..< 5) + iter2 = map[int, string]( + iter1, + proc(i: ?!int): Future[?!string] {.async: (raises: [CancelledError]).} = + if i =? i: + if i == 3: + return failure("Error on item: " & $i) + elif i < 3: + return success($i) + return failure("Unexpected error"), + finishOnErr = false, + ) + iter3 = await filter[string]( + iter2, + proc(i: ?!string): Future[bool] {.async: (raises: [CancelledError]).} = + without i =? i, err: + if err.msg == "Unexpected error": + return false + else: + return true + if i == "0": + return false + else: + return true, + ) + + var collectedSuccess: seq[string] + var collectedFailure: seq[string] + + for fut in iter3: + without i =? (await fut), err: + collectedFailure.add(err.msg) + continue + collectedSuccess.add(i) + + check: + collectedSuccess == @["1", "2"] + # On error iterator finishes and returns the error of the item + # that caused the error = that's why we see it here + collectedFailure == @["Error on item: 3"] + iter3.finished + + test "Should propagate cancellation error immediately": + let fut: Future[Option[?!string]].Raising([CancelledError]) = + Future[Option[?!string]].Raising([CancelledError]).init("testsafeasynciter") + + let iter1 = SafeAsyncIter[int].new(0 ..< 5).delayBy(10.millis) + let iter2 = await mapFilter[int, string]( + iter1, + proc(i: ?!int): Future[Option[?!string]] {.async: (raises: [CancelledError]).} = + if i =? i: + if (i < 3): + return some(success($i)) + return await fut, + ) + + proc cancelFut(): Future[void] {.async.} = + await sleepAsync(100.millis) + await fut.cancelAndWait() + + asyncSpawn(cancelFut()) + + var collected: seq[string] + + expect CancelledError: + for fut in iter2: + if i =? (await fut): + collected.add(i) + else: + fail() + + check: + collected == @["0", "1"] + iter2.finished diff --git a/tests/codex/utils/testtimer.nim b/tests/codex/utils/testtimer.nim index 2f356df9b..907404c2c 100644 --- a/tests/codex/utils/testtimer.nim +++ b/tests/codex/utils/testtimer.nim @@ -21,17 +21,14 @@ asyncchecksuite "Timer": var numbersState = 0 var lettersState = 'a' - proc numbersCallback(): Future[void] {.async.} = + proc numbersCallback(): Future[void] {.async: (raises: []).} = output &= $numbersState inc numbersState - proc lettersCallback(): Future[void] {.async.} = + proc lettersCallback(): Future[void] {.async: (raises: []).} = output &= $lettersState inc lettersState - proc exceptionCallback(): Future[void] {.async.} = - raise newException(CatchableError, "Test Exception") - proc startNumbersTimer() = timer1.start(numbersCallback, 10.milliseconds) @@ -73,11 +70,6 @@ asyncchecksuite "Timer": await sleepAsync(30.milliseconds) check output == stoppedOutput - test "Exceptions raised in timer callback are handled": - timer1.start(exceptionCallback, 10.milliseconds) - await sleepAsync(30.milliseconds) - await timer1.stop() - test "Starting both timers should execute callbacks sequentially": startNumbersTimer() startLettersTimer()