Skip to content

Commit

Permalink
fix: Correctly handle raw-codec CIDs/blocks (#37)
Browse files Browse the repository at this point in the history
Major things in this PR:
- Correctly handle raw-codec CIDs/blocks, they were previously not transferred due to a bug in `IncrementalDagVerification`.
- Update wnfs-common to 0.1.26 so this crate is compatible with the latest rs-wnfs.
- Introduce another cache, a positive cache for checking if we already have a block. (Work on #28)
- Make sure the main request/response futures are `Send`
  • Loading branch information
matheus23 authored Jan 2, 2024
1 parent cb57d41 commit b22fb94
Show file tree
Hide file tree
Showing 15 changed files with 509 additions and 76 deletions.
31 changes: 27 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 8 additions & 7 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use car_mirror::{
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use libipld::Cid;
use std::time::Duration;
use wnfs_common::{BlockStore, MemoryBlockStore};
use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore};

pub fn push_throttled(c: &mut Criterion) {
let mut rvg = car_mirror::test_utils::Rvg::deterministic();
Expand All @@ -28,9 +28,9 @@ pub fn push_throttled(c: &mut Criterion) {
},
|(client_store, root)| {
let client_store = &ThrottledBlockStore(client_store);
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &ThrottledBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -75,9 +75,9 @@ pub fn pull_throttled(c: &mut Criterion) {
},
|(server_store, root)| {
let server_store = &ThrottledBlockStore(server_store);
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &ThrottledBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -109,15 +109,16 @@ pub fn pull_throttled(c: &mut Criterion) {
#[derive(Debug, Clone)]
struct ThrottledBlockStore(MemoryBlockStore);

#[async_trait(?Send)]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for ThrottledBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self.0.get_block(cid).await?;
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
Ok(bytes)
}

async fn put_block(&self, bytes: impl Into<Bytes>, codec: u64) -> Result<Cid> {
async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
self.0.put_block(bytes, codec).await
}
}
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) {
(store, root)
},
|(ref client_store, root)| {
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) {
(store, root)
},
|(ref server_store, root)| {
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
8 changes: 4 additions & 4 deletions car-mirror-benches/benches/simulated_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000);
let cache = InMemoryCache::new(10_000, 150_000);
(store, cache, root)
},
|(ref server_store, ref server_cache, root)| {
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000);
let cache = InMemoryCache::new(10_000, 150_000);
(store, cache, root)
},
|(ref client_store, ref client_cache, root)| {
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
2 changes: 1 addition & 1 deletion car-mirror-wasm/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)]
#![deny(unreachable_pub, private_in_public)]
#![deny(unreachable_pub)]

//! car-mirror
Expand Down
3 changes: 2 additions & 1 deletion car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ thiserror = "1.0"
tokio = { version = "^1", default-features = false }
tracing = "0.1"
tracing-subscriber = "0.3"
wnfs-common = "0.1.24"
wnfs-common = "0.1.26"

[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
car-mirror = { path = ".", features = ["test_utils"] }
proptest = "1.1"
roaring-graphs = "0.12"
test-strategy = "0.3"
testresult = "0.3.0"

[features]
default = []
Expand Down
74 changes: 70 additions & 4 deletions car-mirror/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Config {

/// Some information that the block receiving end provides the block sending end
/// in order to deduplicate block transfers.
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct ReceiverState {
/// At least *some* of the subgraph roots that are missing for sure on the receiving end.
pub missing_subgraph_roots: Vec<Cid>,
Expand All @@ -67,7 +67,7 @@ pub struct CarFile {
///
/// It returns a `CarFile` of (a subset) of all blocks below `root`, that
/// are thought to be missing on the receiving end.
#[instrument(skip(config, store, cache))]
#[instrument(skip_all, fields(root, last_state))]
pub async fn block_send(
root: Cid,
last_state: Option<ReceiverState>,
Expand Down Expand Up @@ -126,7 +126,7 @@ pub async fn block_send(
/// It takes a `CarFile`, verifies that its contents are related to the
/// `root` and returns some information to help the block sending side
/// figure out what blocks to send next.
#[instrument(skip(last_car, config, store, cache), fields(car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
#[instrument(skip_all, fields(root, car_bytes = last_car.as_ref().map(|car| car.bytes.len())))]
pub async fn block_receive(
root: Cid,
last_car: Option<CarFile>,
Expand Down Expand Up @@ -221,7 +221,7 @@ pub fn references<E: Extend<Cid>>(

async fn verify_missing_subgraph_roots(
root: Cid,
missing_subgraph_roots: &Vec<Cid>,
missing_subgraph_roots: &[Cid],
store: &impl BlockStore,
cache: &impl Cache,
) -> Result<Vec<Cid>, Error> {
Expand Down Expand Up @@ -450,3 +450,69 @@ impl Default for Config {
}
}
}

impl std::fmt::Debug for ReceiverState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let have_cids_bloom = self
.have_cids_bloom
.as_ref()
.map_or("None".into(), |bloom| {
format!(
"Some(BloomFilter(k_hashes = {}, {} bytes))",
bloom.hash_count(),
bloom.as_bytes().len()
)
});
f.debug_struct("ReceiverState")
.field(
"missing_subgraph_roots.len() == ",
&self.missing_subgraph_roots.len(),
)
.field("have_cids_bloom", &have_cids_bloom)
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{test_utils::assert_cond_send_sync, traits::NoCache};
use testresult::TestResult;
use wnfs_common::MemoryBlockStore;

#[allow(clippy::unreachable, unused)]
fn test_assert_send() {
assert_cond_send_sync(|| {
block_send(
unimplemented!(),
unimplemented!(),
unimplemented!(),
unimplemented!() as &MemoryBlockStore,
&NoCache,
)
});
assert_cond_send_sync(|| {
block_receive(
unimplemented!(),
unimplemented!(),
unimplemented!(),
unimplemented!() as &MemoryBlockStore,
&NoCache,
)
})
}

#[test]
fn test_receiver_state_is_not_a_huge_debug() -> TestResult {
let state = ReceiverState {
have_cids_bloom: Some(BloomFilter::new_from_size(4096, 1000)),
missing_subgraph_roots: vec![Cid::default(); 1000],
};

let debug_print = format!("{state:#?}");

assert!(debug_print.len() < 1000);

Ok(())
}
}
42 changes: 37 additions & 5 deletions car-mirror/src/incremental_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,11 @@ impl IncrementalDagVerification {
Ok(this)
}

#[instrument(level = "trace", skip_all, fields(num_want = self.want_cids.len(), num_have = self.have_cids.len()))]
async fn update_have_cids(
/// Updates the state of incremental dag verification.
/// This goes through all "want" blocks and what they link to,
/// removing items that we now have and don't want anymore.
#[instrument(level = "trace", skip_all)]
pub async fn update_have_cids(
&mut self,
store: &impl BlockStore,
cache: &impl Cache,
Expand All @@ -68,25 +71,54 @@ impl IncrementalDagVerification {
if let Some(BlockStoreError::CIDNotFound(not_found)) =
e.downcast_ref::<BlockStoreError>()
{
self.want_cids.insert(*not_found);
tracing::trace!(%not_found, "Missing block, adding to want list");
self.mark_as_want(*not_found);
} else {
return Err(Error::BlockStoreError(e));
}
}
Err(e) => return Err(e),
Ok(Some(cid)) => {
self.want_cids.remove(&cid);
self.have_cids.insert(cid);
let not_found = matches!(
store.get_block(&cid).await,
Err(e) if matches!(e.downcast_ref(), Some(BlockStoreError::CIDNotFound(_)))
);

if not_found {
tracing::trace!(%cid, "Missing block, adding to want list");
self.mark_as_want(cid);
} else {
self.mark_as_have(cid);
}
}
Ok(None) => {
break;
}
}
}

tracing::debug!(
num_want = self.want_cids.len(),
num_have = self.have_cids.len(),
"Finished dag verification"
);

Ok(())
}

fn mark_as_want(&mut self, want: Cid) {
if self.have_cids.contains(&want) {
tracing::warn!(%want, "Marking a CID as wanted, that we have previously marked as having!");
self.have_cids.remove(&want);
}
self.want_cids.insert(want);
}

fn mark_as_have(&mut self, have: Cid) {
self.want_cids.remove(&have);
self.have_cids.insert(have);
}

/// Check the state of a CID to find out whether
/// - we expect it as one of the next possible blocks to receive (Want)
/// - we have already stored it (Have)
Expand Down
2 changes: 1 addition & 1 deletion car-mirror/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_debug_implementations, missing_docs, rust_2018_idioms)]
#![deny(unreachable_pub, private_in_public)]
#![deny(unreachable_pub)]

//! car-mirror
Expand Down
Loading

0 comments on commit b22fb94

Please sign in to comment.