Skip to content

Commit

Permalink
refactor: Take impl BlockStore instead of &impl BlockStore in gen…
Browse files Browse the repository at this point in the history
…eral (#40)

Also:
- Remove `async_trait`
- Expose blanket `impl<C: Cache> Cache for &C` and `Box<C>`
- Move the `trait` module to `cache`
- Remove `has_block` from `trait Cache`
- Add a blockstore wrapper utility `CacheMissing` under the `quick_cache` feature

---

* refactor: Remove `async_trait` dependency

* refactor: Use `impl Trait` instead of `&impl Trait` everywhere

* chore: Allow wnfs-wg git dependencies

* refactor: Blanket impl `Cache` for `&C` and `Box<C>`

Instead of blanket-implementing for all `Deref` types.

This makes error messages better & is generally seems to be the soft consensus.

* refactor: Use newer `BlockStore` trait methods

* feat: Implement `CacheMissing` blockstore wrapper

* feat: Use new error types for `BlockStoreError`

* chore: Fix lint

* chore: Update to released wnfs version 0.2.0
  • Loading branch information
matheus23 authored Feb 15, 2024
1 parent 3067020 commit 370fd83
Show file tree
Hide file tree
Showing 18 changed files with 746 additions and 636 deletions.
49 changes: 26 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 13 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,19 @@
members = [
"car-mirror",
"car-mirror-benches",
"car-mirror-wasm"
,
"examples"]
"car-mirror-wasm",
"examples"
]

[workspace.dependencies]
anyhow = "1.0"
async-stream = "0.3.5"
bytes = "1.4"
futures = "0.3"
libipld = "0.16"
libipld-core = "0.16"
serde_ipld_dagcbor = "0.4"
wnfs-common = { version = "0.2.0" }

# See https://doc.rust-lang.org/cargo/reference/profiles.html for more info.
[profile.release.package.car-mirror-wasm]
Expand Down
11 changes: 5 additions & 6 deletions car-mirror-benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,13 @@ edition = "2021"
authors = ["Philipp Krüger <[email protected]>"]

[dependencies]
anyhow = "1.0"
anyhow = { workspace = true }
async-std = { version = "1.11", features = ["attributes"] }
async-trait = "0.1"
bytes = "1.4.0"
bytes = { workspace = true }
car-mirror = { path = "../car-mirror", version = "0.1", features = ["test_utils", "quick_cache"] }
libipld = "0.16.0"
serde_ipld_dagcbor = "0.4.0"
wnfs-common = "0.1.23"
libipld = { workspace = true }
serde_ipld_dagcbor = { workspace = true }
wnfs-common = { workspace = true }

[dev-dependencies]
criterion = { version = "0.4", default-features = false }
Expand Down
64 changes: 42 additions & 22 deletions car-mirror-benches/benches/artificially_slow_blockstore.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use car_mirror::{
cache::{CacheMissing, InMemoryCache},
common::Config,
pull, push,
test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore},
traits::InMemoryCache,
};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use libipld::Cid;
use std::time::Duration;
use wnfs_common::{utils::CondSend, BlockStore, MemoryBlockStore};
use wnfs_common::{utils::CondSend, BlockStore, BlockStoreError, MemoryBlockStore};

pub fn push_throttled(c: &mut Criterion) {
let mut rvg = car_mirror::test_utils::Rvg::deterministic();
Expand All @@ -27,27 +26,34 @@ pub fn push_throttled(c: &mut Criterion) {
(store, root)
},
|(client_store, root)| {
let client_store = &ThrottledBlockStore(client_store);
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &ThrottledBlockStore::new();
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &CacheMissing::new(100_000, ThrottledBlockStore(client_store));
let client_cache = &InMemoryCache::new(100_000);
let server_store = &CacheMissing::new(100_000, ThrottledBlockStore::new());
let server_cache = &InMemoryCache::new(100_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
async_std::task::block_on(async move {
let mut request =
push::request(root, None, config, client_store, client_cache).await?;
let mut last_response = None;
loop {
let request = push::request(
root,
last_response,
config,
client_store.clone(),
client_cache.clone(),
)
.await?;

let response =
push::response(root, request, config, server_store, server_cache)
.await?;

if response.indicates_finished() {
break;
}
request =
push::request(root, Some(response), config, client_store, client_cache)
.await?;

last_response = Some(response);
}

Ok::<(), anyhow::Error>(())
Expand All @@ -74,10 +80,10 @@ pub fn pull_throttled(c: &mut Criterion) {
(store, root)
},
|(server_store, root)| {
let server_store = &ThrottledBlockStore(server_store);
let server_cache = &InMemoryCache::new(10_000, 150_000);
let client_store = &ThrottledBlockStore::new();
let client_cache = &InMemoryCache::new(10_000, 150_000);
let server_store = &CacheMissing::new(100_000, ThrottledBlockStore(server_store));
let server_cache = &InMemoryCache::new(100_000);
let client_store = &CacheMissing::new(100_000, ThrottledBlockStore::new());
let client_cache = &InMemoryCache::new(100_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -109,18 +115,32 @@ pub fn pull_throttled(c: &mut Criterion) {
#[derive(Debug, Clone)]
struct ThrottledBlockStore(MemoryBlockStore);

#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl BlockStore for ThrottledBlockStore {
async fn get_block(&self, cid: &Cid) -> Result<Bytes> {
let bytes = self.0.get_block(cid).await?;
async fn get_block(&self, cid: &Cid) -> Result<Bytes, BlockStoreError> {
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
Ok(bytes)
self.0.get_block(cid).await
}

async fn put_block(&self, bytes: impl Into<Bytes> + CondSend, codec: u64) -> Result<Cid> {
async fn put_block(
&self,
bytes: impl Into<Bytes> + CondSend,
codec: u64,
) -> Result<Cid, BlockStoreError> {
self.0.put_block(bytes, codec).await
}

async fn put_block_keyed(
&self,
cid: Cid,
bytes: impl Into<Bytes> + CondSend,
) -> Result<(), BlockStoreError> {
self.0.put_block_keyed(cid, bytes).await
}

async fn has_block(&self, cid: &Cid) -> Result<bool, BlockStoreError> {
async_std::task::sleep(Duration::from_micros(50)).await; // Block fetching is artifically slowed by 50 microseconds
self.0.has_block(cid).await
}
}

impl ThrottledBlockStore {
Expand Down
10 changes: 5 additions & 5 deletions car-mirror-benches/benches/in_memory.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use car_mirror::{
cache::InMemoryCache,
common::Config,
pull, push,
test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore},
traits::InMemoryCache,
};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use wnfs_common::MemoryBlockStore;
Expand All @@ -22,9 +22,9 @@ pub fn push(c: &mut Criterion) {
(store, root)
},
|(ref client_store, root)| {
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(100_000);
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(100_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -68,9 +68,9 @@ pub fn pull(c: &mut Criterion) {
(store, root)
},
|(ref server_store, root)| {
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(100_000);
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(100_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
10 changes: 5 additions & 5 deletions car-mirror-benches/benches/simulated_latency.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use car_mirror::{
cache::InMemoryCache,
common::Config,
pull, push,
test_utils::{arb_ipld_dag, links_to_padded_ipld, setup_blockstore},
traits::InMemoryCache,
};
use criterion::{criterion_group, criterion_main, BatchSize, Criterion};
use std::{ops::Range, time::Duration};
Expand Down Expand Up @@ -68,12 +68,12 @@ pub fn pull_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000, 150_000);
let cache = InMemoryCache::new(100_000);
(store, cache, root)
},
|(ref server_store, ref server_cache, root)| {
let client_store = &MemoryBlockStore::new();
let client_cache = &InMemoryCache::new(10_000, 150_000);
let client_cache = &InMemoryCache::new(100_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down Expand Up @@ -145,12 +145,12 @@ pub fn push_with_simulated_latency(
links_to_padded_ipld(block_padding),
));
let store = async_std::task::block_on(setup_blockstore(blocks)).unwrap();
let cache = InMemoryCache::new(10_000, 150_000);
let cache = InMemoryCache::new(100_000);
(store, cache, root)
},
|(ref client_store, ref client_cache, root)| {
let server_store = &MemoryBlockStore::new();
let server_cache = &InMemoryCache::new(10_000, 150_000);
let server_cache = &InMemoryCache::new(100_000);
let config = &Config::default();

// Simulate a multi-round protocol run in-memory
Expand Down
16 changes: 8 additions & 8 deletions car-mirror/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@ path = "src/lib.rs"
doctest = true

[dependencies]
anyhow = "1.0"
async-trait = "0.1.73"
bytes = "1.4"
anyhow = { workspace = true }
async-stream = { workspace = true }
bytes = { workspace = true }
deterministic-bloom = "0.1"
futures = "0.3"
futures = { workspace = true }
iroh-car = "0.4"
libipld = "0.16"
libipld-core = "0.16"
libipld = { workspace = true }
libipld-core = { workspace = true }
proptest = { version = "1.1", optional = true }
quick_cache = { version = "0.4", optional = true }
roaring-graphs = { version = "0.12", optional = true }
serde = "^1"
serde_ipld_dagcbor = "0.4"
serde_ipld_dagcbor = { workspace = true }
thiserror = "1.0"
tokio = { version = "^1", default-features = false }
tracing = "0.1"
wnfs-common = "0.1.26"
wnfs-common = { workspace = true }

[dev-dependencies]
async-std = { version = "1.11", features = ["attributes"] }
Expand Down
Loading

0 comments on commit 370fd83

Please sign in to comment.