Skip to content

Commit

Permalink
Replacing notify by cancellation token to correctly shutdown tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 27, 2024
1 parent 26222f9 commit 945dab5
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 24 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
jemallocator = "0.5"
tokio-util = "0.7.10"

quinn = "0.10.2"
quinn-proto = "0.10.5"
Expand Down
1 change: 1 addition & 0 deletions cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,4 @@ itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }
tonic-health = { workspace = true }
tokio-util = { workspace = true }
9 changes: 4 additions & 5 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

Expand Down Expand Up @@ -55,7 +54,7 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor {
fn create_grpc_multiplex_processed_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
processed_block_sender: async_channel::Sender<ProducedBlock>,
exit_notfier: Arc<Notify>,
exit_notfier: CancellationToken,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::processed();

Expand Down Expand Up @@ -139,7 +138,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let (processed_block_sender, processed_block_reciever) =
async_channel::unbounded::<ProducedBlock>();

let exit_notify = Arc::new(Notify::new());
let exit_notify = CancellationToken::new();
let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream(
&grpc_sources,
processed_block_sender,
Expand Down Expand Up @@ -245,7 +244,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
}
exit_notify.notify_waiters();
exit_notify.cancel();
futures::future::join_all(processed_blocks_tasks).await;
}
})
Expand Down
6 changes: 3 additions & 3 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Notify;
use tokio_util::sync::CancellationToken;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterSlots, SubscribeUpdateSlot};

Expand Down Expand Up @@ -298,7 +298,7 @@ pub fn create_block_processing_task(
grpc_x_token: Option<String>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: Arc<Notify>,
exit_notfier: CancellationToken,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -361,7 +361,7 @@ pub fn create_block_processing_task(
}
};
},
_ = exit_notfier.notified() => {
_ = exit_notfier.cancelled() => {
break;
}
}
Expand Down
1 change: 1 addition & 0 deletions services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ quinn = { workspace = true }
chrono = { workspace = true }
rustls = { workspace = true }
solana-lite-rpc-core = { workspace = true }
tokio-util = { workspace = true }

[dev-dependencies]
tracing = { workspace = true }
Expand Down
15 changes: 8 additions & 7 deletions services/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use std::{
Arc,
},
};
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
use tokio_util::sync::CancellationToken;

pub type EndpointPool = RotatingQueue<Endpoint>;

Expand All @@ -40,7 +41,7 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
exit_notify: CancellationToken,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
Expand All @@ -51,7 +52,7 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: Arc<Notify>,
exit_notify: CancellationToken,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -134,7 +135,7 @@ impl QuicConnection {
conn = self.get_connection() => {
conn
},
_ = exit_notify.notified() => {
_ = exit_notify.cancelled() => {
break;
}
};
Expand All @@ -149,7 +150,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.cancelled() => {
break;
}
};
Expand All @@ -164,7 +165,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.notified() => {
_ = exit_notify.cancelled() => {
break;
}
};
Expand Down Expand Up @@ -247,7 +248,7 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: Arc<Notify>,
exit_notify: CancellationToken,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
Expand Down
9 changes: 5 additions & 4 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::{sync::Notify, time::timeout};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
Expand Down Expand Up @@ -219,7 +220,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: Arc<Notify>,
exit_notified: CancellationToken,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
Expand All @@ -228,7 +229,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.cancelled() => {
break;
}
}
Expand All @@ -238,7 +239,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.notified() => {
_ = exit_notified.cancelled() => {
break;
}
}
Expand Down
11 changes: 6 additions & 5 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
};
use tokio_util::sync::CancellationToken;

use crate::{
quic_connection::{PooledConnection, QuicConnectionPool},
Expand Down Expand Up @@ -49,7 +50,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: Arc<Notify>,
exit_notifier: CancellationToken,
}

impl ActiveConnection {
Expand All @@ -66,7 +67,7 @@ impl ActiveConnection {
identity,
data_cache,
connection_parameters,
exit_notifier: Arc::new(Notify::new()),
exit_notifier: CancellationToken::new(),
}
}

Expand Down Expand Up @@ -116,7 +117,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
_ = exit_notifier.cancelled() => {
break;
}
};
Expand Down Expand Up @@ -209,7 +210,7 @@ impl ActiveConnection {
});
}
},
_ = exit_notifier.notified() => {
_ = exit_notifier.cancelled() => {
break 'main_loop;
}
}
Expand Down Expand Up @@ -287,7 +288,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.notify_waiters();
value.exit_notifier.cancel();
false
} else {
true
Expand Down

0 comments on commit 945dab5

Please sign in to comment.