diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 45fd44a7c05..35b6cf876e3 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -121,7 +121,7 @@ pub enum TransactionDownloadVerifyError { #[error("transaction download / verification was cancelled")] Cancelled, - #[error("transaction did not pass consensus validation")] + #[error("transaction did not pass consensus validation, error: {0:?}")] Invalid(#[from] zebra_consensus::error::TransactionError), } diff --git a/zebrad/tests/common/lightwalletd/send_transaction_test.rs b/zebrad/tests/common/lightwalletd/send_transaction_test.rs index bee6cf78356..e5ed18ea1b7 100644 --- a/zebrad/tests/common/lightwalletd/send_transaction_test.rs +++ b/zebrad/tests/common/lightwalletd/send_transaction_test.rs @@ -17,15 +17,19 @@ //! already been seen in a block. use std::{cmp::min, sync::Arc, time::Duration}; +use tower::BoxError; -use color_eyre::eyre::Result; +use color_eyre::eyre::{eyre, Result}; use zebra_chain::{ - parameters::Network::{self, *}, + block::Block, + parameters::Network::*, serialization::ZcashSerialize, transaction::{self, Transaction}, }; +use zebra_node_services::rpc_client::RpcRequestClient; use zebra_rpc::queue::CHANNEL_AND_QUEUE_CAPACITY; +use zebra_test::prelude::TestChild; use zebrad::components::mempool::downloads::MAX_INBOUND_CONCURRENCY; use crate::common::{ @@ -34,10 +38,14 @@ use crate::common::{ lightwalletd::{ can_spawn_lightwalletd_for_rpc, spawn_lightwalletd_for_rpc, sync::wait_for_zebrad_and_lightwalletd_sync, - wallet_grpc::{self, connect_to_lightwalletd, Empty, Exclude}, + wallet_grpc::{ + self, compact_tx_streamer_client::CompactTxStreamerClient, connect_to_lightwalletd, + Empty, Exclude, + }, }, + regtest::MiningRpcMethods, sync::LARGE_CHECKPOINT_TIMEOUT, - test_type::TestType::{self, *}, + test_type::TestType::*, }; /// The maximum number of transactions we want to send in the test. @@ -85,11 +93,19 @@ pub async fn run() -> Result<()> { "running gRPC send transaction test using lightwalletd & zebrad", ); - let transactions = - load_transactions_from_future_blocks(network.clone(), test_type, test_name).await?; + let mut count = 0; + let blocks: Vec = + get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) + .await? + .into_iter() + .take_while(|block| { + count += block.transactions.len() - 1; + count <= max_sent_transactions() + }) + .collect(); tracing::info!( - transaction_count = ?transactions.len(), + blocks_count = ?blocks.len(), partial_sync_path = ?zebrad_state_path, "got transactions to send, spawning isolated zebrad...", ); @@ -113,6 +129,8 @@ pub async fn run() -> Result<()> { let zebra_rpc_address = zebra_rpc_address.expect("lightwalletd test must have RPC port"); + let zebrad_rpc_client = RpcRequestClient::new(zebra_rpc_address); + tracing::info!( ?test_type, ?zebra_rpc_address, @@ -164,6 +182,48 @@ pub async fn run() -> Result<()> { .await? .into_inner(); + let mut has_tx_with_shielded_elements = false; + let mut counter = 0; + + for block in blocks { + let (zebrad_child, has_shielded_elements, count) = send_transactions_from_block( + zebrad, + &mut rpc_client, + &zebrad_rpc_client, + block.clone(), + ) + .await?; + + zebrad = zebrad_child; + + has_tx_with_shielded_elements |= has_shielded_elements; + counter += count; + + zebrad_rpc_client.submit_block(block).await?; + } + + // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. + assert!( + !has_tx_with_shielded_elements || counter >= 1, + "failed to read v4+ transactions with shielded elements \ + from future blocks in mempool via lightwalletd" + ); + + Ok(()) +} + +/// Sends non-coinbase transactions from a block to the mempool, verifies that the transactions +/// can be found in the mempool via lightwalletd, and commits the block to Zebra's chainstate. +/// +/// Returns the zebrad test child that's handling the RPC requests. + +#[tracing::instrument(skip_all)] +async fn send_transactions_from_block( + mut zebrad: TestChild, + rpc_client: &mut CompactTxStreamerClient, + zebrad_rpc_client: &RpcRequestClient, + block: Block, +) -> Result<(TestChild, bool, usize)> { // Lightwalletd won't call `get_raw_mempool` again until 2 seconds after the last call: // // @@ -171,6 +231,16 @@ pub async fn run() -> Result<()> { let sleep_until_lwd_last_mempool_refresh = tokio::time::sleep(std::time::Duration::from_secs(4)); + let transactions: Vec<_> = block + .transactions + .iter() + .filter(|tx| !tx.is_coinbase()) + .collect(); + + if transactions.is_empty() { + return Ok((zebrad, false, 0)); + } + let transaction_hashes: Vec = transactions.iter().map(|tx| tx.hash()).collect(); @@ -181,7 +251,7 @@ pub async fn run() -> Result<()> { ); let mut has_tx_with_shielded_elements = false; - for transaction in transactions { + for &transaction in &transactions { let transaction_hash = transaction.hash(); // See @@ -195,20 +265,29 @@ pub async fn run() -> Result<()> { tracing::info!(?transaction_hash, "sending transaction..."); - let request = prepare_send_transaction_request(transaction); - - let response = rpc_client.send_transaction(request).await?.into_inner(); + let request = prepare_send_transaction_request(transaction.clone()); + + match rpc_client.send_transaction(request).await { + Ok(response) => assert_eq!(response.into_inner(), expected_response), + Err(err) => { + tracing::warn!(?err, "failed to send transaction"); + zebrad_rpc_client + .send_transaction(transaction) + .await + .map_err(|e| eyre!(e))?; + } + }; + } - assert_eq!(response, expected_response); + if transactions.len() >= 10 { + // Check if some transaction is sent to mempool, + // Fails if there are only coinbase transactions in the first 50 future blocks + tracing::info!("waiting for mempool to verify some transactions..."); + zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; } - // Check if some transaction is sent to mempool, - // Fails if there are only coinbase transactions in the first 50 future blocks - tracing::info!("waiting for mempool to verify some transactions..."); - zebrad.expect_stdout_line_matches("sending mempool transaction broadcast")?; // Wait for more transactions to verify, `GetMempoolTx` only returns txs where tx.HasShieldedElements() // - tokio::time::sleep(std::time::Duration::from_secs(2)).await; sleep_until_lwd_last_mempool_refresh.await; tracing::info!("calling GetMempoolTx gRPC to fetch transactions..."); @@ -233,7 +312,7 @@ pub async fn run() -> Result<()> { if tx_log.is_err() { tracing::info!("lightwalletd didn't query the mempool, skipping mempool contents checks"); - return Ok(()); + return Ok((zebrad, has_tx_with_shielded_elements, 0)); } tracing::info!("checking the mempool contains some of the sent transactions..."); @@ -251,15 +330,6 @@ pub async fn run() -> Result<()> { counter += 1; } - // GetMempoolTx: make sure at least one of the transactions were inserted into the mempool. - // - // TODO: Update `load_transactions_from_future_blocks()` to return block height offsets and, - // only check if a transaction from the first block has shielded elements - assert!( - !has_tx_with_shielded_elements || counter >= 1, - "failed to read v4+ transactions with shielded elements from future blocks in mempool via lightwalletd" - ); - // TODO: GetMempoolStream: make sure at least one of the transactions were inserted into the mempool. tracing::info!("calling GetMempoolStream gRPC to fetch transactions..."); let mut transaction_stream = rpc_client.get_mempool_stream(Empty {}).await?.into_inner(); @@ -270,32 +340,7 @@ pub async fn run() -> Result<()> { _counter += 1; } - Ok(()) -} - -/// Loads transactions from a few block(s) after the chain tip of the cached state. -/// -/// Returns a list of non-coinbase transactions from blocks that have not been finalized to disk -/// in the `ZEBRA_CACHED_STATE_DIR`. -/// -/// ## Panics -/// -/// If the provided `test_type` doesn't need an rpc server and cached state -#[tracing::instrument] -async fn load_transactions_from_future_blocks( - network: Network, - test_type: TestType, - test_name: &str, -) -> Result>> { - let transactions = get_future_blocks(&network, test_type, test_name, MAX_NUM_FUTURE_BLOCKS) - .await? - .into_iter() - .flat_map(|block| block.transactions) - .filter(|transaction| !transaction.is_coinbase()) - .take(max_sent_transactions()) - .collect(); - - Ok(transactions) + Ok((zebrad, has_tx_with_shielded_elements, counter)) } /// Prepare a request to send to lightwalletd that contains a transaction to be sent. @@ -307,3 +352,21 @@ fn prepare_send_transaction_request(transaction: Arc) -> wallet_grp height: 0, } } + +trait SendTransactionMethod { + async fn send_transaction( + &self, + transaction: &Arc, + ) -> Result; +} + +impl SendTransactionMethod for RpcRequestClient { + async fn send_transaction( + &self, + transaction: &Arc, + ) -> Result { + let tx_data = hex::encode(transaction.zcash_serialize_to_vec()?); + self.json_result_from_call("sendrawtransaction", format!(r#"["{tx_data}"]"#)) + .await + } +} diff --git a/zebrad/tests/common/lightwalletd/sync.rs b/zebrad/tests/common/lightwalletd/sync.rs index 8dce05a9150..3a55aedb5c2 100644 --- a/zebrad/tests/common/lightwalletd/sync.rs +++ b/zebrad/tests/common/lightwalletd/sync.rs @@ -32,7 +32,7 @@ pub fn wait_for_zebrad_and_lightwalletd_sync< wait_for_zebrad_mempool: bool, wait_for_zebrad_tip: bool, ) -> Result<(TestChild, TestChild

)> { - let is_zebrad_finished = AtomicBool::new(false); + let is_zebrad_finished = AtomicBool::new(!wait_for_zebrad_tip); let is_lightwalletd_finished = AtomicBool::new(false); let is_zebrad_finished = &is_zebrad_finished;