From e217ada9f4ff51ded12952f96563182ebbbb83ce Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 10:04:13 +0100 Subject: [PATCH 1/4] Move several structures from send_transaction_service.rs Also introduce TransactionClient, but don't use it in this commit. --- send-transaction-service/src/lib.rs | 2 + .../src/send_transaction_service.rs | 205 +----------------- .../src/send_transaction_service_stats.rs | 148 +++++++++++++ .../src/transaction_client.rs | 161 ++++++++++++++ 4 files changed, 320 insertions(+), 196 deletions(-) create mode 100644 send-transaction-service/src/send_transaction_service_stats.rs create mode 100644 send-transaction-service/src/transaction_client.rs diff --git a/send-transaction-service/src/lib.rs b/send-transaction-service/src/lib.rs index fe2bc0c7af5b7f..960ff1cb3c90e7 100644 --- a/send-transaction-service/src/lib.rs +++ b/send-transaction-service/src/lib.rs @@ -1,6 +1,8 @@ #![allow(clippy::arithmetic_side_effects)] pub mod send_transaction_service; +pub mod send_transaction_service_stats; pub mod tpu_info; +pub mod transaction_client; #[macro_use] extern crate solana_metrics; diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 8cc21b12359639..208fca879155dc 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -1,5 +1,11 @@ use { - crate::tpu_info::TpuInfo, + crate::{ + send_transaction_service_stats::{ + SendTransactionServiceStats, SendTransactionServiceStatsReport, + }, + tpu_info::TpuInfo, + transaction_client::CurrentLeaderInfo, + }, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, solana_client::connection_cache::{ConnectionCache, Protocol}, @@ -8,7 +14,7 @@ use { solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, - signature::Signature, timing::AtomicInterval, transport::TransportError, + signature::Signature, transport::TransportError, }, std::{ collections::{ @@ -17,7 +23,7 @@ use { }, net::SocketAddr, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, @@ -135,199 +141,6 @@ impl Default for Config { /// processing the transactions that need to be retried. pub const MAX_RETRY_SLEEP_MS: u64 = 1000; -/// The leader info refresh rate. -pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; - -/// A struct responsible for holding up-to-date leader information -/// used for sending transactions. -pub struct CurrentLeaderInfo -where - T: TpuInfo + std::marker::Send + 'static, -{ - /// The last time the leader info was refreshed - last_leader_refresh: Option, - - /// The leader info - leader_info: Option, - - /// How often to refresh the leader info - refresh_rate: Duration, -} - -impl CurrentLeaderInfo -where - T: TpuInfo + std::marker::Send + 'static, -{ - /// Get the leader info, refresh if expired - pub fn get_leader_info(&mut self) -> Option<&T> { - if let Some(leader_info) = self.leader_info.as_mut() { - let now = Instant::now(); - let need_refresh = self - .last_leader_refresh - .map(|last| now.duration_since(last) >= self.refresh_rate) - .unwrap_or(true); - - if need_refresh { - leader_info.refresh_recent_peers(); - self.last_leader_refresh = Some(now); - } - } - self.leader_info.as_ref() - } - - pub fn new(leader_info: Option) -> Self { - Self { - last_leader_refresh: None, - leader_info, - refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS), - } - } -} - -/// Metrics of the send-transaction-service. -#[derive(Default)] -struct SendTransactionServiceStats { - /// Count of the received transactions - received_transactions: AtomicU64, - - /// Count of the received duplicate transactions - received_duplicate_transactions: AtomicU64, - - /// Count of transactions sent in batch - sent_transactions: AtomicU64, - - /// Count of transactions not being added to retry queue - /// due to queue size limit - retry_queue_overflow: AtomicU64, - - /// retry queue size - retry_queue_size: AtomicU64, - - /// The count of calls of sending transactions which can be in batch or single. - send_attempt_count: AtomicU64, - - /// Time spent on transactions in micro seconds - send_us: AtomicU64, - - /// Send failure count - send_failure_count: AtomicU64, - - /// Count of nonced transactions - nonced_transactions: AtomicU64, - - /// Count of rooted transactions - rooted_transactions: AtomicU64, - - /// Count of expired transactions - expired_transactions: AtomicU64, - - /// Count of transactions exceeding max retries - transactions_exceeding_max_retries: AtomicU64, - - /// Count of retries of transactions - retries: AtomicU64, - - /// Count of transactions failed - failed_transactions: AtomicU64, -} - -#[derive(Default)] -struct SendTransactionServiceStatsReport { - stats: SendTransactionServiceStats, - last_report: AtomicInterval, -} - -impl SendTransactionServiceStatsReport { - /// report metrics of the send transaction service - fn report(&self) { - if self - .last_report - .should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS) - { - datapoint_info!( - "send_transaction_service", - ( - "recv-tx", - self.stats.received_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "recv-duplicate", - self.stats - .received_duplicate_transactions - .swap(0, Ordering::Relaxed), - i64 - ), - ( - "sent-tx", - self.stats.sent_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "retry-queue-overflow", - self.stats.retry_queue_overflow.swap(0, Ordering::Relaxed), - i64 - ), - ( - "retry-queue-size", - self.stats.retry_queue_size.swap(0, Ordering::Relaxed), - i64 - ), - ( - "send-us", - self.stats.send_us.swap(0, Ordering::Relaxed), - i64 - ), - ( - "send-attempt-count", - self.stats.send_attempt_count.swap(0, Ordering::Relaxed), - i64 - ), - ( - "send-failure-count", - self.stats.send_failure_count.swap(0, Ordering::Relaxed), - i64 - ), - ( - "nonced-tx", - self.stats.nonced_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "rooted-tx", - self.stats.rooted_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "expired-tx", - self.stats.expired_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "max-retries-exceeded-tx", - self.stats - .transactions_exceeding_max_retries - .swap(0, Ordering::Relaxed), - i64 - ), - ( - "retries", - self.stats.retries.swap(0, Ordering::Relaxed), - i64 - ), - ( - "failed-tx", - self.stats.failed_transactions.swap(0, Ordering::Relaxed), - i64 - ) - ); - } - } -} - -/// Report the send transaction memtrics for every 5 seconds. -const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000; - impl SendTransactionService { pub fn new( tpu_address: SocketAddr, diff --git a/send-transaction-service/src/send_transaction_service_stats.rs b/send-transaction-service/src/send_transaction_service_stats.rs new file mode 100644 index 00000000000000..98b2fa7f0c8e79 --- /dev/null +++ b/send-transaction-service/src/send_transaction_service_stats.rs @@ -0,0 +1,148 @@ +use { + solana_sdk::timing::AtomicInterval, + std::sync::atomic::{AtomicU64, Ordering}, +}; + +/// Report the send transaction metrics for every 5 seconds. +const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000; + +/// Metrics of the send-transaction-service. +#[derive(Default)] +pub struct SendTransactionServiceStats { + /// Count of the received transactions + pub received_transactions: AtomicU64, + + /// Count of the received duplicate transactions + pub received_duplicate_transactions: AtomicU64, + + /// Count of transactions sent in batch + pub sent_transactions: AtomicU64, + + /// Count of transactions not being added to retry queue + /// due to queue size limit + pub retry_queue_overflow: AtomicU64, + + /// retry queue size + pub retry_queue_size: AtomicU64, + + /// The count of calls of sending transactions which can be in batch or single. + pub send_attempt_count: AtomicU64, + + /// Time spent on transactions in micro seconds + pub send_us: AtomicU64, + + /// Send failure count + pub send_failure_count: AtomicU64, + + /// Count of nonced transactions + pub nonced_transactions: AtomicU64, + + /// Count of rooted transactions + pub rooted_transactions: AtomicU64, + + /// Count of expired transactions + pub expired_transactions: AtomicU64, + + /// Count of transactions exceeding max retries + pub transactions_exceeding_max_retries: AtomicU64, + + /// Count of retries of transactions + pub retries: AtomicU64, + + /// Count of transactions failed + pub failed_transactions: AtomicU64, +} + +#[derive(Default)] +pub(crate) struct SendTransactionServiceStatsReport { + pub stats: SendTransactionServiceStats, + last_report: AtomicInterval, +} + +impl SendTransactionServiceStatsReport { + /// report metrics of the send transaction service + pub fn report(&self) { + if self + .last_report + .should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS) + { + datapoint_info!( + "send_transaction_service", + ( + "recv-tx", + self.stats.received_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "recv-duplicate", + self.stats + .received_duplicate_transactions + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "sent-tx", + self.stats.sent_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "retry-queue-overflow", + self.stats.retry_queue_overflow.swap(0, Ordering::Relaxed), + i64 + ), + ( + "retry-queue-size", + self.stats.retry_queue_size.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-us", + self.stats.send_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-attempt-count", + self.stats.send_attempt_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-failure-count", + self.stats.send_failure_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "nonced-tx", + self.stats.nonced_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "rooted-tx", + self.stats.rooted_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "expired-tx", + self.stats.expired_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "max-retries-exceeded-tx", + self.stats + .transactions_exceeding_max_retries + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "retries", + self.stats.retries.swap(0, Ordering::Relaxed), + i64 + ), + ( + "failed-tx", + self.stats.failed_transactions.swap(0, Ordering::Relaxed), + i64 + ) + ); + } + } +} diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs new file mode 100644 index 00000000000000..19c20c26d350c2 --- /dev/null +++ b/send-transaction-service/src/transaction_client.rs @@ -0,0 +1,161 @@ +use { + crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo}, + log::warn, + solana_client::connection_cache::ConnectionCache, + solana_connection_cache::client_connection::ClientConnection as TpuConnection, + solana_measure::measure::Measure, + std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc, Mutex}, + time::{Duration, Instant}, + }, +}; + +pub trait TransactionClient { + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ); +} + +pub struct ConnectionCacheClient { + connection_cache: Arc, + tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info_provider: Arc>>, + leader_forward_count: u64, +} + +impl ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn new( + connection_cache: Arc, + tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info: Option, + leader_forward_count: u64, + ) -> Self { + let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + Self { + connection_cache, + tpu_address, + tpu_peers, + leader_info_provider, + leader_forward_count, + } + } + + fn get_tpu_addresses_with_slots<'a>( + &'a self, + leader_info: Option<&'a T>, + ) -> Vec<&'a SocketAddr> { + leader_info + .map(|leader_info| { + leader_info + .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol()) + }) + .filter(|addresses| !addresses.is_empty()) + .unwrap_or_else(|| vec![&self.tpu_address]) + } + + fn send_transactions( + &self, + peer: &SocketAddr, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + let mut measure = Measure::start("send-us"); + let conn = self.connection_cache.get_connection(peer); + let result = conn.send_data_batch_async(wire_transactions); + + if let Err(err) = result { + warn!( + "Failed to send transaction transaction to {}: {:?}", + self.tpu_address, err + ); + stats.send_failure_count.fetch_add(1, Ordering::Relaxed); + } + + measure.stop(); + stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); + stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); + } +} + +impl TransactionClient for ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + // Processing the transactions in batch + let mut addresses = self + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().collect::>()) + .unwrap_or_default(); + let mut leader_info_provider = self.leader_info_provider.lock().unwrap(); + let leader_info = leader_info_provider.get_leader_info(); + let leader_addresses = self.get_tpu_addresses_with_slots(leader_info); + addresses.extend(leader_addresses); + + for address in &addresses { + self.send_transactions(address, wire_transactions.clone(), stats); + } + } +} + +/// The leader info refresh rate. +pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; + +/// A struct responsible for holding up-to-date leader information +/// used for sending transactions. +pub(crate) struct CurrentLeaderInfo +where + T: TpuInfo + std::marker::Send + 'static, +{ + /// The last time the leader info was refreshed + last_leader_refresh: Option, + + /// The leader info + leader_info: Option, + + /// How often to refresh the leader info + refresh_rate: Duration, +} + +impl CurrentLeaderInfo +where + T: TpuInfo + std::marker::Send + 'static, +{ + /// Get the leader info, refresh if expired + pub fn get_leader_info(&mut self) -> Option<&T> { + if let Some(leader_info) = self.leader_info.as_mut() { + let now = Instant::now(); + let need_refresh = self + .last_leader_refresh + .map(|last| now.duration_since(last) >= self.refresh_rate) + .unwrap_or(true); + + if need_refresh { + leader_info.refresh_recent_peers(); + self.last_leader_refresh = Some(now); + } + } + self.leader_info.as_ref() + } + + pub fn new(leader_info: Option) -> Self { + Self { + last_leader_refresh: None, + leader_info, + refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS), + } + } +} From b8236d1ed2dcf1530a6aa4a414b44dbdb81ce101 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 11:54:33 +0100 Subject: [PATCH 2/4] use new ConnectionClientClient code --- Cargo.lock | 1 + banks-server/src/banks_server.rs | 2 +- rpc/src/rpc.rs | 2 +- rpc/src/rpc_service.rs | 2 +- send-transaction-service/Cargo.toml | 1 + .../src/send_transaction_service.rs | 425 +++++++----------- .../src/transaction_client.rs | 18 +- 7 files changed, 180 insertions(+), 271 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d3b18093e7e78f..5d45886d488a0d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8150,6 +8150,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index f2d2d10da85abb..4ff7368a961d49 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -458,7 +458,7 @@ pub async fn start_tcp_server( &bank_forks, None, receiver, - &connection_cache, + connection_cache.clone(), 5_000, 0, exit.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 40bd47f71c0e16..2fbf66ec6988b9 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -377,7 +377,7 @@ impl JsonRpcRequestProcessor { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, exit.clone(), diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index fed748d709472b..f388433d611923 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -479,7 +479,7 @@ impl JsonRpcService { &bank_forks, leader_info, receiver, - &connection_cache, + connection_cache, send_transaction_service_config, exit, )); diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index a69c366a358fdc..07ad3f5a5b886c 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } [dependencies] crossbeam-channel = { workspace = true } +itertools = { workspace = true } log = { workspace = true } solana-client = { workspace = true } solana-connection-cache = { workspace = true } diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 208fca879155dc..0755cfb968b7ef 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -4,17 +4,15 @@ use { SendTransactionServiceStats, SendTransactionServiceStatsReport, }, tpu_info::TpuInfo, - transaction_client::CurrentLeaderInfo, + transaction_client::{ConnectionCacheClient, TransactionClient}, }, crossbeam_channel::{Receiver, RecvTimeoutError}, + itertools::Itertools, log::*, - solana_client::connection_cache::{ConnectionCache, Protocol}, - solana_connection_cache::client_connection::ClientConnection as TpuConnection, - solana_measure::measure::Measure, + solana_client::connection_cache::ConnectionCache, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ - clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, - signature::Signature, transport::TransportError, + hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, }, std::{ collections::{ @@ -147,7 +145,7 @@ impl SendTransactionService { bank_forks: &Arc>, leader_info: Option, receiver: Receiver, - connection_cache: &Arc, + connection_cache: Arc, retry_rate_ms: u64, leader_forward_count: u64, exit: Arc, @@ -173,7 +171,7 @@ impl SendTransactionService { bank_forks: &Arc>, leader_info: Option, receiver: Receiver, - connection_cache: &Arc, + connection_cache: Arc, config: Config, exit: Arc, ) -> Self { @@ -181,26 +179,33 @@ impl SendTransactionService { let retry_transactions = Arc::new(Mutex::new(HashMap::new())); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + let client = ConnectionCacheClient::new( + connection_cache, + tpu_address, + config.tpu_peers, + leader_info, + config.leader_forward_count, + ); let receive_txn_thread = Self::receive_txn_thread( - tpu_address, receiver, - leader_info_provider.clone(), - connection_cache.clone(), - config.clone(), + client.clone(), retry_transactions.clone(), stats_report.clone(), + config.batch_send_rate_ms, + config.batch_size, + config.retry_pool_max_size, exit.clone(), ); let retry_thread = Self::retry_thread( - tpu_address, bank_forks.clone(), - leader_info_provider, - connection_cache.clone(), - config, + client, retry_transactions, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, stats_report, exit.clone(), ); @@ -213,26 +218,23 @@ impl SendTransactionService { /// Thread responsible for receiving transactions from RPC clients. fn receive_txn_thread( - tpu_address: SocketAddr, receiver: Receiver, - leader_info_provider: Arc>>, - connection_cache: Arc, - config: Config, + client: ConnectionCacheClient, retry_transactions: Arc>>, stats_report: Arc, + batch_send_rate_ms: u64, + batch_size: usize, + retry_pool_max_size: usize, exit: Arc, ) -> JoinHandle<()> { let mut last_batch_sent = Instant::now(); let mut transactions = HashMap::new(); - info!( - "Starting send-transaction-service::receive_txn_thread with config {:?}", - config - ); + info!("Starting send-transaction-service::receive_txn_thread with config.",); Builder::new() .name("solStxReceive".to_string()) .spawn(move || loop { - let recv_timeout_ms = config.batch_send_rate_ms; + let recv_timeout_ms = batch_send_rate_ms; let stats = &stats_report.stats; let recv_result = receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)); if exit.load(Ordering::Relaxed) { @@ -268,20 +270,17 @@ impl SendTransactionService { } if (!transactions.is_empty() - && last_batch_sent.elapsed().as_millis() as u64 >= config.batch_send_rate_ms) - || transactions.len() >= config.batch_size + && last_batch_sent.elapsed().as_millis() as u64 >= batch_send_rate_ms) + || transactions.len() >= batch_size { stats .sent_transactions .fetch_add(transactions.len() as u64, Ordering::Relaxed); - Self::send_transactions_in_batch( - &tpu_address, - &transactions, - leader_info_provider.lock().unwrap().get_leader_info(), - &connection_cache, - &config, - stats, - ); + let wire_transactions = transactions + .values() + .map(|transaction_info| transaction_info.wire_transaction.clone()) + .collect::>>(); + client.send_transactions_in_batch(wire_transactions, stats); let last_sent_time = Instant::now(); { // take a lock of retry_transactions and move the batch to the retry set. @@ -292,7 +291,7 @@ impl SendTransactionService { let retry_len = retry_transactions.len(); let entry = retry_transactions.entry(signature); if let Entry::Vacant(_) = entry { - if retry_len >= config.retry_pool_max_size { + if retry_len >= retry_pool_max_size { break; } else { transaction_info.last_sent_time = Some(last_sent_time); @@ -319,23 +318,21 @@ impl SendTransactionService { /// Thread responsible for retrying transactions fn retry_thread( - tpu_address: SocketAddr, bank_forks: Arc>, - leader_info_provider: Arc>>, - connection_cache: Arc, - config: Config, + client: ConnectionCacheClient, retry_transactions: Arc>>, + retry_rate_ms: u64, + service_max_retries: usize, + default_max_retries: Option, + batch_size: usize, stats_report: Arc, exit: Arc, ) -> JoinHandle<()> { - info!( - "Starting send-transaction-service::retry_thread with config {:?}", - config - ); + info!("Starting send-transaction-service::retry_thread with config."); Builder::new() .name("solStxRetry".to_string()) .spawn(move || loop { - let retry_interval_ms = config.retry_rate_ms; + let retry_interval_ms = retry_rate_ms; let stats = &stats_report.stats; sleep(Duration::from_millis( MAX_RETRY_SLEEP_MS.min(retry_interval_ms), @@ -356,11 +353,12 @@ impl SendTransactionService { let _result = Self::process_transactions( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + retry_rate_ms, + service_max_retries, + default_max_retries, + batch_size, stats, ); stats_report.report(); @@ -369,60 +367,22 @@ impl SendTransactionService { .unwrap() } - /// Process transactions in batch. - fn send_transactions_in_batch( - tpu_address: &SocketAddr, - transactions: &HashMap, - leader_info: Option<&T>, - connection_cache: &Arc, - config: &Config, - stats: &SendTransactionServiceStats, - ) { - // Processing the transactions in batch - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) - .unwrap_or_default(); - let leader_addresses = Self::get_tpu_addresses_with_slots( - tpu_address, - leader_info, - config, - connection_cache.protocol(), - ); - addresses.extend(leader_addresses); - - let wire_transactions = transactions - .iter() - .map(|(_, transaction_info)| { - debug!( - "Sending transacation {} to (address, slot): {:?}", - transaction_info.signature, addresses, - ); - transaction_info.wire_transaction.as_ref() - }) - .collect::>(); - - for (address, _) in &addresses { - Self::send_transactions(address, &wire_transactions, connection_cache, stats); - } - } - /// Retry transactions sent before. fn process_transactions( working_bank: &Bank, root_bank: &Bank, - tpu_address: &SocketAddr, transactions: &mut HashMap, - leader_info_provider: &Arc>>, - connection_cache: &Arc, - config: &Config, + client: &ConnectionCacheClient, + retry_rate_ms: u64, + service_max_retries: usize, + default_max_retries: Option, + batch_size: usize, stats: &SendTransactionServiceStats, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); let mut batched_transactions = HashSet::new(); - let retry_rate = Duration::from_millis(config.retry_rate_ms); + let retry_rate = Duration::from_millis(retry_rate_ms); transactions.retain(|signature, transaction_info| { if transaction_info.durable_nonce_info.is_some() { @@ -460,8 +420,8 @@ impl SendTransactionService { let max_retries = transaction_info .max_retries - .or(config.default_max_retries) - .map(|max_retries| max_retries.min(config.service_max_retries)); + .or(default_max_retries) + .map(|max_retries| max_retries.min(service_max_retries)); if let Some(max_retries) = max_retries { if transaction_info.retries >= max_retries { @@ -516,114 +476,17 @@ impl SendTransactionService { let wire_transactions = transactions .iter() .filter(|(signature, _)| batched_transactions.contains(signature)) - .map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref()) - .collect::>(); - - let iter = wire_transactions.chunks(config.batch_size); - for chunk in iter { - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().collect::>()) - .unwrap_or_default(); - let mut leader_info_provider = leader_info_provider.lock().unwrap(); - let leader_info = leader_info_provider.get_leader_info(); - let leader_addresses = Self::get_tpu_addresses( - tpu_address, - leader_info, - config, - connection_cache.protocol(), - ); - addresses.extend(leader_addresses); - - for address in &addresses { - Self::send_transactions(address, chunk, connection_cache, stats); - } + .map(|(_, transaction_info)| transaction_info.wire_transaction.clone()); + + let iter = wire_transactions.chunks(batch_size); + for chunk in &iter { + let chunk = chunk.collect(); + client.send_transactions_in_batch(chunk, stats); } } result } - fn send_transaction( - tpu_address: &SocketAddr, - wire_transaction: &[u8], - connection_cache: &Arc, - ) -> Result<(), TransportError> { - let conn = connection_cache.get_connection(tpu_address); - conn.send_data_async(wire_transaction.to_vec()) - } - - fn send_transactions_with_metrics( - tpu_address: &SocketAddr, - wire_transactions: &[&[u8]], - connection_cache: &Arc, - ) -> Result<(), TransportError> { - let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); - let conn = connection_cache.get_connection(tpu_address); - conn.send_data_batch_async(wire_transactions) - } - - fn send_transactions( - tpu_address: &SocketAddr, - wire_transactions: &[&[u8]], - connection_cache: &Arc, - stats: &SendTransactionServiceStats, - ) { - let mut measure = Measure::start("send-us"); - let result = if wire_transactions.len() == 1 { - Self::send_transaction(tpu_address, wire_transactions[0], connection_cache) - } else { - Self::send_transactions_with_metrics(tpu_address, wire_transactions, connection_cache) - }; - - if let Err(err) = result { - warn!( - "Failed to send transaction transaction to {}: {:?}", - tpu_address, err - ); - stats.send_failure_count.fetch_add(1, Ordering::Relaxed); - } - - measure.stop(); - stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); - stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); - } - - fn get_tpu_addresses<'a, T: TpuInfo>( - tpu_address: &'a SocketAddr, - leader_info: Option<&'a T>, - config: &'a Config, - protocol: Protocol, - ) -> Vec<&'a SocketAddr> { - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count, protocol)); - addresses - .map(|address_list| { - if address_list.is_empty() { - vec![tpu_address] - } else { - address_list - } - }) - .unwrap_or_else(|| vec![tpu_address]) - } - - fn get_tpu_addresses_with_slots<'a, T: TpuInfo>( - tpu_address: &'a SocketAddr, - leader_info: Option<&'a T>, - config: &'a Config, - protocol: Protocol, - ) -> Vec<(&'a SocketAddr, Slot)> { - leader_info - .as_ref() - .map(|leader_info| { - leader_info.get_leader_tpus_with_slots(config.leader_forward_count, protocol) - }) - .filter(|addresses| !addresses.is_empty()) - .unwrap_or_else(|| vec![(tpu_address, 0)]) - } - pub fn join(self) -> thread::Result<()> { self.receive_txn_thread.join()?; self.exit.store(true, Ordering::Relaxed); @@ -661,7 +524,7 @@ mod test { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, Arc::new(AtomicBool::new(false)), @@ -695,7 +558,7 @@ mod test { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, exit.clone(), @@ -713,6 +576,22 @@ mod test { } } + fn create_client( + tpu_peers: Option>, + leader_forward_count: u64, + ) -> ConnectionCacheClient { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + ConnectionCacheClient::new( + connection_cache, + tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } + #[test] fn process_transactions() { solana_logger::setup(); @@ -720,7 +599,6 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, ..Config::default() @@ -767,7 +645,6 @@ mod test { let mut transactions = HashMap::new(); info!("Expired transactions are dropped..."); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); transactions.insert( Signature::default(), @@ -780,15 +657,17 @@ mod test { Some(Instant::now()), ), ); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + let client = create_client(config.tpu_peers, config.leader_forward_count); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -815,11 +694,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -846,11 +726,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -877,11 +758,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -910,11 +792,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -953,11 +836,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -972,11 +856,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -996,7 +881,6 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, ..Config::default() @@ -1064,17 +948,17 @@ mod test { Some(Instant::now()), ), ); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + let client = create_client(config.tpu_peers, config.leader_forward_count); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1100,11 +984,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1132,11 +1017,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1162,11 +1048,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1193,11 +1080,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1224,11 +1112,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1257,11 +1146,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1287,11 +1177,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 0); diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 19c20c26d350c2..c38007d64ec67a 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -27,11 +27,27 @@ pub struct ConnectionCacheClient { leader_forward_count: u64, } +// Manual implementation of Clone without requiring T to be Clone +impl Clone for ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn clone(&self) -> Self { + Self { + connection_cache: Arc::clone(&self.connection_cache), + tpu_address: self.tpu_address, + tpu_peers: self.tpu_peers.clone(), + leader_info_provider: Arc::clone(&self.leader_info_provider), + leader_forward_count: self.leader_forward_count, + } + } +} + impl ConnectionCacheClient where T: TpuInfo + std::marker::Send + 'static, { - fn new( + pub fn new( connection_cache: Arc, tpu_address: SocketAddr, tpu_peers: Option>, From 95811e96f3aa63c6ee50f82f9efd9e79a348e264 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 12:19:32 +0100 Subject: [PATCH 3/4] Update Cargo.lock --- programs/sbf/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index ee1c9b2fc8ae09..38ab0fee325503 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6889,6 +6889,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", From 213aa75214b2a519593fbef987e05cb926e30d56 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 08:04:41 +0100 Subject: [PATCH 4/4] pass CC by value in some tests --- rpc/src/rpc.rs | 4 ++-- send-transaction-service/src/transaction_client.rs | 7 ++----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 2fbf66ec6988b9..215b23d8732b3f 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -6486,7 +6486,7 @@ pub mod tests { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, exit, @@ -6760,7 +6760,7 @@ pub mod tests { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, exit, diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index c38007d64ec67a..d7910b2b8609c3 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -64,10 +64,7 @@ where } } - fn get_tpu_addresses_with_slots<'a>( - &'a self, - leader_info: Option<&'a T>, - ) -> Vec<&'a SocketAddr> { + fn get_tpu_addresses<'a>(&'a self, leader_info: Option<&'a T>) -> Vec<&'a SocketAddr> { leader_info .map(|leader_info| { leader_info @@ -118,7 +115,7 @@ where .unwrap_or_default(); let mut leader_info_provider = self.leader_info_provider.lock().unwrap(); let leader_info = leader_info_provider.get_leader_info(); - let leader_addresses = self.get_tpu_addresses_with_slots(leader_info); + let leader_addresses = self.get_tpu_addresses(leader_info); addresses.extend(leader_addresses); for address in &addresses {