From 6252e845b108d2d4968a7c1c9d460dc77b1a4d85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philipp=20Kr=C3=BCger?= Date: Mon, 28 Aug 2023 14:20:18 +0200 Subject: [PATCH] feat: Implement `thiserror` types --- Cargo.lock | 9 ++- car-mirror/Cargo.toml | 1 + car-mirror/src/common.rs | 52 ++++++++---- car-mirror/src/dag_walk.rs | 15 ++-- car-mirror/src/error.rs | 92 ++++++++++++++++++++++ car-mirror/src/incremental_verification.rs | 45 +++++++---- car-mirror/src/lib.rs | 2 + car-mirror/src/pull.rs | 6 +- car-mirror/src/push.rs | 6 +- 9 files changed, 183 insertions(+), 45 deletions(-) create mode 100644 car-mirror/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 32f2802..1eb49fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,6 +391,7 @@ dependencies = [ "serde", "serde_ipld_dagcbor", "test-strategy", + "thiserror", "tracing", "tracing-subscriber", "wnfs-common", @@ -1791,18 +1792,18 @@ checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", diff --git a/car-mirror/Cargo.toml b/car-mirror/Cargo.toml index 3a59ad5..89b7e99 100644 --- a/car-mirror/Cargo.toml +++ b/car-mirror/Cargo.toml @@ -36,6 +36,7 @@ proptest = { version = "1.1", optional = true } roaring-graphs = { version = "0.12", optional = true } serde = "1.0.183" serde_ipld_dagcbor = "0.4.0" +thiserror = "1.0.47" tracing = "0.1" 
tracing-subscriber = "0.3" wnfs-common = "0.1.23" diff --git a/car-mirror/src/common.rs b/car-mirror/src/common.rs index a6f5184..3816cfb 100644 --- a/car-mirror/src/common.rs +++ b/car-mirror/src/common.rs @@ -1,4 +1,4 @@ -use anyhow::{anyhow, bail, Result}; +use anyhow::anyhow; use bytes::Bytes; use deterministic_bloom::runtime_size::BloomFilter; use futures::TryStreamExt; @@ -11,6 +11,7 @@ use wnfs_common::BlockStore; use crate::{ dag_walk::DagWalk, + error::Error, incremental_verification::{BlockState, IncrementalDagVerification}, messages::{Bloom, PullRequest, PushResponse}, }; @@ -70,7 +71,7 @@ pub async fn block_send( last_state: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let ReceiverState { ref missing_subgraph_roots, have_cids_bloom, @@ -128,7 +129,10 @@ pub async fn block_send( Vec::new(), ); - writer.write_header().await?; + writer + .write_header() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; let mut block_bytes = 0; let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone()); @@ -150,7 +154,10 @@ pub async fn block_send( "writing block to CAR", ); - writer.write(cid, &block).await?; + writer + .write(cid, &block) + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; // TODO(matheus23): Count the actual bytes sent? // At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes. @@ -161,7 +168,11 @@ pub async fn block_send( } Ok(CarFile { - bytes: writer.finish().await?.into(), + bytes: writer + .finish() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))? 
+ .into(), }) } @@ -179,14 +190,20 @@ pub async fn block_receive( last_car: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let mut dag_verification = IncrementalDagVerification::new([root], store).await?; if let Some(car) = last_car { - let mut reader = CarReader::new(Cursor::new(car.bytes)).await?; + let mut reader = CarReader::new(Cursor::new(car.bytes)) + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))?; let mut block_bytes = 0; - while let Some((cid, vec)) = reader.next_block().await? { + while let Some((cid, vec)) = reader + .next_block() + .await + .map_err(|e| Error::CarFileError(anyhow!(e)))? + { let block = Bytes::from(vec); debug!( @@ -197,10 +214,10 @@ pub async fn block_receive( block_bytes += block.len(); if block_bytes > config.receive_maximum { - bail!( - "Received more than {} bytes ({block_bytes}), aborting request.", - config.receive_maximum - ); + return Err(Error::TooManyBytes { + block_bytes, + receive_maximum: config.receive_maximum, + }); } match dag_verification.block_state(cid) { @@ -273,13 +290,18 @@ pub async fn block_receive( /// This will error out if /// - the codec is not supported /// - the block can't be parsed. 
-pub fn references>(cid: Cid, block: impl AsRef<[u8]>, mut refs: E) -> Result { +pub fn references>( + cid: Cid, + block: impl AsRef<[u8]>, + mut refs: E, +) -> Result { let codec: IpldCodec = cid .codec() .try_into() - .map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?; + .map_err(|_| Error::UnsupportedCodec { cid })?; - >::references(codec, &mut Cursor::new(block), &mut refs)?; + >::references(codec, &mut Cursor::new(block), &mut refs) + .map_err(Error::ParsingError)?; Ok(refs) } diff --git a/car-mirror/src/dag_walk.rs b/car-mirror/src/dag_walk.rs index 3f27e7e..1ae84a7 100644 --- a/car-mirror/src/dag_walk.rs +++ b/car-mirror/src/dag_walk.rs @@ -1,5 +1,4 @@ -use crate::common::references; -use anyhow::Result; +use crate::{common::references, error::Error}; use bytes::Bytes; use futures::{stream::try_unfold, Stream}; use libipld_core::cid::Cid; @@ -54,7 +53,7 @@ impl DagWalk { /// Return the next node in the traversal. /// /// Returns `None` if no nodes are left to be visited. - pub async fn next(&mut self, store: &impl BlockStore) -> Result> { + pub async fn next(&mut self, store: &impl BlockStore) -> Result, Error> { let cid = loop { let popped = if self.breadth_first { self.frontier.pop_back() @@ -75,7 +74,10 @@ impl DagWalk { // TODO: Two opportunities for performance improvement: // - skip Raw CIDs. They can't have further links (but needs adjustment to this function's return type) // - run multiple `get_block` calls concurrently - let block = store.get_block(&cid).await?; + let block = store + .get_block(&cid) + .await + .map_err(Error::BlockStoreError)?; for ref_cid in references(cid, &block, Vec::new())? 
{ if !self.visited.contains(&ref_cid) { self.frontier.push_front(ref_cid); @@ -89,7 +91,7 @@ impl DagWalk { pub fn stream( self, store: &impl BlockStore, - ) -> impl Stream> + Unpin + '_ { + ) -> impl Stream> + Unpin + '_ { Box::pin(try_unfold(self, move |mut this| async move { let maybe_block = this.next(store).await?; Ok(maybe_block.map(|b| (b, this))) @@ -110,7 +112,7 @@ impl DagWalk { } /// Skip a node from the traversal for now. - pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> { + pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> { let (cid, bytes) = block; let refs = references(cid, bytes, HashSet::new())?; self.visited.insert(cid); @@ -124,6 +126,7 @@ impl DagWalk { #[cfg(test)] mod tests { use super::*; + use anyhow::Result; use futures::TryStreamExt; use libipld::Ipld; use wnfs_common::MemoryBlockStore; diff --git a/car-mirror/src/error.rs b/car-mirror/src/error.rs new file mode 100644 index 0000000..1612f55 --- /dev/null +++ b/car-mirror/src/error.rs @@ -0,0 +1,92 @@ +use libipld::Cid; + +use crate::incremental_verification::BlockState; + +/// Errors raised from the CAR mirror library +#[derive(thiserror::Error, Debug)] +pub enum Error { + /// An error raised during receival of blocks, when more than the configured maximum + /// bytes are received in a single batch. See the `Config` type. + #[error("Received more than {receive_maximum} bytes ({block_bytes}), aborting request.")] + TooManyBytes { + /// The configured amount of maximum bytes to receive + receive_maximum: usize, + /// The actual amount of bytes received so far + block_bytes: usize, + }, + + /// This library only supports a subset of default codecs, including DAG-CBOR, DAG-JSON, DAG-PB and more. + /// This is raised if an unknown codec is read from a CID. See the `libipld` library for more information. 
+ #[error("Unsupported codec in Cid: {cid}")] + UnsupportedCodec { + /// The CID with the unsupported codec + cid: Cid, + }, + + /// This library only supports a subset of default hash functions, including SHA-256, SHA-3, BLAKE3 and more. + /// This is raised if an unknown hash code is read from a CID. See the `libipld` library for more information. + #[error("Unsupported hash code in CID {cid}")] + UnsupportedHashCode { + /// The CID with the unsupported hash function + cid: Cid, + }, + + /// This error is raised when the `BlockStore` uses a different hashing function + /// than the blocks which are received over the wire. + /// This error will be removed in the future, when the block store trait gets modified to support specifying + /// the hash function. + #[error("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {actual_cid}")] + BlockStoreIncompatible { + /// The expected CID + cid: Box, + /// The CID returned from the BlockStore implementation + actual_cid: Box, + }, + + // ------------- + // Anyhow Errors + // ------------- + /// An error raised when trying to parse a block (e.g. to look for further links) + #[error("Error during block parsing: {0}")] + ParsingError(anyhow::Error), + + /// An error raised when trying to read or write a CAR file. + #[error("CAR (de)serialization error: {0}")] + CarFileError(anyhow::Error), + + /// An error raised from the blockstore. + #[error("BlockStore error: {0}")] + BlockStoreError(anyhow::Error), + + // ---------- + // Sub-errors + // ---------- + /// Errors related to incremental verification + #[error(transparent)] + IncrementalVerificationError(#[from] IncrementalVerificationError), +} + +/// Errors related to incremental verification +#[derive(thiserror::Error, Debug)] +pub enum IncrementalVerificationError { + /// Raised when we receive a block with a CID that we don't expect. + /// We only expect blocks when they're related to the root CID of a DAG. 
+ /// So a CID needs to have a path back to the root. + #[error("Expected to want block {cid}, but block state is: {block_state:?}")] + ExpectedWantedBlock { + /// The CID of the block we're currently processing + cid: Box, + /// The block state it has during incremental verification. + /// So either we already have it or it's unexpected. + block_state: BlockState, + }, + + /// Raised when the block stored in the CAR file doesn't match its hash. + #[error("Digest mismatch in CAR file: expected {cid}, got {actual_cid}")] + DigestMismatch { + /// The expected CID + cid: Box, + /// The CID it actually hashes to + actual_cid: Box, + }, +} diff --git a/car-mirror/src/incremental_verification.rs b/car-mirror/src/incremental_verification.rs index c65171b..04f7152 100644 --- a/car-mirror/src/incremental_verification.rs +++ b/car-mirror/src/incremental_verification.rs @@ -1,5 +1,7 @@ -use crate::dag_walk::DagWalk; -use anyhow::{anyhow, bail, Result}; +use crate::{ + dag_walk::DagWalk, + error::{Error, IncrementalVerificationError}, +}; use bytes::Bytes; use libipld_core::{ cid::Cid, @@ -37,7 +39,7 @@ impl IncrementalDagVerification { pub async fn new( roots: impl IntoIterator, store: &impl BlockStore, - ) -> Result { + ) -> Result { let mut this = Self { want_cids: roots.into_iter().collect(), have_cids: HashSet::new(), @@ -49,20 +51,21 @@ impl IncrementalDagVerification { } #[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))] - async fn update_have_cids(&mut self, store: &impl BlockStore) -> Result<()> { + async fn update_have_cids(&mut self, store: &impl BlockStore) -> Result<(), Error> { let mut dag_walk = DagWalk::breadth_first(self.want_cids.iter().cloned()); loop { match dag_walk.next(store).await { - Err(e) => { + Err(Error::BlockStoreError(e)) => { if let Some(BlockStoreError::CIDNotFound(not_found)) = e.downcast_ref::() { self.want_cids.insert(*not_found); } else { - bail!(e); + return 
Err(Error::BlockStoreError(e)); } } + Err(e) => return Err(e), Ok(Some((cid, _))) => { self.want_cids.remove(&cid); self.have_cids.insert(cid); @@ -106,33 +109,47 @@ impl IncrementalDagVerification { &mut self, block: (Cid, Bytes), store: &impl BlockStore, - ) -> Result<()> { + ) -> Result<(), Error> { let (cid, bytes) = block; let block_state = self.block_state(cid); if !matches!(block_state, BlockState::Want) { - bail!("Incremental verification failed. Block state is: {block_state:?}, expected BlockState::Want"); + return Err(IncrementalVerificationError::ExpectedWantedBlock { + cid: Box::new(cid), + block_state, + } + .into()); } let hash_func: Code = cid .hash() .code() .try_into() - .map_err(|_| anyhow!("Unsupported hash code in CID {cid}"))?; + .map_err(|_| Error::UnsupportedHashCode { cid })?; let hash = hash_func.digest(bytes.as_ref()); if &hash != cid.hash() { - let result_cid = Cid::new_v1(cid.codec(), hash); - bail!("Digest mismatch in CAR file: expected {cid}, got {result_cid}"); + let actual_cid = Cid::new_v1(cid.codec(), hash); + return Err(IncrementalVerificationError::DigestMismatch { + cid: Box::new(cid), + actual_cid: Box::new(actual_cid), + } + .into()); } - let result_cid = store.put_block(bytes, cid.codec()).await?; + let actual_cid = store + .put_block(bytes, cid.codec()) + .await + .map_err(Error::BlockStoreError)?; // TODO(matheus23): The BlockStore chooses the hashing function, // so it may choose a different hashing function, causing a mismatch - if result_cid != cid { - bail!("BlockStore uses an incompatible hashing function: CID mismatched, expected {cid}, got {result_cid}"); + if actual_cid != cid { + return Err(Error::BlockStoreIncompatible { + cid: Box::new(cid), + actual_cid: Box::new(actual_cid), + }); } self.update_have_cids(store).await?; diff --git a/car-mirror/src/lib.rs b/car-mirror/src/lib.rs index b40d1a0..5b256e3 100644 --- a/car-mirror/src/lib.rs +++ b/car-mirror/src/lib.rs @@ -13,6 +13,8 @@ pub mod test_utils; pub mod 
common; /// Algorithms for walking IPLD directed acyclic graphs pub mod dag_walk; +/// Error types +pub mod error; /// Algorithms for doing incremental verification of IPLD DAGs on the receiving end. pub mod incremental_verification; /// Data types that are sent over-the-wire and relevant serialization code. diff --git a/car-mirror/src/pull.rs b/car-mirror/src/pull.rs index 8bfdbb8..4b018fa 100644 --- a/car-mirror/src/pull.rs +++ b/car-mirror/src/pull.rs @@ -1,8 +1,8 @@ use crate::{ common::{block_receive, block_send, CarFile, Config, ReceiverState}, + error::Error, messages::PullRequest, }; -use anyhow::Result; use libipld::Cid; use wnfs_common::BlockStore; @@ -22,7 +22,7 @@ pub async fn request( last_response: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { Ok(block_receive(root, last_response, config, store) .await? .into()) @@ -34,7 +34,7 @@ pub async fn response( request: PullRequest, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let receiver_state = Some(ReceiverState::from(request)); block_send(root, receiver_state, config, store).await } diff --git a/car-mirror/src/push.rs b/car-mirror/src/push.rs index 0fb229d..aa5618b 100644 --- a/car-mirror/src/push.rs +++ b/car-mirror/src/push.rs @@ -1,8 +1,8 @@ use crate::{ common::{block_receive, block_send, CarFile, Config, ReceiverState}, + error::Error, messages::PushResponse, }; -use anyhow::Result; use libipld_core::cid::Cid; use wnfs_common::BlockStore; @@ -21,7 +21,7 @@ pub async fn request( last_response: Option, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { let receiver_state = last_response.map(ReceiverState::from); block_send(root, receiver_state, config, store).await } @@ -39,7 +39,7 @@ pub async fn response( request: CarFile, config: &Config, store: &impl BlockStore, -) -> Result { +) -> Result { Ok(block_receive(root, Some(request), config, store) .await? .into())