Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(p2p_network/sync_handlers): sync handlers wait for DB op to finish causing p2p server's swarm to stall #2594

Merged
merged 5 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 146 additions & 0 deletions crates/p2p/examples/stress_test_sync_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#![deny(rust_2018_idioms)]

use std::env::args;
use std::str::FromStr;
use std::time::Duration;

use anyhow::Context;
use futures::StreamExt;
use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use libp2p::Multiaddr;
use p2p::RateLimit;
use p2p_proto::common::{BlockNumberOrHash, Direction, Iteration};
use p2p_proto::transaction::TransactionsRequest;
use pathfinder_common::ChainId;

/// Command-line usage string.
///
/// Attached as error context whenever a command-line argument is missing or
/// cannot be parsed.
const USAGE: &str = concat!(
    "Usage: stress_test_sync_client <server-multiaddr-with-peer-id> ",
    "<max-concurrent-request-streams> <num-requests> <blocks-per-request> ",
    "<initial-delay-ms>",
);

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();

let server_addr = args().nth(1).context(USAGE)?;
let server_addr = Multiaddr::from_str(&server_addr).context(USAGE)?;
let server_peer_id = server_addr
.iter()
.find_map(|x| match x {
Protocol::P2p(peer_id) => Some(peer_id),
_ => None,
})
.context(USAGE)?;
let max_concurrent_streams = args()
.nth(2)
.unwrap_or("100".to_string())
.parse::<usize>()
.context(USAGE)?;
let num_requests = args()
.nth(3)
.unwrap_or("100".to_string())
.parse::<u64>()
.context(USAGE)?;
let blocks_per_request = args()
.nth(4)
.unwrap_or("100".to_string())
.parse::<u64>()
.context(USAGE)?;
let initial_delay_ms = args()
.nth(5)
.unwrap_or("0".to_string())
.parse::<u64>()
.context(USAGE)?;
let initial_delay = Duration::from_millis(initial_delay_ms);

let keypair = Keypair::generate_ed25519();
let (client, mut event_rx, main_loop) = p2p::new(
keypair,
p2p::Config {
direct_connection_timeout: Duration::from_secs(60 * 60),
relay_connection_timeout: Duration::from_secs(1),
max_inbound_direct_peers: 10,
max_inbound_relayed_peers: 0,
max_outbound_peers: 10,
eviction_timeout: Duration::ZERO,
ip_whitelist: Default::default(),
bootstrap_period: None,
inbound_connections_rate_limit: RateLimit {
max: 10,
interval: Duration::from_secs(1),
},
kad_name: None,
stream_timeout: Duration::from_secs(60 * 60),
max_concurrent_streams,
},
ChainId::SEPOLIA_TESTNET,
);

let main_loop_handle = tokio::task::spawn(main_loop.run());

client
.start_listening("/ip4/0.0.0.0/tcp/0".parse().expect("Valid multiaddr"))
.await?;

client.dial(server_peer_id, server_addr.clone()).await?;

tracing::info!("Waiting to start sending requests...");

tokio::time::sleep(initial_delay).await;

let client_fut = futures::stream::iter(0..num_requests).map(|start| {
let client = client.clone();
async move {
tracing::info!(%start, "Requesting transactions for");
match client
.send_transactions_sync_request(
server_peer_id,
TransactionsRequest {
iteration: Iteration {
start: BlockNumberOrHash::Number(start * blocks_per_request),
direction: Direction::Forward,
// Max allowed by pathfinder (as a server)
limit: blocks_per_request,
step: 1.into(),
},
},
)
.await
{
Ok(mut rx) => {
let mut txn_counter = 0;
while let Some(response) = rx.next().await {
match response {
Ok(_) => {
txn_counter += 1;
}
Err(error) => {
tracing::warn!(%start, %error, "Failed to get response after {txn_counter} responses");
return;
}
}
}

tracing::info!(%start, "++++ Received {txn_counter} transactions for");
}
Err(error) => tracing::warn!(%start, %error, "Failed to get response stream for"),
}
}
}).buffer_unordered(max_concurrent_streams).fold((), |_, _| async {});

tokio::select! {
event = event_rx.recv() => {
tracing::debug!("Received event: {:?}", event);
}
_ = main_loop_handle => {
println!("Main loop finished");
}
_ = client_fut => {
println!("Client finished");
}
}

Ok(())
}
4 changes: 2 additions & 2 deletions crates/p2p_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl fmt::Display for OutboundFailure {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OutboundFailure::DialFailure => write!(f, "Failed to dial the requested peer"),
OutboundFailure::Timeout => write!(f, "Timeout while waiting for a response"),
OutboundFailure::Timeout => write!(f, "Opening outbound stream timed out"),
OutboundFailure::ConnectionClosed => {
write!(f, "Connection was closed before a response was received")
}
Expand Down Expand Up @@ -349,7 +349,7 @@ where
/// connection is established.
///
/// > **Note**: In order for such a dialing attempt to succeed,
/// > the `RequestResponse` protocol must be embedded
/// > the `p2p_stream` protocol must be embedded
/// > in another `NetworkBehaviour` that provides peer and
/// > address discovery.
pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> OutboundRequestId {
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/src/bin/pathfinder/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,7 +567,7 @@ Example:
long_help = "Maximum allowed number of concurrent streams per each \
request/response-stream protocol per connection.",
value_name = "LIMIT",
default_value = "1",
default_value = "100",
env = "PATHFINDER_P2P_EXPERIMENTAL_MAX_CONCURRENT_STREAMS"
)]
max_concurrent_streams: usize,
Expand Down
10 changes: 5 additions & 5 deletions crates/pathfinder/src/p2p_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,27 +137,27 @@ async fn handle_p2p_event(
p2p::Event::InboundHeadersSyncRequest {
request, channel, ..
} => {
get_headers(storage, request, channel).await?;
get_headers(storage, request, channel).await;
}
p2p::Event::InboundClassesSyncRequest {
request, channel, ..
} => {
get_classes(storage, request, channel).await?;
get_classes(storage, request, channel).await;
}
p2p::Event::InboundStateDiffsSyncRequest {
request, channel, ..
} => {
get_state_diffs(storage, request, channel).await?;
get_state_diffs(storage, request, channel).await;
}
p2p::Event::InboundTransactionsSyncRequest {
request, channel, ..
} => {
get_transactions(storage, request, channel).await?;
get_transactions(storage, request, channel).await;
}
p2p::Event::InboundEventsSyncRequest {
request, channel, ..
} => {
get_events(storage, request, channel).await?;
get_events(storage, request, channel).await;
}
p2p::Event::BlockPropagation { from, new_block } => {
tracing::info!(%from, ?new_block, "Block Propagation");
Expand Down
58 changes: 29 additions & 29 deletions crates/pathfinder/src/p2p_network/sync_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,39 @@ pub async fn get_headers(
storage: Storage,
request: BlockHeadersRequest,
tx: futures::channel::mpsc::Sender<BlockHeadersResponse>,
) -> anyhow::Result<()> {
) {
spawn_blocking_get(request, storage, blocking::get_headers, tx).await
}

pub async fn get_classes(
storage: Storage,
request: ClassesRequest,
tx: futures::channel::mpsc::Sender<ClassesResponse>,
) -> anyhow::Result<()> {
) {
spawn_blocking_get(request, storage, blocking::get_classes, tx).await
}

pub async fn get_state_diffs(
storage: Storage,
request: StateDiffsRequest,
tx: futures::channel::mpsc::Sender<StateDiffsResponse>,
) -> anyhow::Result<()> {
) {
spawn_blocking_get(request, storage, blocking::get_state_diffs, tx).await
}

pub async fn get_transactions(
storage: Storage,
request: TransactionsRequest,
tx: futures::channel::mpsc::Sender<TransactionsResponse>,
) -> anyhow::Result<()> {
) {
spawn_blocking_get(request, storage, blocking::get_transactions, tx).await
}

pub async fn get_events(
storage: Storage,
request: EventsRequest,
tx: futures::channel::mpsc::Sender<EventsResponse>,
) -> anyhow::Result<()> {
) {
spawn_blocking_get(request, storage, blocking::get_events, tx).await
}

Expand Down Expand Up @@ -399,16 +399,22 @@ fn get_start_block_number(
}

/// Spawns a blocking task and forwards the result to the given channel.
/// Bails out early if the database operation fails or sending fails.
/// **Does not wait for the DB operation to finish.**
/// The `getter` function is expected to send partial results through the tokio
/// channel as soon as possible, ideally after each database read operation.
///
/// ## Important
///
/// This function must detach the thread used to run the blocking DB operation
/// otherwise the entire p2p swarm will be blocked.
///
/// Related issue: <https://github.com/eqlabs/pathfinder/issues/2351>
async fn spawn_blocking_get<Request, Response, Getter>(
request: Request,
storage: Storage,
getter: Getter,
mut tx: futures::channel::mpsc::Sender<Response>,
) -> anyhow::Result<()>
where
) where
Request: Send + 'static,
Response: Send + 'static,
Getter: FnOnce(Transaction<'_>, Request, mpsc::Sender<Response>) -> anyhow::Result<()>
Expand All @@ -419,31 +425,25 @@ where

let (sync_tx, mut rx) = mpsc::channel(1); // For backpressure

let db_fut = async {
util::task::spawn_blocking(move |_| {
let _g = span.enter();
let mut connection = storage
.connection()
.context("Opening database connection")?;
let db_tx = connection
.transaction()
.context("Creating database transaction")?;
getter(db_tx, request, sync_tx)
})
.await
.context("Database read panic or shutting down")?
.context("Database read")
};

let fwd_fut = async move {
// Detach so we can exit the function asap
util::task::spawn(async move {
while let Some(x) = rx.recv().await {
tx.send(x).await.context("Sending item")?;
}
Ok::<_, anyhow::Error>(())
};
// Bail out early, either when db fails or sending fails
tokio::try_join!(db_fut, fwd_fut)?;
Ok(())
});

// Detach so we can exit the function asap
util::task::spawn_blocking(move |_| {
let _g = span.enter();
let mut connection = storage
.connection()
.context("Opening database connection")?;
let db_tx = connection
.transaction()
.context("Creating database transaction")?;
getter(db_tx, request, sync_tx)
});
}

/// Returns next block number considering direction.
Expand Down