Skip to content

Commit

Permalink
bench: fix startup race listening to websocket
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Aug 8, 2024
1 parent fd72c5f commit 198ca00
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 28 deletions.
10 changes: 7 additions & 3 deletions bench/src/benches/confirmation_rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,18 @@ pub async fn send_bulk_txs_and_wait(
max_timeout: Duration,
) -> anyhow::Result<Metric> {
trace!("Get latest blockhash and generate transactions");
let hash = rpc.get_latest_blockhash().await.map_err(|err| {
let recent_blockhash = rpc.get_latest_blockhash().await.map_err(|err| {
log::error!("Error get latest blockhash : {err:?}");
err
})?;
let mut rng = create_rng(None);
let txs = generate_txs(num_txs, payer, hash, &mut rng, tx_params);
let txs = generate_txs(num_txs, payer, recent_blockhash, &mut rng, tx_params);

trace!("Sending {} transactions in bulk ..", txs.len());
trace!(
"Sending {} transactions with blockhash {} to RPC sendTransaction in bulk ..",
txs.len(),
recent_blockhash
);
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(
rpc,
Expand Down
36 changes: 18 additions & 18 deletions bench/src/benches/rpc_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use futures::TryFutureExt;
use itertools::Itertools;
use log::{debug, trace, warn};

use solana_lite_rpc_util::obfuscate_rpcurl;
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client::rpc_client::SerializableTransaction;
use solana_rpc_client_api::client_error::ErrorKind;
Expand Down Expand Up @@ -35,7 +34,7 @@ pub enum ConfirmationResponseFromRpc {
// RPC error on send_transaction
SendError(Arc<ErrorKind>),
// (sent slot at confirmed commitment, confirmed slot, ..., ...)
// transaction_confirmation_status is "confirmed" (finalized is not reported by blockSubscribe websocket
// transaction_confirmation_status is "confirmed"
Success(Slot, Slot, TransactionConfirmationStatus, Duration),
// timout waiting for confirmation status
Timeout(Duration),
Expand All @@ -48,12 +47,11 @@ pub async fn send_and_confirm_bulk_transactions(
txs: &[VersionedTransaction],
max_timeout: Duration,
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
debug!("Send transaction with timeout {:?}", max_timeout);
trace!("Polling for next slot ..");
let send_slot = poll_next_slot_start(rpc_client)
.await
.context("poll for next start slot")?;
trace!("Send slot: {}", send_slot);
debug!(
"send_transaction for {} txs with timeout {:.03}s",
txs.len(),
max_timeout.as_secs_f32()
);

let send_config = RpcSendTransactionConfig {
skip_preflight: true,
Expand All @@ -65,7 +63,7 @@ pub async fn send_and_confirm_bulk_transactions(

let tx_listener_startup_token = CancellationToken::new();

// note: we get confirmed but never finalized
// note: we get confirmed but not finalized
let tx_listener_startup_token_cp = tx_listener_startup_token.clone();
let (tx_status_map, _jh_collector) = start_tx_status_collector(
tx_status_websocket_addr.clone(),
Expand All @@ -78,6 +76,12 @@ pub async fn send_and_confirm_bulk_transactions(
// waiting for thread to cancel the token
tx_listener_startup_token.cancelled().await;

trace!("Waiting for next slot before sending transactions ..");
let send_slot = poll_next_slot_start(rpc_client)
.await
.context("poll for next start slot")?;
trace!("Send slot: {}", send_slot);

let started_at = Instant::now();
trace!(
"Sending {} transactions via RPC (retries=off) ..",
Expand Down Expand Up @@ -119,9 +123,9 @@ pub async fn send_and_confirm_bulk_transactions(
for (i, tx_sig) in txs.iter().enumerate() {
let tx_sent = batch_sigs_or_fails[i].is_ok();
if tx_sent {
trace!("- tx_sent {}", tx_sig.get_signature());
debug!("- tx_sent {}", tx_sig.get_signature());
} else {
trace!("- tx_fail {}", tx_sig.get_signature());
debug!("- tx_fail {}", tx_sig.get_signature());
}
}
let elapsed = started_at.elapsed();
Expand Down Expand Up @@ -156,13 +160,9 @@ pub async fn send_and_confirm_bulk_transactions(

// items get moved from pending_status_set to result_status_map

debug!(
"Waiting for transaction confirmations from websocket source <{}> ..",
obfuscate_rpcurl(tx_status_websocket_addr.as_str())
);
let started_at = Instant::now();
let timeout_at = started_at + max_timeout;
// "poll" the status dashmap
// "poll" the status dashmap which gets updated by the tx status collector task
'polling_loop: for iteration in 1.. {
let iteration_ends_at = started_at + Duration::from_millis(iteration * 100);
assert_eq!(
Expand All @@ -179,7 +179,7 @@ pub async fn send_and_confirm_bulk_transactions(
// status is confirmed
if pending_status_set.remove(tx_sig) {
trace!(
"take status for sig {:?} and confirmed_slot: {:?} from websocket source",
"websocket source tx status for sig {:?} and confirmed_slot: {:?}",
tx_sig,
confirmed_slot
);
Expand Down Expand Up @@ -264,7 +264,7 @@ pub async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result<Slot, Error>
let slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
trace!("polling slot {}", slot);
trace!(".. polling slot {}", slot);
if let Some(last_slot) = last_slot {
if last_slot + 1 == slot {
break slot;
Expand Down
10 changes: 4 additions & 6 deletions bench/src/benches/tx_status_websocket_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ use url::Url;
use websocket_tungstenite_retry::websocket_stable;
use websocket_tungstenite_retry::websocket_stable::WsMessage;

// returns map of transaction signatures to the slot they were confirmed
// returns map of transaction signatures to the slot they were confirmed (or finalized)
// the caller must await for the token to be cancelled to prevent startup race condition
pub async fn start_tx_status_collector(
ws_url: Url,
payer_pubkey: Pubkey,
commitment_config: CommitmentConfig,
startup_token: CancellationToken,
) -> (Arc<DashMap<Signature, Slot>>, AbortHandle) {
// e.g. "commitment"
let commitment_str = format!("{:?}", commitment_config);

// note: no commitment paramter is provided; according to the docs we get confirmed+finalized but never processed
let mut web_socket_slots = websocket_stable::StableWebSocket::new_with_timeout(
ws_url,
json!({
Expand Down Expand Up @@ -57,6 +57,7 @@ pub async fn start_tx_status_collector(
let jh = tokio::spawn(async move {
// notify the caller that we are ready to receive messages
startup_token.cancel();
debug!("Websocket subscription to 'blockSubscribe' is ready to observe signatures in confirmed blocks");
while let Ok(msg) = channel.recv().await {
if let WsMessage::Text(payload) = msg {
let ws_result: jsonrpsee_types::SubscriptionResponse<Response<RpcBlockUpdate>> =
Expand All @@ -72,10 +73,7 @@ pub async fn start_tx_status_collector(
{
for tx_sig in tx_sigs_from_block {
let tx_sig = Signature::from_str(&tx_sig).unwrap();
debug!(
"Transaction signature found in block: {} - slot {}",
tx_sig, slot
);
debug!("Transaction signature found in block {}: {}", slot, tx_sig);
map.entry(tx_sig).or_insert(slot);
}
}
Expand Down
2 changes: 1 addition & 1 deletion bench/src/benchnew.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ enum SubCommand {
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
#[clap(short, long, default_value_t = 15_000)]
#[clap(short, long, default_value_t = 30_000)]
max_timeout_ms: u64,
#[clap(short, long)]
num_of_runs: usize,
Expand Down

0 comments on commit 198ca00

Please sign in to comment.