From 64b91061a159d7c919bf2f020c29e1cf514d8843 Mon Sep 17 00:00:00 2001 From: Ethan Oroshiba Date: Wed, 11 Sep 2024 08:30:38 -0500 Subject: [PATCH] fix(bridge-withdrawer, cli, sequencer-client): migrate from `broadcast_tx_commit` to `broadcast_tx_sync` (#1376) ## Summary Deleted all usage of `broadcast_tx_commit` and instead implemented `broadcast_tx_sync`. ## Background Previously, we relied on `broadcast_tx_commit` to submit transactions to the sequencer. [CometBFT RPC docs warn against this](https://docs.cometbft.com/main/rpc/#/Tx/broadcast_tx_commit) and instead favor using `broadcast_tx_sync`. ## Changes - Deleted all instances of `broadcast_tx_commit`, replacing with `broadcast_tx_sync`. - Added function `wait_for_tx_inclusion()` to probe the sequencer client for a new transaction of the given hash with backoff. ## Testing - Added unit test to `sequencer-client` to ensure `wait_for_tx_inclusion()` works properly ## Related Issues closes #1283 --------- Co-authored-by: noot <36753753+noot@users.noreply.github.com> --- Cargo.lock | 3 + .../src/bridge_withdrawer/submitter/mod.rs | 46 ++++++++---- .../tests/blackbox/helpers/mock_cometbft.rs | 62 +++++++--------- .../helpers/test_bridge_withdrawer.rs | 12 ++-- .../tests/blackbox/main.rs | 8 +-- .../astria-cli/src/commands/bridge/submit.rs | 21 +++--- crates/astria-cli/src/commands/sequencer.rs | 22 +++--- crates/astria-sequencer-client/Cargo.toml | 3 + .../src/extension_trait.rs | 71 ++++++++++++++----- .../astria-sequencer-client/src/tests/http.rs | 67 ++++++++++++----- 10 files changed, 197 insertions(+), 118 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1773ce6a2b..e63fd11611 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -835,11 +835,13 @@ name = "astria-sequencer-client" version = "0.1.0" dependencies = [ "astria-core", + "astria-eyre", "async-trait", "futures", "futures-util", "hex", "hex-literal", + "humantime", "prost", "regex", "serde", @@ -852,6 +854,7 @@ dependencies = [ "tokio-stream", "tokio-test", "tracing", + "tryhard", "wiremock", ] diff --git a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs index 21ffba98b5..620bc55f00 100644 --- a/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs +++ b/crates/astria-bridge-withdrawer/src/bridge_withdrawer/submitter/mod.rs @@ -19,13 +19,17 @@ use astria_core::{ }; use astria_eyre::eyre::{ self, + ensure, eyre, Context, }; pub(crate) use builder::Builder; pub(super) use builder::Handle; use sequencer_client::{ - tendermint_rpc::endpoint::broadcast::tx_commit, + tendermint_rpc::endpoint::{ + broadcast::tx_sync, + tx, + }, Address, SequencerClientExt, SignedTransaction, @@ -167,7 +171,7 @@ impl Submitter { debug!(tx_hash = %telemetry::display::hex(&signed.sha256_of_proto_encoding()), "signed transaction"); // submit transaction and handle response - let rsp = submit_tx( + let (check_tx, tx_response) = submit_tx( sequencer_cometbft_client.clone(), signed, state.clone(), @@ -175,31 +179,31 @@ impl Submitter { ) .await .context("failed to submit transaction to cometbft")?; - if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code { + if let tendermint::abci::Code::Err(check_tx_code) = check_tx.code { Err(eyre!( "check_tx failure upon submitting transaction to sequencer: transaction failed to \ be included in the mempool, aborting. abci.code = {check_tx_code}, abci.log = \ {}, rollup.height = {rollup_height}", - rsp.check_tx.log + check_tx.log )) - } else if let tendermint::abci::Code::Err(deliver_tx_code) = rsp.tx_result.code { + } else if let tendermint::abci::Code::Err(deliver_tx_code) = tx_response.tx_result.code { Err(eyre!( "deliver_tx failure upon submitting transaction to sequencer: transaction failed \ to be executed in a block, aborting. abci.code = {deliver_tx_code}, abci.log = \ {}, rollup.height = {rollup_height}", - rsp.tx_result.log, + tx_response.tx_result.log, )) } else { // update state after successful submission info!( - sequencer.block = rsp.height.value(), - sequencer.tx_hash = %rsp.hash, + sequencer.block = tx_response.height.value(), + sequencer.tx_hash = %tx_response.hash, rollup.height = rollup_height, "withdraw batch successfully executed." ); state.set_last_rollup_height_submitted(rollup_height); - state.set_last_sequencer_height(rsp.height.value()); - state.set_last_sequencer_tx_hash(rsp.hash); + state.set_last_sequencer_height(tx_response.height.value()); + state.set_last_sequencer_tx_hash(tx_response.hash); Ok(()) } } @@ -230,7 +234,7 @@ async fn submit_tx( tx: SignedTransaction, state: Arc, metrics: &'static Metrics, -) -> eyre::Result { +) -> eyre::Result<(tx_sync::Response, tx::Response)> { let nonce = tx.nonce(); metrics.set_current_nonce(nonce); let start = std::time::Instant::now(); @@ -261,21 +265,33 @@ async fn submit_tx( async move {} }, ); - let res = tryhard::retry_fn(|| { + let check_tx = tryhard::retry_fn(|| { let client = client.clone(); let tx = tx.clone(); let span = info_span!(parent: span.clone(), "attempt send"); - async move { client.submit_transaction_commit(tx).await }.instrument(span) + async move { client.submit_transaction_sync(tx).await }.instrument(span) }) .with_config(retry_config) .await .wrap_err("failed sending transaction after 1024 attempts"); - state.set_sequencer_connected(res.is_ok()); + state.set_sequencer_connected(check_tx.is_ok()); metrics.record_sequencer_submission_latency(start.elapsed()); - res + let check_tx = check_tx?; + + ensure!(check_tx.code.is_ok(), "check_tx failed: {}", check_tx.log); + + let tx_response = client.wait_for_tx_inclusion(check_tx.hash).await; + + ensure!( + tx_response.tx_result.code.is_ok(), + "deliver_tx failed: {}", + tx_response.tx_result.log + ); + + Ok((check_tx, tx_response)) } #[instrument(skip_all, err)] diff --git a/crates/astria-bridge-withdrawer/tests/blackbox/helpers/mock_cometbft.rs b/crates/astria-bridge-withdrawer/tests/blackbox/helpers/mock_cometbft.rs index d6c7a0ea2d..90dc6790a9 100644 --- a/crates/astria-bridge-withdrawer/tests/blackbox/helpers/mock_cometbft.rs +++ b/crates/astria-bridge-withdrawer/tests/blackbox/helpers/mock_cometbft.rs @@ -10,19 +10,13 @@ use sequencer_client::{ SignedTransaction, }; use tendermint::{ - abci::{ - response::CheckTx, - types::ExecTxResult, - }, + abci::types::ExecTxResult, block::Height, chain, }; use tendermint_rpc::{ endpoint::{ - broadcast::{ - tx_commit, - tx_sync, - }, + broadcast::tx_sync, tx, }, response, @@ -46,38 +40,37 @@ use super::test_bridge_withdrawer::{ }; #[must_use] -pub fn make_tx_commit_success_response() -> tx_commit::Response { - tx_commit::Response { - check_tx: CheckTx::default(), - tx_result: ExecTxResult::default(), +pub fn make_tx_sync_success_response() -> tx_sync::Response { + tx_sync::Response { + code: 0.into(), + data: vec![].into(), + log: "tx success".to_string(), hash: vec![0u8; 32].try_into().unwrap(), - height: Height::default(), } } #[must_use] -pub fn make_tx_commit_check_tx_failure_response() -> tx_commit::Response { - tx_commit::Response { - check_tx: CheckTx { - code: 1.into(), - ..CheckTx::default() - }, - tx_result: ExecTxResult::default(), +pub fn make_tx_sync_failure_response() -> tx_sync::Response { + tx_sync::Response { + code: 1.into(), + data: vec![].into(), + log: "tx failed".to_string(), hash: vec![0u8; 32].try_into().unwrap(), - height: Height::default(), } } #[must_use] -pub fn make_tx_commit_deliver_tx_failure_response() -> tx_commit::Response { - tx_commit::Response { - check_tx: CheckTx::default(), +pub fn make_tx_failure_response() -> tx::Response { + tx::Response { + hash: vec![0u8; 32].try_into().unwrap(), + height: Height::default(), + index: 0, tx_result: ExecTxResult { code: 1.into(), ..ExecTxResult::default() }, - hash: vec![0u8; 32].try_into().unwrap(), - height: Height::default(), + tx: vec![], + proof: None, } } @@ -308,28 +301,25 @@ fn prepare_tx_response(response: tx::Response) -> Mock { .expect(1) } -pub async fn mount_broadcast_tx_commit_response( - server: &MockServer, - response: tx_commit::Response, -) { - prepare_broadcast_tx_commit_response(response) +pub async fn mount_broadcast_tx_sync_response(server: &MockServer, response: tx_sync::Response) { + prepare_broadcast_tx_sync_response(response) .mount(server) .await; } -pub async fn mount_broadcast_tx_commit_response_as_scoped( +pub async fn mount_broadcast_tx_sync_response_as_scoped( server: &MockServer, - response: tx_commit::Response, + response: tx_sync::Response, ) -> MockGuard { - prepare_broadcast_tx_commit_response(response) + prepare_broadcast_tx_sync_response(response) .mount_as_scoped(server) .await } -fn prepare_broadcast_tx_commit_response(response: tx_commit::Response) -> Mock { +fn prepare_broadcast_tx_sync_response(response: tx_sync::Response) -> Mock { let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None); Mock::given(body_partial_json(serde_json::json!({ - "method": "broadcast_tx_commit" + "method": "broadcast_tx_sync" }))) .respond_with( ResponseTemplate::new(200) diff --git a/crates/astria-bridge-withdrawer/tests/blackbox/helpers/test_bridge_withdrawer.rs b/crates/astria-bridge-withdrawer/tests/blackbox/helpers/test_bridge_withdrawer.rs index a301a26b69..983e8dc5be 100644 --- a/crates/astria-bridge-withdrawer/tests/blackbox/helpers/test_bridge_withdrawer.rs +++ b/crates/astria-bridge-withdrawer/tests/blackbox/helpers/test_bridge_withdrawer.rs @@ -50,13 +50,13 @@ use tracing::{ use super::{ ethereum::AstriaBridgeableERC20DeployerConfig, - make_tx_commit_success_response, + make_tx_sync_success_response, mock_cometbft::{ mount_default_chain_id, mount_get_nonce_response, mount_native_fee_asset, }, - mount_broadcast_tx_commit_response_as_scoped, + mount_broadcast_tx_sync_response_as_scoped, mount_ibc_fee_asset, mount_last_bridge_tx_hash_response, MockSequencerServer, @@ -225,12 +225,10 @@ impl TestBridgeWithdrawer { .await; } - pub async fn mount_broadcast_tx_commit_success_response_as_scoped( - &self, - ) -> wiremock::MockGuard { - mount_broadcast_tx_commit_response_as_scoped( + pub async fn mount_broadcast_tx_sync_success_response_as_scoped(&self) -> wiremock::MockGuard { + mount_broadcast_tx_sync_response_as_scoped( &self.cometbft_mock, - make_tx_commit_success_response(), + make_tx_sync_success_response(), ) .await } diff --git a/crates/astria-bridge-withdrawer/tests/blackbox/main.rs b/crates/astria-bridge-withdrawer/tests/blackbox/main.rs index 00aa141f39..4d4488efb0 100644 --- a/crates/astria-bridge-withdrawer/tests/blackbox/main.rs +++ b/crates/astria-bridge-withdrawer/tests/blackbox/main.rs @@ -20,7 +20,7 @@ async fn native_sequencer_withdraw_success() { .mount_pending_nonce_response(1, "process batch 1") .await; let broadcast_guard = test_env - .mount_broadcast_tx_commit_success_response_as_scoped() + .mount_broadcast_tx_sync_success_response_as_scoped() .await; // send a native sequencer withdrawal tx to the rollup @@ -57,7 +57,7 @@ async fn native_ics20_withdraw_success() { .mount_pending_nonce_response(1, "process batch 1") .await; let broadcast_guard = test_env - .mount_broadcast_tx_commit_success_response_as_scoped() + .mount_broadcast_tx_sync_success_response_as_scoped() .await; // send an ics20 withdrawal tx to the rollup @@ -94,7 +94,7 @@ async fn erc20_sequencer_withdraw_success() { .mount_pending_nonce_response(1, "process batch 1") .await; let broadcast_guard = test_env - .mount_broadcast_tx_commit_success_response_as_scoped() + .mount_broadcast_tx_sync_success_response_as_scoped() .await; // mint some erc20 tokens to the rollup wallet @@ -137,7 +137,7 @@ async fn erc20_ics20_withdraw_success() { .mount_pending_nonce_response(1, "process batch 1") .await; let broadcast_guard = test_env - .mount_broadcast_tx_commit_success_response_as_scoped() + .mount_broadcast_tx_sync_success_response_as_scoped() .await; // mint some erc20 tokens to the rollup wallet diff --git a/crates/astria-cli/src/commands/bridge/submit.rs b/crates/astria-cli/src/commands/bridge/submit.rs index 7287fee1a4..c332a5a6d1 100644 --- a/crates/astria-cli/src/commands/bridge/submit.rs +++ b/crates/astria-cli/src/commands/bridge/submit.rs @@ -12,7 +12,7 @@ use astria_core::{ }, }; use astria_sequencer_client::{ - tendermint_rpc::endpoint, + tendermint_rpc::endpoint::tx::Response, Address, HttpClient, SequencerClientExt as _, @@ -117,7 +117,7 @@ async fn submit_transaction( prefix: &str, signing_key: &SigningKey, actions: Vec, -) -> eyre::Result { +) -> eyre::Result { let from_address = Address::builder() .array(signing_key.verification_key().address_bytes()) .prefix(prefix) @@ -138,18 +138,17 @@ async fn submit_transaction( } .into_signed(signing_key); let res = client - .submit_transaction_commit(tx) + .submit_transaction_sync(tx) .await .wrap_err("failed to submit transaction")?; + + let tx_response = client.wait_for_tx_inclusion(res.hash).await; + + ensure!(res.code.is_ok(), "failed to check tx: {}", res.log); ensure!( - res.check_tx.code.is_ok(), - "failed to check tx: {}", - res.check_tx.log - ); - ensure!( - res.tx_result.code.is_ok(), + tx_response.tx_result.code.is_ok(), "failed to execute tx: {}", - res.tx_result.log + tx_response.tx_result.log ); - Ok(res) + Ok(tx_response) } diff --git a/crates/astria-cli/src/commands/sequencer.rs b/crates/astria-cli/src/commands/sequencer.rs index c7c601ee62..46259e4763 100644 --- a/crates/astria-cli/src/commands/sequencer.rs +++ b/crates/astria-cli/src/commands/sequencer.rs @@ -21,7 +21,7 @@ use astria_core::{ }, }; use astria_sequencer_client::{ - tendermint_rpc::endpoint, + tendermint_rpc::endpoint::tx::Response, Client, HttpClient, SequencerClientExt, @@ -453,7 +453,7 @@ async fn submit_transaction( prefix: &str, private_key: &str, action: Action, -) -> eyre::Result { +) -> eyre::Result { let sequencer_client = HttpClient::new(sequencer_url).wrap_err("failed constructing http sequencer client")?; @@ -484,20 +484,20 @@ async fn submit_transaction( } .into_signed(&sequencer_key); let res = sequencer_client - .submit_transaction_commit(tx) + .submit_transaction_sync(tx) .await .wrap_err("failed to submit transaction")?; + + let tx_response = sequencer_client.wait_for_tx_inclusion(res.hash).await; + + ensure!(res.code.is_ok(), "failed to check tx: {}", res.log); + ensure!( - res.check_tx.code.is_ok(), - "failed to check tx: {}", - res.check_tx.log - ); - ensure!( - res.tx_result.code.is_ok(), + tx_response.tx_result.code.is_ok(), "failed to execute tx: {}", - res.tx_result.log + tx_response.tx_result.log ); - Ok(res) + Ok(tx_response) } #[cfg(test)] diff --git a/crates/astria-sequencer-client/Cargo.toml b/crates/astria-sequencer-client/Cargo.toml index d86fa7bb6a..5921d6a281 100644 --- a/crates/astria-sequencer-client/Cargo.toml +++ b/crates/astria-sequencer-client/Cargo.toml @@ -9,16 +9,19 @@ homepage = "https://astria.org" [dependencies] astria-core = { path = "../astria-core" } +astria-eyre = { path = "../astria-eyre" } async-trait = { workspace = true } futures = { workspace = true } hex = { workspace = true } +humantime = { workspace = true } prost = { workspace = true } tendermint = { workspace = true } tendermint-proto = { workspace = true } tendermint-rpc = { workspace = true } tokio = { version = "1.3.6", default-features = false, features = ["time"] } tracing = { workspace = true } +tryhard = { workspace = true } thiserror = { workspace = true } tokio-stream = { workspace = true } futures-util = "0.3.30" diff --git a/crates/astria-sequencer-client/src/extension_trait.rs b/crates/astria-sequencer-client/src/extension_trait.rs index 7f68100582..671d562310 100644 --- a/crates/astria-sequencer-client/src/extension_trait.rs +++ b/crates/astria-sequencer-client/src/extension_trait.rs @@ -70,14 +70,15 @@ use tendermint_rpc::HttpClient; #[cfg(feature = "websocket")] use tendermint_rpc::WebSocketClient; use tendermint_rpc::{ - endpoint::broadcast::{ - tx_commit, - tx_sync, - }, + endpoint::broadcast::tx_sync, event::EventData, Client, SubscriptionClient, }; +use tracing::{ + instrument, + warn, +}; #[cfg(feature = "http")] impl SequencerClientExt for HttpClient {} @@ -664,21 +665,59 @@ pub trait SequencerClientExt: Client { .map_err(|e| Error::tendermint_rpc("broadcast_tx_sync", e)) } - /// Submits the given transaction to the Sequencer node. - /// - /// This method blocks until the transaction is committed. - /// It returns the results of `CheckTx` and `DeliverTx`. + /// Probes the sequencer for a transaction of given hash with a backoff. /// /// # Errors /// - /// - If calling the tendermint RPC endpoint fails. - async fn submit_transaction_commit( + /// - If the transaction is not found. + /// - If the transaction execution failed. + /// - If the transaction proof is missing. + #[allow(clippy::blocks_in_conditions)] // Allow: erroneous clippy warning. Should be fixed in Rust 1.81 + #[instrument(skip_all)] + async fn wait_for_tx_inclusion( &self, - tx: SignedTransaction, - ) -> Result { - let tx_bytes = tx.into_raw().encode_to_vec(); - self.broadcast_tx_commit(tx_bytes) - .await - .map_err(|e| Error::tendermint_rpc("broadcast_tx_commit", e)) + tx_hash: tendermint::hash::Hash, + ) -> tendermint_rpc::endpoint::tx::Response { + use std::time::Duration; + + use tokio::time::Instant; + + // The min seconds to sleep after receiving a GetTx response and sending the next request. + const MIN_POLL_INTERVAL_MILLIS: u64 = 100; + // The max seconds to sleep after receiving a GetTx response and sending the next request. + const MAX_POLL_INTERVAL_MILLIS: u64 = 2000; + // How long to wait after starting `confirm_submission` before starting to log errors. + const START_LOGGING_DELAY: Duration = Duration::from_millis(2000); + // The minimum duration between logging errors. + const LOG_ERROR_INTERVAL: Duration = Duration::from_millis(2000); + + let start = Instant::now(); + let mut logged_at = start; + + let mut log_if_due = |error: tendermint_rpc::Error| { + if start.elapsed() <= START_LOGGING_DELAY || logged_at.elapsed() <= LOG_ERROR_INTERVAL { + return; + } + warn!( + %error, + %tx_hash, + elapsed_seconds = start.elapsed().as_secs_f32(), + "waiting to confirm transaction inclusion" + ); + logged_at = Instant::now(); + }; + + let mut sleep_millis = MIN_POLL_INTERVAL_MILLIS; + loop { + tokio::time::sleep(Duration::from_millis(sleep_millis)).await; + match self.tx(tx_hash, false).await { + Ok(tx) => return tx, + Err(error) => { + sleep_millis = + std::cmp::min(sleep_millis.saturating_mul(2), MAX_POLL_INTERVAL_MILLIS); + log_if_due(error); + } + } + } } } diff --git a/crates/astria-sequencer-client/src/tests/http.rs b/crates/astria-sequencer-client/src/tests/http.rs index df339f0691..5bfbc89f00 100644 --- a/crates/astria-sequencer-client/src/tests/http.rs +++ b/crates/astria-sequencer-client/src/tests/http.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use astria_core::{ crypto::SigningKey, generated::protocol::asset::v1alpha1::AllowedFeeAssetsResponse, @@ -13,14 +15,21 @@ use hex_literal::hex; use prost::bytes::Bytes; use serde_json::json; use tendermint::{ + abci::{ + self, + Code, + }, block::Height, + merkle, + tx::Proof, Hash, }; use tendermint_rpc::{ - endpoint::broadcast::tx_commit::v0_37::DialectResponse, + endpoint::tx, response::Wrapper, Id, }; +use tokio::time::timeout; use wiremock::{ matchers::{ body_partial_json, @@ -116,13 +125,10 @@ async fn register_broadcast_tx_sync_response( .await } -async fn register_broadcast_tx_commit_response( - server: &MockServer, - response: DialectResponse, -) -> MockGuard { +async fn register_tx_response(server: &MockServer, response: tx::Response) -> MockGuard { let wrapper = Wrapper::new_with_id(Id::Num(1), Some(response), None); Mock::given(body_partial_json(json!({ - "method": "broadcast_tx_commit" + "method": "tx" }))) .respond_with( ResponseTemplate::new(200) @@ -371,25 +377,50 @@ async fn submit_tx_sync() { } #[tokio::test] -async fn submit_tx_commit() { - use tendermint_rpc::dialect; - +async fn wait_for_tx_inclusion() { let MockSequencer { server, client, } = MockSequencer::start().await; + let proof = Proof { + root_hash: Hash::Sha256([0; 32]), + data: vec![1, 2, 3, 4], + proof: merkle::Proof { + total: 1, + index: 1, + leaf_hash: Hash::Sha256([0; 32]), + aunts: vec![], + }, + }; - let server_response = DialectResponse { - check_tx: dialect::CheckTx::default(), - deliver_tx: dialect::DeliverTx::default(), + let tx_server_response = tx::Response { hash: Hash::Sha256([0; 32]), - height: Height::from(1u32), + height: Height::try_from(1u64).unwrap(), + index: 1, + tx_result: abci::types::ExecTxResult { + code: Code::default(), + data: Bytes::from(vec![1, 2, 3, 4]), + log: "ethan was here".to_string(), + info: String::new(), + gas_wanted: 0, + gas_used: 0, + events: vec![], + codespace: String::new(), + }, + tx: vec![], + proof: Some(proof), }; - let _guard = register_broadcast_tx_commit_response(&server, server_response).await; - let signed_tx = create_signed_transaction(); + let _tx_response_guard = register_tx_response(&server, tx_server_response.clone()).await; + + let response = client.wait_for_tx_inclusion(tx_server_response.hash); + + let response = timeout(Duration::from_millis(1000), response) + .await + .expect("should have received a transaction response within 1000ms"); - let response = client.submit_transaction_commit(signed_tx).await.unwrap(); - assert_eq!(response.check_tx.code, 0.into()); - assert_eq!(response.tx_result.code, 0.into()); + assert_eq!(response.tx_result.code, tx_server_response.tx_result.code); + assert_eq!(response.tx_result.data, tx_server_response.tx_result.data); + assert_eq!(response.tx_result.log, tx_server_response.tx_result.log); + assert_eq!(response.hash, tx_server_response.hash); }