Skip to content

Commit

Permalink
fix(bridge-withdrawer, cli, sequencer-client): migrate from `broadcas…
Browse files Browse the repository at this point in the history
…t_tx_commit` to `broadcast_tx_sync` (#1376)

## Summary
Deleted all usage of `broadcast_tx_commit` and instead implemented
`broadcast_tx_sync`.

## Background
Previously, we relied on `broadcast_tx_commit` to submit transactions to
the sequencer. [CometBFT RPC docs warn against
this](https://docs.cometbft.com/main/rpc/#/Tx/broadcast_tx_commit) and
instead favor using `broadcast_tx_sync`.

## Changes
- Deleted all instances of `broadcast_tx_commit`, replacing with
`broadcast_tx_sync`.
- Added function `wait_for_tx_inclusion()` to probe the sequencer client
for a new transaction of the given hash with backoff.

## Testing
- Added unit test to `sequencer-client` to ensure
`wait_for_tx_inclusion()` works properly

## Related Issues
closes #1283

---------

Co-authored-by: noot <[email protected]>
  • Loading branch information
ethanoroshiba and noot authored Sep 11, 2024
1 parent 665479b commit 64b9106
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 118 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ use astria_core::{
};
use astria_eyre::eyre::{
self,
ensure,
eyre,
Context,
};
pub(crate) use builder::Builder;
pub(super) use builder::Handle;
use sequencer_client::{
tendermint_rpc::endpoint::broadcast::tx_commit,
tendermint_rpc::endpoint::{
broadcast::tx_sync,
tx,
},
Address,
SequencerClientExt,
SignedTransaction,
Expand Down Expand Up @@ -167,39 +171,39 @@ impl Submitter {
debug!(tx_hash = %telemetry::display::hex(&signed.sha256_of_proto_encoding()), "signed transaction");

// submit transaction and handle response
let rsp = submit_tx(
let (check_tx, tx_response) = submit_tx(
sequencer_cometbft_client.clone(),
signed,
state.clone(),
metrics,
)
.await
.context("failed to submit transaction to cometbft")?;
if let tendermint::abci::Code::Err(check_tx_code) = rsp.check_tx.code {
if let tendermint::abci::Code::Err(check_tx_code) = check_tx.code {
Err(eyre!(
"check_tx failure upon submitting transaction to sequencer: transaction failed to \
be included in the mempool, aborting. abci.code = {check_tx_code}, abci.log = \
{}, rollup.height = {rollup_height}",
rsp.check_tx.log
check_tx.log
))
} else if let tendermint::abci::Code::Err(deliver_tx_code) = rsp.tx_result.code {
} else if let tendermint::abci::Code::Err(deliver_tx_code) = tx_response.tx_result.code {
Err(eyre!(
"deliver_tx failure upon submitting transaction to sequencer: transaction failed \
to be executed in a block, aborting. abci.code = {deliver_tx_code}, abci.log = \
{}, rollup.height = {rollup_height}",
rsp.tx_result.log,
tx_response.tx_result.log,
))
} else {
// update state after successful submission
info!(
sequencer.block = rsp.height.value(),
sequencer.tx_hash = %rsp.hash,
sequencer.block = tx_response.height.value(),
sequencer.tx_hash = %tx_response.hash,
rollup.height = rollup_height,
"withdraw batch successfully executed."
);
state.set_last_rollup_height_submitted(rollup_height);
state.set_last_sequencer_height(rsp.height.value());
state.set_last_sequencer_tx_hash(rsp.hash);
state.set_last_sequencer_height(tx_response.height.value());
state.set_last_sequencer_tx_hash(tx_response.hash);
Ok(())
}
}
Expand Down Expand Up @@ -230,7 +234,7 @@ async fn submit_tx(
tx: SignedTransaction,
state: Arc<State>,
metrics: &'static Metrics,
) -> eyre::Result<tx_commit::Response> {
) -> eyre::Result<(tx_sync::Response, tx::Response)> {
let nonce = tx.nonce();
metrics.set_current_nonce(nonce);
let start = std::time::Instant::now();
Expand Down Expand Up @@ -261,21 +265,33 @@ async fn submit_tx(
async move {}
},
);
let res = tryhard::retry_fn(|| {
let check_tx = tryhard::retry_fn(|| {
let client = client.clone();
let tx = tx.clone();
let span = info_span!(parent: span.clone(), "attempt send");
async move { client.submit_transaction_commit(tx).await }.instrument(span)
async move { client.submit_transaction_sync(tx).await }.instrument(span)
})
.with_config(retry_config)
.await
.wrap_err("failed sending transaction after 1024 attempts");

state.set_sequencer_connected(res.is_ok());
state.set_sequencer_connected(check_tx.is_ok());

metrics.record_sequencer_submission_latency(start.elapsed());

res
let check_tx = check_tx?;

ensure!(check_tx.code.is_ok(), "check_tx failed: {}", check_tx.log);

let tx_response = client.wait_for_tx_inclusion(check_tx.hash).await;

ensure!(
tx_response.tx_result.code.is_ok(),
"deliver_tx failed: {}",
tx_response.tx_result.log
);

Ok((check_tx, tx_response))
}

#[instrument(skip_all, err)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@ use sequencer_client::{
SignedTransaction,
};
use tendermint::{
abci::{
response::CheckTx,
types::ExecTxResult,
},
abci::types::ExecTxResult,
block::Height,
chain,
};
use tendermint_rpc::{
endpoint::{
broadcast::{
tx_commit,
tx_sync,
},
broadcast::tx_sync,
tx,
},
response,
Expand All @@ -46,38 +40,37 @@ use super::test_bridge_withdrawer::{
};

#[must_use]
pub fn make_tx_commit_success_response() -> tx_commit::Response {
tx_commit::Response {
check_tx: CheckTx::default(),
tx_result: ExecTxResult::default(),
pub fn make_tx_sync_success_response() -> tx_sync::Response {
tx_sync::Response {
code: 0.into(),
data: vec![].into(),
log: "tx success".to_string(),
hash: vec![0u8; 32].try_into().unwrap(),
height: Height::default(),
}
}

#[must_use]
pub fn make_tx_commit_check_tx_failure_response() -> tx_commit::Response {
tx_commit::Response {
check_tx: CheckTx {
code: 1.into(),
..CheckTx::default()
},
tx_result: ExecTxResult::default(),
pub fn make_tx_sync_failure_response() -> tx_sync::Response {
tx_sync::Response {
code: 1.into(),
data: vec![].into(),
log: "tx failed".to_string(),
hash: vec![0u8; 32].try_into().unwrap(),
height: Height::default(),
}
}

#[must_use]
pub fn make_tx_commit_deliver_tx_failure_response() -> tx_commit::Response {
tx_commit::Response {
check_tx: CheckTx::default(),
pub fn make_tx_failure_response() -> tx::Response {
tx::Response {
hash: vec![0u8; 32].try_into().unwrap(),
height: Height::default(),
index: 0,
tx_result: ExecTxResult {
code: 1.into(),
..ExecTxResult::default()
},
hash: vec![0u8; 32].try_into().unwrap(),
height: Height::default(),
tx: vec![],
proof: None,
}
}

Expand Down Expand Up @@ -308,28 +301,25 @@ fn prepare_tx_response(response: tx::Response) -> Mock {
.expect(1)
}

pub async fn mount_broadcast_tx_commit_response(
server: &MockServer,
response: tx_commit::Response,
) {
prepare_broadcast_tx_commit_response(response)
pub async fn mount_broadcast_tx_sync_response(server: &MockServer, response: tx_sync::Response) {
prepare_broadcast_tx_sync_response(response)
.mount(server)
.await;
}

pub async fn mount_broadcast_tx_commit_response_as_scoped(
pub async fn mount_broadcast_tx_sync_response_as_scoped(
server: &MockServer,
response: tx_commit::Response,
response: tx_sync::Response,
) -> MockGuard {
prepare_broadcast_tx_commit_response(response)
prepare_broadcast_tx_sync_response(response)
.mount_as_scoped(server)
.await
}

fn prepare_broadcast_tx_commit_response(response: tx_commit::Response) -> Mock {
fn prepare_broadcast_tx_sync_response(response: tx_sync::Response) -> Mock {
let wrapper = response::Wrapper::new_with_id(tendermint_rpc::Id::Num(1), Some(response), None);
Mock::given(body_partial_json(serde_json::json!({
"method": "broadcast_tx_commit"
"method": "broadcast_tx_sync"
})))
.respond_with(
ResponseTemplate::new(200)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ use tracing::{

use super::{
ethereum::AstriaBridgeableERC20DeployerConfig,
make_tx_commit_success_response,
make_tx_sync_success_response,
mock_cometbft::{
mount_default_chain_id,
mount_get_nonce_response,
mount_native_fee_asset,
},
mount_broadcast_tx_commit_response_as_scoped,
mount_broadcast_tx_sync_response_as_scoped,
mount_ibc_fee_asset,
mount_last_bridge_tx_hash_response,
MockSequencerServer,
Expand Down Expand Up @@ -225,12 +225,10 @@ impl TestBridgeWithdrawer {
.await;
}

pub async fn mount_broadcast_tx_commit_success_response_as_scoped(
&self,
) -> wiremock::MockGuard {
mount_broadcast_tx_commit_response_as_scoped(
pub async fn mount_broadcast_tx_sync_success_response_as_scoped(&self) -> wiremock::MockGuard {
mount_broadcast_tx_sync_response_as_scoped(
&self.cometbft_mock,
make_tx_commit_success_response(),
make_tx_sync_success_response(),
)
.await
}
Expand Down
8 changes: 4 additions & 4 deletions crates/astria-bridge-withdrawer/tests/blackbox/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn native_sequencer_withdraw_success() {
.mount_pending_nonce_response(1, "process batch 1")
.await;
let broadcast_guard = test_env
.mount_broadcast_tx_commit_success_response_as_scoped()
.mount_broadcast_tx_sync_success_response_as_scoped()
.await;

// send a native sequencer withdrawal tx to the rollup
Expand Down Expand Up @@ -57,7 +57,7 @@ async fn native_ics20_withdraw_success() {
.mount_pending_nonce_response(1, "process batch 1")
.await;
let broadcast_guard = test_env
.mount_broadcast_tx_commit_success_response_as_scoped()
.mount_broadcast_tx_sync_success_response_as_scoped()
.await;

// send an ics20 withdrawal tx to the rollup
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn erc20_sequencer_withdraw_success() {
.mount_pending_nonce_response(1, "process batch 1")
.await;
let broadcast_guard = test_env
.mount_broadcast_tx_commit_success_response_as_scoped()
.mount_broadcast_tx_sync_success_response_as_scoped()
.await;

// mint some erc20 tokens to the rollup wallet
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn erc20_ics20_withdraw_success() {
.mount_pending_nonce_response(1, "process batch 1")
.await;
let broadcast_guard = test_env
.mount_broadcast_tx_commit_success_response_as_scoped()
.mount_broadcast_tx_sync_success_response_as_scoped()
.await;

// mint some erc20 tokens to the rollup wallet
Expand Down
21 changes: 10 additions & 11 deletions crates/astria-cli/src/commands/bridge/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use astria_core::{
},
};
use astria_sequencer_client::{
tendermint_rpc::endpoint,
tendermint_rpc::endpoint::tx::Response,
Address,
HttpClient,
SequencerClientExt as _,
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn submit_transaction(
prefix: &str,
signing_key: &SigningKey,
actions: Vec<Action>,
) -> eyre::Result<endpoint::broadcast::tx_commit::Response> {
) -> eyre::Result<Response> {
let from_address = Address::builder()
.array(signing_key.verification_key().address_bytes())
.prefix(prefix)
Expand All @@ -138,18 +138,17 @@ async fn submit_transaction(
}
.into_signed(signing_key);
let res = client
.submit_transaction_commit(tx)
.submit_transaction_sync(tx)
.await
.wrap_err("failed to submit transaction")?;

let tx_response = client.wait_for_tx_inclusion(res.hash).await;

ensure!(res.code.is_ok(), "failed to check tx: {}", res.log);
ensure!(
res.check_tx.code.is_ok(),
"failed to check tx: {}",
res.check_tx.log
);
ensure!(
res.tx_result.code.is_ok(),
tx_response.tx_result.code.is_ok(),
"failed to execute tx: {}",
res.tx_result.log
tx_response.tx_result.log
);
Ok(res)
Ok(tx_response)
}
Loading

0 comments on commit 64b9106

Please sign in to comment.