diff --git a/crates/p2p/examples/stress_test_sync_client.rs b/crates/p2p/examples/stress_test_sync_client.rs new file mode 100644 index 0000000000..7bad105767 --- /dev/null +++ b/crates/p2p/examples/stress_test_sync_client.rs @@ -0,0 +1,146 @@ +#![deny(rust_2018_idioms)] + +use std::env::args; +use std::str::FromStr; +use std::time::Duration; + +use anyhow::Context; +use futures::StreamExt; +use libp2p::identity::Keypair; +use libp2p::multiaddr::Protocol; +use libp2p::Multiaddr; +use p2p::RateLimit; +use p2p_proto::common::{BlockNumberOrHash, Direction, Iteration}; +use p2p_proto::transaction::TransactionsRequest; +use pathfinder_common::ChainId; + +const USAGE: &str = "Usage: stress_test_sync_client \ + \ + "; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let server_addr = args().nth(1).context(USAGE)?; + let server_addr = Multiaddr::from_str(&server_addr).context(USAGE)?; + let server_peer_id = server_addr + .iter() + .find_map(|x| match x { + Protocol::P2p(peer_id) => Some(peer_id), + _ => None, + }) + .context(USAGE)?; + let max_concurrent_streams = args() + .nth(2) + .unwrap_or("100".to_string()) + .parse::() + .context(USAGE)?; + let num_requests = args() + .nth(3) + .unwrap_or("100".to_string()) + .parse::() + .context(USAGE)?; + let blocks_per_request = args() + .nth(4) + .unwrap_or("100".to_string()) + .parse::() + .context(USAGE)?; + let initial_delay_ms = args() + .nth(5) + .unwrap_or("0".to_string()) + .parse::() + .context(USAGE)?; + let initial_delay = Duration::from_millis(initial_delay_ms); + + let keypair = Keypair::generate_ed25519(); + let (client, mut event_rx, main_loop) = p2p::new( + keypair, + p2p::Config { + direct_connection_timeout: Duration::from_secs(60 * 60), + relay_connection_timeout: Duration::from_secs(1), + max_inbound_direct_peers: 10, + max_inbound_relayed_peers: 0, + max_outbound_peers: 10, + eviction_timeout: Duration::ZERO, + ip_whitelist: Default::default(), + bootstrap_period: None, + inbound_connections_rate_limit: RateLimit { + max: 10, + interval: Duration::from_secs(1), + }, + kad_name: None, + stream_timeout: Duration::from_secs(60 * 60), + max_concurrent_streams, + }, + ChainId::SEPOLIA_TESTNET, + ); + + let main_loop_handle = tokio::task::spawn(main_loop.run()); + + client + .start_listening("/ip4/0.0.0.0/tcp/0".parse().expect("Valid multiaddr")) + .await?; + + client.dial(server_peer_id, server_addr.clone()).await?; + + tracing::info!("Waiting to start sending requests..."); + + tokio::time::sleep(initial_delay).await; + + let client_fut = futures::stream::iter(0..num_requests).map(|start| { + let client = client.clone(); + async move { + tracing::info!(%start, "Requesting transactions for"); + match client + .send_transactions_sync_request( + server_peer_id, + TransactionsRequest { + iteration: Iteration { + start: BlockNumberOrHash::Number(start * blocks_per_request), + direction: Direction::Forward, + // Max allowed by pathfinder (as a server) + limit: blocks_per_request, + step: 1.into(), + }, + }, + ) + .await + { + Ok(mut rx) => { + let mut txn_counter = 0; + while let Some(response) = rx.next().await { + match response { + Ok(_) => { + txn_counter += 1; + } + Err(error) => { + tracing::warn!(%start, %error, "Failed to get response after {txn_counter} responses"); + return; + } + } + } + + tracing::info!(%start, "++++ Received {txn_counter} transactions for"); + } + Err(error) => tracing::warn!(%start, %error, "Failed to get response stream for"), + } + } + }).buffer_unordered(max_concurrent_streams).fold((), |_, _| async {}); + + tokio::select! { + event = event_rx.recv() => { + tracing::debug!("Received event: {:?}", event); + } + _ = main_loop_handle => { + println!("Main loop finished"); + } + _ = client_fut => { + println!("Client finished"); + } + } + + Ok(()) +} diff --git a/crates/p2p_stream/src/lib.rs b/crates/p2p_stream/src/lib.rs index 8124ea513a..31c088c553 100644 --- a/crates/p2p_stream/src/lib.rs +++ b/crates/p2p_stream/src/lib.rs @@ -164,7 +164,7 @@ impl fmt::Display for OutboundFailure { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"), - OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"), + OutboundFailure::Timeout => write!(f, "Opening outbound stream timed out"), OutboundFailure::ConnectionClosed => { write!(f, "Connection was closed before a response was received") } @@ -349,7 +349,7 @@ where /// connection is established. /// /// > **Note**: In order for such a dialing attempt to succeed, - /// > the `RequestResponse` protocol must be embedded + /// > the `p2p_stream` protocol must be embedded /// > in another `NetworkBehaviour` that provides peer and /// > address discovery. pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId { diff --git a/crates/pathfinder/src/bin/pathfinder/config.rs b/crates/pathfinder/src/bin/pathfinder/config.rs index a35f052b63..ee7faaf0db 100644 --- a/crates/pathfinder/src/bin/pathfinder/config.rs +++ b/crates/pathfinder/src/bin/pathfinder/config.rs @@ -567,7 +567,7 @@ Example: long_help = "Maximum allowed number of concurrent streams per each \ request/response-stream protocol per connection.", value_name = "LIMIT", - default_value = "1", + default_value = "100", env = "PATHFINDER_P2P_EXPERIMENTAL_MAX_CONCURRENT_STREAMS" )] max_concurrent_streams: usize, diff --git a/crates/pathfinder/src/p2p_network.rs b/crates/pathfinder/src/p2p_network.rs index cab8a2b717..b4faeaed12 100644 --- a/crates/pathfinder/src/p2p_network.rs +++ b/crates/pathfinder/src/p2p_network.rs @@ -137,27 +137,27 @@ async fn handle_p2p_event( p2p::Event::InboundHeadersSyncRequest { request, channel, .. } => { - get_headers(storage, request, channel).await?; + get_headers(storage, request, channel).await; } p2p::Event::InboundClassesSyncRequest { request, channel, .. } => { - get_classes(storage, request, channel).await?; + get_classes(storage, request, channel).await; } p2p::Event::InboundStateDiffsSyncRequest { request, channel, .. } => { - get_state_diffs(storage, request, channel).await?; + get_state_diffs(storage, request, channel).await; } p2p::Event::InboundTransactionsSyncRequest { request, channel, .. } => { - get_transactions(storage, request, channel).await?; + get_transactions(storage, request, channel).await; } p2p::Event::InboundEventsSyncRequest { request, channel, .. } => { - get_events(storage, request, channel).await?; + get_events(storage, request, channel).await; } p2p::Event::BlockPropagation { from, new_block } => { tracing::info!(%from, ?new_block, "Block Propagation"); diff --git a/crates/pathfinder/src/p2p_network/sync_handlers.rs b/crates/pathfinder/src/p2p_network/sync_handlers.rs index 7cd3452ef8..0239fb3af4 100644 --- a/crates/pathfinder/src/p2p_network/sync_handlers.rs +++ b/crates/pathfinder/src/p2p_network/sync_handlers.rs @@ -40,7 +40,7 @@ pub async fn get_headers( storage: Storage, request: BlockHeadersRequest, tx: futures::channel::mpsc::Sender, -) -> anyhow::Result<()> { +) { spawn_blocking_get(request, storage, blocking::get_headers, tx).await } @@ -48,7 +48,7 @@ pub async fn get_classes( storage: Storage, request: ClassesRequest, tx: futures::channel::mpsc::Sender, -) -> anyhow::Result<()> { +) { spawn_blocking_get(request, storage, blocking::get_classes, tx).await } @@ -56,7 +56,7 @@ pub async fn get_state_diffs( storage: Storage, request: StateDiffsRequest, tx: futures::channel::mpsc::Sender, -) -> anyhow::Result<()> { +) { spawn_blocking_get(request, storage, blocking::get_state_diffs, tx).await } @@ -64,7 +64,7 @@ pub async fn get_transactions( storage: Storage, request: TransactionsRequest, tx: futures::channel::mpsc::Sender, -) -> anyhow::Result<()> { +) { spawn_blocking_get(request, storage, blocking::get_transactions, tx).await } @@ -72,7 +72,7 @@ pub async fn get_events( storage: Storage, request: EventsRequest, tx: futures::channel::mpsc::Sender, -) -> anyhow::Result<()> { +) { spawn_blocking_get(request, storage, blocking::get_events, tx).await } @@ -399,16 +399,22 @@ fn get_start_block_number( } /// Spawns a blocking task and forwards the result to the given channel. -/// Bails out early if the database operation fails or sending fails. +/// **Does not wait for the DB operation to finish.** /// The `getter` function is expected to send partial results through the tokio /// channel as soon as possible, ideally after each database read operation. +/// +/// ## Important +/// +/// This function must detach the thread used to run the blocking DB operation +/// otherwise the entire p2p swarm will be blocked. +/// +/// Related issue: async fn spawn_blocking_get( request: Request, storage: Storage, getter: Getter, mut tx: futures::channel::mpsc::Sender, -) -> anyhow::Result<()> -where +) where Request: Send + 'static, Response: Send + 'static, Getter: FnOnce(Transaction<'_>, Request, mpsc::Sender) -> anyhow::Result<()> @@ -419,31 +425,25 @@ where let (sync_tx, mut rx) = mpsc::channel(1); // For backpressure - let db_fut = async { - util::task::spawn_blocking(move |_| { - let _g = span.enter(); - let mut connection = storage - .connection() - .context("Opening database connection")?; - let db_tx = connection - .transaction() - .context("Creating database transaction")?; - getter(db_tx, request, sync_tx) - }) - .await - .context("Database read panic or shutting down")? - .context("Database read") - }; - - let fwd_fut = async move { + // Detach so we can exit the function asap + util::task::spawn(async move { while let Some(x) = rx.recv().await { tx.send(x).await.context("Sending item")?; } Ok::<_, anyhow::Error>(()) - }; - // Bail out early, either when db fails or sending fails - tokio::try_join!(db_fut, fwd_fut)?; - Ok(()) + }); + + // Detach so we can exit the function asap + util::task::spawn_blocking(move |_| { + let _g = span.enter(); + let mut connection = storage + .connection() + .context("Opening database connection")?; + let db_tx = connection + .transaction() + .context("Creating database transaction")?; + getter(db_tx, request, sync_tx) + }); } /// Returns next block number considering direction.