Skip to content

Commit

Permalink
bench: add max_timeout_ms to send_and_confirm_bulk_transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Lou-Kamades committed Mar 26, 2024
1 parent a7f5823 commit 0568dc7
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 31 deletions.
46 changes: 30 additions & 16 deletions bench/src/benches/confirmation_rate.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use crate::{create_rng, generate_txs, BenchmarkTransactionParams};
use anyhow::Context;
use log::{debug, info, trace, warn};
use std::ops::Add;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context};

use crate::benches::rpc_interface::{
send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc,
Expand All @@ -14,11 +14,11 @@ use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer};

#[derive(Debug, serde::Serialize)]
pub struct RpcStat {
// confirmation_time: f32,
// mode_slot: u64,
// confirmed: u64,
// unconfirmed: u64,
// failed: u64,
// confirmation_time: f32,
// mode_slot: u64,
// confirmed: u64,
// unconfirmed: u64,
// failed: u64,
tx_sent: u64,
tx_confirmed: u64,
// in ms
Expand All @@ -34,6 +34,7 @@ pub async fn confirmation_rate(
payer_path: &Path,
rpc_url: String,
tx_params: BenchmarkTransactionParams,
max_timeout_ms: u64,
txs_per_round: usize,
num_of_runs: usize,
) -> anyhow::Result<()> {
Expand All @@ -50,17 +51,23 @@ pub async fn confirmation_rate(
let mut rpc_results = Vec::with_capacity(num_of_runs);

for _ in 0..num_of_runs {
match send_bulk_txs_and_wait(&rpc, &payer, txs_per_round, &tx_params).await.context("send bulk tx and wait") {
match send_bulk_txs_and_wait(&rpc, &payer, txs_per_round, &tx_params, max_timeout_ms)
.await
.context("send bulk tx and wait")
{
Ok(stat) => {
rpc_results.push(stat);
}
Err(err) => {
warn!("Failed to send bulk txs and wait - no rpc stats available: {}", err);
warn!(
"Failed to send bulk txs and wait - no rpc stats available: {}",
err
);
}
}
}

if rpc_results.len() > 0 {
if !rpc_results.is_empty() {
info!("avg_rpc: {:?}", calc_stats_avg(&rpc_results));
} else {
info!("avg_rpc: n/a");
Expand All @@ -73,19 +80,21 @@ pub async fn send_bulk_txs_and_wait(
payer: &Keypair,
num_txns: usize,
tx_params: &BenchmarkTransactionParams,
max_timeout_ms: u64,
) -> anyhow::Result<RpcStat> {
trace!("Get latest blockhash and generate transactions");
let hash = rpc.get_latest_blockhash().await
.map_err(|err| {
log::error!("Error get latest blockhash : {err:?}");
err
})?;
let hash = rpc.get_latest_blockhash().await.map_err(|err| {
log::error!("Error get latest blockhash : {err:?}");
err
})?;
let mut rng = create_rng(None);
let txs = generate_txs(num_txns, payer, hash, &mut rng, tx_params);

trace!("Sending {} transactions in bulk ..", txs.len());
let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &txs).await.context("send and confirm bulk tx")?;
send_and_confirm_bulk_transactions(rpc, &txs, max_timeout_ms)
.await
.context("send and confirm bulk tx")?;
trace!("Done sending {} transaction.", txs.len());

let mut tx_sent = 0;
Expand All @@ -96,7 +105,12 @@ pub async fn send_bulk_txs_and_wait(
let mut sum_slot_confirmation_time = 0;
for (tx_sig, confirmation_response) in tx_and_confirmations_from_rpc {
match confirmation_response {
ConfirmationResponseFromRpc::Success(slot_sent, slot_confirmed, commitment_status, confirmation_time) => {
ConfirmationResponseFromRpc::Success(
slot_sent,
slot_confirmed,
commitment_status,
confirmation_time,
) => {
debug!(
"Signature {} confirmed with level {:?} after {:.02}ms, {} slots",
tx_sig,
Expand Down
7 changes: 4 additions & 3 deletions bench/src/benches/confirmation_slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ pub async fn confirmation_slot(
let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await;
debug!("(A) send tx {}", rpc_a_tx.signatures[0]);
send_and_confirm_transaction(&rpc_a, rpc_a_tx)
send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout_ms)
.await
.unwrap_or_else(|e| {
error!("Failed to send_and_confirm_transaction for A: {}", e);
Expand All @@ -120,7 +120,7 @@ pub async fn confirmation_slot(
let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
debug!("(B) send tx {}", rpc_b_tx.signatures[0]);
send_and_confirm_transaction(&rpc_b, rpc_b_tx)
send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout_ms)
.await
.unwrap_or_else(|e| {
error!("Failed to send_and_confirm_transaction for B: {}", e);
Expand Down Expand Up @@ -176,9 +176,10 @@ async fn create_tx(
async fn send_and_confirm_transaction(
rpc: &RpcClient,
tx: Transaction,
max_timeout_ms: u64,
) -> anyhow::Result<ConfirmationSlotInfo> {
let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> =
send_and_confirm_bulk_transactions(rpc, &[tx]).await?;
send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout_ms).await?;

let (signature, confirmation_response) = result_vec.into_iter().next().unwrap();

Expand Down
27 changes: 19 additions & 8 deletions bench/src/benches/rpc_interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use solana_transaction_status::TransactionConfirmationStatus;
use std::collections::{HashMap, HashSet};
use std::iter::zip;
use std::sync::Arc;
use std::time::{Duration};
use std::time::Duration;
use tokio::time::Instant;
use url::Url;

Expand All @@ -36,9 +36,12 @@ pub enum ConfirmationResponseFromRpc {
pub async fn send_and_confirm_bulk_transactions(
rpc_client: &RpcClient,
txs: &[Transaction],
max_timeout_ms: u64,
) -> anyhow::Result<Vec<(Signature, ConfirmationResponseFromRpc)>> {
trace!("Polling for next slot ..");
let send_slot = poll_next_slot_start(rpc_client).await.context("poll for next start slot")?;
let send_slot = poll_next_slot_start(rpc_client)
.await
.context("poll for next start slot")?;
trace!("Send slot: {}", send_slot);

let send_config = RpcSendTransactionConfig {
Expand All @@ -60,12 +63,14 @@ pub async fn send_and_confirm_bulk_transactions(

let after_send_slot = rpc_client
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await.context("get slot afterwards")?;
.await
.context("get slot afterwards")?;

// optimal value is "0"
debug!(
"Sent {} transactions within {} slots",
txs.len(), after_send_slot - send_slot
txs.len(),
after_send_slot - send_slot
);

let num_sent_ok = batch_sigs_or_fails
Expand Down Expand Up @@ -139,11 +144,17 @@ pub async fn send_and_confirm_bulk_transactions(
// "Too many inputs provided; max 256"
for chunk in tx_batch.chunks(256) {
// fail hard if not possible to poll status
let chunk_responses = rpc_client.get_signature_statuses(&chunk).await.expect("get signature statuses");
let chunk_responses = rpc_client
.get_signature_statuses(chunk)
.await
.expect("get signature statuses");
batch_status.extend(chunk_responses.value);
};
}
if status_started_at.elapsed() > Duration::from_millis(100) {
warn!("SLOW get_signature_statuses took {:?}", status_started_at.elapsed());
warn!(
"SLOW get_signature_statuses took {:?}",
status_started_at.elapsed()
);
}
let elapsed = started_at.elapsed();

Expand Down Expand Up @@ -188,7 +199,7 @@ pub async fn send_and_confirm_bulk_transactions(
break 'pooling_loop;
}

if iteration == 100 {
if started_at.elapsed() > Duration::from_millis(max_timeout_ms) {
info!("Timeout waiting for transactions to confirmed after {} iterations - giving up on {}", iteration, pending_status_set.len());
break 'pooling_loop;
}
Expand Down
6 changes: 5 additions & 1 deletion bench/src/benchnew.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use bench::{
BenchmarkTransactionParams,
};
use clap::{Parser, Subcommand};
use log::trace;

#[derive(Parser, Debug)]
#[clap(version, about)]
Expand Down Expand Up @@ -41,6 +40,9 @@ enum SubCommand {
rpc_url: String,
#[clap(short, long)]
size_tx: TxSize,
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
#[clap(short, long, default_value_t = 15_000)]
max_timeout_ms: u64,
#[clap(short, long)]
txns_per_round: usize,
#[clap(short, long)]
Expand Down Expand Up @@ -106,6 +108,7 @@ async fn main() {
payer_path,
rpc_url,
size_tx,
max_timeout_ms,
txns_per_round,
num_of_runs,
cu_price,
Expand All @@ -116,6 +119,7 @@ async fn main() {
tx_size: size_tx,
cu_price_micro_lamports: cu_price,
},
max_timeout_ms,
txns_per_round,
num_of_runs,
)
Expand Down
7 changes: 4 additions & 3 deletions util/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
pub mod encoding;
pub mod secrets;
pub mod statistics;

// http://mango.rpcpool.com/c232ab232ba2323
pub fn obfuscate_rpcurl(rpc_addr: &str) -> String {
if rpc_addr.contains("rpcpool.com") {
return rpc_addr.replacen(char::is_numeric, "X", 99);
}
rpc_addr.to_string()
}
pub mod encoding;
pub mod secrets;
pub mod statistics;

0 comments on commit 0568dc7

Please sign in to comment.