Skip to content

Commit

Permalink
Correctly exiting tasks with additional atomic bool variable
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 28, 2024
1 parent 3773319 commit 44aeb79
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 10 deletions.
9 changes: 8 additions & 1 deletion cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
Expand Down Expand Up @@ -56,6 +57,7 @@ fn create_grpc_multiplex_processed_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
processed_block_sender: async_channel::Sender<ProducedBlock>,
exit_notfier: Arc<Notify>,
do_exit: Arc<AtomicBool>,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::processed();

Expand All @@ -69,13 +71,14 @@ fn create_grpc_multiplex_processed_block_stream(
block_sender,
yellowstone_grpc_proto::geyser::CommitmentLevel::Processed,
exit_notfier.clone(),
do_exit.clone(),
));
streams.push(block_reciever)
}
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
const MAX_SIZE: usize = 1024;
let mut slots_processed = BTreeSet::<u64>::new();
loop {
while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
let block_message = futures::stream::select_all(streams.clone()).next().await;
if let Some(block) = block_message {
let slot = block.slot;
Expand All @@ -96,6 +99,7 @@ fn create_grpc_multiplex_processed_block_stream(
}
}
}
Ok(())
});
tasks.push(merging_streams);
tasks
Expand Down Expand Up @@ -140,10 +144,12 @@ pub fn create_grpc_multiplex_blocks_subscription(
async_channel::unbounded::<ProducedBlock>();

let exit_notify = Arc::new(Notify::new());
let do_exit = Arc::new(AtomicBool::new(false));
let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream(
&grpc_sources,
processed_block_sender,
exit_notify.clone(),
do_exit.clone(),
);

let confirmed_blockmeta_stream = create_grpc_multiplex_block_meta_stream(
Expand Down Expand Up @@ -245,6 +251,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
}
do_exit.store(true, std::sync::atomic::Ordering::Relaxed);
exit_notify.notify_waiters();
futures::future::join_all(processed_blocks_tasks).await;
}
Expand Down
9 changes: 6 additions & 3 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use solana_sdk::{
transaction::TransactionError,
};
use solana_transaction_status::{Reward, RewardType};
use std::sync::atomic::AtomicBool;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Notify;
use yellowstone_grpc_client::GeyserGrpcClient;
Expand Down Expand Up @@ -299,9 +300,10 @@ pub fn create_block_processing_task(
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: Arc<Notify>,
do_exit: Arc<AtomicBool>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
loop {
'main_loop: while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
Expand Down Expand Up @@ -330,7 +332,7 @@ pub fn create_block_processing_task(
)
.await?;

loop {
while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
tokio::select! {
message = stream.next() => {
let Some(Ok(message)) = message else {
Expand Down Expand Up @@ -362,7 +364,7 @@ pub fn create_block_processing_task(
};
},
_ = exit_notfier.notified() => {
break;
break 'main_loop;
}
}
}
Expand All @@ -371,6 +373,7 @@ pub fn create_block_processing_task(
log::error!("Grpc block subscription broken (resubscribing)");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
Ok(())
})
}

Expand Down
22 changes: 16 additions & 6 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{
collections::HashMap,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
time::Duration,
};
use tokio::sync::{
broadcast::{Receiver, Sender},
Notify,
Expand Down Expand Up @@ -81,6 +86,8 @@ impl ActiveConnection {

let identity = self.identity;
let exit_notifier = self.exit_notifier.clone();
let exit_notifier_children = Arc::new(Notify::new());
let do_exit = Arc::new(AtomicBool::new(false));

NB_QUIC_ACTIVE_CONNECTIONS.inc();

Expand All @@ -96,7 +103,7 @@ impl ActiveConnection {
self.endpoints.clone(),
addr,
self.connection_parameters,
exit_notifier.clone(),
exit_notifier_children.clone(),
max_number_of_connections,
max_uni_stream_connections,
);
Expand All @@ -107,16 +114,17 @@ impl ActiveConnection {
let priorization_heap = priorization_heap.clone();
let data_cache = self.data_cache.clone();
let fill_notify = fill_notify.clone();
let exit_notifier = exit_notifier.clone();
let exit_notifier_children = exit_notifier_children.clone();
let do_exit = do_exit.clone();
tokio::spawn(async move {
let mut current_blockheight =
data_cache.block_information_store.get_last_blockheight();
loop {
while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
let tx = tokio::select! {
tx = transaction_reciever.recv() => {
tx
},
_ = exit_notifier.notified() => {
_ = exit_notifier_children.notified() => {
break;
}
};
Expand Down Expand Up @@ -214,7 +222,8 @@ impl ActiveConnection {
}
}
}

do_exit.store(true, std::sync::atomic::Ordering::Relaxed);
exit_notifier_children.notify_waiters();
let _ = heap_filler_task.await;
let elements_removed = priorization_heap.clear().await;
TRANSACTIONS_IN_HEAP.sub(elements_removed as i64);
Expand Down Expand Up @@ -288,6 +297,7 @@ impl TpuConnectionManager {
trace!("removing a connection for {}", key.to_string());
// ignore error for exit channel
value.exit_notifier.notify_waiters();
value.exit_notifier.notify_one();
false
} else {
true
Expand Down

0 comments on commit 44aeb79

Please sign in to comment.