diff --git a/components/addressmanager/src/stores/address_store.rs b/components/addressmanager/src/stores/address_store.rs
index b4ea43bd2..41b12f6cc 100644
--- a/components/addressmanager/src/stores/address_store.rs
+++ b/components/addressmanager/src/stores/address_store.rs
@@ -18,11 +18,7 @@ pub struct Entry {
     pub address: NetAddress,
 }
 
-impl MemSizeEstimator for Entry {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for Entry {}
 
 pub trait AddressesStoreReader {
     fn get(&self, key: AddressKey) -> Result<Entry, StoreError>;
diff --git a/components/addressmanager/src/stores/banned_address_store.rs b/components/addressmanager/src/stores/banned_address_store.rs
index 1cdeedf24..b530af0ce 100644
--- a/components/addressmanager/src/stores/banned_address_store.rs
+++ b/components/addressmanager/src/stores/banned_address_store.rs
@@ -11,11 +11,7 @@ use std::{error::Error, fmt::Display, sync::Arc};
 #[derive(Clone, Copy, Serialize, Deserialize)]
 pub struct ConnectionBanTimestamp(pub u64);
 
-impl MemSizeEstimator for ConnectionBanTimestamp {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for ConnectionBanTimestamp {}
 
 pub trait BannedAddressesStoreReader {
     fn get(&self, address: IpAddr) -> Result<ConnectionBanTimestamp, StoreError>;
diff --git a/consensus/core/src/blockstatus.rs b/consensus/core/src/blockstatus.rs
index a77ae5cc9..be43143de 100644
--- a/consensus/core/src/blockstatus.rs
+++ b/consensus/core/src/blockstatus.rs
@@ -21,11 +21,7 @@ pub enum BlockStatus {
     StatusHeaderOnly,
 }
 
-impl MemSizeEstimator for BlockStatus {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for BlockStatus {}
 
 impl BlockStatus {
     pub fn has_block_header(self) -> bool {
diff --git a/consensus/core/src/config/mod.rs b/consensus/core/src/config/mod.rs
index 8904b447c..d62bf15a9 100644
--- a/consensus/core/src/config/mod.rs
+++ b/consensus/core/src/config/mod.rs
@@ -54,7 +54,7 @@ pub struct Config {
     pub user_agent_comments: Vec<String>,
 
-    // If undefined, sets it to 0.0.0.0
+    /// If undefined, sets it to 0.0.0.0
     pub p2p_listen_address: ContextualNetAddress,
 
     pub externalip: Option<NetAddress>,
@@ -63,7 +63,11 @@ pub struct Config {
     #[cfg(feature = "devnet-prealloc")]
     pub initial_utxo_set: Arc<UtxoCollection>,
+
     pub disable_upnp: bool,
+
+    /// A scale factor to apply to memory allocation bounds
+    pub ram_scale: f64,
 }
 
 impl Config {
@@ -90,6 +94,7 @@ impl Config {
             #[cfg(feature = "devnet-prealloc")]
             initial_utxo_set: Default::default(),
             disable_upnp: false,
+            ram_scale: 1.0,
         }
     }
diff --git a/consensus/core/src/errors/config.rs b/consensus/core/src/errors/config.rs
index 0df911131..2b73e07e9 100644
--- a/consensus/core/src/errors/config.rs
+++ b/consensus/core/src/errors/config.rs
@@ -8,6 +8,12 @@ pub enum ConfigError {
     #[error("Configuration: --logdir and --nologfiles cannot be used together")]
     MixedLogDirAndNoLogFiles,
 
+    #[error("Configuration: --ram-scale cannot be set below 0.1")]
+    RamScaleTooLow,
+
+    #[error("Configuration: --ram-scale cannot be set above 10.0")]
+    RamScaleTooHigh,
+
     #[cfg(feature = "devnet-prealloc")]
     #[error("Cannot preallocate UTXOs on any network except devnet")]
     PreallocUtxosOnNonDevnet,
diff --git a/consensus/core/src/tx.rs b/consensus/core/src/tx.rs
index 22efe28af..35ba7bae3 100644
--- a/consensus/core/src/tx.rs
+++ b/consensus/core/src/tx.rs
@@ -45,11 +45,7 @@ impl UtxoEntry {
     }
 }
 
-impl MemSizeEstimator for UtxoEntry {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for UtxoEntry {}
 
 pub type TransactionIndexType = u32;
diff --git a/consensus/src/consensus/factory.rs b/consensus/src/consensus/factory.rs
index 7acae97ef..ad398c75b 100644
--- a/consensus/src/consensus/factory.rs
+++ b/consensus/src/consensus/factory.rs
@@ -28,11 +28,7 @@ pub struct ConsensusEntry {
     creation_timestamp: u64,
 }
 
-impl MemSizeEstimator for ConsensusEntry {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for ConsensusEntry {}
 
 impl ConsensusEntry {
     pub fn new(key: u64, directory_name: String, creation_timestamp: u64) -> Self {
diff --git a/consensus/src/consensus/storage.rs b/consensus/src/consensus/storage.rs
index 4e5fe71f2..ea152dada 100644
--- a/consensus/src/consensus/storage.rs
+++ b/consensus/src/consensus/storage.rs
@@ -70,6 +70,9 @@ pub struct ConsensusStorage {
 
 impl ConsensusStorage {
     pub fn new(db: Arc<DB>, config: Arc<Config>) -> Arc<Self> {
+        let scale_factor = config.ram_scale;
+        let scaled = |s| (s as f64 * scale_factor) as usize;
+
         let params = &config.params;
         let perf_params = &config.perf;
@@ -79,18 +82,20 @@ impl ConsensusStorage {
         let level_lower_bound = 2 * params.pruning_proof_m as usize; // Number of items lower bound for level-related caches
 
         // Budgets in bytes. All byte budgets overall sum up to ~1GB of memory (which obviously takes more low level alloc space)
-        let daa_excluded_budget = 30_000_000;
-        let statuses_budget = 30_000_000;
-        let reachability_data_budget = 20_000_000;
-        let reachability_sets_budget = 20_000_000; // x 2 for tree children and future covering set
-        let ghostdag_compact_budget = 15_000_000;
-        let headers_compact_budget = 5_000_000;
-        let parents_budget = 40_000_000; // x 3 for reachability and levels
-        let children_budget = 5_000_000; // x 3 for reachability and levels
-        let ghostdag_budget = 80_000_000; // x 2 for levels
-        let headers_budget = 80_000_000;
-        let utxo_diffs_budget = 40_000_000;
-        let block_window_budget = 200_000_000; // x 2 for difficulty and median time
+        let daa_excluded_budget = scaled(30_000_000);
+        let statuses_budget = scaled(30_000_000);
+        let reachability_data_budget = scaled(20_000_000);
+        let reachability_sets_budget = scaled(20_000_000); // x 2 for tree children and future covering set
+        let ghostdag_compact_budget = scaled(15_000_000);
+        let headers_compact_budget = scaled(5_000_000);
+        let parents_budget = scaled(40_000_000); // x 3 for reachability and levels
+        let children_budget = scaled(5_000_000); // x 3 for reachability and levels
+        let ghostdag_budget = scaled(80_000_000); // x 2 for levels
+        let headers_budget = scaled(80_000_000);
+        let transactions_budget = scaled(40_000_000);
+        let utxo_diffs_budget = scaled(40_000_000);
+        let block_window_budget = scaled(200_000_000); // x 2 for difficulty and median time
+        let acceptance_data_budget = scaled(40_000_000);
 
         // Unit sizes in bytes
         let daa_excluded_bytes = size_of::<Hash>() + size_of::<BlockHashSet>(); // Expected empty sets
@@ -150,10 +155,10 @@ impl ConsensusStorage {
         let block_data_builder = PolicyBuilder::new().max_items(perf_params.block_data_cache_size).untracked();
         let header_data_builder = PolicyBuilder::new().max_items(perf_params.header_data_cache_size).untracked();
         let utxo_set_builder = PolicyBuilder::new().max_items(perf_params.utxo_set_cache_size).untracked();
-        let transactions_builder = PolicyBuilder::new().max_items(40_000).tracked_units(); // Tracked units are txs.
+        let transactions_builder = PolicyBuilder::new().bytes_budget(transactions_budget).tracked_bytes();
+        let acceptance_data_builder = PolicyBuilder::new().bytes_budget(acceptance_data_budget).tracked_bytes();
         let past_pruning_points_builder = PolicyBuilder::new().max_items(1024).untracked();
 
-        // TODO: consider tracking transactions by bytes (preferably by saving the size in a field on the block level)
         // TODO: consider tracking UtxoDiff byte sizes more accurately including the exact size of ScriptPublicKey
 
         // Headers
@@ -210,7 +215,7 @@ impl ConsensusStorage {
         let block_transactions_store = Arc::new(DbBlockTransactionsStore::new(db.clone(), transactions_builder.build()));
         let utxo_diffs_store = Arc::new(DbUtxoDiffsStore::new(db.clone(), utxo_diffs_builder.build()));
         let utxo_multisets_store = Arc::new(DbUtxoMultisetsStore::new(db.clone(), block_data_builder.build()));
-        let acceptance_data_store = Arc::new(DbAcceptanceDataStore::new(db.clone(), block_data_builder.build()));
+        let acceptance_data_store = Arc::new(DbAcceptanceDataStore::new(db.clone(), acceptance_data_builder.build()));
 
         // Tips
         let headers_selected_tip_store = Arc::new(RwLock::new(DbHeadersSelectedTipStore::new(db.clone())));
diff --git a/consensus/src/model/stores/acceptance_data.rs b/consensus/src/model/stores/acceptance_data.rs
index 0bd278161..a66fcdcfe 100644
--- a/consensus/src/model/stores/acceptance_data.rs
+++ b/consensus/src/model/stores/acceptance_data.rs
@@ -1,4 +1,6 @@
 use kaspa_consensus_core::acceptance_data::AcceptanceData;
+use kaspa_consensus_core::acceptance_data::AcceptedTxEntry;
+use kaspa_consensus_core::acceptance_data::MergesetBlockAcceptanceData;
 use kaspa_consensus_core::BlockHasher;
 use kaspa_database::prelude::CachePolicy;
 use kaspa_database::prelude::StoreError;
@@ -6,7 +8,11 @@ use kaspa_database::prelude::DB;
 use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter};
 use kaspa_database::registry::DatabaseStorePrefixes;
 use kaspa_hashes::Hash;
+use kaspa_utils::mem_size::MemSizeEstimator;
 use rocksdb::WriteBatch;
+use serde::Deserialize;
+use serde::Serialize;
+use std::mem::size_of;
 use std::sync::Arc;
 
 pub trait AcceptanceDataStoreReader {
     fn get(&self, hash: Hash) -> Result<Arc<AcceptanceData>, StoreError>;
 }
 
@@ -18,11 +24,24 @@ pub trait AcceptanceDataStore: AcceptanceDataStoreReader {
     fn delete(&self, hash: Hash) -> Result<(), StoreError>;
 }
 
+/// Simple wrapper for implementing `MemSizeEstimator`
+#[derive(Clone, Serialize, Deserialize)]
+struct AcceptanceDataEntry(Arc<AcceptanceData>);
+
+impl MemSizeEstimator for AcceptanceDataEntry {
+    fn estimate_mem_bytes(&self) -> usize {
+        self.0.iter().map(|l| l.accepted_transactions.len()).sum::<usize>() * size_of::<AcceptedTxEntry>()
+            + self.0.len() * size_of::<MergesetBlockAcceptanceData>()
+            + size_of::<AcceptanceData>()
+            + size_of::<Self>()
+    }
+}
+
 /// A DB + cache implementation of `DbAcceptanceDataStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbAcceptanceDataStore {
     db: Arc<DB>,
-    access: CachedDbAccess<Hash, Arc<AcceptanceData>, BlockHasher>,
+    access: CachedDbAccess<Hash, AcceptanceDataEntry, BlockHasher>,
 }
 
 impl DbAcceptanceDataStore {
@@ -38,7 +57,7 @@ impl DbAcceptanceDataStore {
         if self.access.has(hash)? {
             return Err(StoreError::HashAlreadyExists(hash));
         }
-        self.access.write(BatchDbWriter::new(batch), hash, acceptance_data)?;
+        self.access.write(BatchDbWriter::new(batch), hash, AcceptanceDataEntry(acceptance_data))?;
         Ok(())
     }
 
@@ -49,7 +68,7 @@ impl DbAcceptanceDataStore {
 
 impl AcceptanceDataStoreReader for DbAcceptanceDataStore {
     fn get(&self, hash: Hash) -> Result<Arc<AcceptanceData>, StoreError> {
-        self.access.read(hash)
+        Ok(self.access.read(hash)?.0)
     }
 }
 
@@ -58,7 +77,7 @@ impl AcceptanceDataStore for DbAcceptanceDataStore {
         if self.access.has(hash)? {
             return Err(StoreError::HashAlreadyExists(hash));
         }
-        self.access.write(DirectDbWriter::new(&self.db), hash, acceptance_data)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, AcceptanceDataEntry(acceptance_data))?;
         Ok(())
     }
diff --git a/consensus/src/model/stores/block_transactions.rs b/consensus/src/model/stores/block_transactions.rs
index 89b2241db..050606d3c 100644
--- a/consensus/src/model/stores/block_transactions.rs
+++ b/consensus/src/model/stores/block_transactions.rs
@@ -1,5 +1,4 @@
-use std::sync::Arc;
-
+use kaspa_consensus_core::tx::{TransactionInput, TransactionOutput};
 use kaspa_consensus_core::{tx::Transaction, BlockHasher};
 use kaspa_database::prelude::CachePolicy;
 use kaspa_database::prelude::StoreError;
@@ -7,7 +6,11 @@ use kaspa_database::prelude::DB;
 use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter};
 use kaspa_database::registry::DatabaseStorePrefixes;
 use kaspa_hashes::Hash;
+use kaspa_utils::mem_size::MemSizeEstimator;
 use rocksdb::WriteBatch;
+use serde::{Deserialize, Serialize};
+use std::mem::size_of;
+use std::sync::Arc;
 
 pub trait BlockTransactionsStoreReader {
     fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError>;
@@ -19,11 +22,31 @@ pub trait BlockTransactionsStore: BlockTransactionsStoreReader {
     fn delete(&self, hash: Hash) -> Result<(), StoreError>;
 }
 
+#[derive(Clone, Serialize, Deserialize)]
+struct BlockBody(Arc<Vec<Transaction>>);
+
+impl MemSizeEstimator for BlockBody {
+    fn estimate_mem_bytes(&self) -> usize {
+        const NORMAL_SIG_SIZE: usize = 66;
+        let (inputs, outputs) = self.0.iter().fold((0, 0), |(ins, outs), tx| (ins + tx.inputs.len(), outs + tx.outputs.len()));
+        // TODO: consider tracking transactions by bytes accurately (preferably by saving the size in a field)
+        // We avoid zooming in another level and counting exact bytes for sigs and scripts for performance reasons.
+        // Outliers with longer signatures are rare enough and their size is eventually bounded by mempool standards
+        // or in the worst case by max block mass.
+        // A similar argument holds for spk within outputs, but in this case the constant is already counted through the SmallVec used within.
+        inputs * (size_of::<TransactionInput>() + NORMAL_SIG_SIZE)
+            + outputs * size_of::<TransactionOutput>()
+            + self.0.len() * size_of::<Transaction>()
+            + size_of::<Vec<Transaction>>()
+            + size_of::<Self>()
+    }
+}
+
 /// A DB + cache implementation of `BlockTransactionsStore` trait, with concurrency support.
 #[derive(Clone)]
 pub struct DbBlockTransactionsStore {
     db: Arc<DB>,
-    access: CachedDbAccess<Hash, Arc<Vec<Transaction>>, BlockHasher>,
+    access: CachedDbAccess<Hash, BlockBody, BlockHasher>,
 }
 
 impl DbBlockTransactionsStore {
@@ -43,7 +66,7 @@ impl DbBlockTransactionsStore {
         if self.access.has(hash)? {
             return Err(StoreError::HashAlreadyExists(hash));
         }
-        self.access.write(BatchDbWriter::new(batch), hash, transactions)?;
+        self.access.write(BatchDbWriter::new(batch), hash, BlockBody(transactions))?;
         Ok(())
     }
 
@@ -54,7 +77,7 @@ impl DbBlockTransactionsStore {
 
 impl BlockTransactionsStoreReader for DbBlockTransactionsStore {
     fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError> {
-        self.access.read(hash)
+        Ok(self.access.read(hash)?.0)
     }
 }
 
@@ -63,7 +86,7 @@ impl BlockTransactionsStore for DbBlockTransactionsStore {
         if self.access.has(hash)? {
             return Err(StoreError::HashAlreadyExists(hash));
        }
-        self.access.write(DirectDbWriter::new(&self.db), hash, transactions)?;
+        self.access.write(DirectDbWriter::new(&self.db), hash, BlockBody(transactions))?;
         Ok(())
     }
diff --git a/consensus/src/model/stores/block_window_cache.rs b/consensus/src/model/stores/block_window_cache.rs
index 20eb43bbe..5fee0e1f8 100644
--- a/consensus/src/model/stores/block_window_cache.rs
+++ b/consensus/src/model/stores/block_window_cache.rs
@@ -22,11 +22,7 @@ pub struct BlockWindowHeap {
     origin: WindowOrigin,
 }
 
-impl MemSizeEstimator for BlockWindowHeap {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for BlockWindowHeap {}
 
 impl BlockWindowHeap {
     pub fn new(origin: WindowOrigin) -> Self {
diff --git a/consensus/src/model/stores/depth.rs b/consensus/src/model/stores/depth.rs
index 1deeeddb4..730b442d4 100644
--- a/consensus/src/model/stores/depth.rs
+++ b/consensus/src/model/stores/depth.rs
@@ -28,11 +28,7 @@ struct BlockDepthInfo {
     finality_point: Hash,
 }
 
-impl MemSizeEstimator for BlockDepthInfo {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for BlockDepthInfo {}
 
 /// A DB + cache implementation of `DepthStore` trait, with concurrency support.
 #[derive(Clone)]
diff --git a/consensus/src/model/stores/ghostdag.rs b/consensus/src/model/stores/ghostdag.rs
index d144dd22b..89c4686c5 100644
--- a/consensus/src/model/stores/ghostdag.rs
+++ b/consensus/src/model/stores/ghostdag.rs
@@ -46,11 +46,7 @@ impl MemSizeEstimator for GhostdagData {
     }
 }
 
-impl MemSizeEstimator for CompactGhostdagData {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for CompactGhostdagData {}
 
 impl From<&GhostdagData> for CompactGhostdagData {
     fn from(value: &GhostdagData) -> Self {
diff --git a/consensus/src/model/stores/headers.rs b/consensus/src/model/stores/headers.rs
index 06a74e25c..b0c25b596 100644
--- a/consensus/src/model/stores/headers.rs
+++ b/consensus/src/model/stores/headers.rs
@@ -49,11 +49,7 @@ pub struct CompactHeaderData {
     pub blue_score: u64,
 }
 
-impl MemSizeEstimator for CompactHeaderData {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for CompactHeaderData {}
 
 impl From<&Header> for CompactHeaderData {
     fn from(header: &Header) -> Self {
diff --git a/consensus/src/model/stores/reachability.rs b/consensus/src/model/stores/reachability.rs
index 67ecb891a..71b3d50d2 100644
--- a/consensus/src/model/stores/reachability.rs
+++ b/consensus/src/model/stores/reachability.rs
@@ -29,11 +29,7 @@ pub(crate) struct ReachabilityData {
     pub height: u64,
 }
 
-impl MemSizeEstimator for ReachabilityData {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for ReachabilityData {}
 
 impl ReachabilityData {
     pub fn new(parent: Hash, interval: Interval, height: u64) -> Self {
diff --git a/consensus/src/pipeline/monitor.rs b/consensus/src/pipeline/monitor.rs
index 8c73b34a7..190fbe42b 100644
--- a/consensus/src/pipeline/monitor.rs
+++ b/consensus/src/pipeline/monitor.rs
@@ -59,7 +59,7 @@ impl ConsensusMonitor {
             if delta.header_counts != 0 { delta.dep_counts as f64 / delta.header_counts as f64 } else { 0f64 },
             if delta.header_counts != 0 { delta.mergeset_counts as f64 / delta.header_counts as f64 } else { 0f64 },
             if delta.body_counts != 0 { delta.txs_counts as f64 / delta.body_counts as f64 } else { 0f64 },
-            if delta.body_counts != 0 { delta.mass_counts / delta.body_counts } else { 0 },
+            if delta.body_counts != 0 { delta.mass_counts as f64 / delta.body_counts as f64 } else { 0f64 },
         );
 
         last_snapshot = snapshot;
diff --git a/crypto/hashes/src/lib.rs b/crypto/hashes/src/lib.rs
index dbe639737..7d690e80b 100644
--- a/crypto/hashes/src/lib.rs
+++ b/crypto/hashes/src/lib.rs
@@ -168,11 +168,7 @@ impl FromHex for Hash {
     }
 }
 
-impl MemSizeEstimator for Hash {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for Hash {}
 
 #[wasm_bindgen]
 impl Hash {
diff --git a/indexes/core/src/indexed_utxos.rs b/indexes/core/src/indexed_utxos.rs
index 663f53191..69a20d37f 100644
--- a/indexes/core/src/indexed_utxos.rs
+++ b/indexes/core/src/indexed_utxos.rs
@@ -31,11 +31,7 @@ impl CompactUtxoEntry {
     }
 }
 
-impl MemSizeEstimator for CompactUtxoEntry {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for CompactUtxoEntry {}
 
 impl From<UtxoEntry> for CompactUtxoEntry {
     fn from(utxo_entry: UtxoEntry) -> Self {
diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs
index a38a9bb08..556810287 100644
--- a/kaspad/src/args.rs
+++ b/kaspad/src/args.rs
@@ -67,6 +67,7 @@ pub struct Args {
     pub disable_upnp: bool,
     pub disable_dns_seeding: bool,
+    pub ram_scale: f64,
 }
 
 impl Default for Args {
@@ -114,6 +115,7 @@ impl Default for Args {
             disable_upnp: false,
             disable_dns_seeding: false,
+            ram_scale: 1.0,
         }
     }
 }
@@ -124,6 +126,7 @@ impl Args {
         config.disable_upnp = self.disable_upnp;
         config.unsafe_rpc = self.unsafe_rpc;
         config.enable_unsynced_mining = self.enable_unsynced_mining;
+        config.enable_mainnet_mining = self.enable_mainnet_mining;
         config.is_archival = self.archival;
         // TODO: change to `config.enable_sanity_checks = self.sanity` when we reach stable versions
         config.enable_sanity_checks = true;
@@ -131,6 +134,7 @@ impl Args {
         config.block_template_cache_lifetime = self.block_template_cache_lifetime;
         config.p2p_listen_address = self.listen.unwrap_or(ContextualNetAddress::unspecified());
         config.externalip = self.externalip.map(|v| v.normalize(config.default_p2p_port()));
+        config.ram_scale = self.ram_scale;
 
         #[cfg(feature = "devnet-prealloc")]
         if let Some(num_prealloc_utxos) = self.num_prealloc_utxos {
@@ -322,6 +326,14 @@ pub fn cli() -> Command {
         )
         .arg(arg!(--"disable-upnp" "Disable upnp"))
         .arg(arg!(--"nodnsseed" "Disable DNS seeding for peers"))
+        .arg(
+            Arg::new("ram-scale")
+                .long("ram-scale")
+                .require_equals(true)
+                .value_parser(clap::value_parser!(f64))
+                .help("Apply a scale factor to memory allocation bounds. Nodes with limited RAM (~4-8GB) should set this to ~0.3-0.5 respectively. Nodes with
+a large RAM (~64GB) can set this value to ~3.0-4.0 and gain superior performance especially for syncing peers faster"),
+        )
         ;
 
     #[cfg(feature = "devnet-prealloc")]
@@ -391,6 +403,7 @@ impl Args {
             block_template_cache_lifetime: defaults.block_template_cache_lifetime,
             disable_upnp: m.get_one::<bool>("disable-upnp").cloned().unwrap_or(defaults.disable_upnp),
             disable_dns_seeding: m.get_one::<bool>("nodnsseed").cloned().unwrap_or(defaults.disable_dns_seeding),
+            ram_scale: m.get_one::<f64>("ram-scale").cloned().unwrap_or(defaults.ram_scale),
 
             #[cfg(feature = "devnet-prealloc")]
             num_prealloc_utxos: m.get_one::<u64>("num-prealloc-utxos").cloned(),
diff --git a/kaspad/src/daemon.rs b/kaspad/src/daemon.rs
index ff0c38b8f..d5de531f0 100644
--- a/kaspad/src/daemon.rs
+++ b/kaspad/src/daemon.rs
@@ -85,6 +85,12 @@ pub fn validate_args(args: &Args) -> ConfigResult<()> {
     if args.logdir.is_some() && args.no_log_files {
         return Err(ConfigError::MixedLogDirAndNoLogFiles);
     }
+    if args.ram_scale < 0.1 {
+        return Err(ConfigError::RamScaleTooLow);
+    }
+    if args.ram_scale > 10.0 {
+        return Err(ConfigError::RamScaleTooHigh);
+    }
     Ok(())
 }
@@ -407,11 +413,12 @@ do you confirm? (answer y/n or pass --yes to the Kaspad command line to confirm
     let (address_manager, port_mapping_extender_svc) = AddressManager::new(config.clone(), meta_db, tick_service.clone());
 
     let mining_monitor = Arc::new(MiningMonitor::new(mining_counters.clone(), tx_script_cache_counters.clone(), tick_service.clone()));
-    let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new_with_spam_blocking_option(
+    let mining_manager = MiningManagerProxy::new(Arc::new(MiningManager::new_with_extended_config(
         network.is_mainnet(),
         config.target_time_per_block,
         false,
         config.max_block_mass,
+        config.ram_scale,
         config.block_template_cache_lifetime,
         mining_counters,
     )));
diff --git a/metrics/perf_monitor/src/lib.rs b/metrics/perf_monitor/src/lib.rs
index e7215d71d..39bfd1b57 100644
--- a/metrics/perf_monitor/src/lib.rs
+++ b/metrics/perf_monitor/src/lib.rs
@@ -40,10 +40,11 @@ impl<TS: AsRef<TickService>> Monitor<TS> {
         let mut last_log_time = Instant::now();
         let mut last_read = 0;
         let mut last_written = 0;
+        let mut process_stat = ProcessStat::cur()?;
         while let TickReason::Wakeup = self.tick_service.as_ref().tick(self.fetch_interval).await {
             let ProcessMemoryInfo { resident_set_size, virtual_memory_size, .. } = get_process_memory_info()?;
             let core_num = processor_numbers()?;
-            let cpu_usage = ProcessStat::cur()?.cpu()?;
+            let cpu_usage = process_stat.cpu()?;
             let fd_num = fd_count_cur()?;
             let IOStats { read_bytes: disk_io_read_bytes, write_bytes: disk_io_write_bytes, .. } = get_process_io_stats()?;
diff --git a/mining/src/manager.rs b/mining/src/manager.rs
index cf3be07a6..ccd669718 100644
--- a/mining/src/manager.rs
+++ b/mining/src/manager.rs
@@ -53,11 +53,12 @@ impl MiningManager {
         Self::with_config(config, cache_lifetime, counters)
     }
 
-    pub fn new_with_spam_blocking_option(
+    pub fn new_with_extended_config(
         block_spam_txs: bool,
         target_time_per_block: u64,
         relay_non_std_transactions: bool,
         max_block_mass: u64,
+        ram_scale: f64,
         cache_lifetime: Option<u64>,
         counters: Arc<MiningCounters>,
     ) -> Self {
@@ -66,7 +67,8 @@ impl MiningManager {
             target_time_per_block,
             relay_non_std_transactions,
             max_block_mass,
-        );
+        )
+        .apply_ram_scale(ram_scale);
         Self::with_config(config, cache_lifetime, counters)
     }
diff --git a/mining/src/mempool/config.rs b/mining/src/mempool/config.rs
index 49d0205a0..c81679540 100644
--- a/mining/src/mempool/config.rs
+++ b/mining/src/mempool/config.rs
@@ -1,7 +1,7 @@
 use kaspa_consensus_core::constants::TX_VERSION;
 
 pub(crate) const DEFAULT_MAXIMUM_TRANSACTION_COUNT: u64 = 1_000_000;
-pub(crate) const DEFAULT_MAXIMUM_READY_TRANSACTION_COUNT: u64 = 100_000;
+pub(crate) const DEFAULT_MAXIMUM_READY_TRANSACTION_COUNT: u64 = 50_000;
 pub(crate) const DEFAULT_MAXIMUM_BUILD_BLOCK_TEMPLATE_ATTEMPTS: u64 = 5;
 
 pub(crate) const DEFAULT_TRANSACTION_EXPIRE_INTERVAL_SECONDS: u64 = 60;
@@ -135,4 +135,9 @@ impl Config {
     ) -> Self {
         Self { block_spam_txs, ..Self::build_default(target_milliseconds_per_block, relay_non_std_transactions, max_block_mass) }
     }
+
+    pub fn apply_ram_scale(mut self, ram_scale: f64) -> Self {
+        self.maximum_transaction_count = (self.maximum_transaction_count as f64 * ram_scale.min(1.0)) as u64; // Allow only scaling down
+        self
+    }
 }
diff --git a/protocol/flows/src/flow_context.rs b/protocol/flows/src/flow_context.rs
index dea1941a1..23daad29f 100644
--- a/protocol/flows/src/flow_context.rs
+++ b/protocol/flows/src/flow_context.rs
@@ -78,8 +78,6 @@ pub enum BlockLogEvent {
     Submit(Hash),
     /// Orphaned block with x missing roots
     Orphaned(Hash, usize),
-    /// Detected a known orphan with x missing roots
-    OrphanRoots(Hash, usize),
     /// Unorphaned x blocks with hash being a representative
     Unorphaned(Hash, usize),
 }
@@ -176,9 +174,6 @@ impl BlockEventLogger {
                         summary.orphan_count += 1;
                         summary.orphan_rep = Some(hash)
                     }
-                    BlockLogEvent::OrphanRoots(_, roots_count) => {
-                        summary.orphan_roots_count += roots_count;
-                    }
                    BlockLogEvent::Unorphaned(hash, count) => {
                         summary.unorphan_count += count;
                         summary.unorphan_rep = Some(hash)
@@ -198,25 +193,15 @@ impl BlockEventLogger {
                     }
                 }
 
-                match (summary.unorphan_count, summary.orphan_count, summary.orphan_roots_count) {
-                    (0, 0, 0) => {}
-                    (1, 0, 0) => info!("Unorphaned block {}", summary.unorphan()),
-                    (n, 0, 0) => info!("Unorphaned {} block(s) ...{}", n, summary.unorphan()),
-                    (0, m, 0) => info!("Orphaned {} block(s) ...{}", m, summary.orphan()),
-                    (0, m, l) => info!("Orphaned {} block(s) ...{} and queued {} missing roots", m, summary.orphan(), l),
-                    (n, m, 0) => {
-                        info!("Unorphaned {} block(s) ...{}, orphaned {} block(s) ...{}", n, summary.unorphan(), m, summary.orphan(),)
-                    }
-                    (n, m, l) => {
-                        info!(
-                            "Unorphaned {} block(s) ...{}, orphaned {} block(s) ...{} and queued {} missing roots",
-                            n,
-                            summary.unorphan(),
-                            m,
-                            summary.orphan(),
-                            l
-                        )
-                    }
+                match (summary.orphan_count, summary.orphan_roots_count) {
+                    (0, 0) => {}
+                    (n, m) => info!("Orphaned {} block(s) ...{} and queued {} missing roots", n, summary.orphan(), m),
+                }
+
+                match summary.unorphan_count {
+                    0 => {}
+                    1 => info!("Unorphaned block {}", summary.unorphan()),
+                    n => info!("Unorphaned {} block(s) ...{}", n, summary.unorphan()),
                 }
             }
         });
@@ -454,7 +439,6 @@ impl FlowContext {
     }
 
     pub async fn add_orphan(&self, consensus: &ConsensusProxy, orphan_block: Block) -> Option<OrphanOutput> {
-        self.log_block_event(BlockLogEvent::Orphaned(orphan_block.hash(), 0));
         self.orphans_pool.write().await.add_orphan(consensus, orphan_block).await
     }
@@ -524,11 +508,8 @@ impl FlowContext {
         match event {
             BlockLogEvent::Relay(hash) => info!("Accepted block {} via relay", hash),
             BlockLogEvent::Submit(hash) => info!("Accepted block {} via submit block", hash),
-            BlockLogEvent::Orphaned(orphan, _) => {
-                info!("Received a block with missing parents, adding to orphan pool: {}", orphan)
-            }
-            BlockLogEvent::OrphanRoots(orphan, roots_count) => {
-                info!("Block {} has {} missing ancestors. Adding them to the invs queue...", orphan, roots_count)
+            BlockLogEvent::Orphaned(orphan, roots_count) => {
+                info!("Received a block with {} missing ancestors, adding to orphan pool: {}", roots_count, orphan)
             }
             _ => {}
         }
diff --git a/protocol/flows/src/v5/blockrelay/flow.rs b/protocol/flows/src/v5/blockrelay/flow.rs
index b0e5a3c54..7c62e2643 100644
--- a/protocol/flows/src/v5/blockrelay/flow.rs
+++ b/protocol/flows/src/v5/blockrelay/flow.rs
@@ -213,8 +213,7 @@ impl HandleRelayInvsFlow {
         }
     }
 
-    fn enqueue_orphan_roots(&mut self, orphan: Hash, roots: Vec<Hash>, known_within_range: bool) {
-        self.ctx.log_block_event(BlockLogEvent::OrphanRoots(orphan, roots.len()));
+    fn enqueue_orphan_roots(&mut self, _orphan: Hash, roots: Vec<Hash>, known_within_range: bool) {
         self.invs_route.enqueue_indirect_invs(roots, known_within_range)
     }
 
@@ -274,14 +273,17 @@ impl HandleRelayInvsFlow {
             match self.ctx.add_orphan(consensus, block).await {
                 // There is a sync gap between consensus and the orphan pool, meaning that consensus might have indicated
                 // that this block is orphan, but by the time it got to the orphan pool we discovered it no longer has missing roots.
-                // We signal this to the caller by returning false, triggering a consensus processing retry.
-                // Note that no roots means it is still possible there is a known orphan ancestor in the orphan pool. However
-                // we should still retry consensus in this case because the ancestor might have been queued to consensus
-                // already and consensus handles dependencies with improved (pipeline) concurrency and overlapping
+                // In such a case, the orphan pool will queue the known orphan ancestors to consensus and will return the block processing
+                // batch.
+                // We signal this to the caller by returning the batch of processed ancestors, indicating a consensus processing retry
+                // should be performed for this block as well.
                 Some(OrphanOutput::NoRoots(ancestor_batch)) => {
                     return Ok(Some(ancestor_batch));
                 }
-                Some(OrphanOutput::Roots(roots)) => self.enqueue_orphan_roots(hash, roots, known_within_range),
+                Some(OrphanOutput::Roots(roots)) => {
+                    self.ctx.log_block_event(BlockLogEvent::Orphaned(hash, roots.len()));
+                    self.enqueue_orphan_roots(hash, roots, known_within_range)
+                }
                 None | Some(OrphanOutput::Unknown) => {}
             }
         } else {
diff --git a/simpa/src/main.rs b/simpa/src/main.rs
index 1e38b1c1c..192f1a9e8 100644
--- a/simpa/src/main.rs
+++ b/simpa/src/main.rs
@@ -73,6 +73,10 @@ struct Args {
     #[arg(short = 'f', long, default_value_t = false)]
     headers_first: bool,
 
+    /// Applies a scale factor to memory allocation bounds
+    #[arg(long, default_value_t = 1.0)]
+    ram_scale: f64,
+
     /// Logging level for all subsystems {off, error, warn, info, debug, trace}
     /// -- You may also specify <subsystem>=<level>,<subsystem>=<level>,... to set the log level for individual subsystems
     #[arg(long = "loglevel", default_value = format!("info,{}=trace", env!("CARGO_PKG_NAME")))]
@@ -177,6 +181,7 @@ fn main_impl(mut args: Args) {
         .apply_args(|config| apply_args_to_consensus_params(&args, &mut config.params))
         .apply_args(|config| apply_args_to_perf_params(&args, &mut config.perf))
         .adjust_perf_params_to_consensus_params()
+        .apply_args(|config| config.ram_scale = args.ram_scale)
         .skip_proof_of_work()
         .enable_sanity_checks();
     if !args.test_pruning {
diff --git a/utils/src/mem_size.rs b/utils/src/mem_size.rs
index 01b87c433..c7963a40c 100644
--- a/utils/src/mem_size.rs
+++ b/utils/src/mem_size.rs
@@ -15,10 +15,10 @@ pub enum MemMode {
 }
 
 /// The contract for estimating deep memory size owned by this object. Implementors
-/// are expected to support only a single function - bytes or units. Objects with pre-compilation
-/// known static size or which are containers of items with known static size should implement the `_units`
-/// estimation and return the number of logical items (either 1 or the number of items in
-/// the container). Objects with varying runtime sizes should implement the `_bytes` estimation.
+/// are expected to support only a single function - bytes or units. Objects which are
+/// containers of items with pre-compilation known static size should implement the `_units`
+/// estimation and return the number of logical items (i.e. number of items in
+/// the container). Objects with more complex and varying runtime sizes should implement the `_bytes` estimation.
 ///
 /// By panicking on the remaining unimplemented function we ensure that tests will catch any inconsistency over the
 /// used units between the object implementing the contract and the code using its size for various purposes (e.g. cache
@@ -47,46 +47,14 @@ pub trait MemSizeEstimator {
     }
 }
 
-impl MemSizeEstimator for u64 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for u32 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for u16 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for u8 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for i64 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for i32 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for i16 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
-impl MemSizeEstimator for i8 {
-    fn estimate_mem_units(&self) -> usize {
-        1
-    }
-}
+impl MemSizeEstimator for u64 {}
+impl MemSizeEstimator for u32 {}
+impl MemSizeEstimator for u16 {}
+impl MemSizeEstimator for u8 {}
+impl MemSizeEstimator for i64 {}
+impl MemSizeEstimator for i32 {}
+impl MemSizeEstimator for i16 {}
+impl MemSizeEstimator for i8 {}
 
 impl<T: MemSizeEstimator> MemSizeEstimator for Vec<T> {
     fn estimate_mem_units(&self) -> usize {
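
A minimal sketch (not part of the patch) tying the two halves of this change together: types with a compile-time-known size get the empty `impl MemSizeEstimator for T {}` form, containers override the byte estimate, and the `ram_scale`-scaled byte budgets bound the byte-tracked caches. The trait below is a simplified stand-in for `kaspa_utils::mem_size::MemSizeEstimator`; in particular, the default of one unit is an assumption inferred from the empty impls above, not a verbatim copy of the crate's trait.

use std::mem::size_of;

/// Simplified stand-in for the crate's MemSizeEstimator (assumed defaults).
trait MemSizeEstimatorSketch {
    /// ASSUMPTION: a static-size object counts as a single logical unit by default,
    /// which is what makes the empty impls in the diff above meaningful.
    fn estimate_mem_units(&self) -> usize {
        1
    }
    /// Types with runtime-varying size override the byte estimate instead.
    fn estimate_mem_bytes(&self) -> usize {
        unimplemented!("bytes estimation not supported for this type")
    }
}

struct FixedEntry([u8; 32]); // e.g. a hash-like type with static size
impl MemSizeEstimatorSketch for FixedEntry {} // empty impl: one unit

struct Body(Vec<u64>); // a container with runtime-varying size
impl MemSizeEstimatorSketch for Body {
    fn estimate_mem_bytes(&self) -> usize {
        self.0.len() * size_of::<u64>() + size_of::<Self>()
    }
}

fn main() {
    // Mirrors the `scaled` closure from ConsensusStorage::new above:
    // every byte budget is multiplied by config.ram_scale.
    let ram_scale = 0.5; // e.g. --ram-scale=0.5 on a RAM-limited node
    let scaled = |s: usize| (s as f64 * ram_scale) as usize;
    assert_eq!(scaled(200_000_000), 100_000_000); // block window budget halved

    assert_eq!(FixedEntry([0; 32]).estimate_mem_units(), 1);
    assert_eq!(Body(vec![1, 2, 3]).estimate_mem_bytes(), 3 * 8 + size_of::<Body>());
}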