Skip to content

Commit

Permalink
Fixing merging of confirmed block streams (#276)
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus authored Jan 16, 2024
1 parent 6f61d89 commit 36c6276
Showing 1 changed file with 15 additions and 3 deletions.
18 changes: 15 additions & 3 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
Expand Down Expand Up @@ -87,15 +87,27 @@ pub fn create_grpc_multiplex_blocks_subscription(
streams.push(block_reciever)
}
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
let last_slot = 0;
let mut slots_processed = BTreeSet::<u64>::new();
loop {
let block_message =
futures::stream::select_all(streams.clone()).next().await;
const MAX_SIZE: usize = 1024;
if let Some(block) = block_message {
if block.slot > last_slot {
let slot = block.slot;
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
// it means that the slot is too old to process
if !slots_processed.contains(&slot)
&& (slots_processed.len() < MAX_SIZE / 2
|| slot
> slots_processed.first().cloned().unwrap_or_default())
{
confirmed_block_sender
.send(map_block_update(block, commitment_config))
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {
slots_processed.pop_first();
}
}
}
}
Expand Down

0 comments on commit 36c6276

Please sign in to comment.