Skip to content

Commit

Permalink
Optionalize limts.
Browse files Browse the repository at this point in the history
  • Loading branch information
D-Stacks committed Sep 19, 2024
1 parent 40aeee5 commit 2eb647c
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 16 deletions.
8 changes: 6 additions & 2 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,11 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.is_nearly_synced()).await
}

pub async fn async_get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: usize) -> ConsensusResult<ChainPath> {
pub async fn async_get_virtual_chain_from_block(
&self,
low: Hash,
chain_path_added_limit: Option<usize>,
) -> ConsensusResult<ChainPath> {
self.clone().spawn_blocking(move |c| c.get_virtual_chain_from_block(low, chain_path_added_limit)).await
}

Expand Down Expand Up @@ -383,7 +387,7 @@ impl ConsensusSessionOwned {
pub async fn async_get_blocks_acceptance_data(
&self,
hashes: Vec<Hash>,
merged_blocks_limit: usize,
merged_blocks_limit: Option<usize>,
) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
self.clone().spawn_blocking(move |c| c.get_blocks_acceptance_data(&hashes, merged_blocks_limit)).await
}
Expand Down
10 changes: 7 additions & 3 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,8 @@ pub trait ConsensusApi: Send + Sync {
///
/// Note:
/// 1) `chain_path_added_limit` will populate removed fully, and then the added chain path, up to `chain_path_added_limit` amount of hashes.
/// 1.1) use `usize::MAX` to impose no limit with optimized backward chain iteration, for better performance in cases where batching is not required.
fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: usize) -> ConsensusResult<ChainPath> {
/// 1.1) use `None to impose no limit with optimized backward chain iteration, for better performance in cases where batching is not required.
fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: Option<usize>) -> ConsensusResult<ChainPath> {
unimplemented!()
}

Expand Down Expand Up @@ -302,7 +302,11 @@ pub trait ConsensusApi: Send + Sync {
/// Returns acceptance data for a set of blocks belonging to the selected parent chain.
///
/// See `self::get_virtual_chain`
fn get_blocks_acceptance_data(&self, hashes: &[Hash], merged_blocks_limit: usize) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
fn get_blocks_acceptance_data(
&self,
hashes: &[Hash],
merged_blocks_limit: Option<usize>,
) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
unimplemented!()
}

Expand Down
14 changes: 10 additions & 4 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,13 +607,13 @@ impl ConsensusApi for Consensus {
self.config.is_nearly_synced(compact.timestamp, compact.daa_score)
}

fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: usize) -> ConsensusResult<ChainPath> {
fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: Option<usize>) -> ConsensusResult<ChainPath> {
// Calculate chain changes between the given `low` and the current sink hash (up to `limit` amount of block hashes).
// Note:
// 1) that we explicitly don't
// do the calculation against the virtual itself so that we
// won't later need to remove it from the result.
// 2) supplying `usize::MAX` as `chain_path_added_limit` will result in the full chain path, with optimized performance.
// 2) supplying `None` as `chain_path_added_limit` will result in the full chain path, with optimized performance.
let _guard = self.pruning_lock.blocking_read();

// Verify that the block exists
Expand Down Expand Up @@ -926,17 +926,23 @@ impl ConsensusApi for Consensus {
self.acceptance_data_store.get(hash).unwrap_option().ok_or(ConsensusError::MissingData(hash))
}

fn get_blocks_acceptance_data(&self, hashes: &[Hash], merged_blocks_limit: usize) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
fn get_blocks_acceptance_data(
&self,
hashes: &[Hash],
merged_blocks_limit: Option<usize>,
) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
// Note: merged_blocks_limit will limit after the sum of merged blocks is breached along the supplied hash's acceptance data
// and not limit the acceptance data within a queried hash. i.e. It has mergeset_size_limit granularity, this is to guarantee full acceptance data coverage.
if merged_blocks_limit == usize::MAX {
if merged_blocks_limit.is_none() {
return hashes
.iter()
.copied()
.map(|hash| self.acceptance_data_store.get(hash).unwrap_option().ok_or(ConsensusError::MissingData(hash)))
.collect::<ConsensusResult<Vec<_>>>();
}
let merged_blocks_limit = merged_blocks_limit.unwrap(); // we handle `is_none`, so may unwrap.
let mut num_of_merged_blocks = 0usize;

hashes
.iter()
.copied()
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl VirtualStateProcessor {
assert_eq!(virtual_ghostdag_data.selected_parent, new_sink);

let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap();
let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, usize::MAX);
let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None);
let new_virtual_state = self
.calculate_and_commit_virtual_state(
virtual_read,
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/processes/traversal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<T: GhostdagStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader
Self { genesis_hash, ghostdag_store, relations_store, reachability_service }
}

pub fn calculate_chain_path(&self, from: Hash, to: Hash, chain_path_added_limit: usize) -> ChainPath {
pub fn calculate_chain_path(&self, from: Hash, to: Hash, chain_path_added_limit: Option<usize>) -> ChainPath {
let mut removed = Vec::new();
let mut common_ancestor = from;
for current in self.reachability_service.default_backward_chain_iterator(from) {
Expand All @@ -42,7 +42,7 @@ impl<T: GhostdagStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader
break;
}
}
if chain_path_added_limit == usize::MAX {
if chain_path_added_limit.is_none() {
// Use backward chain iterator
// It is more intuitive to use forward iterator here, but going downwards the selected chain is faster.
let mut added = self.reachability_service.backward_chain_iterator(to, common_ancestor, false).collect_vec();
Expand All @@ -54,7 +54,7 @@ impl<T: GhostdagStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader
.reachability_service
.forward_chain_iterator(common_ancestor, to, true)
.skip(1)
.take(chain_path_added_limit)
.take(chain_path_added_limit.unwrap()) // we handle is_none so we may unwrap.
.collect_vec();
ChainPath { added, removed }
}
Expand Down
2 changes: 1 addition & 1 deletion rpc/service/src/converter/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl ConsensusConverter {
&self,
consensus: &ConsensusProxy,
chain_path: &ChainPath,
merged_blocks_limit: usize,
merged_blocks_limit: Option<usize>,
) -> RpcResult<Vec<RpcAcceptedTransactionIds>> {
let acceptance_data = consensus.async_get_blocks_acceptance_data(chain_path.added.clone(), merged_blocks_limit).await.unwrap();
Ok(chain_path
Expand Down
4 changes: 2 additions & 2 deletions rpc/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,11 +616,11 @@ NOTE: This error usually indicates an RPC conversion error between the node and
// else it returns the batch_size amount on pure chain blocks.
// Note: batch_size does not bound removed chain blocks, only added chain blocks.
let batch_size = (self.config.mergeset_size_limit * 10) as usize;
let mut virtual_chain_batch = session.async_get_virtual_chain_from_block(request.start_hash, batch_size).await?;
let mut virtual_chain_batch = session.async_get_virtual_chain_from_block(request.start_hash, Some(batch_size)).await?;
let accepted_transaction_ids = if request.include_accepted_transaction_ids {
let accepted_transaction_ids = self
.consensus_converter
.get_virtual_chain_accepted_transaction_ids(&session, &virtual_chain_batch, batch_size)
.get_virtual_chain_accepted_transaction_ids(&session, &virtual_chain_batch, Some(batch_size))
.await?;
// bound added to the length of the accepted transaction ids, which is bounded by merged blocks
virtual_chain_batch.added = virtual_chain_batch.added[..accepted_transaction_ids.len()].to_vec();
Expand Down

0 comments on commit 2eb647c

Please sign in to comment.