diff --git a/.changelog/unreleased/bug-fixes/3075-force-fee-payment.md b/.changelog/unreleased/bug-fixes/3075-force-fee-payment.md new file mode 100644 index 0000000000..7b15a59a63 --- /dev/null +++ b/.changelog/unreleased/bug-fixes/3075-force-fee-payment.md @@ -0,0 +1,2 @@ +- Fixed the fee collection logic in `finalize_block` to match that of + `process_proposal`. ([\#3075](https://github.com/anoma/namada/issues/3075)) \ No newline at end of file diff --git a/crates/namada/src/ledger/mod.rs b/crates/namada/src/ledger/mod.rs index 14c4a8b622..a7d4dbd705 100644 --- a/crates/namada/src/ledger/mod.rs +++ b/crates/namada/src/ledger/mod.rs @@ -61,15 +61,11 @@ mod dry_run_tx { Gas::try_from(wrapper.gas_limit).into_storage_result()?; let tx_gas_meter = RefCell::new(TxGasMeter::new(gas_limit)); let tx_result = protocol::apply_wrapper_tx( - tx.clone(), + &tx, &wrapper, &request.data, - ShellParams::new( - &tx_gas_meter, - &mut temp_state, - &mut ctx.vp_wasm_cache, - &mut ctx.tx_wasm_cache, - ), + &tx_gas_meter, + &mut temp_state, None, ) .into_storage_result()?; diff --git a/crates/namada/src/ledger/protocol/mod.rs b/crates/namada/src/ledger/protocol/mod.rs index 74e17fbae3..5d67b45d3d 100644 --- a/crates/namada/src/ledger/protocol/mod.rs +++ b/crates/namada/src/ledger/protocol/mod.rs @@ -16,9 +16,9 @@ use namada_gas::TxGasMeter; use namada_sdk::tx::TX_TRANSFER_WASM; use namada_state::StorageWrite; use namada_token::event::{TokenEvent, TokenOperation, UserAccount}; -use namada_tx::data::protocol::ProtocolTxType; +use namada_tx::data::protocol::{ProtocolTx, ProtocolTxType}; use namada_tx::data::{ - BatchResults, BatchedTxResult, TxResult, TxType, VpStatusFlags, VpsResult, + BatchResults, BatchedTxResult, TxResult, VpStatusFlags, VpsResult, WrapperTx, }; use namada_tx::{BatchedTxRef, Tx}; @@ -178,51 +178,150 @@ impl From for DispatchError { } } +/// Arguments for transactions' execution +pub enum DispatchArgs<'a, CA: 'static + WasmCacheAccess + Sync> { + /// Protocol tx data + Protocol(&'a ProtocolTx), + /// Raw tx data + Raw { + /// The tx index + tx_index: TxIndex, + /// The result of the corresponding wrapper tx (missing if governance + /// transaction) + wrapper_tx_result: Option>, + /// Vp cache + vp_wasm_cache: &'a mut VpCache, + /// Tx cache + tx_wasm_cache: &'a mut TxCache, + }, + /// Wrapper tx data + Wrapper { + /// The wrapper header + wrapper: &'a WrapperTx, + /// The transaction bytes for gas accounting + tx_bytes: &'a [u8], + /// The block proposer + block_proposer: &'a Address, + }, +} + /// Dispatch a given transaction to be applied based on its type. Some storage /// updates may be derived and applied natively rather than via the wasm /// environment, in which case validity predicates will be bypassed. -#[allow(clippy::too_many_arguments)] pub fn dispatch_tx<'a, D, H, CA>( - tx: Tx, - tx_bytes: &'a [u8], - tx_index: TxIndex, + tx: &Tx, + dispatch_args: DispatchArgs<'a, CA>, tx_gas_meter: &'a RefCell, state: &'a mut WlState, - vp_wasm_cache: &'a mut VpCache, - tx_wasm_cache: &'a mut TxCache, - block_proposer: Option<&Address>, ) -> std::result::Result, DispatchError> where D: 'static + DB + for<'iter> DBIter<'iter> + Sync, H: 'static + StorageHasher + Sync, CA: 'static + WasmCacheAccess + Sync, { - match tx.header().tx_type { - // Raw trasaction type is allowed only for governance proposals - TxType::Raw => { - // No bundles of governance transactions, just take the first one - let cmt = tx.first_commitments().ok_or(Error::MissingInnerTxs)?; - let batched_tx_result = apply_wasm_tx( - BatchedTxRef { tx: &tx, cmt }, - &tx_index, - ShellParams { - tx_gas_meter, - state, - vp_wasm_cache, - tx_wasm_cache, - }, - )?; + match dispatch_args { + DispatchArgs::Raw { + tx_index, + wrapper_tx_result, + vp_wasm_cache, + tx_wasm_cache, + } => { + if let Some(mut tx_result) = wrapper_tx_result { + // Replay protection check on the batch + let tx_hash = tx.raw_header_hash(); + if state.write_log().has_replay_protection_entry(&tx_hash) { + // If the same batch has already been committed in + // this block, skip execution and return + return Err(DispatchError { + error: Error::ReplayAttempt(tx_hash), + tx_result: None, + }); + } - Ok(TxResult { - gas_used: tx_gas_meter.borrow().get_tx_consumed_gas(), - batch_results: BatchResults( - [(cmt.get_hash(), Ok(batched_tx_result))] - .into_iter() - .collect(), - ), - }) + // TODO(namada#2597): handle masp fee payment in the first inner + // tx if necessary + for cmt in tx.commitments() { + match apply_wasm_tx( + tx.batch_ref_tx(cmt), + &tx_index, + ShellParams { + tx_gas_meter, + state, + vp_wasm_cache, + tx_wasm_cache, + }, + ) { + Err(Error::GasError(ref msg)) => { + // Gas error aborts the execution of the entire + // batch + tx_result.gas_used = + tx_gas_meter.borrow().get_tx_consumed_gas(); + tx_result.batch_results.0.insert( + cmt.get_hash(), + Err(Error::GasError(msg.to_owned())), + ); + state.write_log_mut().drop_tx(); + return Err(DispatchError { + error: Error::GasError(msg.to_owned()), + tx_result: Some(tx_result), + }); + } + res => { + let is_accepted = matches!(&res, Ok(result) if result.is_accepted()); + + tx_result + .batch_results + .0 + .insert(cmt.get_hash(), res); + tx_result.gas_used = + tx_gas_meter.borrow().get_tx_consumed_gas(); + if is_accepted { + state.write_log_mut().commit_tx_to_batch(); + } else { + state.write_log_mut().drop_tx(); + + if tx.header.atomic { + // Stop the execution of an atomic batch at + // the first failed transaction + return Err(DispatchError { + error: Error::FailingAtomicBatch( + cmt.get_hash(), + ), + tx_result: Some(tx_result), + }); + } + } + } + }; + } + + Ok(tx_result) + } else { + // Governance proposal. We don't allow tx batches in this case, + // just take the first one + let cmt = + tx.first_commitments().ok_or(Error::MissingInnerTxs)?; + let batched_tx_result = apply_wasm_tx( + tx.batch_ref_tx(cmt), + &tx_index, + ShellParams { + tx_gas_meter, + state, + vp_wasm_cache, + tx_wasm_cache, + }, + )?; + Ok(TxResult { + gas_used: tx_gas_meter.borrow().get_tx_consumed_gas(), + batch_results: BatchResults( + [(cmt.get_hash(), Ok(batched_tx_result))] + .into_iter() + .collect(), + ), + }) + } } - TxType::Protocol(protocol_tx) => { + DispatchArgs::Protocol(protocol_tx) => { // No bundles of protocol transactions, only take the first one let cmt = tx.first_commitments().ok_or(Error::MissingInnerTxs)?; let batched_tx_result = @@ -237,85 +336,21 @@ where ..Default::default() }) } - TxType::Wrapper(ref wrapper) => { - let mut tx_result = apply_wrapper_tx( - tx.clone(), + DispatchArgs::Wrapper { + wrapper, + tx_bytes, + block_proposer, + } => { + let tx_result = apply_wrapper_tx( + tx, wrapper, tx_bytes, - ShellParams { - tx_gas_meter, - state, - vp_wasm_cache, - tx_wasm_cache, - }, - block_proposer, + tx_gas_meter, + state, + Some(block_proposer), ) .map_err(|e| Error::WrapperRunnerError(e.to_string()))?; - // Replay protection check on the batch - let tx_hash = tx.raw_header_hash(); - if state.write_log().has_replay_protection_entry(&tx_hash) { - // If the same batch has already been committed in - // this block, skip execution and return - return Err(DispatchError { - error: Error::ReplayAttempt(tx_hash), - tx_result: None, - }); - } - - // TODO(namada#2597): handle masp fee payment in the first inner tx - // if necessary - for cmt in tx.commitments() { - match apply_wasm_tx( - tx.batch_ref_tx(cmt), - &tx_index, - ShellParams { - tx_gas_meter, - state, - vp_wasm_cache, - tx_wasm_cache, - }, - ) { - Err(Error::GasError(ref msg)) => { - // Gas error aborts the execution of the entire batch - tx_result.gas_used = - tx_gas_meter.borrow().get_tx_consumed_gas(); - tx_result.batch_results.0.insert( - cmt.get_hash(), - Err(Error::GasError(msg.to_owned())), - ); - state.write_log_mut().drop_tx(); - return Err(DispatchError { - error: Error::GasError(msg.to_owned()), - tx_result: Some(tx_result), - }); - } - res => { - let is_accepted = - matches!(&res, Ok(result) if result.is_accepted()); - - tx_result.batch_results.0.insert(cmt.get_hash(), res); - tx_result.gas_used = - tx_gas_meter.borrow().get_tx_consumed_gas(); - if is_accepted { - state.write_log_mut().commit_tx_to_batch(); - } else { - state.write_log_mut().drop_tx(); - - if tx.header.atomic { - // Stop the execution of an atomic batch at the - // first failed transaction - return Err(DispatchError { - error: Error::FailingAtomicBatch( - cmt.get_hash(), - ), - tx_result: Some(tx_result), - }); - } - } - } - }; - } Ok(tx_result) } } @@ -343,40 +378,38 @@ where /// - gas accounting // TODO(namada#2597): this must signal to the caller if we need masp fee payment // in the first inner tx of the batch -pub(crate) fn apply_wrapper_tx( - tx: Tx, +pub(crate) fn apply_wrapper_tx( + tx: &Tx, wrapper: &WrapperTx, tx_bytes: &[u8], - mut shell_params: ShellParams<'_, S, D, H, CA>, + tx_gas_meter: &RefCell, + state: &mut S, block_proposer: Option<&Address>, ) -> Result> where S: State + Sync, D: 'static + DB + for<'iter> DBIter<'iter> + Sync, H: 'static + StorageHasher + Sync, - CA: 'static + WasmCacheAccess + Sync, { let wrapper_tx_hash = tx.header_hash(); // Write wrapper tx hash to storage - shell_params - .state + state .write_log_mut() .write_tx_hash(wrapper_tx_hash) .expect("Error while writing tx hash to storage"); // Charge fee before performing any fallible operations - charge_fee(wrapper, wrapper_tx_hash, &mut shell_params, block_proposer)?; + charge_fee(wrapper, wrapper_tx_hash, state, block_proposer)?; // Account for gas - shell_params - .tx_gas_meter + tx_gas_meter .borrow_mut() .add_wrapper_gas(tx_bytes) .map_err(|err| Error::GasError(err.to_string()))?; Ok(TxResult { - gas_used: shell_params.tx_gas_meter.borrow().get_tx_consumed_gas(), + gas_used: tx_gas_meter.borrow().get_tx_consumed_gas(), batch_results: BatchResults::default(), }) } @@ -385,34 +418,30 @@ where /// - Fee amount overflows /// - Not enough funds are available to pay the entire amount of the fee /// - The accumulated fee amount to be credited to the block proposer overflows -fn charge_fee( +fn charge_fee( wrapper: &WrapperTx, wrapper_tx_hash: Hash, - shell_params: &mut ShellParams<'_, S, D, H, CA>, + state: &mut S, block_proposer: Option<&Address>, ) -> Result<()> where S: State + Sync, D: 'static + DB + for<'iter> DBIter<'iter> + Sync, H: 'static + StorageHasher + Sync, - CA: 'static + WasmCacheAccess + Sync, { // Charge or check fees before propagating any possible error let payment_result = match block_proposer { - Some(block_proposer) => transfer_fee( - shell_params.state, - block_proposer, - wrapper, - wrapper_tx_hash, - ), + Some(block_proposer) => { + transfer_fee(state, block_proposer, wrapper, wrapper_tx_hash) + } None => { - check_fees(shell_params.state, wrapper)?; + check_fees(state, wrapper)?; Ok(()) } }; // Commit tx write log even in case of subsequent errors - shell_params.state.write_log_mut().commit_tx(); + state.write_log_mut().commit_tx(); payment_result } @@ -1270,7 +1299,7 @@ mod tests { // "execute" a dummy tx, by manually performing its state changes let (dummy_tx, changed_keys, verifiers) = { - let mut tx = Tx::from_type(TxType::Raw); + let mut tx = Tx::from_type(namada_tx::data::TxType::Raw); tx.set_code(namada_tx::Code::new(vec![], None)); tx.set_data(namada_tx::Data::new(vec![])); diff --git a/crates/node/src/shell/finalize_block.rs b/crates/node/src/shell/finalize_block.rs index 257d16216e..ee65f3b057 100644 --- a/crates/node/src/shell/finalize_block.rs +++ b/crates/node/src/shell/finalize_block.rs @@ -15,7 +15,7 @@ use namada::ledger::events::EmitEvents; use namada::ledger::gas::GasMetering; use namada::ledger::ibc; use namada::ledger::pos::namada_proof_of_stake; -use namada::ledger::protocol::DispatchError; +use namada::ledger::protocol::{DispatchArgs, DispatchError}; use namada::proof_of_stake; use namada::proof_of_stake::storage::{ find_validator_by_raw_hash, write_last_block_proposer_address, @@ -146,214 +146,29 @@ where // Tracks the accepted transactions self.state.in_mem_mut().block.results = BlockResults::default(); let mut changed_keys = BTreeSet::new(); - for (tx_index, processed_tx) in req.txs.iter().enumerate() { - let tx = if let Ok(tx) = Tx::try_from(processed_tx.tx.as_ref()) { - tx - } else { - tracing::error!( - "FinalizeBlock received a tx that could not be \ - deserialized to a Tx type. This is likely a protocol \ - transaction." - ); - continue; - }; - - let result_code = ResultCode::from_u32(processed_tx.result.code) - .expect("Result code conversion should not fail"); - - // If [`process_proposal`] rejected a Tx due to invalid signature, - // emit an event here and move on to next tx. - if result_code == ResultCode::InvalidSig { - let base_event = match tx.header().tx_type { - TxType::Wrapper(_) | TxType::Protocol(_) => { - new_tx_event(&tx, height.0) - } - _ => { - tracing::error!( - "Internal logic error: FinalizeBlock received a \ - tx with an invalid signature error code that \ - could not be deserialized to a WrapperTx / \ - ProtocolTx type" - ); - continue; - } - }; - response.events.emit( - base_event - .with(Code(result_code)) - .with(Info(format!( - "Tx rejected: {}", - &processed_tx.result.info - ))) - .with(GasUsed(0.into())), - ); - continue; - } - - if tx.validate_tx().is_err() { - tracing::error!( - "Internal logic error: FinalizeBlock received tx that \ - could not be deserialized to a valid TxType" - ); - continue; - }; - let tx_header = tx.header(); - // If [`process_proposal`] rejected a Tx, emit an event here and - // move on to next tx - if result_code != ResultCode::Ok { - response.events.emit( - new_tx_event(&tx, height.0) - .with(Code(result_code)) - .with(Info(format!( - "Tx rejected: {}", - &processed_tx.result.info - ))) - .with(GasUsed(0.into())), - ); - continue; - } - - let (tx_gas_meter, block_proposer) = match &tx_header.tx_type { - TxType::Wrapper(wrapper) => { - stats.increment_wrapper_txs(); - let gas_limit = match Gas::try_from(wrapper.gas_limit) { - Ok(value) => value, - Err(_) => { - response.events.emit( - new_tx_event(&tx, height.0) - .with(Code(ResultCode::InvalidTx)) - .with(Info( - "The wrapper gas limit overflowed gas \ - representation" - .to_owned(), - )) - .with(GasUsed(0.into())), - ); - continue; - } - }; - let gas_meter = TxGasMeter::new(gas_limit); - for cmt in tx.commitments() { - if let Some(code_sec) = tx - .get_section(cmt.code_sechash()) - .and_then(|x| Section::code_sec(x.as_ref())) - { - stats.increment_tx_type( - code_sec.code.hash().to_string(), - ); - } - } - (gas_meter, Some(&native_block_proposer_address)) - } - TxType::Raw => { - tracing::error!( - "Internal logic error: FinalizeBlock received a \ - TxType::Raw transaction" - ); - continue; - } - TxType::Protocol(protocol_tx) => match protocol_tx.tx { - ProtocolTxType::BridgePoolVext - | ProtocolTxType::BridgePool - | ProtocolTxType::ValSetUpdateVext - | ProtocolTxType::ValidatorSetUpdate => { - (TxGasMeter::new_from_sub_limit(0.into()), None) - } - ProtocolTxType::EthEventsVext => { - let ext = - ethereum_tx_data_variants::EthEventsVext::try_from( - &tx, - ) - .unwrap(); - if self - .mode - .get_validator_address() - .map(|validator| { - validator == &ext.data.validator_addr - }) - .unwrap_or(false) - { - for event in ext.data.ethereum_events.iter() { - self.mode.dequeue_eth_event(event); - } - } - (TxGasMeter::new_from_sub_limit(0.into()), None) - } - ProtocolTxType::EthereumEvents => { - let digest = - ethereum_tx_data_variants::EthereumEvents::try_from( - &tx - ).unwrap(); - if let Some(address) = - self.mode.get_validator_address().cloned() - { - let this_signer = &( - address, - self.state.in_mem().get_last_block_height(), - ); - for MultiSignedEthEvent { event, signers } in - &digest.events - { - if signers.contains(this_signer) { - self.mode.dequeue_eth_event(event); - } - } - } - (TxGasMeter::new_from_sub_limit(0.into()), None) - } - }, - }; - let replay_protection_hashes = - if matches!(tx_header.tx_type, TxType::Wrapper(_)) { - Some(ReplayProtectionHashes { - raw_header_hash: tx.raw_header_hash(), - header_hash: tx.header_hash(), - }) - } else { - None - }; - let tx_gas_meter = RefCell::new(tx_gas_meter); - let mut tx_event = new_tx_event(&tx, height.0); - let is_atomic_batch = tx.header.atomic; - let commitments_len = tx.commitments().len() as u64; - let tx_hash = tx.header_hash(); - let dispatch_result = protocol::dispatch_tx( - tx, - processed_tx.tx.as_ref(), - TxIndex::must_from_usize(tx_index), - &tx_gas_meter, - &mut self.state, - &mut self.vp_wasm_cache, - &mut self.tx_wasm_cache, - block_proposer, - ); - let tx_gas_meter = tx_gas_meter.into_inner(); - let consumed_gas = tx_gas_meter.get_tx_consumed_gas(); - - // save the gas cost - self.update_tx_gas(tx_hash, consumed_gas.into()); + // Execute wrapper and protocol transactions + let successful_wrappers = self.retrieve_and_execute_transactions( + &native_block_proposer_address, + &req.txs, + ExecutionArgs { + response: &mut response, + changed_keys: &mut changed_keys, + stats: &mut stats, + height, + }, + ); - self.evaluate_tx_result( - &mut response, - dispatch_result, - TxData { - is_atomic_batch, - tx_header: &tx_header, - commitments_len, - tx_index, - replay_protection_hashes, - consumed_gas, - height, - }, - TxLogs { - tx_event: &mut tx_event, - stats: &mut stats, - changed_keys: &mut changed_keys, - }, - ); - response.events.emit(tx_event); - } + // Execute inner transactions + self.execute_tx_batches( + successful_wrappers, + ExecutionArgs { + response: &mut response, + changed_keys: &mut changed_keys, + stats: &mut stats, + height, + }, + ); stats.set_tx_cache_size( self.tx_wasm_cache.get_size(), @@ -524,8 +339,10 @@ where } } - // Evaluate the result of a batch. Commit or drop the storage changes, - // update stats and event, manage replay protection. + // Evaluate the result of a transaction. Commit or drop the storage changes, + // update stats and event, manage replay protection. For successful wrapper + // transactions return the relevant data and delay the evaluation after the + // batch execution fn evaluate_tx_result( &mut self, response: &mut shim::response::FinalizeBlock, @@ -535,14 +352,26 @@ where >, tx_data: TxData<'_>, mut tx_logs: TxLogs<'_>, - ) { + ) -> Option { match dispatch_result { - Ok(tx_result) => self.handle_inner_tx_results( - response, - tx_result, - tx_data, - &mut tx_logs, - ), + Ok(tx_result) => match tx_data.tx.header.tx_type { + TxType::Wrapper(_) => { + // Return withouth emitting any events + return Some(WrapperCache { + tx: tx_data.tx.to_owned(), + tx_index: tx_data.tx_index, + gas_meter: tx_data.tx_gas_meter, + event: tx_logs.tx_event, + tx_result, + }); + } + _ => self.handle_inner_tx_results( + response, + tx_result, + tx_data, + &mut tx_logs, + ), + }, Err(DispatchError { error: protocol::Error::WrapperRunnerError(msg), tx_result: _, @@ -557,7 +386,7 @@ where ); tx_logs .tx_event - .extend(GasUsed(tx_data.consumed_gas)) + .extend(GasUsed(tx_data.tx_gas_meter.get_tx_consumed_gas())) .extend(Info(msg.to_string())) .extend(Code(ResultCode::InvalidTx)); // Make sure to clean the write logs for the next transaction @@ -582,7 +411,7 @@ where tx_logs .tx_event - .extend(GasUsed(tx_data.consumed_gas)) + .extend(GasUsed(tx_data.tx_gas_meter.get_tx_consumed_gas())) .extend(Info(msg.to_string())) .extend(Code(ResultCode::WasmRuntimeError)); @@ -595,6 +424,9 @@ where ); } } + + response.events.emit(tx_logs.tx_event); + None } // Evaluate the results of all the transactions of the batch. Commit or drop @@ -613,7 +445,6 @@ where is_any_tx_invalid, } = temp_log.check_inner_results( &tx_result, - tx_data.tx_header, tx_data.tx_index, tx_data.height, ); @@ -674,7 +505,6 @@ where is_any_tx_invalid: _, } = temp_log.check_inner_results( &tx_result, - tx_data.tx_header, tx_data.tx_index, tx_data.height, ); @@ -718,47 +548,357 @@ where } fn handle_batch_error_reprot(&mut self, err: &Error, tx_data: TxData<'_>) { - // If user transaction didn't fail because of out of gas nor - // invalid section commitment, commit its hash to prevent - // replays - if matches!(tx_data.tx_header.tx_type, TxType::Wrapper(_)) { - if !matches!( - err, - Error::TxApply(protocol::Error::GasError(_)) - | Error::TxApply(protocol::Error::ReplayAttempt(_)) + // If user transaction didn't fail because of out of gas nor replay + // attempt, commit its hash to prevent replays. If it failed because of + // a replay attempt just remove the redundant wrapper hash + if !matches!( + err, + Error::TxApply(protocol::Error::GasError(_)) + | Error::TxApply(protocol::Error::ReplayAttempt(_)) + ) { + self.commit_batch_hash(tx_data.replay_protection_hashes); + } else if let Error::TxApply(protocol::Error::ReplayAttempt(_)) = err { + // Remove the wrapper hash but keep the inner tx + // hash. A replay of the wrapper is impossible since + // the inner tx hash is committed to storage and + // we validate the wrapper against that hash too + let header_hash = tx_data + .replay_protection_hashes + .expect("This cannot fail") + .header_hash; + self.state + .redundant_tx_hash(&header_hash) + .expect("Error while marking tx hash as redundant"); + } + } + + // Get the transactions from the consensus engine, preprocess and execute + // them. Return the cache of successful wrapper transactions later used when + // executing the inner txs. + fn retrieve_and_execute_transactions( + &mut self, + native_block_proposer_address: &Address, + processed_txs: &[shim::request::ProcessedTx], + ExecutionArgs { + response, + changed_keys, + stats, + height, + }: ExecutionArgs<'_>, + ) -> Vec { + let mut successful_wrappers = vec![]; + + for (tx_index, processed_tx) in processed_txs.iter().enumerate() { + let tx = if let Ok(tx) = Tx::try_from(processed_tx.tx.as_ref()) { + tx + } else { + tracing::error!( + "FinalizeBlock received a tx that could not be \ + deserialized to a Tx type. This is likely a protocol \ + transaction." + ); + continue; + }; + + let result_code = ResultCode::from_u32(processed_tx.result.code) + .expect("Result code conversion should not fail"); + + // If [`process_proposal`] rejected a Tx due to invalid signature, + // emit an event here and move on to next tx. + if result_code == ResultCode::InvalidSig { + let base_event = match tx.header().tx_type { + TxType::Wrapper(_) | TxType::Protocol(_) => { + new_tx_event(&tx, height.0) + } + _ => { + tracing::error!( + "Internal logic error: FinalizeBlock received a \ + tx with an invalid signature error code that \ + could not be deserialized to a WrapperTx / \ + ProtocolTx type" + ); + continue; + } + }; + response.events.emit( + base_event + .with(Code(result_code)) + .with(Info(format!( + "Tx rejected: {}", + &processed_tx.result.info + ))) + .with(GasUsed(0.into())), + ); + continue; + } + + if tx.validate_tx().is_err() { + tracing::error!( + "Internal logic error: FinalizeBlock received tx that \ + could not be deserialized to a valid TxType" + ); + continue; + }; + let tx_header = tx.header(); + // If [`process_proposal`] rejected a Tx, emit an event here and + // move on to next tx + if result_code != ResultCode::Ok { + response.events.emit( + new_tx_event(&tx, height.0) + .with(Code(result_code)) + .with(Info(format!( + "Tx rejected: {}", + &processed_tx.result.info + ))) + .with(GasUsed(0.into())), + ); + continue; + } + + let (dispatch_args, tx_gas_meter): ( + DispatchArgs<'_, WasmCacheRwAccess>, + TxGasMeter, + ) = match &tx_header.tx_type { + TxType::Wrapper(wrapper) => { + stats.increment_wrapper_txs(); + + let gas_limit = match Gas::try_from(wrapper.gas_limit) { + Ok(value) => value, + Err(_) => { + response.events.emit( + new_tx_event(&tx, height.0) + .with(Code(ResultCode::InvalidTx)) + .with(Info( + "The wrapper gas limit overflowed gas \ + representation" + .to_owned(), + )) + .with(GasUsed(0.into())), + ); + continue; + } + }; + let tx_gas_meter = TxGasMeter::new(gas_limit); + for cmt in tx.commitments() { + if let Some(code_sec) = tx + .get_section(cmt.code_sechash()) + .and_then(|x| Section::code_sec(x.as_ref())) + { + stats.increment_tx_type( + code_sec.code.hash().to_string(), + ); + } + } + ( + DispatchArgs::Wrapper { + wrapper, + tx_bytes: processed_tx.tx.as_ref(), + block_proposer: native_block_proposer_address, + }, + tx_gas_meter, + ) + } + TxType::Raw => { + tracing::error!( + "Internal logic error: FinalizeBlock received a \ + TxType::Raw transaction" + ); + continue; + } + TxType::Protocol(protocol_tx) => { + match protocol_tx.tx { + ProtocolTxType::BridgePoolVext + | ProtocolTxType::BridgePool + | ProtocolTxType::ValSetUpdateVext + | ProtocolTxType::ValidatorSetUpdate => (), + + ProtocolTxType::EthEventsVext => { + let ext = + ethereum_tx_data_variants::EthEventsVext::try_from(&tx) + .unwrap(); + if self + .mode + .get_validator_address() + .map(|validator| { + validator == &ext.data.validator_addr + }) + .unwrap_or(false) + { + for event in ext.data.ethereum_events.iter() { + self.mode.dequeue_eth_event(event); + } + } + } + ProtocolTxType::EthereumEvents => { + let digest = + ethereum_tx_data_variants::EthereumEvents::try_from( + &tx, + ) + .unwrap(); + if let Some(address) = + self.mode.get_validator_address().cloned() + { + let this_signer = &( + address, + self.state.in_mem().get_last_block_height(), + ); + for MultiSignedEthEvent { event, signers } in + &digest.events + { + if signers.contains(this_signer) { + self.mode.dequeue_eth_event(event); + } + } + } + } + } + ( + DispatchArgs::Protocol(protocol_tx), + TxGasMeter::new_from_sub_limit(0.into()), + ) + } + }; + let tx_event = new_tx_event(&tx, height.0); + let is_atomic_batch = tx.header.atomic; + let commitments_len = tx.commitments().len() as u64; + let tx_hash = tx.header_hash(); + let tx_gas_meter = RefCell::new(tx_gas_meter); + + let dispatch_result = protocol::dispatch_tx( + &tx, + dispatch_args, + &tx_gas_meter, + &mut self.state, + ); + let tx_gas_meter = tx_gas_meter.into_inner(); + let consumed_gas = tx_gas_meter.get_tx_consumed_gas(); + + // save the gas cost + self.update_tx_gas(tx_hash, consumed_gas.into()); + + if let Some(wrapper_cache) = self.evaluate_tx_result( + response, + dispatch_result, + TxData { + is_atomic_batch, + tx: &tx, + commitments_len, + tx_index, + replay_protection_hashes: None, + tx_gas_meter, + height, + }, + TxLogs { + tx_event, + stats, + changed_keys, + }, ) { - self.commit_batch_hash(tx_data.replay_protection_hashes); - } else if let Error::TxApply(protocol::Error::ReplayAttempt(_)) = - err - { - // Remove the wrapper hash but keep the inner tx - // hash. A replay of the wrapper is impossible since - // the inner tx hash is committed to storage and - // we validate the wrapper against that hash too - let header_hash = tx_data - .replay_protection_hashes - .expect("This cannot fail") - .header_hash; - self.state - .redundant_tx_hash(&header_hash) - .expect("Error while marking tx hash as redundant"); + successful_wrappers.push(wrapper_cache); } } + + successful_wrappers + } + + // Execute the transaction batches for successful wrapper transactions + fn execute_tx_batches( + &mut self, + successful_wrappers: Vec, + ExecutionArgs { + response, + changed_keys, + stats, + height, + }: ExecutionArgs<'_>, + ) { + for WrapperCache { + mut tx, + tx_index, + gas_meter: tx_gas_meter, + event: tx_event, + tx_result: wrapper_tx_result, + } in successful_wrappers + { + let tx_hash = tx.header_hash(); + let is_atomic_batch = tx.header.atomic; + let commitments_len = tx.commitments().len() as u64; + let replay_protection_hashes = Some(ReplayProtectionHashes { + raw_header_hash: tx.raw_header_hash(), + header_hash: tx.header_hash(), + }); + + // change tx type to raw for execution + tx.update_header(TxType::Raw); + let tx_gas_meter = RefCell::new(tx_gas_meter); + let dispatch_result = protocol::dispatch_tx( + &tx, + DispatchArgs::Raw { + tx_index: TxIndex::must_from_usize(tx_index), + wrapper_tx_result: Some(wrapper_tx_result), + vp_wasm_cache: &mut self.vp_wasm_cache, + tx_wasm_cache: &mut self.tx_wasm_cache, + }, + &tx_gas_meter, + &mut self.state, + ); + let tx_gas_meter = tx_gas_meter.into_inner(); + let consumed_gas = tx_gas_meter.get_tx_consumed_gas(); + + // update the gas cost of the corresponding wrapper + self.update_tx_gas(tx_hash, consumed_gas.into()); + + self.evaluate_tx_result( + response, + dispatch_result, + TxData { + is_atomic_batch, + tx: &tx, + commitments_len, + tx_index, + replay_protection_hashes, + tx_gas_meter, + height, + }, + TxLogs { + tx_event, + stats, + changed_keys, + }, + ); + } } } +struct ExecutionArgs<'finalize> { + response: &'finalize mut shim::response::FinalizeBlock, + changed_keys: &'finalize mut BTreeSet, + stats: &'finalize mut InternalStats, + height: BlockHeight, +} + +// Caches the execution of a wrapper transaction to be used when later executing +// the inner batch +struct WrapperCache { + tx: Tx, + tx_index: usize, + gas_meter: TxGasMeter, + event: Event, + tx_result: namada::tx::data::TxResult, +} + struct TxData<'tx> { is_atomic_batch: bool, - tx_header: &'tx namada::tx::Header, + tx: &'tx Tx, commitments_len: u64, tx_index: usize, replay_protection_hashes: Option, - consumed_gas: Gas, + tx_gas_meter: TxGasMeter, height: BlockHeight, } struct TxLogs<'finalize> { - tx_event: &'finalize mut Event, + tx_event: Event, stats: &'finalize mut InternalStats, changed_keys: &'finalize mut BTreeSet, } @@ -818,7 +958,6 @@ impl<'finalize> TempTxLogs { fn check_inner_results( &mut self, tx_result: &namada::tx::data::TxResult, - tx_header: &namada::tx::Header, tx_index: usize, height: BlockHeight, ) -> ValidityFlags { @@ -876,13 +1015,10 @@ impl<'finalize> TempTxLogs { } } Err(e) => { - // this branch can only be reached by inner txs tracing::trace!("Inner tx {} failed: {}", cmt_hash, e); // If inner transaction didn't fail because of invalid // section commitment, commit its hash to prevent replays - if matches!(tx_header.tx_type, TxType::Wrapper(_)) - && !matches!(e, protocol::Error::MissingSection(_)) - { + if !matches!(e, protocol::Error::MissingSection(_)) { flags.commit_batch_hash = true; } @@ -1171,9 +1307,8 @@ mod test_finalize_block { ) } - /// Check that if a wrapper tx was rejected by [`process_proposal`], - /// check that the correct event is returned. Check that it does - /// not appear in the queue of txs to be decrypted + /// Check that if a wrapper tx was rejected by [`process_proposal`], the + /// correct event is returned. #[test] fn test_process_proposal_rejected_wrapper_tx() { let (mut shell, _, _, _) = setup(); @@ -1190,24 +1325,36 @@ mod test_finalize_block { ) .unwrap(); + // Need ordered tx hashes because the events can be emitted out of order + let mut ordered_hashes = vec![]; // create some wrapper txs for i in 0u64..4 { - let (_, mut processed_tx) = mk_wrapper_tx(&shell, &keypair); + let (tx, mut processed_tx) = mk_wrapper_tx(&shell, &keypair); processed_tx.result.code = u32::try_from(i.rem_euclid(2)).unwrap(); processed_txs.push(processed_tx); + ordered_hashes.push(tx.header_hash()); } // check that the correct events were created - for (index, event) in shell + for event in shell .finalize_block(FinalizeBlock { txs: processed_txs.clone(), ..Default::default() }) .expect("Test failed") .iter() - .enumerate() { assert_eq!(*event.kind(), APPLIED_TX); + let hash = event.read_attribute::().expect("Test failed"); + let index = ordered_hashes + .iter() + .enumerate() + .find_map( + |(idx, tx_hash)| { + if tx_hash == &hash { Some(idx) } else { None } + }, + ) + .unwrap(); let code = event .read_attribute::() .expect("Test failed") @@ -3098,10 +3245,10 @@ mod test_finalize_block { // transaction } - /// Test that if a transaction fails because of out-of-gas, - /// invalid signature or wrong section commitment, its hash - /// is not committed to storage. Also checks that a tx failing for other - /// reason has its hash written to storage. + /// Test that if a transaction fails because of out-of-gas, invalid + /// signature or wrong section commitment, its hash is not committed to + /// storage. Also checks that a tx failing for other reasons has its + /// hash written to storage. #[test] fn test_tx_hash_handling() { let (mut shell, _broadcaster, _, _) = setup(); diff --git a/crates/node/src/shell/governance.rs b/crates/node/src/shell/governance.rs index dbde3d0204..18a68e0218 100644 --- a/crates/node/src/shell/governance.rs +++ b/crates/node/src/shell/governance.rs @@ -403,16 +403,16 @@ where let cmt = tx.first_commitments().unwrap().to_owned(); let dispatch_result = protocol::dispatch_tx( - tx, - &[], /* this is used to compute the fee - * based on the code size. We dont - * need it here. */ - TxIndex::default(), - &RefCell::new(TxGasMeter::new_from_sub_limit(u64::MAX.into())), /* No gas limit for governance proposal */ + &tx, + protocol::DispatchArgs::Raw { + tx_index: TxIndex::default(), + wrapper_tx_result: None, + vp_wasm_cache: &mut shell.vp_wasm_cache, + tx_wasm_cache: &mut shell.tx_wasm_cache, + }, + // No gas limit for governance proposal + &RefCell::new(TxGasMeter::new_from_sub_limit(u64::MAX.into())), &mut shell.state, - &mut shell.vp_wasm_cache, - &mut shell.tx_wasm_cache, - None, ); shell .state diff --git a/crates/tests/src/integration/ledger_tests.rs b/crates/tests/src/integration/ledger_tests.rs index 40816bdf67..04138c2eff 100644 --- a/crates/tests/src/integration/ledger_tests.rs +++ b/crates/tests/src/integration/ledger_tests.rs @@ -5,13 +5,15 @@ use assert_matches::assert_matches; use borsh_ext::BorshSerializeExt; use color_eyre::eyre::Result; use data_encoding::HEXLOWER; +use namada::account::AccountPublicKeysMap; use namada::core::collections::HashMap; -use namada::token; -use namada_apps_lib::wallet::defaults; +use namada::token::{self, Amount, DenominatedAmount}; +use namada_apps_lib::wallet::defaults::{self, albert_keypair}; use namada_core::dec::Dec; use namada_core::storage::Epoch; use namada_core::token::NATIVE_MAX_DECIMAL_PLACES; use namada_node::shell::testing::client::run; +use namada_node::shell::testing::node::NodeResults; use namada_node::shell::testing::utils::{Bin, CapturedOutput}; use namada_sdk::tx::{TX_TRANSFER_WASM, VP_USER_WASM}; use namada_test_utils::TestWasms; @@ -1578,3 +1580,218 @@ fn change_validator_metadata() -> Result<()> { Ok(()) } + +// Test that fee payment is enforced and aligned with process proposal. The test +// generates a tx that subtract funds from the fee payer of a following tx. Test +// that wrappers (and fee payments) are evaluated before the inner transactions. +#[test] +fn enforce_fee_payment() -> Result<()> { + // This address doesn't matter for tests. But an argument is required. + let validator_one_rpc = "http://127.0.0.1:26567"; + // 1. start the ledger node + let (node, _services) = setup::setup()?; + + let tempdir = tempfile::tempdir().unwrap(); + let mut txs_bytes = vec![]; + + let captured = CapturedOutput::of(|| { + run( + &node, + Bin::Client, + vec![ + "balance", + "--owner", + ALBERT_KEY, + "--token", + NAM, + "--node", + validator_one_rpc, + ], + ) + }); + assert!(captured.result.is_ok()); + assert!(captured.contains("nam: 2000000")); + + run( + &node, + Bin::Client, + vec![ + "transfer", + "--source", + ALBERT_KEY, + "--target", + BERTHA, + "--token", + NAM, + "--amount", + // We want this transaction to consume all the remaining available + // balance. If we executed the inner txs right after the + // corresponding wrapper's fee payment this would succeed (but + // this is not the case) + "1900000", + "--output-folder-path", + tempdir.path().to_str().unwrap(), + "--dump-tx", + "--ledger-address", + validator_one_rpc, + ], + )?; + node.assert_success(); + let file_path = tempdir + .path() + .read_dir() + .unwrap() + .next() + .unwrap() + .unwrap() + .path(); + txs_bytes.push(std::fs::read(&file_path).unwrap()); + std::fs::remove_file(&file_path).unwrap(); + + run( + &node, + Bin::Client, + vec![ + "transfer", + "--source", + ALBERT_KEY, + "--target", + CHRISTEL, + "--token", + NAM, + "--amount", + "50", + "--gas-payer", + ALBERT_KEY, + "--output-folder-path", + tempdir.path().to_str().unwrap(), + "--dump-tx", + "--ledger-address", + validator_one_rpc, + ], + )?; + node.assert_success(); + let file_path = tempdir + .path() + .read_dir() + .unwrap() + .next() + .unwrap() + .unwrap() + .path(); + txs_bytes.push(std::fs::read(&file_path).unwrap()); + std::fs::remove_file(&file_path).unwrap(); + + let sk = albert_keypair(); + let pk = sk.to_public(); + + let native_token = node + .shell + .lock() + .unwrap() + .state + .in_mem() + .native_token + .clone(); + + let mut txs = vec![]; + for bytes in txs_bytes { + let mut tx = namada::tx::Tx::deserialize(&bytes).unwrap(); + tx.add_wrapper( + namada::tx::data::wrapper::Fee { + amount_per_gas_unit: DenominatedAmount::native( + Amount::native_whole(1), + ), + token: native_token.clone(), + }, + pk.clone(), + 100_000.into(), + ); + tx.sign_raw(vec![sk.clone()], AccountPublicKeysMap::default(), None); + tx.sign_wrapper(sk.clone()); + + txs.push(tx.to_bytes()); + } + + node.clear_results(); + node.submit_txs(txs); + { + let results = node.results.lock().unwrap(); + // If empty than failed in process proposal + assert!(!results.is_empty()); + + for result in results.iter() { + assert!(matches!(result, NodeResults::Ok)); + } + } + // Finalize the next block to execute the txs + node.clear_results(); + node.finalize_and_commit(); + { + let results = node.results.lock().unwrap(); + for result in results.iter() { + assert!(matches!(result, NodeResults::Ok)); + } + } + + // Assert balances + let captured = CapturedOutput::of(|| { + run( + &node, + Bin::Client, + vec![ + "balance", + "--owner", + ALBERT_KEY, + "--token", + NAM, + "--node", + validator_one_rpc, + ], + ) + }); + assert!(captured.result.is_ok()); + // This is the result of the two fee payemnts and the successful transfer to + // Christel + assert!(captured.contains("nam: 1799950")); + + let captured = CapturedOutput::of(|| { + run( + &node, + Bin::Client, + vec![ + "balance", + "--owner", + BERTHA, + "--token", + NAM, + "--node", + validator_one_rpc, + ], + ) + }); + assert!(captured.result.is_ok()); + // Bertha must not receive anything because the transaction fails. This is + // because we evaluate fee payments before the inner transactions, so by the + // time we execute the transfer, Albert doesn't have enough funds anynmore + assert!(captured.contains("nam: 2000000")); + + let captured = CapturedOutput::of(|| { + run( + &node, + Bin::Client, + vec![ + "balance", + "--owner", + CHRISTEL, + "--token", + NAM, + "--node", + validator_one_rpc, + ], + ) + }); + assert!(captured.result.is_ok()); + assert!(captured.contains("nam: 2000050")); + Ok(()) +} diff --git a/crates/tx/src/data/protocol.rs b/crates/tx/src/data/protocol.rs index 33995d8a81..fd994cb47c 100644 --- a/crates/tx/src/data/protocol.rs +++ b/crates/tx/src/data/protocol.rs @@ -56,6 +56,7 @@ impl ProtocolTx { } #[derive( + Copy, Clone, Debug, BorshSerialize, @@ -66,7 +67,6 @@ impl ProtocolTx { Deserialize, PartialEq, )] -#[allow(clippy::large_enum_variant)] /// Types of protocol messages to be sent pub enum ProtocolTxType { /// Ethereum events contained in vote extensions that