From 7144a4ea56db6ca41d6073f47ab0523b079baf3f Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Wed, 10 Jan 2024 17:04:34 +0200 Subject: [PATCH 1/2] backing: Back-off from backing on approval checking lag If approval checking falls behind a certain threshold, it means that the network could not process the assignments and approvals needed for approving the block fast enough, so we need to back off from creating new work to give the approval subsystems the opportunity to catch up. Continuously creating new work is not a good idea because of the way the approval subsystems work: if the system is slow at processing the assignments and approvals for the current block, either because we are behind on work from previous blocks or because the network is slow, validators will simply trigger new tranches, which in turn causes more delays, so we create the conditions for the system to never catch up and to fall further behind. Hence, we need a mechanism to ensure that, instead of falling more and more behind, we actually allow the system to automatically catch up and start working in optimal conditions. This PR achieves that by abstaining from backing new candidates if the node is behind on approvals beyond a certain threshold. 
Signed-off-by: Alexandru Gheorghe --- polkadot/node/core/provisioner/src/lib.rs | 64 ++++++++++++++++--- polkadot/node/core/provisioner/src/tests.rs | 27 +++++++- .../node/service/src/relay_chain_selection.rs | 7 ++ polkadot/node/subsystem-types/src/messages.rs | 2 + 4 files changed, 90 insertions(+), 10 deletions(-) diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 8893bdc6549d..24da2d0f5f7c 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -43,7 +43,10 @@ use polkadot_primitives::{ BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, SignedAvailabilityBitfield, ValidatorIndex, }; -use std::collections::{BTreeMap, HashMap}; +use std::{ + cmp::max, + collections::{BTreeMap, HashMap}, +}; mod disputes; mod error; @@ -52,6 +55,8 @@ mod metrics; pub use self::metrics::*; use error::{Error, FatalResult}; +const TOLERATED_APPROVAL_CHECKING_LAG: i32 = 8; + #[cfg(test)] mod tests; @@ -124,10 +129,17 @@ impl ProvisionerSubsystem { async fn run(mut ctx: Context, metrics: Metrics) -> FatalResult<()> { let mut inherent_delays = InherentDelays::new(); let mut per_relay_parent = HashMap::new(); + let mut approval_checking_lag: BlockNumber = 0; loop { - let result = - run_iteration(&mut ctx, &mut per_relay_parent, &mut inherent_delays, &metrics).await; + let result = run_iteration( + &mut ctx, + &mut per_relay_parent, + &mut inherent_delays, + &metrics, + &mut approval_checking_lag, + ) + .await; match result { Ok(()) => break, @@ -144,6 +156,7 @@ async fn run_iteration( per_relay_parent: &mut HashMap, inherent_delays: &mut InherentDelays, metrics: &Metrics, + approval_checking_lag: &mut BlockNumber, ) -> Result<(), Error> { loop { futures::select! 
{ @@ -155,7 +168,7 @@ async fn run_iteration( FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), FromOrchestra::Communication { msg } => { - handle_communication(ctx, per_relay_parent, msg, metrics).await?; + handle_communication(ctx, per_relay_parent, msg, metrics, approval_checking_lag).await?; }, } }, @@ -171,7 +184,7 @@ async fn run_iteration( let return_senders = std::mem::take(&mut state.awaiting_inherent); if !return_senders.is_empty() { - send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?; + send_inherent_data_bg(ctx, &state, return_senders, metrics.clone(), *approval_checking_lag).await?; } } } @@ -207,6 +220,7 @@ async fn handle_communication( per_relay_parent: &mut HashMap, message: ProvisionerMessage, metrics: &Metrics, + approval_checking_lag: &mut BlockNumber, ) -> Result<(), Error> { match message { ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => { @@ -215,8 +229,14 @@ async fn handle_communication( if let Some(state) = per_relay_parent.get_mut(&relay_parent) { if state.is_inherent_ready { gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data."); - send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone()) - .await?; + send_inherent_data_bg( + ctx, + &state, + vec![return_sender], + metrics.clone(), + *approval_checking_lag, + ) + .await?; } else { gum::trace!( target: LOG_TARGET, @@ -237,6 +257,9 @@ async fn handle_communication( note_provisionable_data(state, &span, data); } }, + ProvisionerMessage::ApprovalCheckingLagUpdate(lag) => { + *approval_checking_lag = lag; + }, } Ok(()) @@ -248,6 +271,7 @@ async fn send_inherent_data_bg( per_relay_parent: &PerRelayParent, return_senders: Vec>, metrics: Metrics, + approval_checking_lag: BlockNumber, ) -> Result<(), Error> { let leaf = per_relay_parent.leaf.clone(); let signed_bitfields = per_relay_parent.signed_bitfields.clone(); @@ -275,6 +299,7 
@@ async fn send_inherent_data_bg( return_senders, &mut sender, &metrics, + approval_checking_lag, ) // Make sure call is not taking forever: .timeout(SEND_INHERENT_DATA_TIMEOUT) .map(|v| match v { @@ -386,6 +411,7 @@ async fn send_inherent_data( return_senders: Vec>, from_job: &mut impl overseer::ProvisionerSenderTrait, metrics: &Metrics, + approval_checking_lag: BlockNumber, ) -> Result<(), Error> { gum::trace!( target: LOG_TARGET, @@ -436,6 +462,7 @@ async fn send_inherent_data( prospective_parachains_mode, leaf.hash, from_job, + approval_checking_lag, ) .await?; @@ -708,12 +735,13 @@ async fn select_candidates( prospective_parachains_mode: ProspectiveParachainsMode, relay_parent: Hash, sender: &mut impl overseer::ProvisionerSenderTrait, + approval_checking_lag: BlockNumber, ) -> Result, Error> { gum::trace!(target: LOG_TARGET, leaf_hash=?relay_parent, "before GetBackedCandidates"); - let selected_candidates = match prospective_parachains_mode { + let mut selected_candidates = match prospective_parachains_mode { ProspectiveParachainsMode::Enabled { .. } => request_backable_candidates(availability_cores, bitfields, relay_parent, sender).await?, ProspectiveParachainsMode::Disabled => @@ -727,6 +755,26 @@ async fn select_candidates( .await?, }; + // Eponentially back-off if we are past a tolerated approval checking lag. + // This way we avoid creating new work for approval subsystem and give it + // the opportunity to catch if it has falled behind. 
+ let max_candidates_to_back = selected_candidates.len() / + usize::pow( + 2, + max(0, approval_checking_lag as i32 - TOLERATED_APPROVAL_CHECKING_LAG) as u32, + ); + + if max_candidates_to_back != selected_candidates.len() { + gum::info!( + target: LOG_TARGET, + available_for_backing = ?selected_candidates.len(), + ?max_candidates_to_back, + ?approval_checking_lag, + "High approval checking lag throttle backing" + ); + selected_candidates.truncate(max_candidates_to_back); + } + // now get the backed candidates corresponding to these candidate receipts let (tx, rx) = oneshot::channel(); sender.send_unbounded_message(CandidateBackingMessage::GetBackedCandidates( diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index 1d7bdfcfcb89..c732de5db883 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -399,6 +399,7 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, + 0, ) .await .unwrap(); @@ -406,11 +407,19 @@ mod select_candidates { ) } + #[test] + fn selects_correct_candidates() { + run_selects_correct_candidates(0); + run_selects_correct_candidates(2); + run_selects_correct_candidates(5); + run_selects_correct_candidates(6); + run_selects_correct_candidates(10); + } + // this tests that only the appropriate candidates get selected. 
// To accomplish this, we supply a candidate list containing one candidate per possible core; // the candidate selection algorithm must filter them to the appropriate set - #[test] - fn selects_correct_candidates() { + fn run_selects_correct_candidates(approval_checking_lag: BlockNumber) { let mock_cores = mock_availability_cores(); let empty_hash = PersistedValidationData::::default().hash(); @@ -475,10 +484,21 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, + approval_checking_lag, ) .await .unwrap(); + assert_eq!( + expected_candidates.len() / + usize::pow( + 2, + max(0, approval_checking_lag as i32 - TOLERATED_APPROVAL_CHECKING_LAG) + as u32, + ), + result.len() + ); + result.into_iter().for_each(|c| { assert!( expected_candidates.iter().any(|c2| c.candidate.corresponds_to(c2)), @@ -553,6 +573,7 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, + 0, ) .await .unwrap(); @@ -620,6 +641,7 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, + 0, ) .await .unwrap(); @@ -686,6 +708,7 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, + 0, ) .await .unwrap(); diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs index 5fae6a96de4e..d598ba36216a 100644 --- a/polkadot/node/service/src/relay_chain_selection.rs +++ b/polkadot/node/service/src/relay_chain_selection.rs @@ -43,6 +43,7 @@ use polkadot_node_subsystem::messages::{ ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock, }; +use polkadot_node_subsystem_types::messages::ProvisionerMessage; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::{AllMessages, Handle}; use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader}; @@ -483,6 +484,12 @@ where std::any::type_name::(), 
) .await; + overseer_handle + .send_msg( + ProvisionerMessage::ApprovalCheckingLagUpdate(lag), + std::any::type_name::(), + ) + .await; }; spawn_handle.spawn( diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index c7675c84b91c..db96ee5f1ef3 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -825,6 +825,8 @@ pub enum ProvisionerMessage { RequestInherentData(Hash, oneshot::Sender), /// This data should become part of a relay chain block ProvisionableData(Hash, ProvisionableData), + /// Approval checking lag update measured in blocks. + ApprovalCheckingLagUpdate(BlockNumber), } /// Message to the Collation Generation subsystem. From 641af64852892cfb29371c12d98c85a83344d9b9 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe Date: Mon, 15 Jan 2024 14:11:18 +0200 Subject: [PATCH 2/2] Try 2 Signed-off-by: Alexandru Gheorghe --- Cargo.lock | 32 +++++++--- Cargo.toml | 4 ++ polkadot/node/core/backing/Cargo.toml | 1 + polkadot/node/core/backing/src/lib.rs | 27 ++++++-- polkadot/node/core/provisioner/src/lib.rs | 64 +++---------------- polkadot/node/core/provisioner/src/tests.rs | 27 +------- .../network/approval-distribution/src/lib.rs | 33 +++++++++- polkadot/node/overseer/src/lib.rs | 9 +-- polkadot/node/primitives/src/approval.rs | 60 ++++++++++++++++- .../node/service/src/relay_chain_selection.rs | 7 -- polkadot/node/subsystem-types/src/messages.rs | 6 +- 11 files changed, 159 insertions(+), 111 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2e2e1dfdb0e2..b8d3f66e8001 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4123,7 +4123,7 @@ dependencies = [ "polkadot-primitives", "polkadot-service", "polkadot-test-client", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "sc-cli", "sc-client-api", "sc-sysinfo", @@ -8987,8 +8987,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = 
"orchestra" version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46d78e1deb2a8d54fc1f063a544130db4da31dfe4d5d3b493186424910222a76" +source = "git+https://github.com/paritytech/orchestra?branch=alexggh/expose_meters#0bcf7b7a05da7b5770824fee83794b5443f01e19" dependencies = [ "async-trait", "dyn-clonable", @@ -8996,7 +8995,7 @@ dependencies = [ "futures-timer", "orchestra-proc-macro", "pin-project", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.0", "thiserror", "tracing", ] @@ -9004,8 +9003,7 @@ dependencies = [ [[package]] name = "orchestra-proc-macro" version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d035b1f968d91a826f2e34a9d6d02cb2af5aa7ca39ebd27922d850ab4b2dd2c6" +source = "git+https://github.com/paritytech/orchestra?branch=alexggh/expose_meters#0bcf7b7a05da7b5770824fee83794b5443f01e19" dependencies = [ "expander 2.0.0", "indexmap 2.0.0", @@ -12480,6 +12478,7 @@ dependencies = [ "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", + "polkadot-parachain-primitives", "polkadot-primitives", "polkadot-primitives-test-helpers", "polkadot-statement-table", @@ -12847,7 +12846,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "polkadot-test-service", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "prometheus-parse", "sc-cli", "sc-service", @@ -12995,7 +12994,7 @@ dependencies = [ "polkadot-overseer", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "rand 0.8.5", "sc-client-api", "schnellru", @@ -13025,7 +13024,7 @@ dependencies = [ "polkadot-node-subsystem-types", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "sc-client-api", "sp-api", "sp-core", @@ -14021,6 +14020,21 @@ dependencies = [ "uint", ] +[[package]] 
+name = "prioritized-metered-channel" +version = "0.5.0" +source = "git+https://github.com/paritytech/orchestra?branch=alexggh/expose_meters#0bcf7b7a05da7b5770824fee83794b5443f01e19" +dependencies = [ + "coarsetime", + "crossbeam-queue", + "derive_more", + "futures", + "futures-timer", + "nanorand", + "thiserror", + "tracing", +] + [[package]] name = "prioritized-metered-channel" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 231aab8dee9c..d2498d3713b5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -602,3 +602,7 @@ wasmi = { opt-level = 3 } x25519-dalek = { opt-level = 3 } yamux = { opt-level = 3 } zeroize = { opt-level = 3 } + +[patch.crates-io] +orchestra = { git = "https://github.com/paritytech/orchestra", branch = "alexggh/expose_meters", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "alexggh/expose_meters", default-features = false, features = ["futures_channel"] } diff --git a/polkadot/node/core/backing/Cargo.toml b/polkadot/node/core/backing/Cargo.toml index 16ed11e7eec9..d2dbb0b7229a 100644 --- a/polkadot/node/core/backing/Cargo.toml +++ b/polkadot/node/core/backing/Cargo.toml @@ -14,6 +14,7 @@ futures = "0.3.21" sp-keystore = { path = "../../../../substrate/primitives/keystore" } polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } +polkadot-parachain-primitives = { path = "../../../parachain", default-features = false } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" } diff --git a/polkadot/node/core/backing/src/lib.rs b/polkadot/node/core/backing/src/lib.rs index 434051f1b00f..4a73f7a52e5f 100644 --- a/polkadot/node/core/backing/src/lib.rs +++ b/polkadot/node/core/backing/src/lib.rs @@ -80,8 +80,8 @@ use 
futures::{ use error::{Error, FatalResult}; use polkadot_node_primitives::{ - AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD, - ValidationResult, + approval::WorkRateLimiter, AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, + StatementWithPVD, ValidationResult, }; use polkadot_node_subsystem::{ messages::{ @@ -103,6 +103,7 @@ use polkadot_node_subsystem_util::{ }, Validator, }; +use polkadot_parachain_primitives::primitives::IsSystem; use polkadot_primitives::{ BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt, CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, Hash, Id as ParaId, @@ -279,6 +280,9 @@ struct State { background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>, /// The handle to the keystore used for signing. keystore: KeystorePtr, + /// A work rate limiter; it is used by `ApprovalDistributionSubsystem` to inform the backing + /// subsystem that it is overloaded and that backing should refrain from creating new work. 
+ rate_limit: WorkRateLimiter, } impl State { @@ -293,6 +297,7 @@ impl State { per_candidate: HashMap::new(), background_validation_tx, keystore, + rate_limit: WorkRateLimiter::default(), } } } @@ -745,9 +750,18 @@ async fn handle_communication( metrics: &Metrics, ) -> Result<(), Error> { match message { - CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => { - handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?; - }, + CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => + if !state.rate_limit.rate_limited(candidate.hash().as_bytes()[0]) || + candidate.descriptor.para_id.is_system() + { + handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?; + } else { + gum::info!( + target: LOG_TARGET, + slow_down = ?state.rate_limit, + "Back-off from backing because approval distribution is loaded" + ); + }, CandidateBackingMessage::Statement(relay_parent, statement) => { handle_statement_message(ctx, state, relay_parent, statement, metrics).await?; }, @@ -755,6 +769,9 @@ async fn handle_communication( handle_get_backed_candidates_message(state, requested_candidates, tx, metrics)?, CandidateBackingMessage::CanSecond(request, tx) => handle_can_second_request(ctx, state, request, tx).await, + CandidateBackingMessage::RateLimitBacking(rate_limit) => { + state.rate_limit = rate_limit; + }, } Ok(()) diff --git a/polkadot/node/core/provisioner/src/lib.rs b/polkadot/node/core/provisioner/src/lib.rs index 24da2d0f5f7c..8893bdc6549d 100644 --- a/polkadot/node/core/provisioner/src/lib.rs +++ b/polkadot/node/core/provisioner/src/lib.rs @@ -43,10 +43,7 @@ use polkadot_primitives::{ BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, Hash, Id as ParaId, OccupiedCoreAssumption, SignedAvailabilityBitfield, ValidatorIndex, }; -use std::{ - cmp::max, - collections::{BTreeMap, HashMap}, -}; +use std::collections::{BTreeMap, HashMap}; mod disputes; mod error; @@ -55,8 +52,6 @@ mod metrics; pub use 
self::metrics::*; use error::{Error, FatalResult}; -const TOLERATED_APPROVAL_CHECKING_LAG: i32 = 8; - #[cfg(test)] mod tests; @@ -129,17 +124,10 @@ impl ProvisionerSubsystem { async fn run(mut ctx: Context, metrics: Metrics) -> FatalResult<()> { let mut inherent_delays = InherentDelays::new(); let mut per_relay_parent = HashMap::new(); - let mut approval_checking_lag: BlockNumber = 0; loop { - let result = run_iteration( - &mut ctx, - &mut per_relay_parent, - &mut inherent_delays, - &metrics, - &mut approval_checking_lag, - ) - .await; + let result = + run_iteration(&mut ctx, &mut per_relay_parent, &mut inherent_delays, &metrics).await; match result { Ok(()) => break, @@ -156,7 +144,6 @@ async fn run_iteration( per_relay_parent: &mut HashMap, inherent_delays: &mut InherentDelays, metrics: &Metrics, - approval_checking_lag: &mut BlockNumber, ) -> Result<(), Error> { loop { futures::select! { @@ -168,7 +155,7 @@ async fn run_iteration( FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), FromOrchestra::Communication { msg } => { - handle_communication(ctx, per_relay_parent, msg, metrics, approval_checking_lag).await?; + handle_communication(ctx, per_relay_parent, msg, metrics).await?; }, } }, @@ -184,7 +171,7 @@ async fn run_iteration( let return_senders = std::mem::take(&mut state.awaiting_inherent); if !return_senders.is_empty() { - send_inherent_data_bg(ctx, &state, return_senders, metrics.clone(), *approval_checking_lag).await?; + send_inherent_data_bg(ctx, &state, return_senders, metrics.clone()).await?; } } } @@ -220,7 +207,6 @@ async fn handle_communication( per_relay_parent: &mut HashMap, message: ProvisionerMessage, metrics: &Metrics, - approval_checking_lag: &mut BlockNumber, ) -> Result<(), Error> { match message { ProvisionerMessage::RequestInherentData(relay_parent, return_sender) => { @@ -229,14 +215,8 @@ async fn handle_communication( if let Some(state) = 
per_relay_parent.get_mut(&relay_parent) { if state.is_inherent_ready { gum::trace!(target: LOG_TARGET, ?relay_parent, "Calling send_inherent_data."); - send_inherent_data_bg( - ctx, - &state, - vec![return_sender], - metrics.clone(), - *approval_checking_lag, - ) - .await?; + send_inherent_data_bg(ctx, &state, vec![return_sender], metrics.clone()) + .await?; } else { gum::trace!( target: LOG_TARGET, @@ -257,9 +237,6 @@ async fn handle_communication( note_provisionable_data(state, &span, data); } }, - ProvisionerMessage::ApprovalCheckingLagUpdate(lag) => { - *approval_checking_lag = lag; - }, } Ok(()) @@ -271,7 +248,6 @@ async fn send_inherent_data_bg( per_relay_parent: &PerRelayParent, return_senders: Vec>, metrics: Metrics, - approval_checking_lag: BlockNumber, ) -> Result<(), Error> { let leaf = per_relay_parent.leaf.clone(); let signed_bitfields = per_relay_parent.signed_bitfields.clone(); @@ -299,7 +275,6 @@ async fn send_inherent_data_bg( return_senders, &mut sender, &metrics, - approval_checking_lag, ) // Make sure call is not taking forever: .timeout(SEND_INHERENT_DATA_TIMEOUT) .map(|v| match v { @@ -411,7 +386,6 @@ async fn send_inherent_data( return_senders: Vec>, from_job: &mut impl overseer::ProvisionerSenderTrait, metrics: &Metrics, - approval_checking_lag: BlockNumber, ) -> Result<(), Error> { gum::trace!( target: LOG_TARGET, @@ -462,7 +436,6 @@ async fn send_inherent_data( prospective_parachains_mode, leaf.hash, from_job, - approval_checking_lag, ) .await?; @@ -735,13 +708,12 @@ async fn select_candidates( prospective_parachains_mode: ProspectiveParachainsMode, relay_parent: Hash, sender: &mut impl overseer::ProvisionerSenderTrait, - approval_checking_lag: BlockNumber, ) -> Result, Error> { gum::trace!(target: LOG_TARGET, leaf_hash=?relay_parent, "before GetBackedCandidates"); - let mut selected_candidates = match prospective_parachains_mode { + let selected_candidates = match prospective_parachains_mode { ProspectiveParachainsMode::Enabled { .. 
} => request_backable_candidates(availability_cores, bitfields, relay_parent, sender).await?, ProspectiveParachainsMode::Disabled => @@ -755,26 +727,6 @@ async fn select_candidates( .await?, }; - // Eponentially back-off if we are past a tolerated approval checking lag. - // This way we avoid creating new work for approval subsystem and give it - // the opportunity to catch if it has falled behind. - let max_candidates_to_back = selected_candidates.len() / - usize::pow( - 2, - max(0, approval_checking_lag as i32 - TOLERATED_APPROVAL_CHECKING_LAG) as u32, - ); - - if max_candidates_to_back != selected_candidates.len() { - gum::info!( - target: LOG_TARGET, - available_for_backing = ?selected_candidates.len(), - ?max_candidates_to_back, - ?approval_checking_lag, - "High approval checking lag throttle backing" - ); - selected_candidates.truncate(max_candidates_to_back); - } - // now get the backed candidates corresponding to these candidate receipts let (tx, rx) = oneshot::channel(); sender.send_unbounded_message(CandidateBackingMessage::GetBackedCandidates( diff --git a/polkadot/node/core/provisioner/src/tests.rs b/polkadot/node/core/provisioner/src/tests.rs index c732de5db883..1d7bdfcfcb89 100644 --- a/polkadot/node/core/provisioner/src/tests.rs +++ b/polkadot/node/core/provisioner/src/tests.rs @@ -399,7 +399,6 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, - 0, ) .await .unwrap(); @@ -407,19 +406,11 @@ mod select_candidates { ) } - #[test] - fn selects_correct_candidates() { - run_selects_correct_candidates(0); - run_selects_correct_candidates(2); - run_selects_correct_candidates(5); - run_selects_correct_candidates(6); - run_selects_correct_candidates(10); - } - // this tests that only the appropriate candidates get selected. 
// To accomplish this, we supply a candidate list containing one candidate per possible core; // the candidate selection algorithm must filter them to the appropriate set - fn run_selects_correct_candidates(approval_checking_lag: BlockNumber) { + #[test] + fn selects_correct_candidates() { let mock_cores = mock_availability_cores(); let empty_hash = PersistedValidationData::::default().hash(); @@ -484,21 +475,10 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, - approval_checking_lag, ) .await .unwrap(); - assert_eq!( - expected_candidates.len() / - usize::pow( - 2, - max(0, approval_checking_lag as i32 - TOLERATED_APPROVAL_CHECKING_LAG) - as u32, - ), - result.len() - ); - result.into_iter().for_each(|c| { assert!( expected_candidates.iter().any(|c2| c.candidate.corresponds_to(c2)), @@ -573,7 +553,6 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, - 0, ) .await .unwrap(); @@ -641,7 +620,6 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, - 0, ) .await .unwrap(); @@ -708,7 +686,6 @@ mod select_candidates { prospective_parachains_mode, Default::default(), &mut tx, - 0, ) .await .unwrap(); diff --git a/polkadot/node/network/approval-distribution/src/lib.rs b/polkadot/node/network/approval-distribution/src/lib.rs index d520febaef51..433def47d5b3 100644 --- a/polkadot/node/network/approval-distribution/src/lib.rs +++ b/polkadot/node/network/approval-distribution/src/lib.rs @@ -41,13 +41,14 @@ use polkadot_node_primitives::approval::{ AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2, }, + WorkRateLimiter, }; use polkadot_node_subsystem::{ messages::{ ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage, - AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeTxMessage, + AssignmentCheckResult, CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, }, - overseer, 
FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, + overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemSender, }; use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL}; use polkadot_primitives::{ @@ -2287,6 +2288,15 @@ async fn modify_reputation( reputation.modify(sender, peer_id, rep).await; } +// Normally, 100 parachains and 500 validators would generate around +// 3000 assignments and 3000 approvals per block, so having 20_000 waiting +// in the queue is not a good sign: it means we are falling behind on work +// and we need to back off from creating new work until we have caught up. +const TOLERATED_PENDING_MESSAGES: u32 = 20_000; +/// The number of messages covered by each slow-down level. We need to react quickly +/// if we are past the tipping point, so that we quickly recover/catch up. +const LEVEL_WIDTH: u32 = 3_000; + #[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)] impl ApprovalDistribution { /// Create a new instance of the [`ApprovalDistribution`] subsystem. @@ -2314,12 +2324,31 @@ impl ApprovalDistribution { let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse(); let mut reputation_delay = new_reputation_delay(); + let new_workload = || futures_timer::Delay::new(Duration::from_secs(1)).fuse(); + let mut workload_update = new_workload(); + loop { select! 
{ _ = reputation_delay => { state.reputation.send(ctx.sender()).await; reputation_delay = new_reputation_delay(); }, + _ = workload_update => { + let num_in_bounded_queue = ctx.bounded_meter().num_in_queue() as u32; + let num_in_unbounded_queue = ctx.unbounded_meter().num_in_queue() as u32; + gum::trace!(target: LOG_TARGET, ?num_in_bounded_queue, ?num_in_unbounded_queue, "Messages in the queue"); + + ctx.sender().send_message( + CandidateBackingMessage::RateLimitBacking( + WorkRateLimiter::new( + num_in_bounded_queue + num_in_unbounded_queue, + TOLERATED_PENDING_MESSAGES, + LEVEL_WIDTH + ) + ) + ).await; + workload_update = new_workload(); + } message = ctx.recv().fuse() => { let message = match message { Ok(message) => message, diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index f4eddf1f41ce..c7ce52380493 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -105,10 +105,10 @@ pub use polkadot_node_metrics::{ pub use orchestra as gen; pub use orchestra::{ - contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket, - OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext, - SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters, - SubsystemSender, TimeoutExt, ToOrchestra, TrySendError, + contextbounds, metered::Meter, orchestra, subsystem, FromOrchestra, MapSubsystem, + MessagePacket, OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, + SubsystemContext, SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, + SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra, TrySendError, }; #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] @@ -580,6 +580,7 @@ pub struct Overseer { #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [ NetworkBridgeTxMessage, ApprovalVotingMessage, + CandidateBackingMessage, ])] approval_distribution: 
ApprovalDistribution, diff --git a/polkadot/node/primitives/src/approval.rs b/polkadot/node/primitives/src/approval.rs index f2a79e025aff..5791772bad1a 100644 --- a/polkadot/node/primitives/src/approval.rs +++ b/polkadot/node/primitives/src/approval.rs @@ -16,6 +16,8 @@ //! Types relevant for approval. +use std::cmp::min; + /// A list of primitives introduced in v1. pub mod v1 { use sp_consensus_babe as babe_primitives; @@ -530,9 +532,43 @@ pub mod v2 { } } +/// An uber-simple rate limiter that decides, based on a work id, whether an item +/// should be rate limited or not; it provides exponential rate limiting once we +/// are past the tolerated pending items. +#[derive(Debug, Default)] +pub struct WorkRateLimiter { + level: u32, +} +// Max level +const MAX_LEVEL: u32 = 8; + +impl WorkRateLimiter { + /// Builds a new WorkRateLimiter out of + /// `num_pending_items` - the number of work items waiting for processing. + /// `tolerated_pending_items` - acceptable number of pending items; when the pending items + /// are at or below this level, no rate limiting is required. + /// `level_width` - the number of pending items each level covers. + pub fn new(num_pending_items: u32, tolerated_pending_items: u32, level_width: u32) -> Self { + let level = if num_pending_items > tolerated_pending_items { + 1 + (num_pending_items - tolerated_pending_items) / level_width + } else { + 0 + }; + + WorkRateLimiter { level: min(level, MAX_LEVEL) } + } + + /// Returns true if the item should be rate limited and false otherwise. 
+ pub fn rate_limited(&self, item_identifier: u8) -> bool { + item_identifier as u32 % u32::pow(2, self.level) != 0 + } +} #[cfg(test)] mod test { - use super::v2::{BitIndex, Bitfield}; + use super::{ + v2::{BitIndex, Bitfield}, + WorkRateLimiter, + }; use polkadot_primitives::{CandidateIndex, CoreIndex}; @@ -582,4 +618,26 @@ mod test { assert_eq!(bitfield.inner_mut().count_ones(), 1); assert_eq!(bitfield.len(), 21); } + + #[test] + fn test_work_rate_limiter() { + let slow_down = WorkRateLimiter::new(200, 40, 12); + assert!(!slow_down.rate_limited(0)); + assert!(slow_down.rate_limited(1)); + assert!(slow_down.rate_limited(3)); + assert!(slow_down.rate_limited(255)); + + let slow_down = WorkRateLimiter::new(450, 40, 12); + assert!(slow_down.rate_limited(255)); + + let slow_down = WorkRateLimiter::new(40, 40, 12); + assert!(!slow_down.rate_limited(0)); + assert!(!slow_down.rate_limited(1)); + assert!(!slow_down.rate_limited(2)); + + let slow_down = WorkRateLimiter::new(0, 40, 12); + assert!(!slow_down.rate_limited(0)); + assert!(!slow_down.rate_limited(1)); + assert!(!slow_down.rate_limited(2)); + } } diff --git a/polkadot/node/service/src/relay_chain_selection.rs b/polkadot/node/service/src/relay_chain_selection.rs index d598ba36216a..5fae6a96de4e 100644 --- a/polkadot/node/service/src/relay_chain_selection.rs +++ b/polkadot/node/service/src/relay_chain_selection.rs @@ -43,7 +43,6 @@ use polkadot_node_subsystem::messages::{ ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage, DisputeCoordinatorMessage, HighestApprovedAncestorBlock, }; -use polkadot_node_subsystem_types::messages::ProvisionerMessage; use polkadot_node_subsystem_util::metrics::{self, prometheus}; use polkadot_overseer::{AllMessages, Handle}; use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader}; @@ -484,12 +483,6 @@ where std::any::type_name::(), ) .await; - overseer_handle - .send_msg( - ProvisionerMessage::ApprovalCheckingLagUpdate(lag), 
- std::any::type_name::(), - ) - .await; }; spawn_handle.spawn( diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index db96ee5f1ef3..d4fa22002df7 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -35,6 +35,7 @@ use polkadot_node_primitives::{ approval::{ v1::BlockApprovalMeta, v2::{CandidateBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2}, + WorkRateLimiter, }, AvailableData, BabeEpoch, BlockWeight, CandidateVotes, CollationGenerationConfig, CollationSecondedSignal, DisputeMessage, DisputeStatus, ErasureChunk, PoV, @@ -98,6 +99,9 @@ pub enum CandidateBackingMessage { /// Disputes Subsystem, though that escalation is deferred until the approval voting stage to /// guarantee availability. Agreements are simply tallied until a quorum is reached. Statement(Hash, SignedFullStatementWithPVD), + /// Rate limit backing; it is used by approval-distribution to slow down the backing subsystem when + /// approval-distribution is overloaded. + RateLimitBacking(WorkRateLimiter), } /// Blanket error for validation failing for internal reasons. @@ -825,8 +829,6 @@ pub enum ProvisionerMessage { RequestInherentData(Hash, oneshot::Sender), /// This data should become part of a relay chain block ProvisionableData(Hash, ProvisionableData), - /// Approval checking lag update measured in blocks. - ApprovalCheckingLagUpdate(BlockNumber), } /// Message to the Collation Generation subsystem.