feat: attempt to fetch operation status in database upon relayer restart #5182

Merged: 27 commits into main from jeff/operation-status on Jan 29, 2025

Commits (27)
1242b7b
feat: attempt to fetch operation status in database upon relayer restart
kamiyaa Jan 2, 2025
4a9636b
add run_restart_check
kamiyaa Jan 5, 2025
79ae529
add e2e check for db message status retrieval
kamiyaa Jan 15, 2025
8140788
clippy
kamiyaa Jan 15, 2025
027265e
revert config
kamiyaa Jan 15, 2025
468ec37
add debug message when status is not found in db
kamiyaa Jan 20, 2025
c4de13d
refactor from_persisted_retries
kamiyaa Jan 20, 2025
78203a9
update E2E to check for message status invariant
kamiyaa Jan 21, 2025
451f78d
remove spacing
kamiyaa Jan 23, 2025
62d60c6
fix spacing
kamiyaa Jan 23, 2025
98499ab
refactor get_matching_lines to take in a slice of Vec<String>
kamiyaa Jan 24, 2025
709da91
cargo clippy
kamiyaa Jan 24, 2025
99e651d
Update rust/main/utils/run-locally/src/utils.rs
kamiyaa Jan 24, 2025
87a35ee
refactor: refactor get_matching_lines
kamiyaa Jan 24, 2025
858e1a0
refactor: move wait loop logic into function
kamiyaa Jan 24, 2025
b98e781
Merge branch 'main' into jeff/operation-status
kamiyaa Jan 27, 2025
548ea10
style: cargo clippy
kamiyaa Jan 27, 2025
57fa387
fix: add Default impl to MerkleTreeBuilder to satisfy clippy
kamiyaa Jan 27, 2025
a6159ea
Merge branch 'main' into jeff/operation-status
kamiyaa Jan 28, 2025
99fd6f7
fix: wait_for_condition returning incorrect value
kamiyaa Jan 28, 2025
6d240cb
fix: address comments
kamiyaa Jan 28, 2025
041919f
style: clippy
kamiyaa Jan 28, 2025
e03e37b
feat: add loop_invariant as a param as well
kamiyaa Jan 29, 2025
1f189d6
fix: add liveliness check to both wait_for_condition calls
kamiyaa Jan 29, 2025
28808ff
Merge branch 'main' into jeff/operation-status
kamiyaa Jan 29, 2025
a91b08d
fix: rename function
kamiyaa Jan 29, 2025
d50db51
fix: add shutdown
kamiyaa Jan 29, 2025
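Taken together, the diffs below make the relayer hydrate each pending message's status from its database on restart, instead of resetting every message to `FirstPrepareAttempt`. A minimal sketch of that restart path, using simplified stand-in types (`Db`, `OperationStatus`) rather than the actual Hyperlane ones:

```rust
use std::collections::HashMap;

// Stand-ins for the relayer's DB handle and operation status enum.
#[derive(Clone, Debug, PartialEq)]
enum OperationStatus {
    FirstPrepareAttempt,
    ReadyToSubmit,
}

struct Db {
    statuses: HashMap<u64, OperationStatus>,
}

impl Db {
    // A fallible DB read that may also find nothing, mirroring the shape of
    // `retrieve_status_by_message_id` in the pending_message.rs diff below.
    fn retrieve_status_by_message_id(
        &self,
        id: &u64,
    ) -> Result<Option<OperationStatus>, String> {
        Ok(self.statuses.get(id).cloned())
    }
}

// On restart, prefer the persisted status; fall back to a fresh first attempt
// when the read fails or the message was never persisted.
fn status_on_restart(db: &Db, message_id: &u64) -> OperationStatus {
    match db.retrieve_status_by_message_id(message_id) {
        Ok(Some(status)) => status,
        _ => OperationStatus::FirstPrepareAttempt,
    }
}
```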
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/lib.rs
@@ -1,5 +1,6 @@
+pub mod msg;
+
 mod merkle_tree;
-mod msg;
 mod processor;
 mod prover;
 mod relayer;
6 changes: 6 additions & 0 deletions rust/main/agents/relayer/src/merkle_tree/builder.rs
@@ -38,6 +38,12 @@ impl Display for MerkleTreeBuilder {
     }
 }
 
+impl Default for MerkleTreeBuilder {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 /// MerkleTreeBuilder errors
 #[derive(Debug, thiserror::Error)]
 pub enum MerkleTreeBuilderError {
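For context on commit 57fa387: clippy's `new_without_default` lint fires when a type has an argument-less `new` but no `Default` impl, and the conventional fix is exactly the delegation above. A generic sketch of the pattern, with a hypothetical builder type:

```rust
struct ExampleBuilder {
    items: Vec<u32>,
}

impl ExampleBuilder {
    fn new() -> Self {
        Self { items: Vec::new() }
    }
}

// Delegating to `new()` keeps the two constructors equivalent, which is what
// clippy's `new_without_default` lint expects.
impl Default for ExampleBuilder {
    fn default() -> Self {
        Self::new()
    }
}
```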
3 changes: 2 additions & 1 deletion rust/main/agents/relayer/src/msg/mod.rs
@@ -32,7 +32,8 @@ pub(crate) mod gas_payment;
 pub(crate) mod metadata;
 pub(crate) mod op_queue;
 pub(crate) mod op_submitter;
-pub(crate) mod pending_message;
 pub(crate) mod processor;
 
+pub mod pending_message;
+
 pub use gas_payment::GAS_EXPENDITURE_LOG_MESSAGE;
10 changes: 2 additions & 8 deletions rust/main/agents/relayer/src/msg/op_submitter.rs
@@ -230,14 +230,8 @@ async fn receive_task(
         // make sure things are getting wired up correctly; if this works in testing it
         // should also be valid in production.
         debug_assert_eq!(*op.destination_domain(), domain);
-        let status = op.retrieve_status_from_db().unwrap_or_else(|| {
-            trace!(
-                ?op,
-                "No status found for message, defaulting to FirstPrepareAttempt"
-            );
-            PendingOperationStatus::FirstPrepareAttempt
-        });
-        prepare_queue.push(op, Some(status)).await;
+        let op_status = op.status();
+        prepare_queue.push(op, Some(op_status)).await;
     }
 }
 
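Since `PendingMessage` now loads its status from the DB when it is constructed (see the next file), the receive task can re-queue a retried operation with the status it already carries, rather than issuing a second DB read. A rough sketch of the simplified shape, with a stand-in trait and queue in place of the relayer's actual types:

```rust
// Stand-ins for the relayer's pending-operation trait and prepare queue.
#[derive(Clone, Debug)]
enum OperationStatus {
    FirstPrepareAttempt,
    Retried(u32),
}

trait PendingOperation {
    // The in-memory status, already hydrated from the DB at construction.
    fn status(&self) -> OperationStatus;
}

#[derive(Default)]
struct PrepareQueue {
    ops: Vec<(Box<dyn PendingOperation>, Option<OperationStatus>)>,
}

impl PrepareQueue {
    // Push using the status the operation already knows about, replacing the
    // deleted per-push DB lookup.
    fn push(&mut self, op: Box<dyn PendingOperation>, status: Option<OperationStatus>) {
        self.ops.push((op, status));
    }
}

fn requeue(queue: &mut PrepareQueue, op: Box<dyn PendingOperation>) {
    let op_status = op.status();
    queue.push(op, Some(op_status));
}
```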
64 changes: 46 additions & 18 deletions rust/main/agents/relayer/src/msg/pending_message.rs
@@ -21,7 +21,7 @@ use hyperlane_core::{
 };
 use prometheus::{IntCounter, IntGauge};
 use serde::Serialize;
-use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument};
+use tracing::{debug, error, info, info_span, instrument, trace, warn, Instrument, Level};
 
 use super::{
     gas_payment::{GasPaymentEnforcer, GasPolicyStatus},
@@ -36,6 +36,8 @@ pub const CONFIRM_DELAY: Duration = if cfg!(any(test, feature = "test-utils")) {
     Duration::from_secs(60 * 10)
 };
 
+pub const RETRIEVED_MESSAGE_LOG: &str = "Message status retrieved from db";
+
 /// The message context contains the links needed to submit a message. Each
 /// instance is for a unique origin -> destination pairing.
 pub struct MessageContext {
@@ -510,27 +512,53 @@ impl PendingMessage {
         ctx: Arc<MessageContext>,
         app_context: Option<String>,
     ) -> Self {
-        let mut pm = Self::new(
-            message,
-            ctx,
-            // Since we don't persist the message status for now, assume it's the first attempt
-            PendingOperationStatus::FirstPrepareAttempt,
-            app_context,
-        );
-        match pm
-            .ctx
+        // Attempt to fetch status about message from database
+        let message_status = match ctx.origin_db.retrieve_status_by_message_id(&message.id()) {
+            Ok(Some(status)) => {
+                // This event is used for E2E tests to ensure message statuses
+                // are being properly loaded from the db
+                tracing::event!(
+                    if cfg!(feature = "test-utils") {
+                        Level::DEBUG
+                    } else {
+                        Level::TRACE
+                    },
+                    ?status,
+                    id=?message.id(),
+                    RETRIEVED_MESSAGE_LOG,
+                );
+                status
+            }
+            _ => {
+                tracing::event!(
+                    if cfg!(feature = "test-utils") {
+                        Level::DEBUG
+                    } else {
+                        Level::TRACE
+                    },
+                    "Message status not found in db"
+                );
+                PendingOperationStatus::FirstPrepareAttempt
+            }
+        };
+
+        let num_retries = match ctx
             .origin_db
-            .retrieve_pending_message_retry_count_by_message_id(&pm.message.id())
+            .retrieve_pending_message_retry_count_by_message_id(&message.id())
         {
-            Ok(Some(num_retries)) => {
-                let next_attempt_after = PendingMessage::calculate_msg_backoff(num_retries)
-                    .map(|dur| Instant::now() + dur);
-                pm.num_retries = num_retries;
-                pm.next_attempt_after = next_attempt_after;
-            }
+            Ok(Some(num_retries)) => num_retries,
             r => {
-                trace!(message_id = ?pm.message.id(), result = ?r, "Failed to read retry count from HyperlaneDB for message.")
+                trace!(message_id = ?message.id(), result = ?r, "Failed to read retry count from HyperlaneDB for message.");
+                0
             }
         };
+
+        let mut pm = Self::new(message, ctx, message_status, app_context);
+        if num_retries > 0 {
+            let next_attempt_after =
+                PendingMessage::calculate_msg_backoff(num_retries).map(|dur| Instant::now() + dur);
+            pm.num_retries = num_retries;
+            pm.next_attempt_after = next_attempt_after;
+        }
         pm
     }
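One detail worth calling out in the hunk above: `tracing::event!` is invoked with a level chosen by an `if cfg!(feature = "test-utils")` expression, so the same log line is emitted at DEBUG in test builds (where the E2E suite greps for `RETRIEVED_MESSAGE_LOG`) but only at TRACE in production. A standalone sketch of the pattern, assuming a `test-utils` cargo feature as in the diff:

```rust
use tracing::Level;

// Emit at DEBUG when built with the `test-utils` feature so e2e tests can see
// the line at a default-visible level; stay at TRACE otherwise. `cfg!` folds
// to a boolean at compile time, so only one level is ever used per build.
fn log_status_retrieved(message_id: &str, status: &str) {
    tracing::event!(
        if cfg!(feature = "test-utils") {
            Level::DEBUG
        } else {
            Level::TRACE
        },
        ?status,
        id = ?message_id,
        "Message status retrieved from db",
    );
}
```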
@@ -90,52 +90,70 @@ pub fn termination_invariants_met(
     const TX_ID_INDEXING_LOG_MESSAGE: &str = "Found log(s) for tx id";
 
     let relayer_logfile = File::open(log_file_path)?;
-    let invariant_logs = &[
-        STORING_NEW_MESSAGE_LOG_MESSAGE,
-        LOOKING_FOR_EVENTS_LOG_MESSAGE,
-        GAS_EXPENDITURE_LOG_MESSAGE,
-        HYPER_INCOMING_BODY_LOG_MESSAGE,
-        TX_ID_INDEXING_LOG_MESSAGE,
+
+    let storing_new_msg_line_filter = vec![STORING_NEW_MESSAGE_LOG_MESSAGE];
+    let looking_for_events_line_filter = vec![LOOKING_FOR_EVENTS_LOG_MESSAGE];
+    let gas_expenditure_line_filter = vec![GAS_EXPENDITURE_LOG_MESSAGE];
+    let hyper_incoming_body_line_filter = vec![HYPER_INCOMING_BODY_LOG_MESSAGE];
+    let tx_id_indexing_line_filter = vec![TX_ID_INDEXING_LOG_MESSAGE];
+    let invariant_logs = vec![
+        storing_new_msg_line_filter.clone(),
+        looking_for_events_line_filter.clone(),
+        gas_expenditure_line_filter.clone(),
+        hyper_incoming_body_line_filter.clone(),
+        tx_id_indexing_line_filter.clone(),
     ];
     let log_counts = get_matching_lines(&relayer_logfile, invariant_logs);
+
     // Zero insertion messages don't reach `submit` stage where gas is spent, so we only expect these logs for the other messages.
     // TODO: Sometimes we find more logs than expected. This may either mean that gas is deducted twice for the same message due to a bug,
     // or that submitting the message transaction fails for some messages. Figure out which is the case and convert this check to
     // strict equality.
     // EDIT: Having had a quick look, it seems like there are some legitimate reverts happening in the confirm step
     // (`Transaction attempting to process message either reverted or was reorged`)
     // in which case more gas expenditure logs than messages are expected.
-    let gas_expenditure_log_count = log_counts.get(GAS_EXPENDITURE_LOG_MESSAGE).unwrap();
+    let gas_expenditure_log_count = *log_counts
+        .get(&gas_expenditure_line_filter)
+        .expect("Failed to get gas expenditure log count");
     assert!(
-        gas_expenditure_log_count >= &total_messages_expected,
+        gas_expenditure_log_count >= total_messages_expected,
         "Didn't record gas payment for all delivered messages. Got {} gas payment logs, expected at least {}",
         gas_expenditure_log_count,
        total_messages_expected
     );
     // These tests check that we fixed https://github.com/hyperlane-xyz/hyperlane-monorepo/issues/3915, where some logs would not show up
+
+    let storing_new_msg_log_count = *log_counts
+        .get(&storing_new_msg_line_filter)
+        .expect("Failed to get storing new msg log count");
     assert!(
-        log_counts.get(STORING_NEW_MESSAGE_LOG_MESSAGE).unwrap() > &0,
+        storing_new_msg_log_count > 0,
         "Didn't find any logs about storing messages in db"
     );
+    let looking_for_events_log_count = *log_counts
+        .get(&looking_for_events_line_filter)
+        .expect("Failed to get looking for events log count");
     assert!(
-        log_counts.get(LOOKING_FOR_EVENTS_LOG_MESSAGE).unwrap() > &0,
+        looking_for_events_log_count > 0,
         "Didn't find any logs about looking for events in index range"
     );
-    let total_tx_id_log_count = log_counts.get(TX_ID_INDEXING_LOG_MESSAGE).unwrap();
+    let total_tx_id_log_count = *log_counts
+        .get(&tx_id_indexing_line_filter)
+        .expect("Failed to get tx id indexing log count");
     assert!(
         // there are 3 txid-indexed events:
         // - relayer: merkle insertion and gas payment
         // - scraper: gas payment
         // some logs are emitted for multiple events, so requiring there to be at least
         // `config.kathy_messages` logs is a reasonable approximation, since all three of these events
         // are expected to be logged for each message.
-        *total_tx_id_log_count as u64 >= config.kathy_messages,
+        total_tx_id_log_count as u64 >= config.kathy_messages,
         "Didn't find as many tx id logs as expected. Found {} and expected {}",
         total_tx_id_log_count,
         config.kathy_messages
     );
     assert!(
-        log_counts.get(HYPER_INCOMING_BODY_LOG_MESSAGE).is_none(),
+        log_counts.get(&hyper_incoming_body_line_filter).is_none(),
         "Verbose logs not expected at the log level set in e2e"
     );
 
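The refactor from commits 98499ab and 87a35ee keys the counts returned by `get_matching_lines` by a multi-substring filter (`Vec<&str>`) instead of a single message, which is why the lookups above use `log_counts.get(&filter)`. A minimal sketch of matching logic consistent with those call sites (the real helper lives in run-locally's utils.rs and may differ in details):

```rust
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Read};

// For each filter, count the lines that contain *every* substring in it.
// Keys are the filters themselves, matching lookups like
// `log_counts.get(&gas_expenditure_line_filter)` above.
fn get_matching_lines<'a, R: Read>(
    reader: R,
    filters: &[Vec<&'a str>],
) -> HashMap<Vec<&'a str>, u32> {
    let mut counts: HashMap<Vec<&'a str>, u32> = HashMap::new();
    for line in BufReader::new(reader).lines().map_while(Result::ok) {
        for filter in filters {
            if filter.iter().all(|needle| line.contains(needle)) {
                *counts.entry(filter.clone()).or_insert(0) += 1;
            }
        }
    }
    counts
}
```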