Skip to content

Commit

Permalink
Merge pull request #29 from fission-codes/matheus23/tracing
Browse files Browse the repository at this point in the history
feat: Implement tracing & `thiserror` error types
  • Loading branch information
matheus23 authored Sep 1, 2023
2 parents 7e9b79a + 15a3cb6 commit 6cea1d2
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 47 deletions.
9 changes: 5 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ proptest = { version = "1.1", optional = true }
roaring-graphs = { version = "0.12", optional = true }
serde = "1.0.183"
serde_ipld_dagcbor = "0.4.0"
thiserror = "1.0.47"
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.23"
Expand Down
124 changes: 107 additions & 17 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use anyhow::{anyhow, bail, Result};
#![allow(unknown_lints)] // Because the `instrument` macro contains some `#[allow]`s that rust 1.66 doesn't know yet.

use anyhow::anyhow;
use bytes::Bytes;
use deterministic_bloom::runtime_size::BloomFilter;
use futures::TryStreamExt;
use iroh_car::{CarHeader, CarReader, CarWriter};
use libipld::{Ipld, IpldCodec};
use libipld_core::{cid::Cid, codec::References};
use std::io::Cursor;
use tracing::{debug, instrument, trace, warn};
use wnfs_common::BlockStore;

use crate::{
dag_walk::DagWalk,
error::Error,
incremental_verification::{BlockState, IncrementalDagVerification},
messages::{Bloom, PullRequest, PushResponse},
};
Expand Down Expand Up @@ -63,12 +67,13 @@ pub struct CarFile {
///
/// It returns a `CarFile` of (a subset) of all blocks below `root`, that
/// are thought to be missing on the receiving end.
#[instrument(skip(config, store))]
pub async fn block_send(
root: Cid,
last_state: Option<ReceiverState>,
config: &Config,
store: &impl BlockStore,
) -> Result<CarFile> {
) -> Result<CarFile, Error> {
let ReceiverState {
ref missing_subgraph_roots,
have_cids_bloom,
Expand All @@ -86,6 +91,30 @@ pub async fn block_send(
.try_collect()
.await?;

if subgraph_roots.len() != missing_subgraph_roots.len() {
let unrelated_roots = missing_subgraph_roots
.iter()
.filter(|cid| !subgraph_roots.contains(cid))
.map(|cid| cid.to_string())
.collect::<Vec<_>>()
.join(", ");

warn!(
unrelated_roots = %unrelated_roots,
"got asked for DAG-unrelated blocks"
);
}

if let Some(bloom) = &have_cids_bloom {
debug!(
size_bits = bloom.as_bytes().len() * 8,
hash_count = bloom.hash_count(),
ones_count = bloom.count_ones(),
estimated_fpr = bloom.current_false_positive_rate(),
"received 'have cids' bloom",
);
}

let bloom = have_cids_bloom.unwrap_or_else(|| BloomFilter::new_with(1, Box::new([0]))); // An empty bloom that contains nothing

let mut writer = CarWriter::new(
Expand All @@ -102,16 +131,35 @@ pub async fn block_send(
Vec::new(),
);

writer.write_header().await?;
writer
.write_header()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

let mut block_bytes = 0;
let mut dag_walk = DagWalk::breadth_first(subgraph_roots.clone());
while let Some((cid, block)) = dag_walk.next(store).await? {
if bloom.contains(&cid.to_bytes()) && !subgraph_roots.contains(&cid) {
debug!(
cid = %cid,
bloom_contains = bloom.contains(&cid.to_bytes()),
subgraph_roots_contains = subgraph_roots.contains(&cid),
"skipped writing block"
);
continue;
}

writer.write(cid, &block).await?;
debug!(
cid = %cid,
num_bytes = block.len(),
frontier_size = dag_walk.frontier.len(),
"writing block to CAR",
);

writer
.write(cid, &block)
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;

// TODO(matheus23): Count the actual bytes sent?
// At the moment, this is a rough estimate. iroh-car could be improved to return the written bytes.
Expand All @@ -122,7 +170,11 @@ pub async fn block_send(
}

Ok(CarFile {
bytes: writer.finish().await?.into(),
bytes: writer
.finish()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
.into(),
})
}

Expand All @@ -134,33 +186,49 @@ pub async fn block_send(
/// It takes a `CarFile`, verifies that its contents are related to the
/// `root` and returns some information to help the block sending side
/// figure out what blocks to send next.
#[instrument(skip(last_car, config, store), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
pub async fn block_receive(
root: Cid,
last_car: Option<CarFile>,
config: &Config,
store: &impl BlockStore,
) -> Result<ReceiverState> {
) -> Result<ReceiverState, Error> {
let mut dag_verification = IncrementalDagVerification::new([root], store).await?;

if let Some(car) = last_car {
let mut reader = CarReader::new(Cursor::new(car.bytes)).await?;
let mut reader = CarReader::new(Cursor::new(car.bytes))
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?;
let mut block_bytes = 0;

while let Some((cid, vec)) = reader.next_block().await? {
while let Some((cid, vec)) = reader
.next_block()
.await
.map_err(|e| Error::CarFileError(anyhow!(e)))?
{
let block = Bytes::from(vec);

debug!(
cid = %cid,
num_bytes = block.len(),
"reading block from CAR",
);

block_bytes += block.len();
if block_bytes > config.receive_maximum {
bail!(
"Received more than {} bytes ({block_bytes}), aborting request.",
config.receive_maximum
);
return Err(Error::TooManyBytes {
block_bytes,
receive_maximum: config.receive_maximum,
});
}

match dag_verification.block_state(cid) {
BlockState::Have => continue,
BlockState::Unexpected => {
eprintln!("Warn: Received block {cid} out of order, may be due to bloom false positive.");
trace!(
cid = %cid,
"received block out of order (possibly due to bloom false positive)"
);
break;
}
BlockState::Want => {
Expand Down Expand Up @@ -188,6 +256,14 @@ pub async fn block_receive(
});
}

if missing_subgraph_roots.is_empty() {
// We're done. No need to compute a bloom.
return Ok(ReceiverState {
missing_subgraph_roots,
have_cids_bloom: None,
});
}

let mut bloom =
BloomFilter::new_from_fpr_po2(bloom_capacity, (config.bloom_fpr)(bloom_capacity));

Expand All @@ -196,6 +272,15 @@ pub async fn block_receive(
.iter()
.for_each(|cid| bloom.insert(&cid.to_bytes()));

debug!(
inserted_elements = bloom_capacity,
size_bits = bloom.as_bytes().len() * 8,
hash_count = bloom.hash_count(),
ones_count = bloom.count_ones(),
estimated_fpr = bloom.current_false_positive_rate(),
"built 'have cids' bloom",
);

Ok(ReceiverState {
missing_subgraph_roots,
have_cids_bloom: Some(bloom),
Expand All @@ -207,13 +292,18 @@ pub async fn block_receive(
/// This will error out if
/// - the codec is not supported
/// - the block can't be parsed.
pub fn references<E: Extend<Cid>>(cid: Cid, block: impl AsRef<[u8]>, mut refs: E) -> Result<E> {
pub fn references<E: Extend<Cid>>(
cid: Cid,
block: impl AsRef<[u8]>,
mut refs: E,
) -> Result<E, Error> {
let codec: IpldCodec = cid
.codec()
.try_into()
.map_err(|_| anyhow!("Unsupported codec in Cid: {cid}"))?;
.map_err(|_| Error::UnsupportedCodec { cid })?;

<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)?;
<Ipld as References<IpldCodec>>::references(codec, &mut Cursor::new(block), &mut refs)
.map_err(Error::ParsingError)?;
Ok(refs)
}

Expand Down Expand Up @@ -310,7 +400,7 @@ impl Default for Config {
send_minimum: 128 * 1024, // 128KiB
receive_maximum: 512 * 1024, // 512KiB
max_roots_per_round: 1000, // max. ~41KB of CIDs
bloom_fpr: |num_of_elems| 0.1 / num_of_elems as f64,
bloom_fpr: |num_of_elems| f64::min(0.001, 0.1 / num_of_elems as f64),
}
}
}
15 changes: 9 additions & 6 deletions car-mirror/src/dag_walk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::common::references;
use anyhow::Result;
use crate::{common::references, error::Error};
use bytes::Bytes;
use futures::{stream::try_unfold, Stream};
use libipld_core::cid::Cid;
Expand Down Expand Up @@ -54,7 +53,7 @@ impl DagWalk {
/// Return the next node in the traversal.
///
/// Returns `None` if no nodes are left to be visited.
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>> {
pub async fn next(&mut self, store: &impl BlockStore) -> Result<Option<(Cid, Bytes)>, Error> {
let cid = loop {
let popped = if self.breadth_first {
self.frontier.pop_back()
Expand All @@ -75,7 +74,10 @@ impl DagWalk {
// TODO: Two opportunities for performance improvement:
// - skip Raw CIDs. They can't have further links (but needs adjustment to this function's return type)
// - run multiple `get_block` calls concurrently
let block = store.get_block(&cid).await?;
let block = store
.get_block(&cid)
.await
.map_err(Error::BlockStoreError)?;
for ref_cid in references(cid, &block, Vec::new())? {
if !self.visited.contains(&ref_cid) {
self.frontier.push_front(ref_cid);
Expand All @@ -89,7 +91,7 @@ impl DagWalk {
pub fn stream(
self,
store: &impl BlockStore,
) -> impl Stream<Item = Result<(Cid, Bytes)>> + Unpin + '_ {
) -> impl Stream<Item = Result<(Cid, Bytes), Error>> + Unpin + '_ {
Box::pin(try_unfold(self, move |mut this| async move {
let maybe_block = this.next(store).await?;
Ok(maybe_block.map(|b| (b, this)))
Expand All @@ -110,7 +112,7 @@ impl DagWalk {
}

/// Skip a node from the traversal for now.
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<()> {
pub fn skip_walking(&mut self, block: (Cid, Bytes)) -> Result<(), Error> {
let (cid, bytes) = block;
let refs = references(cid, bytes, HashSet::new())?;
self.visited.insert(cid);
Expand All @@ -124,6 +126,7 @@ impl DagWalk {
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
use futures::TryStreamExt;
use libipld::Ipld;
use wnfs_common::MemoryBlockStore;
Expand Down
Loading

0 comments on commit 6cea1d2

Please sign in to comment.