Skip to content

Commit

Permalink
Add a RAM usage scale factor configurable from the command line (kaspanet#374)
Browse files Browse the repository at this point in the history

* track block txs by approx bytes

* 0.5 scale factor on byte budgets

* fix obsolete comment (unrelated to this pr)

* reduce orphan block flood

* use tracked bytes mode for acceptance data cache

* impl `estimate_mem_units` only when actually used

* propagate ram scale arg from the command line

* add to simpa as well

* make it clear that mass is an average

* fix cpu usage stat
  • Loading branch information
michaelsutton committed Jan 4, 2024
1 parent 0c76768 commit 697e5d3
Show file tree
Hide file tree
Showing 27 changed files with 167 additions and 173 deletions.
6 changes: 1 addition & 5 deletions components/addressmanager/src/stores/address_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ pub struct Entry {
pub address: NetAddress,
}

impl MemSizeEstimator for Entry {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for Entry {}

pub trait AddressesStoreReader {
fn get(&self, key: AddressKey) -> Result<Entry, StoreError>;
Expand Down
6 changes: 1 addition & 5 deletions components/addressmanager/src/stores/banned_address_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,7 @@ use std::{error::Error, fmt::Display, sync::Arc};
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct ConnectionBanTimestamp(pub u64);

impl MemSizeEstimator for ConnectionBanTimestamp {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for ConnectionBanTimestamp {}

pub trait BannedAddressesStoreReader {
fn get(&self, address: IpAddr) -> Result<ConnectionBanTimestamp, StoreError>;
Expand Down
6 changes: 1 addition & 5 deletions consensus/core/src/blockstatus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,7 @@ pub enum BlockStatus {
StatusHeaderOnly,
}

impl MemSizeEstimator for BlockStatus {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for BlockStatus {}

impl BlockStatus {
pub fn has_block_header(self) -> bool {
Expand Down
7 changes: 6 additions & 1 deletion consensus/core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub struct Config {

pub user_agent_comments: Vec<String>,

// If undefined, sets it to 0.0.0.0
/// If undefined, sets it to 0.0.0.0
pub p2p_listen_address: ContextualNetAddress,

pub externalip: Option<NetAddress>,
Expand All @@ -63,7 +63,11 @@ pub struct Config {

#[cfg(feature = "devnet-prealloc")]
pub initial_utxo_set: Arc<UtxoCollection>,

pub disable_upnp: bool,

/// A scale factor to apply to memory allocation bounds
pub ram_scale: f64,
}

impl Config {
Expand All @@ -90,6 +94,7 @@ impl Config {
#[cfg(feature = "devnet-prealloc")]
initial_utxo_set: Default::default(),
disable_upnp: false,
ram_scale: 1.0,
}
}

Expand Down
6 changes: 6 additions & 0 deletions consensus/core/src/errors/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ pub enum ConfigError {
#[error("Configuration: --logdir and --nologfiles cannot be used together")]
MixedLogDirAndNoLogFiles,

#[error("Configuration: --ram-scale cannot be set below 0.1")]
RamScaleTooLow,

#[error("Configuration: --ram-scale cannot be set above 10.0")]
RamScaleTooHigh,

#[cfg(feature = "devnet-prealloc")]
#[error("Cannot preallocate UTXOs on any network except devnet")]
PreallocUtxosOnNonDevnet,
Expand Down
6 changes: 1 addition & 5 deletions consensus/core/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ impl UtxoEntry {
}
}

impl MemSizeEstimator for UtxoEntry {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for UtxoEntry {}

pub type TransactionIndexType = u32;

Expand Down
6 changes: 1 addition & 5 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ pub struct ConsensusEntry {
creation_timestamp: u64,
}

impl MemSizeEstimator for ConsensusEntry {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for ConsensusEntry {}

impl ConsensusEntry {
pub fn new(key: u64, directory_name: String, creation_timestamp: u64) -> Self {
Expand Down
35 changes: 20 additions & 15 deletions consensus/src/consensus/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ pub struct ConsensusStorage {

impl ConsensusStorage {
pub fn new(db: Arc<DB>, config: Arc<Config>) -> Arc<Self> {
let scale_factor = config.ram_scale;
let scaled = |s| (s as f64 * scale_factor) as usize;

let params = &config.params;
let perf_params = &config.perf;

Expand All @@ -79,18 +82,20 @@ impl ConsensusStorage {
let level_lower_bound = 2 * params.pruning_proof_m as usize; // Number of items lower bound for level-related caches

// Budgets in bytes. All byte budgets overall sum up to ~1GB of memory (which obviously takes more low level alloc space)
let daa_excluded_budget = 30_000_000;
let statuses_budget = 30_000_000;
let reachability_data_budget = 20_000_000;
let reachability_sets_budget = 20_000_000; // x 2 for tree children and future covering set
let ghostdag_compact_budget = 15_000_000;
let headers_compact_budget = 5_000_000;
let parents_budget = 40_000_000; // x 3 for reachability and levels
let children_budget = 5_000_000; // x 3 for reachability and levels
let ghostdag_budget = 80_000_000; // x 2 for levels
let headers_budget = 80_000_000;
let utxo_diffs_budget = 40_000_000;
let block_window_budget = 200_000_000; // x 2 for difficulty and median time
let daa_excluded_budget = scaled(30_000_000);
let statuses_budget = scaled(30_000_000);
let reachability_data_budget = scaled(20_000_000);
let reachability_sets_budget = scaled(20_000_000); // x 2 for tree children and future covering set
let ghostdag_compact_budget = scaled(15_000_000);
let headers_compact_budget = scaled(5_000_000);
let parents_budget = scaled(40_000_000); // x 3 for reachability and levels
let children_budget = scaled(5_000_000); // x 3 for reachability and levels
let ghostdag_budget = scaled(80_000_000); // x 2 for levels
let headers_budget = scaled(80_000_000);
let transactions_budget = scaled(40_000_000);
let utxo_diffs_budget = scaled(40_000_000);
let block_window_budget = scaled(200_000_000); // x 2 for difficulty and median time
let acceptance_data_budget = scaled(40_000_000);

// Unit sizes in bytes
let daa_excluded_bytes = size_of::<Hash>() + size_of::<BlockHashSet>(); // Expected empty sets
Expand Down Expand Up @@ -150,10 +155,10 @@ impl ConsensusStorage {
let block_data_builder = PolicyBuilder::new().max_items(perf_params.block_data_cache_size).untracked();
let header_data_builder = PolicyBuilder::new().max_items(perf_params.header_data_cache_size).untracked();
let utxo_set_builder = PolicyBuilder::new().max_items(perf_params.utxo_set_cache_size).untracked();
let transactions_builder = PolicyBuilder::new().max_items(40_000).tracked_units(); // Tracked units are txs.
let transactions_builder = PolicyBuilder::new().bytes_budget(transactions_budget).tracked_bytes();
let acceptance_data_builder = PolicyBuilder::new().bytes_budget(acceptance_data_budget).tracked_bytes();
let past_pruning_points_builder = PolicyBuilder::new().max_items(1024).untracked();

// TODO: consider tracking transactions by bytes (preferably by saving the size in a field on the block level)
// TODO: consider tracking UtxoDiff byte sizes more accurately including the exact size of ScriptPublicKey

// Headers
Expand Down Expand Up @@ -210,7 +215,7 @@ impl ConsensusStorage {
let block_transactions_store = Arc::new(DbBlockTransactionsStore::new(db.clone(), transactions_builder.build()));
let utxo_diffs_store = Arc::new(DbUtxoDiffsStore::new(db.clone(), utxo_diffs_builder.build()));
let utxo_multisets_store = Arc::new(DbUtxoMultisetsStore::new(db.clone(), block_data_builder.build()));
let acceptance_data_store = Arc::new(DbAcceptanceDataStore::new(db.clone(), block_data_builder.build()));
let acceptance_data_store = Arc::new(DbAcceptanceDataStore::new(db.clone(), acceptance_data_builder.build()));

// Tips
let headers_selected_tip_store = Arc::new(RwLock::new(DbHeadersSelectedTipStore::new(db.clone())));
Expand Down
27 changes: 23 additions & 4 deletions consensus/src/model/stores/acceptance_data.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
use kaspa_consensus_core::acceptance_data::AcceptanceData;
use kaspa_consensus_core::acceptance_data::AcceptedTxEntry;
use kaspa_consensus_core::acceptance_data::MergesetBlockAcceptanceData;
use kaspa_consensus_core::BlockHasher;
use kaspa_database::prelude::CachePolicy;
use kaspa_database::prelude::StoreError;
use kaspa_database::prelude::DB;
use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter};
use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
use kaspa_utils::mem_size::MemSizeEstimator;
use rocksdb::WriteBatch;
use serde::Deserialize;
use serde::Serialize;
use std::mem::size_of;
use std::sync::Arc;

pub trait AcceptanceDataStoreReader {
Expand All @@ -18,11 +24,24 @@ pub trait AcceptanceDataStore: AcceptanceDataStoreReader {
fn delete(&self, hash: Hash) -> Result<(), StoreError>;
}

/// Simple wrapper for implementing `MemSizeEstimator`
#[derive(Clone, Serialize, Deserialize)]
struct AcceptanceDataEntry(Arc<AcceptanceData>);

impl MemSizeEstimator for AcceptanceDataEntry {
fn estimate_mem_bytes(&self) -> usize {
self.0.iter().map(|l| l.accepted_transactions.len()).sum::<usize>() * size_of::<AcceptedTxEntry>()
+ self.0.len() * size_of::<MergesetBlockAcceptanceData>()
+ size_of::<AcceptanceData>()
+ size_of::<Self>()
}
}

/// A DB + cache implementation of `DbAcceptanceDataStore` trait, with concurrency support.
#[derive(Clone)]
pub struct DbAcceptanceDataStore {
db: Arc<DB>,
access: CachedDbAccess<Hash, Arc<AcceptanceData>, BlockHasher>,
access: CachedDbAccess<Hash, AcceptanceDataEntry, BlockHasher>,
}

impl DbAcceptanceDataStore {
Expand All @@ -38,7 +57,7 @@ impl DbAcceptanceDataStore {
if self.access.has(hash)? {
return Err(StoreError::HashAlreadyExists(hash));
}
self.access.write(BatchDbWriter::new(batch), hash, acceptance_data)?;
self.access.write(BatchDbWriter::new(batch), hash, AcceptanceDataEntry(acceptance_data))?;
Ok(())
}

Expand All @@ -49,7 +68,7 @@ impl DbAcceptanceDataStore {

impl AcceptanceDataStoreReader for DbAcceptanceDataStore {
fn get(&self, hash: Hash) -> Result<Arc<AcceptanceData>, StoreError> {
self.access.read(hash)
Ok(self.access.read(hash)?.0)
}
}

Expand All @@ -58,7 +77,7 @@ impl AcceptanceDataStore for DbAcceptanceDataStore {
if self.access.has(hash)? {
return Err(StoreError::HashAlreadyExists(hash));
}
self.access.write(DirectDbWriter::new(&self.db), hash, acceptance_data)?;
self.access.write(DirectDbWriter::new(&self.db), hash, AcceptanceDataEntry(acceptance_data))?;
Ok(())
}

Expand Down
35 changes: 29 additions & 6 deletions consensus/src/model/stores/block_transactions.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use std::sync::Arc;

use kaspa_consensus_core::tx::{TransactionInput, TransactionOutput};
use kaspa_consensus_core::{tx::Transaction, BlockHasher};
use kaspa_database::prelude::CachePolicy;
use kaspa_database::prelude::StoreError;
use kaspa_database::prelude::DB;
use kaspa_database::prelude::{BatchDbWriter, CachedDbAccess, DirectDbWriter};
use kaspa_database::registry::DatabaseStorePrefixes;
use kaspa_hashes::Hash;
use kaspa_utils::mem_size::MemSizeEstimator;
use rocksdb::WriteBatch;
use serde::{Deserialize, Serialize};
use std::mem::size_of;
use std::sync::Arc;

pub trait BlockTransactionsStoreReader {
fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError>;
Expand All @@ -19,11 +22,31 @@ pub trait BlockTransactionsStore: BlockTransactionsStoreReader {
fn delete(&self, hash: Hash) -> Result<(), StoreError>;
}

#[derive(Clone, Serialize, Deserialize)]
struct BlockBody(Arc<Vec<Transaction>>);

impl MemSizeEstimator for BlockBody {
fn estimate_mem_bytes(&self) -> usize {
const NORMAL_SIG_SIZE: usize = 66;
let (inputs, outputs) = self.0.iter().fold((0, 0), |(ins, outs), tx| (ins + tx.inputs.len(), outs + tx.outputs.len()));
// TODO: consider tracking transactions by bytes accurately (preferably by saving the size in a field)
// We avoid zooming in another level and counting exact bytes for sigs and scripts for performance reasons.
// Outliers with longer signatures are rare enough and their size is eventually bounded by mempool standards
// or in the worst case by max block mass.
// A similar argument holds for spk within outputs, but in this case the constant is already counted through the SmallVec used within.
inputs * (size_of::<TransactionInput>() + NORMAL_SIG_SIZE)
+ outputs * size_of::<TransactionOutput>()
+ self.0.len() * size_of::<Transaction>()
+ size_of::<Vec<Transaction>>()
+ size_of::<Self>()
}
}

/// A DB + cache implementation of `BlockTransactionsStore` trait, with concurrency support.
#[derive(Clone)]
pub struct DbBlockTransactionsStore {
db: Arc<DB>,
access: CachedDbAccess<Hash, Arc<Vec<Transaction>>, BlockHasher>,
access: CachedDbAccess<Hash, BlockBody, BlockHasher>,
}

impl DbBlockTransactionsStore {
Expand All @@ -43,7 +66,7 @@ impl DbBlockTransactionsStore {
if self.access.has(hash)? {
return Err(StoreError::HashAlreadyExists(hash));
}
self.access.write(BatchDbWriter::new(batch), hash, transactions)?;
self.access.write(BatchDbWriter::new(batch), hash, BlockBody(transactions))?;
Ok(())
}

Expand All @@ -54,7 +77,7 @@ impl DbBlockTransactionsStore {

impl BlockTransactionsStoreReader for DbBlockTransactionsStore {
fn get(&self, hash: Hash) -> Result<Arc<Vec<Transaction>>, StoreError> {
self.access.read(hash)
Ok(self.access.read(hash)?.0)
}
}

Expand All @@ -63,7 +86,7 @@ impl BlockTransactionsStore for DbBlockTransactionsStore {
if self.access.has(hash)? {
return Err(StoreError::HashAlreadyExists(hash));
}
self.access.write(DirectDbWriter::new(&self.db), hash, transactions)?;
self.access.write(DirectDbWriter::new(&self.db), hash, BlockBody(transactions))?;
Ok(())
}

Expand Down
6 changes: 1 addition & 5 deletions consensus/src/model/stores/block_window_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,7 @@ pub struct BlockWindowHeap {
origin: WindowOrigin,
}

impl MemSizeEstimator for BlockWindowHeap {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for BlockWindowHeap {}

impl BlockWindowHeap {
pub fn new(origin: WindowOrigin) -> Self {
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/model/stores/depth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ struct BlockDepthInfo {
finality_point: Hash,
}

impl MemSizeEstimator for BlockDepthInfo {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for BlockDepthInfo {}

/// A DB + cache implementation of `DepthStore` trait, with concurrency support.
#[derive(Clone)]
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/model/stores/ghostdag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ impl MemSizeEstimator for GhostdagData {
}
}

impl MemSizeEstimator for CompactGhostdagData {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for CompactGhostdagData {}

impl From<&GhostdagData> for CompactGhostdagData {
fn from(value: &GhostdagData) -> Self {
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/model/stores/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,7 @@ pub struct CompactHeaderData {
pub blue_score: u64,
}

impl MemSizeEstimator for CompactHeaderData {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for CompactHeaderData {}

impl From<&Header> for CompactHeaderData {
fn from(header: &Header) -> Self {
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/model/stores/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,7 @@ pub(crate) struct ReachabilityData {
pub height: u64,
}

impl MemSizeEstimator for ReachabilityData {
fn estimate_mem_units(&self) -> usize {
1
}
}
impl MemSizeEstimator for ReachabilityData {}

impl ReachabilityData {
pub fn new(parent: Hash, interval: Interval, height: u64) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl ConsensusMonitor {
if delta.header_counts != 0 { delta.dep_counts as f64 / delta.header_counts as f64 } else { 0f64 },
if delta.header_counts != 0 { delta.mergeset_counts as f64 / delta.header_counts as f64 } else { 0f64 },
if delta.body_counts != 0 { delta.txs_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
if delta.body_counts != 0 { delta.mass_counts / delta.body_counts } else{ 0 },
if delta.body_counts != 0 { delta.mass_counts as f64 / delta.body_counts as f64 } else{ 0f64 },
);

last_snapshot = snapshot;
Expand Down
Loading

0 comments on commit 697e5d3

Please sign in to comment.