Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TC-1 #359

Merged
merged 26 commits into from
Mar 27, 2024
Merged

TC-1 #359

Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
318d669
benchmarks: make cu_price configurable
Lou-Kamades Mar 13, 2024
6764a5a
benchmarks: confirmation slot bench attempts to minimize RPC distance
Lou-Kamades Mar 13, 2024
f55df40
benchmarks: add timeout_ms to confirmation_slot bench
Lou-Kamades Mar 13, 2024
c79b8f5
bench: add function to post data to Ping Thing
Lou-Kamades Mar 15, 2024
8d26c71
Merge branch 'main' into lou/tc-1
Lou-Kamades Mar 19, 2024
22a2eec
update cli merge confilicts
Lou-Kamades Mar 19, 2024
d278be1
bench: (TC1) add option for posting confirmation data to Ping Thing
Lou-Kamades Mar 19, 2024
3af8e53
bench: review comments
Lou-Kamades Mar 21, 2024
ee9bdf8
bench: convert ConfirmationSlotResult to an enum
Lou-Kamades Mar 21, 2024
b2c91ac
bench: fix tx size test
Lou-Kamades Mar 21, 2024
fd181d6
configure rpc_client with level=confirmed
grooviegermanikus Mar 22, 2024
1e5c1af
moved around stuff
grooviegermanikus Mar 22, 2024
d7c57c6
bench: ConfirmationResponseFromRpc::Success includes sent and confirm…
Lou-Kamades Mar 24, 2024
764952e
revamped status counting
grooviegermanikus Mar 26, 2024
69f5d97
add more narrow logs
grooviegermanikus Mar 26, 2024
a7f5823
Merge branch 'main' into lou/tc-1
Lou-Kamades Mar 26, 2024
0568dc7
bench: add max_timeout_ms to send_and_confirm_bulk_transactions
Lou-Kamades Mar 26, 2024
efbc428
cleanup
grooviegermanikus Mar 26, 2024
7881652
minor checks
grooviegermanikus Mar 26, 2024
9c2df94
simple result report for confirmation-slot
grooviegermanikus Mar 26, 2024
40bf799
Merge remote-tracking branch 'origin/lou/tc-1' into lou/tc-1
grooviegermanikus Mar 26, 2024
dd10fa3
bench: add TC-1 postgres result struct
Lou-Kamades Mar 27, 2024
48889e7
restore timeout
grooviegermanikus Mar 27, 2024
7e4b25b
Merge remote-tracking branch 'origin/lou/tc-1' into lou/tc-1
grooviegermanikus Mar 27, 2024
0d93ebf
rename txns -> txs, round -> run
grooviegermanikus Mar 27, 2024
259a895
add work-in-progress warning
grooviegermanikus Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dashmap = { workspace = true }
bincode = { workspace = true }
itertools = "0.10.5"
spl-memo = "4.0.0"
reqwest = "0.11.26"
lazy_static = "1.4.0"

[dev-dependencies]
Expand Down
9 changes: 7 additions & 2 deletions bench/src/benches/api_load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use solana_sdk::signature::{read_keypair_file, Keypair, Signer};
use crate::create_memo_tx_small;

// TC3 measure how much load the API endpoint can take
pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyhow::Result<()> {
pub async fn api_load(
payer_path: &Path,
rpc_url: String,
time_ms: u64,
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
cu_price_micro_lamports: u64,
) -> anyhow::Result<()> {
warn!("THIS IS WORK IN PROGRESS");

let rpc = Arc::new(RpcClient::new(rpc_url));
Expand All @@ -40,7 +45,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho

tokio::spawn(async move {
let msg = msg.as_bytes();
let tx = create_memo_tx_small(msg, &payer, hash);
let tx = create_memo_tx_small(msg, &payer, hash, cu_price_micro_lamports);
match rpc.send_transaction(&tx).await {
Ok(_) => success.fetch_add(1, Ordering::Relaxed),
Err(_) => failed.fetch_add(1, Ordering::Relaxed),
Expand Down
20 changes: 18 additions & 2 deletions bench/src/benches/confirmation_rate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub async fn confirmation_rate(
tx_size: TxSize,
txns_per_round: usize,
num_rounds: usize,
cu_price_micro_lamports: u64,
) -> anyhow::Result<()> {
warn!("THIS IS WORK IN PROGRESS");

Expand All @@ -49,7 +50,14 @@ pub async fn confirmation_rate(
let mut rpc_results = Vec::with_capacity(num_rounds);

for _ in 0..num_rounds {
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
let stat: RpcStat = send_bulk_txs_and_wait(&rpc, &payer, txns_per_round, tx_size).await?;
let stat: RpcStat = send_bulk_txs_and_wait(
&rpc,
&payer,
txns_per_round,
tx_size,
cu_price_micro_lamports,
)
.await?;
rpc_results.push(stat);
}

Expand All @@ -62,10 +70,18 @@ pub async fn send_bulk_txs_and_wait(
payer: &Keypair,
num_txns: usize,
tx_size: TxSize,
cu_price_micro_lamports: u64,
) -> anyhow::Result<RpcStat> {
let hash = rpc.get_latest_blockhash().await?;
let mut rng = create_rng(None);
let txs = generate_txs(num_txns, payer, hash, &mut rng, tx_size);
let txs = generate_txs(
num_txns,
payer,
hash,
&mut rng,
tx_size,
cu_price_micro_lamports,
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
);

let started_at = tokio::time::Instant::now();

Expand Down
202 changes: 173 additions & 29 deletions bench/src/benches/confirmation_slot.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,107 @@
use std::path::Path;
use std::time::Duration;

use crate::metrics::{PingThing, PingThingTxType};
use crate::tx_size::TxSize;
use crate::{create_memo_tx, create_rng, send_and_confirm_transactions, Rng8};
use anyhow::Context;
use log::{info, warn};
use crate::{create_memo_tx, create_rng, Rng8};
use log::{debug, info};
use solana_rpc_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::signature::{read_keypair_file, Signer};
use solana_rpc_client_api::config::{RpcSendTransactionConfig, RpcTransactionConfig};
use solana_sdk::signature::{read_keypair_file, Signature, Signer};
use solana_sdk::transaction::Transaction;
use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair};
use solana_transaction_status::UiTransactionEncoding;
use tokio::time::{sleep, Instant};
use tracing::error;

/// TC1 send 2 txs (one via LiteRPC, one via Solana RPC) and compare confirmation slot (=slot distance)
#[derive(Default, PartialEq, Eq)]
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
pub struct ConfirmationSlotResult {
pub signature: Signature,
pub slot_sent: u64,
pub slot_landed: u64,
pub send_duration: Duration,
}

#[allow(clippy::too_many_arguments)]
/// TC1 -- Send 2 txs to separate RPCs and compare confirmation slot.
/// The benchmark attempts to minimize the effect of real-world distance and synchronize the time that each transaction reaches the RPC.
/// This is achieved by delaying submission of the transaction to the "nearer" RPC.
/// Delay time is calculated as half of the difference in duration of [getHealth](https://solana.com/docs/rpc/http/gethealth) calls to both RPCs.
pub async fn confirmation_slot(
payer_path: &Path,
rpc_a_url: String,
rpc_b_url: String,
tx_size: TxSize,
max_timeout_ms: u64,
num_rounds: usize,
cu_price_micro_lamports: u64,
ping_thing_token: Option<String>,
) -> anyhow::Result<()> {
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
warn!("THIS IS WORK IN PROGRESS");

let rpc_a = RpcClient::new(rpc_a_url);
info!("RPC A: {}", rpc_a.url());

let rpc_b = RpcClient::new(rpc_b_url);
info!("RPC B: {}", rpc_b.url());
info!("START BENCHMARK: confirmation_slot");
info!("RPC A: {}", rpc_a_url);
info!("RPC B: {}", rpc_b_url);

let mut rng = create_rng(None);
let payer = read_keypair_file(payer_path).expect("payer file");
info!("Payer: {}", payer.pubkey().to_string());

let rpc_a_tx = create_tx(&rpc_a, &payer, &mut rng, tx_size).await?;
let rpc_b_tx = create_tx(&rpc_b, &payer, &mut rng, tx_size).await?;
for _ in 0..num_rounds {
let rpc_a = RpcClient::new(rpc_a_url.clone());
let rpc_b = RpcClient::new(rpc_b_url.clone());
let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64();
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
let time_b = rpc_roundtrip_duration(&rpc_b).await?.as_secs_f64();

debug!("{} (A) latency: {}", rpc_a.url(), time_a);
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
debug!("{} (B) latency: {}", rpc_b.url(), time_b);

let rpc_a_tx =
create_tx(&rpc_a, &payer, &mut rng, tx_size, cu_price_micro_lamports).await?;
let rpc_b_tx =
create_tx(&rpc_b, &payer, &mut rng, tx_size, cu_price_micro_lamports).await?;

let half_round_trip = (time_a - time_b).abs() / 2.0;
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
let (a_delay, b_delay) = if time_a > time_b {
(0f64, half_round_trip)
} else {
(half_round_trip, 0f64)
};

debug!("A delay: {}, B delay: {}", a_delay, b_delay);

let a_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(a_delay)).await;
send_transaction(&rpc_a, rpc_a_tx, max_timeout_ms)
.await
.unwrap_or_else(|e| {
error!("Failed to confirm txn for A: {}", e);
ConfirmationSlotResult::default()
})
});

let b_task = tokio::spawn(async move {
sleep(Duration::from_secs_f64(b_delay)).await;
send_transaction(&rpc_b, rpc_b_tx, max_timeout_ms)
.await
.unwrap_or_else(|e| {
error!("Failed to confirm txn for B: {}", e);
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
ConfirmationSlotResult::default()
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
})
});

let (rpc_slot, lite_rpc_slot) = tokio::join!(
send_transaction_and_get_slot(&rpc_a, rpc_a_tx),
send_transaction_and_get_slot(&rpc_b, rpc_b_tx)
);
let (a, b) = tokio::join!(a_task, b_task);
let a_result = a?;
let b_result = b?;

info!("rpc_slot: {}", rpc_slot?);
info!("lite_rpc_slot: {}", lite_rpc_slot?);
if let Some(ref token) = ping_thing_token {
submit_ping_thing_stats(&a_result, token.as_str()).await?;
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
submit_ping_thing_stats(&b_result, token.as_str()).await?;
};

info!(
"a_slot: {}, b_slot: {}\n",
a_result.slot_landed, b_result.slot_landed
);
}

Ok(())
}
Expand All @@ -47,19 +111,99 @@ async fn create_tx(
payer: &Keypair,
rng: &mut Rng8,
tx_size: TxSize,
cu_price_micro_lamports: u64,
) -> anyhow::Result<Transaction> {
let hash = rpc.get_latest_blockhash().await?;

Ok(create_memo_tx(payer, hash, rng, tx_size))
Ok(create_memo_tx(
payer,
hash,
rng,
tx_size,
cu_price_micro_lamports,
))
}

async fn send_transaction(
rpc: &RpcClient,
tx: Transaction,
timeout_ms: u64,
) -> anyhow::Result<ConfirmationSlotResult> {
let send_config = RpcSendTransactionConfig {
skip_preflight: true,
preflight_commitment: None,
encoding: None,
max_retries: Some(3),
min_context_slot: None,
};
let slot_sent = rpc
.get_slot_with_commitment(CommitmentConfig::confirmed())
.await?;
let signature = rpc
.send_transaction_with_config(&tx, send_config)
.await
.map_err(|err| anyhow::anyhow!("{:?}", err))?;
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved

let start_time = Instant::now();
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
loop {
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
if start_time.elapsed() >= Duration::from_millis(timeout_ms) {
return Ok(ConfirmationSlotResult::default()); // Timeout occurred
}
let confirmed = rpc
.confirm_transaction_with_commitment(&signature, CommitmentConfig::confirmed())
.await
.unwrap();
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
if confirmed.value {
break;
}
}

let send_duration = start_time.elapsed();
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved

let fetch_config = RpcTransactionConfig {
encoding: Some(UiTransactionEncoding::Json),
commitment: Some(CommitmentConfig::confirmed()),
max_supported_transaction_version: Some(0),
};
let transaction = rpc
.get_transaction_with_config(&signature, fetch_config)
.await?;

Ok(ConfirmationSlotResult {
signature,
slot_sent,
slot_landed: transaction.slot,
send_duration,
})
}

async fn send_transaction_and_get_slot(client: &RpcClient, tx: Transaction) -> anyhow::Result<u64> {
let status = send_and_confirm_transactions(client, &[tx], CommitmentConfig::confirmed(), None)
.await?
.into_iter()
.next()
.unwrap()?
.context("unable to confirm tx")?;
pub async fn rpc_roundtrip_duration(rpc: &RpcClient) -> anyhow::Result<Duration> {
let start = Instant::now();
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
rpc.get_health().await?;
let duration = start.elapsed();
Ok(duration)
}

Ok(status.slot)
async fn submit_ping_thing_stats(
result: &ConfirmationSlotResult,
ping_thing_token: &str,
) -> anyhow::Result<()> {
if result == &ConfirmationSlotResult::default() {
// ignore timed-out transactions
return Ok(());
};
let ping_client = PingThing {
cluster: crate::metrics::ClusterKeys::Mainnet,
Lou-Kamades marked this conversation as resolved.
Show resolved Hide resolved
va_api_key: ping_thing_token.to_string(),
};
ping_client
.submit_confirmed_stats(
result.send_duration,
result.signature,
PingThingTxType::Memo,
true,
result.slot_sent,
result.slot_landed,
)
.await
}
Loading
Loading