Skip to content

Commit

Permalink
Merge remote-tracking branch 'msutton-origin/cache-policy' into tn11-…
Browse files Browse the repository at this point in the history
…compiled
  • Loading branch information
coderofstuff committed Dec 14, 2023
2 parents 50c8c96 + 392a30a commit d821abc
Show file tree
Hide file tree
Showing 46 changed files with 525 additions and 256 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions components/addressmanager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use itertools::{
};
use kaspa_consensus_core::config::Config;
use kaspa_core::{debug, info, task::tick::TickService, time::unix_now, warn};
use kaspa_database::prelude::{StoreResultExtensions, DB};
use kaspa_database::prelude::{CachePolicy, StoreResultExtensions, DB};
use kaspa_utils::networking::IpAddress;
use local_ip_address::list_afinet_netifas;
use parking_lot::Mutex;
Expand Down Expand Up @@ -61,7 +61,7 @@ pub struct AddressManager {
impl AddressManager {
pub fn new(config: Arc<Config>, db: Arc<DB>, tick_service: Arc<TickService>) -> (Arc<Mutex<Self>>, Option<Extender>) {
let mut instance = Self {
banned_address_store: DbBannedAddressesStore::new(db.clone(), MAX_ADDRESSES as u64),
banned_address_store: DbBannedAddressesStore::new(db.clone(), CachePolicy::Unit(MAX_ADDRESSES)),
address_store: address_store_with_cache::new(db),
local_net_addresses: Vec::new(),
config,
Expand Down Expand Up @@ -337,7 +337,7 @@ mod address_store_with_cache {
};

use itertools::Itertools;
use kaspa_database::prelude::DB;
use kaspa_database::prelude::{CachePolicy, DB};
use kaspa_utils::networking::PrefixBucket;
use rand::{
distributions::{WeightedError, WeightedIndex},
Expand All @@ -359,7 +359,7 @@ mod address_store_with_cache {

impl Store {
fn new(db: Arc<DB>) -> Self {
let db_store = DbAddressesStore::new(db, 0);
let db_store = DbAddressesStore::new(db, CachePolicy::Unit(0));
let mut addresses = HashMap::new();
for (key, entry) in db_store.iterator().map(|res| res.unwrap()) {
addresses.insert(key, entry);
Expand Down
8 changes: 5 additions & 3 deletions components/addressmanager/src/stores/address_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use kaspa_database::{
prelude::DB,
prelude::{CachePolicy, StoreError, StoreResult},
prelude::{CachedDbAccess, DirectDbWriter},
prelude::{StoreError, StoreResult},
registry::DatabaseStorePrefixes,
};
use serde::{Deserialize, Serialize};
Expand All @@ -17,6 +17,8 @@ pub struct Entry {
pub address: NetAddress,
}

impl kaspa_utils::mem_size::MemSizeEstimator for Entry {}

pub trait AddressesStoreReader {
fn get(&self, key: AddressKey) -> Result<Entry, StoreError>;
}
Expand Down Expand Up @@ -74,8 +76,8 @@ pub struct DbAddressesStore {
}

impl DbAddressesStore {
pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, DatabaseStorePrefixes::Addresses.into()) }
pub fn new(db: Arc<DB>, cache_policy: CachePolicy) -> Self {
Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::Addresses.into()) }
}

pub fn iterator(&self) -> impl Iterator<Item = Result<(AddressKey, Entry), Box<dyn Error>>> + '_ {
Expand Down
8 changes: 5 additions & 3 deletions components/addressmanager/src/stores/banned_address_store.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use kaspa_database::{
prelude::{CachePolicy, StoreError, StoreResult},
prelude::{CachedDbAccess, DirectDbWriter, DB},
prelude::{StoreError, StoreResult},
registry::DatabaseStorePrefixes,
};
use serde::{Deserialize, Serialize};
Expand All @@ -10,6 +10,8 @@ use std::{error::Error, fmt::Display, sync::Arc};
#[derive(Clone, Copy, Serialize, Deserialize)]
pub struct ConnectionBanTimestamp(pub u64);

impl kaspa_utils::mem_size::MemSizeEstimator for ConnectionBanTimestamp {}

pub trait BannedAddressesStoreReader {
fn get(&self, address: IpAddr) -> Result<ConnectionBanTimestamp, StoreError>;
}
Expand Down Expand Up @@ -70,8 +72,8 @@ pub struct DbBannedAddressesStore {
}

impl DbBannedAddressesStore {
pub fn new(db: Arc<DB>, cache_size: u64) -> Self {
Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_size, DatabaseStorePrefixes::BannedAddresses.into()) }
pub fn new(db: Arc<DB>, cache_policy: CachePolicy) -> Self {
Self { db: Arc::clone(&db), access: CachedDbAccess::new(db, cache_policy, DatabaseStorePrefixes::BannedAddresses.into()) }
}

pub fn iterator(&self) -> impl Iterator<Item = Result<(IpAddr, ConnectionBanTimestamp), Box<dyn Error>>> + '_ {
Expand Down
2 changes: 2 additions & 0 deletions consensus/core/src/blockstatus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub enum BlockStatus {
StatusHeaderOnly,
}

impl kaspa_utils::mem_size::MemSizeEstimator for BlockStatus {}

impl BlockStatus {
pub fn has_block_header(self) -> bool {
matches!(
Expand Down
55 changes: 15 additions & 40 deletions consensus/core/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,45 +110,40 @@ pub mod perf {
//! The constants in this module should all be revisited if mainnet consensus parameters change.
//!

use crate::{config::params::Params, header::Header, BlueWorkType};
use crate::{config::params::Params, BlueWorkType};
use kaspa_hashes::Hash;
use std::mem::size_of;

use super::consensus::NETWORK_DELAY_BOUND;

/// The default target depth for reachability reindexes.
pub const DEFAULT_REINDEX_DEPTH: u64 = 100;

/// The default slack interval used by the reachability
/// algorithm to encounter for blocks out of the selected chain.
pub const DEFAULT_REINDEX_SLACK: u64 = 1 << 12;

const BASELINE_HEADER_DATA_CACHE_SIZE: u64 = 10_000;
const BASELINE_BLOCK_DATA_CACHE_SIZE: u64 = 200;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: u64 = 2000;
const BASELINE_UTXOSET_CACHE_SIZE: u64 = 10_000;
const BASELINE_HEADER_DATA_CACHE_SIZE: usize = 10_000;
const BASELINE_BLOCK_DATA_CACHE_SIZE: usize = 200;
const BASELINE_BLOCK_WINDOW_CACHE_SIZE: usize = 2000;
const BASELINE_UTXOSET_CACHE_SIZE: usize = 10_000;

#[derive(Clone, Debug)]
pub struct PerfParams {
//
// Cache sizes
//
/// Preferred cache size for headers store
pub headers_cache_size: u64,

/// Preferred cache size for header-related data
pub header_data_cache_size: u64,
pub header_data_cache_size: usize,

/// Preferred cache size for block-body-related data which
/// is typically orders-of magnitude larger than header data
/// (Note this cannot be set to high due to severe memory consumption)
pub block_data_cache_size: u64,
pub block_data_cache_size: usize,

/// Preferred cache size for UTXO-related data
pub utxo_set_cache_size: u64,
pub utxo_set_cache_size: usize,

/// Preferred cache size for block-window-related data
pub block_window_cache_size: u64,
pub block_window_cache_size: usize,

//
// Thread-pools
Expand All @@ -163,7 +158,6 @@ pub mod perf {
}

pub const PERF_PARAMS: PerfParams = PerfParams {
headers_cache_size: BASELINE_HEADER_DATA_CACHE_SIZE,
header_data_cache_size: BASELINE_HEADER_DATA_CACHE_SIZE,
block_data_cache_size: BASELINE_BLOCK_DATA_CACHE_SIZE,
utxo_set_cache_size: BASELINE_UTXOSET_CACHE_SIZE,
Expand All @@ -174,41 +168,22 @@ pub mod perf {

impl PerfParams {
pub fn adjust_to_consensus_params(&mut self, consensus_params: &Params) {
self.block_data_cache_size *= consensus_params.bps().clamp(1, 10); // Allow caching up to 10x over the baseline
self.block_data_cache_size *= consensus_params.bps().clamp(1, 10) as usize; // Allow caching up to 10x over the baseline
self.block_window_cache_size = calculate_difficulty_window_cache_size(consensus_params);
self.headers_cache_size = calculate_headers_cache_size(consensus_params);
}
}

/// Bounds the cache size according to the "memory budget" (represented in bytes) and the approximate size of each unit in bytes
pub fn bounded_cache_size(desired_size: u64, memory_budget_bytes: u64, approx_unit_bytes: usize) -> u64 {
let max_cache_size = memory_budget_bytes / approx_unit_bytes as u64;
u64::min(desired_size, max_cache_size)
pub fn bounded_cache_size(desired_size: usize, memory_budget_bytes: usize, approx_unit_bytes: usize) -> usize {
let max_cache_size = memory_budget_bytes / approx_unit_bytes;
usize::min(desired_size, max_cache_size)
}

pub fn calculate_difficulty_window_cache_size(consensus_params: &Params) -> u64 {
let window_memory_budget = 250_000_000u64; // 250MB
pub fn calculate_difficulty_window_cache_size(consensus_params: &Params) -> usize {
let window_memory_budget = 250_000_000usize; // 250MB
let single_window_byte_size = consensus_params.difficulty_window_size(0) * (size_of::<Hash>() + size_of::<BlueWorkType>());
bounded_cache_size(BASELINE_BLOCK_WINDOW_CACHE_SIZE, window_memory_budget, single_window_byte_size)
}

pub fn calculate_headers_cache_size(consensus_params: &Params) -> u64 {
let headers_memory_budget = 500_000_000u64; // 500MB
let approx_header_byte_size = approx_header_parents(consensus_params) * size_of::<Hash>() + size_of::<Header>();
bounded_cache_size(consensus_params.bps() * BASELINE_HEADER_DATA_CACHE_SIZE, headers_memory_budget, approx_header_byte_size)
}

pub fn approx_direct_header_parents(consensus_params: &Params) -> usize {
consensus_params.bps() as usize * NETWORK_DELAY_BOUND as usize
}

pub fn approx_header_parents(consensus_params: &Params) -> usize {
approx_direct_header_parents(consensus_params) * 4 // 4x for multi-levels
}

pub fn approx_mergeset_size(consensus_params: &Params) -> usize {
consensus_params.bps() as usize * NETWORK_DELAY_BOUND as usize
}
}

#[cfg(test)]
Expand Down
2 changes: 2 additions & 0 deletions consensus/core/src/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ impl UtxoEntry {
}
}

impl kaspa_utils::mem_size::MemSizeEstimator for UtxoEntry {}

pub type TransactionIndexType = u32;

/// Represents a Kaspa transaction outpoint
Expand Down
11 changes: 10 additions & 1 deletion consensus/core/src/utxo/utxo_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use super::{
utxo_error::{UtxoAlgebraError, UtxoResult},
};
use crate::tx::{TransactionOutpoint, UtxoEntry, VerifiableTransaction};
use kaspa_utils::mem_size::{MemSize, MemSizeEstimator};
use serde::{Deserialize, Serialize};
use std::collections::hash_map::Entry::Vacant;
use std::{collections::hash_map::Entry::Vacant, mem::size_of};

pub trait ImmutableUtxoDiff {
fn added(&self) -> &UtxoCollection;
Expand All @@ -17,6 +18,14 @@ pub struct UtxoDiff {
pub remove: UtxoCollection,
}

impl MemSizeEstimator for UtxoDiff {
fn estimate_mem_size(&self) -> MemSize {
MemSize::BytesStatic {
num_bytes: size_of::<Self>() + (self.add.len() + self.remove.len()) * size_of::<(TransactionOutpoint, UtxoEntry)>(),
}
}
}

impl<T: ImmutableUtxoDiff> ImmutableUtxoDiff for &T {
fn added(&self) -> &UtxoCollection {
(*self).added()
Expand Down
8 changes: 6 additions & 2 deletions consensus/src/consensus/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use kaspa_consensus_notify::root::ConsensusNotificationRoot;
use kaspa_consensusmanager::{ConsensusFactory, ConsensusInstance, DynConsensusCtl, SessionLock};
use kaspa_core::{debug, time::unix_now, warn};
use kaspa_database::{
prelude::{BatchDbWriter, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError, StoreResult, StoreResultExtensions, DB},
prelude::{
BatchDbWriter, CachePolicy, CachedDbAccess, CachedDbItem, DirectDbWriter, StoreError, StoreResult, StoreResultExtensions, DB,
},
registry::DatabaseStorePrefixes,
};

Expand All @@ -25,6 +27,8 @@ pub struct ConsensusEntry {
creation_timestamp: u64,
}

impl kaspa_utils::mem_size::MemSizeEstimator for ConsensusEntry {}

impl ConsensusEntry {
pub fn new(key: u64, directory_name: String, creation_timestamp: u64) -> Self {
Self { key, directory_name, creation_timestamp }
Expand Down Expand Up @@ -79,7 +83,7 @@ impl MultiConsensusManagementStore {
pub fn new(db: Arc<DB>) -> Self {
let mut store = Self {
db: db.clone(),
entries: CachedDbAccess::new(db.clone(), 16, DatabaseStorePrefixes::ConsensusEntries.into()),
entries: CachedDbAccess::new(db.clone(), CachePolicy::Unit(16), DatabaseStorePrefixes::ConsensusEntries.into()),
metadata: CachedDbItem::new(db, DatabaseStorePrefixes::MultiConsensusMetadata.into()),
};
store.init();
Expand Down
Loading

0 comments on commit d821abc

Please sign in to comment.