Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rocksdb options tuning #426

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consensus/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ pub struct Config {

/// A scale factor to apply to memory allocation bounds
pub ram_scale: f64,

pub rocksdb_cache_size: Option<usize>,
}

impl Config {
Expand Down Expand Up @@ -95,6 +97,7 @@ impl Config {
initial_utxo_set: Default::default(),
disable_upnp: false,
ram_scale: 1.0,
rocksdb_cache_size: None,
}
}

Expand Down
37 changes: 24 additions & 13 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,13 +292,19 @@ impl ConsensusFactory for Factory {
};

let dir = self.db_root_dir.join(entry.directory_name.clone());
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let db = {
let builder = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2); // active and staging consensuses should have equal budgets
if let Some(size) = self.config.rocksdb_cache_size {
builder.with_cache_size(size).build()
} else {
builder.build()
}
}
.unwrap();
let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
db.clone(),
Expand Down Expand Up @@ -326,13 +332,18 @@ impl ConsensusFactory for Factory {

let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
let dir = self.db_root_dir.join(entry.directory_name);
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2) // active and staging consensuses should have equal budgets
.build()
.unwrap();

let db = {
let builder = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(self.fd_budget / 2); // active and staging consensuses should have equal budgets
if let Some(size) = self.config.rocksdb_cache_size {
builder.with_cache_size(size).build()
} else {
builder.build()
}
}
.unwrap();
let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
db.clone(),
Expand Down
131 changes: 110 additions & 21 deletions database/src/db/conn_builder.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
use crate::db::DB;
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::{path::PathBuf, sync::Arc};
use rocksdb::{BlockBasedOptions, Cache, DBCompressionType, DBWithThreadMode, MultiThreaded};
use std::{path::PathBuf, sync::Arc, thread::available_parallelism};

// Byte-size units used by the rocksdb tuning constants below
// (buffer sizes, sync intervals, WAL limits).
const KB: usize = 1024;
const MB: usize = 1024 * KB;
const GB: usize = 1024 * MB;

/// Type-state marker for a builder field that has not been set yet.
/// `ConnBuilder`'s generic parameters move from `Unspecified` to a concrete
/// type as `with_*` methods are called, so `build()` is only available for
/// valid combinations of settings.
#[derive(Debug)]
pub struct Unspecified;

#[derive(Debug)]
pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> {
pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit, CacheSize> {
db_path: Path,
create_if_missing: bool,
parallelism: usize,
files_limit: FDLimit,
mem_budget: usize,
stats_period: StatsPeriod,
cache_size: CacheSize,
}

impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
impl Default for ConnBuilder<Unspecified, true, Unspecified, Unspecified, Unspecified> {
fn default() -> Self {
ConnBuilder {
db_path: Unspecified,
Expand All @@ -24,74 +29,96 @@ impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
mem_budget: 64 * 1024 * 1024,
stats_period: Unspecified,
files_limit: Unspecified,
cache_size: Unspecified,
}
}
}

impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit> {
impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit, CacheSize>
ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize>
{
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder {
db_path,
files_limit: self.files_limit,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
cache_size: self.cache_size,
}
}
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder { create_if_missing, ..self }
}
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder { parallelism: parallelism.into(), ..self }
}
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit, CacheSize> {
ConnBuilder { mem_budget: mem_budget.into(), ..self }
}
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32> {
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32, CacheSize> {
ConnBuilder {
db_path: self.db_path,
files_limit: files_limit.into(),
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
cache_size: self.cache_size,
}
}
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit, CacheSize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: Unspecified,
cache_size: self.cache_size,
}
}
}

impl<Path, FDLimit> ConnBuilder<Path, false, Unspecified, FDLimit> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified, FDLimit> {
impl<Path, const STATS_ENABLED: bool, FDLimit, CacheSize> ConnBuilder<Path, STATS_ENABLED, Unspecified, FDLimit, CacheSize> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified, FDLimit, CacheSize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
cache_size: self.cache_size,
}
}
}

impl<Path, StatsPeriod, FDLimit> ConnBuilder<Path, true, StatsPeriod, FDLimit> {
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit> {
impl<Path, StatsPeriod, FDLimit, CacheSize> ConnBuilder<Path, true, StatsPeriod, FDLimit, CacheSize> {
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32, FDLimit, CacheSize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: Unspecified,
stats_period: stats_period.into(),
cache_size: self.cache_size,
}
}
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32, FDLimit> {
}

impl<Path, StatsPeriod, FDLimit, CacheSize> ConnBuilder<Path, true, StatsPeriod, FDLimit, CacheSize> {
pub fn with_cache_size(self, cache_size: usize) -> ConnBuilder<Path, true, StatsPeriod, FDLimit, usize> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: stats_period.into(),
stats_period: self.stats_period,
cache_size,
}
}
}
Expand All @@ -103,32 +130,79 @@ macro_rules! default_opts {
opts.increase_parallelism($self.parallelism as i32);
}

opts.optimize_level_style_compaction($self.mem_budget);
{
// based on optimized by default
opts.set_max_background_jobs((available_parallelism().unwrap().get() / 2) as i32);
opts.set_compaction_readahead_size(2 * MB);
opts.set_level_zero_stop_writes_trigger(36);
opts.set_level_zero_slowdown_writes_trigger(20);
opts.set_max_compaction_bytes(2 * MB as u64 * 100);
}
{
let buffer_size = 32usize * MB;
let min_to_merge = 4i32;
let max_buffers = 16;
let trigger = 12i32;
let max_level_base = min_to_merge as u64 * trigger as u64 * buffer_size as u64;

opts.set_target_file_size_base(32 * MB as u64);
opts.set_max_bytes_for_level_multiplier(4.0);

opts.set_write_buffer_size(buffer_size);
opts.set_max_write_buffer_number(max_buffers);
opts.set_min_write_buffer_number_to_merge(min_to_merge);
opts.set_level_zero_file_num_compaction_trigger(trigger);
opts.set_max_bytes_for_level_base(max_level_base);

opts.set_wal_bytes_per_sync(1000 * KB as u64); // suggested by advisor
opts.set_use_direct_io_for_flush_and_compaction(true); // should decrease write amp
opts.set_keep_log_file_num(1); // good for analytics
opts.set_bytes_per_sync(MB as u64);
opts.set_max_total_wal_size(GB as u64);
opts.set_compression_per_level(&[
DBCompressionType::None,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
DBCompressionType::Lz4,
]);
opts.set_level_compaction_dynamic_level_bytes(true); // default option since 8.4 https://github.com/facebook/rocksdb/wiki/Leveled-Compaction#level_compaction_dynamic_level_bytes-is-true-recommended-default-since-version-84

// Create BlockBasedOptions and set block size
let mut block_opts = rocksdb::BlockBasedOptions::default();
block_opts.set_bloom_filter(4.9, true); // increases ram, trade-off, needs to be tested
block_opts.set_block_size(128 * KB);
$self.cache_size.apply_cache_size(&mut block_opts);
opts.set_block_based_table_factory(&block_opts);
}
let guard = kaspa_utils::fd_budget::acquire_guard($self.files_limit)?;
opts.set_max_open_files($self.files_limit);
opts.create_if_missing($self.create_if_missing);
Ok((opts, guard))
}};
}

impl ConnBuilder<PathBuf, false, Unspecified, i32> {
impl<T: ApplyCacheSize> ConnBuilder<PathBuf, false, Unspecified, i32, T> {
pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
let (opts, guard) = default_opts!(self)?;
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}

impl ConnBuilder<PathBuf, true, Unspecified, i32> {
impl<T: ApplyCacheSize> ConnBuilder<PathBuf, true, Unspecified, i32, T> {
pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
let (mut opts, guard) = default_opts!(self)?;
opts.enable_statistics();
opts.set_report_bg_io_stats(true);
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}

impl ConnBuilder<PathBuf, true, u32, i32> {
impl<T: ApplyCacheSize> ConnBuilder<PathBuf, true, u32, i32, T> {
pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
let (mut opts, guard) = default_opts!(self)?;
opts.enable_statistics();
Expand All @@ -138,3 +212,18 @@ impl ConnBuilder<PathBuf, true, u32, i32> {
Ok(db)
}
}

/// Applies a (possibly absent) block-cache size to rocksdb's block-based
/// table options.
///
/// Implemented for `usize` (an explicit cache capacity in bytes) and for
/// `Unspecified` (a no-op that keeps rocksdb's default block cache), so the
/// builder's type-state cache-size parameter can be consumed uniformly when
/// the connection options are assembled.
pub trait ApplyCacheSize {
    fn apply_cache_size(&self, options: &mut BlockBasedOptions);
}

impl ApplyCacheSize for usize {
    /// An explicit cache capacity: install an LRU block cache of `*self`
    /// bytes, replacing rocksdb's default block cache.
    // NOTE(review): this assumes `set_block_cache` retains the cache handle
    // after the temporary drops — confirm against the rust-rocksdb version in
    // use (modern versions clone the shared handle internally).
    fn apply_cache_size(&self, options: &mut BlockBasedOptions) {
        options.set_block_cache(&Cache::new_lru_cache(*self));
    }
}

/// No cache size was specified: deliberately leave rocksdb's default block
/// cache configuration untouched.
impl ApplyCacheSize for Unspecified {
    fn apply_cache_size(&self, _options: &mut BlockBasedOptions) {}
}
5 changes: 5 additions & 0 deletions kaspad/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ pub struct Args {
#[serde(rename = "nogrpc")]
pub disable_grpc: bool,
pub ram_scale: f64,
pub rocksdb_consensus_cache_size: Option<usize>,
}

impl Default for Args {
Expand Down Expand Up @@ -140,6 +141,7 @@ impl Default for Args {
disable_dns_seeding: false,
disable_grpc: false,
ram_scale: 1.0,
rocksdb_consensus_cache_size: None,
}
}
}
Expand All @@ -159,6 +161,7 @@ impl Args {
config.p2p_listen_address = self.listen.unwrap_or(ContextualNetAddress::unspecified());
config.externalip = self.externalip.map(|v| v.normalize(config.default_p2p_port()));
config.ram_scale = self.ram_scale;
config.rocksdb_cache_size = self.rocksdb_consensus_cache_size;

#[cfg(feature = "devnet-prealloc")]
if let Some(num_prealloc_utxos) = self.num_prealloc_utxos {
Expand Down Expand Up @@ -369,6 +372,7 @@ Setting to 0 prevents the preallocation and sets the maximum to {}, leading to 0
.help("Apply a scale factor to memory allocation bounds. Nodes with limited RAM (~4-8GB) should set this to ~0.3-0.5 respectively. Nodes with
a large RAM (~64GB) can set this value to ~3.0-4.0 and gain superior performance especially for syncing peers faster"),
)
.arg(Arg::new("rocksdb-consensus-cache-size").long("rocksdb-consensus-cache-size").require_equals(true).value_parser(clap::value_parser!(u64)))
;

#[cfg(feature = "devnet-prealloc")]
Expand Down Expand Up @@ -448,6 +452,7 @@ impl Args {
disable_dns_seeding: arg_match_unwrap_or::<bool>(&m, "nodnsseed", defaults.disable_dns_seeding),
disable_grpc: arg_match_unwrap_or::<bool>(&m, "nogrpc", defaults.disable_grpc),
ram_scale: arg_match_unwrap_or::<f64>(&m, "ram-scale", defaults.ram_scale),
rocksdb_consensus_cache_size: m.get_one::<u32>("rocksdb-consensus-cache-size").map(|v| *v as usize),

#[cfg(feature = "devnet-prealloc")]
num_prealloc_utxos: m.get_one::<u64>("num-prealloc-utxos").cloned(),
Expand Down
Loading