Skip to content

Commit

Permalink
fix(agents/relayer): update metrics even if no new event is found (#5178
Browse files Browse the repository at this point in the history
)

### Description

- Follow up to
#5074, the
cursor_max_sequence(event_type="merkle_tree_insertion") will be used for
the highest merkle index on a chain. The logic to update this metric has
been moved to when checking for new logs so we can update the metrics
even if new events are not found.
- Renamed `latest_leaf_index` to `latest_tree_insertion_index` for
clarity.

Related #5151


### Drive-by changes

None

### Related issues

- fixes  hyperlane-xyz/issues#1396


### Backward compatibility

Yes

### Testing

e2e + Manual
  • Loading branch information
aroralanuk authored Jan 17, 2025
1 parent fed42c3 commit 3d6ecae
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 24 deletions.
8 changes: 4 additions & 4 deletions rust/main/agents/relayer/src/merkle_tree/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl MerkleTreeProcessor {
// Update the metrics
// we assume that leaves are inserted in order so this will be monotonically increasing
self.metrics
.latest_leaf_index_gauge
.latest_tree_insertion_index_gauge
.set(insertion.index() as i64);
Some(insertion)
} else {
Expand All @@ -89,14 +89,14 @@ impl MerkleTreeProcessor {

#[derive(Debug)]
pub struct MerkleTreeProcessorMetrics {
latest_leaf_index_gauge: IntGauge,
latest_tree_insertion_index_gauge: IntGauge,
}

impl MerkleTreeProcessorMetrics {
pub fn new(metrics: &CoreMetrics, origin: &HyperlaneDomain) -> Self {
Self {
latest_leaf_index_gauge: metrics
.latest_leaf_index()
latest_tree_insertion_index_gauge: metrics
.latest_tree_insertion_index()
.with_label_values(&[origin.name()]),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,17 +101,6 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> ForwardSequenceAwareS
}
}

/// Get target sequence or return 0 if request failed
pub async fn target_sequence(&self) -> u32 {
let (count, _) = self
.latest_sequence_querier
.latest_sequence_count_and_tip()
.await
.ok()
.unwrap_or((None, 0));
count.unwrap_or(0).saturating_sub(1)
}

/// Get the last indexed sequence or 0 if no logs have been indexed yet.
pub fn last_sequence(&self) -> u32 {
self.last_indexed_snapshot.sequence.unwrap_or(0)
Expand All @@ -134,6 +123,10 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> ForwardSequenceAwareS
return Ok(None);
};

// for updating metrics even if there's no indexable events available
let max_sequence = onchain_sequence_count.saturating_sub(1) as i64;
self.update_metrics(max_sequence).await;

let current_sequence = self.current_indexing_snapshot.sequence;
let range = match current_sequence.cmp(&onchain_sequence_count) {
Ordering::Equal => {
Expand Down Expand Up @@ -432,7 +425,7 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> ForwardSequenceAwareS
}

// Updates the cursor metrics.
async fn update_metrics(&self) {
async fn update_metrics(&self, max_sequence: i64) {
let mut labels = hashmap! {
"event_type" => T::name(),
"chain" => self.domain.name(),
Expand All @@ -452,7 +445,6 @@ impl<T: Debug + Clone + Sync + Send + Indexable + 'static> ForwardSequenceAwareS
.set(sequence as i64);

labels.remove("cursor_type");
let max_sequence = self.target_sequence().await as i64;
self.metrics
.cursor_max_sequence
.with(&labels)
Expand Down Expand Up @@ -501,7 +493,6 @@ impl<T: Send + Sync + Clone + Debug + Indexable + 'static> ContractSyncCursor<T>
logs: Vec<(Indexed<T>, LogMeta)>,
range: RangeInclusive<u32>,
) -> Result<()> {
self.update_metrics().await;
// Remove any sequence duplicates, filter out any logs preceding our current snapshot,
// and sort in ascending order.
let logs = indexed_to_sequence_indexed_array(logs)?
Expand Down
12 changes: 6 additions & 6 deletions rust/main/hyperlane-base/src/metrics/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct CoreMetrics {
span_counts: IntCounterVec,
span_events: IntCounterVec,
last_known_message_nonce: IntGaugeVec,
latest_leaf_index: IntGaugeVec,
latest_tree_insertion_index: IntGaugeVec,
submitter_queue_length: IntGaugeVec,

operations_processed_count: IntCounterVec,
Expand Down Expand Up @@ -113,9 +113,9 @@ impl CoreMetrics {
registry
)?;

let latest_leaf_index = register_int_gauge_vec_with_registry!(
let latest_tree_insertion_index = register_int_gauge_vec_with_registry!(
opts!(
namespaced!("latest_leaf_index"),
namespaced!("latest_tree_insertion_index"),
"Latest leaf index inserted into the merkle tree",
const_labels_ref
),
Expand Down Expand Up @@ -188,7 +188,7 @@ impl CoreMetrics {
span_counts,
span_events,
last_known_message_nonce,
latest_leaf_index,
latest_tree_insertion_index,

submitter_queue_length,

Expand Down Expand Up @@ -325,8 +325,8 @@ impl CoreMetrics {
///
/// Labels:
/// - `origin`: Origin chain the leaf index is being tracked at.
pub fn latest_leaf_index(&self) -> IntGaugeVec {
self.latest_leaf_index.clone()
pub fn latest_tree_insertion_index(&self) -> IntGaugeVec {
self.latest_tree_insertion_index.clone()
}

/// Latest message nonce in the validator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,23 @@ pub fn termination_invariants_met(
return Ok(false);
}

let merkle_tree_max_sequence = fetch_metric(
RELAYER_METRICS_PORT,
"hyperlane_cursor_max_sequence",
&hashmap! {"event_type" => "merkle_tree_insertion"},
)?;
// check for each origin that the highest tree index seen by the syncer == # of messages sent + # of double insertions
// LHS: sum(merkle_tree_max_sequence) + len(merkle_tree_max_sequence) (each is index so we add 1 to each)
// RHS: total_messages_expected + non_matching_igp_messages + (config.kathy_messages as u32 / 4) * 2 (double insertions)
let non_zero_sequence_count =
merkle_tree_max_sequence.iter().filter(|&&x| x > 0).count() as u32;
assert_eq!(
merkle_tree_max_sequence.iter().sum::<u32>() + non_zero_sequence_count,
total_messages_expected
+ SOL_MESSAGES_WITH_NON_MATCHING_IGP
+ (config.kathy_messages as u32 / 4) * 2
);

if let Some((solana_cli_tools_path, solana_config_path)) =
solana_cli_tools_path.zip(solana_config_path)
{
Expand Down

0 comments on commit 3d6ecae

Please sign in to comment.