Skip to content

Commit

Permalink
Merge pull request #518 from paritytech/AndreiEres/on-demand-delay
Browse files Browse the repository at this point in the history
Track on-demand latency
  • Loading branch information
AndreiEres authored Aug 30, 2023
2 parents 974b096 + 2b75423 commit 0df5012
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 14 deletions.
43 changes: 42 additions & 1 deletion parachain-tracer/src/prometheus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ struct MetricsInner {
para_block_times_sec: HistogramVec,
/// Parachain's on-demand orders
para_on_demand_orders: GaugeVec,
/// Latency between ordering a slot by a parachain and its last backed candidate in relay blocks
para_on_demand_delay: GaugeVec,
/// Latency between ordering a slot by a parachain and its last backed candidate in seconds
para_on_demand_delay_sec: GaugeVec,
/// Finality lag
finality_lag: Gauge,
}
Expand Down Expand Up @@ -197,7 +201,8 @@ impl Metrics {
}
}

pub(crate) fn on_on_demand_order(&self, order: &OnDemandOrder) {
/// Update on-demand orders
pub(crate) fn handle_on_demand_order(&self, order: &OnDemandOrder) {
if let Some(metrics) = &self.0 {
let para_str: String = order.para_id.to_string();
metrics
Expand All @@ -207,6 +212,28 @@ impl Metrics {
}
}

/// Records the on-demand latency, measured in relay-chain blocks, for the
/// given parachain. `until` labels the milestone reached (e.g. "scheduled"
/// or "backed"). No-op when metrics are not enabled (`self.0` is `None`).
pub(crate) fn handle_on_demand_delay(&self, delay_blocks: u32, para_id: u32, until: &str) {
	if let Some(metrics) = self.0.as_ref() {
		let para_label = para_id.to_string();
		let labels = [para_label.as_str(), until];
		metrics.para_on_demand_delay.with_label_values(&labels).set(f64::from(delay_blocks));
	}
}

/// Update on-demand latency in seconds
///
/// Sets the `pc_para_on_demand_delay_sec` gauge for `para_id`, labelled with
/// the milestone in `until` (callers pass "scheduled" or "backed").
/// No-op when metrics are disabled (`self.0` is `None`).
///
/// NOTE(review): two label values are supplied here, but the registration of
/// `para_on_demand_delay_sec` in `register_metrics` declares only one label
/// (`&["parachain_id"]`, missing `"until"`). `with_label_values` panics on a
/// label-cardinality mismatch, so this looks like it would panic at runtime —
/// the registration should presumably be `&["parachain_id", "until"]` to match
/// the sibling `para_on_demand_delay` metric. TODO confirm and fix there.
pub(crate) fn handle_on_demand_delay_sec(&self, delay_sec: Duration, para_id: u32, until: &str) {
	if let Some(metrics) = &self.0 {
		let para_str: String = para_id.to_string();
		metrics
			.para_on_demand_delay_sec
			.with_label_values(&[&para_str[..], until])
			.set(delay_sec.as_secs_f64());
	}
}

pub(crate) fn on_finality_lag(&self, lag: u32) {
if let Some(metrics) = &self.0 {
metrics.finality_lag.set(lag.into());
Expand Down Expand Up @@ -339,6 +366,20 @@ fn register_metrics(registry: &Registry) -> Result<Metrics> {
)?,
registry,
)?,
para_on_demand_delay: prometheus_endpoint::register(
GaugeVec::new(
Opts::new("pc_para_on_demand_delay", "Latency (in relay chain blocks) between when the parachain orders a core and when first candidate is scheduled or backed on that core."),
&["parachain_id", "until"],
)?,
registry,
)?,
para_on_demand_delay_sec: prometheus_endpoint::register(
GaugeVec::new(
Opts::new("pc_para_on_demand_delay_sec", "Latency (in seconds) between when the parachain orders a core and when first candidate is scheduled or backed on that core."),
&["parachain_id"],
)?,
registry,
)?,
finality_lag: prometheus_endpoint::register(
Gauge::new("pc_finality_lag", "Finality lag")?,
registry,
Expand Down
81 changes: 68 additions & 13 deletions parachain-tracer/src/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ pub struct SubxtTracker {
current_relay_block_ts: Option<Timestamp>,
/// Current on-demand order
on_demand_order: Option<OnDemandOrder>,
/// Relay block where the last on-demand order was placed
on_demand_order_block: Option<BlockNumber>,
/// Timestamp where the last on-demand order was placed
on_demand_order_ts: Option<Timestamp>,
/// On-demand parachain was scheduled in current relay block
on_demand_scheduled: bool,
/// Last observed finality lag
finality_lag: Option<u32>,
/// Last relay chain block timestamp.
Expand Down Expand Up @@ -304,6 +310,13 @@ impl ParachainBlockTracker for SubxtTracker {
}
self.stats.on_backed();
metrics.on_backed(self.para_id);

if self.handle_on_demand_delay("backed", metrics) {
self.on_demand_order_block = None;
}
if self.handle_on_demand_delay_sec("backed", metrics) {
self.on_demand_order_ts = None;
}
},
ParachainBlockState::PendingAvailability | ParachainBlockState::Included => {
self.progress_availability(metrics).await;
Expand Down Expand Up @@ -347,14 +360,19 @@ impl ParachainBlockTracker for SubxtTracker {
metrics.on_block(tm.as_secs_f64(), self.para_id);
}

if let Some(ref order) = self.on_demand_order {
metrics.on_on_demand_order(order);
}

if let Some(finality_lag) = self.finality_lag {
metrics.on_finality_lag(finality_lag);
}

if self.handle_on_demand_order(metrics) {
self.on_demand_order = None;
}
if self.on_demand_scheduled {
let _sent = self.handle_on_demand_delay("scheduled", metrics);
let _sent = self.handle_on_demand_delay_sec("scheduled", metrics);
self.on_demand_scheduled = false;
}

self.update.clone()
}
}
Expand Down Expand Up @@ -383,6 +401,9 @@ impl SubxtTracker {
previous_relay_block: None,
current_relay_block_ts: None,
on_demand_order: None,
on_demand_order_block: None,
on_demand_order_ts: None,
on_demand_scheduled: false,
finality_lag: None,
disputes: Vec::new(),
last_assignment: None,
Expand All @@ -408,6 +429,17 @@ impl SubxtTracker {
.storage_read_prefixed(CollectorPrefixType::OnDemandOrder(self.para_id), block_hash)
.await
.map(|v| v.into_inner::<OnDemandOrder>().unwrap());

if self.on_demand_order.is_none() {
return
}

if let Some((block_number, _)) = self.current_relay_block {
self.on_demand_order_block = Some(block_number);
}
if let Some(ts) = self.current_relay_block_ts {
self.on_demand_order_ts = Some(ts);
}
}

async fn get_session_keys(&self, session_index: u32) -> Option<Vec<AccountId32>> {
Expand Down Expand Up @@ -584,8 +616,9 @@ impl SubxtTracker {
self.get_core_assignments_via_claim_queue(block_hash).await?,
Err(e) => return Err(e.into()),
};
if let Some((core, _)) = assignments.iter().find(|(_, ids)| ids.contains(&self.para_id)) {
if let Some((core, scheduled_ids)) = assignments.iter().find(|(_, ids)| ids.contains(&self.para_id)) {
self.current_candidate.assigned_core = Some(*core);
self.on_demand_scheduled = self.on_demand_order.is_some() && scheduled_ids[0] == self.para_id;
}
Ok(())
}
Expand Down Expand Up @@ -800,7 +833,7 @@ impl SubxtTracker {
relay_block_number,
self.last_included_block,
backed_in,
self.get_candidate_time(),
self.get_time_diff(self.current_relay_block_ts, self.last_included_at_ts),
self.para_id,
);
self.last_included_block = Some(relay_block_number);
Expand All @@ -825,13 +858,10 @@ impl SubxtTracker {
Duration::from_millis(cur_ts).saturating_sub(Duration::from_millis(base_ts))
}

/// Returns the time for the current candidate
pub fn get_candidate_time(&self) -> Option<Duration> {
let current = self.current_relay_block_ts;
let base = self.last_included_at_ts;
match (current, base) {
(Some(current), Some(base)) =>
Some(Duration::from_millis(current).saturating_sub(Duration::from_millis(base))),
/// Returns the saturating difference `lhs - rhs` between two optional
/// millisecond timestamps, or `None` when either timestamp is absent.
pub fn get_time_diff(&self, lhs: Option<u64>, rhs: Option<u64>) -> Option<Duration> {
	let (later_ms, earlier_ms) = lhs.zip(rhs)?;
	Some(Duration::from_millis(later_ms).saturating_sub(Duration::from_millis(earlier_ms)))
}
Expand All @@ -840,6 +870,31 @@ impl SubxtTracker {
/// Returns a borrow of the accumulated per-parachain statistics.
pub fn summary(&self) -> &ParachainStats {
	&self.stats
}

/// Forwards the pending on-demand order (if any) to prometheus.
/// Returns `true` when an order was reported, so the caller can clear it.
fn handle_on_demand_order(&self, metrics: &Metrics) -> bool {
	match self.on_demand_order {
		Some(ref order) => {
			metrics.handle_on_demand_order(order);
			true
		},
		None => false,
	}
}

/// Reports the on-demand latency in relay blocks — the distance between the
/// block where the order was placed and the current relay block — labelled
/// with the milestone in `until`. Returns `true` when the metric was sent,
/// so the caller knows it may reset the order block.
fn handle_on_demand_delay(&self, until: &str, metrics: &Metrics) -> bool {
	match self.on_demand_order_block.zip(self.current_relay_block) {
		Some((order_block, (current_block, _))) => {
			metrics.handle_on_demand_delay(
				current_block.saturating_sub(order_block),
				self.para_id,
				until,
			);
			true
		},
		None => false,
	}
}

/// Reports the on-demand latency in seconds — the wall-clock time between
/// the order's timestamp and the current relay block's timestamp — labelled
/// with the milestone in `until`. Returns `true` when the metric was sent,
/// so the caller knows it may reset the order timestamp.
fn handle_on_demand_delay_sec(&self, until: &str, metrics: &Metrics) -> bool {
	let elapsed = self.get_time_diff(self.current_relay_block_ts, self.on_demand_order_ts);
	elapsed.map_or(false, |diff| {
		metrics.handle_on_demand_delay_sec(diff, self.para_id, until);
		true
	})
}
}

// Examines session info (if any) and find the corresponding validator
Expand Down

0 comments on commit 0df5012

Please sign in to comment.