From 087cc7f204d8aa4331d5802852bba25499697127 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Mon, 25 Mar 2024 17:44:57 +0100 Subject: [PATCH] Merge prod main 2024 03 25 (#369) * Use jemalloc * Solving issue of finalized meta after processed block (#365) * Solving issue of finalized meta after processed block * Fixing the broken test * Removing unwanted logs * Add logs on block queues and reception time * reduce level of block reception log * Revert "Solving issue of finalized meta after processed block (#365)" This reverts commit c09700fd7963f0946d282a25ffd07f90e9384b46. Production runs show more stable memory use but worse performance on landing transactions. * use yellowstone grpc with hacked-windowsize * upgraded geyser-grpc-connector + increased timeout * Fix block stream throughput problems By temporarily pasting in a function to connect to block streams via a more generously configured endpoint. * Enabling unstable tokio on fly * update to v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize3 --------- Co-authored-by: Christian Kamm Co-authored-by: GroovieGermanikus --- Cargo.lock | 126 ++++++++------- Cargo.toml | 1 + cluster-endpoints/Cargo.toml | 5 +- cluster-endpoints/src/grpc_subscription.rs | 180 +++++++++++++++++++++ 4 files changed, 248 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 434bdf77..59873651 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,18 +433,18 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] name = "async-trait" -version = "0.1.78" +version = "0.1.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" +checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -520,9 +520,9 @@ dependencies = [ [[package]] name = "backtrace" -version = "0.3.70" +version = "0.3.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95d8e92cac0961e91dbd517496b00f7e9b92363dbe6d42c3198268323798860c" +checksum = "26b05800d2e817c8b3b4b54abd461726265fa9789ae34330622f2db9ee696f9d" dependencies = [ "addr2line", "cc", @@ -816,7 +816,7 @@ checksum = "4da9a32f3fed317401fa3c862968128267c3106685286e15d5aaa3d7389c2f60" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -827,9 +827,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.5.0" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" [[package]] name = "cap" @@ -949,7 +949,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1242,7 +1242,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1253,7 +1253,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1408,7 +1408,7 @@ checksum = "487585f4d0c6655fe74905e2504d8ad6908e4db67f744eb140876906c2f3175d" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1431,7 +1431,7 @@ checksum = "a6cbae11b3de8fce2a456e8ea3dada226b35fe791f0dc1d360c0941f0bb681f3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1519,7 +1519,7 @@ checksum = "03cdc46ec28bd728e67540c528013c6a10eb69a02eb31078a1bda695438cbfb8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1565,9 +1565,9 @@ checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" [[package]] name = "fastrand" -version = "2.0.1" +version = "2.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25cbce373ec4653f1a01a31e8a5e5ec0c622dc27ff9c4e6606eefef5cbbed4a5" +checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" [[package]] name = "feature-probe" @@ -1683,7 +1683,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -1776,7 +1776,7 @@ dependencies = [ [[package]] name = "geyser-grpc-connector" version = "0.10.1+yellowstone.1.12" -source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15#7e2fd7f97d69335fcb14fafc67287759af268733" +source = "git+https://github.com/blockworks-foundation/geyser-grpc-connector.git?tag=v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize3#ae56e0f5f894933bea046e8f220f74df3eab5355" dependencies = [ "anyhow", "async-stream", @@ -1789,6 +1789,7 @@ dependencies = [ "merge-streams", "solana-sdk", "tokio", + "tonic-health", "tracing", "url", "yellowstone-grpc-client", @@ -1870,7 +1871,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap 2.2.5", + "indexmap 2.2.6", "slab", "tokio", "tokio-util", @@ -2153,9 +2154,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.2.5" +version = "2.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b0b929d511467233429c45a44ac1dcaa21ba0f5ba11e4879e6ed28ddb4f9df4" +checksum = "168fb715dda47215e360912c096649d23d58bf392ac62f73919e831745e40f26" dependencies = [ "equivalent", "hashbrown 0.14.3", @@ -2857,7 +2858,7 @@ checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -2947,7 +2948,7 @@ dependencies = [ "proc-macro-crate 1.3.1", "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -2959,7 +2960,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3021,7 +3022,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3138,7 +3139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" dependencies = [ "fixedbitset", - "indexmap 2.2.5", + "indexmap 2.2.6", ] [[package]] @@ -3176,7 +3177,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3289,12 +3290,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" +checksum = "8d3928fb5db768cb86f891ff014f0144589297e3c6a1aba6ed7cecfdace270c7" dependencies = [ "proc-macro2", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3376,7 +3377,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.53", + "syn 2.0.55", "tempfile", "which", ] @@ -3391,7 +3392,7 @@ dependencies = [ "itertools 0.11.0", "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3435,7 +3436,7 @@ checksum = "9e2e25ee72f5b24d773cae88422baddefff7714f97aab68d96fe2b6fc4a28fb2" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3583,9 +3584,9 @@ checksum = "e6e97ca3dbabd81e6033cfe09f0cef37c89f34f2a9223cab7cf99305d46a9633" [[package]] name = "rayon" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4963ed1bc86e4f3ee217022bd855b297cef07fb9eac5dfa1f788b220b49b3bd" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" dependencies = [ "either", "rayon-core", @@ -3647,9 +3648,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.3" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62dbe01f0b06f9d8dc7d49e05a0785f153b00b2c227856282f671e0318c9b15" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", @@ -3919,7 +3920,7 @@ checksum = "1db149f81d46d2deba7cd3c50772474707729550221e69588478ebf9ada425ae" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -3993,7 +3994,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -4038,7 +4039,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -4272,7 +4273,7 @@ dependencies = [ "dashmap 4.0.2", "futures", "futures-util", - "indexmap 2.2.5", + "indexmap 2.2.6", "indicatif", "log", "quinn", @@ -4318,7 +4319,7 @@ dependencies = [ "bincode", "crossbeam-channel", "futures-util", - "indexmap 2.2.5", + "indexmap 2.2.6", "log", "rand 0.8.5", "rayon", @@ -4369,7 +4370,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -4566,6 +4567,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tonic-health", "tracing", "yellowstone-grpc-client", "yellowstone-grpc-proto", @@ -5150,7 +5152,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -5170,7 +5172,7 @@ dependencies = [ "crossbeam-channel", "futures-util", "histogram", - "indexmap 2.2.5", + "indexmap 2.2.6", "itertools 0.10.5", "libc", "log", @@ -5215,7 +5217,7 @@ dependencies = [ "async-trait", "bincode", "futures-util", - "indexmap 2.2.5", + "indexmap 2.2.6", "indicatif", "log", "rayon", @@ -5413,7 +5415,7 @@ checksum = "07fd7858fc4ff8fb0e34090e41d7eb06a823e1057945c26d480bfc21d2338a93" dependencies = [ "quote", "spl-discriminator-syn", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -5425,7 +5427,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.53", + "syn 2.0.55", "thiserror", ] @@ -5482,7 +5484,7 @@ dependencies = [ "proc-macro2", "quote", "sha2 0.10.8", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -5642,9 +5644,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.53" +version = "2.0.55" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" +checksum = "002a1b3dbf967edfafc32655d0f377ab0bb7b994aa1d32c8cc7e9b8bf3ebb8f0" dependencies = [ "proc-macro2", "quote", @@ -5743,7 +5745,7 @@ checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -5858,7 +5860,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -5970,7 +5972,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.2.5", + "indexmap 2.2.6", "toml_datetime", "winnow", ] @@ -5981,7 +5983,7 @@ version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8534fd7f78b5405e860340ad6575217ce99f38d4d5c8f2442cb5ecb50090e1" dependencies = [ - "indexmap 2.2.5", + "indexmap 2.2.6", "toml_datetime", "winnow", ] @@ -6027,7 +6029,7 @@ dependencies = [ "proc-macro2", "prost-build", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -6125,7 +6127,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -6393,7 +6395,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", "wasm-bindgen-shared", ] @@ -6427,7 +6429,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6751,7 +6753,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] @@ -6771,7 +6773,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.53", + "syn 2.0.55", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index c93e9fac..489c37aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,3 +88,4 @@ solana-lite-rpc-accounts-on-demand = {path = "accounts-on-demand", version = "0. async-trait = "0.1.68" yellowstone-grpc-client = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } yellowstone-grpc-proto = { git = "https://github.com/rpcpool/yellowstone-grpc.git", tag = "v1.12.0+solana.1.17.15" } +tonic-health = "0.10" diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index faf31155..333b5096 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -9,7 +9,7 @@ license = "AGPL" [dependencies] #geyser-grpc-connector = { path = "../../geyser-grpc-connector" } -geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" } +geyser-grpc-connector = { tag = "v0.10.3+yellowstone.1.12+solana.1.17.15-hacked-windowsize3", git = "https://github.com/blockworks-foundation/geyser-grpc-connector.git" } solana-sdk = { workspace = true } solana-rpc-client-api = { workspace = true } @@ -46,4 +46,5 @@ yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } itertools = {workspace = true} prometheus = { workspace = true } -lazy_static = { workspace = true } \ No newline at end of file +lazy_static = { workspace = true } +tonic-health = { workspace = true } diff --git a/cluster-endpoints/src/grpc_subscription.rs b/cluster-endpoints/src/grpc_subscription.rs index 2f6a0bcd..2e3baae9 100644 --- a/cluster-endpoints/src/grpc_subscription.rs +++ b/cluster-endpoints/src/grpc_subscription.rs @@ -3,6 +3,8 @@ use crate::grpc::gprc_accounts_streaming::create_grpc_account_streaming; use crate::grpc_multiplex::{ create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_processed_slots_subscription, }; +use anyhow::Context; +use futures::StreamExt; use geyser_grpc_connector::GrpcSourceConfig; use itertools::Itertools; use log::trace; @@ -30,8 +32,13 @@ use solana_sdk::{ }; use solana_transaction_status::{Reward, RewardType}; use std::cell::OnceCell; +use std::collections::HashMap; use std::sync::Arc; use tracing::trace_span; +use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof; +use yellowstone_grpc_proto::geyser::{ + CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterSlots, SubscribeUpdateSlot, +}; use crate::rpc_polling::vote_accounts_and_cluster_info_polling::{ poll_cluster_info, poll_vote_accounts, @@ -259,6 +266,179 @@ fn map_compute_budget_instructions(message: &VersionedMessage) -> (Option, (cu_requested, prioritization_fees) } +// TODO: use function from geyser-grpc-connector +use bytes::Bytes; +use std::time::Duration; +use tonic::metadata::{errors::InvalidMetadataValue, AsciiMetadataValue}; +use tonic::service::Interceptor; +use tonic::transport::ClientTlsConfig; +use tonic_health::pb::health_client::HealthClient; +use yellowstone_grpc_client::{GeyserGrpcClient, InterceptorXToken}; +use yellowstone_grpc_proto::geyser::geyser_client::GeyserClient; +use yellowstone_grpc_proto::tonic; + +// note: not called +async fn connect_with_timeout_hacked( + endpoint: E, + x_token: Option, +) -> anyhow::Result> +where + E: Into, + T: TryInto, +{ + let endpoint = tonic::transport::Endpoint::from_shared(endpoint)? + .buffer_size(Some(65536)) + .initial_connection_window_size(4194304) + .initial_stream_window_size(4194304) + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(10)) + // .http2_adaptive_window() + .tls_config(ClientTlsConfig::new())?; + + let x_token: Option = x_token.map(|v| v.try_into()).transpose()?; + let interceptor = InterceptorXToken { x_token }; + + let channel = endpoint.connect_lazy(); + let client = GeyserGrpcClient::new( + HealthClient::with_interceptor(channel.clone(), interceptor.clone()), + GeyserClient::with_interceptor(channel, interceptor) + .max_decoding_message_size(GeyserGrpcClient::max_decoding_message_size()), + ); + Ok(client) +} + +// note used +pub fn create_block_processing_task( + grpc_addr: String, + grpc_x_token: Option, + block_sx: async_channel::Sender, + commitment_level: CommitmentLevel, +) -> AnyhowJoinHandle { + tokio::spawn(async move { + loop { + let mut blocks_subs = HashMap::new(); + blocks_subs.insert( + "block_client".to_string(), + SubscribeRequestFilterBlocks { + account_include: Default::default(), + include_transactions: Some(true), + include_accounts: Some(false), + include_entries: Some(false), + }, + ); + + // connect to grpc + let mut client = + connect_with_timeout_hacked(grpc_addr.clone(), grpc_x_token.clone()).await?; + let mut stream = client + .subscribe_once( + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + blocks_subs, + Default::default(), + Some(commitment_level), + Default::default(), + None, + ) + .await?; + + while let Some(message) = stream.next().await { + let message = message?; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::Block(block) => { + log::trace!( + "received block, hash: {} slot: {}", + block.blockhash, + block.slot + ); + block_sx + .send(block) + .await + .context("Problem sending on block channel")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + _ => { + log::trace!("unknown GRPC notification"); + } + }; + } + log::error!("Grpc block subscription broken (resubscribing)"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }) +} + +// not used +pub fn create_slot_stream_task( + grpc_addr: String, + grpc_x_token: Option, + slot_sx: async_channel::Sender, + commitment_level: CommitmentLevel, +) -> AnyhowJoinHandle { + tokio::spawn(async move { + loop { + let mut slots = HashMap::new(); + slots.insert( + "client_slot".to_string(), + SubscribeRequestFilterSlots { + filter_by_commitment: Some(true), + }, + ); + + // connect to grpc + let mut client = + GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?; + let mut stream = client + .subscribe_once( + slots, + Default::default(), + HashMap::new(), + Default::default(), + HashMap::new(), + Default::default(), + Some(commitment_level), + Default::default(), + None, + ) + .await?; + + while let Some(message) = stream.next().await { + let message = message?; + + let Some(update) = message.update_oneof else { + continue; + }; + + match update { + UpdateOneof::Slot(slot) => { + slot_sx + .send(slot) + .await + .context("Problem sending on block channel")?; + } + UpdateOneof::Ping(_) => { + log::trace!("GRPC Ping"); + } + _ => { + log::trace!("unknown GRPC notification"); + } + }; + } + log::error!("Grpc block subscription broken (resubscribing)"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }) +} + pub fn create_grpc_subscription( rpc_client: Arc, grpc_sources: Vec,