Skip to content

Commit

Permalink
Merge branch 'master' into cache-policy
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelsutton committed Dec 14, 2023
2 parents 53cbf3a + 50d5233 commit 392a30a
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 26 deletions.
4 changes: 4 additions & 0 deletions consensus/core/src/blockstatus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,8 @@ impl BlockStatus {
pub fn is_valid(self) -> bool {
self != BlockStatus::StatusInvalid
}

pub fn is_invalid(self) -> bool {
self == BlockStatus::StatusInvalid
}
}
24 changes: 23 additions & 1 deletion consensus/core/src/config/bps.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,18 @@ impl<const BPS: u64> Bps<BPS> {
let val = (Self::ghostdag_k() / 2) as u8;
if val < 10 {
10
// TODO (TEMP): uncomment when restarting TN11 or when implementing a TN11 HF
/*
} else if val > 16 {
// We currently limit the number of parents by 16 in order to preserve processing performance
// and to prevent number of parent references per network round from growing quadratically with
// BPS. As BPS might grow beyond 10 this will mean that blocks will reference less parents than
// the average number of DAG tips. Which means relying on randomness between network peers for ensuring
// that all tips are eventually merged. We conjecture that with high probability every block will
// be merged after a log number of rounds. For mainnet this requires an increase to the value of GHOSTDAG
// K accompanied by a short security analysis, or moving to the parameterless DAGKNIGHT.
16
*/
} else {
val
}
Expand All @@ -67,6 +79,16 @@ impl<const BPS: u64> Bps<BPS> {
Self::ghostdag_k() as u64 * 10
}

// TODO (TEMP): rename to mergeset_size_limit when restarting TN11 or when implementing a TN11 HF
pub const fn _mergeset_size_limit() -> u64 {
let val = Self::ghostdag_k() as u64 * 2;
if val < 180 {
180
} else {
val
}
}

pub const fn merge_depth_bound() -> u64 {
BPS * MERGE_DEPTH_DURATION
}
Expand All @@ -87,7 +109,7 @@ impl<const BPS: u64> Bps<BPS> {
}

pub const fn pruning_proof_m() -> u64 {
// No need to scale this constant with BPS since the important block levels (higher) remain logarithmically long
// No need to scale this constant with BPS since the important block levels (higher) remain logarithmically short
PRUNING_PROOF_M
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/config/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub mod consensus {
/// Minimal size of the difficulty window. Affects the DA algorithm only at the starting period of a new net
pub const MIN_DIFFICULTY_WINDOW_LEN: usize = 10;

/// **Legacy** difficulty adjustment window size corresponding to ~46 minutes with 1 BPS
/// **Legacy** difficulty adjustment window size corresponding to ~44 minutes with 1 BPS
pub const LEGACY_DIFFICULTY_WINDOW_SIZE: usize = 2641;

/// **New** difficulty window duration expressed in time units (seconds).
Expand Down
20 changes: 16 additions & 4 deletions consensus/core/src/config/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,22 @@ impl Params {

/// Returns whether the sink timestamp is recent enough and the node is considered synced or nearly synced.
pub fn is_nearly_synced(&self, sink_timestamp: u64, sink_daa_score: u64) -> bool {
// We consider the node close to being synced if the sink (virtual selected parent) block
// timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
// enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score)
if self.net.is_mainnet() {
// We consider the node close to being synced if the sink (virtual selected parent) block
// timestamp is within DAA window duration far in the past. Blocks mined over such DAG state would
// enter the DAA window of fully-synced nodes and thus contribute to overall network difficulty
unix_now() < sink_timestamp + self.expected_daa_window_duration_in_milliseconds(sink_daa_score)
} else {
// For testnets we consider the node to be synced if the sink timestamp is within a time range which
// is overwhelmingly unlikely to pass without mined blocks even if net hashrate decreased dramatically.
//
// This period is smaller than the above mainnet calculation in order to ensure that an IBDing miner
// with significant testnet hashrate does not overwhelm the network with deep side-DAGs.
//
// We use DAA duration as baseline and scale it down with BPS
let max_expected_duration_without_blocks_in_milliseconds = self.target_time_per_block * NEW_DIFFICULTY_WINDOW_DURATION; // = DAA duration in milliseconds / bps
unix_now() < sink_timestamp + max_expected_duration_without_blocks_in_milliseconds
}
}

pub fn network_name(&self) -> String {
Expand Down
40 changes: 28 additions & 12 deletions consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ use crossbeam_channel::{Receiver as CrossbeamReceiver, Sender as CrossbeamSender
use itertools::Itertools;
use kaspa_utils::binary_heap::BinaryHeapExtensions;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rand::seq::SliceRandom;
use rand::{seq::SliceRandom, Rng};
use rayon::{
prelude::{IntoParallelRefIterator, IntoParallelRefMutIterator, ParallelIterator},
ThreadPool,
Expand Down Expand Up @@ -176,8 +176,10 @@ impl VirtualStateProcessor {
thread_pool,

genesis: params.genesis.clone(),
max_block_parents: params.max_block_parents.min(16), // TODO: TEMP
mergeset_size_limit: params.mergeset_size_limit,
// TODO (TEMP): remove TN11 bounds when restarting/HF TN11, see comments in bps.rs
// (changing these values here is a way to influence the mined templates w/o breaking consensus)
max_block_parents: params.max_block_parents.min(16),
mergeset_size_limit: params.mergeset_size_limit.min(248),
pruning_depth: params.pruning_depth,

db,
Expand Down Expand Up @@ -522,7 +524,9 @@ impl VirtualStateProcessor {
drop(selected_chain_write);
}

/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation
/// Returns the max number of tips to consider as virtual parents in a single virtual resolve operation.
///
/// Guaranteed to be `>= self.max_block_parents`
fn max_virtual_parent_candidates(&self) -> usize {
// Limit to max_block_parents x 3 candidates. This way we avoid going over thousands of tips when the network isn't healthy.
// There's no specific reason for a factor of 3, and its not a consensus rule, just an estimation for reducing the amount
Expand Down Expand Up @@ -574,11 +578,7 @@ impl VirtualStateProcessor {
let filtering_blue_work = self.ghostdag_primary_store.get_blue_work(filtering_root).unwrap_or_default();
return (
candidate,
heap.into_sorted_iter()
.take(self.max_virtual_parent_candidates())
.take_while(|s| s.blue_work >= filtering_blue_work)
.map(|s| s.hash)
.collect(),
heap.into_sorted_iter().take_while(|s| s.blue_work >= filtering_blue_work).map(|s| s.hash).collect(),
);
} else {
debug!("Block candidate {} has invalid UTXO state and is ignored from Virtual chain.", candidate)
Expand Down Expand Up @@ -614,16 +614,32 @@ impl VirtualStateProcessor {
// TODO: tests

// Mergeset increasing might traverse DAG areas which are below the finality point and which theoretically
// can borderline with pruned data, hence we acquire the prune lock to insure data consistency. Note that
// can borderline with pruned data, hence we acquire the prune lock to ensure data consistency. Note that
// the final selected mergeset can never be pruned (this is the essence of the prunality proof), however
// we might touch such data prior to validating the bounded merge rule. All in all, this function is short
// enough so we avoid making further optimizations
let _prune_guard = self.pruning_lock.blocking_read();
let max_block_parents = self.max_block_parents as usize;
let max_candidates = self.max_virtual_parent_candidates();

// Prioritize half the blocks with highest blue work and pick the rest randomly to ensure diversity between nodes
if candidates.len() > max_block_parents / 2 {
// `make_contiguous` should be a no op since the deque was just built
if candidates.len() > max_candidates {
// make_contiguous should be a no op since the deque was just built
let slice = candidates.make_contiguous();

// Keep slice[..max_block_parents / 2] as is, choose max_candidates - max_block_parents / 2 in random
// from the remainder of the slice while swapping them to slice[max_block_parents / 2..max_candidates].
//
// Inspired by rand::partial_shuffle (which lacks the guarantee on chosen elements location).
for i in max_block_parents / 2..max_candidates {
let j = rand::thread_rng().gen_range(i..slice.len()); // i < max_candidates < slice.len()
slice.swap(i, j);
}

// Truncate the unchosen elements
candidates.truncate(max_candidates);
} else if candidates.len() > max_block_parents / 2 {
// Fallback to a simpler algo in this case
candidates.make_contiguous()[max_block_parents / 2..].shuffle(&mut rand::thread_rng());
}

Expand Down
15 changes: 11 additions & 4 deletions protocol/flows/src/flow_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ const PROTOCOL_VERSION: u32 = 6;
/// See `check_orphan_resolution_range`
const BASELINE_ORPHAN_RESOLUTION_RANGE: u32 = 5;

/// Orphans are kept as full blocks so we cannot hold too much of them in memory
const MAX_ORPHANS_UPPER_BOUND: usize = 1024;

/// The min time to wait before allowing another parallel request
const REQUEST_SCOPE_WAIT_TIME: Duration = Duration::from_secs(1);

Expand Down Expand Up @@ -203,11 +206,11 @@ impl FlowContext {
) -> Self {
let hub = Hub::new();

let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().min(3.0) as u32;
let orphan_resolution_range = BASELINE_ORPHAN_RESOLUTION_RANGE + (config.bps() as f64).log2().ceil() as u32;

// The maximum amount of orphans allowed in the orphans pool. This number is an
// approximation of how many orphans there can possibly be on average.
let max_orphans = 2u64.pow(orphan_resolution_range) as usize * config.ghostdag_k as usize;
// The maximum amount of orphans allowed in the orphans pool. This number is an approximation
// of how many orphans there can possibly be on average bounded by an upper bound.
let max_orphans = (2u64.pow(orphan_resolution_range) as usize * config.ghostdag_k as usize).min(MAX_ORPHANS_UPPER_BOUND);
Self {
inner: Arc::new(FlowContextInner {
node_id: Uuid::new_v4().into(),
Expand Down Expand Up @@ -360,6 +363,10 @@ impl FlowContext {
unorphaned_blocks
}

pub async fn revalidate_orphans(&self, consensus: &ConsensusProxy) {
self.orphans_pool.write().await.revalidate_orphans(consensus).await
}

/// Adds the rpc-submitted block to the DAG and propagates it to peers.
pub async fn submit_rpc_block(&self, consensus: &ConsensusProxy, block: Block) -> Result<(), ProtocolError> {
if block.transactions.is_empty() {
Expand Down
38 changes: 37 additions & 1 deletion protocol/flows/src/flowcontext/orphans.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,27 @@ impl OrphanBlocksPool {
}
})
}

/// Iterate all orphans and remove blocks which are no longer orphans.
/// This is important for the overall health of the pool and for ensuring that
/// orphan blocks don't evict due to pool size limit while already processed
/// blocks remain in it. Should be called following IBD.
pub async fn revalidate_orphans(&mut self, consensus: &ConsensusProxy) {
let mut i = 0;
while i < self.orphans.len() {
if let Some((&h, _)) = self.orphans.get_index(i) {
if consensus.async_get_block_status(h).await.is_some_and(|s| s.is_invalid() || s.has_block_body()) {
// If we swap removed do not advance i so that we revisit the new element moved
// to i in the next iteration. Loop will progress because len is shorter now.
self.orphans.swap_remove_index(i);
} else {
i += 1;
}
} else {
i += 1;
}
}
}
}

#[cfg(test)]
Expand Down Expand Up @@ -183,6 +204,8 @@ mod tests {
let b = Block::from_precomputed_hash(9.into(), vec![]);
let c = Block::from_precomputed_hash(10.into(), roots.clone());
let d = Block::from_precomputed_hash(11.into(), vec![10.into()]);
let e = Block::from_precomputed_hash(12.into(), vec![10.into()]);
let f = Block::from_precomputed_hash(13.into(), vec![12.into()]);

pool.add_orphan(c.clone());
pool.add_orphan(d.clone());
Expand All @@ -192,11 +215,24 @@ mod tests {
consensus.validate_and_insert_block(a.clone()).virtual_state_task.await.unwrap();
consensus.validate_and_insert_block(b.clone()).virtual_state_task.await.unwrap();

// Test unorphaning
let (blocks, _, virtual_state_tasks) = pool.unorphan_blocks(&consensus, 8.into()).await;
try_join_all(virtual_state_tasks).await.unwrap();
assert_eq!(blocks.into_iter().map(|b| b.hash()).collect::<HashSet<_>>(), HashSet::from([10.into(), 11.into()]));
assert!(pool.orphans.is_empty());

drop((a, b, c, d));
// Test revalidation
pool.add_orphan(d.clone());
pool.add_orphan(e.clone());
pool.add_orphan(f.clone());
assert_eq!(pool.orphans.len(), 3);
pool.revalidate_orphans(&consensus).await;
assert_eq!(pool.orphans.len(), 2);
consensus.validate_and_insert_block(e.clone()).virtual_state_task.await.unwrap();
consensus.validate_and_insert_block(f.clone()).virtual_state_task.await.unwrap();
pool.revalidate_orphans(&consensus).await;
assert!(pool.orphans.is_empty());

drop((a, b, c, d, e, f));
}
}
7 changes: 6 additions & 1 deletion protocol/flows/src/v5/blockrelay/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,12 @@ impl HandleRelayInvsFlow {
.await?;
let msg = dequeue_with_timeout!(self.msg_route, Payload::BlockLocator)?;
let locator_hashes: Vec<Hash> = msg.try_into()?;
for h in locator_hashes {
// Locator hashes are sent from later to earlier, so it makes sense to query consensus in reverse. Technically
// with current syncer-side implementations (in both go-kaspa and this codebase) we could query only the last one,
// but we prefer not relying on such details for correctness
//
// TODO: change syncer-side to only send the most early block since it's sufficient for our needs
for h in locator_hashes.into_iter().rev() {
if consensus.async_get_block_status(h).await.is_some_and(|s| s.has_block_body()) {
return Ok(true);
}
Expand Down
7 changes: 6 additions & 1 deletion protocol/flows/src/v5/ibd/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,12 @@ impl IbdFlow {

// Relay block might be in the antipast of syncer sink, thus
// check its past for missing bodies as well.
self.sync_missing_block_bodies(&session, relay_block.hash()).await
self.sync_missing_block_bodies(&session, relay_block.hash()).await?;

// Following IBD we revalidate orphans since many of them might have been processed during the IBD
self.ctx.revalidate_orphans(&session).await;

Ok(())
}

async fn determine_ibd_type(
Expand Down
2 changes: 1 addition & 1 deletion simpa/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ async fn validate(src_consensus: &Consensus, dst_consensus: &Consensus, params:
for (i, mut chunk) in iter.enumerate() {
let current_joins = submit_chunk(src_consensus, dst_consensus, &mut chunk);
let statuses = try_join_all(prev_joins).await.unwrap();
info!("Validated chunk {}", i);
trace!("Validated chunk {}", i);
assert!(statuses.iter().all(|s| s.is_utxo_valid_or_pending()));
prev_joins = current_joins;
}
Expand Down

0 comments on commit 392a30a

Please sign in to comment.