Skip to content

Commit

Permalink
geyser: support snapshot data (#182)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Oct 12, 2023
1 parent a915f08 commit 87918a2
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 72 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi

### Breaking

## 2023-10-12

- yellowstone-grpc-geyser-1.10.0+solana.1.16.16

### Features

- geyser: support snapshot data ([#182](https://github.com/rpcpool/yellowstone-grpc/pull/182)).

## 2023-10-10

- yellowstone-grpc-client-1.11.1+solana.1.16.16
Expand Down
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
members = [
"examples/rust", # 1.10.0+solana.1.16.16
"yellowstone-grpc-client", # 1.11.1+solana.1.16.16
"yellowstone-grpc-geyser", # 1.9.1+solana.1.16.16
"yellowstone-grpc-geyser", # 1.10.0+solana.1.16.16
"yellowstone-grpc-kafka", # 1.0.0-rc.3+solana.1.16.16
"yellowstone-grpc-proto", # 1.10.0+solana.1.16.16
]
Expand Down
3 changes: 2 additions & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "1.9.1+solana.1.16.16"
version = "1.10.0+solana.1.16.16"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"
Expand All @@ -18,6 +18,7 @@ base64 = "0.21.0"
bincode = "1.3.3"
bs58 = "0.4.0"
clap = { version = "4.3.24", features = ["cargo", "derive"] }
crossbeam-channel = "0.5.8"
futures = "0.3.24"
hyper = { version = "0.14.20", features = ["server"] }
lazy_static = "1.4.0"
Expand Down
2 changes: 2 additions & 0 deletions yellowstone-grpc-geyser/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
},
"grpc": {
"address": "0.0.0.0:10000",
"snapshot_plugin_channel_capacity": null,
"snapshot_client_channel_capacity": "50_000_000",
"channel_capacity": "100_000",
"unary_concurrency_limit": 100,
"unary_disabled": false,
Expand Down
43 changes: 43 additions & 0 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ pub struct ConfigGrpc {
pub address: SocketAddr,
/// TLS config
pub tls_config: Option<ConfigGrpcServerTls>,
/// Capacity of the channel used for accounts from snapshot,
/// on reaching the limit Sender block validator startup.
#[serde(
default = "ConfigGrpc::snapshot_plugin_channel_capacity_default",
deserialize_with = "deserialize_usize_str_maybe"
)]
pub snapshot_plugin_channel_capacity: Option<usize>,
/// Capacity of the client channel, applicable only with snapshot
#[serde(
default = "ConfigGrpc::snapshot_client_channel_capacity_default",
deserialize_with = "deserialize_usize_str"
)]
pub snapshot_client_channel_capacity: usize,
/// Capacity of the channel per connection
#[serde(
default = "ConfigGrpc::channel_capacity_default",
Expand All @@ -85,6 +98,14 @@ pub struct ConfigGrpc {
}

impl ConfigGrpc {
const fn snapshot_plugin_channel_capacity_default() -> Option<usize> {
None
}

const fn snapshot_client_channel_capacity_default() -> usize {
50_000_000
}

const fn channel_capacity_default() -> usize {
250_000
}
Expand Down Expand Up @@ -313,6 +334,28 @@ where
}
}

fn deserialize_usize_str_maybe<'de, D>(deserializer: D) -> Result<Option<usize>, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
Integer(usize),
String(String),
}

match Option::<Value>::deserialize(deserializer)? {
Some(Value::Integer(value)) => Ok(Some(value)),
Some(Value::String(value)) => value
.replace('_', "")
.parse::<usize>()
.map(Some)
.map_err(de::Error::custom),
None => Ok(None),
}
}

fn deserialize_pubkey_set<'de, D>(deserializer: D) -> Result<HashSet<Pubkey>, D::Error>
where
D: Deserializer<'de>,
Expand Down
167 changes: 121 additions & 46 deletions yellowstone-grpc-geyser/src/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use {
},
tokio::{
fs,
sync::{broadcast, mpsc, Notify, RwLock, Semaphore},
sync::{broadcast, mpsc, Mutex, Notify, RwLock, Semaphore},
time::{sleep, Duration, Instant},
},
tokio_stream::wrappers::ReceiverStream,
Expand Down Expand Up @@ -677,15 +677,21 @@ pub struct GrpcService {
config: ConfigGrpc,
blocks_meta: Option<BlockMetaStorage>,
subscribe_id: AtomicUsize,
snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Option<Message>>>>,
broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
}

impl GrpcService {
#[allow(clippy::type_complexity)]
pub async fn create(
config: ConfigGrpc,
block_fail_action: ConfigBlockFailAction,
) -> Result<
(mpsc::UnboundedSender<Message>, Arc<Notify>),
(
Option<crossbeam_channel::Sender<Option<Message>>>,
mpsc::UnboundedSender<Message>,
Arc<Notify>,
),
Box<dyn std::error::Error + Send + Sync>,
> {
// Bind service address
Expand All @@ -695,6 +701,15 @@ impl GrpcService {
Some(Duration::from_secs(20)), // tcp_keepalive
)?;

// Snapshot channel
let (snapshot_tx, snapshot_rx) = match config.snapshot_plugin_channel_capacity {
Some(cap) => {
let (tx, rx) = crossbeam_channel::bounded(cap);
(Some(tx), Some(rx))
}
None => (None, None),
};

// Blocks meta storage
let (blocks_meta, blocks_meta_tx) = if config.unary_disabled {
(None, None)
Expand Down Expand Up @@ -723,6 +738,7 @@ impl GrpcService {
config,
blocks_meta,
subscribe_id: AtomicUsize::new(0),
snapshot_rx: Mutex::new(snapshot_rx),
broadcast_tx: broadcast_tx.clone(),
})
.accept_compressed(CompressionEncoding::Gzip)
Expand Down Expand Up @@ -753,7 +769,7 @@ impl GrpcService {
.await
});

Ok((messages_tx, shutdown))
Ok((snapshot_tx, messages_tx, shutdown))
}

async fn geyser_loop(
Expand Down Expand Up @@ -1021,65 +1037,118 @@ impl GrpcService {
mut filter: Filter,
stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
mut snapshot_rx: Option<crossbeam_channel::Receiver<Option<Message>>>,
mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Message>>)>,
drop_client: impl FnOnce(),
) {
CONNECTIONS_TOTAL.inc();
info!("client #{id}: new");
'outer: loop {
tokio::select! {
message = client_rx.recv() => {
match message {
Some(Some(filter_new)) => {
filter = filter_new;
info!("client #{id}: filter updated");
}
Some(None) => {
break 'outer;
},
None => {
break 'outer;

let mut is_alive = true;
if let Some(snapshot_rx) = snapshot_rx.take() {
info!("client #{id}: going to receive snapshot data");

// we start with default filter, for snapshot we need wait actual filter first
match client_rx.recv().await {
Some(Some(filter_new)) => {
filter = filter_new;
info!("client #{id}: filter updated");
}
Some(None) => {
is_alive = false;
}
None => {
is_alive = false;
}
};

while is_alive {
let message = match snapshot_rx.try_recv() {
Ok(message) => {
MESSAGE_QUEUE_SIZE.dec();
match message {
Some(message) => message,
None => break,
}
}
Err(crossbeam_channel::TryRecvError::Empty) => {
sleep(Duration::from_millis(1)).await;
continue;
}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("client #{id}: snapshot channel disconnected");
is_alive = false;
break;
}
};

for message in filter.get_update(&message) {
if stream_tx.send(Ok(message)).await.is_err() {
error!("client #{id}: stream closed");
is_alive = false;
break;
}
}
message = messages_rx.recv() => {
match message {
Ok((commitment, messages)) => {
if commitment == filter.get_commitment_level() {
for message in messages.iter() {
for message in filter.get_update(message) {
match stream_tx.try_send(Ok(message)) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
error!("client #{id}: lagged to send update");
tokio::spawn(async move {
let _ = stream_tx.send(Err(Status::internal("lagged"))).await;
});
break 'outer;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
error!("client #{id}: stream closed");
break 'outer;
}
}
}

if is_alive {
'outer: loop {
tokio::select! {
message = client_rx.recv() => {
match message {
Some(Some(filter_new)) => {
filter = filter_new;
info!("client #{id}: filter updated");
}
Some(None) => {
break 'outer;
},
None => {
break 'outer;
}
}
}
message = messages_rx.recv() => {
let (commitment, messages) = match message {
Ok((commitment, messages)) => (commitment, messages),
Err(broadcast::error::RecvError::Closed) => {
break 'outer;
},
Err(broadcast::error::RecvError::Lagged(_)) => {
info!("client #{id}: lagged to receive geyser messages");
tokio::spawn(async move {
let _ = stream_tx.send(Err(Status::internal("lagged"))).await;
});
break 'outer;
}
};

if commitment == filter.get_commitment_level() {
for message in messages.iter() {
for message in filter.get_update(message) {
match stream_tx.try_send(Ok(message)) {
Ok(()) => {}
Err(mpsc::error::TrySendError::Full(_)) => {
error!("client #{id}: lagged to send update");
tokio::spawn(async move {
let _ = stream_tx.send(Err(Status::internal("lagged"))).await;
});
break 'outer;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
error!("client #{id}: stream closed");
break 'outer;
}
}
}
}
}
Err(broadcast::error::RecvError::Closed) => {
break 'outer;
},
Err(broadcast::error::RecvError::Lagged(_)) => {
info!("client #{id}: lagged to receive geyser messages");
tokio::spawn(async move {
let _ = stream_tx.send(Err(Status::internal("lagged"))).await;
});
break 'outer;
}
}
}
}
}

info!("client #{id}: removed");
CONNECTIONS_TOTAL.dec();
drop_client();
Expand Down Expand Up @@ -1109,7 +1178,12 @@ impl Geyser for GrpcService {
&self.config.filters,
)
.expect("empty filter");
let (stream_tx, stream_rx) = mpsc::channel(self.config.channel_capacity);
let snapshot_rx = self.snapshot_rx.lock().await.take();
let (stream_tx, stream_rx) = mpsc::channel(if snapshot_rx.is_some() {
self.config.snapshot_client_channel_capacity
} else {
self.config.channel_capacity
});
let (client_tx, client_rx) = mpsc::unbounded_channel();
let notify_exit1 = Arc::new(Notify::new());
let notify_exit2 = Arc::new(Notify::new());
Expand Down Expand Up @@ -1192,6 +1266,7 @@ impl Geyser for GrpcService {
filter,
stream_tx,
client_rx,
snapshot_rx,
self.broadcast_tx.subscribe(),
move || {
notify_exit1.notify_one();
Expand Down
Loading

0 comments on commit 87918a2

Please sign in to comment.