From 571c8da7d855185167bb7cbbd500ffa3b85dcf1f Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 10:04:13 +0100 Subject: [PATCH 01/17] Move several structures from send_transaction_service.rs Also introduce TransactionClient, but don't use it in this commit. --- send-transaction-service/src/lib.rs | 2 + .../src/send_transaction_service.rs | 205 +----------------- .../src/send_transaction_service_stats.rs | 148 +++++++++++++ .../src/transaction_client.rs | 161 ++++++++++++++ 4 files changed, 320 insertions(+), 196 deletions(-) create mode 100644 send-transaction-service/src/send_transaction_service_stats.rs create mode 100644 send-transaction-service/src/transaction_client.rs diff --git a/send-transaction-service/src/lib.rs b/send-transaction-service/src/lib.rs index fe2bc0c7af5b7f..960ff1cb3c90e7 100644 --- a/send-transaction-service/src/lib.rs +++ b/send-transaction-service/src/lib.rs @@ -1,6 +1,8 @@ #![allow(clippy::arithmetic_side_effects)] pub mod send_transaction_service; +pub mod send_transaction_service_stats; pub mod tpu_info; +pub mod transaction_client; #[macro_use] extern crate solana_metrics; diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 8cc21b12359639..208fca879155dc 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -1,5 +1,11 @@ use { - crate::tpu_info::TpuInfo, + crate::{ + send_transaction_service_stats::{ + SendTransactionServiceStats, SendTransactionServiceStatsReport, + }, + tpu_info::TpuInfo, + transaction_client::CurrentLeaderInfo, + }, crossbeam_channel::{Receiver, RecvTimeoutError}, log::*, solana_client::connection_cache::{ConnectionCache, Protocol}, @@ -8,7 +14,7 @@ use { solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, - signature::Signature, timing::AtomicInterval, transport::TransportError, + signature::Signature, transport::TransportError, }, std::{ collections::{ @@ -17,7 +23,7 @@ use { }, net::SocketAddr, sync::{ - atomic::{AtomicBool, AtomicU64, Ordering}, + atomic::{AtomicBool, Ordering}, Arc, Mutex, RwLock, }, thread::{self, sleep, Builder, JoinHandle}, @@ -135,199 +141,6 @@ impl Default for Config { /// processing the transactions that need to be retried. pub const MAX_RETRY_SLEEP_MS: u64 = 1000; -/// The leader info refresh rate. -pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; - -/// A struct responsible for holding up-to-date leader information -/// used for sending transactions. -pub struct CurrentLeaderInfo -where - T: TpuInfo + std::marker::Send + 'static, -{ - /// The last time the leader info was refreshed - last_leader_refresh: Option, - - /// The leader info - leader_info: Option, - - /// How often to refresh the leader info - refresh_rate: Duration, -} - -impl CurrentLeaderInfo -where - T: TpuInfo + std::marker::Send + 'static, -{ - /// Get the leader info, refresh if expired - pub fn get_leader_info(&mut self) -> Option<&T> { - if let Some(leader_info) = self.leader_info.as_mut() { - let now = Instant::now(); - let need_refresh = self - .last_leader_refresh - .map(|last| now.duration_since(last) >= self.refresh_rate) - .unwrap_or(true); - - if need_refresh { - leader_info.refresh_recent_peers(); - self.last_leader_refresh = Some(now); - } - } - self.leader_info.as_ref() - } - - pub fn new(leader_info: Option) -> Self { - Self { - last_leader_refresh: None, - leader_info, - refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS), - } - } -} - -/// Metrics of the send-transaction-service. -#[derive(Default)] -struct SendTransactionServiceStats { - /// Count of the received transactions - received_transactions: AtomicU64, - - /// Count of the received duplicate transactions - received_duplicate_transactions: AtomicU64, - - /// Count of transactions sent in batch - sent_transactions: AtomicU64, - - /// Count of transactions not being added to retry queue - /// due to queue size limit - retry_queue_overflow: AtomicU64, - - /// retry queue size - retry_queue_size: AtomicU64, - - /// The count of calls of sending transactions which can be in batch or single. - send_attempt_count: AtomicU64, - - /// Time spent on transactions in micro seconds - send_us: AtomicU64, - - /// Send failure count - send_failure_count: AtomicU64, - - /// Count of nonced transactions - nonced_transactions: AtomicU64, - - /// Count of rooted transactions - rooted_transactions: AtomicU64, - - /// Count of expired transactions - expired_transactions: AtomicU64, - - /// Count of transactions exceeding max retries - transactions_exceeding_max_retries: AtomicU64, - - /// Count of retries of transactions - retries: AtomicU64, - - /// Count of transactions failed - failed_transactions: AtomicU64, -} - -#[derive(Default)] -struct SendTransactionServiceStatsReport { - stats: SendTransactionServiceStats, - last_report: AtomicInterval, -} - -impl SendTransactionServiceStatsReport { - /// report metrics of the send transaction service - fn report(&self) { - if self - .last_report - .should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS) - { - datapoint_info!( - "send_transaction_service", - ( - "recv-tx", - self.stats.received_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "recv-duplicate", - self.stats - .received_duplicate_transactions - .swap(0, Ordering::Relaxed), - i64 - ), - ( - "sent-tx", - self.stats.sent_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "retry-queue-overflow", - self.stats.retry_queue_overflow.swap(0, Ordering::Relaxed), - i64 - ), - ( - "retry-queue-size", - self.stats.retry_queue_size.swap(0, Ordering::Relaxed), - i64 - ), - ( - "send-us", - self.stats.send_us.swap(0, Ordering::Relaxed), - i64 - ), - ( - "send-attempt-count", - self.stats.send_attempt_count.swap(0, Ordering::Relaxed), - i64 - ), - ( - "send-failure-count", - self.stats.send_failure_count.swap(0, Ordering::Relaxed), - i64 - ), - ( - "nonced-tx", - self.stats.nonced_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "rooted-tx", - self.stats.rooted_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "expired-tx", - self.stats.expired_transactions.swap(0, Ordering::Relaxed), - i64 - ), - ( - "max-retries-exceeded-tx", - self.stats - .transactions_exceeding_max_retries - .swap(0, Ordering::Relaxed), - i64 - ), - ( - "retries", - self.stats.retries.swap(0, Ordering::Relaxed), - i64 - ), - ( - "failed-tx", - self.stats.failed_transactions.swap(0, Ordering::Relaxed), - i64 - ) - ); - } - } -} - -/// Report the send transaction memtrics for every 5 seconds. -const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000; - impl SendTransactionService { pub fn new( tpu_address: SocketAddr, diff --git a/send-transaction-service/src/send_transaction_service_stats.rs b/send-transaction-service/src/send_transaction_service_stats.rs new file mode 100644 index 00000000000000..98b2fa7f0c8e79 --- /dev/null +++ b/send-transaction-service/src/send_transaction_service_stats.rs @@ -0,0 +1,148 @@ +use { + solana_sdk::timing::AtomicInterval, + std::sync::atomic::{AtomicU64, Ordering}, +}; + +/// Report the send transaction metrics for every 5 seconds. +const SEND_TRANSACTION_METRICS_REPORT_RATE_MS: u64 = 5000; + +/// Metrics of the send-transaction-service. +#[derive(Default)] +pub struct SendTransactionServiceStats { + /// Count of the received transactions + pub received_transactions: AtomicU64, + + /// Count of the received duplicate transactions + pub received_duplicate_transactions: AtomicU64, + + /// Count of transactions sent in batch + pub sent_transactions: AtomicU64, + + /// Count of transactions not being added to retry queue + /// due to queue size limit + pub retry_queue_overflow: AtomicU64, + + /// retry queue size + pub retry_queue_size: AtomicU64, + + /// The count of calls of sending transactions which can be in batch or single. + pub send_attempt_count: AtomicU64, + + /// Time spent on transactions in micro seconds + pub send_us: AtomicU64, + + /// Send failure count + pub send_failure_count: AtomicU64, + + /// Count of nonced transactions + pub nonced_transactions: AtomicU64, + + /// Count of rooted transactions + pub rooted_transactions: AtomicU64, + + /// Count of expired transactions + pub expired_transactions: AtomicU64, + + /// Count of transactions exceeding max retries + pub transactions_exceeding_max_retries: AtomicU64, + + /// Count of retries of transactions + pub retries: AtomicU64, + + /// Count of transactions failed + pub failed_transactions: AtomicU64, +} + +#[derive(Default)] +pub(crate) struct SendTransactionServiceStatsReport { + pub stats: SendTransactionServiceStats, + last_report: AtomicInterval, +} + +impl SendTransactionServiceStatsReport { + /// report metrics of the send transaction service + pub fn report(&self) { + if self + .last_report + .should_update(SEND_TRANSACTION_METRICS_REPORT_RATE_MS) + { + datapoint_info!( + "send_transaction_service", + ( + "recv-tx", + self.stats.received_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "recv-duplicate", + self.stats + .received_duplicate_transactions + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "sent-tx", + self.stats.sent_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "retry-queue-overflow", + self.stats.retry_queue_overflow.swap(0, Ordering::Relaxed), + i64 + ), + ( + "retry-queue-size", + self.stats.retry_queue_size.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-us", + self.stats.send_us.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-attempt-count", + self.stats.send_attempt_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "send-failure-count", + self.stats.send_failure_count.swap(0, Ordering::Relaxed), + i64 + ), + ( + "nonced-tx", + self.stats.nonced_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "rooted-tx", + self.stats.rooted_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "expired-tx", + self.stats.expired_transactions.swap(0, Ordering::Relaxed), + i64 + ), + ( + "max-retries-exceeded-tx", + self.stats + .transactions_exceeding_max_retries + .swap(0, Ordering::Relaxed), + i64 + ), + ( + "retries", + self.stats.retries.swap(0, Ordering::Relaxed), + i64 + ), + ( + "failed-tx", + self.stats.failed_transactions.swap(0, Ordering::Relaxed), + i64 + ) + ); + } + } +} diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs new file mode 100644 index 00000000000000..19c20c26d350c2 --- /dev/null +++ b/send-transaction-service/src/transaction_client.rs @@ -0,0 +1,161 @@ +use { + crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo}, + log::warn, + solana_client::connection_cache::ConnectionCache, + solana_connection_cache::client_connection::ClientConnection as TpuConnection, + solana_measure::measure::Measure, + std::{ + net::SocketAddr, + sync::{atomic::Ordering, Arc, Mutex}, + time::{Duration, Instant}, + }, +}; + +pub trait TransactionClient { + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ); +} + +pub struct ConnectionCacheClient { + connection_cache: Arc, + tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info_provider: Arc>>, + leader_forward_count: u64, +} + +impl ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn new( + connection_cache: Arc, + tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info: Option, + leader_forward_count: u64, + ) -> Self { + let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + Self { + connection_cache, + tpu_address, + tpu_peers, + leader_info_provider, + leader_forward_count, + } + } + + fn get_tpu_addresses_with_slots<'a>( + &'a self, + leader_info: Option<&'a T>, + ) -> Vec<&'a SocketAddr> { + leader_info + .map(|leader_info| { + leader_info + .get_leader_tpus(self.leader_forward_count, self.connection_cache.protocol()) + }) + .filter(|addresses| !addresses.is_empty()) + .unwrap_or_else(|| vec![&self.tpu_address]) + } + + fn send_transactions( + &self, + peer: &SocketAddr, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + let mut measure = Measure::start("send-us"); + let conn = self.connection_cache.get_connection(peer); + let result = conn.send_data_batch_async(wire_transactions); + + if let Err(err) = result { + warn!( + "Failed to send transaction transaction to {}: {:?}", + self.tpu_address, err + ); + stats.send_failure_count.fetch_add(1, Ordering::Relaxed); + } + + measure.stop(); + stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); + stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); + } +} + +impl TransactionClient for ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + // Processing the transactions in batch + let mut addresses = self + .tpu_peers + .as_ref() + .map(|addrs| addrs.iter().collect::>()) + .unwrap_or_default(); + let mut leader_info_provider = self.leader_info_provider.lock().unwrap(); + let leader_info = leader_info_provider.get_leader_info(); + let leader_addresses = self.get_tpu_addresses_with_slots(leader_info); + addresses.extend(leader_addresses); + + for address in &addresses { + self.send_transactions(address, wire_transactions.clone(), stats); + } + } +} + +/// The leader info refresh rate. +pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; + +/// A struct responsible for holding up-to-date leader information +/// used for sending transactions. +pub(crate) struct CurrentLeaderInfo +where + T: TpuInfo + std::marker::Send + 'static, +{ + /// The last time the leader info was refreshed + last_leader_refresh: Option, + + /// The leader info + leader_info: Option, + + /// How often to refresh the leader info + refresh_rate: Duration, +} + +impl CurrentLeaderInfo +where + T: TpuInfo + std::marker::Send + 'static, +{ + /// Get the leader info, refresh if expired + pub fn get_leader_info(&mut self) -> Option<&T> { + if let Some(leader_info) = self.leader_info.as_mut() { + let now = Instant::now(); + let need_refresh = self + .last_leader_refresh + .map(|last| now.duration_since(last) >= self.refresh_rate) + .unwrap_or(true); + + if need_refresh { + leader_info.refresh_recent_peers(); + self.last_leader_refresh = Some(now); + } + } + self.leader_info.as_ref() + } + + pub fn new(leader_info: Option) -> Self { + Self { + last_leader_refresh: None, + leader_info, + refresh_rate: Duration::from_millis(LEADER_INFO_REFRESH_RATE_MS), + } + } +} From d2445601a79569c1c9e7a034570401d65c4d0e0d Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 11:54:33 +0100 Subject: [PATCH 02/17] use new ConnectionClientClient code --- Cargo.lock | 1 + banks-server/src/banks_server.rs | 2 +- rpc/src/rpc.rs | 2 +- rpc/src/rpc_service.rs | 2 +- send-transaction-service/Cargo.toml | 1 + .../src/send_transaction_service.rs | 425 +++++++----------- .../src/transaction_client.rs | 18 +- 7 files changed, 180 insertions(+), 271 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 37a8bd785cf4b3..3550b459295262 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8150,6 +8150,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index f2d2d10da85abb..4ff7368a961d49 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -458,7 +458,7 @@ pub async fn start_tcp_server( &bank_forks, None, receiver, - &connection_cache, + connection_cache.clone(), 5_000, 0, exit.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 40bd47f71c0e16..2fbf66ec6988b9 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -377,7 +377,7 @@ impl JsonRpcRequestProcessor { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, exit.clone(), diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index fed748d709472b..f388433d611923 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -479,7 +479,7 @@ impl JsonRpcService { &bank_forks, leader_info, receiver, - &connection_cache, + connection_cache, send_transaction_service_config, exit, )); diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index a69c366a358fdc..07ad3f5a5b886c 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -11,6 +11,7 @@ edition = { workspace = true } [dependencies] crossbeam-channel = { workspace = true } +itertools = { workspace = true } log = { workspace = true } solana-client = { workspace = true } solana-connection-cache = { workspace = true } diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 208fca879155dc..0755cfb968b7ef 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -4,17 +4,15 @@ use { SendTransactionServiceStats, SendTransactionServiceStatsReport, }, tpu_info::TpuInfo, - transaction_client::CurrentLeaderInfo, + transaction_client::{ConnectionCacheClient, TransactionClient}, }, crossbeam_channel::{Receiver, RecvTimeoutError}, + itertools::Itertools, log::*, - solana_client::connection_cache::{ConnectionCache, Protocol}, - solana_connection_cache::client_connection::ClientConnection as TpuConnection, - solana_measure::measure::Measure, + solana_client::connection_cache::ConnectionCache, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ - clock::Slot, hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, - signature::Signature, transport::TransportError, + hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, }, std::{ collections::{ @@ -147,7 +145,7 @@ impl SendTransactionService { bank_forks: &Arc>, leader_info: Option, receiver: Receiver, - connection_cache: &Arc, + connection_cache: Arc, retry_rate_ms: u64, leader_forward_count: u64, exit: Arc, @@ -173,7 +171,7 @@ impl SendTransactionService { bank_forks: &Arc>, leader_info: Option, receiver: Receiver, - connection_cache: &Arc, + connection_cache: Arc, config: Config, exit: Arc, ) -> Self { @@ -181,26 +179,33 @@ impl SendTransactionService { let retry_transactions = Arc::new(Mutex::new(HashMap::new())); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(leader_info))); + let client = ConnectionCacheClient::new( + connection_cache, + tpu_address, + config.tpu_peers, + leader_info, + config.leader_forward_count, + ); let receive_txn_thread = Self::receive_txn_thread( - tpu_address, receiver, - leader_info_provider.clone(), - connection_cache.clone(), - config.clone(), + client.clone(), retry_transactions.clone(), stats_report.clone(), + config.batch_send_rate_ms, + config.batch_size, + config.retry_pool_max_size, exit.clone(), ); let retry_thread = Self::retry_thread( - tpu_address, bank_forks.clone(), - leader_info_provider, - connection_cache.clone(), - config, + client, retry_transactions, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, stats_report, exit.clone(), ); @@ -213,26 +218,23 @@ impl SendTransactionService { /// Thread responsible for receiving transactions from RPC clients. fn receive_txn_thread( - tpu_address: SocketAddr, receiver: Receiver, - leader_info_provider: Arc>>, - connection_cache: Arc, - config: Config, + client: ConnectionCacheClient, retry_transactions: Arc>>, stats_report: Arc, + batch_send_rate_ms: u64, + batch_size: usize, + retry_pool_max_size: usize, exit: Arc, ) -> JoinHandle<()> { let mut last_batch_sent = Instant::now(); let mut transactions = HashMap::new(); - info!( - "Starting send-transaction-service::receive_txn_thread with config {:?}", - config - ); + info!("Starting send-transaction-service::receive_txn_thread with config.",); Builder::new() .name("solStxReceive".to_string()) .spawn(move || loop { - let recv_timeout_ms = config.batch_send_rate_ms; + let recv_timeout_ms = batch_send_rate_ms; let stats = &stats_report.stats; let recv_result = receiver.recv_timeout(Duration::from_millis(recv_timeout_ms)); if exit.load(Ordering::Relaxed) { @@ -268,20 +270,17 @@ impl SendTransactionService { } if (!transactions.is_empty() - && last_batch_sent.elapsed().as_millis() as u64 >= config.batch_send_rate_ms) - || transactions.len() >= config.batch_size + && last_batch_sent.elapsed().as_millis() as u64 >= batch_send_rate_ms) + || transactions.len() >= batch_size { stats .sent_transactions .fetch_add(transactions.len() as u64, Ordering::Relaxed); - Self::send_transactions_in_batch( - &tpu_address, - &transactions, - leader_info_provider.lock().unwrap().get_leader_info(), - &connection_cache, - &config, - stats, - ); + let wire_transactions = transactions + .values() + .map(|transaction_info| transaction_info.wire_transaction.clone()) + .collect::>>(); + client.send_transactions_in_batch(wire_transactions, stats); let last_sent_time = Instant::now(); { // take a lock of retry_transactions and move the batch to the retry set. @@ -292,7 +291,7 @@ impl SendTransactionService { let retry_len = retry_transactions.len(); let entry = retry_transactions.entry(signature); if let Entry::Vacant(_) = entry { - if retry_len >= config.retry_pool_max_size { + if retry_len >= retry_pool_max_size { break; } else { transaction_info.last_sent_time = Some(last_sent_time); @@ -319,23 +318,21 @@ impl SendTransactionService { /// Thread responsible for retrying transactions fn retry_thread( - tpu_address: SocketAddr, bank_forks: Arc>, - leader_info_provider: Arc>>, - connection_cache: Arc, - config: Config, + client: ConnectionCacheClient, retry_transactions: Arc>>, + retry_rate_ms: u64, + service_max_retries: usize, + default_max_retries: Option, + batch_size: usize, stats_report: Arc, exit: Arc, ) -> JoinHandle<()> { - info!( - "Starting send-transaction-service::retry_thread with config {:?}", - config - ); + info!("Starting send-transaction-service::retry_thread with config."); Builder::new() .name("solStxRetry".to_string()) .spawn(move || loop { - let retry_interval_ms = config.retry_rate_ms; + let retry_interval_ms = retry_rate_ms; let stats = &stats_report.stats; sleep(Duration::from_millis( MAX_RETRY_SLEEP_MS.min(retry_interval_ms), @@ -356,11 +353,12 @@ impl SendTransactionService { let _result = Self::process_transactions( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + retry_rate_ms, + service_max_retries, + default_max_retries, + batch_size, stats, ); stats_report.report(); @@ -369,60 +367,22 @@ impl SendTransactionService { .unwrap() } - /// Process transactions in batch. - fn send_transactions_in_batch( - tpu_address: &SocketAddr, - transactions: &HashMap, - leader_info: Option<&T>, - connection_cache: &Arc, - config: &Config, - stats: &SendTransactionServiceStats, - ) { - // Processing the transactions in batch - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().map(|a| (a, 0)).collect::>()) - .unwrap_or_default(); - let leader_addresses = Self::get_tpu_addresses_with_slots( - tpu_address, - leader_info, - config, - connection_cache.protocol(), - ); - addresses.extend(leader_addresses); - - let wire_transactions = transactions - .iter() - .map(|(_, transaction_info)| { - debug!( - "Sending transacation {} to (address, slot): {:?}", - transaction_info.signature, addresses, - ); - transaction_info.wire_transaction.as_ref() - }) - .collect::>(); - - for (address, _) in &addresses { - Self::send_transactions(address, &wire_transactions, connection_cache, stats); - } - } - /// Retry transactions sent before. fn process_transactions( working_bank: &Bank, root_bank: &Bank, - tpu_address: &SocketAddr, transactions: &mut HashMap, - leader_info_provider: &Arc>>, - connection_cache: &Arc, - config: &Config, + client: &ConnectionCacheClient, + retry_rate_ms: u64, + service_max_retries: usize, + default_max_retries: Option, + batch_size: usize, stats: &SendTransactionServiceStats, ) -> ProcessTransactionsResult { let mut result = ProcessTransactionsResult::default(); let mut batched_transactions = HashSet::new(); - let retry_rate = Duration::from_millis(config.retry_rate_ms); + let retry_rate = Duration::from_millis(retry_rate_ms); transactions.retain(|signature, transaction_info| { if transaction_info.durable_nonce_info.is_some() { @@ -460,8 +420,8 @@ impl SendTransactionService { let max_retries = transaction_info .max_retries - .or(config.default_max_retries) - .map(|max_retries| max_retries.min(config.service_max_retries)); + .or(default_max_retries) + .map(|max_retries| max_retries.min(service_max_retries)); if let Some(max_retries) = max_retries { if transaction_info.retries >= max_retries { @@ -516,114 +476,17 @@ impl SendTransactionService { let wire_transactions = transactions .iter() .filter(|(signature, _)| batched_transactions.contains(signature)) - .map(|(_, transaction_info)| transaction_info.wire_transaction.as_ref()) - .collect::>(); - - let iter = wire_transactions.chunks(config.batch_size); - for chunk in iter { - let mut addresses = config - .tpu_peers - .as_ref() - .map(|addrs| addrs.iter().collect::>()) - .unwrap_or_default(); - let mut leader_info_provider = leader_info_provider.lock().unwrap(); - let leader_info = leader_info_provider.get_leader_info(); - let leader_addresses = Self::get_tpu_addresses( - tpu_address, - leader_info, - config, - connection_cache.protocol(), - ); - addresses.extend(leader_addresses); - - for address in &addresses { - Self::send_transactions(address, chunk, connection_cache, stats); - } + .map(|(_, transaction_info)| transaction_info.wire_transaction.clone()); + + let iter = wire_transactions.chunks(batch_size); + for chunk in &iter { + let chunk = chunk.collect(); + client.send_transactions_in_batch(chunk, stats); } } result } - fn send_transaction( - tpu_address: &SocketAddr, - wire_transaction: &[u8], - connection_cache: &Arc, - ) -> Result<(), TransportError> { - let conn = connection_cache.get_connection(tpu_address); - conn.send_data_async(wire_transaction.to_vec()) - } - - fn send_transactions_with_metrics( - tpu_address: &SocketAddr, - wire_transactions: &[&[u8]], - connection_cache: &Arc, - ) -> Result<(), TransportError> { - let wire_transactions = wire_transactions.iter().map(|t| t.to_vec()).collect(); - let conn = connection_cache.get_connection(tpu_address); - conn.send_data_batch_async(wire_transactions) - } - - fn send_transactions( - tpu_address: &SocketAddr, - wire_transactions: &[&[u8]], - connection_cache: &Arc, - stats: &SendTransactionServiceStats, - ) { - let mut measure = Measure::start("send-us"); - let result = if wire_transactions.len() == 1 { - Self::send_transaction(tpu_address, wire_transactions[0], connection_cache) - } else { - Self::send_transactions_with_metrics(tpu_address, wire_transactions, connection_cache) - }; - - if let Err(err) = result { - warn!( - "Failed to send transaction transaction to {}: {:?}", - tpu_address, err - ); - stats.send_failure_count.fetch_add(1, Ordering::Relaxed); - } - - measure.stop(); - stats.send_us.fetch_add(measure.as_us(), Ordering::Relaxed); - stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); - } - - fn get_tpu_addresses<'a, T: TpuInfo>( - tpu_address: &'a SocketAddr, - leader_info: Option<&'a T>, - config: &'a Config, - protocol: Protocol, - ) -> Vec<&'a SocketAddr> { - let addresses = leader_info - .as_ref() - .map(|leader_info| leader_info.get_leader_tpus(config.leader_forward_count, protocol)); - addresses - .map(|address_list| { - if address_list.is_empty() { - vec![tpu_address] - } else { - address_list - } - }) - .unwrap_or_else(|| vec![tpu_address]) - } - - fn get_tpu_addresses_with_slots<'a, T: TpuInfo>( - tpu_address: &'a SocketAddr, - leader_info: Option<&'a T>, - config: &'a Config, - protocol: Protocol, - ) -> Vec<(&'a SocketAddr, Slot)> { - leader_info - .as_ref() - .map(|leader_info| { - leader_info.get_leader_tpus_with_slots(config.leader_forward_count, protocol) - }) - .filter(|addresses| !addresses.is_empty()) - .unwrap_or_else(|| vec![(tpu_address, 0)]) - } - pub fn join(self) -> thread::Result<()> { self.receive_txn_thread.join()?; self.exit.store(true, Ordering::Relaxed); @@ -661,7 +524,7 @@ mod test { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, Arc::new(AtomicBool::new(false)), @@ -695,7 +558,7 @@ mod test { &bank_forks, None, receiver, - &connection_cache, + connection_cache, 1000, 1, exit.clone(), @@ -713,6 +576,22 @@ mod test { } } + fn create_client( + tpu_peers: Option>, + leader_forward_count: u64, + ) -> ConnectionCacheClient { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + ConnectionCacheClient::new( + connection_cache, + tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } + #[test] fn process_transactions() { solana_logger::setup(); @@ -720,7 +599,6 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, ..Config::default() @@ -767,7 +645,6 @@ mod test { let mut transactions = HashMap::new(); info!("Expired transactions are dropped..."); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); transactions.insert( Signature::default(), @@ -780,15 +657,17 @@ mod test { Some(Instant::now()), ), ); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + let client = create_client(config.tpu_peers, config.leader_forward_count); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -815,11 +694,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -846,11 +726,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -877,11 +758,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -910,11 +792,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -953,11 +836,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -972,11 +856,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -996,7 +881,6 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let tpu_address = "127.0.0.1:0".parse().unwrap(); let config = Config { leader_forward_count: 1, ..Config::default() @@ -1064,17 +948,17 @@ mod test { Some(Instant::now()), ), ); - let leader_info_provider = Arc::new(Mutex::new(CurrentLeaderInfo::new(None))); let stats = SendTransactionServiceStats::default(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + let client = create_client(config.tpu_peers, config.leader_forward_count); let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1100,11 +984,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1132,11 +1017,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1162,11 +1048,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1193,11 +1080,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert!(transactions.is_empty()); @@ -1224,11 +1112,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1257,11 +1146,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 1); @@ -1287,11 +1177,12 @@ mod test { let result = SendTransactionService::process_transactions::( &working_bank, &root_bank, - &tpu_address, &mut transactions, - &leader_info_provider, - &connection_cache, - &config, + &client, + config.retry_rate_ms, + config.service_max_retries, + config.default_max_retries, + config.batch_size, &stats, ); assert_eq!(transactions.len(), 0); diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 19c20c26d350c2..c38007d64ec67a 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -27,11 +27,27 @@ pub struct ConnectionCacheClient { leader_forward_count: u64, } +// Manual implementation of Clone without requiring T to be Clone +impl Clone for ConnectionCacheClient +where + T: TpuInfo + std::marker::Send + 'static, +{ + fn clone(&self) -> Self { + Self { + connection_cache: Arc::clone(&self.connection_cache), + tpu_address: self.tpu_address, + tpu_peers: self.tpu_peers.clone(), + leader_info_provider: Arc::clone(&self.leader_info_provider), + leader_forward_count: self.leader_forward_count, + } + } +} + impl ConnectionCacheClient where T: TpuInfo + std::marker::Send + 'static, { - fn new( + pub fn new( connection_cache: Arc, tpu_address: SocketAddr, tpu_peers: Option>, From b8b1df6fd0e5aeec5996cd1fdedd60f19687645a Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 12:19:32 +0100 Subject: [PATCH 03/17] Update Cargo.lock --- programs/sbf/Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 89d56b13fb096f..238640c15bec24 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6889,6 +6889,7 @@ name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ "crossbeam-channel", + "itertools 0.12.1", "log", "solana-client", "solana-connection-cache", From 4aeb142ccf22d9bb717fb281c30f5e9ea4d25df3 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 12:23:41 +0100 Subject: [PATCH 04/17] Add tpu-client-next to the root Cargo.toml --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 52d36f3b238ffb..a51cc1309bc719 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -521,6 +521,7 @@ solana-test-validator = { path = "test-validator", version = "=2.2.0" } solana-thin-client = { path = "thin-client", version = "=2.2.0" } solana-transaction-error = { path = "sdk/transaction-error", version = "=2.2.0" } solana-tpu-client = { path = "tpu-client", version = "=2.2.0", default-features = false } +solana-tpu-client-next = { path = "tpu-client-next", version = "=2.2.0" } solana-transaction-status = { path = "transaction-status", version = "=2.2.0" } solana-transaction-status-client-types = { path = "transaction-status-client-types", version = "=2.2.0" } solana-transaction-metrics-tracker = { path = "transaction-metrics-tracker", version = "=2.2.0" } From 17f66fd7a3cf8b863521555a7641ff8ec780eed9 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 14:11:07 +0100 Subject: [PATCH 05/17] Change LeaderUpdater trait to accept mut self --- tpu-client-next/src/leader_updater.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tpu-client-next/src/leader_updater.rs b/tpu-client-next/src/leader_updater.rs index 5e07b9b0bfe612..63d0938da46d51 100644 --- a/tpu-client-next/src/leader_updater.rs +++ b/tpu-client-next/src/leader_updater.rs @@ -35,7 +35,7 @@ pub trait LeaderUpdater: Send { /// If the current leader estimation is incorrect and transactions are sent to /// only one estimated leader, there is a risk of losing all the transactions, /// depending on the forwarding policy. - fn next_leaders(&self, lookahead_slots: u64) -> Vec; + fn next_leaders(&mut self, lookahead_slots: u64) -> Vec; /// Stop [`LeaderUpdater`] and releases all associated resources. async fn stop(&mut self); @@ -98,7 +98,7 @@ struct LeaderUpdaterService { #[async_trait] impl LeaderUpdater for LeaderUpdaterService { - fn next_leaders(&self, lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, lookahead_slots: u64) -> Vec { self.leader_tpu_service.leader_tpu_sockets(lookahead_slots) } @@ -116,7 +116,7 @@ struct PinnedLeaderUpdater { #[async_trait] impl LeaderUpdater for PinnedLeaderUpdater { - fn next_leaders(&self, _lookahead_slots: u64) -> Vec { + fn next_leaders(&mut self, _lookahead_slots: u64) -> Vec { self.address.clone() } From 39cd11e0ca7c2b67e0df99c60ba088466c6bd71b Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 15:30:54 +0100 Subject: [PATCH 06/17] add spawn_tpu_client_send_txs --- send-transaction-service/Cargo.toml | 7 +- .../src/transaction_client.rs | 112 +++++++++++++++++- 2 files changed, 115 insertions(+), 4 deletions(-) diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index 07ad3f5a5b886c..42ab90b664121b 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -10,6 +10,9 @@ license = { workspace = true } edition = { workspace = true } [dependencies] +# TODO(klykov): remove this dependency when trait for leader updater +# will be changed +async-trait = { workspace = true } crossbeam-channel = { workspace = true } itertools = { workspace = true } log = { workspace = true } @@ -19,7 +22,9 @@ solana-measure = { workspace = true } solana-metrics = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } -solana-tpu-client = { workspace = true } +solana-tpu-client-next = { workspace = true } +tokio = { workspace = true, features = ["full"] } +tokio-util = { workspace = true } [dev-dependencies] solana-logger = { workspace = true } diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index c38007d64ec67a..ce2e5aa6aece5d 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -1,14 +1,25 @@ use { crate::{send_transaction_service_stats::SendTransactionServiceStats, tpu_info::TpuInfo}, + async_trait::async_trait, log::warn, - solana_client::connection_cache::ConnectionCache, + solana_client::connection_cache::{ConnectionCache, Protocol}, solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_measure::measure::Measure, + solana_tpu_client_next::{ + connection_workers_scheduler::ConnectionWorkersSchedulerConfig, + leader_updater::LeaderUpdater, transaction_batch::TransactionBatch, + ConnectionWorkersScheduler, + }, std::{ - net::SocketAddr, - sync::{atomic::Ordering, Arc, Mutex}, + net::{Ipv4Addr, SocketAddr}, + sync::{atomic::Ordering, Arc, Mutex, RwLock}, time::{Duration, Instant}, }, + tokio::{ + runtime::Runtime, + sync::mpsc::{self, Sender}, + }, + tokio_util::sync::CancellationToken, }; pub trait TransactionClient { @@ -127,6 +138,101 @@ where } } +pub struct SendTransactionServiceLeaderUpdater { + leader_info_provider: CurrentLeaderInfo, + my_tpu_address: SocketAddr, + tpu_peers: Option>, +} + +//TODO(klykov): it is should not be async trait because we don't need here it to be stoppable +#[async_trait] +impl LeaderUpdater for SendTransactionServiceLeaderUpdater +where + T: TpuInfo + std::marker::Send + 'static, +{ + //TODO(klykov): So each call will lead to shit tons of allocations, I think + //we should recalculate only once a slot or something? + fn next_leaders(&mut self, lookahead_slots: u64) -> Vec { + // it is &mut because it is not a service! so it needs to update the state + // so i had to change the interface + let discovered_peers = self + .leader_info_provider + .get_leader_info() + .map(|leader_info| leader_info.get_leader_tpus(lookahead_slots / 4, Protocol::QUIC)) + .filter(|addresses| !addresses.is_empty()) + .unwrap_or_else(|| vec![&self.my_tpu_address]); + let mut all_peers = self.tpu_peers.clone().unwrap_or_default(); + all_peers.extend(discovered_peers.into_iter().cloned()); + all_peers + } + async fn stop(&mut self) {} +} + +struct TpuClientNextClient { + sender: Sender, +} + +impl TransactionClient for TpuClientNextClient { + fn send_transactions_in_batch( + &self, + wire_transactions: Vec>, + stats: &SendTransactionServiceStats, + ) { + let res = self + .sender + .try_send(TransactionBatch::new(wire_transactions)); + match res { + Ok(_) => { + stats.send_attempt_count.fetch_add(1, Ordering::Relaxed); + } + Err(_) => { + warn!("Failed to send transaction transaction, transaction chanel is full."); + stats.send_failure_count.fetch_add(1, Ordering::Relaxed); + } + } + } +} + +pub(crate) fn spawn_tpu_client_send_txs( + runtime: Runtime, + my_tpu_address: SocketAddr, + tpu_peers: Option>, + leader_info: Option, + leader_forward_count: u64, +) -> Sender +where + T: TpuInfo + std::marker::Send + 'static, +{ + let leader_info_provider = CurrentLeaderInfo::new(leader_info); + + let (transaction_sender, transaction_receiver) = mpsc::channel(16); // random number of now + let validator_identity = None; + runtime.spawn(async move { + let cancel = CancellationToken::new(); + let leader_updater = SendTransactionServiceLeaderUpdater { + leader_info_provider, + my_tpu_address, + tpu_peers, + }; + let config = ConnectionWorkersSchedulerConfig { + bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), + stake_identity: validator_identity, //TODO In CC, do we send with identity? + num_connections: 1, + skip_check_transaction_age: true, + worker_channel_size: 2, + max_reconnect_attempts: 4, + lookahead_slots: leader_forward_count, + }; + let _scheduler = tokio::spawn(ConnectionWorkersScheduler::run( + config, + Box::new(leader_updater), + transaction_receiver, + cancel.clone(), + )); + }); + transaction_sender +} + /// The leader info refresh rate. pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; From 99c2ef0ac4d81748a96a890c7c884364058b7cf2 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 16:29:33 +0100 Subject: [PATCH 07/17] Change the interface of SendTransactionService to accept client. It still uses ConnectionCacheClient everywhere --- banks-server/src/banks_server.rs | 12 +- rpc/src/rpc.rs | 31 ++-- rpc/src/rpc_service.rs | 18 +- .../src/send_transaction_service.rs | 155 +++++++----------- .../src/transaction_client.rs | 2 +- 5 files changed, 91 insertions(+), 127 deletions(-) diff --git a/banks-server/src/banks_server.rs b/banks-server/src/banks_server.rs index 4ff7368a961d49..5340d848b09ee6 100644 --- a/banks-server/src/banks_server.rs +++ b/banks-server/src/banks_server.rs @@ -28,6 +28,7 @@ use { solana_send_transaction_service::{ send_transaction_service::{SendTransactionService, TransactionInfo}, tpu_info::NullTpuInfo, + transaction_client::ConnectionCacheClient, }, std::{ io, @@ -453,17 +454,16 @@ pub async fn start_tcp_server( .map(move |chan| { let (sender, receiver) = unbounded(); - SendTransactionService::new::( + let client = ConnectionCacheClient::::new( + connection_cache.clone(), tpu_addr, - &bank_forks, None, - receiver, - connection_cache.clone(), - 5_000, + None, 0, - exit.clone(), ); + SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone()); + let server = BanksServer::new( bank_forks.clone(), block_commitment_cache.clone(), diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 2fbf66ec6988b9..27ee43033f0fac 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -83,6 +83,7 @@ use { solana_send_transaction_service::{ send_transaction_service::{SendTransactionService, TransactionInfo}, tpu_info::NullTpuInfo, + transaction_client::ConnectionCacheClient, }, solana_stake_program, solana_storage_bigtable::Error as StorageError, @@ -372,16 +373,14 @@ impl JsonRpcRequestProcessor { .tpu(connection_cache.protocol()) .unwrap(); let (sender, receiver) = unbounded(); - SendTransactionService::new::( + let client = ConnectionCacheClient::::new( + connection_cache.clone(), tpu_address, - &bank_forks, None, - receiver, - connection_cache, - 1000, + None, 1, - exit.clone(), ); + SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); let startup_verification_complete = Arc::clone(bank.get_startup_verification_complete()); @@ -6481,16 +6480,14 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - SendTransactionService::new::( + let client = ConnectionCacheClient::::new( + connection_cache.clone(), tpu_address, - &bank_forks, None, - receiver, - &connection_cache, - 1000, + None, 1, - exit, ); + SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); let mut bad_transaction = system_transaction::transfer( &mint_keypair, @@ -6755,16 +6752,14 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - SendTransactionService::new::( + let client = ConnectionCacheClient::::new( + connection_cache.clone(), tpu_address, - &bank_forks, None, - receiver, - &connection_cache, - 1000, + None, 1, - exit, ); + SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); assert_eq!( request_processor.get_block_commitment(0), RpcBlockCommitment { diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index f388433d611923..a5290befa33b4d 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -36,7 +36,10 @@ use { exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash, native_token::lamports_to_sol, }, - solana_send_transaction_service::send_transaction_service::{self, SendTransactionService}, + solana_send_transaction_service::{ + send_transaction_service::{self, SendTransactionService}, + transaction_client::ConnectionCacheClient, + }, solana_storage_bigtable::CredentialType, std::{ net::SocketAddr, @@ -474,15 +477,20 @@ impl JsonRpcService { let leader_info = poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); - let _send_transaction_service = Arc::new(SendTransactionService::new_with_config( + let client = ConnectionCacheClient::new( + connection_cache, tpu_address, - &bank_forks, + send_transaction_service_config.tpu_peers.clone(), //TODO(klykov): check if we can avoid cloning leader_info, + send_transaction_service_config.leader_forward_count, + ); + let _send_transaction_service = SendTransactionService::new_with_config( + &bank_forks, receiver, - connection_cache, + client, send_transaction_service_config, exit, - )); + ); #[cfg(test)] let test_request_processor = request_processor.clone(); diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 0755cfb968b7ef..4ebd507b3697bd 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -3,13 +3,11 @@ use { send_transaction_service_stats::{ SendTransactionServiceStats, SendTransactionServiceStatsReport, }, - tpu_info::TpuInfo, - transaction_client::{ConnectionCacheClient, TransactionClient}, + transaction_client::TransactionClient, }, crossbeam_channel::{Receiver, RecvTimeoutError}, itertools::Itertools, log::*, - solana_client::connection_cache::ConnectionCache, solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::{ hash::Hash, nonce_account, pubkey::Pubkey, saturating_add_assign, signature::Signature, @@ -140,38 +138,24 @@ impl Default for Config { pub const MAX_RETRY_SLEEP_MS: u64 = 1000; impl SendTransactionService { - pub fn new( - tpu_address: SocketAddr, + pub fn new( bank_forks: &Arc>, - leader_info: Option, receiver: Receiver, - connection_cache: Arc, + client: Client, retry_rate_ms: u64, - leader_forward_count: u64, exit: Arc, ) -> Self { let config = Config { retry_rate_ms, - leader_forward_count, ..Config::default() }; - Self::new_with_config( - tpu_address, - bank_forks, - leader_info, - receiver, - connection_cache, - config, - exit, - ) + Self::new_with_config::(bank_forks, receiver, client, config, exit) } - pub fn new_with_config( - tpu_address: SocketAddr, + pub fn new_with_config( bank_forks: &Arc>, - leader_info: Option, receiver: Receiver, - connection_cache: Arc, + client: Client, config: Config, exit: Arc, ) -> Self { @@ -179,14 +163,6 @@ impl SendTransactionService { let retry_transactions = Arc::new(Mutex::new(HashMap::new())); - let client = ConnectionCacheClient::new( - connection_cache, - tpu_address, - config.tpu_peers, - leader_info, - config.leader_forward_count, - ); - let receive_txn_thread = Self::receive_txn_thread( receiver, client.clone(), @@ -217,9 +193,9 @@ impl SendTransactionService { } /// Thread responsible for receiving transactions from RPC clients. - fn receive_txn_thread( + fn receive_txn_thread( receiver: Receiver, - client: ConnectionCacheClient, + client: Client, retry_transactions: Arc>>, stats_report: Arc, batch_send_rate_ms: u64, @@ -317,9 +293,9 @@ impl SendTransactionService { } /// Thread responsible for retrying transactions - fn retry_thread( + fn retry_thread( bank_forks: Arc>, - client: ConnectionCacheClient, + client: Client, retry_transactions: Arc>>, retry_rate_ms: u64, service_max_retries: usize, @@ -368,11 +344,11 @@ impl SendTransactionService { } /// Retry transactions sent before. - fn process_transactions( + fn process_transactions( working_bank: &Bank, root_bank: &Bank, transactions: &mut HashMap, - client: &ConnectionCacheClient, + client: &Client, retry_rate_ms: u64, service_max_retries: usize, default_max_retries: Option, @@ -498,8 +474,9 @@ impl SendTransactionService { mod test { use { super::*, - crate::tpu_info::NullTpuInfo, + crate::{tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient}, crossbeam_channel::{bounded, unbounded}, + solana_client::connection_cache::ConnectionCache, solana_sdk::{ account::AccountSharedData, genesis_config::create_genesis_config, @@ -511,22 +488,34 @@ mod test { std::ops::Sub, }; + fn create_client( + tpu_peers: Option>, + leader_forward_count: u64, + ) -> ConnectionCacheClient { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + ConnectionCacheClient::new( + connection_cache, + tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } + #[test] fn service_exit() { - let tpu_address = "127.0.0.1:0".parse().unwrap(); let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); let (sender, receiver) = unbounded(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let send_transaction_service = SendTransactionService::new::( - tpu_address, + let client = create_client(None, 1); + let send_transaction_service = SendTransactionService::new( &bank_forks, - None, receiver, - connection_cache, + client, 1000, - 1, Arc::new(AtomicBool::new(false)), ); @@ -536,7 +525,6 @@ mod test { #[test] fn validator_exit() { - let tpu_address = "127.0.0.1:0".parse().unwrap(); let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); let (sender, receiver) = bounded(0); @@ -552,17 +540,9 @@ mod test { }; let exit = Arc::new(AtomicBool::new(false)); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let _send_transaction_service = SendTransactionService::new::( - tpu_address, - &bank_forks, - None, - receiver, - connection_cache, - 1000, - 1, - exit.clone(), - ); + let client = create_client(None, 1); + let _send_transaction_service = + SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); sender.send(dummy_tx_info()).unwrap(); @@ -576,22 +556,6 @@ mod test { } } - fn create_client( - tpu_peers: Option>, - leader_forward_count: u64, - ) -> ConnectionCacheClient { - let tpu_address = "127.0.0.1:0".parse().unwrap(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - - ConnectionCacheClient::new( - connection_cache, - tpu_address, - tpu_peers, - None, - leader_forward_count, - ) - } - #[test] fn process_transactions() { solana_logger::setup(); @@ -599,10 +563,9 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let config = Config { - leader_forward_count: 1, - ..Config::default() - }; + + let leader_forward_count = 1; + let config = Config::default(); let root_bank = Bank::new_from_parent( bank_forks.read().unwrap().working_bank(), @@ -658,8 +621,8 @@ mod test { ), ); - let client = create_client(config.tpu_peers, config.leader_forward_count); - let result = SendTransactionService::process_transactions::( + let client = create_client(config.tpu_peers, leader_forward_count); + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -691,7 +654,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -723,7 +686,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -755,7 +718,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -789,7 +752,7 @@ mod test { ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -833,7 +796,7 @@ mod test { Some(Instant::now().sub(Duration::from_millis(4000))), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -853,7 +816,7 @@ mod test { ..ProcessTransactionsResult::default() } ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -881,10 +844,8 @@ mod test { let (mut genesis_config, mint_keypair) = create_genesis_config(4); genesis_config.fee_rate_governor = solana_sdk::fee_calculator::FeeRateGovernor::new(0, 0); let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let config = Config { - leader_forward_count: 1, - ..Config::default() - }; + let leader_forward_count = 1; + let config = Config::default(); let root_bank = Bank::new_from_parent( bank_forks.read().unwrap().working_bank(), @@ -949,8 +910,8 @@ mod test { ), ); let stats = SendTransactionServiceStats::default(); - let client = create_client(config.tpu_peers, config.leader_forward_count); - let result = SendTransactionService::process_transactions::( + let client = create_client(config.tpu_peers, leader_forward_count); + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -981,7 +942,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -1014,7 +975,7 @@ mod test { Some(Instant::now().sub(Duration::from_millis(4000))), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -1045,7 +1006,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -1077,7 +1038,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -1109,7 +1070,7 @@ mod test { Some(Instant::now()), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -1143,7 +1104,7 @@ mod test { Some(Instant::now().sub(Duration::from_millis(4000))), ), ); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, @@ -1174,7 +1135,7 @@ mod test { let nonce_account = AccountSharedData::new_data(43, &new_nonce_state, &system_program::id()).unwrap(); working_bank.store_account(&nonce_address, &nonce_account); - let result = SendTransactionService::process_transactions::( + let result = SendTransactionService::process_transactions( &working_bank, &root_bank, &mut transactions, diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index ce2e5aa6aece5d..17b7e61b1b893b 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -12,7 +12,7 @@ use { }, std::{ net::{Ipv4Addr, SocketAddr}, - sync::{atomic::Ordering, Arc, Mutex, RwLock}, + sync::{atomic::Ordering, Arc, Mutex}, time::{Duration, Instant}, }, tokio::{ From caeae5f2f647d68078f4619312ac148410998514 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Fri, 1 Nov 2024 16:30:15 +0100 Subject: [PATCH 08/17] Update Cargo.lock --- Cargo.lock | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 3550b459295262..3ae2e23ae2b7c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8149,6 +8149,7 @@ checksum = "468aa43b7edb1f9b7b7b686d5c3aeb6630dc1708e86e31343499dd5c4d775183" name = "solana-send-transaction-service" version = "2.2.0" dependencies = [ + "async-trait", "crossbeam-channel", "itertools 0.12.1", "log", @@ -8159,7 +8160,9 @@ dependencies = [ "solana-metrics", "solana-runtime", "solana-sdk", - "solana-tpu-client", + "solana-tpu-client-next", + "tokio", + "tokio-util 0.7.12", ] [[package]] From 02bbf6514663b1f3993f805db9a87bf2740ac3d9 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Sat, 2 Nov 2024 16:02:42 +0100 Subject: [PATCH 09/17] Make RUNTIME from quic-client public --- quic-client/src/quic_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/quic-client/src/quic_client.rs b/quic-client/src/quic_client.rs index 8c8e8e5338993f..45d379303f0d25 100644 --- a/quic-client/src/quic_client.rs +++ b/quic-client/src/quic_client.rs @@ -68,7 +68,7 @@ impl AsyncTaskSemaphore { lazy_static! { static ref ASYNC_TASK_SEMAPHORE: AsyncTaskSemaphore = AsyncTaskSemaphore::new(MAX_OUTSTANDING_TASK); - static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() + pub static ref RUNTIME: Runtime = tokio::runtime::Builder::new_multi_thread() .thread_name("solQuicClientRt") .enable_all() .build() From 7b672b07e576e9e69e128fcace19073814ba29fb Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Sat, 2 Nov 2024 16:03:27 +0100 Subject: [PATCH 10/17] Parametrize SendTransactionService tests with client --- send-transaction-service/Cargo.toml | 1 + .../src/send_transaction_service.rs | 127 ++++++++++++++---- .../src/transaction_client.rs | 100 +++++++++----- 3 files changed, 165 insertions(+), 63 deletions(-) diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index 42ab90b664121b..ba6a52eeda9209 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -29,6 +29,7 @@ tokio-util = { workspace = true } [dev-dependencies] solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } +solana-quic-client = { workspace = true } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 4ebd507b3697bd..9a17ba98f7edb7 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -474,9 +474,15 @@ impl SendTransactionService { mod test { use { super::*, - crate::{tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient}, + crate::{ + tpu_info::NullTpuInfo, + transaction_client::{ + spawn_tpu_client_send_txs, Cancelable, ConnectionCacheClient, TpuClientNextClient, + }, + }, crossbeam_channel::{bounded, unbounded}, solana_client::connection_cache::ConnectionCache, + solana_quic_client::quic_client::RUNTIME, solana_sdk::{ account::AccountSharedData, genesis_config::create_genesis_config, @@ -488,43 +494,81 @@ mod test { std::ops::Sub, }; - fn create_client( - tpu_peers: Option>, - leader_forward_count: u64, - ) -> ConnectionCacheClient { - let tpu_address = "127.0.0.1:0".parse().unwrap(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - - ConnectionCacheClient::new( - connection_cache, - tpu_address, - tpu_peers, - None, - leader_forward_count, - ) + trait CreateClient: TransactionClient { + fn create_client(tpu_peers: Option>, leader_forward_count: u64) -> Self; } - #[test] - fn service_exit() { + impl CreateClient for ConnectionCacheClient { + fn create_client(tpu_peers: Option>, leader_forward_count: u64) -> Self { + let tpu_address = "127.0.0.1:0".parse().unwrap(); + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + ConnectionCacheClient::new( + connection_cache, + tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } + } + + impl CreateClient for TpuClientNextClient { + fn create_client(tpu_peers: Option>, leader_forward_count: u64) -> Self { + let runtime = &*RUNTIME; + let tpu_address = "127.0.0.1:0".parse().unwrap(); + + spawn_tpu_client_send_txs::( + runtime, + tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } + } + + // Define type alias to simplify definition of test functions. + trait ClientWithCreator: + CreateClient + TransactionClient + Cancelable + Send + Clone + 'static + { + } + impl ClientWithCreator for T where + T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static + { + } + + fn service_exit() { let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); let (sender, receiver) = unbounded(); - let client = create_client(None, 1); + let client = C::create_client(None, 1); + let send_transaction_service = SendTransactionService::new( &bank_forks, receiver, - client, + client.clone(), 1000, Arc::new(AtomicBool::new(false)), ); drop(sender); send_transaction_service.join().unwrap(); + client.cancel(); } #[test] - fn validator_exit() { + fn service_exit_with_connection_cache() { + service_exit::>(); + } + + #[test] + fn service_exit_with_tpu_client_next() { + service_exit::(); + } + + fn validator_exit() { let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); let (sender, receiver) = bounded(0); @@ -540,14 +584,15 @@ mod test { }; let exit = Arc::new(AtomicBool::new(false)); - let client = create_client(None, 1); + let client = C::create_client(None, 1); let _send_transaction_service = - SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); + SendTransactionService::new(&bank_forks, receiver, client.clone(), 1000, exit.clone()); sender.send(dummy_tx_info()).unwrap(); thread::spawn(move || { exit.store(true, Ordering::Relaxed); + client.cancel(); }); let mut option = Ok(()); @@ -557,7 +602,16 @@ mod test { } #[test] - fn process_transactions() { + fn validator_exit_with_connection_cache() { + validator_exit::>(); + } + + #[test] + fn validator_exit_with_tpu_client_next() { + validator_exit::(); + } + + fn process_transactions() { solana_logger::setup(); let (mut genesis_config, mint_keypair) = create_genesis_config(4); @@ -621,7 +675,7 @@ mod test { ), ); - let client = create_client(config.tpu_peers, leader_forward_count); + let client = C::create_client(config.tpu_peers, leader_forward_count); let result = SendTransactionService::process_transactions( &working_bank, &root_bank, @@ -835,10 +889,20 @@ mod test { ..ProcessTransactionsResult::default() } ); + client.cancel(); } #[test] - fn test_retry_durable_nonce_transactions() { + fn process_transactions_with_connection_cache() { + process_transactions::>(); + } + + #[test] + fn process_transactions_with_tpu_client_next() { + process_transactions::(); + } + + fn retry_durable_nonce_transactions() { solana_logger::setup(); let (mut genesis_config, mint_keypair) = create_genesis_config(4); @@ -910,7 +974,7 @@ mod test { ), ); let stats = SendTransactionServiceStats::default(); - let client = create_client(config.tpu_peers, leader_forward_count); + let client = C::create_client(config.tpu_peers, leader_forward_count); let result = SendTransactionService::process_transactions( &working_bank, &root_bank, @@ -1154,5 +1218,16 @@ mod test { ..ProcessTransactionsResult::default() } ); + client.cancel(); + } + + #[test] + fn retry_durable_nonce_transactions_with_connection_cache() { + retry_durable_nonce_transactions::>(); + } + + #[test] + fn retry_durable_nonce_transactions_with_tpu_client_next() { + retry_durable_nonce_transactions::(); } } diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 17b7e61b1b893b..55b5878ad5e2c8 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -22,6 +22,10 @@ use { tokio_util::sync::CancellationToken, }; +// Alias trait to shorten function definitions. +pub trait TpuInfoWithSendStatic: TpuInfo + std::marker::Send + 'static {} +impl TpuInfoWithSendStatic for T where T: TpuInfo + std::marker::Send + 'static {} + pub trait TransactionClient { fn send_transactions_in_batch( &self, @@ -30,7 +34,11 @@ pub trait TransactionClient { ); } -pub struct ConnectionCacheClient { +pub trait Cancelable { + fn cancel(&self); +} + +pub struct ConnectionCacheClient { connection_cache: Arc, tpu_address: SocketAddr, tpu_peers: Option>, @@ -38,10 +46,10 @@ pub struct ConnectionCacheClient { leader_forward_count: u64, } -// Manual implementation of Clone without requiring T to be Clone +// Manual implementation of Clone to avoid requiring T to be Clone impl Clone for ConnectionCacheClient where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { fn clone(&self) -> Self { Self { @@ -56,7 +64,7 @@ where impl ConnectionCacheClient where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { pub fn new( connection_cache: Arc, @@ -114,7 +122,7 @@ where impl TransactionClient for ConnectionCacheClient where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { fn send_transactions_in_batch( &self, @@ -138,7 +146,14 @@ where } } -pub struct SendTransactionServiceLeaderUpdater { +impl Cancelable for ConnectionCacheClient +where + T: TpuInfoWithSendStatic, +{ + fn cancel(&self) {} +} + +pub struct SendTransactionServiceLeaderUpdater { leader_info_provider: CurrentLeaderInfo, my_tpu_address: SocketAddr, tpu_peers: Option>, @@ -148,7 +163,7 @@ pub struct SendTransactionServiceLeaderUpdater LeaderUpdater for SendTransactionServiceLeaderUpdater where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { //TODO(klykov): So each call will lead to shit tons of allocations, I think //we should recalculate only once a slot or something? @@ -168,8 +183,16 @@ where async fn stop(&mut self) {} } -struct TpuClientNextClient { +#[derive(Clone)] +pub struct TpuClientNextClient { sender: Sender, + cancel: CancellationToken, +} + +impl Cancelable for TpuClientNextClient { + fn cancel(&self) { + self.cancel.cancel(); + } } impl TransactionClient for TpuClientNextClient { @@ -194,43 +217,46 @@ impl TransactionClient for TpuClientNextClient { } pub(crate) fn spawn_tpu_client_send_txs( - runtime: Runtime, + runtime: &Runtime, my_tpu_address: SocketAddr, tpu_peers: Option>, leader_info: Option, leader_forward_count: u64, -) -> Sender +) -> TpuClientNextClient where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { let leader_info_provider = CurrentLeaderInfo::new(leader_info); - let (transaction_sender, transaction_receiver) = mpsc::channel(16); // random number of now + let (sender, receiver) = mpsc::channel(16); // random number of now let validator_identity = None; - runtime.spawn(async move { - let cancel = CancellationToken::new(); - let leader_updater = SendTransactionServiceLeaderUpdater { - leader_info_provider, - my_tpu_address, - tpu_peers, - }; - let config = ConnectionWorkersSchedulerConfig { - bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), - stake_identity: validator_identity, //TODO In CC, do we send with identity? - num_connections: 1, - skip_check_transaction_age: true, - worker_channel_size: 2, - max_reconnect_attempts: 4, - lookahead_slots: leader_forward_count, - }; - let _scheduler = tokio::spawn(ConnectionWorkersScheduler::run( - config, - Box::new(leader_updater), - transaction_receiver, - cancel.clone(), - )); + let cancel = CancellationToken::new(); + let _handle = runtime.spawn({ + let cancel = cancel.clone(); + async move { + let leader_updater = SendTransactionServiceLeaderUpdater { + leader_info_provider, + my_tpu_address, + tpu_peers, + }; + let config = ConnectionWorkersSchedulerConfig { + bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), + stake_identity: validator_identity, //TODO In CC, do we send with identity? + num_connections: 1, + skip_check_transaction_age: true, + worker_channel_size: 2, + max_reconnect_attempts: 4, + lookahead_slots: leader_forward_count, + }; + let _scheduler = tokio::spawn(ConnectionWorkersScheduler::run( + config, + Box::new(leader_updater), + receiver, + cancel.clone(), + )); + } }); - transaction_sender + TpuClientNextClient { sender, cancel } } /// The leader info refresh rate. @@ -240,7 +266,7 @@ pub const LEADER_INFO_REFRESH_RATE_MS: u64 = 1000; /// used for sending transactions. pub(crate) struct CurrentLeaderInfo where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { /// The last time the leader info was refreshed last_leader_refresh: Option, @@ -254,7 +280,7 @@ where impl CurrentLeaderInfo where - T: TpuInfo + std::marker::Send + 'static, + T: TpuInfoWithSendStatic, { /// Get the leader info, refresh if expired pub fn get_leader_info(&mut self) -> Option<&T> { From c9c60cc09361398f63055a8cf1024a3df66f6431 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Sat, 2 Nov 2024 17:21:17 +0100 Subject: [PATCH 11/17] Use ConnectionCacheClient::create_client in rpc (for now for CC only) --- rpc/src/rpc.rs | 75 +++++-------------- send-transaction-service/Cargo.toml | 2 +- .../src/create_client_for_tests.rs | 67 +++++++++++++++++ send-transaction-service/src/lib.rs | 3 + .../src/send_transaction_service.rs | 61 +++------------ 5 files changed, 100 insertions(+), 108 deletions(-) create mode 100644 send-transaction-service/src/create_client_for_tests.rs diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 27ee43033f0fac..be98e1aee58cde 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -20,7 +20,7 @@ use { accounts::AccountAddressFilter, accounts_index::{AccountIndex, AccountSecondaryIndexes, IndexKey, ScanConfig}, }, - solana_client::connection_cache::{ConnectionCache, Protocol}, + solana_client::connection_cache::Protocol, solana_entry::entry::Entry, solana_faucet::faucet::request_airdrop_transaction, solana_feature_set as feature_set, @@ -81,9 +81,8 @@ use { }, }, solana_send_transaction_service::{ + create_client_for_tests::ClientWithCreator, send_transaction_service::{SendTransactionService, TransactionInfo}, - tpu_info::NullTpuInfo, - transaction_client::ConnectionCacheClient, }, solana_stake_program, solana_storage_bigtable::Error as StorageError, @@ -349,11 +348,11 @@ impl JsonRpcRequestProcessor { ) } + // TODO(klykov): Why don't than declare it for tests? // Useful for unit testing - pub fn new_from_bank( + pub fn new_from_bank( bank: Bank, socket_addr_space: SocketAddrSpace, - connection_cache: Arc, ) -> Self { let genesis_hash = bank.hash(); let bank_forks = BankForks::new_rw_arc(bank); @@ -368,18 +367,10 @@ impl JsonRpcRequestProcessor { ); ClusterInfo::new(contact_info, keypair, socket_addr_space) }); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (sender, receiver) = unbounded(); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); + + let client = Client::create_client(my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone()); let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank)); @@ -4375,6 +4366,10 @@ pub mod tests { }, vote::state::VoteState, }, + solana_send_transaction_service::{ + create_client_for_tests::CreateClient, tpu_info::NullTpuInfo, + transaction_client::ConnectionCacheClient, + }, solana_transaction_status::{ EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta, TransactionDetails, @@ -4759,11 +4754,9 @@ pub mod tests { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::>( bank, SocketAddrSpace::Unspecified, - connection_cache, ); let bank = meta.bank_forks.read().unwrap().root_bank(); @@ -4782,11 +4775,9 @@ pub mod tests { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::>( bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4814,11 +4805,9 @@ pub mod tests { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::>( bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -4942,11 +4931,9 @@ pub mod tests { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(10); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::>( bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -6412,11 +6399,9 @@ pub mod tests { fn test_rpc_send_bad_tx() { let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let meta = JsonRpcRequestProcessor::new_from_bank( + let meta = JsonRpcRequestProcessor::new_from_bank::>( bank, SocketAddrSpace::Unspecified, - connection_cache, ); let mut io = MetaIoHandler::default(); @@ -6456,11 +6441,7 @@ pub mod tests { ); ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) }); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (meta, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6480,13 +6461,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); + let client = ConnectionCacheClient::::create_client(my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); let mut bad_transaction = system_transaction::transfer( @@ -6726,13 +6701,9 @@ pub mod tests { ))); let cluster_info = Arc::new(new_test_cluster_info()); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .unwrap(); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); let (request_processor, receiver) = JsonRpcRequestProcessor::new( JsonRpcConfig::default(), None, @@ -6752,13 +6723,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::new( - connection_cache.clone(), - tpu_address, - None, - None, - 1, - ); + let client = ConnectionCacheClient::::create_client(my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); assert_eq!( request_processor.get_block_commitment(0), diff --git a/send-transaction-service/Cargo.toml b/send-transaction-service/Cargo.toml index ba6a52eeda9209..b4dfda75c56a81 100644 --- a/send-transaction-service/Cargo.toml +++ b/send-transaction-service/Cargo.toml @@ -20,6 +20,7 @@ solana-client = { workspace = true } solana-connection-cache = { workspace = true } solana-measure = { workspace = true } solana-metrics = { workspace = true } +solana-quic-client = { workspace = true } solana-runtime = { workspace = true } solana-sdk = { workspace = true } solana-tpu-client-next = { workspace = true } @@ -29,7 +30,6 @@ tokio-util = { workspace = true } [dev-dependencies] solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } -solana-quic-client = { workspace = true } [package.metadata.docs.rs] targets = ["x86_64-unknown-linux-gnu"] diff --git a/send-transaction-service/src/create_client_for_tests.rs b/send-transaction-service/src/create_client_for_tests.rs new file mode 100644 index 00000000000000..6f3486aede7b6f --- /dev/null +++ b/send-transaction-service/src/create_client_for_tests.rs @@ -0,0 +1,67 @@ +//! This module contains functionality required to create tests parametrized +//! with the client type. + +use { + crate::{ + tpu_info::NullTpuInfo, + transaction_client::TransactionClient, + transaction_client::{ + spawn_tpu_client_send_txs, Cancelable, ConnectionCacheClient, TpuClientNextClient, + }, + }, + solana_client::connection_cache::ConnectionCache, + solana_quic_client::quic_client::RUNTIME, + std::{net::SocketAddr, sync::Arc}, +}; + +pub trait CreateClient: TransactionClient { + fn create_client( + my_tpu_address: SocketAddr, + tpu_peers: Option>, + leader_forward_count: u64, + ) -> Self; +} + +impl CreateClient for ConnectionCacheClient { + fn create_client( + my_tpu_address: SocketAddr, + tpu_peers: Option>, + leader_forward_count: u64, + ) -> Self { + let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + ConnectionCacheClient::new( + connection_cache, + my_tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } +} + +impl CreateClient for TpuClientNextClient { + fn create_client( + my_tpu_address: SocketAddr, + tpu_peers: Option>, + leader_forward_count: u64, + ) -> Self { + let runtime = &*RUNTIME; + spawn_tpu_client_send_txs::( + runtime, + my_tpu_address, + tpu_peers, + None, + leader_forward_count, + ) + } +} + +// Define type alias to simplify definition of test functions. +pub trait ClientWithCreator: + CreateClient + TransactionClient + Cancelable + Send + Clone + 'static +{ +} +impl ClientWithCreator for T where + T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static +{ +} diff --git a/send-transaction-service/src/lib.rs b/send-transaction-service/src/lib.rs index 960ff1cb3c90e7..bcd32021f4923b 100644 --- a/send-transaction-service/src/lib.rs +++ b/send-transaction-service/src/lib.rs @@ -4,5 +4,8 @@ pub mod send_transaction_service_stats; pub mod tpu_info; pub mod transaction_client; +//TODO(klykov): shall it be under #[cfg(feature = "dev-context-only-utils")]? +pub mod create_client_for_tests; + #[macro_use] extern crate solana_metrics; diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 9a17ba98f7edb7..0212a3461333df 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -475,14 +475,11 @@ mod test { use { super::*, crate::{ + create_client_for_tests::ClientWithCreator, tpu_info::NullTpuInfo, - transaction_client::{ - spawn_tpu_client_send_txs, Cancelable, ConnectionCacheClient, TpuClientNextClient, - }, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, }, crossbeam_channel::{bounded, unbounded}, - solana_client::connection_cache::ConnectionCache, - solana_quic_client::quic_client::RUNTIME, solana_sdk::{ account::AccountSharedData, genesis_config::create_genesis_config, @@ -494,56 +491,12 @@ mod test { std::ops::Sub, }; - trait CreateClient: TransactionClient { - fn create_client(tpu_peers: Option>, leader_forward_count: u64) -> Self; - } - - impl CreateClient for ConnectionCacheClient { - fn create_client(tpu_peers: Option>, leader_forward_count: u64) -> Self { - let tpu_address = "127.0.0.1:0".parse().unwrap(); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); - - ConnectionCacheClient::new( - connection_cache, - tpu_address, - tpu_peers, - None, - leader_forward_count, - ) - } - } - - impl CreateClient for TpuClientNextClient { - fn create_client(tpu_peers: Option>, leader_forward_count: u64) -> Self { - let runtime = &*RUNTIME; - let tpu_address = "127.0.0.1:0".parse().unwrap(); - - spawn_tpu_client_send_txs::( - runtime, - tpu_address, - tpu_peers, - None, - leader_forward_count, - ) - } - } - - // Define type alias to simplify definition of test functions. - trait ClientWithCreator: - CreateClient + TransactionClient + Cancelable + Send + Clone + 'static - { - } - impl ClientWithCreator for T where - T: CreateClient + TransactionClient + Cancelable + Send + Clone + 'static - { - } - fn service_exit() { let bank = Bank::default_for_tests(); let bank_forks = BankForks::new_rw_arc(bank); let (sender, receiver) = unbounded(); - let client = C::create_client(None, 1); + let client = C::create_client("127.0.0.1:0".parse().unwrap(), None, 1); let send_transaction_service = SendTransactionService::new( &bank_forks, @@ -584,7 +537,7 @@ mod test { }; let exit = Arc::new(AtomicBool::new(false)); - let client = C::create_client(None, 1); + let client = C::create_client("127.0.0.1:0".parse().unwrap(), None, 1); let _send_transaction_service = SendTransactionService::new(&bank_forks, receiver, client.clone(), 1000, exit.clone()); @@ -675,7 +628,11 @@ mod test { ), ); - let client = C::create_client(config.tpu_peers, leader_forward_count); + let client = C::create_client( + "127.0.0.1:0".parse().unwrap(), + config.tpu_peers, + leader_forward_count, + ); let result = SendTransactionService::process_transactions( &working_bank, &root_bank, From 580312a41edd48546f26d429b47828650ed54525 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 07:49:19 +0100 Subject: [PATCH 12/17] Update Cargo.lock --- Cargo.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.lock b/Cargo.lock index 3ae2e23ae2b7c3..9645f2fbbcd3d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8158,6 +8158,7 @@ dependencies = [ "solana-logger", "solana-measure", "solana-metrics", + "solana-quic-client", "solana-runtime", "solana-sdk", "solana-tpu-client-next", From 708a24b3e192800d5e76a93fa14a15e828a12a0f Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 09:11:22 +0100 Subject: [PATCH 13/17] fix compilation error in one test --- send-transaction-service/src/send_transaction_service.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/send-transaction-service/src/send_transaction_service.rs b/send-transaction-service/src/send_transaction_service.rs index 0212a3461333df..4cb60bbcc36705 100644 --- a/send-transaction-service/src/send_transaction_service.rs +++ b/send-transaction-service/src/send_transaction_service.rs @@ -931,7 +931,11 @@ mod test { ), ); let stats = SendTransactionServiceStats::default(); - let client = C::create_client(config.tpu_peers, leader_forward_count); + let client = C::create_client( + "127.0.0.1:0".parse().unwrap(), + config.tpu_peers, + leader_forward_count, + ); let result = SendTransactionService::process_transactions( &working_bank, &root_bank, From 1cac5c2f790cec8a2408205aeea9aadc881d66e2 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 10:47:04 +0100 Subject: [PATCH 14/17] parametrize rpc tests with clients --- rpc/src/rpc.rs | 111 +++++++++++++++++++++++++++++++++++-------------- 1 file changed, 80 insertions(+), 31 deletions(-) diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index be98e1aee58cde..daad0e38e9d25b 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -4367,8 +4367,9 @@ pub mod tests { vote::state::VoteState, }, solana_send_transaction_service::{ - create_client_for_tests::CreateClient, tpu_info::NullTpuInfo, - transaction_client::ConnectionCacheClient, + create_client_for_tests::CreateClient, + tpu_info::NullTpuInfo, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, }, solana_transaction_status::{ EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta, @@ -4749,15 +4750,11 @@ pub mod tests { } } - #[test] - fn test_rpc_request_processor_new() { + fn rpc_request_processor_new() { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let meta = JsonRpcRequestProcessor::new_from_bank::>( - bank, - SocketAddrSpace::Unspecified, - ); + let meta = JsonRpcRequestProcessor::new_from_bank::(bank, SocketAddrSpace::Unspecified); let bank = meta.bank_forks.read().unwrap().root_bank(); bank.transfer(20, &genesis.mint_keypair, &bob_pubkey) @@ -4771,14 +4768,20 @@ pub mod tests { } #[test] - fn test_rpc_get_balance() { + fn test_rpc_request_processor_new_connection_cache() { + rpc_request_processor_new::>(); + } + + #[test] + fn test_rpc_request_processor_new_tpu_client_next() { + rpc_request_processor_new::(); + } + + fn rpc_get_balance() { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let meta = JsonRpcRequestProcessor::new_from_bank::>( - bank, - SocketAddrSpace::Unspecified, - ); + let meta = JsonRpcRequestProcessor::new_from_bank::(bank, SocketAddrSpace::Unspecified); let mut io = MetaIoHandler::default(); io.extend_with(rpc_minimal::MinimalImpl.to_delegate()); @@ -4801,14 +4804,20 @@ pub mod tests { } #[test] - fn test_rpc_get_balance_via_client() { + fn test_rpc_get_balance_new_connection_cache() { + rpc_get_balance::>(); + } + + #[test] + fn test_rpc_get_balance_new_tpu_client_next() { + rpc_get_balance::(); + } + + fn rpc_get_balance_via_client() { let genesis = create_genesis_config(20); let mint_pubkey = genesis.mint_keypair.pubkey(); let bank = Bank::new_for_tests(&genesis.genesis_config); - let meta = JsonRpcRequestProcessor::new_from_bank::>( - bank, - SocketAddrSpace::Unspecified, - ); + let meta = JsonRpcRequestProcessor::new_from_bank::(bank, SocketAddrSpace::Unspecified); let mut io = MetaIoHandler::default(); io.extend_with(rpc_minimal::MinimalImpl.to_delegate()); @@ -4832,6 +4841,15 @@ pub mod tests { assert_eq!(response, 20); } + #[test] + fn test_rpc_get_balance_via_client_connection_cache() { + rpc_get_balance_via_client::>(); + } + #[test] + fn test_rpc_get_balance_via_client_tpu_client_next() { + rpc_get_balance_via_client::(); + } + #[test] fn test_rpc_get_cluster_nodes() { let rpc = RpcHandler::start(); @@ -4926,15 +4944,11 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_get_tx_count() { + fn rpc_get_tx_count() { let bob_pubkey = solana_sdk::pubkey::new_rand(); let genesis = create_genesis_config(10); let bank = Bank::new_for_tests(&genesis.genesis_config); - let meta = JsonRpcRequestProcessor::new_from_bank::>( - bank, - SocketAddrSpace::Unspecified, - ); + let meta = JsonRpcRequestProcessor::new_from_bank::(bank, SocketAddrSpace::Unspecified); let mut io = MetaIoHandler::default(); io.extend_with(rpc_minimal::MinimalImpl.to_delegate()); @@ -4960,6 +4974,15 @@ pub mod tests { assert_eq!(result, expected); } + #[test] + fn test_rpc_get_tx_count_connection_cache() { + rpc_get_tx_count::>(); + } + #[test] + fn test_rpc_get_tx_count_tpu_client_next() { + rpc_get_tx_count::(); + } + #[test] fn test_rpc_minimum_ledger_slot() { let rpc = RpcHandler::start(); @@ -6395,8 +6418,7 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_send_bad_tx() { + fn rpc_send_bad_tx() { let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); let meta = JsonRpcRequestProcessor::new_from_bank::>( @@ -6415,7 +6437,15 @@ pub mod tests { } #[test] - fn test_rpc_send_transaction_preflight() { + fn test_rpc_send_bad_tx_connection_cache() { + rpc_send_bad_tx::>(); + } + #[test] + fn test_rpc_send_bad_tx_tpu_client_next() { + rpc_send_bad_tx::(); + } + + fn rpc_send_transaction_preflight() { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(exit.clone()); let ledger_path = get_tmp_ledger_path!(); @@ -6461,7 +6491,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::create_client(my_tpu_address, None, 1); + let client = C::create_client(my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); let mut bad_transaction = system_transaction::transfer( @@ -6565,6 +6595,16 @@ pub mod tests { ); } + #[test] + fn test_rpc_send_transaction_preflight_with_connection_cache() { + rpc_send_transaction_preflight::>(); + } + + #[test] + fn test_rpc_send_transaction_preflight_with_tpu_client_next() { + rpc_send_transaction_preflight::(); + } + #[test] fn test_rpc_verify_filter() { let filter = RpcFilterType::Memcmp(Memcmp::new( @@ -6677,8 +6717,7 @@ pub mod tests { assert_eq!(result, expected); } - #[test] - fn test_rpc_processor_get_block_commitment() { + fn rpc_processor_get_block_commitment() { let exit = Arc::new(AtomicBool::new(false)); let validator_exit = create_validator_exit(exit.clone()); let bank_forks = new_bank_forks().0; @@ -6723,7 +6762,7 @@ pub mod tests { Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), ); - let client = ConnectionCacheClient::::create_client(my_tpu_address, None, 1); + let client = C::create_client(my_tpu_address, None, 1); SendTransactionService::new(&bank_forks, receiver, client, 1000, exit); assert_eq!( request_processor.get_block_commitment(0), @@ -6748,6 +6787,16 @@ pub mod tests { ); } + #[test] + fn test_rpc_processor_get_block_commitment_with_connection_cache() { + rpc_processor_get_block_commitment::>(); + } + + #[test] + fn test_rpc_processor_get_block_commitment_with_tpu_client_next() { + rpc_processor_get_block_commitment::(); + } + #[test] fn test_rpc_get_block_commitment() { let rpc = RpcHandler::start(); From b1963e3f19bf30e145919ce44dd8c46f2a937485 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 17:11:13 +0100 Subject: [PATCH 15/17] Parametrized JsonRpcRequestProcessor and its tests --- core/src/validator.rs | 17 ++++- rpc/src/lib.rs | 2 +- rpc/src/rpc.rs | 7 +- rpc/src/rpc_service.rs | 67 ++++++++++--------- .../src/transaction_client.rs | 10 +++ 5 files changed, 63 insertions(+), 40 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index f8eb2a06b36e9c..9f73f49bde23cd 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -83,6 +83,7 @@ use { }, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_rpc::{ + cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::{ BankNotificationSenderConfig, OptimisticallyConfirmedBank, @@ -125,6 +126,7 @@ use { timing::timestamp, }, solana_send_transaction_service::send_transaction_service, + solana_send_transaction_service::transaction_client::ConnectionCacheClient, solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::{self, broadcast_stage::BroadcastStageType}, solana_unified_scheduler_pool::DefaultSchedulerPool, @@ -1044,6 +1046,18 @@ impl Validator { None }; + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone()); + let client = ConnectionCacheClient::new( + connection_cache.clone(), + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + ); let json_rpc_service = JsonRpcService::new( rpc_addr, config.rpc_config.clone(), @@ -1052,7 +1066,6 @@ impl Validator { block_commitment_cache.clone(), blockstore.clone(), cluster_info.clone(), - Some(poh_recorder.clone()), genesis_config.hash(), ledger_path, config.validator_exit.clone(), @@ -1063,7 +1076,7 @@ impl Validator { config.send_transaction_service_config.clone(), max_slots.clone(), leader_schedule_cache.clone(), - connection_cache.clone(), + client, max_complete_transaction_status_slot, max_complete_rewards_slot, prioritization_fee_cache.clone(), diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 9763ebd791a162..21aea6d8dc20d6 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,5 +1,5 @@ #![allow(clippy::arithmetic_side_effects)] -mod cluster_tpu_info; +pub mod cluster_tpu_info; pub mod filter; pub mod max_slots; pub mod optimistically_confirmed_bank_tracker; diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index daad0e38e9d25b..dc79c4e349439a 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -348,7 +348,6 @@ impl JsonRpcRequestProcessor { ) } - // TODO(klykov): Why don't than declare it for tests? // Useful for unit testing pub fn new_from_bank( bank: Bank, @@ -4367,7 +4366,6 @@ pub mod tests { vote::state::VoteState, }, solana_send_transaction_service::{ - create_client_for_tests::CreateClient, tpu_info::NullTpuInfo, transaction_client::{ConnectionCacheClient, TpuClientNextClient}, }, @@ -6421,10 +6419,7 @@ pub mod tests { fn rpc_send_bad_tx() { let genesis = create_genesis_config(100); let bank = Bank::new_for_tests(&genesis.genesis_config); - let meta = JsonRpcRequestProcessor::new_from_bank::>( - bank, - SocketAddrSpace::Unspecified, - ); + let meta = JsonRpcRequestProcessor::new_from_bank::(bank, SocketAddrSpace::Unspecified); let mut io = MetaIoHandler::default(); io.extend_with(rpc_full::FullImpl.to_delegate()); diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index a5290befa33b4d..fa799e78cd7e95 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -2,7 +2,6 @@ use { crate::{ - cluster_tpu_info::ClusterTpuInfo, max_slots::MaxSlots, optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, rpc::{rpc_accounts::*, rpc_accounts_scan::*, rpc_bank::*, rpc_full::*, rpc_minimal::*, *}, @@ -16,7 +15,6 @@ use { RequestMiddlewareAction, ServerBuilder, }, regex::Regex, - solana_client::connection_cache::ConnectionCache, solana_gossip::cluster_info::ClusterInfo, solana_ledger::{ bigtable_upload::ConfirmedBlockUploadConfig, @@ -25,7 +23,6 @@ use { }, solana_metrics::inc_new_counter_info, solana_perf::thread::renice_this_thread, - solana_poh::poh_recorder::PohRecorder, solana_runtime::{ bank_forks::BankForks, commitment::BlockCommitmentCache, prioritization_fee_cache::PrioritizationFeeCache, @@ -38,7 +35,7 @@ use { }, solana_send_transaction_service::{ send_transaction_service::{self, SendTransactionService}, - transaction_client::ConnectionCacheClient, + transaction_client::TransactionClient, }, solana_storage_bigtable::CredentialType, std::{ @@ -335,7 +332,7 @@ fn process_rest(bank_forks: &Arc>, path: &str) -> Option( rpc_addr: SocketAddr, config: JsonRpcConfig, snapshot_config: Option, @@ -343,7 +340,6 @@ impl JsonRpcService { block_commitment_cache: Arc>, blockstore: Arc, cluster_info: Arc, - poh_recorder: Option>>, genesis_hash: Hash, ledger_path: &Path, validator_exit: Arc>, @@ -354,7 +350,7 @@ impl JsonRpcService { send_transaction_service_config: send_transaction_service::Config, max_slots: Arc, leader_schedule_cache: Arc, - connection_cache: Arc, + client: Client, max_complete_transaction_status_slot: Arc, max_complete_rewards_slot: Arc, prioritization_fee_cache: Arc, @@ -376,11 +372,6 @@ impl JsonRpcService { LARGEST_ACCOUNTS_CACHE_DURATION, ))); - let tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .map_err(|err| format!("{err}"))?; - // sadly, some parts of our current rpc implemention block the jsonrpc's // _socket-listening_ event loop for too long, due to (blocking) long IO or intesive CPU, // causing no further processing of incoming requests and ultimatily innocent clients timing-out. @@ -475,15 +466,6 @@ impl JsonRpcService { prioritization_fee_cache, ); - let leader_info = - poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); - let client = ConnectionCacheClient::new( - connection_cache, - tpu_address, - send_transaction_service_config.tpu_peers.clone(), //TODO(klykov): check if we can avoid cloning - leader_info, - send_transaction_service_config.leader_forward_count, - ); let _send_transaction_service = SendTransactionService::new_with_config( &bank_forks, receiver, @@ -591,6 +573,7 @@ mod tests { use { super::*, crate::rpc::{create_validator_exit, tests::new_test_cluster_info}, + solana_client::connection_cache::Protocol, solana_ledger::{ genesis_utils::{create_genesis_config, GenesisConfigInfo}, get_tmp_ledger_path_auto_delete, @@ -601,6 +584,12 @@ mod tests { genesis_config::{ClusterType, DEFAULT_GENESIS_ARCHIVE}, signature::Signer, }, + solana_send_transaction_service::{ + create_client_for_tests::ClientWithCreator, + send_transaction_service::{self}, + tpu_info::NullTpuInfo, + transaction_client::{ConnectionCacheClient, TpuClientNextClient}, + }, std::{ io::Write, net::{IpAddr, Ipv4Addr}, @@ -608,8 +597,7 @@ mod tests { tokio::runtime::Runtime, }; - #[test] - fn test_rpc_new() { + fn rpc_new() { let GenesisConfigInfo { genesis_config, mint_keypair, @@ -630,7 +618,19 @@ mod tests { let block_commitment_cache = Arc::new(RwLock::new(BlockCommitmentCache::default())); let optimistically_confirmed_bank = OptimisticallyConfirmedBank::locked_from_bank_forks_root(&bank_forks); - let connection_cache = Arc::new(ConnectionCache::new("connection_cache_test")); + + let send_transaction_service_config = send_transaction_service::Config { + retry_rate_ms: 1000, + leader_forward_count: 1, + ..send_transaction_service::Config::default() + }; + + let my_tpu_address = cluster_info.my_contact_info().tpu(Protocol::QUIC).unwrap(); + let client = C::create_client( + my_tpu_address, + send_transaction_service_config.tpu_peers.clone(), + send_transaction_service_config.leader_forward_count, + ); let mut rpc_service = JsonRpcService::new( rpc_addr, JsonRpcConfig::default(), @@ -639,7 +639,6 @@ mod tests { block_commitment_cache, blockstore, cluster_info, - None, Hash::default(), &PathBuf::from("farf"), validator_exit, @@ -647,14 +646,10 @@ mod tests { Arc::new(AtomicBool::new(false)), Arc::new(AtomicBool::new(true)), optimistically_confirmed_bank, - send_transaction_service::Config { - retry_rate_ms: 1000, - leader_forward_count: 1, - ..send_transaction_service::Config::default() - }, + send_transaction_service_config, Arc::new(MaxSlots::default()), Arc::new(LeaderScheduleCache::default()), - connection_cache, + client, Arc::new(AtomicU64::default()), Arc::new(AtomicU64::default()), Arc::new(PrioritizationFeeCache::default()), @@ -675,6 +670,16 @@ mod tests { rpc_service.join().unwrap(); } + #[test] + fn test_rpc_new_with_connection_cache() { + rpc_new::>(); + } + + #[test] + fn test_rpc_new_with_tpu_client_next() { + rpc_new::(); + } + fn create_bank_forks() -> Arc> { let GenesisConfigInfo { mut genesis_config, .. diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 55b5878ad5e2c8..e37f77bb8557df 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -32,6 +32,8 @@ pub trait TransactionClient { wire_transactions: Vec>, stats: &SendTransactionServiceStats, ); + + fn protocol(&self) -> Protocol; } pub trait Cancelable { @@ -144,6 +146,10 @@ where self.send_transactions(address, wire_transactions.clone(), stats); } } + + fn protocol(&self) -> Protocol { + self.connection_cache.protocol() + } } impl Cancelable for ConnectionCacheClient @@ -214,6 +220,10 @@ impl TransactionClient for TpuClientNextClient { } } } + + fn protocol(&self) -> Protocol { + Protocol::QUIC + } } pub(crate) fn spawn_tpu_client_send_txs( From 08ba2b10690065c2c2c8aae61f70671b403722ce Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 17:22:06 +0100 Subject: [PATCH 16/17] Add use-tpu-client-next validator argument --- core/src/validator.rs | 2 ++ local-cluster/src/validator_configs.rs | 1 + validator/src/cli.rs | 10 ++++++++++ validator/src/main.rs | 1 + 4 files changed, 14 insertions(+) diff --git a/core/src/validator.rs b/core/src/validator.rs index 9f73f49bde23cd..f26e7d00091087 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -289,6 +289,7 @@ pub struct ValidatorConfig { pub replay_transactions_threads: NonZeroUsize, pub tvu_shred_sigverify_threads: NonZeroUsize, pub delay_leader_block_for_pending_fork: bool, + pub use_tpu_client_next: bool, } impl Default for ValidatorConfig { @@ -362,6 +363,7 @@ impl Default for ValidatorConfig { replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"), tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"), delay_leader_block_for_pending_fork: false, + use_tpu_client_next: false, } } } diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index 6031b637d01e50..61f301e85424a7 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -74,6 +74,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { replay_transactions_threads: config.replay_transactions_threads, tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads, delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork, + use_tpu_client_next: config.use_tpu_client_next, } } diff --git a/validator/src/cli.rs b/validator/src/cli.rs index eaacee531d5e22..de0c3f72916d4a 100644 --- a/validator/src/cli.rs +++ b/validator/src/cli.rs @@ -1556,6 +1556,16 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> { was created before we started creating ours.", ), ) + .arg( + Arg::with_name("use_tpu_client_next") + .hidden(hidden_unless_forced()) + .long("use-tpu-client-next") + .takes_value(false) + .help( + "Use tpu-client-next crate to send transactions over TPU ports. If not set,\ + ConnectionCache is used instead." + ), + ) .arg( Arg::with_name("block_verification_method") .long("block-verification-method") diff --git a/validator/src/main.rs b/validator/src/main.rs index 0f8c2af1d16ec3..39b9c9d1baf8ed 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1549,6 +1549,7 @@ pub fn main() { tvu_shred_sigverify_threads: tvu_sigverify_threads, delay_leader_block_for_pending_fork: matches .is_present("delay_leader_block_for_pending_fork"), + use_tpu_client_next: matches.is_present("use_tpu_client_next"), wen_restart_proto_path: value_t!(matches, "wen_restart", PathBuf).ok(), wen_restart_coordinator: value_t!(matches, "wen_restart_coordinator", Pubkey).ok(), ..ValidatorConfig::default() From f5c21d1573f0fb5a5cbb5f1ef9c073f90625ee50 Mon Sep 17 00:00:00 2001 From: Kirill Lykov Date: Mon, 4 Nov 2024 18:26:11 +0100 Subject: [PATCH 17/17] Create in validator one of the two clients --- core/src/validator.rs | 127 ++++++++++++------ .../src/create_client_for_tests.rs | 1 + .../src/transaction_client.rs | 7 +- 3 files changed, 93 insertions(+), 42 deletions(-) diff --git a/core/src/validator.rs b/core/src/validator.rs index f26e7d00091087..a03f7b81006b0d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -81,6 +81,7 @@ use { poh_recorder::PohRecorder, poh_service::{self, PohService}, }, + solana_quic_client::quic_client::RUNTIME, solana_rayon_threadlimit::{get_max_thread_count, get_thread_count}, solana_rpc::{ cluster_tpu_info::ClusterTpuInfo, @@ -125,8 +126,10 @@ use { signature::{Keypair, Signer}, timing::timestamp, }, - solana_send_transaction_service::send_transaction_service, - solana_send_transaction_service::transaction_client::ConnectionCacheClient, + solana_send_transaction_service::{ + send_transaction_service, + transaction_client::{spawn_tpu_client_send_txs, ConnectionCacheClient}, + }, solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_turbine::{self, broadcast_stage::BroadcastStageType}, solana_unified_scheduler_pool::DefaultSchedulerPool, @@ -995,6 +998,8 @@ impl Validator { let staked_nodes = Arc::new(RwLock::new(StakedNodes::default())); + // ConnectionCache might be used for JsonRpc and for Forwarding. Since the later is not migrated yet to the tpu-client-next, + // create ConnectionCache regardless of config.use_tpu_client_next for now let connection_cache = match use_quic { true => { let connection_cache = ConnectionCache::new_with_client_options( @@ -1048,42 +1053,82 @@ impl Validator { None }; - let my_tpu_address = cluster_info - .my_contact_info() - .tpu(connection_cache.protocol()) - .map_err(|err| ValidatorError::Other(format!("{err}")))?; let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone()); - let client = ConnectionCacheClient::new( - connection_cache.clone(), - my_tpu_address, - config.send_transaction_service_config.tpu_peers.clone(), - Some(leader_info), - config.send_transaction_service_config.leader_forward_count, - ); - let json_rpc_service = JsonRpcService::new( - rpc_addr, - config.rpc_config.clone(), - Some(config.snapshot_config.clone()), - bank_forks.clone(), - block_commitment_cache.clone(), - blockstore.clone(), - cluster_info.clone(), - genesis_config.hash(), - ledger_path, - config.validator_exit.clone(), - exit.clone(), - rpc_override_health_check.clone(), - startup_verification_complete, - optimistically_confirmed_bank.clone(), - config.send_transaction_service_config.clone(), - max_slots.clone(), - leader_schedule_cache.clone(), - client, - max_complete_transaction_status_slot, - max_complete_rewards_slot, - prioritization_fee_cache.clone(), - ) - .map_err(ValidatorError::Other)?; + // TODO(klykov): consider using Box to make this shorter? + let json_rpc_service = if config.use_tpu_client_next { + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(Protocol::QUIC) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + let client = spawn_tpu_client_send_txs( + &*RUNTIME, // use the same runtime as ConnectionCache + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + Some((*identity_keypair).insecure_clone()), + ); + JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + Some(config.snapshot_config.clone()), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + config.validator_exit.clone(), + exit.clone(), + rpc_override_health_check.clone(), + startup_verification_complete, + optimistically_confirmed_bank.clone(), + config.send_transaction_service_config.clone(), + max_slots.clone(), + leader_schedule_cache.clone(), + client, + max_complete_transaction_status_slot, + max_complete_rewards_slot, + prioritization_fee_cache.clone(), + ) + .map_err(ValidatorError::Other)? + } else { + let my_tpu_address = cluster_info + .my_contact_info() + .tpu(connection_cache.protocol()) + .map_err(|err| ValidatorError::Other(format!("{err}")))?; + let client = ConnectionCacheClient::new( + connection_cache.clone(), + my_tpu_address, + config.send_transaction_service_config.tpu_peers.clone(), + Some(leader_info), + config.send_transaction_service_config.leader_forward_count, + ); + JsonRpcService::new( + rpc_addr, + config.rpc_config.clone(), + Some(config.snapshot_config.clone()), + bank_forks.clone(), + block_commitment_cache.clone(), + blockstore.clone(), + cluster_info.clone(), + genesis_config.hash(), + ledger_path, + config.validator_exit.clone(), + exit.clone(), + rpc_override_health_check.clone(), + startup_verification_complete, + optimistically_confirmed_bank.clone(), + config.send_transaction_service_config.clone(), + max_slots.clone(), + leader_schedule_cache.clone(), + client, + max_complete_transaction_status_slot, + max_complete_rewards_slot, + prioritization_fee_cache.clone(), + ) + .map_err(ValidatorError::Other)? + }; let pubsub_service = if !config.rpc_config.full_api { None @@ -1431,7 +1476,7 @@ impl Validator { config.wait_to_vote_slot, accounts_background_request_sender.clone(), config.runtime_config.log_messages_bytes_limit, - json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service + (json_rpc_service.is_some() && config.use_tpu_client_next).then_some(&connection_cache), // for the cache warmer only used for STS for RPC service &prioritization_fee_cache, banking_tracer.clone(), turbine_quic_endpoint_sender.clone(), @@ -1523,7 +1568,11 @@ impl Validator { ); *start_progress.write().unwrap() = ValidatorStartProgress::Running; - key_notifies.push(connection_cache); + if config.use_tpu_client_next { + unimplemented!(); + } else { + key_notifies.push(connection_cache); + } *admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit { bank_forks: bank_forks.clone(), diff --git a/send-transaction-service/src/create_client_for_tests.rs b/send-transaction-service/src/create_client_for_tests.rs index 6f3486aede7b6f..a26b8dfc9ab2d9 100644 --- a/send-transaction-service/src/create_client_for_tests.rs +++ b/send-transaction-service/src/create_client_for_tests.rs @@ -52,6 +52,7 @@ impl CreateClient for TpuClientNextClient { tpu_peers, None, leader_forward_count, + None, ) } } diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index e37f77bb8557df..1c25a603649d77 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -5,6 +5,7 @@ use { solana_client::connection_cache::{ConnectionCache, Protocol}, solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_measure::measure::Measure, + solana_sdk::signature::Keypair, solana_tpu_client_next::{ connection_workers_scheduler::ConnectionWorkersSchedulerConfig, leader_updater::LeaderUpdater, transaction_batch::TransactionBatch, @@ -226,12 +227,13 @@ impl TransactionClient for TpuClientNextClient { } } -pub(crate) fn spawn_tpu_client_send_txs( +pub fn spawn_tpu_client_send_txs( runtime: &Runtime, my_tpu_address: SocketAddr, tpu_peers: Option>, leader_info: Option, leader_forward_count: u64, + validator_identity: Option, ) -> TpuClientNextClient where T: TpuInfoWithSendStatic, @@ -239,7 +241,6 @@ where let leader_info_provider = CurrentLeaderInfo::new(leader_info); let (sender, receiver) = mpsc::channel(16); // random number of now - let validator_identity = None; let cancel = CancellationToken::new(); let _handle = runtime.spawn({ let cancel = cancel.clone(); @@ -251,7 +252,7 @@ where }; let config = ConnectionWorkersSchedulerConfig { bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), - stake_identity: validator_identity, //TODO In CC, do we send with identity? + stake_identity: validator_identity, num_connections: 1, skip_check_transaction_age: true, worker_channel_size: 2,