Skip to content

Commit

Permalink
Add logs on block queues and reception time
Browse files Browse the repository at this point in the history
  • Loading branch information
ckamm committed Mar 22, 2024
1 parent 9de7b62 commit 5dbc06c
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 6 deletions.
21 changes: 15 additions & 6 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};

use itertools::Itertools;
use log::{debug, info, trace, warn};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::Duration;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::Receiver;
use tokio::sync::mpsc::UnboundedSender;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;

Expand Down Expand Up @@ -54,7 +54,7 @@ impl FromYellowstoneExtractor for BlockMetaHashExtractor {

fn create_grpc_multiplex_processed_block_stream(
grpc_sources: &Vec<GrpcSourceConfig>,
processed_block_sender: UnboundedSender<ProducedBlock>,
processed_block_sender: async_channel::Sender<ProducedBlock>,
) -> Vec<AnyhowJoinHandle> {
let commitment_config = CommitmentConfig::processed();

Expand All @@ -71,10 +71,15 @@ fn create_grpc_multiplex_processed_block_stream(
streams.push(block_reciever)
}
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
const MAX_SIZE: usize = 1024;
let mut slots_processed = BTreeSet::<u64>::new();
let mut last_metrics = Instant::now();
loop {
let block_message = futures::stream::select_all(streams.clone()).next().await;
const MAX_SIZE: usize = 1024;
if last_metrics.elapsed() > Duration::from_secs(10) {
last_metrics = Instant::now();
info!("merging block streams: queue length {:?}", streams.iter().map(|s| s.len()).collect_vec());
}
if let Some(block) = block_message {
let slot = block.slot;
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
Expand All @@ -85,6 +90,7 @@ fn create_grpc_multiplex_processed_block_stream(
{
processed_block_sender
.send(from_grpc_block_update(block, commitment_config))
.await
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {
Expand Down Expand Up @@ -133,8 +139,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
let jh_block_emitter_task = {
tokio::task::spawn(async move {
loop {
let (processed_block_sender, mut processed_block_reciever) =
tokio::sync::mpsc::unbounded_channel::<ProducedBlock>();
let (processed_block_sender, processed_block_reciever) =
async_channel::unbounded::<ProducedBlock>();

let processed_blocks_tasks = create_grpc_multiplex_processed_block_stream(
&grpc_sources,
Expand Down Expand Up @@ -232,6 +238,9 @@ pub fn create_grpc_multiplex_blocks_subscription(
log::error!("block or block meta stream stopped restaring blocks");
break;
}

info!("processed block receiver queue length: {}", processed_block_reciever.len());

cleanup_without_recv_blocks += 1;
cleanup_without_finalized_recv_blocks_meta += 1;
cleanup_without_confirmed_recv_blocks_meta += 1;
Expand Down
1 change: 1 addition & 0 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ pub fn create_block_processing_task(

match update {
UpdateOneof::Block(block) => {
log::info!("received block, hash: {} slot: {}", block.blockhash, block.slot);
block_sx
.send(block)
.await
Expand Down

0 comments on commit 5dbc06c

Please sign in to comment.