Skip to content

Commit

Permalink
benchmarks: add timeout_ms to confirmation_slot bench
Browse files Browse the repository at this point in the history
  • Loading branch information
Lou-Kamades committed Mar 15, 2024
1 parent 6764a5a commit f55df40
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 59 deletions.
126 changes: 73 additions & 53 deletions bench/src/benches/confirmation_slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,77 +3,86 @@ use std::time::Duration;

use crate::tx_size::TxSize;
use crate::{create_memo_tx, create_rng, Rng8};
use anyhow::Context;
use log::{info, warn};
use log::{debug, info};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
use solana_sdk::signature::{read_keypair_file, Signer};
use solana_sdk::transaction::Transaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_transaction_status::UiTransactionEncoding;
use tokio::time::{sleep, Instant};
use tracing::error;

/// TC1 -- Send 2 txs to separate RPCs and compare confirmation slot.
/// The benchmark attempts to minimize the effect of real-world distance and synchronize the time that each transaction reaches the RPC.
/// This is achieved via delayed sending the transaction to the nearer RPC.
/// This is achieved by delaying submission of the transaction to the "nearer" RPC.
/// Delay time is calculated as half of the difference in duration of [getHealth](https://solana.com/docs/rpc/http/gethealth) calls to both RPCs.
pub async fn confirmation_slot(
payer_path: &Path,
rpc_a_url: String,
rpc_b_url: String,
tx_size: TxSize,
max_timeout_ms: u64,
num_rounds: usize,
cu_price_micro_lamports: u64,
) -> anyhow::Result<()> {
warn!("THIS IS WORK IN PROGRESS");

let rpc_a = RpcClient::new(rpc_a_url);
info!("RPC A: {}", rpc_a.url());

let rpc_b = RpcClient::new(rpc_b_url);
info!("RPC B: {}", rpc_b.url());
info!("START BENCHMARK: confirmation_slot");
info!("RPC A: {}", rpc_a_url);
info!("RPC B: {}", rpc_b_url);

let mut rng = create_rng(None);
let payer = read_keypair_file(payer_path).expect("payer file");
info!("Payer: {}", payer.pubkey().to_string());

// TODO: loop here

let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
let time_b = rpc_roundtrip_duration(&rpc_b).await?.as_secs_f64();

info!("{} (A) latency: {}", rpc_a.url(), time_a);
info!("{} (B) latency: {}", rpc_b.url(), time_b);

let rpc_a_tx = create_tx(&rpc_a, &payer, &mut rng, tx_size, cu_price_micro_lamports).await?;
let rpc_b_tx = create_tx(&rpc_b, &payer, &mut rng, tx_size, cu_price_micro_lamports).await?;

let half_round_trip = (time_a - time_b).abs() / 2.0;
let (a_delay, b_delay) = if time_a > time_b {
(0f64, half_round_trip)
} else {
(half_round_trip, 0f64)
};

info!("A delay: {}, B delay: {}", a_delay, b_delay);

let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await;
send_transaction_and_get_slot(&rpc_a, rpc_a_tx)
.await
.unwrap()
});

let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
send_transaction_and_get_slot(&rpc_b, rpc_b_tx)
.await
.unwrap()
});

let (a_slot, b_slot) = tokio::join!(a_task, b_task);

info!("a_slot: {}", a_slot?);
info!("b_slot: {}", b_slot?);
// TODO: aggregate stats

for i in 0..num_rounds {

Check warning on line 39 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused variable: `i`

Check warning on line 39 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / Test lite-rpc against running Validator

unused variable: `i`

Check failure on line 39 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused variable: `i`

Check warning on line 39 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused variable: `i`

Check warning on line 39 in bench/src/benches/confirmation_slot.rs

View workflow job for this annotation

GitHub Actions / lite-rpc full build

unused variable: `i`
let rpc_a = RpcClient::new(rpc_a_url.clone());
let rpc_b = RpcClient::new(rpc_b_url.clone());
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
let time_b = rpc_roundtrip_duration(&rpc_b).await?.as_secs_f64();

debug!("{} (A) latency: {}", rpc_a.url(), time_a);
debug!("{} (B) latency: {}", rpc_b.url(), time_b);

let rpc_a_tx =
create_tx(&rpc_a, &payer, &mut rng, tx_size, cu_price_micro_lamports).await?;
let rpc_b_tx =
create_tx(&rpc_b, &payer, &mut rng, tx_size, cu_price_micro_lamports).await?;

let half_round_trip = (time_a - time_b).abs() / 2.0;
let (a_delay, b_delay) = if time_a > time_b {
(0f64, half_round_trip)
} else {
(half_round_trip, 0f64)
};

info!("A delay: {}, B delay: {}", a_delay, b_delay);

let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await;
send_transaction_and_get_slot(&rpc_a, rpc_a_tx, max_timeout_ms)
.await
.unwrap_or_else(|e| {
error!("Failed to confirm txn for A: {}", e);
0
})
});

let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
send_transaction_and_get_slot(&rpc_b, rpc_b_tx, max_timeout_ms)
.await
.unwrap_or_else(|e| {
error!("Failed to confirm txn for B: {}", e);
0
})
});

let (a_slot, b_slot) = tokio::join!(a_task, b_task);

info!("a_slot: {}, b_slot: {}\n", a_slot?, b_slot?);
}

Ok(())
}
Expand All @@ -96,20 +105,32 @@ async fn create_tx(
))
}

async fn send_transaction_and_get_slot(rpc: &RpcClient, tx: Transaction) -> anyhow::Result<u64> {
async fn send_transaction_and_get_slot(
rpc: &RpcClient,
tx: Transaction,
timeout_ms: u64,
) -> anyhow::Result<u64> {
let send_config = RpcSendTransactionConfig {
skip_preflight: true,
preflight_commitment: None,
encoding: None,
max_retries: Some(3),
min_context_slot: None,
};
let signature = rpc.send_transaction_with_config(&tx, send_config).await?;
let signature = rpc
.send_transaction_with_config(&tx, send_config)
.await
.map_err(|err| anyhow::anyhow!("{:?}", err))?;

let start_time = Instant::now();
loop {
if start_time.elapsed() >= Duration::from_millis(timeout_ms) {
return Ok(0); // Timeout occurred
}
let confirmed = rpc
.confirm_transaction_with_commitment(&signature, CommitmentConfig::confirmed())
.await?;
.await
.unwrap();
if confirmed.value {
break;
}
Expand All @@ -122,8 +143,7 @@ async fn send_transaction_and_get_slot(rpc: &RpcClient, tx: Transaction) -> anyh
};
let transaction = rpc
.get_transaction_with_config(&signature, fetch_config)
.await
.context("Failed to fetch transaction")?;
.await?;
Ok(transaction.slot)
}

Expand Down
27 changes: 21 additions & 6 deletions bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ enum SubCommand {
rpc_url: String,
#[clap(short, long)]
time_ms: u64,
/// (Optional) The CU price in micro lamports
/// The CU price in micro lamports
#[clap(short, long, default_value_t = 3)]
#[arg(short = 'f')]
cu_price: u64,
Expand All @@ -42,7 +42,7 @@ enum SubCommand {
txns_per_round: usize,
#[clap(short, long)]
num_rounds: usize,
/// (Optional) The CU price in micro lamports
/// The CU price in micro lamports
#[clap(short, long, default_value_t = 300)]
#[arg(short = 'f')]
cu_price: u64,
Expand All @@ -61,7 +61,12 @@ enum SubCommand {
rpc_b: String,
#[clap(short, long)]
size_tx: TxSize,
/// (Optional) The CU price in micro lamports
/// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed
#[clap(short, long, default_value_t = 3000)]
max_timeout_ms: u64,
#[clap(short, long)]
num_rounds: usize,
/// The CU price in micro lamports
#[clap(short, long, default_value_t = 300)]
#[arg(short = 'f')]
cu_price: u64,
Expand Down Expand Up @@ -114,9 +119,19 @@ async fn main() {
rpc_a,
rpc_b,
size_tx,
max_timeout_ms,
num_rounds,
cu_price,
} => confirmation_slot(
&payer_path,
rpc_a,
rpc_b,
size_tx,
max_timeout_ms,
num_rounds,
cu_price,
} => confirmation_slot(&payer_path, rpc_a, rpc_b, size_tx, cu_price)
.await
.unwrap(),
)
.await
.unwrap(),
}
}

0 comments on commit f55df40

Please sign in to comment.