Skip to content

Commit

Permalink
Fix block stream throughput problems
Browse files Browse the repository at this point in the history
By temporarily pasting in a function to connect to block streams via a
more generously configured endpoint.
  • Loading branch information
ckamm committed Mar 22, 2024
1 parent 088a2d8 commit a12f750
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 7 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,4 @@ solana-lite-rpc-block-priofees = {path = "block_priofees", version="0.2.4"}
async-trait = "0.1.68"
yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" }
tonic-health = "0.10"
1 change: 1 addition & 0 deletions cluster-endpoints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ yellowstone-grpc-proto = { workspace = true }
itertools = {workspace = true}
prometheus = { workspace = true }
lazy_static = { workspace = true }
tonic-health = { workspace = true }
11 changes: 6 additions & 5 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use crate::grpc_subscription::{
};
use anyhow::Context;
use futures::{Stream, StreamExt};
use geyser_grpc_connector::{
GeyserFilter, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use geyser_grpc_connector::{GeyserFilter, GrpcSourceConfig};

use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use itertools::Itertools;
use log::{debug, info, trace, warn};
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
Expand All @@ -19,7 +18,6 @@ use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::time::{Duration, Instant};
use geyser_grpc_connector::grpc_subscription_autoreconnect_streams::create_geyser_reconnecting_stream;
use tokio::sync::broadcast::Receiver;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::SubscribeUpdate;
Expand Down Expand Up @@ -79,7 +77,10 @@ fn create_grpc_multiplex_processed_block_stream(
let block_message = futures::stream::select_all(streams.clone()).next().await;
if last_metrics.elapsed() > Duration::from_secs(10) {
last_metrics = Instant::now();
info!("merging block streams: queue length {:?}", streams.iter().map(|s| s.len()).collect_vec());
info!(
"merging block streams: queue length {:?}",
streams.iter().map(|s| s.len()).collect_vec()
);
}
if let Some(block) = block_message {
let slot = block.slot;
Expand Down
47 changes: 45 additions & 2 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,45 @@ pub fn from_grpc_block_update(
}
}

// TODO: use function from geyser-grpc-connector
use bytes::Bytes;
use std::time::Duration;
use tonic::metadata::{errors::InvalidMetadataValue, AsciiMetadataValue};
use tonic::service::Interceptor;
use tonic::transport::ClientTlsConfig;
use tonic_health::pb::health_client::HealthClient;
use yellowstone_grpc_client::InterceptorXToken;
use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient;
use yellowstone_grpc_proto::tonic;
async fn connect_with_timeout_hacked<E, T>(
endpoint: E,
x_token: Option<T>,
) -> anyhow::Result<GeyserGrpcClient<impl Interceptor>>
where
E: Into<Bytes>,
T: TryInto<AsciiMetadataValue, Error = InvalidMetadataValue>,
{
let endpoint = tonic::transport::Endpoint::from_shared(endpoint)?
.buffer_size(Some(65536))
.initial_connection_window_size(4194304)
.initial_stream_window_size(4194304)
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(10))
// .http2_adaptive_window()
.tls_config(ClientTlsConfig::new())?;

let x_token: Option<AsciiMetadataValue> = x_token.map(|v| v.try_into()).transpose()?;
let interceptor = InterceptorXToken { x_token };

let channel = endpoint.connect_lazy();
let client = GeyserGrpcClient::new(
HealthClient::with_interceptor(channel.clone(), interceptor.clone()),
GeyserClient::with_interceptor(channel, interceptor)
.max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()),
);
Ok(client)
}

pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
Expand All @@ -274,7 +313,7 @@ pub fn create_block_processing_task(

// connect to grpc
let mut client =
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?;
let mut stream = client
.subscribe_once(
HashMap::new(),
Expand All @@ -298,7 +337,11 @@ pub fn create_block_processing_task(

match update {
UpdateOneof::Block(block) => {
log::trace!("received block, hash: {} slot: {}", block.blockhash, block.slot);
log::trace!(
"received block, hash: {} slot: {}",
block.blockhash,
block.slot
);
block_sx
.send(block)
.await
Expand Down

0 comments on commit a12f750

Please sign in to comment.