Skip to content

Commit

Permalink
Revert "Replacing notify by cancellation token to correctly shutdown …
Browse files Browse the repository at this point in the history
…tasks"

This reverts commit 945dab5.
  • Loading branch information
godmodegalactus committed Mar 28, 2024
1 parent 945dab5 commit 3773319
Show file tree
Hide file tree
Showing 9 changed files with 24 additions and 31 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
jemallocator = "0.5"
tokio-util = "0.7.10"

quinn = "0.10.2"
quinn-proto = "0.10.5"
Expand Down
1 change: 0 additions & 1 deletion cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,3 @@ itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }
tonic-health = { workspace = true }
tokio-util = { workspace = true }
9 changes: 5 additions & 4 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use tokio_util::sync::CancellationToken;
use tokio::sync::Notify;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

Expand Down Expand Up @@ -54,7 +55,7 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor {
fn create_grpc_multiplex_processed_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
processed_block_sender: async_channel::Sender<ProducedBlock>,
exit_notfier: CancellationToken,
exit_notfier: Arc<Notify>,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::processed();

Expand Down Expand Up @@ -138,7 +139,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
let (processed_block_sender, processed_block_reciever) =
async_channel::unbounded::<ProducedBlock>();

let exit_notify = CancellationToken::new();
let exit_notify = Arc::new(Notify::new());
let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream(
&grpc_sources,
processed_block_sender,
Expand Down Expand Up @@ -244,7 +245,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
}
exit_notify.cancel();
exit_notify.notify_waiters();
futures::future::join_all(processed_blocks_tasks).await;
}
})
Expand Down
6 changes: 3 additions & 3 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::{collections::HashMap, sync::Arc};
use tokio_util::sync::CancellationToken;
use tokio::sync::Notify;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{SubscribeRequestFilterSlots, SubscribeUpdateSlot};

Expand Down Expand Up @@ -298,7 +298,7 @@ pub fn create_block_processing_task(
grpc_x_token: Option<String>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: CancellationToken,
exit_notfier: Arc<Notify>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
Expand Down Expand Up @@ -361,7 +361,7 @@ pub fn create_block_processing_task(
}
};
},
_ = exit_notfier.cancelled() => {
_ = exit_notfier.notified() => {
break;
}
}
Expand Down
1 change: 0 additions & 1 deletion services/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ quinn = { workspace = true }
chrono = { workspace = true }
rustls = { workspace = true }
solana-lite-rpc-core = { workspace = true }
tokio-util = { workspace = true }

[dev-dependencies]
tracing = { workspace = true }
Expand Down
15 changes: 7 additions & 8 deletions services/src/quic_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use std::{
Arc,
},
};
use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};
use tokio_util::sync::CancellationToken;
use tokio::sync::{Notify, OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

Expand All @@ -41,7 +40,7 @@ pub struct QuicConnection {
identity: Pubkey,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: CancellationToken,
exit_notify: Arc<Notify>,
timeout_counters: Arc<AtomicU64>,
has_connected_once: Arc<AtomicBool>,
}
Expand All @@ -52,7 +51,7 @@ impl QuicConnection {
endpoint: Endpoint,
socket_address: SocketAddr,
connection_params: QuicConnectionParameters,
exit_notify: CancellationToken,
exit_notify: Arc<Notify>,
) -> Self {
Self {
connection: Arc::new(RwLock::new(None)),
Expand Down Expand Up @@ -135,7 +134,7 @@ impl QuicConnection {
conn = self.get_connection() => {
conn
},
_ = exit_notify.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand All @@ -150,7 +149,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand All @@ -165,7 +164,7 @@ impl QuicConnection {
) => {
res
},
_ = exit_notify.cancelled() => {
_ = exit_notify.notified() => {
break;
}
};
Expand Down Expand Up @@ -248,7 +247,7 @@ impl QuicConnectionPool {
endpoints: EndpointPool,
socket_address: SocketAddr,
connection_parameters: QuicConnectionParameters,
exit_notify: CancellationToken,
exit_notify: Arc<Notify>,
nb_connection: usize,
max_number_of_unistream_connection: usize,
) -> Self {
Expand Down
9 changes: 4 additions & 5 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use std::{
sync::Arc,
time::Duration,
};
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;
use tokio::{sync::Notify, time::timeout};

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_ATTEMPTED: GenericGauge<prometheus::core::AtomicI64> =
Expand Down Expand Up @@ -220,7 +219,7 @@ impl QuicConnectionUtils {
addr: SocketAddr,
connection_timeout: Duration,
connection_retry_count: usize,
exit_notified: CancellationToken,
exit_notified: Arc<Notify>,
) -> Option<Connection> {
for _ in 0..connection_retry_count {
let conn = if already_connected {
Expand All @@ -229,7 +228,7 @@ impl QuicConnectionUtils {
res = Self::make_connection_0rtt(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.cancelled() => {
_ = exit_notified.notified() => {
break;
}
}
Expand All @@ -239,7 +238,7 @@ impl QuicConnectionUtils {
res = Self::make_connection(endpoint.clone(), addr, connection_timeout) => {
res
},
_ = exit_notified.cancelled() => {
_ = exit_notified.notified() => {
break;
}
}
Expand Down
11 changes: 5 additions & 6 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
};
use tokio_util::sync::CancellationToken;

use crate::{
quic_connection::{PooledConnection, QuicConnectionPool},
Expand Down Expand Up @@ -50,7 +49,7 @@ struct ActiveConnection {
tpu_address: SocketAddr,
data_cache: DataCache,
connection_parameters: QuicConnectionParameters,
exit_notifier: CancellationToken,
exit_notifier: Arc<Notify>,
}

impl ActiveConnection {
Expand All @@ -67,7 +66,7 @@ impl ActiveConnection {
identity,
data_cache,
connection_parameters,
exit_notifier: CancellationToken::new(),
exit_notifier: Arc::new(Notify::new()),
}
}

Expand Down Expand Up @@ -117,7 +116,7 @@ impl ActiveConnection {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.cancelled() => {
_ = exit_notifier.notified() => {
break;
}
};
Expand Down Expand Up @@ -210,7 +209,7 @@ impl ActiveConnection {
});
}
},
_ = exit_notifier.cancelled() => {
_ = exit_notifier.notified() => {
break 'main_loop;
}
}
Expand Down Expand Up @@ -288,7 +287,7 @@ impl TpuConnectionManager {
if !connections_to_keep.contains_key(key) {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.cancel();
value.exit_notifier.notify_waiters();
false
} else {
true
Expand Down

0 comments on commit 3773319

Please sign in to comment.