Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: testing with loom and poll #312

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
385 changes: 187 additions & 198 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ ceramic-store = { path = "./store" }
cid = { version = "0.11", features = ["serde-codec"] }
clap = { version = "4", features = ["derive", "env"] }
clap_mangen = "0.2.2"
concurrent-queue = "2.4.0"
console = { version = "0.15", default-features = false }
console-subscriber = "0.2"
criterion2 = "0.7.0"
Expand All @@ -70,6 +71,7 @@ deadqueue = "0.2.3"
derivative = "2.2"
derive_more = "0.99.17"
dirs-next = "2"
event-listener = "4.0.3"
expect-test = "1.4.1"
fastmurmur3 = "0.1.2"
fnv = "1.0.7"
Expand Down Expand Up @@ -101,12 +103,14 @@ keyed_priority_queue = "0.4.1"
lazy_static = "1.4"
libp2p = { version = "0.53", default-features = false }
libp2p-identity = { version = "0.2", features = ["peerid", "ed25519"] }
libp2p-swarm-test = { version = "0.3.0" }
loom = { version = "0.7.1", features = ["futures"] }
lru = "0.10"
mime = "0.3"
mime_classifier = "0.0.1"
mime_guess = "2.0.4"
minicbor = { version = "0.19.1", features = ["std", "derive", "half"] }
mockall = "0.11.4"
mockall = "0.12.1"
multiaddr = "0.18"
multibase = "0.9"
multihash = { version = "0.19" }
Expand All @@ -130,7 +134,7 @@ quic-rpc = { version = "0.3.2", default-features = false }
rand = "0.8.5"
rand_chacha = "0.3.1"
rayon = "1.5.3"
recon = { path = "./recon/" }
recon = { path = "./recon" }
regex = "1.7.1"
relative-path = "1.7.2"
reqwest = { version = "0.11.10", default-features = false }
Expand Down
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

RUSTFLAGS = -D warnings --cfg tokio_unstable
CARGO = RUSTFLAGS='${RUSTFLAGS}' cargo
CARGO_LOOM = RUSTFLAGS='${RUSTFLAGS} --cfg loomtest' cargo

RELEASE_LEVEL ?= minor

Expand Down Expand Up @@ -83,9 +84,9 @@ release-pr:
.PHONY: test
test:
# Test with default features
$(CARGO) test --locked --release
$(CARGO_LOOM) test --locked --release
# Test with all features
$(CARGO) test --locked --release --all-features
$(CARGO_LOOM) test --locked --release --all-features

.PHONY: check-fmt
check-fmt:
Expand Down
5 changes: 5 additions & 0 deletions p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ criterion2.workspace = true
rand_chacha.workspace = true
test-log.workspace = true
ceramic-store.workspace = true
mockall.workspace = true
recon = { workspace = true, features = ["test-utils"]}

[target.'cfg(loomtest)'.dependencies]
loom.workspace = true

[[bench]]
name = "lru_cache"
Expand Down
150 changes: 150 additions & 0 deletions p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,153 @@ where
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use cid::Cid;
use libp2p::swarm::{ConnectionId, ToSwarm};
use mockall::mock;
use prometheus_client::registry::Registry;
use recon::test_utils::{
InjectedEvent, MockReconForEventId, MockReconForInterest, TestBehaviour, TestSwarm,
};

fn convert_behaviour_event(ev: Event) -> Option<recon::libp2p::Event> {
match ev {
Event::Ping(_ev) => None,
Event::Identify(_ev) => None,
Event::Kademlia(_ev) => None,
Event::Mdns(_ev) => None,
Event::Bitswap(_ev) => None,
Event::Autonat(_ev) => None,
Event::Relay(_ev) => None,
Event::RelayClient(_ev) => None,
Event::Dcutr(_ev) => None,
Event::PeerManager(_ev) => None,
Event::Recon(ev) => Some(ev),
Event::Void => None,
}
}

mock! {
BitswapStore {}

#[async_trait::async_trait]
impl iroh_bitswap::Store for BitswapStore {
async fn get_size(&self, cid: &Cid) -> Result<usize>;
async fn get(&self, cid: &Cid) -> Result<Block>;
async fn has(&self, cid: &Cid) -> Result<bool>;
async fn put(&self, block: &Block) -> Result<bool>;
}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn behavior_runs() {
let mut cfg = Libp2pConfig::default();
cfg.autonat = false;
cfg.bitswap_server = false;
cfg.bitswap_client = false;
cfg.mdns = false;
cfg.relay_client = false;
cfg.relay_server = false;
let behavior = NodeBehaviour::new(
&Keypair::generate_ed25519(),
&cfg,
None,
Some((
MockReconForInterest::default(),
MockReconForEventId::default(),
)),
Arc::new(MockBitswapStore::default()),
Metrics::register(&mut Registry::default()),
)
.await
.unwrap();
let swarm = TestSwarm::from_behaviour(TestBehaviour {
inner: behavior,
convert: Box::new(|_, ev| {
if let ToSwarm::GenerateEvent(ev) = ev {
if let Some(ev) = convert_behaviour_event(ev) {
Some(ToSwarm::GenerateEvent(ev))
} else {
None
}
} else {
None
}
}),
api: Box::new(|_, _| ()),
});
let driver = swarm.drive();

tokio::time::sleep(Duration::from_secs(1)).await;

let stats = driver.stop().await;
assert!(stats.polled >= 1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn behavior_polled_when_using_public_api() {
let mut mock_interest = MockReconForInterest::default();
mock_interest
.expect_clone()
.returning(|| MockReconForInterest::default());
let mut mock_event = MockReconForEventId::default();
mock_event
.expect_clone()
.returning(|| MockReconForEventId::default());
let mut cfg = Libp2pConfig::default();
cfg.autonat = false;
cfg.bitswap_server = false;
cfg.bitswap_client = false;
cfg.mdns = false;
cfg.relay_client = false;
cfg.relay_server = false;
let behavior = NodeBehaviour::new(
&Keypair::generate_ed25519(),
&cfg,
None,
Some((mock_interest, mock_event)),
Arc::new(MockBitswapStore::default()),
Metrics::register(&mut Registry::default()),
)
.await
.unwrap();
let swarm = TestSwarm::from_behaviour(TestBehaviour {
inner: behavior,
convert: Box::new(|_, ev| {
if let ToSwarm::GenerateEvent(ev) = ev {
if let Some(ev) = convert_behaviour_event(ev) {
Some(ToSwarm::GenerateEvent(ev))
} else {
None
}
} else {
None
}
}),
api: Box::new(|beh, _| {
beh.handle_established_inbound_connection(
ConnectionId::new_unchecked(0),
recon::test_utils::PEER_ID.parse().unwrap(),
&"/ip4/1.2.3.4/tcp/443".parse().unwrap(),
&"/ip4/1.2.3.4/tcp/443".parse().unwrap(),
)
.unwrap();
}),
});
let driver = swarm.drive();

tokio::time::sleep(Duration::from_secs(1)).await;

driver
.inject(InjectedEvent::Api("connect".to_string()))
.await;

tokio::time::sleep(Duration::from_secs(1)).await;

let stats = driver.stop().await;
assert!(stats.polled >= 2);
}
}
61 changes: 60 additions & 1 deletion p2p/src/behaviour/ceramic_peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ impl NetworkBehaviour for CeramicPeerManager {
fn poll(
&mut self,
cx: &mut Context<'_>,
) -> Poll<libp2p::swarm::ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
) -> Poll<ToSwarm<Self::ToSwarm, libp2p::swarm::THandlerInEvent<Self>>> {
for (peer_id, peer) in self.ceramic_peers.iter_mut() {
if let Some(mut dial_future) = peer.dial_future.take() {
match dial_future.as_mut().poll_unpin(cx) {
Expand Down Expand Up @@ -290,3 +290,62 @@ impl CeramicPeer {
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use libp2p::core::{ConnectedPoint, Endpoint};
use prometheus_client::registry::Registry;
use recon::test_utils::*;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn swarm_should_drive_poll() {
let metrics = Metrics::register(&mut Registry::default());
let beh = CeramicPeerManager::new(&[], metrics).unwrap();
let swarm = TestSwarm::from_behaviour(TestBehaviour {
inner: beh,
convert: Box::new(|_, _ev| None),
api: Box::new(|_, _| ()),
});

let driver = swarm.drive();

tokio::time::sleep(Duration::from_secs(1)).await;

let stats = driver.stop().await;

assert!(stats.polled >= 1);
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn swarm_should_not_poll_if_event_injected() {
let metrics = Metrics::register(&mut Registry::default());
let beh = CeramicPeerManager::new(&[], metrics).unwrap();
let swarm = TestSwarm::from_behaviour(TestBehaviour {
inner: beh,
convert: Box::new(|_, _ev| None),
api: Box::new(|_, _| ()),
});

let driver = swarm.drive();

let conn_id = ConnectionId::new_unchecked(0);
driver
.inject(InjectedEvent::InboundConnection(
PeerId::random(),
conn_id.clone(),
ConnectedPoint::Dialer {
address: "/ip4/1.2.3.4/tcp/443".parse().unwrap(),
role_override: Endpoint::Dialer,
},
))
.await;

tokio::time::sleep(Duration::from_secs(1)).await;

let stats = driver.stop().await;

assert!(stats.polled >= 1);
assert_eq!(*stats.handler_polled.get(&conn_id).unwrap(), 0);
dav1do marked this conversation as resolved.
Show resolved Hide resolved
}
}
14 changes: 12 additions & 2 deletions recon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ ceramic-metrics.workspace = true
futures.workspace = true
hex = "0.4.3"
libp2p-identity.workspace = true
libp2p.workspace = true
libp2p = { workspace = true, features = ["macros"] }
libp2p-swarm-test = { workspace = true, optional = true }
mockall = { workspace = true, optional = true }
multihash-codetable.workspace = true
prometheus-client.workspace = true
serde.workspace = true
Expand All @@ -33,7 +35,8 @@ codespan-reporting = "0.11.1"
expect-test.workspace = true
lalrpop-util = { version = "0.20.0", features = ["lexer"] }
libp2p = { workspace = true, features = ["ping"] }
libp2p-swarm-test = "0.3.0"
libp2p-swarm-test = { workspace = true }
mockall.workspace = true
pin-project = "1.1.3"
pretty = "0.12.1"
quickcheck = "1.0.3"
Expand All @@ -45,5 +48,12 @@ tokio-stream.workspace = true
tokio-util.workspace = true
tracing-subscriber.workspace = true

#[target.'cfg(loomtest)'.dependencies]
loom.workspace = true

[build-dependencies]
lalrpop = "0.20.0"

[features]
default = []
test-utils = ["mockall", "libp2p-swarm-test"]
12 changes: 6 additions & 6 deletions recon/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
//! Recon is a network protocol for set reconciliation
#![warn(missing_docs, missing_debug_implementations, clippy::all)]
#![warn(missing_docs, clippy::all)]

pub use crate::{
client::{Client, Server},
error::Error,
metrics::Metrics,
recon::{
btreestore::BTreeStore, AssociativeHash, EventIdStore, FullInterests, HashCount,
InsertResult, InterestProvider, InterestStore, Key, Range, Recon, ReconInterestProvider,
ReconItem, Store, SyncState,
AssociativeHash, EventIdStore, FullInterests, HashCount, InsertResult, InterestProvider,
InterestStore, Key, Range, Recon, ReconInterestProvider, ReconItem, Store, SyncState,
},
sha256a::Sha256a,
};
Expand All @@ -21,8 +20,9 @@ pub mod protocol;
mod recon;
mod sha256a;

#[cfg(test)]
mod tests;
/// Testing utilities related to recon
#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

/// A result type that wraps a recon Error
pub type Result<T> = std::result::Result<T, Error>;
Loading
Loading