diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index 0e200aa1..450eb44b 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -10,6 +10,7 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{ create_multiplexed_stream, FromYellowstoneExtractor, }; +use itertools::Itertools; use log::{debug, info, trace, warn}; use solana_lite_rpc_core::structures::produced_block::ProducedBlock; use solana_lite_rpc_core::structures::slot_notification::SlotNotification; @@ -17,9 +18,8 @@ use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; use std::collections::{BTreeSet, HashMap, HashSet}; -use std::time::Duration; +use std::time::{Duration, Instant}; use tokio::sync::broadcast::Receiver; -use tokio::sync::mpsc::UnboundedSender; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; use yellowstone_grpc_proto::geyser::SubscribeUpdate; @@ -54,7 +54,7 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor { fn create_grpc_multiplex_processed_block_stream( grpc_sources: &Vec, - processed_block_sender: UnboundedSender, + processed_block_sender: async_channel::Sender, ) -> Vec { let commitment_config = CommitmentConfig::processed(); @@ -71,10 +71,15 @@ fn create_grpc_multiplex_processed_block_stream( streams.push(block_reciever) } let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move { + const MAX_SIZE: usize = 1024; let mut slots_processed = BTreeSet::::new(); + let mut last_metrics = Instant::now(); loop { let block_message = futures::stream::select_all(streams.clone()).next().await; - const MAX_SIZE: usize = 1024; + if last_metrics.elapsed() > Duration::from_secs(10) { + last_metrics = Instant::now(); + info!("merging block streams: queue length {:?}", streams.iter().map(|s| s.len()).collect_vec()); + } if let Some(block) = block_message { let slot = block.slot; // check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value @@ -85,6 +90,7 @@ fn create_grpc_multiplex_processed_block_stream( { processed_block_sender .send(from_grpc_block_update(block, commitment_config)) + .await .context("Issue to send confirmed block")?; slots_processed.insert(slot); if slots_processed.len() > MAX_SIZE { @@ -133,8 +139,8 @@ pub fn create_grpc_multiplex_blocks_subscription( let jh_block_emitter_task = { tokio::task::spawn(async move { loop { - let (processed_block_sender, mut processed_block_reciever) = - tokio::sync::mpsc::unbounded_channel::(); + let (processed_block_sender, processed_block_reciever) = + async_channel::unbounded::(); let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream( &grpc_sources, @@ -232,6 +238,9 @@ pub fn create_grpc_multiplex_blocks_subscription( log::error!("block or block meta stream stopped restaring blocks"); break; } + + info!("processed block receiver queue length: {}", processed_block_reciever.len()); + cleanup_without_recv_blocks += 1; cleanup_without_finalized_recv_blocks_meta += 1; cleanup_without_confirmed_recv_blocks_meta += 1; diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 8c56ac09..c536db19 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -298,6 +298,7 @@ pub fn create_block_processing_task( match update { UpdateOneof::Block(block) => { + log::info!("received block, hash: {} slot: {}", block.blockhash, block.slot); block_sx .send(block) .await