From 10f36f0fd4ace8fa8af4403e4ba0d3dc1c433efa Mon Sep 17 00:00:00 2001 From: Kirill Fomichev Date: Thu, 14 Nov 2024 11:13:51 +0200 Subject: [PATCH] examples: add progress bar to client tool (#456) --- .editorconfig | 22 ++ CHANGELOG.md | 1 + Cargo.lock | 67 +++++ Cargo.toml | 1 + examples/rust/Cargo.toml | 1 + examples/rust/src/bin/client.rs | 400 ++++++++++++++++++---------- yellowstone-grpc-geyser/config.json | 162 ++++++----- yellowstone-grpc-proto/src/lib.rs | 15 +- 8 files changed, 434 insertions(+), 235 deletions(-) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 00000000..6dddb172 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,22 @@ +root = true + +[*] +end_of_line = lf +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true + +[*.{diff,md}] +trim_trailing_whitespace = false + +[*.{js,json}] +indent_style = space +indent_size = 2 + +[*.proto] +indent_style = space +indent_size = 2 + +[*.rs] +indent_style = space +indent_size = 4 diff --git a/CHANGELOG.md b/CHANGELOG.md index 069b6585..cac52036 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ The minor version will be incremented upon a breaking change and the patch versi - node: remove generated grpc files ([#447](https://github.com/rpcpool/yellowstone-grpc/pull/447)) - proto: add txn_signature filter ([#445](https://github.com/rpcpool/yellowstone-grpc/pull/445)) - geyser: limit length of filter name ([#448](https://github.com/rpcpool/yellowstone-grpc/pull/448)) +- examples: add progress bar to client tool ([#456](https://github.com/rpcpool/yellowstone-grpc/pull/456)) ### Breaking diff --git a/Cargo.lock b/Cargo.lock index f468ff7a..693c4277 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -823,6 +823,19 @@ dependencies = [ "unreachable", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width 0.1.14", + "windows-sys 0.52.0", +] + [[package]] name = "console_error_panic_hook" version = "0.1.7" @@ -1106,6 +1119,12 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.34" @@ -1786,6 +1805,19 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "indicatif" +version = "0.17.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbf675b85ed934d3c67b5c5469701eec7db22689d0a2139d856e0925fa28b281" +dependencies = [ + "console", + "number_prefix", + "portable-atomic", + "unicode-width 0.2.0", + "web-time", +] + [[package]] name = "inout" version = "0.1.3" @@ -2163,6 +2195,12 @@ dependencies = [ "libc", ] +[[package]] +name = "number_prefix" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3" + [[package]] name = "object" version = "0.36.3" @@ -2309,6 +2347,12 @@ dependencies = [ "universal-hash", ] +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "powerfmt" version = "0.2.0" @@ -4032,6 +4076,18 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-width" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dd6e30e90baa6f72411720665d41d89b9a3d039dc45b8faea1ddd07f617f6af" + +[[package]] +name = "unicode-width" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fc81956842c57dac11422a97c3b8195a1ff727f06e85c84ed2e8aa277c9a0fd" + [[package]] name = "universal-hash" version = "0.5.1" @@ -4219,6 +4275,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.4" @@ -4468,6 +4534,7 @@ dependencies = [ "env_logger 0.11.5", "futures", "hex", + "indicatif", "log", "maplit", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index 81ecef70..1973e387 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ http-body-util = "0.1.2" humantime-serde = "1.1.1" hyper = "1.4.1" hyper-util = "0.1.7" +indicatif = "0.17.9" lazy_static = "1.4.0" local-ip-address = "0.6.1" log = "0.4.17" diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index cde01bff..ee0ca7b9 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -22,6 +22,7 @@ clap = { workspace = true, features = ["derive"] } env_logger = { workspace = true } futures = { workspace = true } hex = { workspace = true } +indicatif = { workspace = true } log = { workspace = true } maplit = { workspace = true } serde_json = { workspace = true } diff --git a/examples/rust/src/bin/client.rs b/examples/rust/src/bin/client.rs index 286e07e2..0ba205b9 100644 --- a/examples/rust/src/bin/client.rs +++ b/examples/rust/src/bin/client.rs @@ -1,25 +1,32 @@ use { + anyhow::Context, backoff::{future::retry, ExponentialBackoff}, clap::{Parser, Subcommand, ValueEnum}, futures::{future::TryFutureExt, sink::SinkExt, stream::StreamExt}, + indicatif::{MultiProgress, ProgressBar, ProgressStyle}, log::{error, info}, - solana_sdk::{pubkey::Pubkey, signature::Signature, transaction::TransactionError}, - solana_transaction_status::{EncodedTransactionWithStatusMeta, UiTransactionEncoding}, - std::{collections::HashMap, env, fmt, fs::File, sync::Arc, time::Duration}, + serde_json::{json, Value}, + solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Signature}, + solana_transaction_status::UiTransactionEncoding, + std::{collections::HashMap, env, fs::File, sync::Arc, time::Duration}, tokio::sync::Mutex, tonic::transport::channel::ClientTlsConfig, yellowstone_grpc_client::{GeyserGrpcClient, GeyserGrpcClientError, Interceptor}, - yellowstone_grpc_proto::prelude::{ - subscribe_request_filter_accounts_filter::Filter as AccountsFilterOneof, - subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, - subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, - subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, - SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, - SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports, - SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, - SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, SubscribeRequestFilterSlots, - SubscribeRequestFilterTransactions, SubscribeRequestPing, SubscribeUpdateAccount, - SubscribeUpdateTransaction, SubscribeUpdateTransactionStatus, + yellowstone_grpc_proto::{ + convert_from, + prelude::{ + subscribe_request_filter_accounts_filter::Filter as AccountsFilterOneof, + subscribe_request_filter_accounts_filter_lamports::Cmp as AccountsFilterLamports, + subscribe_request_filter_accounts_filter_memcmp::Data as AccountsFilterMemcmpOneof, + subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequest, + SubscribeRequestAccountsDataSlice, SubscribeRequestFilterAccounts, + SubscribeRequestFilterAccountsFilter, SubscribeRequestFilterAccountsFilterLamports, + SubscribeRequestFilterAccountsFilterMemcmp, SubscribeRequestFilterBlocks, + SubscribeRequestFilterBlocksMeta, SubscribeRequestFilterEntry, + SubscribeRequestFilterSlots, SubscribeRequestFilterTransactions, SubscribeRequestPing, + SubscribeUpdateAccountInfo, SubscribeUpdateEntry, SubscribeUpdateTransactionInfo, + }, + prost::Message, }, }; @@ -45,6 +52,10 @@ struct Args { #[clap(long)] commitment: Option, + /// Max message size before decoding, full blocks can be super large, default is 1GiB + #[clap(long, default_value_t = 1024 * 1024 * 1024)] + max_decoding_message_size: usize, + #[command(subcommand)] action: Action, } @@ -60,6 +71,7 @@ impl Args { .connect_timeout(Duration::from_secs(10)) .timeout(Duration::from_secs(10)) .tls_config(ClientTlsConfig::new().with_native_roots())? + .max_decoding_message_size(self.max_decoding_message_size) .connect() .await .map_err(Into::into) @@ -209,7 +221,7 @@ struct ActionSubscribe { transactions_status_account_required: Vec, #[clap(long)] - entry: bool, + entries: bool, /// Subscribe on block updates #[clap(long)] @@ -239,16 +251,20 @@ struct ActionSubscribe { #[clap(long)] ping: Option, - // Resubscribe (only to slots) after + /// Resubscribe (only to slots) after #[clap(long)] resub: Option, + + /// Show total stat instead of messages + #[clap(long, default_value_t = false)] + stats: bool, } impl Action { async fn get_subscribe_request( &self, commitment: Option, - ) -> anyhow::Result> { + ) -> anyhow::Result> { Ok(match self { Self::Subscribe(args) => { let mut accounts: AccountFilterMap = HashMap::new(); @@ -369,9 +385,9 @@ impl Action { ); } - let mut entry: EntryFilterMap = HashMap::new(); - if args.entry { - entry.insert("client".to_owned(), SubscribeRequestFilterEntry {}); + let mut entries: EntryFilterMap = HashMap::new(); + if args.entries { + entries.insert("client".to_owned(), SubscribeRequestFilterEntry {}); } let mut blocks: BlocksFilterMap = HashMap::new(); @@ -414,7 +430,7 @@ impl Action { accounts, transactions, transactions_status, - entry, + entry: entries, blocks, blocks_meta, commitment: commitment.map(|x| x as i32), @@ -422,6 +438,7 @@ impl Action { ping, }, args.resub.unwrap_or(0), + args.stats, )) } _ => None, @@ -429,110 +446,6 @@ impl Action { } } -#[derive(Debug)] -#[allow(dead_code)] -pub struct AccountPretty { - is_startup: bool, - slot: u64, - pubkey: Pubkey, - lamports: u64, - owner: Pubkey, - executable: bool, - rent_epoch: u64, - data: String, - write_version: u64, - txn_signature: String, -} - -impl From for AccountPretty { - fn from( - SubscribeUpdateAccount { - is_startup, - slot, - account, - }: SubscribeUpdateAccount, - ) -> Self { - let account = account.expect("should be defined"); - Self { - is_startup, - slot, - pubkey: Pubkey::try_from(account.pubkey).expect("valid pubkey"), - lamports: account.lamports, - owner: Pubkey::try_from(account.owner).expect("valid pubkey"), - executable: account.executable, - rent_epoch: account.rent_epoch, - data: hex::encode(account.data), - write_version: account.write_version, - txn_signature: bs58::encode(account.txn_signature.unwrap_or_default()).into_string(), - } - } -} - -#[allow(dead_code)] -pub struct TransactionPretty { - slot: u64, - signature: Signature, - is_vote: bool, - tx: EncodedTransactionWithStatusMeta, -} - -impl fmt::Debug for TransactionPretty { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - struct TxWrap<'a>(&'a EncodedTransactionWithStatusMeta); - impl<'a> fmt::Debug for TxWrap<'a> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let serialized = serde_json::to_string(self.0).expect("failed to serialize"); - fmt::Display::fmt(&serialized, f) - } - } - - f.debug_struct("TransactionPretty") - .field("slot", &self.slot) - .field("signature", &self.signature) - .field("is_vote", &self.is_vote) - .field("tx", &TxWrap(&self.tx)) - .finish() - } -} - -impl From for TransactionPretty { - fn from(SubscribeUpdateTransaction { transaction, slot }: SubscribeUpdateTransaction) -> Self { - let tx = transaction.expect("should be defined"); - Self { - slot, - signature: Signature::try_from(tx.signature.as_slice()).expect("valid signature"), - is_vote: tx.is_vote, - tx: yellowstone_grpc_proto::convert_from::create_tx_with_meta(tx) - .expect("valid tx with meta") - .encode(UiTransactionEncoding::Base64, Some(u8::MAX), true) - .expect("failed to encode"), - } - } -} - -#[allow(dead_code)] -#[derive(Debug)] -pub struct TransactionStatusPretty { - slot: u64, - signature: Signature, - is_vote: bool, - index: u64, - err: Option, -} - -impl From for TransactionStatusPretty { - fn from(status: SubscribeUpdateTransactionStatus) -> Self { - Self { - slot: status.slot, - signature: Signature::try_from(status.signature.as_slice()).expect("valid signature"), - is_vote: status.is_vote, - index: status.index, - err: yellowstone_grpc_proto::convert_from::create_tx_error(status.err.as_ref()) - .expect("valid tx err"), - } - } -} - #[tokio::main] async fn main() -> anyhow::Result<()> { env::set_var( @@ -572,14 +485,16 @@ async fn main() -> anyhow::Result<()> { .map(|response| info!("response: {response:?}")), Action::HealthWatch => geyser_health_watch(client).await, Action::Subscribe(_) => { - let (request, resub) = args + let (request, resub, stats) = args .action .get_subscribe_request(commitment) .await .map_err(backoff::Error::Permanent)? - .expect("expect subscribe action"); + .ok_or(backoff::Error::Permanent(anyhow::anyhow!( + "expect subscribe action" + )))?; - geyser_subscribe(client, request, resub).await + geyser_subscribe(client, request, resub, stats).await } Action::Ping { count } => client .ping(*count) @@ -636,7 +551,28 @@ async fn geyser_subscribe( mut client: GeyserGrpcClient, request: SubscribeRequest, resub: usize, + stats: bool, ) -> anyhow::Result<()> { + let pb_multi = MultiProgress::new(); + let mut pb_accounts_c = 0; + let pb_accounts = crate_progress_bar(&pb_multi, "accounts", false)?; + let mut pb_slots_c = 0; + let pb_slots = crate_progress_bar(&pb_multi, "slots", false)?; + let mut pb_txs_c = 0; + let pb_txs = crate_progress_bar(&pb_multi, "transactions", false)?; + let mut pb_txs_st_c = 0; + let pb_txs_st = crate_progress_bar(&pb_multi, "transactions statuses", false)?; + let mut pb_entries_c = 0; + let pb_entries = crate_progress_bar(&pb_multi, "entries", false)?; + let mut pb_blocks_mt_c = 0; + let pb_blocks_mt = crate_progress_bar(&pb_multi, "blocks meta", false)?; + let mut pb_blocks_c = 0; + let pb_blocks = crate_progress_bar(&pb_multi, "blocks", false)?; + let mut pb_pp_c = 0; + let pb_pp = crate_progress_bar(&pb_multi, "ping/pong", false)?; + let mut pb_total_c = 0; + let pb_total = crate_progress_bar(&pb_multi, "total", true)?; + let (mut subscribe_tx, mut stream) = client.subscribe_with_request(Some(request)).await?; info!("stream opened"); @@ -644,30 +580,127 @@ async fn geyser_subscribe( while let Some(message) = stream.next().await { match message { Ok(msg) => { + if stats { + let encoded_len = msg.encoded_len() as u64; + let (pb_c, pb) = match msg.update_oneof { + Some(UpdateOneof::Account(_)) => (&mut pb_accounts_c, &pb_accounts), + Some(UpdateOneof::Slot(_)) => (&mut pb_slots_c, &pb_slots), + Some(UpdateOneof::Transaction(_)) => (&mut pb_txs_c, &pb_txs), + Some(UpdateOneof::TransactionStatus(_)) => (&mut pb_txs_st_c, &pb_txs_st), + Some(UpdateOneof::Entry(_)) => (&mut pb_entries_c, &pb_entries), + Some(UpdateOneof::BlockMeta(_)) => (&mut pb_blocks_mt_c, &pb_blocks_mt), + Some(UpdateOneof::Block(_)) => (&mut pb_blocks_c, &pb_blocks), + Some(UpdateOneof::Ping(_)) => (&mut pb_pp_c, &pb_pp), + Some(UpdateOneof::Pong(_)) => (&mut pb_pp_c, &pb_pp), + None => { + error!("update not found in the message"); + break; + } + }; + *pb_c += 1; + pb.set_message(format_thousands(*pb_c)); + pb.inc(encoded_len); + pb_total_c += 1; + pb_total.set_message(format_thousands(pb_total_c)); + pb_total.inc(encoded_len); + continue; + } + + let filters = msg.filters; match msg.update_oneof { - Some(UpdateOneof::Account(account)) => { - let account: AccountPretty = account.into(); - info!( - "new account update: filters {:?}, account: {:#?}", - msg.filters, account + Some(UpdateOneof::Account(msg)) => { + let account = msg + .account + .ok_or(anyhow::anyhow!("no account in the message"))?; + let mut value = create_pretty_account(account)?; + value["isStartup"] = json!(msg.is_startup); + value["slot"] = json!(msg.slot); + print_update("account", &filters, value); + } + Some(UpdateOneof::Slot(msg)) => { + let status = CommitmentLevel::try_from(msg.status) + .context("failed to decode commitment")?; + print_update( + "slot", + &filters, + json!({ + "slot": msg.slot, + "parent": msg.parent, + "status": status.as_str_name() + }), ); - continue; } - Some(UpdateOneof::Transaction(tx)) => { - let tx: TransactionPretty = tx.into(); - info!( - "new transaction update: filters {:?}, transaction: {:#?}", - msg.filters, tx + Some(UpdateOneof::Transaction(msg)) => { + let tx = msg + .transaction + .ok_or(anyhow::anyhow!("no transaction in the message"))?; + let mut value = create_pretty_transaction(tx)?; + value["slot"] = json!(msg.slot); + print_update("transaction", &filters, value); + } + Some(UpdateOneof::TransactionStatus(msg)) => { + print_update( + "transactionStatus", + &filters, + json!({ + "slot": msg.slot, + "signature": Signature::try_from(msg.signature.as_slice()).context("invalid signature")?.to_string(), + "isVote": msg.is_vote, + "index": msg.index, + "err": convert_from::create_tx_error(msg.err.as_ref()) + .map_err(|error| anyhow::anyhow!(error)) + .context("invalid error")?, + }), + ); + } + Some(UpdateOneof::Entry(msg)) => { + print_update("entry", &filters, create_pretty_entry(msg)?); + } + Some(UpdateOneof::BlockMeta(msg)) => { + print_update( + "blockmeta", + &filters, + json!({ + "slot": msg.slot, + "blockhash": msg.blockhash, + "rewards": if let Some(rewards) = msg.rewards { + Some(convert_from::create_rewards_obj(rewards).map_err(|error| anyhow::anyhow!(error))?) + } else { + None + }, + "blockTime": msg.block_time.map(|obj| obj.timestamp), + "blockHeight": msg.block_height.map(|obj| obj.block_height), + "parentSlot": msg.parent_slot, + "parentBlockhash": msg.parent_blockhash, + "executedTransactionCount": msg.executed_transaction_count, + "entriesCount": msg.entries_count, + }), ); - continue; } - Some(UpdateOneof::TransactionStatus(status)) => { - let status: TransactionStatusPretty = status.into(); - info!( - "new transaction update: filters {:?}, transaction status: {:?}", - msg.filters, status + Some(UpdateOneof::Block(msg)) => { + print_update( + "block", + &filters, + json!({ + "slot": msg.slot, + "blockhash": msg.blockhash, + "rewards": if let Some(rewards) = msg.rewards { + Some(convert_from::create_rewards_obj(rewards).map_err(|error| anyhow::anyhow!(error))?) + } else { + None + }, + "blockTime": msg.block_time.map(|obj| obj.timestamp), + "blockHeight": msg.block_height.map(|obj| obj.block_height), + "parentSlot": msg.parent_slot, + "parentBlockhash": msg.parent_blockhash, + "executedTransactionCount": msg.executed_transaction_count, + "transactions": msg.transactions.into_iter().map(create_pretty_transaction).collect::>()?, + "updatedAccountCount": msg.updated_account_count, + "accounts": msg.accounts.into_iter().map(create_pretty_account).collect::>()?, + "entriesCount": msg.entries_count, + "entries": msg.entries.into_iter().map(create_pretty_entry).collect::>()?, + }), ); - continue; } Some(UpdateOneof::Ping(_)) => { // This is necessary to keep load balancers that expect client pings alive. If your load balancer doesn't @@ -679,9 +712,12 @@ async fn geyser_subscribe( }) .await?; } - _ => {} + Some(UpdateOneof::Pong(_)) => {} + None => { + error!("update not found in the message"); + break; + } } - info!("new message: {msg:?}") } Err(error) => { error!("error: {error:?}"); @@ -715,3 +751,71 @@ async fn geyser_subscribe( info!("stream closed"); Ok(()) } + +fn crate_progress_bar( + pb: &MultiProgress, + kind: &str, + elapsed: bool, +) -> Result { + let pb = pb.add(ProgressBar::no_length()); + let elapsed = if elapsed { " in {elapsed_precise}" } else { "" }; + let tpl = format!("{{spinner}} {kind}: {{msg}} / ~{{bytes}} (~{{bytes_per_sec}}){elapsed}"); + pb.set_style(ProgressStyle::with_template(&tpl)?); + Ok(pb) +} + +fn format_thousands(value: u64) -> String { + value + .to_string() + .as_bytes() + .rchunks(3) + .rev() + .map(std::str::from_utf8) + .collect::, _>>() + .expect("invalid number") + .join(",") +} + +fn create_pretty_account(account: SubscribeUpdateAccountInfo) -> anyhow::Result { + Ok(json!({ + "pubkey": Pubkey::try_from(account.pubkey).map_err(|_| anyhow::anyhow!("invalid account pubkey"))?.to_string(), + "lamports": account.lamports, + "owner": Pubkey::try_from(account.owner).map_err(|_| anyhow::anyhow!("invalid account owner"))?.to_string(), + "executable": account.executable, + "rentEpoch": account.rent_epoch, + "data": hex::encode(account.data), + "writeVersion": account.write_version, + "txnSignature": account.txn_signature.map(|sig| bs58::encode(sig).into_string()), + })) +} + +fn create_pretty_transaction(tx: SubscribeUpdateTransactionInfo) -> anyhow::Result { + Ok(json!({ + "signature": Signature::try_from(tx.signature.as_slice()).context("invalid signature")?.to_string(), + "isVote": tx.is_vote, + "tx": convert_from::create_tx_with_meta(tx) + .map_err(|error| anyhow::anyhow!(error)) + .context("invalid tx with meta")? + .encode(UiTransactionEncoding::Base64, Some(u8::MAX), true) + .context("failed to encode transaction")?, + })) +} + +fn create_pretty_entry(msg: SubscribeUpdateEntry) -> anyhow::Result { + Ok(json!({ + "slot": msg.slot, + "index": msg.index, + "numHashes": msg.num_hashes, + "hash": Hash::new_from_array(<[u8; 32]>::try_from(msg.hash.as_slice()).context("invalid entry hash")?).to_string(), + "executedTransactionCount": msg.executed_transaction_count, + "startingTransactionIndex": msg.starting_transaction_index, + })) +} + +fn print_update(kind: &str, filters: &[String], value: Value) { + info!( + "{kind} ({}): {}", + filters.join(","), + serde_json::to_string(&value).expect("json serialization failed") + ); +} diff --git a/yellowstone-grpc-geyser/config.json b/yellowstone-grpc-geyser/config.json index e9004630..ea9a5962 100644 --- a/yellowstone-grpc-geyser/config.json +++ b/yellowstone-grpc-geyser/config.json @@ -1,89 +1,81 @@ { - "libpath": "../target/debug/libyellowstone_grpc_geyser.so", - "log": { - "level": "info" + "libpath": "../target/debug/libyellowstone_grpc_geyser.so", + "log": { + "level": "info" + }, + "grpc": { + "address": "0.0.0.0:10000", + "tls_config": { + "cert_path": "", + "key_path": "" }, - "grpc": { - "address": "0.0.0.0:10000", - "tls_config": { - "cert_path": "", - "key_path": "" - }, - "compression": { - "accept": [ - "gzip" - ], - "send": [ - "gzip" - ] - }, - "max_decoding_message_size": "4_194_304", - "snapshot_plugin_channel_capacity": null, - "snapshot_client_channel_capacity": "50_000_000", - "channel_capacity": "100_000", - "unary_concurrency_limit": 100, - "unary_disabled": false, - "x_token": null, - "filter_name_size_limit": 32, - "filter_names_size_limit": 1024, - "filter_names_cleanup_interval": "1s", - "filters": { - "accounts": { - "max": 1, - "any": false, - "account_max": 10, - "account_reject": [ - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" - ], - "owner_max": 10, - "owner_reject": [ - "11111111111111111111111111111111" - ] - }, - "slots": { - "max": 1 - }, - "transactions": { - "max": 1, - "any": false, - "account_include_max": 10, - "account_include_reject": [ - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" - ], - "account_exclude_max": 10, - "account_required_max": 10 - }, - "transactions_status": { - "max": 1, - "any": false, - "account_include_max": 10, - "account_include_reject": [ - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" - ], - "account_exclude_max": 10, - "account_required_max": 10 - }, - "blocks": { - "max": 1, - "account_include_max": 10, - "account_include_any": false, - "account_include_reject": [ - "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" - ], - "include_transactions": true, - "include_accounts": false, - "include_entries": false - }, - "blocks_meta": { - "max": 1 - }, - "entry": { - "max": 1 - } - } + "compression": { + "accept": ["gzip"], + "send": ["gzip"] }, - "prometheus": { - "address": "0.0.0.0:8999" - }, - "block_fail_action": "log" + "max_decoding_message_size": "4_194_304", + "snapshot_plugin_channel_capacity": null, + "snapshot_client_channel_capacity": "50_000_000", + "channel_capacity": "100_000", + "unary_concurrency_limit": 100, + "unary_disabled": false, + "x_token": null, + "filter_name_size_limit": 32, + "filter_names_size_limit": 1024, + "filter_names_cleanup_interval": "1s", + "filters": { + "accounts": { + "max": 1, + "any": false, + "account_max": 10, + "account_reject": ["TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA"], + "owner_max": 10, + "owner_reject": ["11111111111111111111111111111111"] + }, + "slots": { + "max": 1 + }, + "transactions": { + "max": 1, + "any": false, + "account_include_max": 10, + "account_include_reject": [ + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ], + "account_exclude_max": 10, + "account_required_max": 10 + }, + "transactions_status": { + "max": 1, + "any": false, + "account_include_max": 10, + "account_include_reject": [ + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ], + "account_exclude_max": 10, + "account_required_max": 10 + }, + "blocks": { + "max": 1, + "account_include_max": 10, + "account_include_any": false, + "account_include_reject": [ + "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA" + ], + "include_transactions": true, + "include_accounts": false, + "include_entries": false + }, + "blocks_meta": { + "max": 1 + }, + "entry": { + "max": 1 + } + } + }, + "prometheus": { + "address": "0.0.0.0:8999" + }, + "block_fail_action": "log" } diff --git a/yellowstone-grpc-proto/src/lib.rs b/yellowstone-grpc-proto/src/lib.rs index 40868a21..1c83762d 100644 --- a/yellowstone-grpc-proto/src/lib.rs +++ b/yellowstone-grpc-proto/src/lib.rs @@ -291,8 +291,8 @@ pub mod convert_from { }, solana_transaction_status::{ ConfirmedBlock, InnerInstruction, InnerInstructions, Reward, RewardType, - TransactionStatusMeta, TransactionTokenBalance, TransactionWithStatusMeta, - VersionedTransactionWithStatusMeta, + RewardsAndNumPartitions, TransactionStatusMeta, TransactionTokenBalance, + TransactionWithStatusMeta, VersionedTransactionWithStatusMeta, }, }; @@ -518,6 +518,17 @@ pub mod convert_from { }) } + pub fn create_rewards_obj(rewards: proto::Rewards) -> Result { + Ok(RewardsAndNumPartitions { + rewards: rewards + .rewards + .into_iter() + .map(create_reward) + .collect::>()?, + num_partitions: rewards.num_partitions.map(|wrapper| wrapper.num_partitions), + }) + } + pub fn create_reward(reward: proto::Reward) -> Result { Ok(Reward { pubkey: reward.pubkey,