Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Mar 28, 2024
1 parent 44aeb79 commit df095e1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 7 deletions.
14 changes: 11 additions & 3 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,22 @@ fn create_grpc_multiplex_processed_block_stream(
block_sender,
yellowstone_grpc_proto::geyser::CommitmentLevel::Processed,
exit_notfier.clone(),
do_exit.clone(),
));
streams.push(block_reciever)
}
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
const MAX_SIZE: usize = 1024;
let mut slots_processed = BTreeSet::<u64>::new();
while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
let block_message = futures::stream::select_all(streams.clone()).next().await;
let mut select_all = futures::stream::select_all(streams.clone());
let block_message = tokio::select! {
message = select_all.next() => {
message
},
_ = exit_notfier.notified() => {
break;
}
};
if let Some(block) = block_message {
let slot = block.slot;
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
Expand Down Expand Up @@ -251,8 +258,9 @@ pub fn create_grpc_multiplex_blocks_subscription(
}
}
}
do_exit.store(true, std::sync::atomic::Ordering::Relaxed);
exit_notify.notify_waiters();
exit_notify.notify_one();
do_exit.store(true, std::sync::atomic::Ordering::Relaxed);
futures::future::join_all(processed_blocks_tasks).await;
}
})
Expand Down
6 changes: 2 additions & 4 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use solana_sdk::{
transaction::TransactionError,
};
use solana_transaction_status::{Reward, RewardType};
use std::sync::atomic::AtomicBool;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Notify;
use yellowstone_grpc_client::GeyserGrpcClient;
Expand Down Expand Up @@ -300,10 +299,9 @@ pub fn create_block_processing_task(
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
exit_notfier: Arc<Notify>,
do_exit: Arc<AtomicBool>,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
'main_loop: while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
'main_loop: loop {
let mut blocks_subs = HashMap::new();
blocks_subs.insert(
"block_client".to_string(),
Expand Down Expand Up @@ -332,7 +330,7 @@ pub fn create_block_processing_task(
)
.await?;

while !do_exit.load(std::sync::atomic::Ordering::Relaxed) {
loop {
tokio::select! {
message = stream.next() => {
let Some(Ok(message)) = message else {
Expand Down

0 comments on commit df095e1

Please sign in to comment.