Skip to content

Commit

Permalink
Adding custom tpu send transaction example (#380)
Browse files Browse the repository at this point in the history
* Adding an example for custom tpu send transaction

* Fixing the custom tpu example

* Optimizing SentTransactionInfo, and calculating TPS

* Reverting unwanted changes

* After groovies review
  • Loading branch information
godmodegalactus authored Apr 2, 2024
1 parent 4d77001 commit 6813341
Show file tree
Hide file tree
Showing 19 changed files with 667 additions and 57 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ pub fn poll_cluster_info(
loop {
match rpc_client.get_cluster_nodes().await {
Ok(cluster_nodes) => {
debug!("get cluster_nodes from rpc: {:?}", cluster_nodes.len());
if let Err(e) = contact_info_sender.send(cluster_nodes) {
warn!("rpc_cluster_info channel has no receivers {e:?}");
}
Expand All @@ -23,7 +22,7 @@ pub fn poll_cluster_info(
Err(error) => {
warn!("rpc_cluster_info failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
Expand Down Expand Up @@ -51,7 +50,7 @@ pub fn poll_vote_accounts(
Err(error) => {
warn!("rpc_vote_accounts failed <{:?}> - retrying", error);
// throttle
tokio::time::sleep(Duration::from_secs(2500)).await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ pub mod stores;
pub mod structures;
pub mod traits;
pub mod types;
pub mod utils;

pub type AnyhowJoinHandle = tokio::task::JoinHandle<anyhow::Result<()>>;
6 changes: 3 additions & 3 deletions core/src/structures/prioritization_fee_heap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl PrioritizationFeesHeap {
#[cfg(test)]
mod tests {
use solana_sdk::signature::Signature;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use crate::structures::{
prioritization_fee_heap::PrioritizationFeesHeap, transaction_sent_info::SentTransactionInfo,
Expand All @@ -139,7 +139,7 @@ mod tests {
let tx_creator = |signature, prioritization_fee| SentTransactionInfo {
signature,
slot: 0,
transaction: vec![],
transaction: Arc::new(vec![]),
last_valid_block_height: 0,
prioritization_fee,
};
Expand Down Expand Up @@ -205,7 +205,7 @@ mod tests {
let info = SentTransactionInfo {
signature: Signature::new_unique(),
slot: height + 1,
transaction: vec![],
transaction: Arc::new(vec![]),
last_valid_block_height: height + 10,
prioritization_fee,
};
Expand Down
4 changes: 3 additions & 1 deletion core/src/structures/transaction_sent_info.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use solana_sdk::signature::Signature;
use solana_sdk::slot_history::Slot;

Expand All @@ -7,7 +9,7 @@ pub type WireTransaction = Vec<u8>;
pub struct SentTransactionInfo {
pub signature: Signature,
pub slot: Slot,
pub transaction: WireTransaction,
pub transaction: Arc<WireTransaction>,
pub last_valid_block_height: u64,
pub prioritization_fee: u64,
}
33 changes: 33 additions & 0 deletions core/src/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::time::Duration;

use log::debug;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::time::{timeout, Instant};

use crate::{structures::block_info::BlockInfo, types::BlockInfoStream};

pub async fn wait_till_block_of_commitment_is_recieved(
mut blockinfo_stream: BlockInfoStream,
commitment_config: CommitmentConfig,
) -> BlockInfo {
let started = Instant::now();
loop {
match timeout(Duration::from_millis(1000), blockinfo_stream.recv()).await {
Ok(Ok(block_info)) => {
if block_info.commitment_config == commitment_config {
return block_info;
}
}
Err(_elapsed) => {
debug!(
"waiting for latest block info ({}) ... {:.02}ms",
commitment_config.commitment,
started.elapsed().as_secs_f32() * 1000.0
);
}
Ok(Err(error)) => {
panic!("Did not recv block info : {error:?}");
}
}
}
}
18 changes: 17 additions & 1 deletion examples/custom-tpu-send-transactions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,20 @@ edition.workspace = true

[dependencies]
solana-lite-rpc-services = {workspace = true}
solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-core = {workspace = true}
solana-lite-rpc-cluster-endpoints = {workspace = true}

solana-sdk = { workspace = true }
solana-rpc-client = { workspace = true }

tokio = "1.28.2"
clap = { workspace = true }
anyhow = { workspace = true }
dashmap = { workspace = true }
rand = "0.8.5"
rand_chacha = "0.3.1"
log = { workspace = true }
itertools = { workspace = true }
bincode = { workspace = true }
futures = { workspace = true }
tracing-subscriber = { workspace = true }
39 changes: 39 additions & 0 deletions examples/custom-tpu-send-transactions/src/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use clap::Parser;

#[derive(Parser, Debug, Clone)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// config.json
#[arg(short, long, default_value = "http://127.0.0.1:8899")]
pub rpc_url: String,

#[arg(short, long)]
pub grpc_url: Option<String>,

#[arg(short, long)]
pub x_token: Option<String>,

#[arg(short, long)]
pub transaction_count: Option<usize>,

#[arg(short, long, default_value_t = 1)]
pub number_of_seconds: usize,

#[arg(short, long)]
pub fee_payer: String,

#[arg(short, long)]
pub staked_identity: Option<String>,

#[arg(short, long)]
pub priority_fees: Option<u64>,

#[arg(short = 'a', long, default_value_t = 256)]
pub additional_signers: usize,

#[arg(short = 'b', long, default_value_t = 0.1)]
pub signers_transfer_balance: f64,

#[arg(long)]
pub fanout_slots: Option<u64>,
}
Loading

0 comments on commit 6813341

Please sign in to comment.