Skip to content

Commit

Permalink
fix(withdrawer): use block subscription in batcher; send to destinati…
Browse files Browse the repository at this point in the history
…on_chain_address (#1157)

## Summary
fixes found during testing:
1. use block subscription in batcher. previously, txs were only sent out
of the batcher when a new event occurred with a higher block number,
which is wrong. it should have been sending upon any new block being
built.
2. set recipient in `BridgeUnlock` to `destination_chain_address`
instead of event `sender`.

## Background
bugs!!

## Changes
- use block subscription in batcher and send event batch upon new block
being received
- set recipient in `BridgeUnlock` to `destination_chain_address` instead
of event `sender`.

## Testing
unit tests
  • Loading branch information
noot authored Jun 5, 2024
1 parent b62563d commit 08cb823
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 14 deletions.
15 changes: 5 additions & 10 deletions crates/astria-bridge-withdrawer/src/withdrawer/ethereum/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use astria_core::{
primitive::v1::{
asset,
asset::Denom,
Address,
},
protocol::transaction::v1alpha1::{
action::{
Expand Down Expand Up @@ -95,7 +94,7 @@ fn event_to_bridge_unlock(
transaction_hash,
};
let action = BridgeUnlockAction {
to: event.sender.to_fixed_bytes().into(),
to: event.destination_chain_address.to_fixed_bytes().into(),
amount: event
.amount
.as_u128()
Expand Down Expand Up @@ -127,11 +126,7 @@ fn event_to_ics20_withdrawal(
// TODO: make this configurable
const ICS20_WITHDRAWAL_TIMEOUT: Duration = Duration::from_secs(300);

let sender: [u8; 20] = event
.sender
.as_bytes()
.try_into()
.expect("U160 must be 20 bytes");
let sender = event.sender.to_fixed_bytes().into();
let denom = rollup_asset_denom.clone();

let (_, channel) = denom
Expand All @@ -152,7 +147,7 @@ fn event_to_ics20_withdrawal(
// returned to the rollup.
// this is only ok for now because addresses on the sequencer and the rollup are both 20
// bytes, but this won't work otherwise.
return_address: Address::from(sender),
return_address: sender,
amount: event
.amount
.as_u128()
Expand Down Expand Up @@ -207,7 +202,7 @@ mod tests {
};

let expected_action = BridgeUnlockAction {
to: [0u8; 20].into(),
to: [1u8; 20].into(),
amount: 99,
memo: serde_json::to_vec(&BridgeUnlockMemo {
block_number: 1.into(),
Expand Down Expand Up @@ -239,7 +234,7 @@ mod tests {
};

let expected_action = BridgeUnlockAction {
to: [0u8; 20].into(),
to: [1u8; 20].into(),
amount: 99,
memo: serde_json::to_vec(&BridgeUnlockMemo {
block_number: 1.into(),
Expand Down
45 changes: 41 additions & 4 deletions crates/astria-bridge-withdrawer/src/withdrawer/ethereum/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use astria_eyre::{
};
use ethers::{
contract::LogMeta,
core::types::Block,
providers::{
Middleware,
Provider,
StreamExt as _,
Ws,
Expand All @@ -26,6 +28,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use tracing::{
error,
info,
warn,
};
Expand Down Expand Up @@ -106,7 +109,7 @@ impl Watcher {
.await
.wrap_err("failed to connect to ethereum RPC endpoint")?,
);
let contract = AstriaWithdrawer::new(contract_address, provider);
let contract = AstriaWithdrawer::new(contract_address, provider.clone());

let base_chain_asset_precision = contract
.base_chain_asset_precision()
Expand All @@ -121,6 +124,7 @@ impl Watcher {

let batcher = Batcher::new(
event_rx,
provider,
batch_tx,
&shutdown_token,
fee_asset_id,
Expand Down Expand Up @@ -214,6 +218,7 @@ async fn watch_for_ics20_withdrawal_events(

struct Batcher {
event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>,
provider: Arc<Provider<Ws>>,
batch_tx: mpsc::Sender<Batch>,
shutdown_token: CancellationToken,
fee_asset_id: asset::Id,
Expand All @@ -224,6 +229,7 @@ struct Batcher {
impl Batcher {
pub(crate) fn new(
event_rx: mpsc::Receiver<(WithdrawalEvent, LogMeta)>,
provider: Arc<Provider<Ws>>,
batch_tx: mpsc::Sender<Batch>,
shutdown_token: &CancellationToken,
fee_asset_id: asset::Id,
Expand All @@ -232,17 +238,22 @@ impl Batcher {
) -> Self {
Self {
event_rx,
provider,
batch_tx,
shutdown_token: shutdown_token.clone(),
fee_asset_id,
rollup_asset_denom,
asset_withdrawal_divisor,
}
}
}

impl Batcher {
pub(crate) async fn run(mut self) -> Result<()> {
let mut block_rx = self
.provider
.subscribe_blocks()
.await
.wrap_err("failed to subscribe to blocks")?;

let mut curr_batch = Batch {
actions: Vec::new(),
rollup_height: 0,
Expand All @@ -254,6 +265,32 @@ impl Batcher {
info!("batcher shutting down");
break;
}
block = block_rx.next() => {
if let Some(Block { number, .. }) = block {
let Some(block_number) = number else {
// don't think this should happen
warn!("block number missing; skipping");
continue;
};

if block_number.as_u64() > curr_batch.rollup_height {
if !curr_batch.actions.is_empty() {
self.batch_tx
.send(curr_batch)
.await
.wrap_err("failed to send batched events; receiver dropped?")?;
}

curr_batch = Batch {
actions: Vec::new(),
rollup_height: block_number.as_u64(),
};
}
} else {
error!("block stream closed; shutting down batcher");
break;
}
}
item = self.event_rx.recv() => {
if let Some((event, meta)) = item {
let event_with_metadata = EventWithMetadata {
Expand Down Expand Up @@ -281,7 +318,7 @@ impl Batcher {
};
}
} else {
warn!("event receiver dropped; shutting down batcher");
error!("event receiver dropped; shutting down batcher");
break;
}
}
Expand Down

0 comments on commit 08cb823

Please sign in to comment.