diff --git a/.github/workflows/clippy_test.yml b/.github/workflows/clippy_test.yml index f267d618..625a9515 100644 --- a/.github/workflows/clippy_test.yml +++ b/.github/workflows/clippy_test.yml @@ -28,7 +28,7 @@ jobs: - uses: actions-rust-lang/setup-rust-toolchain@v1 with: # use toolchain version from rust-toolchain.toml - toolchain: nightly-2024-01-05 + toolchain: nightly-2023-10-05 components: rustfmt, clippy cache: true # avoid the default "-D warnings" which thrashes cache @@ -48,5 +48,5 @@ jobs: - name: Run fmt+clippy run: | - cargo +nightly-2024-01-05 fmt --all --check - cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings + cargo +nightly-2023-10-05 fmt --all --check + cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings diff --git a/Cargo.lock b/Cargo.lock index d46b09dd..2c821591 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -392,8 +392,8 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "zstd 0.13.0", - "zstd-safe 7.0.0", + "zstd 0.13.1", + "zstd-safe 7.1.0", ] [[package]] @@ -460,9 +460,9 @@ dependencies = [ [[package]] name = "autocfg" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +checksum = "f1fdabc7756949593fe60f30ec81974b613357de856987752631dea1e3394c80" [[package]] name = "autotools" @@ -572,7 +572,7 @@ version = "0.2.4" dependencies = [ "anyhow", "bincode", - "clap 4.5.3", + "clap 4.5.4", "csv", "dashmap 5.5.3", "dirs", @@ -868,9 +868,9 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.35" +version = "0.4.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf5903dcbc0a39312feb77df2ff4c76387d591b9fc7b04a238dcf8bb62639a" +checksum = "8a0d04d43504c61aa6c7531f1871dd0d418d91130162063b789da00fd7057a5e" dependencies = [ "android-tzdata", "iana-time-zone", @@ -923,9 +923,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.3" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" dependencies = [ "clap_builder", "clap_derive", @@ -945,9 +945,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.3" +version = "4.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90239a040c80f5e14809ca132ddc4176ab33d5e17e49691793296e3fcb34d72f" +checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2236,9 +2236,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.10" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1a46d1a171d865aa5f83f92695765caa047a9b4cbae2cbf37dbd613a793fd4c" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "jemalloc-sys" @@ -2566,7 +2566,7 @@ dependencies = [ "bytes", "cap", "chrono", - "clap 4.5.3", + "clap 4.5.4", "const_env", "dashmap 5.5.3", "dotenv", @@ -2675,9 +2675,9 @@ dependencies = [ [[package]] name = "memoffset" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" dependencies = [ "autocfg", ] @@ -3686,7 +3686,7 @@ dependencies = [ "aho-corasick", "memchr", "regex-automata 0.4.6", - "regex-syntax 0.8.2", + "regex-syntax 0.8.3", ] [[package]] @@ -3706,7 +3706,7 @@ checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.2", + "regex-syntax 0.8.3", ] [[package]] @@ -3717,9 +3717,9 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "regex-syntax" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "reqwest" @@ -4033,9 +4033,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.114" +version = "1.0.115" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5f09b1bd632ef549eaa9f60a1f8de742bdbc698e6cee2095fc84dde5f549ae0" +checksum = "12dc5c46daa8e9fdf4f5e71b6cf9a53f2487da0e86e55808e2d35539666497dd" dependencies = [ "itoa", "ryu", @@ -4537,7 +4537,7 @@ dependencies = [ "async-trait", "bench", "chrono", - "clap 4.5.3", + "clap 4.5.4", "futures", "futures-util", "itertools 0.10.5", @@ -4703,7 +4703,7 @@ dependencies = [ "bs58", "bytes", "chrono", - "clap 4.5.3", + "clap 4.5.4", "dashmap 5.5.3", "dotenv", "futures", @@ -4742,7 +4742,7 @@ dependencies = [ "bs58", "bytes", "chrono", - "clap 4.5.3", + "clap 4.5.4", "countmap", "crossbeam-channel", "dashmap 5.5.3", @@ -4960,7 +4960,7 @@ dependencies = [ "libsecp256k1", "light-poseidon", "log", - "memoffset 0.9.0", + "memoffset 0.9.1", "num-bigint 0.4.4", "num-derive 0.3.3", "num-traits 0.2.18", @@ -6852,11 +6852,11 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bffb3309596d527cfcba7dfc6ed6052f1d39dfbd7c867aa2e865e4a449c10110" +checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" dependencies = [ - "zstd-safe 7.0.0", + "zstd-safe 7.1.0", ] [[package]] @@ -6871,18 +6871,18 @@ dependencies = [ [[package]] name = "zstd-safe" -version = "7.0.0" +version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43747c7422e2924c11144d5229878b98180ef8b06cca4ab5af37afc8a8d8ea3e" +checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" dependencies = [ "zstd-sys", ] [[package]] name = "zstd-sys" -version = "2.0.9+zstd.1.5.5" +version = "2.0.10+zstd.1.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e16efa8a874a0481a574084d34cc26fdb3b99627480f785888deb6386506656" +checksum = "c253a4914af5bafc8fa8c86ee400827e83cf6ec01195ec1f1ed8441bf00d65aa" dependencies = [ "cc", "pkg-config", diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index c1a5f50d..e57b806b 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -270,12 +270,12 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option, (cu_requested, prioritization_fees) } -// not called pub fn create_block_processing_task( grpc_addr: String, grpc_x_token: Option, block_sx: async_channel::Sender, commitment_level: CommitmentLevel, + exit_notfier: Arc, ) -> AnyhowJoinHandle { tokio::spawn(async move { loop { @@ -307,33 +307,44 @@ pub fn create_block_processing_task( ) .await?; - while let Some(message) = stream.next().await { - let message = message?; - - let Some(update) = message.update_oneof else { - continue; - }; - - match update { - UpdateOneof::Block(block) => { - log::trace!( - "received block, hash: {} slot: {}", - block.blockhash, - block.slot - ); - block_sx - .send(block) - .await - .context("Problem sending on block channel")?; + loop { + tokio::select! { + message = stream.next() => { + let Some(Ok(message)) = message else { + break; + }; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::Block(block) => { + log::trace!( + "received block, hash: {} slot: {}", + block.blockhash, + block.slot + ); + block_sx + .send(block) + .await + .context("Problem sending on block channel")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + _ => { + log::trace!("unknown GRPC notification"); + } + }; + }, + _ = exit_notfier.notified() => { + break; } - UpdateOneof::Ping(_) => { - log::trace!("GRPC Ping"); - } - _ => { - log::trace!("unknown GRPC notification"); - } - }; + } } + drop(stream); + drop(client); log::error!("Grpc block subscription broken (resubscribing)"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; } diff --git a/core/src/structures/prioritization_fee_heap.rs b/core/src/structures/prioritization_fee_heap.rs index 6bd26a8a..dd47e1fd 100644 --- a/core/src/structures/prioritization_fee_heap.rs +++ b/core/src/structures/prioritization_fee_heap.rs @@ -85,7 +85,7 @@ impl PrioritizationFeesHeap { } } - pub async fn remove_expired_transactions(&self, current_blockheight: u64) { + pub async fn remove_expired_transactions(&self, current_blockheight: u64) -> usize { let mut write_lock = self.map.lock().await; let mut cells_to_remove = vec![]; let mut signatures_to_remove = vec![]; @@ -104,9 +104,11 @@ impl PrioritizationFeesHeap { for p in cells_to_remove { write_lock.map.remove(&p); } + let signatures_len = signatures_to_remove.len(); for sig in signatures_to_remove { write_lock.signatures.remove(&sig); } + signatures_len } pub async fn size(&self) -> usize { diff --git a/run_clippy_fmt.sh b/run_clippy_fmt.sh index ed379a8b..4b1c613b 100755 --- a/run_clippy_fmt.sh +++ b/run_clippy_fmt.sh @@ -1,2 +1,2 @@ -cargo +nightly-2024-01-05 fmt --all -cargo +nightly-2024-01-05 clippy --locked --workspace --all-targets -- -D warnings \ No newline at end of file +cargo +nightly-2023-10-05 fmt --all +cargo +nightly-2023-10-05 clippy --locked --workspace --all-targets -- -D warnings \ No newline at end of file diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 82e68202..3678307f 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -14,7 +14,7 @@ use std::{ Arc, }, }; -use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; +use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore}; pub type EndpointPool = RotatingQueue; @@ -40,7 +40,7 @@ pub struct QuicConnection { identity: Pubkey, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_signal: Arc, + exit_notify: Arc, timeout_counters: Arc, has_connected_once: Arc, } @@ -51,7 +51,7 @@ impl QuicConnection { endpoint: Endpoint, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_signal: Arc, + exit_notify: Arc, ) -> Self { Self { connection: Arc::new(RwLock::new(None)), @@ -60,7 +60,7 @@ impl QuicConnection { identity, socket_address, connection_params, - exit_signal, + exit_notify, timeout_counters: Arc::new(AtomicU64::new(0)), has_connected_once: Arc::new(AtomicBool::new(false)), } @@ -74,7 +74,7 @@ impl QuicConnection { self.socket_address, self.connection_params.connection_timeout, self.connection_params.connection_retry_count, - self.exit_signal.clone(), + self.exit_notify.clone(), ) .await } @@ -127,32 +127,48 @@ impl QuicConnection { pub async fn send_transaction(&self, tx: Vec) { let connection_retry_count = self.connection_params.connection_retry_count; for _ in 0..connection_retry_count { - if self.exit_signal.load(Ordering::Relaxed) { - // return - return; - } - let mut do_retry = false; - let connection = self.get_connection().await; + let exit_notify = self.exit_notify.clone(); + + let connection = tokio::select! { + conn = self.get_connection() => { + conn + }, + _ = exit_notify.notified() => { + break; + } + }; if let Some(connection) = connection { TRIED_SEND_TRANSCTION_TRIED.inc(); let current_stable_id = connection.stable_id() as u64; - match QuicConnectionUtils::open_unistream( - connection, - self.connection_params.unistream_timeout, - ) - .await - { + let open_uni_result = tokio::select! { + res = QuicConnectionUtils::open_unistream( + connection, + self.connection_params.unistream_timeout, + ) => { + res + }, + _ = exit_notify.notified() => { + break; + } + }; + match open_uni_result { Ok(send_stream) => { - match QuicConnectionUtils::write_all( - send_stream, - &tx, - self.identity, - self.connection_params, - ) - .await - { + let write_add_result = tokio::select! { + res = QuicConnectionUtils::write_all( + send_stream, + &tx, + self.identity, + self.connection_params, + ) => { + res + }, + _ = exit_notify.notified() => { + break; + } + }; + match write_add_result { Ok(()) => { SEND_TRANSCTION_SUCESSFUL.inc(); } @@ -231,7 +247,7 @@ impl QuicConnectionPool { endpoints: EndpointPool, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, - exit_signal: Arc, + exit_notify: Arc, nb_connection: usize, max_number_of_unistream_connection: usize, ) -> Self { @@ -243,7 +259,7 @@ impl QuicConnectionPool { endpoints.get().expect("Should get and endpoint"), socket_address, connection_parameters, - exit_signal.clone(), + exit_notify.clone(), )); } Self { diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index dc864610..cc3a1da0 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -11,13 +11,10 @@ use solana_lite_rpc_core::network_utils::apply_gso_workaround; use solana_sdk::pubkey::Pubkey; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::Duration, }; -use tokio::time::timeout; +use tokio::{sync::Notify, time::timeout}; lazy_static::lazy_static! { static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge = @@ -222,15 +219,29 @@ impl QuicConnectionUtils { addr: SocketAddr, connection_timeout: Duration, connection_retry_count: usize, - exit_signal: Arc, + exit_notified: Arc, ) -> Option { for _ in 0..connection_retry_count { let conn = if already_connected { NB_QUIC_0RTT_ATTEMPTED.inc(); - Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout).await + tokio::select! { + res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => { + res + }, + _ = exit_notified.notified() => { + break; + } + } } else { NB_QUIC_CONN_ATTEMPTED.inc(); - Self::make_connection(endpoint.clone(), addr, connection_timeout).await + tokio::select! { + res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => { + res + }, + _ = exit_notified.notified() => { + break; + } + } }; match conn { Ok(conn) => { @@ -239,9 +250,6 @@ impl QuicConnectionUtils { } Err(e) => { trace!("Could not connect to {} because of error {}", identity, e); - if exit_signal.load(Ordering::Relaxed) { - break; - } } } } diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 9200c60b..44cae9c0 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -13,15 +13,7 @@ use solana_lite_rpc_core::{ }; use solana_sdk::pubkey::Pubkey; use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams; -use std::{ - collections::HashMap, - net::SocketAddr, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, - time::Duration, -}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; use tokio::sync::{ broadcast::{Receiver, Sender}, Notify, @@ -54,9 +46,9 @@ struct ActiveConnection { endpoints: RotatingQueue, identity: Pubkey, tpu_address: SocketAddr, - exit_signal: Arc, data_cache: DataCache, connection_parameters: QuicConnectionParameters, + exit_notifier: Arc, } impl ActiveConnection { @@ -71,9 +63,9 @@ impl ActiveConnection { endpoints, tpu_address, identity, - exit_signal: Arc::new(AtomicBool::new(false)), data_cache, connection_parameters, + exit_notifier: Arc::new(Notify::new()), } } @@ -81,13 +73,13 @@ impl ActiveConnection { async fn listen( &self, mut transaction_reciever: Receiver, - exit_notifier: Arc, addr: SocketAddr, identity_stakes: IdentityStakesData, ) { let fill_notify = Arc::new(Notify::new()); let identity = self.identity; + let exit_notifier = self.exit_notifier.clone(); NB_QUIC_ACTIVE_CONNECTIONS.inc(); @@ -98,13 +90,12 @@ impl ActiveConnection { identity_stakes.stakes, identity_stakes.total_stakes, ); - let exit_signal = self.exit_signal.clone(); let connection_pool = QuicConnectionPool::new( identity, self.endpoints.clone(), addr, self.connection_parameters, - exit_signal.clone(), + exit_notifier.clone(), max_number_of_connections, max_uni_stream_connections, ); @@ -115,12 +106,19 @@ impl ActiveConnection { let priorization_heap = priorization_heap.clone(); let data_cache = self.data_cache.clone(); let fill_notify = fill_notify.clone(); - let exit_signal = exit_signal.clone(); + let exit_notifier = exit_notifier.clone(); tokio::spawn(async move { let mut current_blockheight = data_cache.block_information_store.get_last_blockheight(); - while !exit_signal.load(Ordering::Relaxed) { - let tx = transaction_reciever.recv().await; + loop { + let tx = tokio::select! { + tx = transaction_reciever.recv() => { + tx + }, + _ = exit_notifier.notified() => { + break; + } + }; match tx { Ok(transaction_sent_info) => { if data_cache @@ -142,9 +140,10 @@ impl ActiveConnection { // give more priority to transaction sender tokio::time::sleep(Duration::from_micros(50)).await; // remove all expired transactions from the queue - priorization_heap + let elements_removed = priorization_heap .remove_expired_transactions(current_blockheight) .await; + TRANSACTIONS_IN_HEAP.sub(elements_removed as i64); } } Err(e) => { @@ -167,26 +166,16 @@ impl ActiveConnection { let _permit = permit; connection.get_connection().await; }); - } + }; 'main_loop: loop { - // exit signal set - if exit_signal.load(Ordering::Relaxed) { - break; - } - tokio::select! { _ = fill_notify.notified() => { - loop { - // exit signal set - if exit_signal.load(Ordering::Relaxed) { - break 'main_loop; - } - + 'process_heap: loop { let Some(tx) = priorization_heap.pop().await else { // wait to get notification from fill event - break; + break 'process_heap; }; TRANSACTIONS_IN_HEAP.dec(); @@ -225,7 +214,7 @@ impl ActiveConnection { } } - heap_filler_task.abort(); + let _ = heap_filler_task.await; let elements_removed = priorization_heap.clear().await; TRANSACTIONS_IN_HEAP.sub(elements_removed as i64); NB_QUIC_ACTIVE_CONNECTIONS.dec(); @@ -234,26 +223,20 @@ impl ActiveConnection { pub fn start_listening( &self, transaction_reciever: Receiver, - exit_notifier: Arc, identity_stakes: IdentityStakesData, ) { let addr = self.tpu_address; let this = self.clone(); tokio::spawn(async move { - this.listen(transaction_reciever, exit_notifier, addr, identity_stakes) + this.listen(transaction_reciever, addr, identity_stakes) .await; }); } } -struct ActiveConnectionWithExitNotifier { - pub active_connection: ActiveConnection, - pub exit_notifier: Arc, -} - pub struct TpuConnectionManager { endpoints: RotatingQueue, - identity_to_active_connection: Arc>>, + identity_to_active_connection: Arc>, } impl TpuConnectionManager { @@ -291,21 +274,10 @@ impl TpuConnectionManager { connection_parameters, ); // using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever - let exit_notifier = Arc::new(Notify::new()); - let broadcast_receiver = broadcast_sender.subscribe(); - active_connection.start_listening( - broadcast_receiver, - exit_notifier.clone(), - identity_stakes, - ); - self.identity_to_active_connection.insert( - *identity, - Arc::new(ActiveConnectionWithExitNotifier { - active_connection, - exit_notifier, - }), - ); + active_connection.start_listening(broadcast_receiver, identity_stakes); + self.identity_to_active_connection + .insert(*identity, active_connection); } } @@ -314,11 +286,7 @@ impl TpuConnectionManager { if !connections_to_keep.contains_key(key) { trace!("removing a connection for {}", key.to_string()); // ignore error for exit channel - value - .active_connection - .exit_signal - .store(true, Ordering::Relaxed); - value.exit_notifier.notify_one(); + value.exit_notifier.notify_waiters(); false } else { true