From a2f20d5a5365ce2e7a79427f82437a73123bf9c6 Mon Sep 17 00:00:00 2001
From: Kirill Fomichev
Date: Fri, 29 Nov 2024 22:16:38 -0500
Subject: [PATCH] proto: add `replay_from_slot`

---
 CHANGELOG.md                              |   1 +
 examples/rust/src/bin/client.rs           |   6 +
 examples/rust/src/bin/tx-blocktime.rs     |   1 +
 yellowstone-grpc-geyser/config.json       |   5 +-
 yellowstone-grpc-geyser/src/config.rs     |   7 +
 yellowstone-grpc-geyser/src/grpc.rs       | 183 +++++++++++++++---
 yellowstone-grpc-proto/proto/geyser.proto |   1 +
 .../src/plugin/filter/filter.rs           |   9 +
 8 files changed, 179 insertions(+), 34 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cc986e77..9c18521e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ The minor version will be incremented upon a breaking change and the patch versi
 - proto: add tonic feature ([#474](https://github.com/rpcpool/yellowstone-grpc/pull/474))
 - geyser: use default compression as gzip and zstd ([#475](https://github.com/rpcpool/yellowstone-grpc/pull/475))
+- proto: add `replay_from_slot` ([#477](https://github.com/rpcpool/yellowstone-grpc/pull/477))
 
 ### Breaking

diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs
index 2372625d..1102ebf2 100644
--- a/examples/rust/src/bin/client.rs
+++ b/examples/rust/src/bin/client.rs
@@ -254,6 +254,10 @@ struct ActionSubscribe {
     #[clap(long)]
     blocks_meta: bool,
 
+    /// Replay messages from slot
+    #[clap(long)]
+    replay_from_slot: Option<u64>,
+
     /// Send ping in subscribe request
     #[clap(long)]
     ping: Option<i32>,
@@ -447,6 +451,7 @@ impl Action {
                 commitment: commitment.map(|x| x as i32),
                 accounts_data_slice,
                 ping,
+                replay_from_slot: args.replay_from_slot,
             },
             args.resub.unwrap_or(0),
             args.stats,
@@ -803,6 +808,7 @@ async fn geyser_subscribe(
             commitment: None,
             accounts_data_slice: Vec::default(),
             ping: None,
+            replay_from_slot: None,
         })
         .await
         .map_err(GeyserGrpcClientError::SubscribeSendError)?;

diff --git a/examples/rust/src/bin/tx-blocktime.rs b/examples/rust/src/bin/tx-blocktime.rs
index 4162e36b..4aac795d 100644
--- a/examples/rust/src/bin/tx-blocktime.rs
+++ b/examples/rust/src/bin/tx-blocktime.rs
@@ -111,6 +111,7 @@ async fn main() -> anyhow::Result<()> {
             commitment: Some(commitment as i32),
             accounts_data_slice: vec![],
             ping: None,
+            replay_from_slot: None,
         })
         .await?;

diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json
index 5f347210..a14f65f1 100644
--- a/yellowstone-grpc-geyser/config.json
+++ b/yellowstone-grpc-geyser/config.json
@@ -20,8 +20,9 @@
     "unary_concurrency_limit": 100,
     "unary_disabled": false,
     "x_token": null,
-    "filter_name_size_limit": 32,
-    "filter_names_size_limit": 1024,
+    "replay_stored_slots": 0,
+    "filter_name_size_limit": 128,
+    "filter_names_size_limit": 4096,
     "filter_names_cleanup_interval": "1s",
     "filter_limits": {
       "accounts": {

diff --git a/yellowstone-grpc-geyser/src/config.rs b/yellowstone-grpc-geyser/src/config.rs
index d03b20fe..c6dbb825 100644
--- a/yellowstone-grpc-geyser/src/config.rs
+++ b/yellowstone-grpc-geyser/src/config.rs
@@ -119,6 +119,9 @@ pub struct ConfigGrpc {
         with = "humantime_serde"
     )]
     pub filter_names_cleanup_interval: Duration,
+    /// Number of slots stored for replay
+    #[serde(default = "ConfigGrpc::default_replay_stored_slots")]
+    pub replay_stored_slots: u64,
 }
 
 impl ConfigGrpc {
@@ -153,6 +156,10 @@ impl ConfigGrpc {
     const fn default_filter_names_cleanup_interval() -> Duration {
         Duration::from_secs(1)
     }
+
+    const fn default_replay_stored_slots() -> u64 {
+        0
+    }
 }
 
 #[derive(Debug, Clone, Deserialize)]
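The config knob and the request field work together: the plugin keeps extra slots in memory only when `replay_stored_slots > 0`, and each client opts into a replay per subscription. A minimal client-side sketch of the new field, assuming only the prost-generated `SubscribeRequest` from `yellowstone-grpc-proto` (a real subscription would also set account or transaction filters):

    use yellowstone_grpc_proto::geyser::SubscribeRequest;

    // Ask the server to re-send every stored update starting at `slot`,
    // then continue with the live stream; all other fields keep their
    // prost defaults here.
    fn replay_request(slot: u64) -> SubscribeRequest {
        SubscribeRequest {
            replay_from_slot: Some(slot),
            ..Default::default()
        }
    }
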
diff --git a/yellowstone-grpc-geyser/src/grpc.rs b/yellowstone-grpc-geyser/src/grpc.rs
index 79c8f17e..7d41166e 100644
--- a/yellowstone-grpc-geyser/src/grpc.rs
+++ b/yellowstone-grpc-geyser/src/grpc.rs
@@ -20,7 +20,7 @@ use {
     tokio::{
         fs,
         runtime::Builder,
-        sync::{broadcast, mpsc, Mutex, Notify, RwLock, Semaphore},
+        sync::{broadcast, mpsc, oneshot, Mutex, Notify, RwLock, Semaphore},
         task::spawn_blocking,
         time::{sleep, Duration, Instant},
     },
@@ -249,9 +249,22 @@ impl BlockMetaStorage {
     }
 }
 
+#[derive(Debug, Default)]
+struct MessageId {
+    id: u64,
+}
+
+impl MessageId {
+    fn next(&mut self) -> u64 {
+        self.id = self.id.checked_add(1).expect("message id overflow");
+        self.id
+    }
+}
+
 #[derive(Debug, Default)]
 struct SlotMessages {
-    messages: Vec<Option<Message>>, // Option is used for accounts with low write_version
+    messages: Vec<Option<(u64, Message)>>, // Option is used for accounts with low write_version
+    messages_slots: Vec<(u64, Message)>,
     block_meta: Option<Arc<MessageBlockMeta>>,
     transactions: Vec<Arc<MessageTransactionInfo>>,
     accounts_dedup: HashMap<Pubkey, (u64, usize)>, // (write_version, message_index)
@@ -266,7 +279,7 @@ struct SlotMessages {
 }
 
 impl SlotMessages {
-    pub fn try_seal(&mut self) -> Option<Message> {
+    pub fn try_seal(&mut self, msgid_gen: &mut MessageId) -> Option<(u64, Message)> {
         if !self.sealed {
             if let Some(block_meta) = &self.block_meta {
                 let executed_transaction_count = block_meta.executed_transaction_count as usize;
@@ -285,17 +298,18 @@
 
                 let mut accounts = Vec::with_capacity(self.messages.len());
                 for item in self.messages.iter().flatten() {
-                    if let Message::Account(account) = item {
+                    if let (_msgid, Message::Account(account)) = item {
                         accounts.push(Arc::clone(&account.account));
                     }
                 }
 
-                let message = Message::Block(Arc::new(MessageBlock::new(
+                let message_block = Message::Block(Arc::new(MessageBlock::new(
                     Arc::clone(block_meta),
                     transactions,
                     accounts,
                     entries,
                 )));
+                let message = (msgid_gen.next(), message_block);
                 self.messages.push(Some(message.clone()));
 
                 self.sealed = true;
@@ -309,6 +323,14 @@
     }
 }
 
+type BroadcastedMessage = (CommitmentLevel, Arc<Vec<(u64, Message)>>);
+
+type ReplayStoredSlotsRequest = (
+    CommitmentLevel,
+    Slot,
+    oneshot::Sender<Option<Vec<(u64, Message)>>>,
+);
+
 #[derive(Debug)]
 pub struct GrpcService {
     config_snapshot_client_channel_capacity: usize,
@@ -317,7 +339,8 @@ pub struct GrpcService {
     blocks_meta: Option<BlockMetaStorage>,
     subscribe_id: AtomicUsize,
     snapshot_rx: Mutex<Option<crossbeam_channel::Receiver<Option<Message>>>>,
-    broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
+    broadcast_tx: broadcast::Sender<BroadcastedMessage>,
+    replay_stored_slots_tx: mpsc::Sender<ReplayStoredSlotsRequest>,
     debug_clients_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
     filter_names: Arc<Mutex<FilterNames>>,
 }
@@ -361,6 +384,13 @@
         // Messages to clients combined by commitment
         let (broadcast_tx, _) = broadcast::channel(config.channel_capacity);
+        // capacity of 1 keeps queued replay requests from spamming the geyser loop
+        let (replay_stored_slots_tx, replay_stored_slots_rx) =
+            mpsc::channel(if config.replay_stored_slots == 0 {
+                0
+            } else {
+                1
+            });
 
         // gRPC server builder with optional TLS
         let mut server_builder = Server::builder();
@@ -391,6 +421,7 @@
             subscribe_id: AtomicUsize::new(0),
             snapshot_rx: Mutex::new(snapshot_rx),
             broadcast_tx: broadcast_tx.clone(),
+            replay_stored_slots_tx,
             debug_clients_tx,
             filter_names,
         })
@@ -411,7 +442,13 @@
                 .enable_all()
                 .build()
                 .expect("Failed to create a new runtime for geyser loop")
-                .block_on(Self::geyser_loop(messages_rx, blocks_meta_tx, broadcast_tx));
+                .block_on(Self::geyser_loop(
+                    messages_rx,
+                    blocks_meta_tx,
+                    broadcast_tx,
+                    replay_stored_slots_rx,
+                    config.replay_stored_slots,
+                ));
         });
 
         // Run Server
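Every message stored for replay is tagged by `MessageId`, a strictly increasing counter that panics on overflow rather than wrapping. Slot-status updates and data updates live in separate vectors (`messages_slots` vs `messages`), so a replay concatenates the two and sorts by id to recover the original arrival order. A toy illustration of that invariant, with `&'static str` standing in for the real `Message` type (the helper is hypothetical, not from the patch):

    fn merge_in_arrival_order(
        mut slot_updates: Vec<(u64, &'static str)>,
        data_updates: Vec<(u64, &'static str)>,
    ) -> Vec<(u64, &'static str)> {
        // concatenate the per-kind vectors, then restore the global order
        slot_updates.extend(data_updates);
        slot_updates.sort_by_key(|(id, _)| *id);
        slot_updates
    }
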
@@ -446,11 +483,14 @@ impl GrpcService {
     async fn geyser_loop(
         mut messages_rx: mpsc::UnboundedReceiver<Message>,
         blocks_meta_tx: Option<mpsc::UnboundedSender<Message>>,
-        broadcast_tx: broadcast::Sender<(CommitmentLevel, Arc<Vec<Message>>)>,
+        broadcast_tx: broadcast::Sender<BroadcastedMessage>,
+        mut replay_stored_slots_rx: mpsc::Receiver<ReplayStoredSlotsRequest>,
+        replay_stored_slots: u64,
     ) {
         const PROCESSED_MESSAGES_MAX: usize = 31;
         const PROCESSED_MESSAGES_SLEEP: Duration = Duration::from_millis(10);
 
+        let mut msgid_gen = MessageId::default();
         let mut messages: BTreeMap<u64, SlotMessages> = Default::default();
         let mut processed_messages = Vec::with_capacity(PROCESSED_MESSAGES_MAX);
         let mut processed_first_slot = None;
@@ -461,6 +501,7 @@
             tokio::select! {
                 Some(message) = messages_rx.recv() => {
                     metrics::message_queue_size_dec();
+                    let msgid = msgid_gen.next();
 
                     // Update metrics
                     if let Message::Slot(slot_message) = &message {
@@ -482,8 +523,8 @@
                             processed_first_slot = Some(msg.slot);
                         }
                         Message::Slot(msg) if msg.status == CommitmentLevel::Finalized => {
-                            // keep extra 10 slots
-                            if let Some(msg_slot) = msg.slot.checked_sub(10) {
+                            // keep an extra 10 slots, plus the slots stored for replay
+                            if let Some(msg_slot) = msg.slot.checked_sub(10 + replay_stored_slots) {
                                 loop {
                                     match messages.keys().next().cloned() {
                                         Some(slot) if slot < msg_slot => {
@@ -542,8 +583,10 @@
                             _ => {}
                         }
                     }
-                    if !matches!(&message, Message::Slot(_)) {
-                        slot_messages.messages.push(Some(message.clone()));
+                    if matches!(&message, Message::Slot(_)) {
+                        slot_messages.messages_slots.push((msgid, message.clone()));
+                    } else {
+                        slot_messages.messages.push(Some((msgid, message.clone())));
 
                         // If the Block message was already built, a new message is a problem
                         if slot_messages.sealed && !(matches!(&message, Message::Entry(_)) && slot_messages.entries_count == 0) {
@@ -565,11 +608,11 @@
                                 metrics::update_invalid_blocks("unexpected message: BlockMeta (duplicate)");
                             }
                             slot_messages.block_meta = Some(Arc::clone(msg));
-                            sealed_block_msg = slot_messages.try_seal();
+                            sealed_block_msg = slot_messages.try_seal(&mut msgid_gen);
                         }
                         Message::Transaction(msg) => {
                             slot_messages.transactions.push(Arc::clone(&msg.transaction));
-                            sealed_block_msg = slot_messages.try_seal();
+                            sealed_block_msg = slot_messages.try_seal(&mut msgid_gen);
                         }
                         // Dedup accounts by max write_version
                         Message::Account(msg) => {
@@ -587,7 +630,7 @@
                         }
                         Message::Entry(msg) => {
                             slot_messages.entries.push(Arc::clone(msg));
-                            sealed_block_msg = slot_messages.try_seal();
+                            sealed_block_msg = slot_messages.try_seal(&mut msgid_gen);
                         }
                         _ => {}
                     }
@@ -602,7 +645,7 @@
                     } else {
                         None
                     };
-                    messages_vec.push(message);
+                    messages_vec.push((msgid, message));
 
                     // sometimes we do not receive all statuses
                     if let Some((slot, status)) = slot_status {
@@ -623,19 +666,20 @@
                             }
                             slots.push(parent);
 
-                            messages_vec.push(Message::Slot(MessageSlot {
+                            let message_slot = Message::Slot(MessageSlot {
                                 slot: parent,
                                 parent: entry.parent_slot,
                                 status,
                                 dead_error: None,
-                            }));
+                            });
+                            messages_vec.push((msgid_gen.next(), message_slot));
                             metrics::missed_status_message_inc(status);
                         }
                     }
 
                     for message in messages_vec.into_iter().rev() {
-                        if let Message::Slot(slot) = &message {
+                        if let Message::Slot(slot) = &message.1 {
                            let (mut confirmed_messages, mut finalized_messages) = match slot.status {
                                CommitmentLevel::Processed | CommitmentLevel::FirstShredReceived | CommitmentLevel::Completed | CommitmentLevel::CreatedBank | CommitmentLevel::Dead => {
                                    (Vec::with_capacity(1), Vec::with_capacity(1))
                                }
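The retention change above is the heart of the feature: on every finalized slot the loop evicts stored slots older than `finalized - 10 - replay_stored_slots`, so the config value directly extends how far back a replay can reach. The cutoff in isolation (a hypothetical helper, not in the patch):

    /// Oldest slot kept once `finalized_slot` is finalized; `None` means
    /// the subtraction underflowed (early in the chain), so nothing is evicted.
    fn prune_cutoff(finalized_slot: u64, replay_stored_slots: u64) -> Option<u64> {
        finalized_slot.checked_sub(10 + replay_stored_slots)
    }
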
@@ -689,8 +733,8 @@
                        } else {
                            let mut confirmed_messages = vec![];
                            let mut finalized_messages = vec![];
-                            if matches!(&message, Message::Block(_)) {
-                                if let Some(slot_messages) = messages.get(&message.get_slot()) {
+                            if matches!(&message.1, Message::Block(_)) {
+                                if let Some(slot_messages) = messages.get(&message.1.get_slot()) {
                                    if let Some(confirmed_at) = slot_messages.confirmed_at {
                                        confirmed_messages.extend(
                                            slot_messages.messages.as_slice()[confirmed_at..].iter().filter_map(|x| x.clone())
@@ -736,6 +780,28 @@
                    }
                    processed_sleep.as_mut().reset(Instant::now() + PROCESSED_MESSAGES_SLEEP);
                }
+                Some((commitment, replay_slot, tx)) = replay_stored_slots_rx.recv() => {
+                    if let Some((slot, _)) = messages.first_key_value() {
+                        if replay_slot < *slot {
+                            let _ = tx.send(None);
+                            continue;
+                        }
+                    }
+
+                    let mut replayed_messages = Vec::with_capacity(32_768);
+                    for (slot, messages) in messages.iter() {
+                        if *slot >= replay_slot {
+                            replayed_messages.extend_from_slice(&messages.messages_slots);
+                            if commitment == CommitmentLevel::Processed
+                                || (commitment == CommitmentLevel::Finalized && messages.finalized)
+                                || (commitment == CommitmentLevel::Confirmed && messages.confirmed)
+                            {
+                                replayed_messages.extend(messages.messages.iter().filter_map(|v| v.clone()));
+                            }
+                        }
+                    }
+                    let _ = tx.send(Some(replayed_messages));
+                }
                else => break,
            }
        }
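The new `select!` arm above answers each replay request on the oneshot sender carried inside the request tuple, replying `None` when the requested slot has already been evicted. The same request/response pattern in isolation, with simplified types (`u64` stands in for the real message payload; the function is illustrative, not part of the patch):

    use tokio::sync::{mpsc, oneshot};

    // Send a request to a worker loop and await its one-off reply.
    // `None` covers both a vanished loop and an explicit refusal.
    async fn request_replay(
        tx: &mpsc::Sender<(u64, oneshot::Sender<Option<Vec<u64>>>)>,
        from_slot: u64,
    ) -> Option<Vec<u64>> {
        let (resp_tx, resp_rx) = oneshot::channel();
        tx.send((from_slot, resp_tx)).await.ok()?; // loop has shut down
        resp_rx.await.ok()? // sender dropped without replying
    }
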
@@ -746,9 +812,10 @@
        id: usize,
        endpoint: String,
        stream_tx: mpsc::Sender<TonicResult<SubscribeUpdate>>,
-        mut client_rx: mpsc::UnboundedReceiver<Option<Filter>>,
+        mut client_rx: mpsc::UnboundedReceiver<Option<(Option<u64>, Filter)>>,
        mut snapshot_rx: Option<crossbeam_channel::Receiver<Option<Message>>>,
-        mut messages_rx: broadcast::Receiver<(CommitmentLevel, Arc<Vec<Message>>)>,
+        mut messages_rx: broadcast::Receiver<BroadcastedMessage>,
+        replay_stored_slots_tx: mpsc::Sender<ReplayStoredSlotsRequest>,
        debug_client_tx: Option<mpsc::UnboundedSender<DebugClientMessage>>,
        drop_client: impl FnOnce(),
    ) {
@@ -795,7 +862,7 @@
                    }
 
                    match message {
-                        Some(Some(filter_new)) => {
+                        Some(Some((replay_from_slot, filter_new))) => {
                            if let Some(msg) = filter_new.get_pong_msg() {
                                if stream_tx.send(Ok(msg)).await.is_err() {
                                    error!("client #{id}: stream closed");
@@ -808,6 +875,57 @@
                            filter = filter_new;
                            DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateFilter { id, filter: Box::new(filter.clone()) });
                            info!("client #{id}: filter updated");
+
+                            if let Some(replay_from_slot) = replay_from_slot {
+                                if replay_stored_slots_tx.max_capacity() == 0 {
+                                    info!("client #{id}: replay is not supported");
+                                    tokio::spawn(async move {
+                                        let _ = stream_tx.send(Err(Status::internal("replay is not supported"))).await;
+                                    });
+                                    break 'outer;
+                                }
+
+                                let (tx, rx) = oneshot::channel();
+                                let commitment = filter.get_commitment_level();
+                                if let Err(_error) = replay_stored_slots_tx.send((commitment, replay_from_slot, tx)).await {
+                                    error!("client #{id}: failed to send replay request");
+                                    tokio::spawn(async move {
+                                        let _ = stream_tx.send(Err(Status::internal("failed to send replay request"))).await;
+                                    });
+                                    break 'outer;
+                                }
+
+                                let Ok(messages) = rx.await else {
+                                    error!("client #{id}: failed to get replay response");
+                                    tokio::spawn(async move {
+                                        let _ = stream_tx.send(Err(Status::internal("failed to get replay response"))).await;
+                                    });
+                                    break 'outer;
+                                };
+
+                                let Some(mut messages) = messages else {
+                                    info!("client #{id}: replay from {replay_from_slot} is not available");
+                                    tokio::spawn(async move {
+                                        let _ = stream_tx.send(Err(
+                                            Status::internal(format!("replay from {replay_from_slot} is not available"))
+                                        )).await;
+                                    });
+                                    break 'outer;
+                                };
+
+                                messages.sort_by_key(|msg| msg.0);
+                                for (_msgid, message) in messages.iter() {
+                                    for message in filter.get_updates(message, Some(commitment)) {
+                                        match stream_tx.send(Ok(message)).await {
+                                            Ok(()) => {}
+                                            Err(mpsc::error::SendError(_)) => {
+                                                error!("client #{id}: stream closed");
+                                                break 'outer;
+                                            }
+                                        }
+                                    }
+                                }
+                            }
                        }
                        Some(None) => {
                            break 'outer;
                        }
@@ -826,21 +944,21 @@
                        Err(broadcast::error::RecvError::Lagged(_)) => {
                            info!("client #{id}: lagged to receive geyser messages");
                            tokio::spawn(async move {
-                                let _ = stream_tx.send(Err(Status::internal("lagged"))).await;
+                                let _ = stream_tx.send(Err(Status::internal("lagged to receive geyser messages"))).await;
                            });
                            break 'outer;
                        }
                    };
 
                    if commitment == filter.get_commitment_level() {
-                        for message in messages.iter() {
+                        for (_msgid, message) in messages.iter() {
                            for message in filter.get_updates(message, Some(commitment)) {
                                match stream_tx.try_send(Ok(message)) {
                                    Ok(()) => {}
                                    Err(mpsc::error::TrySendError::Full(_)) => {
-                                        error!("client #{id}: lagged to send update");
+                                        error!("client #{id}: lagged to send an update");
                                        tokio::spawn(async move {
-                                            let _ = stream_tx.send(Err(Status::internal("lagged"))).await;
+                                            let _ = stream_tx.send(Err(Status::internal("lagged to send an update"))).await;
                                        });
                                        break 'outer;
                                    }
@@ -855,7 +973,7 @@
 
                    if commitment == CommitmentLevel::Processed && debug_client_tx.is_some() {
                        for message in messages.iter() {
-                            if let Message::Slot(slot_message) = &message {
+                            if let Message::Slot(slot_message) = &message.1 {
                                DebugClientMessage::maybe_send(&debug_client_tx, || DebugClientMessage::UpdateSlot { id, slot: slot_message.slot });
                            }
                        }
@@ -876,7 +994,7 @@
        id: usize,
        endpoint: &str,
        stream_tx: &mpsc::Sender<TonicResult<SubscribeUpdate>>,
-        client_rx: &mut mpsc::UnboundedReceiver<Option<Filter>>,
+        client_rx: &mut mpsc::UnboundedReceiver<Option<(Option<u64>, Filter)>>,
        snapshot_rx: crossbeam_channel::Receiver<Option<Message>>,
        is_alive: &mut bool,
        filter: &mut Filter,
@@ -886,7 +1004,7 @@
 
        // we start with the default filter; for the snapshot we must wait for the actual filter first
        while *is_alive {
            match client_rx.recv().await {
-                Some(Some(filter_new)) => {
+                Some(Some((_replay_from_slot, filter_new))) => {
                    if let Some(msg) = filter_new.get_pong_msg() {
                        if stream_tx.send(Ok(msg)).await.is_err() {
                            error!("client #{id}: stream closed");
@@ -1014,7 +1132,7 @@
            filter_names.try_clean();
 
            if let Err(error) = match Filter::new(&request, &config_filter_limits, &mut filter_names) {
-                Ok(filter) => match incoming_client_tx.send(Some(filter)) {
+                Ok(filter) => match incoming_client_tx.send(Some((request.replay_from_slot, filter))) {
                    Ok(()) => Ok(()),
                    Err(error) => Err(error.to_string()),
                },
@@ -1047,6 +1165,7 @@
                client_rx,
                snapshot_rx,
                self.broadcast_tx.subscribe(),
+                self.replay_stored_slots_tx.clone(),
                self.debug_clients_tx.clone(),
                move || {
                    notify_exit1.notify_one();

diff --git a/yellowstone-grpc-proto/proto/geyser.proto b/yellowstone-grpc-proto/proto/geyser.proto
index 613fcdb9..952dfb38 100644
--- a/yellowstone-grpc-proto/proto/geyser.proto
+++ b/yellowstone-grpc-proto/proto/geyser.proto
@@ -37,6 +37,7 @@ message SubscribeRequest {
   optional CommitmentLevel commitment = 6;
   repeated SubscribeRequestAccountsDataSlice accounts_data_slice = 7;
   optional SubscribeRequestPing ping = 9;
+  optional uint64 replay_from_slot = 11;
 }
 
 message SubscribeRequestFilterAccounts {
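On the wire this is just a new optional field (tag 11) on `SubscribeRequest`, so any tonic client can request a replay by setting it on the first message of the subscribe stream. A sketch against the generated stub, assuming the `tonic` feature of `yellowstone-grpc-proto` plus `tokio-stream` and `anyhow`; the endpoint and the bare-bones request are placeholders:

    use yellowstone_grpc_proto::geyser::{geyser_client::GeyserClient, SubscribeRequest};

    async fn subscribe_with_replay(slot: u64) -> anyhow::Result<()> {
        let mut client = GeyserClient::connect("http://127.0.0.1:10000").await?;
        // replayed updates arrive first, already filtered and in their
        // original order, followed by the live stream
        let requests = tokio_stream::once(SubscribeRequest {
            replay_from_slot: Some(slot),
            ..Default::default()
        });
        let mut stream = client.subscribe(requests).await?.into_inner();
        while let Some(update) = stream.message().await? {
            println!("{update:?}");
        }
        Ok(())
    }

In the example client the same field surfaces as a CLI flag (clap derives it as `--replay-from-slot`).
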
diff --git a/yellowstone-grpc-proto/src/plugin/filter/filter.rs b/yellowstone-grpc-proto/src/plugin/filter/filter.rs
index ac64aa9b..ea4f5300 100644
--- a/yellowstone-grpc-proto/src/plugin/filter/filter.rs
+++ b/yellowstone-grpc-proto/src/plugin/filter/filter.rs
@@ -1167,6 +1167,7 @@ mod tests {
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let limit = FilterLimits::default();
         let filter = Filter::new(&config, &limit, &mut create_filter_names());
@@ -1198,6 +1199,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let mut limit = FilterLimits::default();
         limit.accounts.any = false;
@@ -1233,6 +1235,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let mut limit = FilterLimits::default();
         limit.transactions.any = false;
@@ -1267,6 +1270,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let mut limit = FilterLimits::default();
         limit.transactions.any = false;
@@ -1307,6 +1311,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let limit = FilterLimits::default();
         let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap();
@@ -1371,6 +1376,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let limit = FilterLimits::default();
         let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap();
@@ -1435,6 +1441,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let limit = FilterLimits::default();
         let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap();
@@ -1485,6 +1492,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let limit = FilterLimits::default();
         let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap();
@@ -1557,6 +1565,7 @@
             commitment: None,
             accounts_data_slice: Vec::new(),
             ping: None,
+            replay_from_slot: None,
         };
         let limit = FilterLimits::default();
         let filter = Filter::new(&config, &limit, &mut create_filter_names()).unwrap();