backing: Back-off from backing on approval checking lag #2908

Closed · wants to merge 2 commits
32 changes: 23 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -602,3 +602,7 @@ wasmi = { opt-level = 3 }
x25519-dalek = { opt-level = 3 }
yamux = { opt-level = 3 }
zeroize = { opt-level = 3 }

[patch.crates-io]
orchestra = { git = "https://github.com/paritytech/orchestra", branch = "alexggh/expose_meters", default-features = false, features = ["futures_channel"] }
metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "alexggh/expose_meters", default-features = false, features = ["futures_channel"] }
1 change: 1 addition & 0 deletions polkadot/node/core/backing/Cargo.toml
@@ -14,6 +14,7 @@ futures = "0.3.21"
sp-keystore = { path = "../../../../substrate/primitives/keystore" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-parachain-primitives = { path = "../../../parachain", default-features = false }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
erasure-coding = { package = "polkadot-erasure-coding", path = "../../../erasure-coding" }
27 changes: 22 additions & 5 deletions polkadot/node/core/backing/src/lib.rs
@@ -80,8 +80,8 @@ use futures::{

use error::{Error, FatalResult};
use polkadot_node_primitives::{
AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
ValidationResult,
approval::WorkRateLimiter, AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD,
StatementWithPVD, ValidationResult,
};
use polkadot_node_subsystem::{
messages::{
@@ -103,6 +103,7 @@ use polkadot_node_subsystem_util::{
},
Validator,
};
use polkadot_parachain_primitives::primitives::IsSystem;
use polkadot_primitives::{
BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceipt,
CommittedCandidateReceipt, CoreIndex, CoreState, ExecutorParams, Hash, Id as ParaId,
@@ -279,6 +280,9 @@ struct State {
background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
/// The handle to the keystore used for signing.
keystore: KeystorePtr,
/// A work rate limiter, used by the `ApprovalDistributionSubsystem` to inform the backing
/// subsystem that it is overloaded and that it should refrain from creating new work.
rate_limit: WorkRateLimiter,
}

impl State {
@@ -293,6 +297,7 @@ impl State {
per_candidate: HashMap::new(),
background_validation_tx,
keystore,
rate_limit: WorkRateLimiter::default(),
}
}
}
@@ -745,16 +750,28 @@ async fn handle_communication<Context>(
metrics: &Metrics,
) -> Result<(), Error> {
match message {
CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
},
CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) =>
if !state.rate_limit.rate_limited(candidate.hash().as_bytes()[0]) ||
candidate.descriptor.para_id.is_system()
{
handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
} else {
gum::info!(
target: LOG_TARGET,
slow_down = ?state.rate_limit,
"Back-off from backing because approval distribution is loaded"
);
},
CandidateBackingMessage::Statement(relay_parent, statement) => {
handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
},
CandidateBackingMessage::GetBackedCandidates(requested_candidates, tx) =>
handle_get_backed_candidates_message(state, requested_candidates, tx, metrics)?,
CandidateBackingMessage::CanSecond(request, tx) =>
handle_can_second_request(ctx, state, request, tx).await,
CandidateBackingMessage::RateLimitBacking(rate_limit) => {
state.rate_limit = rate_limit;
},
}

Ok(())
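For review context, the gating added above reduces to a small predicate: a candidate is seconded only when the first byte of its hash passes the rate limiter, or when its para is a system parachain, which is never backed off. A minimal standalone sketch of that decision, with the limiter check inlined from approval.rs below (the helper name `should_second` is hypothetical, for illustration only):

// Sketch of the `Second`-arm gating above; not part of the diff.
struct WorkRateLimiter {
    level: u32,
}

impl WorkRateLimiter {
    // Mirrors the PR: an item passes only if its identifier is divisible by 2^level.
    fn rate_limited(&self, item_identifier: u8) -> bool {
        item_identifier as u32 % u32::pow(2, self.level) != 0
    }
}

fn should_second(limiter: &WorkRateLimiter, first_hash_byte: u8, is_system_para: bool) -> bool {
    // System parachains are exempt; all other candidates are thinned out by hash byte.
    !limiter.rate_limited(first_hash_byte) || is_system_para
}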
33 changes: 31 additions & 2 deletions polkadot/node/network/approval-distribution/src/lib.rs
@@ -41,13 +41,14 @@ use polkadot_node_primitives::approval::{
AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
IndirectSignedApprovalVoteV2,
},
WorkRateLimiter,
};
use polkadot_node_subsystem::{
messages::{
ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage,
AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeTxMessage,
AssignmentCheckResult, CandidateBackingMessage, NetworkBridgeEvent, NetworkBridgeTxMessage,
},
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemSender,
};
use polkadot_node_subsystem_util::reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL};
use polkadot_primitives::{
@@ -2287,6 +2288,15 @@ async fn modify_reputation(
reputation.modify(sender, peer_id, rep).await;
}

// Normally, for 100 parachains and 500 validators, we would generate around
// 3000 assignments and 3000 approvals per block, so having 20_000 messages waiting
// in the queue is not a good sign: it means we are falling behind on work
// and we need to back off from creating new work until we have caught up.
const TOLERATED_PENDING_MESSAGES: u32 = 20_000;
/// The number of messages each slow-down level covers. We need to react quickly
/// once we are past the tipping point, so that we recover/catch up fast.
const LEVEL_WIDTH: u32 = 3_000;

#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
impl ApprovalDistribution {
/// Create a new instance of the [`ApprovalDistribution`] subsystem.
@@ -2314,12 +2324,31 @@ impl ApprovalDistribution {
let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
let mut reputation_delay = new_reputation_delay();

let new_workload = || futures_timer::Delay::new(Duration::from_secs(1)).fuse();
let mut workload_update = new_workload();

loop {
select! {
_ = reputation_delay => {
state.reputation.send(ctx.sender()).await;
reputation_delay = new_reputation_delay();
},
_ = workload_update => {
let num_in_bounded_queue = ctx.bounded_meter().num_in_queue() as u32;
let num_in_unbounded_queue = ctx.unbounded_meter().num_in_queue() as u32;
gum::trace!(target: LOG_TARGET, ?num_in_bounded_queue, ?num_in_unbounded_queue, "Messages in the queue");

ctx.sender().send_message(
CandidateBackingMessage::RateLimitBacking(
WorkRateLimiter::new(
num_in_bounded_queue + num_in_unbounded_queue,
TOLERATED_PENDING_MESSAGES,
LEVEL_WIDTH
)
)
).await;
workload_update = new_workload();
}
message = ctx.recv().fuse() => {
let message = match message {
Ok(message) => message,
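To make the new constants concrete: the once-per-second workload tick sums both queue meters and maps the total onto a back-off level via `WorkRateLimiter::new` (defined in approval.rs below). A worked example of that mapping, using the constants above (arithmetic mine, numbers chosen for illustration):

// Illustration of how queue depth maps to a back-off level with
// TOLERATED_PENDING_MESSAGES = 20_000 and LEVEL_WIDTH = 3_000.
fn level_for(num_pending: u32) -> u32 {
    const TOLERATED: u32 = 20_000;
    const WIDTH: u32 = 3_000;
    const MAX_LEVEL: u32 = 8; // cap taken from approval.rs

    if num_pending > TOLERATED {
        (1 + (num_pending - TOLERATED) / WIDTH).min(MAX_LEVEL)
    } else {
        0
    }
}

// 19_000 pending -> level 0: nothing is rate-limited.
// 21_000 pending -> level 1: roughly 1/2 of candidates are seconded.
// 26_000 pending -> level 3: roughly 1/8 of candidates are seconded.
// 50_000 pending -> level 8 (capped): roughly 1/256 of candidates are seconded.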
9 changes: 5 additions & 4 deletions polkadot/node/overseer/src/lib.rs
@@ -105,10 +105,10 @@ pub use polkadot_node_metrics::{

pub use orchestra as gen;
pub use orchestra::{
contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket,
OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext,
SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters,
SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
contextbounds, metered::Meter, orchestra, subsystem, FromOrchestra, MapSubsystem,
MessagePacket, OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem,
SubsystemContext, SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts,
SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra, TrySendError,
};

#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))]
@@ -580,6 +580,7 @@ pub struct Overseer<SupportsParachains> {
#[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [
NetworkBridgeTxMessage,
ApprovalVotingMessage,
CandidateBackingMessage,
])]
approval_distribution: ApprovalDistribution,

60 changes: 59 additions & 1 deletion polkadot/node/primitives/src/approval.rs
@@ -16,6 +16,8 @@

//! Types relevant for approval.

use std::cmp::min;

/// A list of primitives introduced in v1.
pub mod v1 {
use sp_consensus_babe as babe_primitives;
@@ -530,9 +532,43 @@
}
}

/// An uber-simple rate limiter that decides, based on a work id, whether an item
/// should be rate-limited or not. It provides exponential rate limiting once we
/// are past the tolerated number of pending items.
#[derive(Debug, Default)]
pub struct WorkRateLimiter {
level: u32,
}
// Max level
const MAX_LEVEL: u32 = 8;

impl WorkRateLimiter {
/// Builds a new `WorkRateLimiter` out of:
/// `num_pending_items` - the number of work items waiting to be processed.
/// `tolerated_pending_items` - the acceptable number of pending items; when the number
/// of pending items is below this level, no rate limiting is required.
/// `level_width` - the number of pending items each level covers.
pub fn new(num_pending_items: u32, tolerated_pending_items: u32, level_width: u32) -> Self {
let level = if num_pending_items > tolerated_pending_items {
1 + (num_pending_items - tolerated_pending_items) / level_width
} else {
0
};

WorkRateLimiter { level: min(level, MAX_LEVEL) }
}

/// Returns true if the item should be rate-limited and false otherwise.
pub fn rate_limited(&self, item_identifier: u8) -> bool {
item_identifier as u32 % u32::pow(2, self.level) != 0
}
}
#[cfg(test)]
mod test {
use super::v2::{BitIndex, Bitfield};
use super::{
v2::{BitIndex, Bitfield},
WorkRateLimiter,
};

use polkadot_primitives::{CandidateIndex, CoreIndex};

@@ -582,4 +618,26 @@
assert_eq!(bitfield.inner_mut().count_ones(), 1);
assert_eq!(bitfield.len(), 21);
}

#[test]
fn test_work_rate_limiter() {
let slow_down = WorkRateLimiter::new(200, 40, 12);
assert!(!slow_down.rate_limited(0));
assert!(slow_down.rate_limited(1));
assert!(slow_down.rate_limited(3));
assert!(slow_down.rate_limited(255));

let slow_down = WorkRateLimiter::new(450, 40, 12);
assert!(slow_down.rate_limited(255));

let slow_down = WorkRateLimiter::new(40, 40, 12);
assert!(!slow_down.rate_limited(0));
assert!(!slow_down.rate_limited(1));
assert!(!slow_down.rate_limited(2));

let slow_down = WorkRateLimiter::new(0, 40, 12);
assert!(!slow_down.rate_limited(0));
assert!(!slow_down.rate_limited(1));
assert!(!slow_down.rate_limited(2));
}
}
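One property worth noting for reviewers: since the item identifier is the first byte of the candidate hash, at the cap of MAX_LEVEL = 8 only identifier 0 passes, because a u8 is divisible by 2^8 = 256 only when it is zero. Checking the first test case above by hand (numbers from the test, arithmetic mine):

// WorkRateLimiter::new(200, 40, 12):
//   level = min(1 + (200 - 40) / 12, MAX_LEVEL) = min(1 + 13, 8) = 8
// rate_limited(0)   -> 0   % 256 == 0 -> false (not limited)
// rate_limited(1)   -> 1   % 256 != 0 -> true
// rate_limited(255) -> 255 % 256 != 0 -> true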
4 changes: 4 additions & 0 deletions polkadot/node/subsystem-types/src/messages.rs
@@ -35,6 +35,7 @@ use polkadot_node_primitives::{
approval::{
v1::BlockApprovalMeta,
v2::{CandidateBitfield, IndirectAssignmentCertV2, IndirectSignedApprovalVoteV2},
WorkRateLimiter,
},
AvailableData, BabeEpoch, BlockWeight, CandidateVotes, CollationGenerationConfig,
CollationSecondedSignal, DisputeMessage, DisputeStatus, ErasureChunk, PoV,
@@ -98,6 +99,9 @@ pub enum CandidateBackingMessage {
/// Disputes Subsystem, though that escalation is deferred until the approval voting stage to
/// guarantee availability. Agreements are simply tallied until a quorum is reached.
Statement(Hash, SignedFullStatementWithPVD),
/// Rate-limit backing. Used by approval-distribution to slow down the backing subsystem
/// when it is overloaded.
RateLimitBacking(WorkRateLimiter),
}

/// Blanket error for validation failing for internal reasons.