From 5dc0670a85d575480e0840204c20a5771cd8f7d3 Mon Sep 17 00:00:00 2001 From: Sebastian Kunert Date: Fri, 26 Jul 2024 16:33:12 +0200 Subject: [PATCH] BEEFY: Disarm finality notifications to prevent pinning (#5129) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This should prevent excessive pinning of blocks while we are waiting for the block ancestry to be downloaded after gap sync. We spawn a new task that gets polled to transform finality notifications into an unpinned counterpart. Before this PR, finality notifications were kept in the notification channel. This led to pinning cache overflows. fixes #4389 --------- Co-authored-by: Bastian Köcher --- prdoc/pr_5129.prdoc | 16 +++ substrate/client/consensus/beefy/src/lib.rs | 104 ++++++++++++++---- substrate/client/consensus/beefy/src/tests.rs | 74 +++++++++---- .../client/consensus/beefy/src/worker.rs | 12 +- 4 files changed, 159 insertions(+), 47 deletions(-) create mode 100644 prdoc/pr_5129.prdoc diff --git a/prdoc/pr_5129.prdoc b/prdoc/pr_5129.prdoc new file mode 100644 index 000000000000..beb7eb4d282d --- /dev/null +++ b/prdoc/pr_5129.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Prevent finalized notification hoarding in beefy gadget + +doc: + - audience: Node Operator + description: | + This PR fixes the error message "Notification block pinning limit + reached." during warp sync. Finality notifications in BEEFY are now + constantly being consumed and don't keep blocks pinned for extended + periods of time. + +crates: + - name: sc-consensus-beefy + bump: minor diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index a47bfe1dbe24..f7c999aad591 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -35,10 +35,11 @@ use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, warn}; use parking_lot::Mutex; use prometheus_endpoint::Registry; -use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer}; +use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer}; use sc_consensus::BlockImport; use sc_network::{NetworkRequest, NotificationService, ProtocolName}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; use sp_api::ProvideRuntimeApi; use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; use sp_consensus::{Error as ConsensusError, SyncOracle}; @@ -49,7 +50,9 @@ use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero}; use std::{ collections::{BTreeMap, VecDeque}, + future::Future, marker::PhantomData, + pin::Pin, sync::Arc, time::Duration, }; @@ -87,6 +90,8 @@ const LOG_TARGET: &str = "beefy"; const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60); +type FinalityNotifications = + sc_utils::mpsc::TracingUnboundedReceiver>; /// A convenience BEEFY client trait that defines all the type bounds a BEEFY client /// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as /// of today, Rust does not allow a type alias to be used as a trait bound. Tracking @@ -484,6 +489,30 @@ where } } +/// Finality notification for consumption by BEEFY worker. +/// This is a stripped down version of `sc_client_api::FinalityNotification` which does not keep +/// blocks pinned. +struct UnpinnedFinalityNotification { + /// Finalized block header hash. + pub hash: B::Hash, + /// Finalized block header. + pub header: B::Header, + /// Path from the old finalized to new finalized parent (implicitly finalized blocks). + /// + /// This maps to the range `(old_finalized, new_finalized)`. + pub tree_route: Arc<[B::Hash]>, +} + +impl From> for UnpinnedFinalityNotification { + fn from(value: FinalityNotification) -> Self { + UnpinnedFinalityNotification { + hash: value.hash, + header: value.header, + tree_route: value.tree_route, + } + } +} + /// Start the BEEFY gadget. /// /// This is a thin shim around running and awaiting a BEEFY worker. @@ -525,10 +554,13 @@ pub async fn start_beefy_gadget( let metrics = register_metrics(prometheus_registry.clone()); + let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); + // Subscribe to finality notifications and justifications before waiting for runtime pallet and // reuse the streams, so we don't miss notifications while waiting for pallet to be available. - let mut finality_notifications = client.finality_notification_stream().fuse(); - let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); + let finality_notifications = client.finality_notification_stream(); + let (mut transformer, mut finality_notifications) = + finality_notification_transformer_future(finality_notifications); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); // Default votes filter is to discard everything. @@ -582,7 +614,11 @@ pub async fn start_beefy_gadget( _ = &mut beefy_comms.gossip_engine => { error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); return - } + }, + _ = &mut transformer => { + error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated."); + return + }, }; let worker = worker_builder.build( @@ -594,30 +630,54 @@ pub async fn start_beefy_gadget( is_authority, ); - match futures::future::select( - Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)), - Box::pin(on_demand_justifications_handler.run()), - ) - .await - { - // On `ConsensusReset` error, just reinit and restart voter. - futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); - beefy_comms = reuse_comms; - continue; - }, - // On other errors, bring down / finish the task. - futures::future::Either::Left(((worker_err, _), _)) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err) + futures::select! { + result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => { + match result { + (error::Error::ConsensusReset, reuse_comms) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); + beefy_comms = reuse_comms; + continue; + }, + (err, _) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err) + } + } }, - futures::future::Either::Right((odj_handler_err, _)) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err) + odj_handler_error = on_demand_justifications_handler.run().fuse() => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error) }, - }; + _ = &mut transformer => { + error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated."); + } + } return; } } +/// Produce a future that transformes finality notifications into a struct that does not keep blocks +/// pinned. +fn finality_notification_transformer_future( + mut finality_notifications: sc_client_api::FinalityNotifications, +) -> ( + Pin + Sized>>>, + Fuse>>, +) +where + B: Block, +{ + let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000); + let transformer_fut = async move { + while let Some(notification) = finality_notifications.next().await { + debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash); + if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) { + error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err); + return + }; + } + }; + (Box::pin(transformer_fut.fuse()), rx.fuse()) +} + /// Waits until the parent header of `current` is available and returns it. /// /// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers. diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index d8f5b39dbbaa..4b1dd447320b 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -30,13 +30,17 @@ use crate::{ request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, error::Error, - gossip_protocol_name, + finality_notification_transformer_future, gossip_protocol_name, justification::*, wait_for_runtime_pallet, worker::PersistedState, - BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, + BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, UnpinnedFinalityNotification, +}; +use futures::{ + future, + stream::{Fuse, FuturesUnordered}, + Future, FutureExt, StreamExt, }; -use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use parking_lot::Mutex; use sc_block_builder::BlockBuilderBuilder; use sc_client_api::{Backend as BackendT, BlockchainEvents, FinalityNotifications, HeaderBackend}; @@ -49,7 +53,7 @@ use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, PeersFullClient, TestNetFactory, }; -use sc_utils::notification::NotificationReceiver; +use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver}; use serde::{Deserialize, Serialize}; use sp_api::{ApiRef, ProvideRuntimeApi}; use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE; @@ -371,7 +375,7 @@ pub(crate) fn create_beefy_keystore(authority: &BeefyKeyring) -> Ke async fn voter_init_setup( net: &mut BeefyTestNet, - finality: &mut futures::stream::Fuse>, + finality: &mut futures::stream::Fuse>, api: &TestApi, ) -> Result, Error> { let backend = net.peer(0).client().as_backend(); @@ -391,6 +395,15 @@ async fn voter_init_setup( .await } +fn start_finality_worker( + finality: FinalityNotifications, +) -> Fuse>> { + let (transformer, finality_notifications) = finality_notification_transformer_future(finality); + let tokio_handle = tokio::runtime::Handle::current(); + tokio_handle.spawn(transformer); + finality_notifications +} + // Spawns beefy voters. Returns a future to spawn on the runtime. fn initialize_beefy( net: &mut BeefyTestNet, @@ -1020,13 +1033,17 @@ async fn should_initialize_voter_at_genesis() { // push 15 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); + // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1061,14 +1078,18 @@ async fn should_initialize_voter_at_custom_genesis() { // push 15 blocks with `AuthorityChange` digests every 15 blocks let hashes = net.generate_blocks_and_sync(15, 15, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); + // finalize 3, 5, 8 without justifications net.peer(0).client().as_client().finalize_block(hashes[3], None).unwrap(); net.peer(0).client().as_client().finalize_block(hashes[5], None).unwrap(); net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap(); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with single session starting at block `custom_pallet_genesis` (7) @@ -1098,7 +1119,8 @@ async fn should_initialize_voter_at_custom_genesis() { net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap(); // load persistent state - state preset in DB, but with different pallet genesis - let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let new_persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // verify voter initialized with single session starting at block `new_pallet_genesis` (10) let sessions = new_persisted_state.voting_oracle().sessions(); @@ -1129,7 +1151,9 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() { // push 15 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); @@ -1153,7 +1177,8 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() { let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at session boundary - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // verify voter initialized with single session starting at block 10 assert_eq!(persisted_state.voting_oracle().sessions().len(), 1); @@ -1183,7 +1208,9 @@ async fn should_initialize_voter_at_latest_finalized() { // push 15 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); @@ -1206,7 +1233,8 @@ async fn should_initialize_voter_at_latest_finalized() { let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at last BEEFY finalized - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // verify voter initialized with single session starting at block 12 assert_eq!(persisted_state.voting_oracle().sessions().len(), 1); @@ -1239,12 +1267,15 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() { // push 30 blocks with `AuthorityChange` digests every 5 blocks let hashes = net.generate_blocks_and_sync(30, 5, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); // finalize 30 without justifications net.peer(0).client().as_client().finalize_block(hashes[30], None).unwrap(); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with all sessions pending, first one starting at block 5 (start of @@ -1282,14 +1313,18 @@ async fn should_catch_up_when_loading_saved_voter_state() { // push 30 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); + // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1316,7 +1351,8 @@ async fn should_catch_up_when_loading_saved_voter_state() { // finalize 25 without justifications net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap(); // load persistent state - state preset in DB - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Verify voter initialized with old sessions plus a new one starting at block 20. // There shouldn't be any duplicates. diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index 4a9f7a2d0e3b..ca714ffadd77 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -29,14 +29,14 @@ use crate::{ metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, - BeefyComms, BeefyVoterLinks, LOG_TARGET, + BeefyComms, BeefyVoterLinks, UnpinnedFinalityNotification, LOG_TARGET, }; use sp_application_crypto::RuntimeAppPublic; use codec::{Codec, Decode, DecodeAll, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, trace, warn}; -use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; +use sc_client_api::{Backend, HeaderBackend}; use sc_utils::notification::NotificationReceiver; use sp_api::ProvideRuntimeApi; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; @@ -447,20 +447,20 @@ where fn handle_finality_notification( &mut self, - notification: &FinalityNotification, + notification: &UnpinnedFinalityNotification, ) -> Result<(), Error> { let header = ¬ification.header; debug!( target: LOG_TARGET, "🥩 Finality notification: header(number {:?}, hash {:?}) tree_route {:?}", header.number(), - header.hash(), + notification.hash, notification.tree_route, ); self.runtime .runtime_api() - .beefy_genesis(header.hash()) + .beefy_genesis(notification.hash) .ok() .flatten() .filter(|genesis| *genesis == self.persisted_state.pallet_genesis) @@ -845,7 +845,7 @@ where block_import_justif: &mut Fuse< NotificationReceiver>, >, - finality_notifications: &mut Fuse>, + finality_notifications: &mut Fuse>, ) -> (Error, BeefyComms) { info!( target: LOG_TARGET,