diff --git a/prdoc/pr_5129.prdoc b/prdoc/pr_5129.prdoc new file mode 100644 index 000000000000..beb7eb4d282d --- /dev/null +++ b/prdoc/pr_5129.prdoc @@ -0,0 +1,16 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Prevent finalized notification hoarding in beefy gadget + +doc: + - audience: Node Operator + description: | + This PR fixes the error message "Notification block pinning limit + reached." during warp sync. Finality notifications in BEEFY are now + constantly being consumed and don't keep blocks pinned for extended + periods of time. + +crates: + - name: sc-consensus-beefy + bump: minor diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index a47bfe1dbe24..f7c999aad591 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -35,10 +35,11 @@ use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, warn}; use parking_lot::Mutex; use prometheus_endpoint::Registry; -use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer}; +use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer}; use sc_consensus::BlockImport; use sc_network::{NetworkRequest, NotificationService, ProtocolName}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing}; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; use sp_api::ProvideRuntimeApi; use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend}; use sp_consensus::{Error as ConsensusError, SyncOracle}; @@ -49,7 +50,9 @@ use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero}; use std::{ collections::{BTreeMap, VecDeque}, + future::Future, marker::PhantomData, + pin::Pin, sync::Arc, time::Duration, }; @@ -87,6 +90,8 @@ const LOG_TARGET: &str = "beefy"; const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60); +type FinalityNotifications = + sc_utils::mpsc::TracingUnboundedReceiver>; /// A convenience BEEFY client trait that defines all the type bounds a BEEFY client /// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as /// of today, Rust does not allow a type alias to be used as a trait bound. Tracking @@ -484,6 +489,30 @@ where } } +/// Finality notification for consumption by BEEFY worker. +/// This is a stripped down version of `sc_client_api::FinalityNotification` which does not keep +/// blocks pinned. +struct UnpinnedFinalityNotification { + /// Finalized block header hash. + pub hash: B::Hash, + /// Finalized block header. + pub header: B::Header, + /// Path from the old finalized to new finalized parent (implicitly finalized blocks). + /// + /// This maps to the range `(old_finalized, new_finalized)`. + pub tree_route: Arc<[B::Hash]>, +} + +impl From> for UnpinnedFinalityNotification { + fn from(value: FinalityNotification) -> Self { + UnpinnedFinalityNotification { + hash: value.hash, + header: value.header, + tree_route: value.tree_route, + } + } +} + /// Start the BEEFY gadget. /// /// This is a thin shim around running and awaiting a BEEFY worker. @@ -525,10 +554,13 @@ pub async fn start_beefy_gadget( let metrics = register_metrics(prometheus_registry.clone()); + let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); + // Subscribe to finality notifications and justifications before waiting for runtime pallet and // reuse the streams, so we don't miss notifications while waiting for pallet to be available. - let mut finality_notifications = client.finality_notification_stream().fuse(); - let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); + let finality_notifications = client.finality_notification_stream(); + let (mut transformer, mut finality_notifications) = + finality_notification_transformer_future(finality_notifications); let known_peers = Arc::new(Mutex::new(KnownPeers::new())); // Default votes filter is to discard everything. @@ -582,7 +614,11 @@ pub async fn start_beefy_gadget( _ = &mut beefy_comms.gossip_engine => { error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated."); return - } + }, + _ = &mut transformer => { + error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated."); + return + }, }; let worker = worker_builder.build( @@ -594,30 +630,54 @@ pub async fn start_beefy_gadget( is_authority, ); - match futures::future::select( - Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)), - Box::pin(on_demand_justifications_handler.run()), - ) - .await - { - // On `ConsensusReset` error, just reinit and restart voter. - futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); - beefy_comms = reuse_comms; - continue; - }, - // On other errors, bring down / finish the task. - futures::future::Either::Left(((worker_err, _), _)) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err) + futures::select! { + result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => { + match result { + (error::Error::ConsensusReset, reuse_comms) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); + beefy_comms = reuse_comms; + continue; + }, + (err, _) => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err) + } + } }, - futures::future::Either::Right((odj_handler_err, _)) => { - error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err) + odj_handler_error = on_demand_justifications_handler.run().fuse() => { + error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error) }, - }; + _ = &mut transformer => { + error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated."); + } + } return; } } +/// Produce a future that transformes finality notifications into a struct that does not keep blocks +/// pinned. +fn finality_notification_transformer_future( + mut finality_notifications: sc_client_api::FinalityNotifications, +) -> ( + Pin + Sized>>>, + Fuse>>, +) +where + B: Block, +{ + let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000); + let transformer_fut = async move { + while let Some(notification) = finality_notifications.next().await { + debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash); + if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) { + error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err); + return + }; + } + }; + (Box::pin(transformer_fut.fuse()), rx.fuse()) +} + /// Waits until the parent header of `current` is available and returns it. /// /// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers. diff --git a/substrate/client/consensus/beefy/src/tests.rs b/substrate/client/consensus/beefy/src/tests.rs index d8f5b39dbbaa..4b1dd447320b 100644 --- a/substrate/client/consensus/beefy/src/tests.rs +++ b/substrate/client/consensus/beefy/src/tests.rs @@ -30,13 +30,17 @@ use crate::{ request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler}, }, error::Error, - gossip_protocol_name, + finality_notification_transformer_future, gossip_protocol_name, justification::*, wait_for_runtime_pallet, worker::PersistedState, - BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, + BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, UnpinnedFinalityNotification, +}; +use futures::{ + future, + stream::{Fuse, FuturesUnordered}, + Future, FutureExt, StreamExt, }; -use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt}; use parking_lot::Mutex; use sc_block_builder::BlockBuilderBuilder; use sc_client_api::{Backend as BackendT, BlockchainEvents, FinalityNotifications, HeaderBackend}; @@ -49,7 +53,7 @@ use sc_network_test::{ Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient, PeersFullClient, TestNetFactory, }; -use sc_utils::notification::NotificationReceiver; +use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver}; use serde::{Deserialize, Serialize}; use sp_api::{ApiRef, ProvideRuntimeApi}; use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE; @@ -371,7 +375,7 @@ pub(crate) fn create_beefy_keystore(authority: &BeefyKeyring) -> Ke async fn voter_init_setup( net: &mut BeefyTestNet, - finality: &mut futures::stream::Fuse>, + finality: &mut futures::stream::Fuse>, api: &TestApi, ) -> Result, Error> { let backend = net.peer(0).client().as_backend(); @@ -391,6 +395,15 @@ async fn voter_init_setup( .await } +fn start_finality_worker( + finality: FinalityNotifications, +) -> Fuse>> { + let (transformer, finality_notifications) = finality_notification_transformer_future(finality); + let tokio_handle = tokio::runtime::Handle::current(); + tokio_handle.spawn(transformer); + finality_notifications +} + // Spawns beefy voters. Returns a future to spawn on the runtime. fn initialize_beefy( net: &mut BeefyTestNet, @@ -1020,13 +1033,17 @@ async fn should_initialize_voter_at_genesis() { // push 15 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); + // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1061,14 +1078,18 @@ async fn should_initialize_voter_at_custom_genesis() { // push 15 blocks with `AuthorityChange` digests every 15 blocks let hashes = net.generate_blocks_and_sync(15, 15, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); + // finalize 3, 5, 8 without justifications net.peer(0).client().as_client().finalize_block(hashes[3], None).unwrap(); net.peer(0).client().as_client().finalize_block(hashes[5], None).unwrap(); net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap(); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with single session starting at block `custom_pallet_genesis` (7) @@ -1098,7 +1119,8 @@ async fn should_initialize_voter_at_custom_genesis() { net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap(); // load persistent state - state preset in DB, but with different pallet genesis - let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let new_persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // verify voter initialized with single session starting at block `new_pallet_genesis` (10) let sessions = new_persisted_state.voting_oracle().sessions(); @@ -1129,7 +1151,9 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() { // push 15 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); @@ -1153,7 +1177,8 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() { let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at session boundary - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // verify voter initialized with single session starting at block 10 assert_eq!(persisted_state.voting_oracle().sessions().len(), 1); @@ -1183,7 +1208,9 @@ async fn should_initialize_voter_at_latest_finalized() { // push 15 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); @@ -1206,7 +1233,8 @@ async fn should_initialize_voter_at_latest_finalized() { let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at last BEEFY finalized - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // verify voter initialized with single session starting at block 12 assert_eq!(persisted_state.voting_oracle().sessions().len(), 1); @@ -1239,12 +1267,15 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() { // push 30 blocks with `AuthorityChange` digests every 5 blocks let hashes = net.generate_blocks_and_sync(30, 5, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); // finalize 30 without justifications net.peer(0).client().as_client().finalize_block(hashes[30], None).unwrap(); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with all sessions pending, first one starting at block 5 (start of @@ -1282,14 +1313,18 @@ async fn should_catch_up_when_loading_saved_voter_state() { // push 30 blocks with `AuthorityChange` digests every 10 blocks let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await; - let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse(); + let finality = net.peer(0).client().as_client().finality_notification_stream(); + + let mut finality_notifications = start_finality_worker(finality); + // finalize 13 without justifications net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap(); let api = TestApi::with_validator_set(&validator_set); // load persistent state - nothing in DB, should init at genesis - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Test initialization at session boundary. // verify voter initialized with two sessions starting at blocks 1 and 10 @@ -1316,7 +1351,8 @@ async fn should_catch_up_when_loading_saved_voter_state() { // finalize 25 without justifications net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap(); // load persistent state - state preset in DB - let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap(); + let persisted_state = + voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap(); // Verify voter initialized with old sessions plus a new one starting at block 20. // There shouldn't be any duplicates. diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index 4a9f7a2d0e3b..ca714ffadd77 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -29,14 +29,14 @@ use crate::{ metric_inc, metric_set, metrics::VoterMetrics, round::{Rounds, VoteImportResult}, - BeefyComms, BeefyVoterLinks, LOG_TARGET, + BeefyComms, BeefyVoterLinks, UnpinnedFinalityNotification, LOG_TARGET, }; use sp_application_crypto::RuntimeAppPublic; use codec::{Codec, Decode, DecodeAll, Encode}; use futures::{stream::Fuse, FutureExt, StreamExt}; use log::{debug, error, info, trace, warn}; -use sc_client_api::{Backend, FinalityNotification, FinalityNotifications, HeaderBackend}; +use sc_client_api::{Backend, HeaderBackend}; use sc_utils::notification::NotificationReceiver; use sp_api::ProvideRuntimeApi; use sp_arithmetic::traits::{AtLeast32Bit, Saturating}; @@ -447,20 +447,20 @@ where fn handle_finality_notification( &mut self, - notification: &FinalityNotification, + notification: &UnpinnedFinalityNotification, ) -> Result<(), Error> { let header = ¬ification.header; debug!( target: LOG_TARGET, "🥩 Finality notification: header(number {:?}, hash {:?}) tree_route {:?}", header.number(), - header.hash(), + notification.hash, notification.tree_route, ); self.runtime .runtime_api() - .beefy_genesis(header.hash()) + .beefy_genesis(notification.hash) .ok() .flatten() .filter(|genesis| *genesis == self.persisted_state.pallet_genesis) @@ -845,7 +845,7 @@ where block_import_justif: &mut Fuse< NotificationReceiver>, >, - finality_notifications: &mut Fuse>, + finality_notifications: &mut Fuse>, ) -> (Error, BeefyComms) { info!( target: LOG_TARGET,