Skip to content

Commit

Permalink
BEEFY: Disarm finality notifications to prevent pinning (#5129)
Browse files Browse the repository at this point in the history
This should prevent excessive pinning of blocks while we are waiting for
the block ancestry to be downloaded after gap sync.
We spawn a new task that gets polled to transform finality notifications
into an unpinned counterpart. Before this PR, finality notifications
were kept in the notification channel. This led to pinning cache
overflows.

fixes #4389

---------

Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
skunert and bkchr authored Jul 26, 2024
1 parent fc07bda commit 5dc0670
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 47 deletions.
16 changes: 16 additions & 0 deletions prdoc/pr_5129.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: Prevent finalized notification hoarding in beefy gadget

doc:
- audience: Node Operator
description: |
This PR fixes the error message "Notification block pinning limit
reached." during warp sync. Finality notifications in BEEFY are now
constantly being consumed and don't keep blocks pinned for extended
periods of time.

crates:
- name: sc-consensus-beefy
bump: minor
104 changes: 82 additions & 22 deletions substrate/client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ use futures::{stream::Fuse, FutureExt, StreamExt};
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotifications, Finalizer};
use sc_client_api::{Backend, BlockBackend, BlockchainEvents, FinalityNotification, Finalizer};
use sc_consensus::BlockImport;
use sc_network::{NetworkRequest, NotificationService, ProtocolName};
use sc_network_gossip::{GossipEngine, Network as GossipNetwork, Syncing as GossipSyncing};
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver};
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{Backend as BlockchainBackend, HeaderBackend};
use sp_consensus::{Error as ConsensusError, SyncOracle};
Expand All @@ -49,7 +50,9 @@ use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block, Header as HeaderT, NumberFor, Zero};
use std::{
collections::{BTreeMap, VecDeque},
future::Future,
marker::PhantomData,
pin::Pin,
sync::Arc,
time::Duration,
};
Expand Down Expand Up @@ -87,6 +90,8 @@ const LOG_TARGET: &str = "beefy";

const HEADER_SYNC_DELAY: Duration = Duration::from_secs(60);

type FinalityNotifications<Block> =
sc_utils::mpsc::TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>;
/// A convenience BEEFY client trait that defines all the type bounds a BEEFY client
/// has to satisfy. Ideally that should actually be a trait alias. Unfortunately as
/// of today, Rust does not allow a type alias to be used as a trait bound. Tracking
Expand Down Expand Up @@ -484,6 +489,30 @@ where
}
}

/// Finality notification for consumption by BEEFY worker.
/// This is a stripped down version of `sc_client_api::FinalityNotification` which does not keep
/// blocks pinned.
struct UnpinnedFinalityNotification<B: Block> {
/// Finalized block header hash.
pub hash: B::Hash,
/// Finalized block header.
pub header: B::Header,
/// Path from the old finalized to new finalized parent (implicitly finalized blocks).
///
/// This maps to the range `(old_finalized, new_finalized)`.
pub tree_route: Arc<[B::Hash]>,
}

impl<B: Block> From<FinalityNotification<B>> for UnpinnedFinalityNotification<B> {
fn from(value: FinalityNotification<B>) -> Self {
UnpinnedFinalityNotification {
hash: value.hash,
header: value.header,
tree_route: value.tree_route,
}
}
}

/// Start the BEEFY gadget.
///
/// This is a thin shim around running and awaiting a BEEFY worker.
Expand Down Expand Up @@ -525,10 +554,13 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(

let metrics = register_metrics(prometheus_registry.clone());

let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();
let finality_notifications = client.finality_notification_stream();
let (mut transformer, mut finality_notifications) =
finality_notification_transformer_future(finality_notifications);

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
Expand Down Expand Up @@ -582,7 +614,11 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(
_ = &mut beefy_comms.gossip_engine => {
error!(target: LOG_TARGET, "🥩 Gossip engine has unexpectedly terminated.");
return
}
},
_ = &mut transformer => {
error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
return
},
};

let worker = worker_builder.build(
Expand All @@ -594,30 +630,54 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S, AuthorityId>(
is_authority,
);

match futures::future::select(
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await
{
// On `ConsensusReset` error, just reinit and restart voter.
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue;
},
// On other errors, bring down / finish the task.
futures::future::Either::Left(((worker_err, _), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err)
futures::select! {
result = worker.run(&mut block_import_justif, &mut finality_notifications).fuse() => {
match result {
(error::Error::ConsensusReset, reuse_comms) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue;
},
(err, _) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", err)
}
}
},
futures::future::Either::Right((odj_handler_err, _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err)
odj_handler_error = on_demand_justifications_handler.run().fuse() => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_error)
},
};
_ = &mut transformer => {
error!(target: LOG_TARGET, "🥩 Finality notification transformer task has unexpectedly terminated.");
}
}
return;
}
}

/// Produce a future that transformes finality notifications into a struct that does not keep blocks
/// pinned.
fn finality_notification_transformer_future<B>(
mut finality_notifications: sc_client_api::FinalityNotifications<B>,
) -> (
Pin<Box<futures::future::Fuse<impl Future<Output = ()> + Sized>>>,
Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<B>>>,
)
where
B: Block,
{
let (tx, rx) = tracing_unbounded("beefy-notification-transformer-channel", 10000);
let transformer_fut = async move {
while let Some(notification) = finality_notifications.next().await {
debug!(target: LOG_TARGET, "🥩 Transforming grandpa notification. #{}({:?})", notification.header.number(), notification.hash);
if let Err(err) = tx.unbounded_send(UnpinnedFinalityNotification::from(notification)) {
error!(target: LOG_TARGET, "🥩 Unable to send transformed notification. Shutting down. err = {}", err);
return
};
}
};
(Box::pin(transformer_fut.fuse()), rx.fuse())
}

/// Waits until the parent header of `current` is available and returns it.
///
/// When the node uses GRANDPA warp sync it initially downloads only the mandatory GRANDPA headers.
Expand Down
74 changes: 55 additions & 19 deletions substrate/client/consensus/beefy/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ use crate::{
request_response::{on_demand_justifications_protocol_config, BeefyJustifsRequestHandler},
},
error::Error,
gossip_protocol_name,
finality_notification_transformer_future, gossip_protocol_name,
justification::*,
wait_for_runtime_pallet,
worker::PersistedState,
BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers,
BeefyRPCLinks, BeefyVoterLinks, BeefyWorkerBuilder, KnownPeers, UnpinnedFinalityNotification,
};
use futures::{
future,
stream::{Fuse, FuturesUnordered},
Future, FutureExt, StreamExt,
};
use futures::{future, stream::FuturesUnordered, Future, FutureExt, StreamExt};
use parking_lot::Mutex;
use sc_block_builder::BlockBuilderBuilder;
use sc_client_api::{Backend as BackendT, BlockchainEvents, FinalityNotifications, HeaderBackend};
Expand All @@ -49,7 +53,7 @@ use sc_network_test::{
Block, BlockImportAdapter, FullPeerConfig, PassThroughVerifier, Peer, PeersClient,
PeersFullClient, TestNetFactory,
};
use sc_utils::notification::NotificationReceiver;
use sc_utils::{mpsc::TracingUnboundedReceiver, notification::NotificationReceiver};
use serde::{Deserialize, Serialize};
use sp_api::{ApiRef, ProvideRuntimeApi};
use sp_application_crypto::key_types::BEEFY as BEEFY_KEY_TYPE;
Expand Down Expand Up @@ -371,7 +375,7 @@ pub(crate) fn create_beefy_keystore(authority: &BeefyKeyring<AuthorityId>) -> Ke

async fn voter_init_setup(
net: &mut BeefyTestNet,
finality: &mut futures::stream::Fuse<FinalityNotifications<Block>>,
finality: &mut futures::stream::Fuse<crate::FinalityNotifications<Block>>,
api: &TestApi,
) -> Result<PersistedState<Block, ecdsa_crypto::AuthorityId>, Error> {
let backend = net.peer(0).client().as_backend();
Expand All @@ -391,6 +395,15 @@ async fn voter_init_setup(
.await
}

fn start_finality_worker(
finality: FinalityNotifications<Block>,
) -> Fuse<TracingUnboundedReceiver<UnpinnedFinalityNotification<Block>>> {
let (transformer, finality_notifications) = finality_notification_transformer_future(finality);
let tokio_handle = tokio::runtime::Handle::current();
tokio_handle.spawn(transformer);
finality_notifications
}

// Spawns beefy voters. Returns a future to spawn on the runtime.
fn initialize_beefy<API>(
net: &mut BeefyTestNet,
Expand Down Expand Up @@ -1020,13 +1033,17 @@ async fn should_initialize_voter_at_genesis() {

// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();

let api = TestApi::with_validator_set(&validator_set);
// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with two sessions starting at blocks 1 and 10
Expand Down Expand Up @@ -1061,14 +1078,18 @@ async fn should_initialize_voter_at_custom_genesis() {

// push 15 blocks with `AuthorityChange` digests every 15 blocks
let hashes = net.generate_blocks_and_sync(15, 15, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 3, 5, 8 without justifications
net.peer(0).client().as_client().finalize_block(hashes[3], None).unwrap();
net.peer(0).client().as_client().finalize_block(hashes[5], None).unwrap();
net.peer(0).client().as_client().finalize_block(hashes[8], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with single session starting at block `custom_pallet_genesis` (7)
Expand Down Expand Up @@ -1098,7 +1119,8 @@ async fn should_initialize_voter_at_custom_genesis() {

net.peer(0).client().as_client().finalize_block(hashes[10], None).unwrap();
// load persistent state - state preset in DB, but with different pallet genesis
let new_persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let new_persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// verify voter initialized with single session starting at block `new_pallet_genesis` (10)
let sessions = new_persisted_state.voting_oracle().sessions();
Expand Down Expand Up @@ -1129,7 +1151,9 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() {
// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;

let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();
Expand All @@ -1153,7 +1177,8 @@ async fn should_initialize_voter_when_last_final_is_session_boundary() {

let api = TestApi::with_validator_set(&validator_set);
// load persistent state - nothing in DB, should init at session boundary
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// verify voter initialized with single session starting at block 10
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down Expand Up @@ -1183,7 +1208,9 @@ async fn should_initialize_voter_at_latest_finalized() {
// push 15 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(15, 10, &validator_set, false).await;

let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();
Expand All @@ -1206,7 +1233,8 @@ async fn should_initialize_voter_at_latest_finalized() {

let api = TestApi::with_validator_set(&validator_set);
// load persistent state - nothing in DB, should init at last BEEFY finalized
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// verify voter initialized with single session starting at block 12
assert_eq!(persisted_state.voting_oracle().sessions().len(), 1);
Expand Down Expand Up @@ -1239,12 +1267,15 @@ async fn should_initialize_voter_at_custom_genesis_when_state_unavailable() {

// push 30 blocks with `AuthorityChange` digests every 5 blocks
let hashes = net.generate_blocks_and_sync(30, 5, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);
// finalize 30 without justifications
net.peer(0).client().as_client().finalize_block(hashes[30], None).unwrap();

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with all sessions pending, first one starting at block 5 (start of
Expand Down Expand Up @@ -1282,14 +1313,18 @@ async fn should_catch_up_when_loading_saved_voter_state() {

// push 30 blocks with `AuthorityChange` digests every 10 blocks
let hashes = net.generate_blocks_and_sync(30, 10, &validator_set, false).await;
let mut finality = net.peer(0).client().as_client().finality_notification_stream().fuse();
let finality = net.peer(0).client().as_client().finality_notification_stream();

let mut finality_notifications = start_finality_worker(finality);

// finalize 13 without justifications
net.peer(0).client().as_client().finalize_block(hashes[13], None).unwrap();

let api = TestApi::with_validator_set(&validator_set);

// load persistent state - nothing in DB, should init at genesis
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Test initialization at session boundary.
// verify voter initialized with two sessions starting at blocks 1 and 10
Expand All @@ -1316,7 +1351,8 @@ async fn should_catch_up_when_loading_saved_voter_state() {
// finalize 25 without justifications
net.peer(0).client().as_client().finalize_block(hashes[25], None).unwrap();
// load persistent state - state preset in DB
let persisted_state = voter_init_setup(&mut net, &mut finality, &api).await.unwrap();
let persisted_state =
voter_init_setup(&mut net, &mut finality_notifications, &api).await.unwrap();

// Verify voter initialized with old sessions plus a new one starting at block 20.
// There shouldn't be any duplicates.
Expand Down
Loading

0 comments on commit 5dc0670

Please sign in to comment.