From bda4ffc4a75ebd373aaa2945ac07207ef2bb3e10 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Thu, 26 Sep 2024 13:32:43 +0100 Subject: [PATCH 1/2] chore(sequencer)!: put blocks and deposits to non-verified storage (ENG-812) (#1525) ## Summary This changes where sequencer blocks and the `TracePrefixed` native asset are stored, and also persists `Deposit`s to non-verifiable storage. ## Background Sequencer blocks are currently written to verified storage, which is unnecessary as all of their constituent data is already available there. We want to avoid the cost of storing full blocks in verified storage, but still have the ability to provide a third party with a full block if requested. To further assist with this, deposits will also be written to non-verified storage. It also seemed appropriate to store the `TracePrefixed` native asset in verified storage, since this is something which all validators should reach consensus over. ## Changes - `SequencerBlock`s are now stored in non-verifiable storage. - `Deposit`s are now stored in non-verifiable storage. - The native asset is now stored in verifiable storage. ## Testing Existing unit tests have been updated as required to assert these changes. ## Breaking Changelist - The set of data being written to verified storage has changed, which is a breaking change in terms of on-disk data and generation of state root hashes. Otherwise no APIs have changed. ## Related Issues Closes #1493. --- crates/astria-sequencer/src/api_state_ext.rs | 63 +- crates/astria-sequencer/src/app/mod.rs | 31 +- ...ransaction_with_every_action_snapshot.snap | 59 +- ..._changes__app_finalize_block_snapshot.snap | 59 +- ...reaking_changes__app_genesis_snapshot.snap | 62 +- .../astria-sequencer/src/app/tests_app/mod.rs | 27 +- .../src/app/tests_execute_transaction.rs | 6 +- .../astria-sequencer/src/assets/state_ext.rs | 6 +- .../src/bridge/bridge_lock_action.rs | 5 +- .../astria-sequencer/src/bridge/state_ext.rs | 646 +++--------------- .../src/ibc/ics20_transfer.rs | 30 +- crates/astria-sequencer/src/utils.rs | 11 - 12 files changed, 256 insertions(+), 749 deletions(-) diff --git a/crates/astria-sequencer/src/api_state_ext.rs b/crates/astria-sequencer/src/api_state_ext.rs index c42510b9c..17c6b2545 100644 --- a/crates/astria-sequencer/src/api_state_ext.rs +++ b/crates/astria-sequencer/src/api_state_ext.rs @@ -34,28 +34,28 @@ use cnidarium::{ use prost::Message; use tracing::instrument; -fn block_hash_by_height_key(height: u64) -> String { - format!("blockhash/{height}") +fn block_hash_by_height_key(height: u64) -> Vec { + [b"blockhash/".as_slice(), &height.to_le_bytes()].concat() } -fn sequencer_block_header_by_hash_key(hash: &[u8]) -> String { - format!("blockheader/{}", crate::utils::Hex(hash)) +fn sequencer_block_header_by_hash_key(hash: &[u8]) -> Vec { + [b"blockheader/", hash].concat() } -fn rollup_data_by_hash_and_rollup_id_key(hash: &[u8], rollup_id: &RollupId) -> String { - format!("rollupdata/{}/{}", crate::utils::Hex(hash), rollup_id) +fn rollup_data_by_hash_and_rollup_id_key(hash: &[u8], rollup_id: &RollupId) -> Vec { + [b"rollupdata/", hash, rollup_id.as_ref()].concat() } -fn rollup_ids_by_hash_key(hash: &[u8]) -> String { - format!("rollupids/{}", crate::utils::Hex(hash)) +fn rollup_ids_by_hash_key(hash: &[u8]) -> Vec { + [b"rollupids/", hash].concat() } -fn rollup_transactions_proof_by_hash_key(hash: &[u8]) -> String { - format!("rolluptxsproof/{}", crate::utils::Hex(hash)) +fn 
rollup_transactions_proof_by_hash_key(hash: &[u8]) -> Vec { + [b"rolluptxsproof/", hash].concat() } -fn rollup_ids_proof_by_hash_key(hash: &[u8]) -> String { - format!("rollupidsproof/{}", crate::utils::Hex(hash)) +fn rollup_ids_proof_by_hash_key(hash: &[u8]) -> Vec { + [b"rollupidsproof/", hash].concat() } #[derive(BorshSerialize, BorshDeserialize)] @@ -139,7 +139,7 @@ pub(crate) trait StateReadExt: StateRead { async fn get_block_hash_by_height(&self, height: u64) -> Result<[u8; 32]> { let key = block_hash_by_height_key(height); let Some(hash) = self - .get_raw(&key) + .nonverifiable_get_raw(&key) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read block hash by height from state")? @@ -160,7 +160,7 @@ pub(crate) trait StateReadExt: StateRead { ) -> Result { let key = sequencer_block_header_by_hash_key(hash); let Some(header_bytes) = self - .get_raw(&key) + .nonverifiable_get_raw(&key) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read raw sequencer block from state")? @@ -179,7 +179,7 @@ pub(crate) trait StateReadExt: StateRead { async fn get_rollup_ids_by_block_hash(&self, hash: &[u8]) -> Result> { let key = rollup_ids_by_hash_key(hash); let Some(rollup_ids_bytes) = self - .get_raw(&key) + .nonverifiable_get_raw(&key) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read rollup IDs by block hash from state")? @@ -195,7 +195,7 @@ pub(crate) trait StateReadExt: StateRead { #[instrument(skip_all)] async fn get_sequencer_block_by_hash(&self, hash: &[u8]) -> Result { let Some(header_bytes) = self - .get_raw(&sequencer_block_header_by_hash_key(hash)) + .nonverifiable_get_raw(&sequencer_block_header_by_hash_key(hash)) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read raw sequencer block from state")? @@ -214,10 +214,11 @@ pub(crate) trait StateReadExt: StateRead { let mut rollup_transactions = Vec::with_capacity(rollup_ids.len()); for id in &rollup_ids { let key = rollup_data_by_hash_and_rollup_id_key(hash, id); - let raw = - self.get_raw(&key).await.map_err(anyhow_to_eyre).wrap_err( - "failed to read rollup data by block hash and rollup ID from state", - )?; + let raw = self + .nonverifiable_get_raw(&key) + .await + .map_err(anyhow_to_eyre) + .context("failed to read rollup data by block hash and rollup ID from state")?; if let Some(raw) = raw { let raw = raw.as_slice(); let rollup_data = raw::RollupTransactions::decode(raw) @@ -227,7 +228,7 @@ pub(crate) trait StateReadExt: StateRead { } let Some(rollup_transactions_proof) = self - .get_raw(&rollup_transactions_proof_by_hash_key(hash)) + .nonverifiable_get_raw(&rollup_transactions_proof_by_hash_key(hash)) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read rollup transactions proof by block hash from state")? @@ -240,7 +241,7 @@ pub(crate) trait StateReadExt: StateRead { .wrap_err("failed to decode rollup transactions proof from raw bytes")?; let Some(rollup_ids_proof) = self - .get_raw(&rollup_ids_proof_by_hash_key(hash)) + .nonverifiable_get_raw(&rollup_ids_proof_by_hash_key(hash)) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read rollup IDs proof by block hash from state")? @@ -284,7 +285,7 @@ pub(crate) trait StateReadExt: StateRead { ) -> Result { let key = rollup_data_by_hash_and_rollup_id_key(hash, rollup_id); let Some(bytes) = self - .get_raw(&key) + .nonverifiable_get_raw(&key) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read rollup data by block hash and rollup ID from state")? 
@@ -306,7 +307,7 @@ pub(crate) trait StateReadExt: StateRead { hash: &[u8], ) -> Result<(primitiveRaw::Proof, primitiveRaw::Proof)> { let Some(rollup_transactions_proof) = self - .get_raw(&rollup_transactions_proof_by_hash_key(hash)) + .nonverifiable_get_raw(&rollup_transactions_proof_by_hash_key(hash)) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read rollup transactions proof by block hash from state")? @@ -319,7 +320,7 @@ pub(crate) trait StateReadExt: StateRead { .wrap_err("failed to decode rollup transactions proof from raw bytes")?; let Some(rollup_ids_proof) = self - .get_raw(&rollup_ids_proof_by_hash_key(hash)) + .nonverifiable_get_raw(&rollup_ids_proof_by_hash_key(hash)) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read rollup IDs proof by block hash from state")? @@ -348,7 +349,7 @@ pub(crate) trait StateWriteExt: StateWrite { // 6. block hash to rollup IDs proof let key = block_hash_by_height_key(block.height().into()); - self.put_raw(key, block.block_hash().to_vec()); + self.nonverifiable_put_raw(key, block.block_hash().to_vec()); let rollup_ids = block .rollup_transactions() @@ -359,7 +360,7 @@ pub(crate) trait StateWriteExt: StateWrite { let key = rollup_ids_by_hash_key(&block.block_hash()); - self.put_raw( + self.nonverifiable_put_raw( key, borsh::to_vec(&RollupIdSeq(rollup_ids)) .wrap_err("failed to serialize rollup IDs list")?, @@ -374,18 +375,18 @@ pub(crate) trait StateWriteExt: StateWrite { rollup_ids_proof, } = block.into_parts(); let header = header.into_raw(); - self.put_raw(key, header.encode_to_vec()); + self.nonverifiable_put_raw(key, header.encode_to_vec()); for (id, rollup_data) in rollup_transactions { let key = rollup_data_by_hash_and_rollup_id_key(&block_hash, &id); - self.put_raw(key, rollup_data.into_raw().encode_to_vec()); + self.nonverifiable_put_raw(key, rollup_data.into_raw().encode_to_vec()); } let key = rollup_transactions_proof_by_hash_key(&block_hash); - self.put_raw(key, rollup_transactions_proof.into_raw().encode_to_vec()); + self.nonverifiable_put_raw(key, rollup_transactions_proof.into_raw().encode_to_vec()); let key = rollup_ids_proof_by_hash_key(&block_hash); - self.put_raw(key, rollup_ids_proof.into_raw().encode_to_vec()); + self.nonverifiable_put_raw(key, rollup_ids_proof.into_raw().encode_to_vec()); Ok(()) } diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 66a9e2c5a..acaa7b2b9 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -335,11 +335,7 @@ impl App { self.metrics .record_proposal_transactions(signed_txs_included.len()); - let deposits = self - .state - .get_block_deposits() - .await - .wrap_err("failed to get block deposits in prepare_proposal")?; + let deposits = self.state.get_cached_block_deposits(); self.metrics.record_proposal_deposits(deposits.len()); // generate commitment to sequence::Actions and deposits and commitment to the rollup IDs @@ -444,11 +440,7 @@ impl App { ); self.metrics.record_proposal_transactions(signed_txs.len()); - let deposits = self - .state - .get_block_deposits() - .await - .wrap_err("failed to get block deposits in process_proposal")?; + let deposits = self.state.get_cached_block_deposits(); self.metrics.record_proposal_deposits(deposits.len()); let GeneratedCommitments { @@ -871,21 +863,16 @@ impl App { let end_block = self.end_block(height.value(), sudo_address).await?; - // get and clear block deposits from state + // get deposits for this block from state's ephemeral cache and put them 
to storage. let mut state_tx = StateDelta::new(self.state.clone()); - let deposits = self - .state - .get_block_deposits() - .await - .wrap_err("failed to get block deposits in end_block")?; - state_tx - .clear_block_deposits() - .await - .wrap_err("failed to clear block deposits")?; + let deposits_in_this_block = self.state.get_cached_block_deposits(); debug!( - deposits = %telemetry::display::json(&deposits), + deposits = %telemetry::display::json(&deposits_in_this_block), "got block deposits from state" ); + state_tx + .put_deposits(&block_hash, deposits_in_this_block.clone()) + .wrap_err("failed to put deposits to state")?; let sequencer_block = SequencerBlock::try_from_block_info_and_data( block_hash, @@ -898,7 +885,7 @@ impl App { .into_iter() .map(std::convert::Into::into) .collect(), - deposits, + deposits_in_this_block, ) .wrap_err("failed to convert block info and data to SequencerBlock")?; state_tx diff --git a/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_execute_transaction_with_every_action_snapshot.snap b/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_execute_transaction_with_every_action_snapshot.snap index 1d2458f1b..52c47f2f6 100644 --- a/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_execute_transaction_with_every_action_snapshot.snap +++ b/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_execute_transaction_with_every_action_snapshot.snap @@ -1,38 +1,39 @@ --- source: crates/astria-sequencer/src/app/tests_breaking_changes.rs +assertion_line: 308 expression: app.app_hash.as_bytes() --- [ - 237, + 67, + 124, + 63, + 240, 228, - 62, - 229, - 45, + 207, + 78, + 64, + 191, + 89, + 84, + 121, + 150, + 21, + 207, + 248, + 173, + 132, 77, - 247, + 126, + 148, + 252, 239, - 251, - 224, - 244, - 97, - 68, - 46, - 184, - 181, - 205, - 86, - 212, - 153, - 66, - 146, - 179, - 120, - 206, - 95, - 76, - 11, - 0, - 184, - 137, - 173 + 104, + 130, + 55, + 201, + 32, + 57, + 167, + 215, + 228 ] diff --git a/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_finalize_block_snapshot.snap b/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_finalize_block_snapshot.snap index 5a4d1cb39..33551b345 100644 --- a/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_finalize_block_snapshot.snap +++ b/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_finalize_block_snapshot.snap @@ -1,38 +1,39 @@ --- source: crates/astria-sequencer/src/app/tests_breaking_changes.rs +assertion_line: 157 expression: app.app_hash.as_bytes() --- [ - 111, - 25, - 76, - 238, - 112, - 77, - 102, - 234, - 8, - 97, - 24, - 100, - 73, - 128, - 228, - 106, - 82, - 255, - 119, - 93, - 248, + 7, 224, - 51, - 239, 115, - 58, - 9, - 149, - 86, - 23, + 113, + 195, + 128, + 219, 248, - 114 + 198, + 108, + 251, + 204, + 202, + 182, + 150, + 203, + 69, + 213, + 169, + 101, + 228, + 90, + 61, + 94, + 59, + 180, + 251, + 59, + 119, + 37, + 42, + 216 ] diff --git a/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_genesis_snapshot.snap b/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_genesis_snapshot.snap index 6df05ffad..f5f80e899 100644 --- 
a/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_genesis_snapshot.snap +++ b/crates/astria-sequencer/src/app/snapshots/astria_sequencer__app__tests_breaking_changes__app_genesis_snapshot.snap @@ -3,36 +3,36 @@ source: crates/astria-sequencer/src/app/tests_breaking_changes.rs expression: app.app_hash.as_bytes() --- [ - 230, - 203, - 68, - 175, - 166, - 230, - 219, - 247, - 217, - 191, - 121, - 236, - 155, - 53, - 148, - 22, - 12, - 121, - 60, - 22, - 214, - 93, + 7, + 34, + 84, + 104, + 180, + 74, + 207, + 31, + 198, + 255, + 107, + 249, + 25, + 37, + 103, + 202, + 199, + 132, + 141, + 201, + 64, + 172, 26, - 177, - 216, - 135, - 217, - 212, - 93, - 40, - 173, - 94 + 24, + 80, + 182, + 114, + 182, + 189, + 220, + 109, + 211 ] diff --git a/crates/astria-sequencer/src/app/tests_app/mod.rs b/crates/astria-sequencer/src/app/tests_app/mod.rs index 1c0253308..9b41f4473 100644 --- a/crates/astria-sequencer/src/app/tests_app/mod.rs +++ b/crates/astria-sequencer/src/app/tests_app/mod.rs @@ -6,6 +6,7 @@ use astria_core::{ primitive::v1::{ asset::TracePrefixed, RollupId, + TransactionId, }, protocol::{ genesis::v1alpha1::Account, @@ -54,10 +55,7 @@ use crate::{ StateWriteExt as _, ValidatorSet, }, - bridge::{ - StateReadExt as _, - StateWriteExt as _, - }, + bridge::StateWriteExt as _, proposal::commitment::generate_rollup_datas_commitment, state_ext::StateReadExt as _, test_utils::{ @@ -304,6 +302,23 @@ async fn app_create_sequencer_block_with_sequenced_data_and_deposits() { state_tx .put_bridge_account_ibc_asset(bridge_address, nria()) .unwrap(); + // Put a deposit from a previous block to ensure it is not mixed in with deposits for this + // block (it has a different amount and tx ID to the later deposit). + let old_deposit = Deposit { + bridge_address, + rollup_id, + amount: 99, + asset: nria().into(), + destination_chain_address: "nootwashere".to_string(), + source_transaction_id: TransactionId::new([99; 32]), + source_action_index: starting_index_of_action, + }; + state_tx + .put_deposits( + &[32u8; 32], + HashMap::from_iter([(rollup_id, vec![old_deposit])]), + ) + .unwrap(); app.apply(state_tx); app.prepare_commit(storage.clone()).await.unwrap(); app.commit(storage.clone()).await; @@ -361,10 +376,6 @@ async fn app_create_sequencer_block_with_sequenced_data_and_deposits() { .unwrap(); app.commit(storage).await; - // ensure deposits are cleared at the end of the block - let deposit_events = app.state.get_deposit_events(&rollup_id).await.unwrap(); - assert_eq!(deposit_events.len(), 0); - let block = app.state.get_sequencer_block_by_height(1).await.unwrap(); let mut deposits = vec![]; for (_, rollup_data) in block.rollup_transactions() { diff --git a/crates/astria-sequencer/src/app/tests_execute_transaction.rs b/crates/astria-sequencer/src/app/tests_execute_transaction.rs index 23075d001..30a6290ec 100644 --- a/crates/astria-sequencer/src/app/tests_execute_transaction.rs +++ b/crates/astria-sequencer/src/app/tests_execute_transaction.rs @@ -773,7 +773,8 @@ async fn app_execute_transaction_bridge_lock_action_ok() { bridge_before_balance + amount ); - let deposits = app.state.get_deposit_events(&rollup_id).await.unwrap(); + let all_deposits = app.state.get_cached_block_deposits(); + let deposits = all_deposits.get(&rollup_id).unwrap(); assert_eq!(deposits.len(), 1); assert_eq!(deposits[0], expected_deposit); } @@ -1116,7 +1117,8 @@ async fn app_execute_transaction_action_index_correctly_increments() { 
app.execute_transaction(signed_tx.clone()).await.unwrap(); assert_eq!(app.state.get_account_nonce(alice_address).await.unwrap(), 1); - let deposits = app.state.get_deposit_events(&rollup_id).await.unwrap(); + let all_deposits = app.state.get_cached_block_deposits(); + let deposits = all_deposits.get(&rollup_id).unwrap(); assert_eq!(deposits.len(), 2); assert_eq!(deposits[0].source_action_index, starting_index_of_action); assert_eq!( diff --git a/crates/astria-sequencer/src/assets/state_ext.rs b/crates/astria-sequencer/src/assets/state_ext.rs index 0b281544e..11075bb0d 100644 --- a/crates/astria-sequencer/src/assets/state_ext.rs +++ b/crates/astria-sequencer/src/assets/state_ext.rs @@ -30,7 +30,7 @@ struct DenominationTrace(String); const BLOCK_FEES_PREFIX: &str = "block_fees/"; const FEE_ASSET_PREFIX: &str = "fee_asset/"; -const NATIVE_ASSET_KEY: &[u8] = b"nativeasset"; +const NATIVE_ASSET_KEY: &str = "nativeasset"; fn asset_storage_key>(asset: TAsset) -> String { format!("asset/{}", crate::storage_keys::hunks::Asset::from(asset)) @@ -71,7 +71,7 @@ pub(crate) trait StateReadExt: StateRead { #[instrument(skip_all)] async fn get_native_asset(&self) -> Result { let Some(bytes) = self - .nonverifiable_get_raw(NATIVE_ASSET_KEY) + .get_raw(NATIVE_ASSET_KEY) .await .map_err(anyhow_to_eyre) .wrap_err("failed to read raw native asset from state")? @@ -193,7 +193,7 @@ impl StateReadExt for T {} pub(crate) trait StateWriteExt: StateWrite { #[instrument(skip_all)] fn put_native_asset(&mut self, asset: &asset::TracePrefixed) { - self.nonverifiable_put_raw(NATIVE_ASSET_KEY.to_vec(), asset.to_string().into_bytes()); + self.put_raw(NATIVE_ASSET_KEY.to_string(), asset.to_string().into_bytes()); } #[instrument(skip_all)] diff --git a/crates/astria-sequencer/src/bridge/bridge_lock_action.rs b/crates/astria-sequencer/src/bridge/bridge_lock_action.rs index 670f05184..6e9279f49 100644 --- a/crates/astria-sequencer/src/bridge/bridge_lock_action.rs +++ b/crates/astria-sequencer/src/bridge/bridge_lock_action.rs @@ -142,10 +142,7 @@ impl ActionHandler for BridgeLockAction { .wrap_err("failed to deduct fee from account balance")?; state.record(deposit_abci_event); - state - .put_deposit_event(deposit) - .await - .wrap_err("failed to put deposit event into state")?; + state.cache_deposit_event(deposit); Ok(()) } } diff --git a/crates/astria-sequencer/src/bridge/state_ext.rs b/crates/astria-sequencer/src/bridge/state_ext.rs index 1a3b15c76..8c80d63a4 100644 --- a/crates/astria-sequencer/src/bridge/state_ext.rs +++ b/crates/astria-sequencer/src/bridge/state_ext.rs @@ -1,7 +1,4 @@ -use std::collections::{ - HashMap, - HashSet, -}; +use std::collections::HashMap; use astria_core::{ generated::sequencerblock::v1alpha1::Deposit as RawDeposit, @@ -33,8 +30,6 @@ use cnidarium::{ StateRead, StateWrite, }; -use futures::StreamExt as _; -use hex::ToHex as _; use prost::Message as _; use tracing::{ debug, @@ -62,10 +57,22 @@ struct AssetId([u8; 32]); #[derive(BorshSerialize, BorshDeserialize, Debug)] struct Fee(u128); +/// A wrapper to support storing a `Vec`. +/// +/// We don't currently have Borsh-encoding for `Deposit` and we also don't have a standalone +/// protobuf type representing a collection of `Deposit`s. +/// +/// This will be replaced (very soon hopefully) by a proper storage type able to be wholly Borsh- +/// encoded. Until then, we'll protobuf-encode the individual deposits and this is a collection of +/// those encoded values. 
+#[derive(BorshSerialize, BorshDeserialize, Debug)] +struct Deposits(Vec>); + const BRIDGE_ACCOUNT_PREFIX: &str = "bridgeacc"; const BRIDGE_ACCOUNT_SUDO_PREFIX: &str = "bsudo"; const BRIDGE_ACCOUNT_WITHDRAWER_PREFIX: &str = "bwithdrawer"; -const DEPOSIT_PREFIX: &str = "deposit"; +const DEPOSITS_EPHEMERAL_KEY: &str = "deposits"; +const DEPOSIT_PREFIX: &[u8] = b"deposit/"; const INIT_BRIDGE_ACCOUNT_BASE_FEE_STORAGE_KEY: &str = "initbridgeaccfee"; const BRIDGE_LOCK_BYTE_COST_MULTIPLIER_STORAGE_KEY: &str = "bridgelockmultiplier"; const BRIDGE_SUDO_CHANGE_FEE_STORAGE_KEY: &str = "bridgesudofee"; @@ -109,16 +116,8 @@ fn asset_id_storage_key(address: &T) -> String { ) } -fn deposit_storage_key_prefix(rollup_id: &RollupId) -> String { - format!("{DEPOSIT_PREFIX}/{}", rollup_id.encode_hex::()) -} - -fn deposit_storage_key(rollup_id: &RollupId, nonce: u32) -> Vec { - format!("{}/{}", deposit_storage_key_prefix(rollup_id), nonce).into() -} - -fn deposit_nonce_storage_key(rollup_id: &RollupId) -> Vec { - format!("depositnonce/{}", rollup_id.encode_hex::()).into() +fn deposit_storage_key(block_hash: &[u8; 32], rollup_id: &RollupId) -> Vec { + [DEPOSIT_PREFIX, block_hash, rollup_id.as_ref()].concat() } fn bridge_account_sudo_address_storage_key(address: &T) -> String { @@ -266,76 +265,37 @@ pub(crate) trait StateReadExt: StateRead + address::StateReadExt { } #[instrument(skip_all)] - async fn get_deposit_nonce(&self, rollup_id: &RollupId) -> Result { - let bytes = self - .nonverifiable_get_raw(&deposit_nonce_storage_key(rollup_id)) + fn get_cached_block_deposits(&self) -> HashMap> { + self.object_get(DEPOSITS_EPHEMERAL_KEY).unwrap_or_default() + } + + #[instrument(skip_all)] + async fn get_deposits( + &self, + block_hash: &[u8; 32], + rollup_id: &RollupId, + ) -> Result> { + let Some(bytes) = self + .nonverifiable_get_raw(&deposit_storage_key(block_hash, rollup_id)) .await .map_err(anyhow_to_eyre) - .wrap_err("failed reading raw deposit nonce from state")?; - let Some(bytes) = bytes else { - // no deposits for this rollup id yet; return 0 - return Ok(0); + .wrap_err("failed reading raw deposits from state")? 
+ else { + return Ok(vec![]); }; - let Nonce(nonce) = - Nonce(u32::from_be_bytes(bytes.try_into().expect( - "all deposit nonces stored should be 4 bytes; this is a bug", - ))); - Ok(nonce) - } - - #[instrument(skip_all)] - async fn get_deposit_rollup_ids(&self) -> Result> { - let mut stream = std::pin::pin!(self.nonverifiable_prefix_raw(DEPOSIT_PREFIX.as_bytes())); - let mut rollup_ids = HashSet::new(); - while let Some(Ok((key, _))) = stream.next().await { - // the deposit key is of the form "deposit/{rollup_id}/{nonce}" - let key_str = - String::from_utf8(key).wrap_err("failed to convert deposit key to string")?; - let key_parts = key_str.split('/').collect::>(); - if key_parts.len() != 3 { - continue; - } - let rollup_id_bytes = - hex::decode(key_parts[1]).wrap_err("invalid rollup ID hex string")?; - let rollup_id = - RollupId::try_from_slice(&rollup_id_bytes).wrap_err("invalid rollup ID bytes")?; - rollup_ids.insert(rollup_id); - } - Ok(rollup_ids) - } + let pb_deposits = borsh::from_slice::(&bytes) + .wrap_err("failed to reconstruct protobuf deposits from storage")?; - #[instrument(skip_all)] - async fn get_deposit_events(&self, rollup_id: &RollupId) -> Result> { - let mut stream = std::pin::pin!( - self.nonverifiable_prefix_raw(deposit_storage_key_prefix(rollup_id).as_bytes()) - ); - let mut deposits = Vec::new(); - while let Some(Ok((_, value))) = stream.next().await { - let raw = RawDeposit::decode(value.as_ref()).wrap_err("invalid deposit bytes")?; + let mut deposits = Vec::with_capacity(pb_deposits.0.len()); + for pb_deposit in pb_deposits.0 { + let raw = RawDeposit::decode(pb_deposit.as_ref()).wrap_err("invalid deposit bytes")?; let deposit = Deposit::try_from_raw(raw).wrap_err("invalid deposit raw proto")?; deposits.push(deposit); } Ok(deposits) } - #[instrument(skip_all)] - async fn get_block_deposits(&self) -> Result>> { - let deposit_rollup_ids = self - .get_deposit_rollup_ids() - .await - .wrap_err("failed to get deposit rollup IDs")?; - let mut deposit_events = HashMap::new(); - for rollup_id in deposit_rollup_ids { - let rollup_deposit_events = self - .get_deposit_events(&rollup_id) - .await - .wrap_err("failed to get deposit events")?; - deposit_events.insert(rollup_id, rollup_deposit_events); - } - Ok(deposit_events) - } - #[instrument(skip_all)] async fn get_init_bridge_account_base_fee(&self) -> Result { let bytes = self @@ -483,52 +443,33 @@ pub(crate) trait StateWriteExt: StateWrite { Ok(()) } - // the deposit "nonce" for a given rollup ID during a given block. - // this is only used to generate storage keys for each of the deposits within a block, - // and is reset to 0 at the beginning of each block. + /// Push the deposit onto the end of a Vec of deposits for this rollup ID. These are held in + /// state's ephemeral store, pending being written to permanent storage during `finalize_block`. 
#[instrument(skip_all)] - fn put_deposit_nonce(&mut self, rollup_id: &RollupId, nonce: u32) { - self.nonverifiable_put_raw( - deposit_nonce_storage_key(rollup_id), - nonce.to_be_bytes().to_vec(), - ); - } - - // allow: false positive due to proc macro; fixed with rust/clippy 1.81 - #[allow(clippy::blocks_in_conditions)] - #[instrument(skip_all, err)] - async fn put_deposit_event(&mut self, deposit: Deposit) -> Result<()> { - let nonce = self.get_deposit_nonce(&deposit.rollup_id).await?; - self.put_deposit_nonce( - &deposit.rollup_id, - nonce.checked_add(1).ok_or_eyre("nonce overflowed")?, - ); - - let key = deposit_storage_key(&deposit.rollup_id, nonce); - self.nonverifiable_put_raw(key, deposit.into_raw().encode_to_vec()); - Ok(()) - } - - // clears the deposit nonce and all deposits for for a given rollup ID. - #[instrument(skip_all)] - async fn clear_deposit_info(&mut self, rollup_id: &RollupId) { - self.nonverifiable_delete(deposit_nonce_storage_key(rollup_id)); - let mut stream = std::pin::pin!( - self.nonverifiable_prefix_raw(deposit_storage_key_prefix(rollup_id).as_bytes()) - ); - while let Some(Ok((key, _))) = stream.next().await { - self.nonverifiable_delete(key); - } + fn cache_deposit_event(&mut self, deposit: Deposit) { + let mut cached_deposits = self.get_cached_block_deposits(); + cached_deposits + .entry(deposit.rollup_id) + .or_default() + .push(deposit); + self.object_put(DEPOSITS_EPHEMERAL_KEY, cached_deposits); } #[instrument(skip_all)] - async fn clear_block_deposits(&mut self) -> Result<()> { - let deposit_rollup_ids = self - .get_deposit_rollup_ids() - .await - .wrap_err("failed to get deposit rollup ids")?; - for rollup_id in deposit_rollup_ids { - self.clear_deposit_info(&rollup_id).await; + fn put_deposits( + &mut self, + block_hash: &[u8; 32], + all_deposits: HashMap>, + ) -> Result<()> { + for (rollup_id, deposits) in all_deposits { + let key = deposit_storage_key(block_hash, &rollup_id); + let serialized_deposits = deposits + .into_iter() + .map(|deposit| deposit.into_raw().encode_to_vec()) + .collect(); + let value = borsh::to_vec(&Deposits(serialized_deposits)) + .wrap_err("failed to serialize deposits")?; + self.nonverifiable_put_raw(key, value); } Ok(()) } @@ -586,14 +527,7 @@ mod test { use cnidarium::StateDelta; use insta::assert_snapshot; - use super::{ - asset_id_storage_key, - bridge_account_sudo_address_storage_key, - bridge_account_withdrawer_address_storage_key, - rollup_id_storage_key, - StateReadExt as _, - StateWriteExt as _, - }; + use super::*; use crate::test_utils::astria_address; fn asset_0() -> asset::Denom { @@ -759,90 +693,18 @@ mod test { } #[tokio::test] - async fn get_deposit_nonce_uninitialized_ok() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let state = StateDelta::new(snapshot); - - let rollup_id = RollupId::new([2u8; 32]); - - // uninitialized ok - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("call to get deposit nonce should not fail on uninitialized rollup ids"), - 0u32, - "uninitialized rollup id nonce should be zero" - ); - } - - #[tokio::test] - async fn put_deposit_nonce() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - let rollup_id = RollupId::new([2u8; 32]); - let mut nonce = 1u32; - - // can write - state.put_deposit_nonce(&rollup_id, nonce); - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("a rollup 
id nonce was written and must exist inside the database"), - nonce, - "stored nonce did not match expected" - ); - - // can update - nonce = 2u32; - state.put_deposit_nonce(&rollup_id, nonce); - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("a rollup id nonce was written and must exist inside the database"), - nonce, - "stored nonce did not match expected" - ); - - // writing to different account is ok - let rollup_id_1 = RollupId::new([3u8; 32]); - let nonce_1 = 3u32; - state.put_deposit_nonce(&rollup_id_1, nonce_1); - assert_eq!( - state - .get_deposit_nonce(&rollup_id_1) - .await - .expect("a rollup id nonce was written and must exist inside the database"), - nonce_1, - "additional stored nonce did not match expected" - ); - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("a rollup id nonce was written and must exist inside the database"), - nonce, - "original stored nonce did not match expected" - ); - } - - #[tokio::test] - async fn get_deposit_events_empty_ok() { + async fn get_deposits_empty_ok() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); let state = StateDelta::new(snapshot); + let block_hash = [32; 32]; let rollup_id = RollupId::new([2u8; 32]); // no events ok assert_eq!( state - .get_deposit_events(&rollup_id) + .get_deposits(&block_hash, &rollup_id) .await .expect("call for rollup id with no deposit events should not fail"), vec![], @@ -852,12 +714,13 @@ mod test { #[tokio::test] #[allow(clippy::too_many_lines)] // allow: it's a test - async fn get_deposit_events() { + async fn get_deposits() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); let mut state = StateDelta::new(snapshot); - let rollup_id = RollupId::new([1u8; 32]); + let block_hash = [32; 32]; + let rollup_id_1 = RollupId::new([1u8; 32]); let bridge_address = astria_address(&[42u8; 20]); let amount = 10u128; let asset = asset_0(); @@ -865,7 +728,7 @@ mod test { let mut deposit = Deposit { bridge_address, - rollup_id, + rollup_id: rollup_id_1, amount, asset: asset.clone(), destination_chain_address: destination_chain_address.to_string(), @@ -873,30 +736,22 @@ mod test { source_action_index: 0, }; - let mut deposits = vec![deposit.clone()]; + let mut all_deposits = HashMap::new(); + let mut rollup_1_deposits = vec![deposit.clone()]; + all_deposits.insert(rollup_id_1, rollup_1_deposits.clone()); // can write state - .put_deposit_event(deposit.clone()) - .await - .expect("writing deposit events should be ok"); + .put_deposits(&block_hash, all_deposits.clone()) + .unwrap(); assert_eq!( state - .get_deposit_events(&rollup_id) + .get_deposits(&block_hash, &rollup_id_1) .await .expect("deposit info was written to the database and must exist"), - deposits, + rollup_1_deposits, "stored deposits do not match what was expected" ); - // nonce is correct - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("calls to get nonce should not fail"), - 1u32, - "nonce was consumed and should've been incremented" - ); // can write additional deposit = Deposit { @@ -904,366 +759,47 @@ mod test { source_action_index: 1, ..deposit }; - deposits.append(&mut vec![deposit.clone()]); + rollup_1_deposits.push(deposit.clone()); + all_deposits.insert(rollup_id_1, rollup_1_deposits.clone()); state - .put_deposit_event(deposit.clone()) - .await - .expect("writing deposit events should be ok"); - let mut returned_deposits = state - .get_deposit_events(&rollup_id) - 
.await - .expect("deposit info was written to the database and must exist"); - returned_deposits.sort_by_key(|d| d.amount); - deposits.sort_by_key(|d| d.amount); - assert_eq!( - returned_deposits, deposits, - "stored deposits do not match what was expected" - ); - // nonce is correct + .put_deposits(&block_hash, all_deposits.clone()) + .unwrap(); assert_eq!( state - .get_deposit_nonce(&rollup_id) + .get_deposits(&block_hash, &rollup_id_1) .await - .expect("calls to get nonce should not fail"), - 2u32, - "nonce was consumed and should've been incremented" + .expect("deposit info was written to the database and must exist"), + rollup_1_deposits, + "stored deposits do not match what was expected" ); // can write different rollup id and both ok - let rollup_id_1 = RollupId::new([2u8; 32]); + let rollup_id_2 = RollupId::new([2u8; 32]); deposit = Deposit { - rollup_id: rollup_id_1, + rollup_id: rollup_id_2, source_action_index: 2, ..deposit }; - let deposits_1 = vec![deposit.clone()]; - state - .put_deposit_event(deposit) - .await - .expect("writing deposit events should be ok"); + let rollup_2_deposits = vec![deposit.clone()]; + all_deposits.insert(rollup_id_2, rollup_2_deposits.clone()); + state.put_deposits(&block_hash, all_deposits).unwrap(); assert_eq!( state - .get_deposit_events(&rollup_id_1) + .get_deposits(&block_hash, &rollup_id_2) .await .expect("deposit info was written to the database and must exist"), - deposits_1, + rollup_2_deposits, "stored deposits do not match what was expected" ); // verify original still ok - returned_deposits = state - .get_deposit_events(&rollup_id) - .await - .expect("deposit info was written to the database and must exist"); - returned_deposits.sort_by_key(|d| d.amount); - assert_eq!( - returned_deposits, deposits, - "stored deposits do not match what was expected" - ); - } - - #[tokio::test] - async fn get_deposit_rollup_ids() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - let rollup_id_0 = RollupId::new([1u8; 32]); - let bridge_address = astria_address(&[42u8; 20]); - let amount = 10u128; - let asset = asset_0(); - let destination_chain_address = "0xdeadbeef"; - - let mut deposit = Deposit { - bridge_address, - rollup_id: rollup_id_0, - amount, - asset: asset.clone(), - destination_chain_address: destination_chain_address.to_string(), - source_transaction_id: TransactionId::new([0; 32]), - source_action_index: 0, - }; - - // write same rollup id twice - state - .put_deposit_event(deposit.clone()) - .await - .expect("writing deposit events should be ok"); - - // writing to same rollup id does not create duplicates - state - .put_deposit_event(deposit.clone()) - .await - .expect("writing deposit events should be ok"); - - // writing additional different rollup id - let rollup_id_1 = RollupId::new([2u8; 32]); - deposit = Deposit { - rollup_id: rollup_id_1, - source_action_index: 1, - ..deposit - }; - state - .put_deposit_event(deposit) - .await - .expect("writing deposit events should be ok"); - // ensure only two rollup ids are in system - let rollups = state - .get_deposit_rollup_ids() - .await - .expect("deposit info was written rollup ids should still be in database"); - assert_eq!(rollups.len(), 2, "only two rollup ids should exits"); - assert!( - rollups.contains(&rollup_id_0), - "deposit data was written for rollup and it should exist" - ); - assert!( - rollups.contains(&rollup_id_1), - "deposit data was written for rollup and it should 
exist" - ); - } - - #[tokio::test] - async fn clear_deposit_info_uninitialized_ok() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - let rollup_id = RollupId::new([1u8; 32]); - // uninitialized delete ok - state.clear_deposit_info(&rollup_id).await; - } - - #[tokio::test] - async fn clear_deposit_info() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - let rollup_id = RollupId::new([1u8; 32]); - let bridge_address = astria_address(&[42u8; 20]); - let amount = 10u128; - let asset = asset_0(); - let destination_chain_address = "0xdeadbeef"; - - let deposit = Deposit { - bridge_address, - rollup_id, - amount, - asset: asset.clone(), - destination_chain_address: destination_chain_address.to_string(), - source_transaction_id: TransactionId::new([0; 32]), - source_action_index: 0, - }; - - let deposits = vec![deposit.clone()]; - - // can write - state - .put_deposit_event(deposit) - .await - .expect("writing deposit events should be ok"); assert_eq!( state - .get_deposit_events(&rollup_id) + .get_deposits(&block_hash, &rollup_id_1) .await .expect("deposit info was written to the database and must exist"), - deposits, + rollup_1_deposits, "stored deposits do not match what was expected" ); - - // can delete - state.clear_deposit_info(&rollup_id).await; - assert_eq!( - state - .get_deposit_events(&rollup_id) - .await - .expect("deposit should return empty when none exists"), - vec![], - "deposits were cleared and should return empty vector" - ); - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("calls to get nonce should not fail"), - 0u32, - "nonce should have been deleted also" - ); - } - - #[tokio::test] - async fn clear_deposit_info_multiple_accounts() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - let rollup_id = RollupId::new([1u8; 32]); - let bridge_address = astria_address(&[42u8; 20]); - let amount = 10u128; - let asset = asset_0(); - let destination_chain_address = "0xdeadbeef"; - let mut deposit = Deposit { - bridge_address, - rollup_id, - amount, - asset: asset.clone(), - destination_chain_address: destination_chain_address.to_string(), - source_transaction_id: TransactionId::new([0; 32]), - source_action_index: 0, - }; - - // write to first - state - .put_deposit_event(deposit) - .await - .expect("writing deposit events should be ok"); - - // write to second - let rollup_id_1 = RollupId::new([2u8; 32]); - deposit = Deposit { - bridge_address, - rollup_id: rollup_id_1, - amount, - asset: asset.clone(), - destination_chain_address: destination_chain_address.to_string(), - source_transaction_id: TransactionId::new([0; 32]), - source_action_index: 1, - }; - let deposits_1 = vec![deposit.clone()]; - - state - .put_deposit_event(deposit) - .await - .expect("writing deposit events for rollup 2 should be ok"); - - // delete first rollup's info - state.clear_deposit_info(&rollup_id).await; - assert_eq!( - state - .get_deposit_events(&rollup_id) - .await - .expect("deposit should return empty when none exists"), - vec![], - "deposits were cleared and should return empty vector" - ); - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("calls to get nonce should not fail"), - 0u32, - "nonce should have been deleted also" - ); - - // 
second rollup's info should be intact - assert_eq!( - state - .get_deposit_events(&rollup_id_1) - .await - .expect("deposit should return empty when none exists"), - deposits_1, - "deposits were written to the database and should exist" - ); - assert_eq!( - state - .get_deposit_nonce(&rollup_id_1) - .await - .expect("calls to get nonce should not fail"), - 1u32, - "nonce was written to the database and should exist" - ); - } - - #[tokio::test] - async fn clear_block_info_uninitialized_ok() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - // uninitialized delete ok - state - .clear_block_deposits() - .await - .expect("calls to clear block deposit should succeed"); - } - - #[tokio::test] - async fn clear_block_deposits() { - let storage = cnidarium::TempStorage::new().await.unwrap(); - let snapshot = storage.latest_snapshot(); - let mut state = StateDelta::new(snapshot); - - let rollup_id = RollupId::new([1u8; 32]); - let bridge_address = astria_address(&[42u8; 20]); - let amount = 10u128; - let asset = asset_0(); - let destination_chain_address = "0xdeadbeef"; - let mut deposit = Deposit { - bridge_address, - rollup_id, - amount, - asset: asset.clone(), - destination_chain_address: destination_chain_address.to_string(), - source_transaction_id: TransactionId::new([0; 32]), - source_action_index: 0, - }; - - // write to first - state - .put_deposit_event(deposit.clone()) - .await - .expect("writing deposit events should be ok"); - - // write to second - let rollup_id_1 = RollupId::new([2u8; 32]); - deposit = Deposit { - rollup_id: rollup_id_1, - source_action_index: 1, - ..deposit - }; - state - .put_deposit_event(deposit.clone()) - .await - .expect("writing deposit events for rollup 2 should be ok"); - - // delete all info - state - .clear_block_deposits() - .await - .expect("clearing deposits call should not fail"); - assert_eq!( - state - .get_deposit_events(&rollup_id) - .await - .expect("deposit should return empty when none exists"), - vec![], - "deposits were cleared and should return empty vector" - ); - // check that all info was deleted - assert_eq!( - state - .get_deposit_events(&rollup_id_1) - .await - .expect("deposit should return empty when none exists"), - vec![], - "deposits were cleared and should return empty vector" - ); - assert_eq!( - state - .get_deposit_nonce(&rollup_id) - .await - .expect("deposit should return empty when none exists"), - 0u32, - "nonce should have been deleted also" - ); - assert_eq!( - state - .get_deposit_nonce(&rollup_id_1) - .await - .expect("deposit should return empty when none exists"), - 0u32, - "nonce should have been deleted also" - ); } #[test] diff --git a/crates/astria-sequencer/src/ibc/ics20_transfer.rs b/crates/astria-sequencer/src/ibc/ics20_transfer.rs index 65201c5ca..67126d817 100644 --- a/crates/astria-sequencer/src/ibc/ics20_transfer.rs +++ b/crates/astria-sequencer/src/ibc/ics20_transfer.rs @@ -739,10 +739,7 @@ async fn emit_deposit( }; let deposit_abci_event = create_deposit_event(&deposit); state.record(deposit_abci_event); - state - .put_deposit_event(deposit) - .await - .wrap_err("failed to put deposit event into state")?; + state.cache_deposit_event(deposit); Ok(()) } @@ -968,10 +965,7 @@ mod tests { ); assert_eq!(balance, 100); - let deposits = state_tx - .get_block_deposits() - .await - .expect("a deposit should exist as a result of the transfer to a bridge account"); + let deposits = 
state_tx.get_cached_block_deposits(); assert_eq!(deposits.len(), 1); let expected_deposit = Deposit { @@ -1049,10 +1043,7 @@ mod tests { .expect("receipt of funds to a rollup should have updated funds in the bridge account"); assert_eq!(balance, amount); - let deposits = state_tx - .get_block_deposits() - .await - .expect("a deposit should exist as a result of the transfer to a bridge account"); + let deposits = state_tx.get_cached_block_deposits(); assert_eq!(deposits.len(), 1); let expected_deposit = Deposit { @@ -1290,10 +1281,7 @@ mod tests { .expect("rollup withdrawal refund should have updated funds in the bridge address"); assert_eq!(balance, amount); - let deposit = state_tx - .get_block_deposits() - .await - .expect("a deposit should exist as a result of the rollup withdrawal refund"); + let deposit = state_tx.get_cached_block_deposits(); let expected_deposit = Deposit { bridge_address, @@ -1366,10 +1354,7 @@ mod tests { .expect("refunds of rollup withdrawals should be credited to the bridge account"); assert_eq!(balance, amount); - let deposits = state_tx - .get_block_deposits() - .await - .expect("a deposit should exist as a result of the rollup withdrawal refund"); + let deposits = state_tx.get_cached_block_deposits(); let deposit = deposits .get(&rollup_id) @@ -1450,10 +1435,7 @@ mod tests { .expect("refunding a rollup should add the tokens to its bridge address"); assert_eq!(balance, amount); - let deposits = state_tx - .get_block_deposits() - .await - .expect("a deposit should exist as a result of the rollup withdrawal refund"); + let deposits = state_tx.get_cached_block_deposits(); assert_eq!(deposits.len(), 1); let deposit = deposits.get(&rollup_id).unwrap().first().unwrap(); diff --git a/crates/astria-sequencer/src/utils.rs b/crates/astria-sequencer/src/utils.rs index 3851f6732..4f96fddca 100644 --- a/crates/astria-sequencer/src/utils.rs +++ b/crates/astria-sequencer/src/utils.rs @@ -13,17 +13,6 @@ use tendermint::abci::{ EventAttributeIndexExt as _, }; -pub(crate) struct Hex<'a>(pub(crate) &'a [u8]); - -impl<'a> std::fmt::Display for Hex<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for byte in self.0 { - f.write_fmt(format_args!("{byte:02x}"))?; - } - Ok(()) - } -} - pub(crate) fn cometbft_to_sequencer_validator( value: tendermint::validator::Update, ) -> Result { From 33dae4256f7f6d1f936f9e0de166695be1bf416d Mon Sep 17 00:00:00 2001 From: Ethan Oroshiba Date: Thu, 26 Sep 2024 09:07:05 -0500 Subject: [PATCH 2/2] fix(conductor): fix flaky soft_and_firm test (#1472) ## Summary Fixed flakiness in conductor `soft_and_firm::simple` test. ## Background Test would previously fail ~6% of the time due to receiving firm block before soft block, hence never fulfilling the soft update commitment mock. ## Changes - Split `simple()` into 2 tests: 1. Tests for receiving soft block first, and then firm block (and enforcing this order). 2. Tests for receiving firm block first, then ignoring the later soft block response at the same height. The tests for receiving the soft block at the next height. - made changes to `astria-grpc` to enable a delayed response ## Testing All tests passing. 
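The delayed-response support called out in the Changes above is what makes the new ordering-sensitive tests deterministic: a mock can now sleep before replying, so the test controls whether the firm or the soft response arrives first. Below is a minimal, self-contained sketch of that pattern, for illustration only; the `ResponseTemplate` struct, its `respond` method, and the `main` driver here are simplified stand-ins (assuming a tokio runtime), not the actual `astria-grpc-mock` types or API.

```rust
// Minimal sketch (not the actual astria-grpc-mock API): a response template that
// optionally sleeps before yielding its payload, mirroring the delayed-response idea.
use std::time::Duration;

#[derive(Clone)]
struct ResponseTemplate {
    body: String,
    delay: Option<Duration>,
}

impl ResponseTemplate {
    fn new(body: impl Into<String>) -> Self {
        Self {
            body: body.into(),
            delay: None,
        }
    }

    // Builder-style setter, analogous to the `set_delay` calls used in the tests.
    fn set_delay(mut self, delay: Duration) -> Self {
        self.delay = Some(delay);
        self
    }

    // Wait out the configured delay (if any), then return the response body.
    async fn respond(&self) -> String {
        if let Some(delay) = self.delay {
            tokio::time::sleep(delay).await;
        }
        self.body.clone()
    }
}

#[tokio::main]
async fn main() {
    // The "soft" response is delayed by 500ms, so the "firm" response is handled first.
    let soft = ResponseTemplate::new("soft block").set_delay(Duration::from_millis(500));
    let firm = ResponseTemplate::new("firm block");

    let (first, second) = tokio::join!(firm.respond(), soft.respond());
    println!("{first} then {second}");
}
```

In the actual diff this surfaces as `constant_response(response).set_delay(delay)` for the gRPC mocks and `ResponseTemplate::new(200).set_body_json(...).set_delay(delay)` for the wiremock Celestia mocks, which is how the new firm-then-soft test guarantees the firm block wins the race.
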
## Related Issues closes #1143 --- .../tests/blackbox/helpers/macros.rs | 76 +++++-- .../tests/blackbox/helpers/mod.rs | 43 ++-- .../tests/blackbox/soft_and_firm.rs | 215 ++++++++++++++++-- crates/astria-grpc-mock/src/mock.rs | 12 +- crates/astria-grpc-mock/src/mock_server.rs | 11 +- crates/astria-grpc-mock/src/mock_set.rs | 18 +- crates/astria-grpc-mock/src/mounted_mock.rs | 14 +- crates/astria-grpc-mock/src/response.rs | 99 +++++--- 8 files changed, 375 insertions(+), 113 deletions(-) diff --git a/crates/astria-conductor/tests/blackbox/helpers/macros.rs b/crates/astria-conductor/tests/blackbox/helpers/macros.rs index df3520530..b416db47f 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/macros.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/macros.rs @@ -115,6 +115,20 @@ macro_rules! mount_celestia_blobs { celestia_height: $celestia_height:expr, sequencer_heights: [ $($sequencer_height:expr),+ ] $(,)? + ) => { + mount_celestia_blobs!( + $test_env, + celestia_height: $celestia_height, + sequencer_heights: [ $($sequencer_height),+ ], + delay: None, + ) + }; + ( + $test_env:ident, + celestia_height: $celestia_height:expr, + sequencer_heights: [ $($sequencer_height:expr),+ ], + delay: $delay:expr + $(,)? ) => {{ let blobs = $crate::helpers::make_blobs(&[ $( $sequencer_height ),+ ]); $test_env @@ -122,6 +136,7 @@ macro_rules! mount_celestia_blobs { $celestia_height, $crate::sequencer_namespace(), vec![blobs.header], + $delay, ) .await; $test_env @@ -129,6 +144,7 @@ macro_rules! mount_celestia_blobs { $celestia_height, $crate::rollup_namespace(), vec![blobs.rollup], + $delay, ) .await }}; @@ -177,6 +193,22 @@ macro_rules! mount_get_commitment_state { #[macro_export] macro_rules! mount_update_commitment_state { + ( + $test_env:ident, + firm: ( number: $firm_number:expr, hash: $firm_hash:expr, parent: $firm_parent:expr$(,)? ), + soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), + base_celestia_height: $base_celestia_height:expr + $(,)? + ) => { + mount_update_commitment_state!( + $test_env, + mock_name: None, + firm: ( number: $firm_number, hash: $firm_hash, parent: $firm_parent, ), + soft: ( number: $soft_number, hash: $soft_hash, parent: $soft_parent, ), + base_celestia_height: $base_celestia_height, + expected_calls: 1, + ) + }; ( $test_env:ident, mock_name: $mock_name:expr, @@ -184,6 +216,24 @@ macro_rules! mount_update_commitment_state { soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), base_celestia_height: $base_celestia_height:expr $(,)? + ) => { + mount_update_commitment_state!( + $test_env, + mock_name: $mock_name, + firm: ( number: $firm_number, hash: $firm_hash, parent: $firm_parent, ), + soft: ( number: $soft_number, hash: $soft_hash, parent: $soft_parent, ), + base_celestia_height: $base_celestia_height, + expected_calls: 1, + ) + }; + ( + $test_env:ident, + mock_name: $mock_name:expr, + firm: ( number: $firm_number:expr, hash: $firm_hash:expr, parent: $firm_parent:expr$(,)? ), + soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), + base_celestia_height: $base_celestia_height:expr, + expected_calls: $expected_calls:expr + $(,)? ) => { $test_env .mount_update_commitment_state( @@ -201,24 +251,10 @@ macro_rules! mount_update_commitment_state { ), base_celestia_height: $base_celestia_height, ), + $expected_calls, ) .await }; - ( - $test_env:ident, - firm: ( number: $firm_number:expr, hash: $firm_hash:expr, parent: $firm_parent:expr$(,)? 
), - soft: ( number: $soft_number:expr, hash: $soft_hash:expr, parent: $soft_parent:expr$(,)? ), - base_celestia_height: $base_celestia_height:expr - $(,)? - ) => { - mount_update_commitment_state!( - $test_env, - mock_name: None, - firm: ( number: $firm_number, hash: $firm_hash, parent: $firm_parent, ), - soft: ( number: $soft_number, hash: $soft_hash, parent: $soft_parent, ), - base_celestia_height: $base_celestia_height, - ) - }; } #[macro_export] @@ -270,7 +306,7 @@ macro_rules! mount_executed_block { #[macro_export] macro_rules! mount_get_filtered_sequencer_block { - ($test_env:ident, sequencer_height: $height:expr $(,)?) => { + ($test_env:ident, sequencer_height: $height:expr, delay: $delay:expr $(,)?) => { $test_env .mount_get_filtered_sequencer_block( ::astria_core::generated::sequencerblock::v1alpha1::GetFilteredSequencerBlockRequest { @@ -278,9 +314,17 @@ macro_rules! mount_get_filtered_sequencer_block { rollup_ids: vec![$crate::ROLLUP_ID.to_raw()], }, $crate::filtered_sequencer_block!(sequencer_height: $height), + $delay, ) .await; }; + ($test_env:ident, sequencer_height: $height:expr$(,)?) => { + mount_get_filtered_sequencer_block!( + $test_env, + sequencer_height: $height, + delay: Duration::from_secs(0), + ) + }; } #[macro_export] diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 01af51131..4cf983517 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -19,11 +19,7 @@ use astria_core::{ }, primitive::v1::RollupId, }; -use astria_grpc_mock::{ - response::ResponseResult, - AnyMessage, - Respond, -}; +use astria_grpc_mock::response::error_response; use bytes::Bytes; use celestia_types::{ nmt::Namespace, @@ -198,6 +194,7 @@ impl TestConductor { celestia_height: u64, namespace: Namespace, blobs: Vec, + delay: Option, ) { use base64::prelude::*; use wiremock::{ @@ -209,6 +206,7 @@ impl TestConductor { Request, ResponseTemplate, }; + let delay = delay.unwrap_or(Duration::from_millis(0)); let namespace_params = BASE64_STANDARD.encode(namespace.as_bytes()); Mock::given(body_partial_json(json!({ "jsonrpc": "2.0", @@ -222,11 +220,13 @@ impl TestConductor { .respond_with(move |request: &Request| { let body: serde_json::Value = serde_json::from_slice(&request.body).unwrap(); let id = body.get("id"); - ResponseTemplate::new(200).set_body_json(json!({ - "jsonrpc": "2.0", - "id": id, - "result": blobs, - })) + ResponseTemplate::new(200) + .set_body_json(json!({ + "jsonrpc": "2.0", + "id": id, + "result": blobs, + })) + .set_delay(delay) }) .expect(1..) .mount(&self.mock_http) @@ -407,6 +407,7 @@ impl TestConductor { &self, expected_pbjson: S, response: FilteredSequencerBlock, + delay: Duration, ) { use astria_grpc_mock::{ matcher::message_partial_pbjson, @@ -417,7 +418,7 @@ impl TestConductor { "get_filtered_sequencer_block", message_partial_pbjson(&expected_pbjson), ) - .respond_with(constant_response(response)) + .respond_with(constant_response(response).set_delay(delay)) .expect(1..) 
.mount(&self.mock_grpc.mock_server) .await; @@ -427,6 +428,7 @@ impl TestConductor { &self, mock_name: Option<&str>, commitment_state: CommitmentState, + expected_calls: u64, ) -> astria_grpc_mock::MockGuard { use astria_core::generated::execution::v1alpha2::UpdateCommitmentStateRequest; use astria_grpc_mock::{ @@ -444,7 +446,7 @@ impl TestConductor { if let Some(name) = mock_name { mock = mock.with_name(name); } - mock.expect(1) + mock.expect(expected_calls) .mount_as_scoped(&self.mock_grpc.mock_server) .await } @@ -697,20 +699,3 @@ pub fn rollup_namespace() -> Namespace { pub fn sequencer_namespace() -> Namespace { astria_core::celestia::namespace_v0_from_sha256_of_bytes(SEQUENCER_CHAIN_ID.as_bytes()) } - -pub struct ErrorResponse { - status: tonic::Status, -} - -impl Respond for ErrorResponse { - fn respond(&self, _req: &tonic::Request) -> ResponseResult { - Err(self.status.clone()) - } -} - -#[must_use] -pub fn error_response(code: tonic::Code) -> ErrorResponse { - ErrorResponse { - status: tonic::Status::new(code, "error"), - } -} diff --git a/crates/astria-conductor/tests/blackbox/soft_and_firm.rs b/crates/astria-conductor/tests/blackbox/soft_and_firm.rs index 27cf904ab..4d6679259 100644 --- a/crates/astria-conductor/tests/blackbox/soft_and_firm.rs +++ b/crates/astria-conductor/tests/blackbox/soft_and_firm.rs @@ -33,12 +33,8 @@ use crate::{ /// 4. block information for rollup number 1, sequencer height 2 is reconstructed from Celestia /// height 1 /// 5. the rollup's firm commitment state is updated (but without executing the block) -/// -/// NOTE: there is a potential race condition in this test in that the information could be first -/// retrieved from Celestia before Sequencer and executed against the rollup. In that case step 3. -/// would be skipped (no soft commitment update). 
#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn simple() { +async fn executes_soft_first_then_updates_firm() { let test_conductor = spawn_conductor(CommitLevel::SoftAndFirm).await; mount_get_genesis_info!( @@ -74,10 +70,51 @@ async fn simple() { height: 1u32, ); + mount_get_filtered_sequencer_block!( + test_conductor, + sequencer_height: 3, + ); + + let execute_block = mount_executed_block!( + test_conductor, + number: 2, + hash: [2; 64], + parent: [1; 64], + ); + + let update_commitment_state_soft = mount_update_commitment_state!( + test_conductor, + firm: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + soft: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + base_celestia_height: 1, + ); + + timeout( + Duration::from_millis(500), + join( + execute_block.wait_until_satisfied(), + update_commitment_state_soft.wait_until_satisfied(), + ), + ) + .await + .expect( + "Conductor should have executed the block and updated the soft commitment state within \ + 500ms", + ); + mount_celestia_blobs!( test_conductor, celestia_height: 1, sequencer_heights: [3], + delay: Some(Duration::from_millis(500)) ); mount_sequencer_commit!( @@ -87,11 +124,99 @@ async fn simple() { mount_sequencer_validator_set!(test_conductor, height: 2u32); - mount_get_filtered_sequencer_block!( + let update_commitment_state_firm = mount_update_commitment_state!( test_conductor, - sequencer_height: 3, + firm: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + soft: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + base_celestia_height: 1, + ); + + timeout( + Duration::from_millis(1000), + update_commitment_state_firm.wait_until_satisfied(), + ) + .await + .expect("conductor should have updated the firm commitment state within 1000ms"); +} + +/// Tests if a single block is executed and the rollup's state updated after first receiving a firm +/// block, ensuring that update commitment state is not called upon receiving a tardy soft block. +/// Then, ensures the conductor updates the state for the soft block at the next height. +/// +/// The following steps occur: +/// 1. Firm and soft blocks at the current height are mounted, the soft block with a 500ms delay to +/// allow for the firm block to be received first. +/// 2. The soft block for the next height is mounted with a 1000ms delay, so that execution and +/// state update of the current height happen before receipt of the next block. +/// 3. Mounts are made for firm and soft update commitment state calls, with the soft mount +/// expecting exactly 0 calls. +/// 4. 1000ms is allotted for the conductor to execute the block and update the firm commitment +/// state, noting that this allows time to test for an erroneously updated soft commitment state +/// before the conductor receives the next block. +/// 5. 2000ms is allotted for the conductor to execute the next block and update the soft commitment +/// state at the next height. 
+#[expect( + clippy::too_many_lines, + reason = "all mounts and test logic are necessary" +)] +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn executes_firm_then_soft_at_next_height() { + let test_conductor = spawn_conductor(CommitLevel::SoftAndFirm).await; + + mount_get_genesis_info!( + test_conductor, + sequencer_genesis_block_height: 1, + celestia_block_variance: 10, + ); + + mount_get_commitment_state!( + test_conductor, + firm: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + soft: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + base_celestia_height: 1, ); + mount_abci_info!( + test_conductor, + latest_sequencer_height: 4, + ); + + mount_sequencer_genesis!(test_conductor); + + mount_celestia_header_network_head!( + test_conductor, + height: 1u32, + ); + + mount_celestia_blobs!( + test_conductor, + celestia_height: 1, + sequencer_heights: [3], + ); + + mount_sequencer_commit!( + test_conductor, + height: 3u32, + ); + + mount_sequencer_validator_set!(test_conductor, height: 2u32); + let execute_block = mount_executed_block!( test_conductor, number: 2, @@ -99,8 +224,44 @@ async fn simple() { parent: [1; 64], ); - let update_commitment_state_soft = mount_update_commitment_state!( + // Mount soft block at current height with a slight delay + mount_get_filtered_sequencer_block!( + test_conductor, + sequencer_height: 3, + delay: Duration::from_millis(500), + ); + + // Mount soft block at next height with substantial delay + mount_get_filtered_sequencer_block!( + test_conductor, + sequencer_height: 4, + delay: Duration::from_millis(1000), + ); + + let update_commitment_state_firm = mount_update_commitment_state!( test_conductor, + firm: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + soft: ( + number: 2, + hash: [2; 64], + parent: [1; 64], + ), + base_celestia_height: 1, + ); + + // This guard's conditions will be checked when it is dropped, ensuring that there have been 0 + // calls to update the commitment state for the stale soft block. This is done instead of + // waiting for the guard to be satisfied because if we call `wait_until_satisfied` on it, it + // will succeed immediately and future erroneous calls will not be checked. It would be most + // ideal to mount this logic directly to the server, but this workaround functions with the + // current setup of the blackbox test helpers. 
+ let _stale_update_soft_commitment_state = mount_update_commitment_state!( + test_conductor, + mock_name: "should_be_ignored_update_commitment_state_soft", firm: ( number: 1, hash: [1; 64], @@ -112,9 +273,30 @@ async fn simple() { parent: [1; 64], ), base_celestia_height: 1, + expected_calls: 0, ); - let update_commitment_state_firm = mount_update_commitment_state!( + timeout( + Duration::from_millis(1000), + join( + execute_block.wait_until_satisfied(), + update_commitment_state_firm.wait_until_satisfied(), + ), + ) + .await + .expect( + "Conductor should have executed the block and updated the firm commitment state within \ + 1000ms", + ); + + let execute_block = mount_executed_block!( + test_conductor, + number: 3, + hash: [3; 64], + parent: [2; 64], + ); + + let update_commitment_state_soft = mount_update_commitment_state!( test_conductor, firm: ( number: 2, @@ -122,25 +304,24 @@ async fn simple() { parent: [1; 64], ), soft: ( - number: 2, - hash: [2; 64], - parent: [1; 64], + number: 3, + hash: [3; 64], + parent: [2; 64], ), base_celestia_height: 1, ); timeout( - Duration::from_millis(1000), - join3( + Duration::from_millis(2000), + join( execute_block.wait_until_satisfied(), update_commitment_state_soft.wait_until_satisfied(), - update_commitment_state_firm.wait_until_satisfied(), ), ) .await .expect( - "conductor should have executed the block and updated the soft and firm commitment states \ - within 1000ms", + "conductor should have executed the block and updated the soft commitment state within \ + 2000ms", ); } diff --git a/crates/astria-grpc-mock/src/mock.rs b/crates/astria-grpc-mock/src/mock.rs index f0a7ff3e1..fd3fa5df7 100644 --- a/crates/astria-grpc-mock/src/mock.rs +++ b/crates/astria-grpc-mock/src/mock.rs @@ -8,12 +8,10 @@ use std::ops::{ RangeToInclusive, }; -use super::{ - response::Respond, - AnyMessage, -}; +use super::AnyMessage; use crate::{ mock_server::MockGuard, + response::ResponseTemplate, MockServer, }; @@ -32,7 +30,7 @@ impl Match for Matcher { pub struct Mock { pub(crate) rpc: &'static str, pub(crate) matchers: Vec, - pub(crate) response: Box, + pub(crate) response: ResponseTemplate, pub(crate) max_n_matches: Option, pub(crate) expectation_range: Times, pub(crate) name: Option, @@ -86,7 +84,7 @@ impl MockBuilder { self } - pub fn respond_with(self, rsp: impl Respond + 'static) -> Mock { + pub fn respond_with(self, rsp: ResponseTemplate) -> Mock { let Self { rpc, matchers, @@ -94,7 +92,7 @@ impl MockBuilder { Mock { rpc, matchers, - response: Box::new(rsp), + response: rsp, max_n_matches: None, name: None, expectation_range: Times(TimesEnum::Unbounded(RangeFull)), diff --git a/crates/astria-grpc-mock/src/mock_server.rs b/crates/astria-grpc-mock/src/mock_server.rs index bd74a5b8b..09b6c8d53 100644 --- a/crates/astria-grpc-mock/src/mock_server.rs +++ b/crates/astria-grpc-mock/src/mock_server.rs @@ -54,7 +54,11 @@ impl MockServer { rpc: &'static str, req: tonic::Request, ) -> tonic::Result> { - self.state.write().await.handle_request(rpc, req) + let (response, delay) = self.state.write().await.handle_request(rpc, req); + if let Some(delay) = delay { + tokio::time::sleep(delay).await; + } + response } pub async fn register(&self, mock: Mock) { @@ -176,7 +180,10 @@ impl MockServerState { &mut self, rpc: &'static str, req: tonic::Request, - ) -> tonic::Result> { + ) -> ( + tonic::Result>, + Option, + ) { if let Some(received_requests) = &mut self.received_requests { received_requests.push((rpc, erase_request(clone_request(&req)).into())); } diff --git 
a/crates/astria-grpc-mock/src/mock_set.rs b/crates/astria-grpc-mock/src/mock_set.rs index 341d858a4..59e4169ff 100644 --- a/crates/astria-grpc-mock/src/mock_set.rs +++ b/crates/astria-grpc-mock/src/mock_set.rs @@ -55,29 +55,34 @@ impl MockSet { &mut self, rpc: &'static str, req: tonic::Request, - ) -> tonic::Result> { + ) -> ( + tonic::Result>, + Option, + ) { debug!(rpc, "handling request."); // perform erasure here so that it's not done in every single `Mock::matches` call. let erased = erase_request(req); let mut mock_response: Option>> = None; + let mut delay = None; for (mock, mock_state) in &mut self.mocks { if let MountedMockState::OutOfScope = mock_state { continue; } match mock.match_and_respond::(rpc, &erased) { - MockResult::NoMatch => continue, - MockResult::BadResponse(status) => { + (MockResult::NoMatch, _) => continue, + (MockResult::BadResponse(status), _) => { mock_response.replace(Err(status)); break; } - MockResult::Success(response) => { + (MockResult::Success(response), response_delay) => { mock_response.replace(response); + delay = response_delay; break; } } } - mock_response + let result = mock_response .ok_or_else(|| { let mut msg = "got unexpected request: ".to_string(); msg.push_str( @@ -86,7 +91,8 @@ impl MockSet { ); tonic::Status::not_found(msg) }) - .and_then(std::convert::identity) + .and_then(std::convert::identity); + (result, delay) } pub(crate) fn register(&mut self, mock: Mock) -> (Arc<(Notify, AtomicBool)>, MockId) { diff --git a/crates/astria-grpc-mock/src/mounted_mock.rs b/crates/astria-grpc-mock/src/mounted_mock.rs index efd14f52a..5fed8b00a 100644 --- a/crates/astria-grpc-mock/src/mounted_mock.rs +++ b/crates/astria-grpc-mock/src/mounted_mock.rs @@ -111,7 +111,7 @@ impl MountedMock { &mut self, rpc: &'static str, request: &Request, - ) -> MockResult { + ) -> (MockResult, Option) { let n_matches = u64::try_from(self.successful_responses.len() + self.bad_responses.len()).ok(); if self.inner.max_n_matches == n_matches @@ -122,16 +122,18 @@ impl MountedMock { .iter() .all(|matcher| matcher.matches(request)) { - return MockResult::NoMatch; + return (MockResult::NoMatch, None); } + let mut delay = None; let response = match self.inner.response.respond(request) { - Err(status) => { + (Err(status), _) => { self.successful_responses .push((clone_request(request), Err(status.clone()))); Ok(Err(status)) } - Ok(mock_response) => { + (Ok(mock_response), rsp_delay) => { + delay = rsp_delay; let (metadata, erased_message, extensions) = clone_response(&mock_response.inner).into_parts(); if let Ok(message) = erased_message.clone_box().into_any().downcast::() { @@ -173,8 +175,8 @@ impl MountedMock { self.notify.0.notify_waiters(); } match response { - Ok(ok) => MockResult::Success(ok), - Err(err) => MockResult::BadResponse(err), + Ok(ok) => (MockResult::Success(ok), delay), + Err(err) => (MockResult::BadResponse(err), None), } } diff --git a/crates/astria-grpc-mock/src/response.rs b/crates/astria-grpc-mock/src/response.rs index 3e05e669d..6073d64c4 100644 --- a/crates/astria-grpc-mock/src/response.rs +++ b/crates/astria-grpc-mock/src/response.rs @@ -1,4 +1,7 @@ -use std::marker::PhantomData; +use std::{ + marker::PhantomData, + time::Duration, +}; use super::{ clone_response, @@ -10,14 +13,17 @@ pub fn constant_response< T: erased_serde::Serialize + prost::Name + Clone + Default + Send + Sync + 'static, >( value: T, -) -> ConstantResponse { - ConstantResponse { - type_name: std::any::type_name::(), - response: erase_response(tonic::Response::new(value)), +) -> 
ResponseTemplate { + ResponseTemplate { + response: Box::new(ConstantResponse { + type_name: std::any::type_name::(), + response: erase_response(tonic::Response::new(value)), + }), + delay: None, } } -pub struct ConstantResponse { +struct ConstantResponse { type_name: &'static str, response: tonic::Response, } @@ -34,37 +40,30 @@ impl Respond for ConstantResponse { #[must_use] pub fn default_response< T: erased_serde::Serialize + prost::Name + Clone + Default + Send + Sync + 'static, ->() -> DefaultResponse { +>() -> ResponseTemplate { let response = T::default(); - DefaultResponse { - type_name: std::any::type_name::(), - response: erase_response(tonic::Response::new(response)), - } -} - -pub struct DefaultResponse { - type_name: &'static str, - response: tonic::Response, -} - -impl Respond for DefaultResponse { - fn respond(&self, _req: &tonic::Request) -> ResponseResult { - Ok(MockResponse { - type_name: self.type_name, - inner: clone_response(&self.response), - }) + ResponseTemplate { + response: Box::new(ConstantResponse { + type_name: std::any::type_name::(), + response: erase_response(tonic::Response::new(response)), + }), + delay: None, } } -pub fn dynamic_response(responder: F) -> DynamicResponse +pub fn dynamic_response(responder: F) -> ResponseTemplate where O: erased_serde::Serialize + prost::Name + Clone + 'static, - F: Fn(&I) -> O, + F: Send + Sync + 'static + Fn(&I) -> O, + I: Send + Sync + 'static, { - DynamicResponse { - type_name: std::any::type_name::(), - responder: Box::new(responder), - _phantom_data: PhantomData, + ResponseTemplate { + response: Box::new(DynamicResponse { + type_name: std::any::type_name::(), + responder: Box::new(responder), + _phantom_data: PhantomData, + }), + delay: None, } } @@ -74,6 +73,26 @@ pub struct DynamicResponse { _phantom_data: PhantomData<(I, O)>, } +struct ErrorResponse { + status: tonic::Status, +} + +impl Respond for ErrorResponse { + fn respond(&self, _req: &tonic::Request) -> ResponseResult { + Err(self.status.clone()) + } +} + +#[must_use] +pub fn error_response(code: tonic::Code) -> ResponseTemplate { + ResponseTemplate { + response: Box::new(ErrorResponse { + status: tonic::Status::new(code, "error"), + }), + delay: None, + } +} + impl Respond for DynamicResponse where I: Send + Sync + 'static, @@ -119,6 +138,26 @@ impl Clone for MockResponse { } } +pub struct ResponseTemplate { + response: Box, + delay: Option, +} + +impl ResponseTemplate { + pub(crate) fn respond( + &self, + req: &tonic::Request, + ) -> (ResponseResult, Option) { + (self.response.respond(req), self.delay) + } + + #[must_use] + pub fn set_delay(mut self, delay: Duration) -> Self { + self.delay = Some(delay); + self + } +} + pub trait Respond: Send + Sync { fn respond(&self, req: &tonic::Request) -> ResponseResult; }
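
For reviewers skimming the `astria-grpc-mock` changes above: the individual `Respond` implementors are now wrapped in a `ResponseTemplate`, which carries an optional delay and is the type `MockBuilder::respond_with` accepts. The sketch below shows roughly how the new pieces compose in a test. It is illustrative only and not part of the patch: `Mock::for_rpc_given`, `message_partial_pbjson`, and the generated `CommitmentState`/`UpdateCommitmentStateRequest` types are assumed to exist unchanged from the current test helpers; only `constant_response(..).set_delay(..)`, `error_response(..)` returning a template, and the `expect(0)` pattern reflect behaviour introduced here.

```rust
// Sketch only. `Mock::for_rpc_given` and `message_partial_pbjson` are assumed
// builder/matcher entry points; the generated protobuf types stand in for
// whatever message a given test actually mocks.
use std::time::Duration;

use astria_core::generated::execution::v1alpha2::{
    CommitmentState,
    UpdateCommitmentStateRequest,
};
use astria_grpc_mock::{
    matcher::message_partial_pbjson,
    response::{constant_response, error_response},
    Mock,
    MockGuard,
    MockServer,
};

async fn mount_examples(server: &MockServer, expected: CommitmentState) -> (MockGuard, MockGuard) {
    // Hold the reply back for 500ms so a competing code path (e.g. the firm
    // block) is observed first, as the soft-and-firm tests above arrange.
    let delayed = Mock::for_rpc_given(
        "update_commitment_state",
        message_partial_pbjson(&UpdateCommitmentStateRequest::default()),
    )
    .respond_with(constant_response(expected).set_delay(Duration::from_millis(500)))
    .expect(1)
    .mount_as_scoped(server)
    .await;

    // `error_response` now also returns a `ResponseTemplate`, so it plugs into
    // the same builder; `expect(0)` turns the guard into a "must never be
    // called" assertion, mirroring the `expected_calls: 0` mount above.
    let never_called = Mock::for_rpc_given(
        "update_commitment_state",
        message_partial_pbjson(&UpdateCommitmentStateRequest::default()),
    )
    .respond_with(error_response(tonic::Code::Internal))
    .expect(0)
    .mount_as_scoped(server)
    .await;

    (delayed, never_called)
}
```

One design note worth calling out: in `MockServer::handle_request` the write guard on the mock state is a temporary dropped at the end of the `handle_request` call, before the `tokio::time::sleep`, so a delayed response does not hold the lock and other RPCs can still be matched while it waits.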