Skip to content

checked exceptions in stores #1179

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
25 changes: 13 additions & 12 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.

{.push raises: [].}

import pkg/chronos
import pkg/libp2p/cid
import pkg/libp2p/multicodec
Expand Down Expand Up @@ -81,16 +83,12 @@
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
try:
while b.advertiserRunning:
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
except CatchableError as e:
error "Error in advertise local store loop", error = e.msgDetail
raiseAssert("Unexpected exception in advertiseLocalStoreLoop")
if cidsIter =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cidsIter:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
Expand Down Expand Up @@ -126,8 +124,11 @@

trace "Advertiser start"

proc onBlock(cid: Cid) {.async.} =
await b.advertiseBlock(cid)
proc onBlock(cid: Cid) {.async: (raises: []).} =
try:
await b.advertiseBlock(cid)
except CancelledError:

Check warning on line 130 in codex/blockexchange/engine/advertiser.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/advertiser.nim#L130

Added line #L130 was not covered by tests
trace "Cancelled advertise block", cid

doAssert(b.localStore.onBlockStored.isNone())
b.localStore.onBlockStored = onBlock.some
Expand Down
4 changes: 2 additions & 2 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@
trace "Block download cancelled"
if not handle.finished:
await handle.cancelAndWait()
except CatchableError as exc:
warn "Error downloadloading block", exc = exc.msg
except RetriesExhaustedError as exc:
warn "Retries exhausted for block", address, exc = exc.msg

Check warning on line 206 in codex/blockexchange/engine/engine.nim

View check run for this annotation

Codecov / codecov/patch

codex/blockexchange/engine/engine.nim#L206

Added line #L206 was not covered by tests
if not handle.finished:
handle.fail(exc)
finally:
Expand Down
4 changes: 3 additions & 1 deletion codex/errors.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
## This file may not be copied, modified, or distributed except according to
## those terms.

{.push raises: [].}

import std/options
import std/sugar
import std/sequtils
Expand Down Expand Up @@ -63,7 +65,7 @@ proc allFinishedFailed*[T](
return res

proc allFinishedValues*[T](
futs: seq[Future[T]]
futs: auto
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
## If all futures have finished, return corresponding values,
## otherwise return failure
Expand Down
63 changes: 37 additions & 26 deletions codex/node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@
CodexNodeRef* = ref CodexNode

OnManifest* = proc(cid: Cid, manifest: Manifest): void {.gcsafe, raises: [].}
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
BatchProc* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError])
.}

func switch*(self: CodexNodeRef): Switch =
return self.switch
Expand Down Expand Up @@ -109,7 +111,9 @@

success blk

proc fetchManifest*(self: CodexNodeRef, cid: Cid): Future[?!Manifest] {.async.} =
proc fetchManifest*(
self: CodexNodeRef, cid: Cid
): Future[?!Manifest] {.async: (raises: [CancelledError]).} =
## Fetch and decode a manifest block
##

Expand Down Expand Up @@ -144,7 +148,7 @@

proc updateExpiry*(
self: CodexNodeRef, manifestCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without manifest =? await self.fetchManifest(manifestCid), error:
trace "Unable to fetch manifest for cid", manifestCid
return failure(error)
Expand All @@ -154,7 +158,7 @@
self.networkStore.localStore.ensureExpiry(manifest.treeCid, it, expiry)
)

let res = await allFinishedFailed(ensuringFutures)
let res = await allFinishedFailed(cast[seq[Future[?!void]]](ensuringFutures))
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
Expand All @@ -172,7 +176,7 @@
batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil,
fetchLocal = true,
): Future[?!void] {.async, gcsafe.} =
): Future[?!void] {.async: (raises: [CancelledError]), gcsafe.} =
## Fetch blocks in batches of `batchSize`
##

Expand All @@ -190,7 +194,10 @@
if not (await address in self.networkStore) or fetchLocal:
self.networkStore.getBlock(address)

without blockResults =? await allFinishedValues(blockFutures), err:
if blockFutures.len == 0:
continue

without blockResults =? await allFinishedValues[?!bt.Block](blockFutures), err:
trace "Some blocks failed to fetch", err = err.msg
return failure(err)

Expand All @@ -215,7 +222,7 @@
batchSize = DefaultFetchBatch,
onBatch: BatchProc = nil,
fetchLocal = true,
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
## Fetch manifest in batches of `batchSize`
##

Expand All @@ -240,16 +247,16 @@
error "Unable to fetch blocks", err = err.msg
except CancelledError as exc:
trace "Cancelled fetching blocks", exc = exc.msg
except CatchableError as exc:
error "Error fetching blocks", exc = exc.msg

proc fetchDatasetAsyncTask*(self: CodexNodeRef, manifest: Manifest) =
## Start fetching a dataset in the background.
## The task will be tracked and cleaned up on node shutdown.
##
self.trackedFutures.track(self.fetchDatasetAsync(manifest, fetchLocal = false))

proc streamSingleBlock(self: CodexNodeRef, cid: Cid): Future[?!LPStream] {.async.} =
proc streamSingleBlock(
self: CodexNodeRef, cid: Cid
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of a single block.
##
trace "Streaming single block", cid = cid
Expand All @@ -264,15 +271,17 @@
defer:
await stream.pushEof()
await stream.pushData(blk.data)
except CatchableError as exc:
except CancelledError as exc:

Check warning on line 274 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L274

Added line #L274 was not covered by tests
trace "Streaming block cancelled", cid, exc = exc.msg
except LPStreamError as exc:
trace "Unable to send block", cid, exc = exc.msg

self.trackedFutures.track(streamOneBlock())
LPStream(stream).success

proc streamEntireDataset(
self: CodexNodeRef, manifest: Manifest, manifestCid: Cid
): Future[?!LPStream] {.async.} =
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Streams the contents of the entire dataset described by the manifest.
##
trace "Retrieving blocks from manifest", manifestCid
Expand All @@ -294,14 +303,14 @@

jobs.add(erasureJob())

jobs.add(self.fetchDatasetAsync(manifest))
jobs.add(self.fetchDatasetAsync(manifest, fetchLocal = false))

# Monitor stream completion and cancel background jobs when done
proc monitorStream() {.async: (raises: []).} =
try:
await stream.join()
except CatchableError as exc:
warn "Stream failed", exc = exc.msg
except CancelledError as exc:
warn "Stream cancelled", exc = exc.msg
finally:
await noCancel allFutures(jobs.mapIt(it.cancelAndWait))

Expand All @@ -314,7 +323,7 @@

proc retrieve*(
self: CodexNodeRef, cid: Cid, local: bool = true
): Future[?!LPStream] {.async.} =
): Future[?!LPStream] {.async: (raises: [CancelledError]).} =
## Retrieve by Cid a single block or an entire dataset described by manifest
##

Expand Down Expand Up @@ -470,11 +479,11 @@
return manifestBlk.cid.success

proc iterateManifests*(self: CodexNodeRef, onManifest: OnManifest) {.async.} =
without cids =? await self.networkStore.listBlocks(BlockType.Manifest):
without cidsIter =? await self.networkStore.listBlocks(BlockType.Manifest):
warn "Failed to listBlocks"
return

for c in cids:
for c in cidsIter:
if cid =? await c:
without blk =? await self.networkStore.getBlock(cid):
warn "Failed to get manifest block by cid", cid
Expand Down Expand Up @@ -617,7 +626,7 @@
slotIdx: uint64,
blocksCb: BlocksCb,
isRepairing: bool = false,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
## store data in local storage
##

Expand Down Expand Up @@ -648,13 +657,15 @@
trace "Slot index not in manifest", slotIdx
return failure(newException(CodexError, "Slot index not in manifest"))

proc updateExpiry(blocks: seq[bt.Block]): Future[?!void] {.async.} =
proc updateExpiry(
blocks: seq[bt.Block]
): Future[?!void] {.async: (raises: [CancelledError]).} =
trace "Updating expiry for blocks", blocks = blocks.len

let ensureExpiryFutures =
blocks.mapIt(self.networkStore.ensureExpiry(it.cid, expiry.toSecondsSince1970))

let res = await allFinishedFailed(ensureExpiryFutures)
let res = await allFinishedFailed(cast[seq[Future[?!void]]](ensureExpiryFutures))
if res.failure.len > 0:
trace "Some blocks failed to update expiry", len = res.failure.len
return failure("Some blocks failed to update expiry (" & $res.failure.len & " )")
Expand Down Expand Up @@ -702,7 +713,7 @@

proc onProve(
self: CodexNodeRef, slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] {.async.} =
): Future[?!Groth16Proof] {.async: (raises: [CancelledError]).} =

Check warning on line 716 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L716

Added line #L716 was not covered by tests
## Generats a proof for a given slot and challenge
##

Expand Down Expand Up @@ -758,7 +769,7 @@

proc onExpiryUpdate(
self: CodexNodeRef, rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await self.updateExpiry(rootCid, expiry)

proc onClear(self: CodexNodeRef, request: StorageRequest, slotIndex: uint64) =
Expand All @@ -781,12 +792,12 @@
slot: uint64,
onBatch: BatchProc,
isRepairing: bool = false,
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.onStore(request, slot, onBatch, isRepairing)

hostContracts.sales.onExpiryUpdate = proc(
rootCid: Cid, expiry: SecondsSince1970
): Future[?!void] =
): Future[?!void] {.async: (raw: true, raises: [CancelledError]).} =
self.onExpiryUpdate(rootCid, expiry)

hostContracts.sales.onClear = proc(request: StorageRequest, slotIndex: uint64) =
Expand All @@ -795,7 +806,7 @@

hostContracts.sales.onProve = proc(
slot: Slot, challenge: ProofChallenge
): Future[?!Groth16Proof] =
): Future[?!Groth16Proof] {.async: (raw: true, raises: [CancelledError]).} =

Check warning on line 809 in codex/node.nim

View check run for this annotation

Codecov / codecov/patch

codex/node.nim#L809

Added line #L809 was not covered by tests
# TODO: generate proof
self.onProve(slot, challenge)

Expand Down
10 changes: 6 additions & 4 deletions codex/rest/api.nim
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ proc retrieveCid(
## manner
##

var stream: LPStream
var lpStream: LPStream

var bytes = 0
try:
Expand All @@ -94,6 +94,8 @@ proc retrieveCid(
await resp.sendBody(error.msg)
return

lpStream = stream

# It is ok to fetch again the manifest because it will hit the cache
without manifest =? (await node.fetchManifest(cid)), err:
error "Failed to fetch manifest", err = err.msg
Expand Down Expand Up @@ -139,15 +141,15 @@ proc retrieveCid(
codex_api_downloads.inc()
except CancelledError as exc:
raise exc
except CatchableError as exc:
except LPStreamError as exc:
warn "Error streaming blocks", exc = exc.msg
resp.status = Http500
if resp.isPending():
await resp.sendBody(exc.msg)
finally:
info "Sent bytes", cid = cid, bytes
if not stream.isNil:
await stream.close()
if not lpStream.isNil:
await lpStream.close()

proc buildCorsHeaders(
httpMethod: string, allowedOrigin: Option[string]
Expand Down
14 changes: 8 additions & 6 deletions codex/sales/salescontext.nim
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,17 @@ type
slotQueue*: SlotQueue
simulateProofFailures*: int

BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.gcsafe, raises: [].}
BlocksCb* = proc(blocks: seq[bt.Block]): Future[?!void] {.
gcsafe, async: (raises: [CancelledError])
.}
OnStore* = proc(
request: StorageRequest, slot: uint64, blocksCb: BlocksCb, isRepairing: bool
): Future[?!void] {.gcsafe, upraises: [].}
): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
OnProve* = proc(slot: Slot, challenge: ProofChallenge): Future[?!Groth16Proof] {.
gcsafe, upraises: []
gcsafe, async: (raises: [CancelledError])
.}
OnExpiryUpdate* = proc(rootCid: Cid, expiry: SecondsSince1970): Future[?!void] {.
gcsafe, upraises: []
gcsafe, async: (raises: [CancelledError])
.}
OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
OnClear* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
OnSale* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
4 changes: 3 additions & 1 deletion codex/sales/states/downloading.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ method run*(
reservationId = reservation.id
availabilityId = reservation.availabilityId

proc onBlocks(blocks: seq[bt.Block]): Future[?!void] {.async.} =
proc onBlocks(
blocks: seq[bt.Block]
): Future[?!void] {.async: (raises: [CancelledError]).} =
# release batches of blocks as they are written to disk and
# update availability size
var bytes: uint = 0
Expand Down
Loading
Loading