diff --git a/Cargo.lock b/Cargo.lock index 1f5120f54c..fc627be2ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3420,6 +3420,7 @@ dependencies = [ "ipnet", "itertools 0.11.0", "log", + "once_cell", "parking_lot", "rand 0.8.5", "rlimit", @@ -3803,6 +3804,7 @@ name = "kaspad" version = "0.14.1" dependencies = [ "async-channel 2.2.1", + "cfg-if 1.0.0", "clap 4.5.4", "dhat", "dirs", @@ -5639,6 +5641,7 @@ name = "simpa" version = "0.14.1" dependencies = [ "async-channel 2.2.1", + "cfg-if 1.0.0", "clap 4.5.4", "dhat", "futures", diff --git a/consensus/core/src/api/mod.rs b/consensus/core/src/api/mod.rs index 4d8fbc3b4d..6df33579c7 100644 --- a/consensus/core/src/api/mod.rs +++ b/consensus/core/src/api/mod.rs @@ -190,6 +190,10 @@ pub trait ConsensusApi: Send + Sync { unimplemented!() } + fn calc_transaction_hash_merkle_root(&self, txs: &[Transaction], pov_daa_score: u64) -> Hash { + unimplemented!() + } + fn validate_pruning_proof(&self, proof: &PruningPointProof) -> PruningImportResult<()> { unimplemented!() } diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index dde6fd5e75..cbd76b42dc 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -5,6 +5,7 @@ use crate::{ BlueWorkType, }; use kaspa_hashes::Hash; +use kaspa_utils::mem_size::MemSizeEstimator; use std::sync::Arc; /// A mutable block structure where header and transactions within can still be mutated. @@ -66,6 +67,20 @@ impl Block { pub fn from_precomputed_hash(hash: Hash, parents: Vec) -> Block { Block::from_header(Header::from_precomputed_hash(hash, parents)) } + + pub fn asses_for_cache(&self) -> Option<()> { + (self.estimate_mem_bytes() < 1_000_000).then_some(()) + } +} + +impl MemSizeEstimator for Block { + fn estimate_mem_bytes(&self) -> usize { + // Calculates mem bytes of the block (for cache tracking purposes) + size_of::() + + self.header.estimate_mem_bytes() + + size_of::>() + + self.transactions.iter().map(Transaction::estimate_mem_bytes).sum::() + } } /// An abstraction for a recallable transaction selector with persistent state @@ -105,6 +120,8 @@ pub struct BlockTemplate { pub selected_parent_timestamp: u64, pub selected_parent_daa_score: u64, pub selected_parent_hash: Hash, + /// Expected length is one less than txs length due to lack of coinbase transaction + pub calculated_fees: Vec, } impl BlockTemplate { @@ -115,8 +132,17 @@ impl BlockTemplate { selected_parent_timestamp: u64, selected_parent_daa_score: u64, selected_parent_hash: Hash, + calculated_fees: Vec, ) -> Self { - Self { block, miner_data, coinbase_has_red_reward, selected_parent_timestamp, selected_parent_daa_score, selected_parent_hash } + Self { + block, + miner_data, + coinbase_has_red_reward, + selected_parent_timestamp, + selected_parent_daa_score, + selected_parent_hash, + calculated_fees, + } } pub fn to_virtual_state_approx_id(&self) -> VirtualStateApproxId { diff --git a/consensus/core/src/config/genesis.rs b/consensus/core/src/config/genesis.rs index 204098ad2a..9f9ea21e54 100644 --- a/consensus/core/src/config/genesis.rs +++ b/consensus/core/src/config/genesis.rs @@ -231,7 +231,7 @@ mod tests { fn test_genesis_hashes() { [GENESIS, TESTNET_GENESIS, TESTNET11_GENESIS, SIMNET_GENESIS, DEVNET_GENESIS].into_iter().for_each(|genesis| { let block: Block = (&genesis).into(); - assert_hashes_eq(calc_hash_merkle_root(block.transactions.iter()), block.header.hash_merkle_root); + assert_hashes_eq(calc_hash_merkle_root(block.transactions.iter(), false), block.header.hash_merkle_root); assert_hashes_eq(block.hash(), genesis.hash); }); } diff --git a/consensus/core/src/errors/tx.rs b/consensus/core/src/errors/tx.rs index 1aeb5220fe..f21409857f 100644 --- a/consensus/core/src/errors/tx.rs +++ b/consensus/core/src/errors/tx.rs @@ -1,4 +1,5 @@ use crate::constants::MAX_SOMPI; +use crate::subnets::SubnetworkId; use crate::tx::TransactionOutpoint; use kaspa_txscript_errors::TxScriptError; use thiserror::Error; @@ -92,6 +93,9 @@ pub enum TxRuleError { #[error("calculated contextual mass (including storage mass) {0} is not equal to the committed mass field {1}")] WrongMass(u64, u64), + #[error("transaction subnetwork id {0} is neither native nor coinbase")] + SubnetworksDisabled(SubnetworkId), + /// [`TxRuleError::FeerateTooLow`] is not a consensus error but a mempool error triggered by the /// fee/mass RBF validation rule #[error("fee rate per contextual mass gram is not greater than the fee rate of the replaced transaction")] diff --git a/consensus/core/src/header.rs b/consensus/core/src/header.rs index b6c2b9bc7e..b57337afc9 100644 --- a/consensus/core/src/header.rs +++ b/consensus/core/src/header.rs @@ -1,6 +1,7 @@ use crate::{hashing, BlueWorkType}; use borsh::{BorshDeserialize, BorshSerialize}; use kaspa_hashes::Hash; +use kaspa_utils::mem_size::MemSizeEstimator; use serde::{Deserialize, Serialize}; /// @category Consensus @@ -92,6 +93,12 @@ impl Header { } } +impl MemSizeEstimator for Header { + fn estimate_mem_bytes(&self) -> usize { + size_of::() + self.parents_by_level.iter().map(|l| l.len()).sum::() * size_of::() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/consensus/core/src/merkle.rs b/consensus/core/src/merkle.rs index cfd5c90451..59c6ca7c4c 100644 --- a/consensus/core/src/merkle.rs +++ b/consensus/core/src/merkle.rs @@ -2,14 +2,10 @@ use crate::{hashing, tx::Transaction}; use kaspa_hashes::Hash; use kaspa_merkle::calc_merkle_root; -pub fn calc_hash_merkle_root_with_options<'a>(txs: impl ExactSizeIterator, include_mass_field: bool) -> Hash { +pub fn calc_hash_merkle_root<'a>(txs: impl ExactSizeIterator, include_mass_field: bool) -> Hash { calc_merkle_root(txs.map(|tx| hashing::tx::hash(tx, include_mass_field))) } -pub fn calc_hash_merkle_root<'a>(txs: impl ExactSizeIterator) -> Hash { - calc_merkle_root(txs.map(|tx| hashing::tx::hash(tx, false))) -} - #[cfg(test)] mod tests { use crate::merkle::calc_hash_merkle_root; @@ -242,7 +238,7 @@ mod tests { ), ]; assert_eq!( - calc_hash_merkle_root(txs.iter()), + calc_hash_merkle_root(txs.iter(), false), Hash::from_slice(&[ 0x46, 0xec, 0xf4, 0x5b, 0xe3, 0xba, 0xca, 0x34, 0x9d, 0xfe, 0x8a, 0x78, 0xde, 0xaf, 0x05, 0x3b, 0x0a, 0xa6, 0xd5, 0x38, 0x97, 0x4d, 0xa5, 0x0f, 0xd6, 0xef, 0xb4, 0xd2, 0x66, 0xbc, 0x8d, 0x21, diff --git a/consensus/core/src/subnets.rs b/consensus/core/src/subnets.rs index 2456f84444..756c4d40a8 100644 --- a/consensus/core/src/subnets.rs +++ b/consensus/core/src/subnets.rs @@ -4,7 +4,7 @@ use std::str::{self, FromStr}; use borsh::{BorshDeserialize, BorshSerialize}; use kaspa_utils::hex::{FromHex, ToHex}; use kaspa_utils::{serde_impl_deser_fixed_bytes_ref, serde_impl_ser_fixed_bytes_ref}; -use thiserror::Error; +use thiserror::Error; /// The size of the array used to store subnetwork IDs. pub const SUBNETWORK_ID_SIZE: usize = 20; @@ -59,35 +59,34 @@ impl SubnetworkId { *self == SUBNETWORK_ID_COINBASE || *self == SUBNETWORK_ID_REGISTRY } + /// Returns true if the subnetwork is the native subnetwork + #[inline] + pub fn is_native(&self) -> bool { + *self == SUBNETWORK_ID_NATIVE + } + /// Returns true if the subnetwork is the native or a built-in subnetwork #[inline] pub fn is_builtin_or_native(&self) -> bool { - *self == SUBNETWORK_ID_NATIVE || self.is_builtin() + self.is_native() || self.is_builtin() } } -#[derive(Error, Debug, Clone)] -pub enum SubnetworkConversionError { - #[error("Invalid bytes")] - InvalidBytes, - - #[error(transparent)] - SliceError(#[from] std::array::TryFromSliceError), - - #[error(transparent)] - HexError(#[from] faster_hex::Error), -} - +#[derive(Error, Debug, Clone)] +pub enum SubnetworkConversionError { + #[error(transparent)] + SliceError(#[from] std::array::TryFromSliceError), + + #[error(transparent)] + HexError(#[from] faster_hex::Error), +} + impl TryFrom<&[u8]> for SubnetworkId { - type Error = SubnetworkConversionError; + type Error = SubnetworkConversionError; fn try_from(value: &[u8]) -> Result { let bytes = <[u8; SUBNETWORK_ID_SIZE]>::try_from(value)?; - if bytes != Self::from_byte(0).0 && bytes != Self::from_byte(1).0 { - Err(Self::Error::InvalidBytes) - } else { - Ok(Self(bytes)) - } + Ok(Self(bytes)) } } @@ -109,30 +108,22 @@ impl ToHex for SubnetworkId { } impl FromStr for SubnetworkId { - type Err = SubnetworkConversionError; + type Err = SubnetworkConversionError; #[inline] fn from_str(hex_str: &str) -> Result { let mut bytes = [0u8; SUBNETWORK_ID_SIZE]; faster_hex::hex_decode(hex_str.as_bytes(), &mut bytes)?; - if bytes != Self::from_byte(0).0 && bytes != Self::from_byte(1).0 { - Err(Self::Err::InvalidBytes) - } else { - Ok(Self(bytes)) - } + Ok(Self(bytes)) } } impl FromHex for SubnetworkId { - type Error = SubnetworkConversionError; + type Error = SubnetworkConversionError; fn from_hex(hex_str: &str) -> Result { let mut bytes = [0u8; SUBNETWORK_ID_SIZE]; faster_hex::hex_decode(hex_str.as_bytes(), &mut bytes)?; - if bytes != Self::from_byte(0).0 && bytes != Self::from_byte(1).0 { - Err(Self::Error::InvalidBytes) - } else { - Ok(Self(bytes)) - } + Ok(Self(bytes)) } } diff --git a/consensus/core/src/tx.rs b/consensus/core/src/tx.rs index 67e09df738..3595dcb8b9 100644 --- a/consensus/core/src/tx.rs +++ b/consensus/core/src/tx.rs @@ -230,6 +230,23 @@ impl Transaction { } } +impl MemSizeEstimator for Transaction { + fn estimate_mem_bytes(&self) -> usize { + // Calculates mem bytes of the transaction (for cache tracking purposes) + size_of::() + + self.payload.len() + + self + .inputs + .iter() + .map(|i| i.signature_script.len() + size_of::()) + .chain(self.outputs.iter().map(|o| { + // size_of::() already counts SCRIPT_VECTOR_SIZE bytes within, so we only add the delta + o.script_public_key.script().len().saturating_sub(SCRIPT_VECTOR_SIZE) + size_of::() + })) + .sum::() + } +} + /// Represents any kind of transaction which has populated UTXO entry data and can be verified/signed etc pub trait VerifiableTransaction { fn tx(&self) -> &Transaction; diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 7215c331f3..c37f9bd0b9 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -58,6 +58,7 @@ use kaspa_consensus_core::{ tx::TxResult, }, header::Header, + merkle::calc_hash_merkle_root, muhash::MuHashExtensions, network::NetworkType, pruning::{PruningPointProof, PruningPointTrustedData, PruningPointsList}, @@ -675,6 +676,11 @@ impl ConsensusApi for Consensus { self.services.coinbase_manager.modify_coinbase_payload(payload, miner_data) } + fn calc_transaction_hash_merkle_root(&self, txs: &[Transaction], pov_daa_score: u64) -> Hash { + let storage_mass_activated = pov_daa_score > self.config.storage_mass_activation_daa_score; + calc_hash_merkle_root(txs.iter(), storage_mass_activated) + } + fn validate_pruning_proof(&self, proof: &PruningPointProof) -> Result<(), PruningImportError> { self.services.pruning_proof_manager.validate_pruning_point_proof(proof) } diff --git a/consensus/src/consensus/test_consensus.rs b/consensus/src/consensus/test_consensus.rs index a937388ba9..472bdbd835 100644 --- a/consensus/src/consensus/test_consensus.rs +++ b/consensus/src/consensus/test_consensus.rs @@ -176,7 +176,7 @@ impl TestConsensus { let cb = Transaction::new(TX_VERSION, vec![], vec![], 0, SUBNETWORK_ID_COINBASE, 0, cb_payload); txs.insert(0, cb); - header.hash_merkle_root = calc_hash_merkle_root(txs.iter()); + header.hash_merkle_root = calc_hash_merkle_root(txs.iter(), false); MutableBlock::new(header, txs) } diff --git a/consensus/src/model/stores/headers.rs b/consensus/src/model/stores/headers.rs index b0c25b5960..64e10a90be 100644 --- a/consensus/src/model/stores/headers.rs +++ b/consensus/src/model/stores/headers.rs @@ -29,9 +29,7 @@ pub struct HeaderWithBlockLevel { impl MemSizeEstimator for HeaderWithBlockLevel { fn estimate_mem_bytes(&self) -> usize { - size_of::
() - + self.header.parents_by_level.iter().map(|l| l.len()).sum::() * size_of::() - + size_of::() + self.header.as_ref().estimate_mem_bytes() + size_of::() } } diff --git a/consensus/src/pipeline/body_processor/body_validation_in_context.rs b/consensus/src/pipeline/body_processor/body_validation_in_context.rs index 2425556d0e..042410fa85 100644 --- a/consensus/src/pipeline/body_processor/body_validation_in_context.rs +++ b/consensus/src/pipeline/body_processor/body_validation_in_context.rs @@ -94,13 +94,17 @@ mod tests { }; use kaspa_consensus_core::{ api::ConsensusApi, - merkle::calc_hash_merkle_root, + merkle::calc_hash_merkle_root as calc_hash_merkle_root_with_options, subnets::SUBNETWORK_ID_NATIVE, tx::{Transaction, TransactionInput, TransactionOutpoint}, }; use kaspa_core::assert_match; use kaspa_hashes::Hash; + fn calc_hash_merkle_root<'a>(txs: impl ExactSizeIterator) -> Hash { + calc_hash_merkle_root_with_options(txs, false) + } + #[tokio::test] async fn validate_body_in_context_test() { let config = ConfigBuilder::new(DEVNET_PARAMS) diff --git a/consensus/src/pipeline/body_processor/body_validation_in_isolation.rs b/consensus/src/pipeline/body_processor/body_validation_in_isolation.rs index e5a51d8154..c413552b99 100644 --- a/consensus/src/pipeline/body_processor/body_validation_in_isolation.rs +++ b/consensus/src/pipeline/body_processor/body_validation_in_isolation.rs @@ -2,7 +2,7 @@ use std::{collections::HashSet, sync::Arc}; use super::BlockBodyProcessor; use crate::errors::{BlockProcessResult, RuleError}; -use kaspa_consensus_core::{block::Block, merkle::calc_hash_merkle_root_with_options, tx::TransactionOutpoint}; +use kaspa_consensus_core::{block::Block, merkle::calc_hash_merkle_root, tx::TransactionOutpoint}; impl BlockBodyProcessor { pub fn validate_body_in_isolation(self: &Arc, block: &Block) -> BlockProcessResult { @@ -29,7 +29,7 @@ impl BlockBodyProcessor { } fn check_hash_merkle_root(block: &Block, storage_mass_activated: bool) -> BlockProcessResult<()> { - let calculated = calc_hash_merkle_root_with_options(block.transactions.iter(), storage_mass_activated); + let calculated = calc_hash_merkle_root(block.transactions.iter(), storage_mass_activated); if calculated != block.header.hash_merkle_root { return Err(RuleError::BadMerkleRoot(block.header.hash_merkle_root, calculated)); } @@ -137,13 +137,17 @@ mod tests { api::{BlockValidationFutures, ConsensusApi}, block::MutableBlock, header::Header, - merkle::calc_hash_merkle_root, + merkle::calc_hash_merkle_root as calc_hash_merkle_root_with_options, subnets::{SUBNETWORK_ID_COINBASE, SUBNETWORK_ID_NATIVE}, tx::{scriptvec, ScriptPublicKey, Transaction, TransactionId, TransactionInput, TransactionOutpoint, TransactionOutput}, }; use kaspa_core::assert_match; use kaspa_hashes::Hash; + fn calc_hash_merkle_root<'a>(txs: impl ExactSizeIterator) -> Hash { + calc_hash_merkle_root_with_options(txs, false) + } + #[test] fn validate_body_in_isolation_test() { let consensus = TestConsensus::new(&Config::new(MAINNET_PARAMS)); diff --git a/consensus/src/pipeline/virtual_processor/processor.rs b/consensus/src/pipeline/virtual_processor/processor.rs index 0d6d9989cb..6596f624c7 100644 --- a/consensus/src/pipeline/virtual_processor/processor.rs +++ b/consensus/src/pipeline/virtual_processor/processor.rs @@ -54,7 +54,7 @@ use kaspa_consensus_core::{ coinbase::MinerData, config::genesis::GenesisBlock, header::Header, - merkle::calc_hash_merkle_root_with_options, + merkle::calc_hash_merkle_root, pruning::PruningPointsList, tx::{MutableTransaction, Transaction}, utxo::{ @@ -77,8 +77,10 @@ use kaspa_hashes::Hash; use kaspa_muhash::MuHash; use kaspa_notify::{events::EventType, notifier::Notify}; +use super::errors::{PruningImportError, PruningImportResult}; use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender}; use itertools::Itertools; +use kaspa_consensus_core::tx::ValidatedTransaction; use kaspa_utils::binary_heap::BinaryHeapExtensions; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use rand::{seq::SliceRandom, Rng}; @@ -94,8 +96,6 @@ use std::{ sync::{atomic::Ordering, Arc}, }; -use super::errors::{PruningImportError, PruningImportResult}; - pub struct VirtualStateProcessor { // Channels receiver: CrossbeamReceiver, @@ -833,12 +833,9 @@ impl VirtualStateProcessor { txs: &[Transaction], virtual_state: &VirtualState, utxo_view: &V, - ) -> Vec> { - self.thread_pool.install(|| { - txs.par_iter() - .map(|tx| self.validate_block_template_transaction(tx, virtual_state, &utxo_view)) - .collect::>>() - }) + ) -> Vec> { + self.thread_pool + .install(|| txs.par_iter().map(|tx| self.validate_block_template_transaction(tx, virtual_state, &utxo_view)).collect()) } fn validate_block_template_transaction( @@ -846,13 +843,14 @@ impl VirtualStateProcessor { tx: &Transaction, virtual_state: &VirtualState, utxo_view: &impl UtxoView, - ) -> TxResult<()> { + ) -> TxResult { // No need to validate the transaction in isolation since we rely on the mining manager to submit transactions // which were previously validated through `validate_mempool_transaction_and_populate`, hence we only perform // in-context validations self.transaction_validator.utxo_free_tx_validation(tx, virtual_state.daa_score, virtual_state.past_median_time)?; - self.validate_transaction_in_utxo_context(tx, utxo_view, virtual_state.daa_score, TxValidationFlags::Full)?; - Ok(()) + let ValidatedTransaction { calculated_fee, .. } = + self.validate_transaction_in_utxo_context(tx, utxo_view, virtual_state.daa_score, TxValidationFlags::Full)?; + Ok(calculated_fee) } pub fn build_block_template( @@ -869,7 +867,7 @@ impl VirtualStateProcessor { // optimizing for the common case where all txs are valid. Following selection calls // are called within the lock in order to preserve validness of already validated txs let mut txs = tx_selector.select_transactions(); - + let mut calculated_fees = Vec::with_capacity(txs.len()); let virtual_read = self.virtual_stores.read(); let virtual_state = virtual_read.state.get().unwrap(); let virtual_utxo_view = &virtual_read.utxo_set; @@ -877,9 +875,14 @@ impl VirtualStateProcessor { let mut invalid_transactions = HashMap::new(); let results = self.validate_block_template_transactions_in_parallel(&txs, &virtual_state, &virtual_utxo_view); for (tx, res) in txs.iter().zip(results) { - if let Err(e) = res { - invalid_transactions.insert(tx.id(), e); - tx_selector.reject_selection(tx.id()); + match res { + Err(e) => { + invalid_transactions.insert(tx.id(), e); + tx_selector.reject_selection(tx.id()); + } + Ok(fee) => { + calculated_fees.push(fee); + } } } @@ -894,12 +897,16 @@ impl VirtualStateProcessor { let next_batch_results = self.validate_block_template_transactions_in_parallel(&next_batch, &virtual_state, &virtual_utxo_view); for (tx, res) in next_batch.into_iter().zip(next_batch_results) { - if let Err(e) = res { - invalid_transactions.insert(tx.id(), e); - tx_selector.reject_selection(tx.id()); - has_rejections = true; - } else { - txs.push(tx); + match res { + Err(e) => { + invalid_transactions.insert(tx.id(), e); + tx_selector.reject_selection(tx.id()); + has_rejections = true; + } + Ok(fee) => { + txs.push(tx); + calculated_fees.push(fee); + } } } } @@ -916,7 +923,7 @@ impl VirtualStateProcessor { drop(virtual_read); // Build the template - self.build_block_template_from_virtual_state(virtual_state, miner_data, txs) + self.build_block_template_from_virtual_state(virtual_state, miner_data, txs, calculated_fees) } pub(crate) fn validate_block_template_transactions( @@ -944,6 +951,7 @@ impl VirtualStateProcessor { virtual_state: Arc, miner_data: MinerData, mut txs: Vec, + calculated_fees: Vec, ) -> Result { // [`calc_block_parents`] can use deep blocks below the pruning point for this calculation, so we // need to hold the pruning lock. @@ -967,7 +975,7 @@ impl VirtualStateProcessor { // Hash according to hardfork activation let storage_mass_activated = virtual_state.daa_score > self.storage_mass_activation_daa_score; - let hash_merkle_root = calc_hash_merkle_root_with_options(txs.iter(), storage_mass_activated); + let hash_merkle_root = calc_hash_merkle_root(txs.iter(), storage_mass_activated); let accepted_id_merkle_root = kaspa_merkle::calc_merkle_root(virtual_state.accepted_tx_ids.iter().copied()); let utxo_commitment = virtual_state.multiset.clone().finalize(); @@ -997,6 +1005,7 @@ impl VirtualStateProcessor { selected_parent_timestamp, selected_parent_daa_score, selected_parent_hash, + calculated_fees, )) } diff --git a/consensus/src/pipeline/virtual_processor/test_block_builder.rs b/consensus/src/pipeline/virtual_processor/test_block_builder.rs index 872bf15b40..2654a6a5fe 100644 --- a/consensus/src/pipeline/virtual_processor/test_block_builder.rs +++ b/consensus/src/pipeline/virtual_processor/test_block_builder.rs @@ -61,6 +61,6 @@ impl TestBlockBuilder { let pov_virtual_utxo_view = (&virtual_read.utxo_set).compose(accumulated_diff); self.validate_block_template_transactions(&txs, &pov_virtual_state, &pov_virtual_utxo_view)?; drop(virtual_read); - self.build_block_template_from_virtual_state(pov_virtual_state, miner_data, txs) + self.build_block_template_from_virtual_state(pov_virtual_state, miner_data, txs, vec![]) } } diff --git a/consensus/src/processes/transaction_validator/tx_validation_in_isolation.rs b/consensus/src/processes/transaction_validator/tx_validation_in_isolation.rs index 67901612da..914624f940 100644 --- a/consensus/src/processes/transaction_validator/tx_validation_in_isolation.rs +++ b/consensus/src/processes/transaction_validator/tx_validation_in_isolation.rs @@ -17,6 +17,7 @@ impl TransactionValidator { check_duplicate_transaction_inputs(tx)?; check_gas(tx)?; check_transaction_payload(tx)?; + check_transaction_subnetwork(tx)?; check_transaction_version(tx) } @@ -146,10 +147,18 @@ fn check_transaction_output_value_ranges(tx: &Transaction) -> TxResult<()> { Ok(()) } +fn check_transaction_subnetwork(tx: &Transaction) -> TxResult<()> { + if tx.is_coinbase() || tx.subnetwork_id.is_native() { + Ok(()) + } else { + Err(TxRuleError::SubnetworksDisabled(tx.subnetwork_id.clone())) + } +} + #[cfg(test)] mod tests { use kaspa_consensus_core::{ - subnets::{SUBNETWORK_ID_COINBASE, SUBNETWORK_ID_NATIVE}, + subnets::{SubnetworkId, SUBNETWORK_ID_COINBASE, SUBNETWORK_ID_NATIVE}, tx::{scriptvec, ScriptPublicKey, Transaction, TransactionId, TransactionInput, TransactionOutpoint, TransactionOutput}, }; use kaspa_core::assert_match; @@ -261,6 +270,10 @@ mod tests { tv.validate_tx_in_isolation(&valid_tx).unwrap(); + let mut tx: Transaction = valid_tx.clone(); + tx.subnetwork_id = SubnetworkId::from_byte(3); + assert_match!(tv.validate_tx_in_isolation(&tx), Err(TxRuleError::SubnetworksDisabled(_))); + let mut tx = valid_tx.clone(); tx.inputs = vec![]; assert_match!(tv.validate_tx_in_isolation(&tx), Err(TxRuleError::NoTxInputs)); diff --git a/kaspad/Cargo.toml b/kaspad/Cargo.toml index 0decbc9cc8..3507339f29 100644 --- a/kaspad/Cargo.toml +++ b/kaspad/Cargo.toml @@ -41,9 +41,9 @@ kaspa-utxoindex.workspace = true kaspa-wrpc-server.workspace = true async-channel.workspace = true +cfg-if.workspace = true clap.workspace = true dhat = { workspace = true, optional = true } -serde.workspace = true dirs.workspace = true futures-util.workspace = true itertools.workspace = true @@ -52,13 +52,16 @@ num_cpus.workspace = true rand.workspace = true rayon.workspace = true rocksdb.workspace = true +serde.workspace = true tempfile.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } workflow-log.workspace = true + toml = "0.8.10" serde_with = "3.7.0" [features] heap = ["dhat", "kaspa-alloc/heap"] devnet-prealloc = ["kaspa-consensus/devnet-prealloc"] +semaphore-trace = ["kaspa-utils/semaphore-trace"] diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs index 17f73fcfbc..3a636c1084 100644 --- a/kaspad/src/daemon.rs +++ b/kaspad/src/daemon.rs @@ -165,7 +165,13 @@ impl Runtime { let log_dir = get_log_dir(args); // Initialize the logger - kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level); + cfg_if::cfg_if! { + if #[cfg(feature = "semaphore-trace")] { + kaspa_core::log::init_logger(log_dir.as_deref(), &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path())); + } else { + kaspa_core::log::init_logger(log_dir.as_deref(), &args.log_level); + } + }; // Configure the panic behavior // As we log the panic, we want to set it up after the logger diff --git a/mining/src/block_template/builder.rs b/mining/src/block_template/builder.rs index e111b25627..6f0dbe6743 100644 --- a/mining/src/block_template/builder.rs +++ b/mining/src/block_template/builder.rs @@ -3,7 +3,6 @@ use kaspa_consensus_core::{ api::ConsensusApi, block::{BlockTemplate, TemplateBuildMode, TemplateTransactionSelector}, coinbase::MinerData, - merkle::calc_hash_merkle_root, tx::COINBASE_TRANSACTION_INDEX, }; use kaspa_core::time::{unix_now, Stopwatch}; @@ -106,7 +105,8 @@ impl BlockTemplateBuilder { coinbase_tx.outputs.last_mut().unwrap().script_public_key = new_miner_data.script_public_key.clone(); } // Update the hash merkle root according to the modified transactions - block_template.block.header.hash_merkle_root = calc_hash_merkle_root(block_template.block.transactions.iter()); + block_template.block.header.hash_merkle_root = + consensus.calc_transaction_hash_merkle_root(&block_template.block.transactions, block_template.block.header.daa_score); let new_timestamp = unix_now(); if new_timestamp > block_template.block.header.timestamp { // Only if new time stamp is later than current, update the header. Otherwise, diff --git a/mining/src/manager.rs b/mining/src/manager.rs index 8bd46c3d86..081854698e 100644 --- a/mining/src/manager.rs +++ b/mining/src/manager.rs @@ -210,7 +210,11 @@ impl MiningManager { } /// Returns realtime feerate estimations based on internal mempool state with additional verbose data - pub(crate) fn get_realtime_feerate_estimations_verbose(&self) -> FeeEstimateVerbose { + pub(crate) fn get_realtime_feerate_estimations_verbose( + &self, + consensus: &dyn ConsensusApi, + prefix: kaspa_addresses::Prefix, + ) -> MiningManagerResult { let args = FeerateEstimatorArgs::new(self.config.network_blocks_per_second, self.config.maximum_mass_per_block); let network_mass_per_second = args.network_mass_per_second(); let mempool_read = self.mempool.read(); @@ -218,16 +222,37 @@ impl MiningManager { let ready_transactions_count = mempool_read.ready_transaction_count(); let ready_transaction_total_mass = mempool_read.ready_transaction_total_mass(); drop(mempool_read); - FeeEstimateVerbose { + let mut resp = FeeEstimateVerbose { estimations: estimator.calc_estimations(self.config.minimum_feerate()), network_mass_per_second, mempool_ready_transactions_count: ready_transactions_count as u64, mempool_ready_transactions_total_mass: ready_transaction_total_mass, - // TODO: Next PR + next_block_template_feerate_min: -1.0, next_block_template_feerate_median: -1.0, next_block_template_feerate_max: -1.0, + }; + // calculate next_block_template_feerate_xxx + { + let script_public_key = kaspa_txscript::pay_to_address_script(&kaspa_addresses::Address::new( + prefix, + kaspa_addresses::Version::PubKey, + &[0u8; 32], + )); + let miner_data: MinerData = MinerData::new(script_public_key, vec![]); + + let BlockTemplate { block: kaspa_consensus_core::block::MutableBlock { transactions, .. }, calculated_fees, .. } = + self.get_block_template(consensus, &miner_data)?; + + let Some(Stats { max, median, min }) = feerate_stats(transactions, calculated_fees) else { + return Ok(resp); + }; + + resp.next_block_template_feerate_max = max; + resp.next_block_template_feerate_min = min; + resp.next_block_template_feerate_median = median; } + Ok(resp) } /// Clears the block template cache, forcing the next call to get_block_template to build a new block template. @@ -834,8 +859,12 @@ impl MiningManagerProxy { } /// Returns realtime feerate estimations based on internal mempool state with additional verbose data - pub async fn get_realtime_feerate_estimations_verbose(self) -> FeeEstimateVerbose { - spawn_blocking(move || self.inner.get_realtime_feerate_estimations_verbose()).await.unwrap() + pub async fn get_realtime_feerate_estimations_verbose( + self, + consensus: &ConsensusProxy, + prefix: kaspa_addresses::Prefix, + ) -> MiningManagerResult { + consensus.clone().spawn_blocking(move |c| self.inner.get_realtime_feerate_estimations_verbose(c, prefix)).await } /// Validates a transaction and adds it to the set of known transactions that have not yet been @@ -982,3 +1011,103 @@ impl MiningManagerProxy { count } } + +/// Represents statistical information about fee rates of transactions. +struct Stats { + /// The maximum fee rate observed. + max: f64, + /// The median fee rate observed. + median: f64, + /// The minimum fee rate observed. + min: f64, +} +/// Calculates the maximum, median, and minimum fee rates (fee per unit mass) +/// for a set of transactions, excluding the first transaction which is assumed +/// to be the coinbase transaction. +/// +/// # Arguments +/// +/// * `transactions` - A vector of `Transaction` objects. The first transaction +/// is assumed to be the coinbase transaction and is excluded from fee rate +/// calculations. +/// * `calculated_fees` - A vector of fees associated with the transactions. +/// This vector should have one less element than the `transactions` vector +/// since the first transaction (coinbase) does not have a fee. +/// +/// # Returns +/// +/// Returns an `Option` containing the maximum, median, and minimum fee +/// rates if the input vectors are valid. Returns `None` if the vectors are +/// empty or if the lengths are inconsistent. +fn feerate_stats(transactions: Vec, calculated_fees: Vec) -> Option { + if calculated_fees.is_empty() { + return None; + } + if transactions.len() != calculated_fees.len() + 1 { + error!( + "[feerate_stats] block template transactions length ({}) is expected to be one more than `calculated_fees` length ({})", + transactions.len(), + calculated_fees.len() + ); + return None; + } + debug_assert!(transactions[0].is_coinbase()); + let mut feerates = calculated_fees + .into_iter() + .zip(transactions + .iter() + // skip coinbase tx + .skip(1) + .map(Transaction::mass)) + .map(|(fee, mass)| fee as f64 / mass as f64) + .collect_vec(); + feerates.sort_unstable_by(f64::total_cmp); + + let max = feerates[feerates.len() - 1]; + let min = feerates[0]; + let median = feerates[feerates.len() / 2]; + + Some(Stats { max, median, min }) +} + +#[cfg(test)] +mod tests { + use super::*; + use kaspa_consensus_core::subnets; + use std::iter::repeat; + + fn transactions(length: usize) -> Vec { + let tx = || { + let tx = Transaction::new(0, vec![], vec![], 0, Default::default(), 0, vec![]); + tx.set_mass(2); + tx + }; + let mut txs = repeat(tx()).take(length).collect_vec(); + txs[0].subnetwork_id = subnets::SUBNETWORK_ID_COINBASE; + txs + } + + #[test] + fn feerate_stats_test() { + let calculated_fees = vec![100u64, 200, 300, 400]; + let txs = transactions(calculated_fees.len() + 1); + let Stats { max, median, min } = feerate_stats(txs, calculated_fees).unwrap(); + assert_eq!(max, 200.0); + assert_eq!(median, 150.0); + assert_eq!(min, 50.0); + } + + #[test] + fn feerate_stats_empty_test() { + let calculated_fees = vec![]; + let txs = transactions(calculated_fees.len() + 1); + assert!(feerate_stats(txs, calculated_fees).is_none()); + } + + #[test] + fn feerate_stats_inconsistent_test() { + let calculated_fees = vec![100u64, 200, 300, 400]; + let txs = transactions(calculated_fees.len()); + assert!(feerate_stats(txs, calculated_fees).is_none()); + } +} diff --git a/mining/src/testutils/consensus_mock.rs b/mining/src/testutils/consensus_mock.rs index 4ec37ec820..d68e062a0e 100644 --- a/mining/src/testutils/consensus_mock.rs +++ b/mining/src/testutils/consensus_mock.rs @@ -19,7 +19,7 @@ use kaspa_consensus_core::{ utxo::utxo_collection::UtxoCollection, }; use kaspa_core::time::unix_now; -use kaspa_hashes::ZERO_HASH; +use kaspa_hashes::{Hash, ZERO_HASH}; use parking_lot::RwLock; use std::{collections::HashMap, sync::Arc}; @@ -86,7 +86,7 @@ impl ConsensusApi for ConsensusMock { let coinbase = coinbase_manager.expected_coinbase_transaction(miner_data.clone()); txs.insert(0, coinbase.tx); let now = unix_now(); - let hash_merkle_root = calc_hash_merkle_root(txs.iter()); + let hash_merkle_root = self.calc_transaction_hash_merkle_root(&txs, 0); let header = Header::new_finalized( BLOCK_VERSION, vec![], @@ -103,7 +103,7 @@ impl ConsensusApi for ConsensusMock { ); let mutable_block = MutableBlock::new(header, txs); - Ok(BlockTemplate::new(mutable_block, miner_data, coinbase.has_red_reward, now, 0, ZERO_HASH)) + Ok(BlockTemplate::new(mutable_block, miner_data, coinbase.has_red_reward, now, 0, ZERO_HASH, vec![])) } fn validate_mempool_transaction(&self, mutable_tx: &mut MutableTransaction, _: &TransactionValidationArgs) -> TxResult<()> { @@ -177,4 +177,8 @@ impl ConsensusApi for ConsensusMock { let coinbase_manager = CoinbaseManagerMock::new(); Ok(coinbase_manager.modify_coinbase_payload(payload, miner_data)) } + + fn calc_transaction_hash_merkle_root(&self, txs: &[Transaction], _pov_daa_score: u64) -> Hash { + calc_hash_merkle_root(txs.iter(), false) + } } diff --git a/protocol/flows/src/flowcontext/orphans.rs b/protocol/flows/src/flowcontext/orphans.rs index 75c223caa6..d1122b0e07 100644 --- a/protocol/flows/src/flowcontext/orphans.rs +++ b/protocol/flows/src/flowcontext/orphans.rs @@ -78,7 +78,7 @@ impl OrphanBlocksPool { if self.orphans.contains_key(&orphan_hash) { return None; } - + orphan_block.asses_for_cache()?; let (roots, orphan_ancestors) = match self.get_orphan_roots(consensus, orphan_block.header.direct_parents().iter().copied().collect()).await { FindRootsOutput::Roots(roots, orphan_ancestors) => (roots, orphan_ancestors), diff --git a/rpc/service/src/service.rs b/rpc/service/src/service.rs index e15fa3afa8..fc6f266fb4 100644 --- a/rpc/service/src/service.rs +++ b/rpc/service/src/service.rs @@ -35,6 +35,7 @@ use kaspa_index_core::{ connection::IndexChannelConnection, indexed_utxos::UtxoSetByScriptPublicKey, notification::Notification as IndexNotification, notifier::IndexNotifier, }; +use kaspa_mining::feerate::FeeEstimateVerbose; use kaspa_mining::model::tx_query::TransactionQuery; use kaspa_mining::{manager::MiningManagerProxy, mempool::tx::Orphan}; use kaspa_notify::listener::ListenerLifespan; @@ -113,7 +114,7 @@ pub struct RpcCoreService { p2p_tower_counters: Arc, grpc_tower_counters: Arc, fee_estimate_cache: ExpiringCache, - fee_estimate_verbose_cache: ExpiringCache, + fee_estimate_verbose_cache: ExpiringCache>, } const RPC_CORE: &str = "rpc-core"; @@ -683,10 +684,16 @@ NOTE: This error usually indicates an RPC conversion error between the node and ) -> RpcResult { if request.verbose { let mining_manager = self.mining_manager.clone(); + let consensus_manager = self.consensus_manager.clone(); + let prefix = self.config.prefix(); + let response = self .fee_estimate_verbose_cache - .get(async move { mining_manager.get_realtime_feerate_estimations_verbose().await.into_rpc() }) - .await; + .get(async move { + let session = consensus_manager.consensus().unguarded_session(); + mining_manager.get_realtime_feerate_estimations_verbose(&session, prefix).await.map(FeeEstimateVerbose::into_rpc) + }) + .await?; Ok(response) } else { let estimate = self.get_fee_estimate_call(GetFeeEstimateRequest {}).await?.estimate; diff --git a/simpa/Cargo.toml b/simpa/Cargo.toml index b52aa6fd93..30162ba4f1 100644 --- a/simpa/Cargo.toml +++ b/simpa/Cargo.toml @@ -22,6 +22,7 @@ kaspa-perf-monitor.workspace = true kaspa-utils.workspace = true async-channel.workspace = true +cfg-if.workspace = true clap.workspace = true dhat = { workspace = true, optional = true } futures-util.workspace = true @@ -38,3 +39,4 @@ tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] } [features] heap = ["dhat", "kaspa-alloc/heap"] +semaphore-trace = ["kaspa-utils/semaphore-trace"] diff --git a/simpa/src/main.rs b/simpa/src/main.rs index 368b523447..ac8bf8b852 100644 --- a/simpa/src/main.rs +++ b/simpa/src/main.rs @@ -20,7 +20,12 @@ use kaspa_consensus_core::{ BlockHashSet, BlockLevel, HashMapCustomHasher, }; use kaspa_consensus_notify::root::ConsensusNotificationRoot; -use kaspa_core::{info, task::service::AsyncService, task::tick::TickService, time::unix_now, trace, warn}; +use kaspa_core::{ + info, + task::{service::AsyncService, tick::TickService}, + time::unix_now, + trace, warn, +}; use kaspa_database::prelude::ConnBuilder; use kaspa_database::{create_temp_db, load_existing_db}; use kaspa_hashes::Hash; @@ -133,7 +138,13 @@ fn main() { let args = Args::parse(); // Initialize the logger - kaspa_core::log::init_logger(None, &args.log_level); + cfg_if::cfg_if! { + if #[cfg(feature = "semaphore-trace")] { + kaspa_core::log::init_logger(None, &format!("{},{}=debug", args.log_level, kaspa_utils::sync::semaphore_module_path())); + } else { + kaspa_core::log::init_logger(None, &args.log_level); + } + }; // Configure the panic behavior // As we log the panic, we want to set it up after the logger diff --git a/utils/Cargo.toml b/utils/Cargo.toml index dda05cb0ea..641ecb61ae 100644 --- a/utils/Cargo.toml +++ b/utils/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true [dependencies] arc-swap.workspace = true -parking_lot.workspace = true async-channel.workspace = true borsh.workspace = true cfg-if.workspace = true @@ -19,12 +18,14 @@ event-listener.workspace = true faster-hex.workspace = true ipnet.workspace = true itertools.workspace = true +log.workspace = true +once_cell.workspace = true +parking_lot.workspace = true serde.workspace = true smallvec.workspace = true thiserror.workspace = true triggered.workspace = true uuid.workspace = true -log.workspace = true wasm-bindgen.workspace = true [target.'cfg(not(target_arch = "wasm32"))'.dependencies] @@ -42,3 +43,6 @@ rand.workspace = true [[bench]] name = "bench" harness = false + +[features] +semaphore-trace = [] diff --git a/utils/src/sync/mod.rs b/utils/src/sync/mod.rs index 40fb147cb3..14afe79772 100644 --- a/utils/src/sync/mod.rs +++ b/utils/src/sync/mod.rs @@ -1,2 +1,7 @@ pub mod rwlock; pub(crate) mod semaphore; + +#[cfg(feature = "semaphore-trace")] +pub fn semaphore_module_path() -> &'static str { + semaphore::get_module_path() +} diff --git a/utils/src/sync/semaphore.rs b/utils/src/sync/semaphore.rs index 4b94e8f2f5..2ea6dcc031 100644 --- a/utils/src/sync/semaphore.rs +++ b/utils/src/sync/semaphore.rs @@ -4,6 +4,64 @@ use std::{ time::Duration, }; +#[cfg(feature = "semaphore-trace")] +mod trace { + use super::*; + use log::debug; + use once_cell::sync::Lazy; + use std::sync::atomic::AtomicU64; + use std::time::SystemTime; + + static SYS_START: Lazy = Lazy::new(SystemTime::now); + + #[inline] + pub(super) fn sys_now() -> u64 { + SystemTime::now().duration_since(*SYS_START).unwrap_or_default().as_micros() as u64 + } + + #[derive(Debug, Default)] + pub struct TraceInner { + readers_start: AtomicU64, + readers_time: AtomicU64, + log_time: AtomicU64, + log_value: AtomicU64, + } + + impl TraceInner { + pub(super) fn mark_readers_start(&self) { + self.readers_start.store(sys_now(), Ordering::Relaxed); + } + + pub(super) fn mark_readers_end(&self) { + let start = self.readers_start.load(Ordering::Relaxed); + let now = sys_now(); + if start < now { + let readers_time = self.readers_time.fetch_add(now - start, Ordering::Relaxed) + now - start; + let log_time = self.log_time.load(Ordering::Relaxed); + if log_time + (Duration::from_secs(10).as_micros() as u64) < now { + let log_value = self.log_value.load(Ordering::Relaxed); + debug!( + "Semaphore: log interval: {:?}, readers time: {:?}, fraction: {:.2}", + Duration::from_micros(now - log_time), + Duration::from_micros(readers_time - log_value), + (readers_time - log_value) as f64 / (now - log_time) as f64 + ); + self.log_value.store(readers_time, Ordering::Relaxed); + self.log_time.store(now, Ordering::Relaxed); + } + } + } + } +} + +#[cfg(feature = "semaphore-trace")] +use trace::*; + +#[cfg(feature = "semaphore-trace")] +pub(crate) fn get_module_path() -> &'static str { + module_path!() +} + /// A low-level non-fair semaphore. The semaphore is non-fair in the sense that clients acquiring /// a lower number of permits might get their allocation before earlier clients which requested more /// permits -- if the semaphore can provide the lower allocation but not the larger. This non-fairness @@ -15,13 +73,28 @@ use std::{ pub(crate) struct Semaphore { counter: AtomicUsize, signal: Event, + #[cfg(feature = "semaphore-trace")] + trace_inner: TraceInner, } impl Semaphore { pub const MAX_PERMITS: usize = usize::MAX; - pub const fn new(available_permits: usize) -> Semaphore { - Semaphore { counter: AtomicUsize::new(available_permits), signal: Event::new() } + pub fn new(available_permits: usize) -> Semaphore { + cfg_if::cfg_if! { + if #[cfg(feature = "semaphore-trace")] { + Semaphore { + counter: AtomicUsize::new(available_permits), + signal: Event::new(), + trace_inner: Default::default(), + } + } else { + Semaphore { + counter: AtomicUsize::new(available_permits), + signal: Event::new(), + } + } + } } /// Tries to acquire `permits` slots from the semaphore. Upon success, returns the acquired slot @@ -33,7 +106,14 @@ impl Semaphore { } match self.counter.compare_exchange_weak(count, count - permits, Ordering::AcqRel, Ordering::Acquire) { - Ok(_) => return Some(count), + Ok(_) => { + #[cfg(feature = "semaphore-trace")] + if permits == 1 && count == Self::MAX_PERMITS { + // permits == 1 indicates a reader, count == Self::MAX_PERMITS indicates it is the first reader + self.trace_inner.mark_readers_start(); + } + return Some(count); + } Err(c) => count = c, } } @@ -75,6 +155,12 @@ impl Semaphore { /// Returns the released slot pub fn release(&self, permits: usize) -> usize { let slot = self.counter.fetch_add(permits, Ordering::AcqRel) + permits; + + #[cfg(feature = "semaphore-trace")] + if permits == 1 && slot == Self::MAX_PERMITS { + // permits == 1 indicates a reader, slot == Self::MAX_PERMITS indicates it is the last reader + self.trace_inner.mark_readers_end(); + } self.signal.notify(permits); slot }