diff --git a/cluster-endpoints/src/grpc_multiplex.rs b/cluster-endpoints/src/grpc_multiplex.rs index 71457867..8265f2e3 100644 --- a/cluster-endpoints/src/grpc_multiplex.rs +++ b/cluster-endpoints/src/grpc_multiplex.rs @@ -13,7 +13,7 @@ use solana_lite_rpc_core::structures::slot_notification::SlotNotification; use solana_lite_rpc_core::AnyhowJoinHandle; use solana_sdk::clock::Slot; use solana_sdk::commitment_config::CommitmentConfig; -use std::collections::HashMap; +use std::collections::{BTreeSet, HashMap}; use std::time::Duration; use tokio::sync::broadcast::Receiver; use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; @@ -87,15 +87,27 @@ pub fn create_grpc_multiplex_blocks_subscription( streams.push(block_reciever) } let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move { - let last_slot = 0; + let mut slots_processed = BTreeSet::::new(); loop { let block_message = futures::stream::select_all(streams.clone()).next().await; + const MAX_SIZE: usize = 1024; if let Some(block) = block_message { - if block.slot > last_slot { + let slot = block.slot; + // check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value + // it means that the slot is too old to process + if !slots_processed.contains(&slot) + && (slots_processed.len() < MAX_SIZE / 2 + || slot + > slots_processed.first().cloned().unwrap_or_default()) + { confirmed_block_sender .send(map_block_update(block, commitment_config)) .context("Issue to send confirmed block")?; + slots_processed.insert(slot); + if slots_processed.len() > MAX_SIZE { + slots_processed.pop_first(); + } } } }