diff --git a/consensus/core/src/config/mod.rs b/consensus/core/src/config/mod.rs
index d62bf15a9..2ed1561db 100644
--- a/consensus/core/src/config/mod.rs
+++ b/consensus/core/src/config/mod.rs
@@ -68,6 +68,8 @@ pub struct Config {
     /// A scale factor to apply to memory allocation bounds
     pub ram_scale: f64,
+
+    pub rocksdb_cache_size: Option<usize>,
 }
 
 impl Config {
@@ -95,6 +97,7 @@ impl Config {
             initial_utxo_set: Default::default(),
             disable_upnp: false,
             ram_scale: 1.0,
+            rocksdb_cache_size: None,
         }
     }
diff --git a/consensus/src/consensus/factory.rs b/consensus/src/consensus/factory.rs
index f3ee51d9c..6f22c3322 100644
--- a/consensus/src/consensus/factory.rs
+++ b/consensus/src/consensus/factory.rs
@@ -292,13 +292,19 @@ impl ConsensusFactory for Factory {
         };
 
         let dir = self.db_root_dir.join(entry.directory_name.clone());
-        let db = kaspa_database::prelude::ConnBuilder::default()
-            .with_db_path(dir)
-            .with_parallelism(self.db_parallelism)
-            .with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
-            .build()
-            .unwrap();
+        let db = {
+            let builder = kaspa_database::prelude::ConnBuilder::default()
+                .with_db_path(dir)
+                .with_parallelism(self.db_parallelism)
+                .with_files_limit(self.fd_budget / 2); // active and staging consensuses should have equal budgets
+            if let Some(size) = self.config.rocksdb_cache_size {
+                builder.with_cache_size(size).build()
+            } else {
+                builder.build()
+            }
+        }
+        .unwrap();
 
         let session_lock = SessionLock::new();
         let consensus = Arc::new(Consensus::new(
             db.clone(),
@@ -326,13 +332,18 @@ impl ConsensusFactory for Factory {
         let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
 
         let dir = self.db_root_dir.join(entry.directory_name);
-        let db = kaspa_database::prelude::ConnBuilder::default()
-            .with_db_path(dir)
-            .with_parallelism(self.db_parallelism)
-            .with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
-            .build()
-            .unwrap();
-
+        let db = {
+            let builder = kaspa_database::prelude::ConnBuilder::default()
+                .with_db_path(dir)
+                .with_parallelism(self.db_parallelism)
+                .with_files_limit(self.fd_budget / 2); // active and staging consensuses should have equal budgets
+            if let Some(size) = self.config.rocksdb_cache_size {
+                builder.with_cache_size(size).build()
+            } else {
+                builder.build()
+            }
+        }
+        .unwrap();
         let session_lock = SessionLock::new();
         let consensus = Arc::new(Consensus::new(
             db.clone(),
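Aside (illustrative sketch, not part of the patch): the two branches above cannot share one `build()` call on a single variable because `with_cache_size` moves the builder into a different type state. The helper below mirrors the factory logic; `ConnBuilder` is used via the prelude exactly as in the diff, while the `DB` re-export from `kaspa_database::prelude` and the `i32` files-limit type are assumptions taken from context, and the function name itself is hypothetical.

```rust
// Hedged sketch of the factory pattern above, not the repository's actual API surface.
use std::{path::PathBuf, sync::Arc};

fn open_consensus_db(
    dir: PathBuf,
    parallelism: usize,
    files_limit: i32,
    cache_size: Option<usize>,
) -> Result<Arc<kaspa_database::prelude::DB>, kaspa_utils::fd_budget::Error> {
    let builder = kaspa_database::prelude::ConnBuilder::default()
        .with_db_path(dir)
        .with_parallelism(parallelism)
        .with_files_limit(files_limit);
    // `with_cache_size` changes the builder's cache-size type parameter, so each branch
    // must finish with its own `build()` call.
    match cache_size {
        Some(size) => builder.with_cache_size(size).build(),
        None => builder.build(),
    }
}
```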
diff --git a/database/src/db/conn_builder.rs b/database/src/db/conn_builder.rs
index 6de330f81..d1ee61138 100644
--- a/database/src/db/conn_builder.rs
+++ b/database/src/db/conn_builder.rs
@@ -1,21 +1,26 @@
 use crate::db::DB;
-use rocksdb::{DBWithThreadMode, MultiThreaded};
-use std::{path::PathBuf, sync::Arc};
+use rocksdb::{BlockBasedOptions, Cache, DBCompressionType, DBWithThreadMode, MultiThreaded};
+use std::{path::PathBuf, sync::Arc, thread::available_parallelism};
+
+const KB: usize = 1024;
+const MB: usize = 1024 * KB;
+const GB: usize = 1024 * MB;
 
 #[derive(Debug)]
 pub struct Unspecified;
 
 #[derive(Debug)]
-pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> {
+pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit, CacheSize> {
     db_path: Path,
     create_if_missing: bool,
     parallelism: usize,
     files_limit: FDLimit,
     mem_budget: usize,
     stats_period: StatsPeriod,
+    cache_size: CacheSize,
 }
 
-impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
+impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified, Unspecified> {
     fn default() -> Self {
         ConnBuilder {
             db_path: Unspecified,
@@ -24,12 +29,15 @@ impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
             mem_budget: 64 * 1024 * 1024,
             stats_period: Unspecified,
             files_limit: Unspecified,
+            cache_size: Unspecified,
         }
     }
 }
 
-impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
-    pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit> {
+impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit, CacheSize>
+    ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize>
+{
+    pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
         ConnBuilder {
             db_path,
             files_limit: self.files_limit,
@@ -37,18 +45,19 @@ impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
             parallelism: self.parallelism,
             mem_budget: self.mem_budget,
             stats_period: self.stats_period,
+            cache_size: self.cache_size,
         }
     }
-    pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
+    pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
         ConnBuilder { create_if_missing, ..self }
     }
-    pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
+    pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
         ConnBuilder { parallelism: parallelism.into(), ..self }
     }
-    pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
+    pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
         ConnBuilder { mem_budget: mem_budget.into(), ..self }
     }
-    pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32> {
+    pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32, CacheSize> {
         ConnBuilder {
             db_path: self.db_path,
             files_limit: files_limit.into(),
@@ -56,12 +65,24 @@ impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
             parallelism: self.parallelism,
             mem_budget: self.mem_budget,
             stats_period: self.stats_period,
+            cache_size: self.cache_size,
+        }
+    }
+    pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit, CacheSize> {
+        ConnBuilder {
+            db_path: self.db_path,
+            create_if_missing: self.create_if_missing,
+            parallelism: self.parallelism,
+            files_limit: self.files_limit,
+            mem_budget: self.mem_budget,
+            stats_period: Unspecified,
+            cache_size: self.cache_size,
         }
     }
 }
 
-impl<Path, StatsPeriod, FDLimit> ConnBuilder<Path, false, StatsPeriod, FDLimit> {
-    pub fn enable_stats(self) -> ConnBuilder<Path, true, StatsPeriod, FDLimit> {
+impl<Path, StatsPeriod, FDLimit, CacheSize> ConnBuilder<Path, false, StatsPeriod, FDLimit, CacheSize> {
+    pub fn enable_stats(self) -> ConnBuilder<Path, true, StatsPeriod, FDLimit, CacheSize> {
         ConnBuilder {
             db_path: self.db_path,
             create_if_missing: self.create_if_missing,
@@ -69,29 +90,35 @@ impl<Path, StatsPeriod, FDLimit> ConnBuilder<Path, false, StatsPeriod, FDLimit> {
             files_limit: self.files_limit,
             mem_budget: self.mem_budget,
             stats_period: self.stats_period,
+            cache_size: self.cache_size,
         }
     }
 }
 
-impl<Path, StatsPeriod, FDLimit> ConnBuilder<Path, true, StatsPeriod, FDLimit> {
-    pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit> {
+impl<Path, StatsPeriod, FDLimit, CacheSize> ConnBuilder<Path, true, StatsPeriod, FDLimit, CacheSize> {
+    pub fn with_stats_period(self, stats_period: impl Into<u64>) -> ConnBuilder<Path, true, u64, FDLimit, CacheSize> {
         ConnBuilder {
             db_path: self.db_path,
             create_if_missing: self.create_if_missing,
             parallelism: self.parallelism,
             files_limit: self.files_limit,
             mem_budget: self.mem_budget,
-            stats_period: Unspecified,
+            stats_period: stats_period.into(),
+            cache_size: self.cache_size,
         }
     }
-    pub fn with_stats_period(self, stats_period: impl Into<u64>) -> ConnBuilder<Path, true, u64, FDLimit> {
+}
+
+impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, Unspecified> {
+    pub fn with_cache_size(self, cache_size: usize) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, usize> {
         ConnBuilder {
             db_path: self.db_path,
             create_if_missing: self.create_if_missing,
             parallelism: self.parallelism,
             files_limit: self.files_limit,
             mem_budget: self.mem_budget,
-            stats_period: stats_period.into(),
+            stats_period: self.stats_period,
+            cache_size,
         }
     }
 }
@@ -103,7 +130,53 @@ macro_rules! default_opts {
     if $self.parallelism > 1 {
         opts.increase_parallelism($self.parallelism as i32);
     }
-    opts.optimize_level_style_compaction($self.mem_budget);
+    {
+        // based on optimized by default
+        opts.set_max_background_jobs((available_parallelism().unwrap().get() / 2) as i32);
+        opts.set_compaction_readahead_size(2 * MB);
+        opts.set_level_zero_stop_writes_trigger(36);
+        opts.set_level_zero_slowdown_writes_trigger(20);
+        opts.set_max_compaction_bytes(2 * MB as u64 * 100);
+    }
+    {
+        let buffer_size = 32usize * MB;
+        let min_to_merge = 4i32;
+        let max_buffers = 16;
+        let trigger = 12i32;
+        let max_level_base = min_to_merge as u64 * trigger as u64 * buffer_size as u64;
+
+        opts.set_target_file_size_base(32 * MB as u64);
+        opts.set_max_bytes_for_level_multiplier(4.0);
+
+        opts.set_write_buffer_size(buffer_size);
+        opts.set_max_write_buffer_number(max_buffers);
+        opts.set_min_write_buffer_number_to_merge(min_to_merge);
+        opts.set_level_zero_file_num_compaction_trigger(trigger);
+        opts.set_max_bytes_for_level_base(max_level_base);
+
+        opts.set_wal_bytes_per_sync(1000 * KB as u64); // suggested by advisor
+        opts.set_use_direct_io_for_flush_and_compaction(true); // should decrease write amp
+        opts.set_keep_log_file_num(1); // good for analytics
+        opts.set_bytes_per_sync(MB as u64);
+        opts.set_max_total_wal_size(GB as u64);
+        opts.set_compression_per_level(&[
+            DBCompressionType::None,
+            DBCompressionType::Lz4,
+            DBCompressionType::Lz4,
+            DBCompressionType::Lz4,
+            DBCompressionType::Lz4,
+            DBCompressionType::Lz4,
+            DBCompressionType::Lz4,
+        ]);
+        opts.set_level_compaction_dynamic_level_bytes(true); // default option since 8.4 https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true-recommended-default-since-version-84
+
+        // Create BlockBasedOptions and set block size
+        let mut block_opts = rocksdb::BlockBasedOptions::default();
+        block_opts.set_bloom_filter(4.9, true); // increases ram, trade-off, needs to be tested
+        block_opts.set_block_size(128 * KB);
+        $self.cache_size.apply_cache_size(&mut block_opts);
+        opts.set_block_based_table_factory(&block_opts);
+    }
     let guard = kaspa_utils::fd_budget::acquire_guard($self.files_limit)?;
     opts.set_max_open_files($self.files_limit);
     opts.create_if_missing($self.create_if_missing);
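Aside (illustrative arithmetic, not part of the patch): the write-buffer and level constants above are tied together by the `min_to_merge * trigger * buffer_size` product. The standalone sketch below only reproduces that math with the values from this hunk; the interpretation in the comments is a reading of the tuning, not documentation of it.

```rust
// Values below are copied from the hunk above; only the arithmetic is new.
const KB: usize = 1024;
const MB: usize = 1024 * KB;

fn main() {
    let buffer_size: usize = 32 * MB; // set_write_buffer_size: each memtable is 32 MB
    let min_to_merge: usize = 4; // memtables merged per flush, so L0 files are roughly 4 * 32 MB
    let trigger: usize = 12; // number of L0 files that triggers compaction into L1
    // max_bytes_for_level_base is sized so that a full L0 roughly fits into L1:
    let max_level_base = min_to_merge * trigger * buffer_size;
    assert_eq!(max_level_base / MB, 1536); // about 1.5 GB for L1
    // With set_max_bytes_for_level_multiplier(4.0), the nominal targets are ~6 GB for L2, ~24 GB for L3, and so on.
    println!("max_bytes_for_level_base = {} MB", max_level_base / MB);
}
```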
@@ -111,7 +184,7 @@ macro_rules! default_opts {
     }};
 }
 
-impl ConnBuilder<PathBuf, false, Unspecified, i32> {
+impl<CacheSize: ApplyCacheSize> ConnBuilder<PathBuf, false, Unspecified, i32, CacheSize> {
     pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
         let (opts, guard) = default_opts!(self)?;
         let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
@@ -119,16 +192,17 @@ impl ConnBuilder<PathBuf, false, Unspecified, i32> {
     }
 }
 
-impl ConnBuilder<PathBuf, true, Unspecified, i32> {
+impl<CacheSize: ApplyCacheSize> ConnBuilder<PathBuf, true, Unspecified, i32, CacheSize> {
     pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
         let (mut opts, guard) = default_opts!(self)?;
         opts.enable_statistics();
+        opts.set_report_bg_io_stats(true);
         let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
         Ok(db)
     }
 }
 
-impl ConnBuilder<PathBuf, true, u64, i32> {
+impl<CacheSize: ApplyCacheSize> ConnBuilder<PathBuf, true, u64, i32, CacheSize> {
     pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
         let (mut opts, guard) = default_opts!(self)?;
         opts.enable_statistics();
@@ -138,3 +212,18 @@ impl ConnBuilder<PathBuf, true, u64, i32> {
         Ok(db)
     }
 }
+
+pub trait ApplyCacheSize {
+    fn apply_cache_size(&self, options: &mut BlockBasedOptions);
+}
+
+impl ApplyCacheSize for usize {
+    fn apply_cache_size(&self, options: &mut BlockBasedOptions) {
+        let cache = Cache::new_lru_cache(*self);
+        options.set_block_cache(&cache);
+    }
+}
+
+impl ApplyCacheSize for Unspecified {
+    fn apply_cache_size(&self, _options: &mut BlockBasedOptions) {}
+}
diff --git a/kaspad/src/args.rs b/kaspad/src/args.rs
index 2774269d3..293840aac 100644
--- a/kaspad/src/args.rs
+++ b/kaspad/src/args.rs
@@ -90,6 +90,7 @@ pub struct Args {
     #[serde(rename = "nogrpc")]
     pub disable_grpc: bool,
     pub ram_scale: f64,
+    pub rocksdb_consensus_cache_size: Option<usize>,
 }
 
 impl Default for Args {
@@ -140,6 +141,7 @@ impl Default for Args {
             disable_dns_seeding: false,
             disable_grpc: false,
             ram_scale: 1.0,
+            rocksdb_consensus_cache_size: None,
         }
     }
 }
@@ -159,6 +161,7 @@ impl Args {
        config.p2p_listen_address = self.listen.unwrap_or(ContextualNetAddress::unspecified());
        config.externalip = self.externalip.map(|v| v.normalize(config.default_p2p_port()));
        config.ram_scale = self.ram_scale;
+       config.rocksdb_cache_size = self.rocksdb_consensus_cache_size;
 
        #[cfg(feature = "devnet-prealloc")]
        if let Some(num_prealloc_utxos) = self.num_prealloc_utxos {
@@ -369,6 +372,7 @@ Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0
                .help("Apply a scale factor to memory allocation bounds. Nodes with limited RAM (~4-8GB) should set this to ~0.3-0.5 respectively. Nodes with a large RAM (~64GB) can set this value to ~3.0-4.0 and gain superior performance especially for syncing peers faster"),
        )
+       .arg(Arg::new("rocksdb-consensus-cache-size").long("rocksdb-consensus-cache-size").require_equals(true).value_parser(clap::value_parser!(u64)))
        ;
 
    #[cfg(feature = "devnet-prealloc")]
@@ -448,6 +452,7 @@ impl Args {
            disable_dns_seeding: arg_match_unwrap_or::<bool>(&m, "nodnsseed", defaults.disable_dns_seeding),
            disable_grpc: arg_match_unwrap_or::<bool>(&m, "nogrpc", defaults.disable_grpc),
            ram_scale: arg_match_unwrap_or::<f64>(&m, "ram-scale", defaults.ram_scale),
+           rocksdb_consensus_cache_size: m.get_one::<u64>("rocksdb-consensus-cache-size").map(|v| *v as usize),
            #[cfg(feature = "devnet-prealloc")]
            num_prealloc_utxos: m.get_one::<u64>("num-prealloc-utxos").cloned(),
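Aside (usage sketch, not part of the patch): the new flag is parsed as a `u64` and carried as an `Option<usize>` from `Args` through `Config::rocksdb_cache_size` into `ConnBuilder::with_cache_size`; for example, `kaspad --rocksdb-consensus-cache-size=1073741824` would request a 1 GiB block cache (the value here is only an example). A minimal sketch of the conversion step:

```rust
// Mirrors `m.get_one::<u64>("rocksdb-consensus-cache-size").map(|v| *v as usize)` above.
fn wire_cache_size(cli_value: Option<u64>) -> Option<usize> {
    cli_value.map(|v| v as usize)
}

fn main() {
    assert_eq!(wire_cache_size(Some(1 << 30)), Some(1_073_741_824)); // 1 GiB example value
    // None means `with_cache_size` is never called and RocksDB keeps its default block cache.
    assert_eq!(wire_cache_size(None), None);
}
```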