Skip to content

Commit

Permalink
virtual chain from block batching. (#454)
Browse files Browse the repository at this point in the history
* expose vspc_from_block batching possibilities to rpc.

* fmt

* limit by merged blocks, set source as def. start.

* small clean-up

* fmt

* actually bound by num of merged blocks, in include transactions case.

* fmt

* update.

* update_2

* new_high = high

* remove high hash in consensus api.. as it is not required.

* fmt

* make proto comment more accurate.

* fix tests, and lints, add to ser / der correctly.

* change two freq warns to debug

* remove option, default to pp. not source.

* fix integration test, some Option<Hash> left.

* bump version: ´0.15.1 => 0.15.2`

* remove "optional" startHash

* add to cli rpc.rs

* remove comment.

* edit comment in .proto referencing def. startHash behavior.

* only batch added chain blocks, not removed, add check if source is a chain ancestor of high.

* remove dangling code in comment

* remove error from some prev. commit.

* Optionalize limts.

---------

Co-authored-by: Michael Sutton <[email protected]>
  • Loading branch information
D-Stacks and michaelsutton authored Sep 19, 2024
1 parent 613d082 commit b14537f
Show file tree
Hide file tree
Showing 14 changed files with 247 additions and 148 deletions.
116 changes: 58 additions & 58 deletions Cargo.lock

Large diffs are not rendered by default.

112 changes: 56 additions & 56 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ members = [

[workspace.package]
rust-version = "1.81.0"
version = "0.15.1"
version = "0.15.2"
authors = ["Kaspa developers"]
license = "ISC"
repository = "https://github.com/kaspanet/rusty-kaspa"
Expand All @@ -80,61 +80,61 @@ include = [
]

[workspace.dependencies]
# kaspa-testing-integration = { version = "0.15.1", path = "testing/integration" }
kaspa-addresses = { version = "0.15.1", path = "crypto/addresses" }
kaspa-addressmanager = { version = "0.15.1", path = "components/addressmanager" }
kaspa-bip32 = { version = "0.15.1", path = "wallet/bip32" }
kaspa-cli = { version = "0.15.1", path = "cli" }
kaspa-connectionmanager = { version = "0.15.1", path = "components/connectionmanager" }
kaspa-consensus = { version = "0.15.1", path = "consensus" }
kaspa-consensus-core = { version = "0.15.1", path = "consensus/core" }
kaspa-consensus-client = { version = "0.15.1", path = "consensus/client" }
kaspa-consensus-notify = { version = "0.15.1", path = "consensus/notify" }
kaspa-consensus-wasm = { version = "0.15.1", path = "consensus/wasm" }
kaspa-consensusmanager = { version = "0.15.1", path = "components/consensusmanager" }
kaspa-core = { version = "0.15.1", path = "core" }
kaspa-daemon = { version = "0.15.1", path = "daemon" }
kaspa-database = { version = "0.15.1", path = "database" }
kaspa-grpc-client = { version = "0.15.1", path = "rpc/grpc/client" }
kaspa-grpc-core = { version = "0.15.1", path = "rpc/grpc/core" }
kaspa-grpc-server = { version = "0.15.1", path = "rpc/grpc/server" }
kaspa-hashes = { version = "0.15.1", path = "crypto/hashes" }
kaspa-index-core = { version = "0.15.1", path = "indexes/core" }
kaspa-index-processor = { version = "0.15.1", path = "indexes/processor" }
kaspa-math = { version = "0.15.1", path = "math" }
kaspa-merkle = { version = "0.15.1", path = "crypto/merkle" }
kaspa-metrics-core = { version = "0.15.1", path = "metrics/core" }
kaspa-mining = { version = "0.15.1", path = "mining" }
kaspa-mining-errors = { version = "0.15.1", path = "mining/errors" }
kaspa-muhash = { version = "0.15.1", path = "crypto/muhash" }
kaspa-notify = { version = "0.15.1", path = "notify" }
kaspa-p2p-flows = { version = "0.15.1", path = "protocol/flows" }
kaspa-p2p-lib = { version = "0.15.1", path = "protocol/p2p" }
kaspa-perf-monitor = { version = "0.15.1", path = "metrics/perf_monitor" }
kaspa-pow = { version = "0.15.1", path = "consensus/pow" }
kaspa-rpc-core = { version = "0.15.1", path = "rpc/core" }
kaspa-rpc-macros = { version = "0.15.1", path = "rpc/macros" }
kaspa-rpc-service = { version = "0.15.1", path = "rpc/service" }
kaspa-txscript = { version = "0.15.1", path = "crypto/txscript" }
kaspa-txscript-errors = { version = "0.15.1", path = "crypto/txscript/errors" }
kaspa-utils = { version = "0.15.1", path = "utils" }
kaspa-utils-tower = { version = "0.15.1", path = "utils/tower" }
kaspa-utxoindex = { version = "0.15.1", path = "indexes/utxoindex" }
kaspa-wallet = { version = "0.15.1", path = "wallet/native" }
kaspa-wallet-cli-wasm = { version = "0.15.1", path = "wallet/wasm" }
kaspa-wallet-keys = { version = "0.15.1", path = "wallet/keys" }
kaspa-wallet-pskt = { version = "0.15.1", path = "wallet/pskt" }
kaspa-wallet-core = { version = "0.15.1", path = "wallet/core" }
kaspa-wallet-macros = { version = "0.15.1", path = "wallet/macros" }
kaspa-wasm = { version = "0.15.1", path = "wasm" }
kaspa-wasm-core = { version = "0.15.1", path = "wasm/core" }
kaspa-wrpc-client = { version = "0.15.1", path = "rpc/wrpc/client" }
kaspa-wrpc-proxy = { version = "0.15.1", path = "rpc/wrpc/proxy" }
kaspa-wrpc-server = { version = "0.15.1", path = "rpc/wrpc/server" }
kaspa-wrpc-wasm = { version = "0.15.1", path = "rpc/wrpc/wasm" }
kaspa-wrpc-example-subscriber = { version = "0.15.1", path = "rpc/wrpc/examples/subscriber" }
kaspad = { version = "0.15.1", path = "kaspad" }
kaspa-alloc = { version = "0.15.1", path = "utils/alloc" }
# kaspa-testing-integration = { version = "0.15.2", path = "testing/integration" }
kaspa-addresses = { version = "0.15.2", path = "crypto/addresses" }
kaspa-addressmanager = { version = "0.15.2", path = "components/addressmanager" }
kaspa-bip32 = { version = "0.15.2", path = "wallet/bip32" }
kaspa-cli = { version = "0.15.2", path = "cli" }
kaspa-connectionmanager = { version = "0.15.2", path = "components/connectionmanager" }
kaspa-consensus = { version = "0.15.2", path = "consensus" }
kaspa-consensus-core = { version = "0.15.2", path = "consensus/core" }
kaspa-consensus-client = { version = "0.15.2", path = "consensus/client" }
kaspa-consensus-notify = { version = "0.15.2", path = "consensus/notify" }
kaspa-consensus-wasm = { version = "0.15.2", path = "consensus/wasm" }
kaspa-consensusmanager = { version = "0.15.2", path = "components/consensusmanager" }
kaspa-core = { version = "0.15.2", path = "core" }
kaspa-daemon = { version = "0.15.2", path = "daemon" }
kaspa-database = { version = "0.15.2", path = "database" }
kaspa-grpc-client = { version = "0.15.2", path = "rpc/grpc/client" }
kaspa-grpc-core = { version = "0.15.2", path = "rpc/grpc/core" }
kaspa-grpc-server = { version = "0.15.2", path = "rpc/grpc/server" }
kaspa-hashes = { version = "0.15.2", path = "crypto/hashes" }
kaspa-index-core = { version = "0.15.2", path = "indexes/core" }
kaspa-index-processor = { version = "0.15.2", path = "indexes/processor" }
kaspa-math = { version = "0.15.2", path = "math" }
kaspa-merkle = { version = "0.15.2", path = "crypto/merkle" }
kaspa-metrics-core = { version = "0.15.2", path = "metrics/core" }
kaspa-mining = { version = "0.15.2", path = "mining" }
kaspa-mining-errors = { version = "0.15.2", path = "mining/errors" }
kaspa-muhash = { version = "0.15.2", path = "crypto/muhash" }
kaspa-notify = { version = "0.15.2", path = "notify" }
kaspa-p2p-flows = { version = "0.15.2", path = "protocol/flows" }
kaspa-p2p-lib = { version = "0.15.2", path = "protocol/p2p" }
kaspa-perf-monitor = { version = "0.15.2", path = "metrics/perf_monitor" }
kaspa-pow = { version = "0.15.2", path = "consensus/pow" }
kaspa-rpc-core = { version = "0.15.2", path = "rpc/core" }
kaspa-rpc-macros = { version = "0.15.2", path = "rpc/macros" }
kaspa-rpc-service = { version = "0.15.2", path = "rpc/service" }
kaspa-txscript = { version = "0.15.2", path = "crypto/txscript" }
kaspa-txscript-errors = { version = "0.15.2", path = "crypto/txscript/errors" }
kaspa-utils = { version = "0.15.2", path = "utils" }
kaspa-utils-tower = { version = "0.15.2", path = "utils/tower" }
kaspa-utxoindex = { version = "0.15.2", path = "indexes/utxoindex" }
kaspa-wallet = { version = "0.15.2", path = "wallet/native" }
kaspa-wallet-cli-wasm = { version = "0.15.2", path = "wallet/wasm" }
kaspa-wallet-keys = { version = "0.15.2", path = "wallet/keys" }
kaspa-wallet-pskt = { version = "0.15.2", path = "wallet/pskt" }
kaspa-wallet-core = { version = "0.15.2", path = "wallet/core" }
kaspa-wallet-macros = { version = "0.15.2", path = "wallet/macros" }
kaspa-wasm = { version = "0.15.2", path = "wasm" }
kaspa-wasm-core = { version = "0.15.2", path = "wasm/core" }
kaspa-wrpc-client = { version = "0.15.2", path = "rpc/wrpc/client" }
kaspa-wrpc-proxy = { version = "0.15.2", path = "rpc/wrpc/proxy" }
kaspa-wrpc-server = { version = "0.15.2", path = "rpc/wrpc/server" }
kaspa-wrpc-wasm = { version = "0.15.2", path = "rpc/wrpc/wasm" }
kaspa-wrpc-example-subscriber = { version = "0.15.2", path = "rpc/wrpc/examples/subscriber" }
kaspad = { version = "0.15.2", path = "kaspad" }
kaspa-alloc = { version = "0.15.2", path = "utils/alloc" }

# external
aes = "0.8.3"
Expand Down
18 changes: 14 additions & 4 deletions cli/src/modules/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,20 @@ impl Rpc {
// let result = rpc.get_subnetwork_call(GetSubnetworkRequest { }).await?;
// self.println(&ctx, result);
// }
// RpcApiOps::GetVirtualChainFromBlock => {
// let result = rpc.get_virtual_chain_from_block_call(GetVirtualChainFromBlockRequest { }).await?;
// self.println(&ctx, result);
// }
RpcApiOps::GetVirtualChainFromBlock => {
if argv.is_empty() {
return Err(Error::custom("Missing startHash argument"));
};
let start_hash = RpcHash::from_hex(argv.remove(0).as_str())?;
let include_accepted_transaction_ids = argv.remove(0).parse::<bool>().unwrap_or_default();
let result = rpc
.get_virtual_chain_from_block_call(
None,
GetVirtualChainFromBlockRequest { start_hash, include_accepted_transaction_ids },
)
.await?;
self.println(&ctx, result);
}
// RpcApiOps::GetBlocks => {
// let result = rpc.get_blocks_call(GetBlocksRequest { }).await?;
// self.println(&ctx, result);
Expand Down
16 changes: 12 additions & 4 deletions components/consensusmanager/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,12 @@ impl ConsensusSessionOwned {
self.clone().spawn_blocking(|c| c.is_nearly_synced()).await
}

pub async fn async_get_virtual_chain_from_block(&self, hash: Hash) -> ConsensusResult<ChainPath> {
self.clone().spawn_blocking(move |c| c.get_virtual_chain_from_block(hash)).await
pub async fn async_get_virtual_chain_from_block(
&self,
low: Hash,
chain_path_added_limit: Option<usize>,
) -> ConsensusResult<ChainPath> {
self.clone().spawn_blocking(move |c| c.get_virtual_chain_from_block(low, chain_path_added_limit)).await
}

pub async fn async_get_virtual_utxos(
Expand Down Expand Up @@ -380,8 +384,12 @@ impl ConsensusSessionOwned {
/// Returns acceptance data for a set of blocks belonging to the selected parent chain.
///
/// See `self::get_virtual_chain`
pub async fn async_get_blocks_acceptance_data(&self, hashes: Vec<Hash>) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
self.clone().spawn_blocking(move |c| c.get_blocks_acceptance_data(&hashes)).await
pub async fn async_get_blocks_acceptance_data(
&self,
hashes: Vec<Hash>,
merged_blocks_limit: Option<usize>,
) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
self.clone().spawn_blocking(move |c| c.get_blocks_acceptance_data(&hashes, merged_blocks_limit)).await
}

pub async fn async_is_chain_block(&self, hash: Hash) -> ConsensusResult<bool> {
Expand Down
13 changes: 11 additions & 2 deletions consensus/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ pub trait ConsensusApi: Send + Sync {
unimplemented!()
}

fn get_virtual_chain_from_block(&self, hash: Hash) -> ConsensusResult<ChainPath> {
/// Gets the virtual chain paths from `low` to the `sink` hash, or until `chain_path_added_limit` is reached
///
/// Note:
/// 1) `chain_path_added_limit` will populate removed fully, and then the added chain path, up to `chain_path_added_limit` amount of hashes.
/// 1.1) use `None to impose no limit with optimized backward chain iteration, for better performance in cases where batching is not required.
fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: Option<usize>) -> ConsensusResult<ChainPath> {
unimplemented!()
}

Expand Down Expand Up @@ -297,7 +302,11 @@ pub trait ConsensusApi: Send + Sync {
/// Returns acceptance data for a set of blocks belonging to the selected parent chain.
///
/// See `self::get_virtual_chain`
fn get_blocks_acceptance_data(&self, hashes: &[Hash]) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
fn get_blocks_acceptance_data(
&self,
hashes: &[Hash],
merged_blocks_limit: Option<usize>,
) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
unimplemented!()
}

Expand Down
50 changes: 43 additions & 7 deletions consensus/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,14 +607,26 @@ impl ConsensusApi for Consensus {
self.config.is_nearly_synced(compact.timestamp, compact.daa_score)
}

fn get_virtual_chain_from_block(&self, hash: Hash) -> ConsensusResult<ChainPath> {
// Calculate chain changes between the given hash and the
// sink. Note that we explicitly don't
fn get_virtual_chain_from_block(&self, low: Hash, chain_path_added_limit: Option<usize>) -> ConsensusResult<ChainPath> {
// Calculate chain changes between the given `low` and the current sink hash (up to `limit` amount of block hashes).
// Note:
// 1) that we explicitly don't
// do the calculation against the virtual itself so that we
// won't later need to remove it from the result.
// 2) supplying `None` as `chain_path_added_limit` will result in the full chain path, with optimized performance.
let _guard = self.pruning_lock.blocking_read();
self.validate_block_exists(hash)?;
Ok(self.services.dag_traversal_manager.calculate_chain_path(hash, self.get_sink()))

// Verify that the block exists
self.validate_block_exists(low)?;

// Verify that source is on chain(block)
self.services
.reachability_service
.is_chain_ancestor_of(self.get_source(), low)
.then_some(())
.ok_or(ConsensusError::General("the queried hash does not have source on its chain"))?;

Ok(self.services.dag_traversal_manager.calculate_chain_path(low, self.get_sink(), chain_path_added_limit))
}

/// Returns a Vec of header samples since genesis
Expand Down Expand Up @@ -914,11 +926,35 @@ impl ConsensusApi for Consensus {
self.acceptance_data_store.get(hash).unwrap_option().ok_or(ConsensusError::MissingData(hash))
}

fn get_blocks_acceptance_data(&self, hashes: &[Hash]) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
fn get_blocks_acceptance_data(
&self,
hashes: &[Hash],
merged_blocks_limit: Option<usize>,
) -> ConsensusResult<Vec<Arc<AcceptanceData>>> {
// Note: merged_blocks_limit will limit after the sum of merged blocks is breached along the supplied hash's acceptance data
// and not limit the acceptance data within a queried hash. i.e. It has mergeset_size_limit granularity, this is to guarantee full acceptance data coverage.
if merged_blocks_limit.is_none() {
return hashes
.iter()
.copied()
.map(|hash| self.acceptance_data_store.get(hash).unwrap_option().ok_or(ConsensusError::MissingData(hash)))
.collect::<ConsensusResult<Vec<_>>>();
}
let merged_blocks_limit = merged_blocks_limit.unwrap(); // we handle `is_none`, so may unwrap.
let mut num_of_merged_blocks = 0usize;

hashes
.iter()
.copied()
.map(|hash| self.acceptance_data_store.get(hash).unwrap_option().ok_or(ConsensusError::MissingData(hash)))
.map_while(|hash| {
let entry = self.acceptance_data_store.get(hash).unwrap_option().ok_or(ConsensusError::MissingData(hash));
num_of_merged_blocks += entry.as_ref().map_or(0, |entry| entry.len());
if num_of_merged_blocks > merged_blocks_limit {
None
} else {
Some(entry)
}
})
.collect::<ConsensusResult<Vec<_>>>()
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/pipeline/virtual_processor/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl VirtualStateProcessor {
assert_eq!(virtual_ghostdag_data.selected_parent, new_sink);

let sink_multiset = self.utxo_multisets_store.get(new_sink).unwrap();
let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink);
let chain_path = self.dag_traversal_manager.calculate_chain_path(prev_sink, new_sink, None);
let new_virtual_state = self
.calculate_and_commit_virtual_state(
virtual_read,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/processes/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ impl<
(blocks, highest_reached)
}

fn find_highest_common_chain_block(&self, low: Hash, high: Hash) -> Hash {
pub fn find_highest_common_chain_block(&self, low: Hash, high: Hash) -> Hash {
self.reachability_service
.default_backward_chain_iterator(low)
.find(|candidate| self.reachability_service.is_chain_ancestor_of(*candidate, high))
Expand Down
19 changes: 15 additions & 4 deletions consensus/src/processes/traversal_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl<T: GhostdagStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader
Self { genesis_hash, ghostdag_store, relations_store, reachability_service }
}

pub fn calculate_chain_path(&self, from: Hash, to: Hash) -> ChainPath {
pub fn calculate_chain_path(&self, from: Hash, to: Hash, chain_path_added_limit: Option<usize>) -> ChainPath {
let mut removed = Vec::new();
let mut common_ancestor = from;
for current in self.reachability_service.default_backward_chain_iterator(from) {
Expand All @@ -42,9 +42,20 @@ impl<T: GhostdagStoreReader, U: ReachabilityStoreReader, V: RelationsStoreReader
break;
}
}
// It is more intuitive to use forward iterator here, but going downwards the selected chain is faster.
let mut added = self.reachability_service.backward_chain_iterator(to, common_ancestor, false).collect_vec();
added.reverse();
if chain_path_added_limit.is_none() {
// Use backward chain iterator
// It is more intuitive to use forward iterator here, but going downwards the selected chain is faster.
let mut added = self.reachability_service.backward_chain_iterator(to, common_ancestor, false).collect_vec();
added.reverse();
return ChainPath { added, removed };
}
// Use forward chain iterator, to ascertain a path from the common ancestor to the target.
let added = self
.reachability_service
.forward_chain_iterator(common_ancestor, to, true)
.skip(1)
.take(chain_path_added_limit.unwrap()) // we handle is_none so we may unwrap.
.collect_vec();
ChainPath { added, removed }
}

Expand Down
6 changes: 3 additions & 3 deletions mining/src/mempool/remove_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::mempool::{
Mempool,
};
use kaspa_consensus_core::tx::TransactionId;
use kaspa_core::{debug, warn};
use kaspa_core::debug;
use kaspa_utils::iter::IterExtensions;

impl Mempool {
Expand Down Expand Up @@ -43,8 +43,8 @@ impl Mempool {
TxRemovalReason::Muted => {}
TxRemovalReason::DoubleSpend => match removed_transactions.len() {
0 => {}
1 => warn!("Removed transaction ({}) {}{}", reason, removed_transactions[0], extra_info),
n => warn!(
1 => debug!("Removed transaction ({}) {}{}", reason, removed_transactions[0], extra_info),
n => debug!(
"Removed {} transactions ({}): {}{}",
n,
reason,
Expand Down
9 changes: 7 additions & 2 deletions rpc/grpc/core/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,13 @@ message GetSubnetworkResponseMessage{
RPCError error = 1000;
}

// GetVirtualChainFromBlockRequestMessage requests the virtual selected
// parent chain from some startHash to this kaspad's current virtual
/// GetVirtualChainFromBlockRequestMessage requests the virtual selected
/// parent chain from some startHash to this kaspad's current virtual
/// Note:
/// this call batches the response to:
/// a. the network's `mergeset size limit * 10` amount of added chain blocks, if `includeAcceptedTransactionIds = false`
/// b. or `mergeset size limit * 10` amount of merged blocks, if `includeAcceptedTransactionIds = true`
/// c. it does not batch the removed chain blocks, only the added ones.
message GetVirtualChainFromBlockRequestMessage{
string startHash = 1;
bool includeAcceptedTransactionIds = 2;
Expand Down
3 changes: 2 additions & 1 deletion rpc/service/src/converter/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,9 @@ impl ConsensusConverter {
&self,
consensus: &ConsensusProxy,
chain_path: &ChainPath,
merged_blocks_limit: Option<usize>,
) -> RpcResult<Vec<RpcAcceptedTransactionIds>> {
let acceptance_data = consensus.async_get_blocks_acceptance_data(chain_path.added.clone()).await.unwrap();
let acceptance_data = consensus.async_get_blocks_acceptance_data(chain_path.added.clone(), merged_blocks_limit).await.unwrap();
Ok(chain_path
.added
.iter()
Expand Down
Loading

0 comments on commit b14537f

Please sign in to comment.