diff --git a/Cargo.lock b/Cargo.lock index c258331e..62708df7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4422,7 +4422,6 @@ dependencies = [ "solana-version", "thiserror", "tokio", - "tokio-util", "tonic-health", "yellowstone-grpc-client", "yellowstone-grpc-proto", @@ -4611,7 +4610,6 @@ dependencies = [ "solana-version", "thiserror", "tokio", - "tokio-util", "tracing", "tracing-subscriber", ] diff --git a/Cargo.toml b/Cargo.toml index 3cf06d5d..4e8d4cdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,7 +61,6 @@ lazy_static = "1.4.0" dotenv = "0.15.0" async-channel = "1.8.0" jemallocator = "0.5" -tokio-util = "0.7.10" quinn = "0.10.2" quinn-proto = "0.10.5" diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index 11688cf1..d6387cb7 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -45,4 +45,3 @@ itertools = {workspace = true} prometheus = { workspace = true } lazy_static = { workspace = true } tonic-health = { workspace = true } -tokio-util = { workspace = true } \ No newline at end of file diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index b5c65cba..a0287f12 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -16,9 +16,10 @@ use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::{BTreeSet, HashMap, HashSet}; +use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::Receiver; -use tokio_util::sync::CancellationToken; +use tokio::sync::Notify; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; @@ -54,7 +55,7 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor { fn create_grpc_multiplex_processed_block_stream( grpc_sources: &Vec, processed_block_sender: async_channel::Sender, - exit_notfier: CancellationToken, + exit_notfier: Arc, ) -> Vec { let commitment_config = CommitmentConfig::processed(); @@ -138,7 +139,7 @@ pub fn create_grpc_multiplex_blocks_subscription( let (processed_block_sender, processed_block_reciever) = async_channel::unbounded::(); - let exit_notify = CancellationToken::new(); + let exit_notify = Arc::new(Notify::new()); let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream( &grpc_sources, processed_block_sender, @@ -244,7 +245,7 @@ pub fn create_grpc_multiplex_blocks_subscription( } } } - exit_notify.cancel(); + exit_notify.notify_waiters(); futures::future::join_all(processed_blocks_tasks).await; } }) diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 7d2d2ac6..2d00460a 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -33,7 +33,7 @@ use solana_sdk::{ }; use solana_transaction_status::{Reward, RewardType}; use std::{collections::HashMap, sync::Arc}; -use tokio_util::sync::CancellationToken; +use tokio::sync::Notify; use yellowstone_grpc_client::GeyserGrpcClient; use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterSlots, SubscribeUpdateSlot}; @@ -298,7 +298,7 @@ pub fn create_block_processing_task( grpc_x_token: Option, block_sx: async_channel::Sender, commitment_level: CommitmentLevel, - exit_notfier: CancellationToken, + exit_notfier: Arc, ) -> AnyhowJoinHandle { tokio::spawn(async move { loop { @@ -361,7 +361,7 @@ pub fn create_block_processing_task( } }; }, - _ = exit_notfier.cancelled() => { + _ = exit_notfier.notified() => { break; } } diff --git a/services/Cargo.toml b/services/Cargo.toml index c6285eea..97b22b2d 100644 --- a/services/Cargo.toml +++ b/services/Cargo.toml @@ -39,7 +39,6 @@ quinn = { workspace = true } chrono = { workspace = true } rustls = { workspace = true } solana-lite-rpc-core = { workspace = true } -tokio-util = { workspace = true } [dev-dependencies] tracing = { workspace = true } diff --git a/services/src/quic_connection.rs b/services/src/quic_connection.rs index 8cca8b21..3678307f 100644 --- a/services/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -14,8 +14,7 @@ use std::{ Arc, }, }; -use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; -use tokio_util::sync::CancellationToken; +use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore}; pub type EndpointPool = RotatingQueue; @@ -41,7 +40,7 @@ pub struct QuicConnection { identity: Pubkey, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_notify: CancellationToken, + exit_notify: Arc, timeout_counters: Arc, has_connected_once: Arc, } @@ -52,7 +51,7 @@ impl QuicConnection { endpoint: Endpoint, socket_address: SocketAddr, connection_params: QuicConnectionParameters, - exit_notify: CancellationToken, + exit_notify: Arc, ) -> Self { Self { connection: Arc::new(RwLock::new(None)), @@ -135,7 +134,7 @@ impl QuicConnection { conn = self.get_connection() => { conn }, - _ = exit_notify.cancelled() => { + _ = exit_notify.notified() => { break; } }; @@ -150,7 +149,7 @@ impl QuicConnection { ) => { res }, - _ = exit_notify.cancelled() => { + _ = exit_notify.notified() => { break; } }; @@ -165,7 +164,7 @@ impl QuicConnection { ) => { res }, - _ = exit_notify.cancelled() => { + _ = exit_notify.notified() => { break; } }; @@ -248,7 +247,7 @@ impl QuicConnectionPool { endpoints: EndpointPool, socket_address: SocketAddr, connection_parameters: QuicConnectionParameters, - exit_notify: CancellationToken, + exit_notify: Arc, nb_connection: usize, max_number_of_unistream_connection: usize, ) -> Self { diff --git a/services/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs index bec7539b..cc3a1da0 100644 --- a/services/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -14,8 +14,7 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::time::timeout; -use tokio_util::sync::CancellationToken; +use tokio::{sync::Notify, time::timeout}; lazy_static::lazy_static! { static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge = @@ -220,7 +219,7 @@ impl QuicConnectionUtils { addr: SocketAddr, connection_timeout: Duration, connection_retry_count: usize, - exit_notified: CancellationToken, + exit_notified: Arc, ) -> Option { for _ in 0..connection_retry_count { let conn = if already_connected { @@ -229,7 +228,7 @@ impl QuicConnectionUtils { res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => { res }, - _ = exit_notified.cancelled() => { + _ = exit_notified.notified() => { break; } } @@ -239,7 +238,7 @@ impl QuicConnectionUtils { res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => { res }, - _ = exit_notified.cancelled() => { + _ = exit_notified.notified() => { break; } } diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 88d74a09..9f10f0a4 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -18,7 +18,6 @@ use tokio::sync::{ broadcast::{Receiver, Sender}, Notify, }; -use tokio_util::sync::CancellationToken; use crate::{ quic_connection::{PooledConnection, QuicConnectionPool}, @@ -50,7 +49,7 @@ struct ActiveConnection { tpu_address: SocketAddr, data_cache: DataCache, connection_parameters: QuicConnectionParameters, - exit_notifier: CancellationToken, + exit_notifier: Arc, } impl ActiveConnection { @@ -67,7 +66,7 @@ impl ActiveConnection { identity, data_cache, connection_parameters, - exit_notifier: CancellationToken::new(), + exit_notifier: Arc::new(Notify::new()), } } @@ -117,7 +116,7 @@ impl ActiveConnection { tx = transaction_reciever.recv() => { tx }, - _ = exit_notifier.cancelled() => { + _ = exit_notifier.notified() => { break; } }; @@ -210,7 +209,7 @@ impl ActiveConnection { }); } }, - _ = exit_notifier.cancelled() => { + _ = exit_notifier.notified() => { break 'main_loop; } } @@ -288,7 +287,7 @@ impl TpuConnectionManager { if !connections_to_keep.contains_key(key) { trace!("removing a connection for {}", key.to_string()); // ignore error for exit channel - value.exit_notifier.cancel(); + value.exit_notifier.notify_waiters(); false } else { true