Enhance parallel (#4369)
* print less info about ghost data

* no delete in reset

* get the block diligently when calling the rpc method

* add syncing of the specific block

* break if a block in the waiting list is executed successfully, so the execution process restarts for earlier blocks that may have failed to execute

* use heap for execution waiting list

* use compact ghostdata

* add/remove some info

* save the block locally as a cache

* close sync

* do not process future blocks

* wait 500ms when fetching the dag block

* add use get block

* add fetch_blocks

* add fetch block

* use ctx spawn to re-execute the dag block

* connect the specific block at the last step

* use BTreeSet

* fix fmt

* 1. use event handling
2. the sync process will use a mutex

* allow parallel sync

* remove the dirty tips

* check the dag data integrity

* 1. re-insert the reachability data
2. add a mutex for dag committing (sketched below, after the changed-files summary)

* fix typo

* remove unused if block
jackzhhuang authored Jan 8, 2025
1 parent 5fbf4e3 commit 34ed48c
Showing 15 changed files with 643 additions and 160 deletions.
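
The thread running through these changes is making DAG block commits safe under parallel sync: each batch commit in blockdag.rs is now serialized behind a shared lock (the new commit_lock: Arc<Mutex<FlexiDagStorage>> field) while the rest of the pipeline proceeds concurrently. Below is a minimal, self-contained sketch of that pattern, offered as an illustration only: it uses std::sync::Mutex and toy placeholder types (ToyStorage, ToyDag) rather than the parking_lot::Mutex and FlexiDagStorage actually used in the diff.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

// Toy stand-in for FlexiDagStorage: a header store keyed by block id.
#[derive(Default)]
struct ToyStorage {
    headers: HashMap<u64, String>,
}

// Toy stand-in for BlockDAG: commits are serialized behind `commit_lock`,
// mirroring the commit_lock field added in this commit.
#[derive(Clone)]
struct ToyDag {
    commit_lock: Arc<Mutex<ToyStorage>>,
}

impl ToyDag {
    fn commit(&self, id: u64, header: String) {
        // Hold the lock for the duration of the batch write so that two
        // concurrently executed blocks cannot interleave their writes.
        let mut guard = self.commit_lock.lock().expect("commit lock poisoned");
        guard.headers.insert(id, header);
        // The guard is dropped here, releasing the lock (the real code drops
        // it explicitly right after write_batch).
    }

    fn has_header(&self, id: u64) -> bool {
        self.commit_lock
            .lock()
            .expect("commit lock poisoned")
            .headers
            .contains_key(&id)
    }
}

fn main() {
    let dag = ToyDag {
        commit_lock: Arc::new(Mutex::new(ToyStorage::default())),
    };
    // Several sync workers committing blocks in parallel.
    let handles: Vec<_> = (0..4u64)
        .map(|i| {
            let dag = dag.clone();
            thread::spawn(move || dag.commit(i, format!("block-{i}")))
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
    assert!((0..4u64).all(|i| dag.has_header(i)));
    println!("all blocks committed under the shared commit lock");
}

Holding the guard only for the duration of the batch write keeps concurrent workers from interleaving their writes, which is the same property the explicit drop(lock_guard) after write_batch provides in blockdag.rs.
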
6 changes: 0 additions & 6 deletions block-relayer/src/block_relayer.rs
@@ -341,12 +341,6 @@ impl EventHandler<Self, PeerCompactBlockMessage> for BlockRelayer {
if let Some(metrics) = self.metrics.as_ref() {
metrics.block_relay_time.observe(time_sec);
}
sl_info!(
"{action} {hash} {time_sec}",
time_sec = time_sec,
hash = compact_block_msg.message.compact_block.header.id().to_hex(),
action = "block_relay_time",
);
//TODO should filter too old block?

if let Err(e) = self.handle_block_event(compact_block_msg, ctx) {
11 changes: 10 additions & 1 deletion chain/src/chain.rs
@@ -384,6 +384,15 @@ impl BlockChain {
.ok_or_else(|| format_err!("Can not find block hash by number {}", number))
}

pub fn check_parents_ready(&self, header: &BlockHeader) -> bool {
header.parents_hash().into_iter().all(|parent| {
self.has_dag_block(parent).unwrap_or_else(|e| {
warn!("check_parents_ready error: {:?}", e);
false
})
})
}

fn check_exist_block(&self, block_id: HashValue, block_number: BlockNumber) -> Result<bool> {
Ok(self
.get_hash_by_number(block_number)?
@@ -1360,7 +1369,7 @@ impl ChainReader for BlockChain {
return Ok(false);
}

self.dag.has_dag_block(header.id())
self.dag.has_block_connected(&header)
}

fn check_chain_type(&self) -> Result<ChainType> {
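
check_parents_ready above gates a DAG block on all of its parent blocks already being present locally, so a block that arrives ahead of its parents is held back instead of connected. A toy illustration of that gate follows, using a plain HashSet and hypothetical ToyChain/ToyHeader types in place of the real chain storage and BlockHeader; it is a sketch of the idea, not the actual API.

use std::collections::HashSet;

// Hypothetical stand-ins for HashValue and BlockHeader, for illustration only.
type BlockId = u64;

struct ToyHeader {
    id: BlockId,
    parents: Vec<BlockId>,
}

struct ToyChain {
    // Blocks already executed and connected locally.
    known_blocks: HashSet<BlockId>,
}

impl ToyChain {
    // Mirrors check_parents_ready: true only if every parent is already known.
    fn check_parents_ready(&self, header: &ToyHeader) -> bool {
        header.parents.iter().all(|p| self.known_blocks.contains(p))
    }
}

fn main() {
    let chain = ToyChain {
        known_blocks: [1, 2].into_iter().collect(),
    };
    let ready = ToyHeader { id: 3, parents: vec![1, 2] };
    let waiting = ToyHeader { id: 4, parents: vec![2, 99] }; // parent 99 not synced yet
    assert!(chain.check_parents_ready(&ready));
    assert!(!chain.check_parents_ready(&waiting));
    println!("block {} can connect; block {} must wait", ready.id, waiting.id);
}
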
117 changes: 107 additions & 10 deletions flexidag/src/blockdag.rs
@@ -19,6 +19,7 @@ use crate::process_key_already_error;
use crate::prune::pruning_point_manager::PruningPointManagerT;
use crate::reachability::ReachabilityError;
use anyhow::{bail, ensure, Ok};
use parking_lot::Mutex;
use rocksdb::WriteBatch;
use starcoin_config::temp_dir;
use starcoin_crypto::{HashValue as Hash, HashValue};
@@ -54,6 +55,7 @@ pub struct BlockDAG {
pub storage: FlexiDagStorage,
ghostdag_manager: DbGhostdagManager,
pruning_point_manager: PruningPointManager,
commit_lock: Arc<Mutex<FlexiDagStorage>>,
}

impl BlockDAG {
@@ -75,11 +77,12 @@ impl BlockDAG {
reachability_service.clone(),
);
let pruning_point_manager = PruningPointManager::new(reachability_service, ghostdag_store);

let commit_lock = Arc::new(Mutex::new(db.clone()));
Self {
ghostdag_manager,
storage: db,
pruning_point_manager,
commit_lock,
}
}

Expand All @@ -98,13 +101,101 @@ impl BlockDAG {
Ok(Self::new(k, dag_storage))
}

pub fn has_dag_block(&self, hash: Hash) -> anyhow::Result<bool> {
Ok(self.storage.header_store.has(hash)?)
pub fn has_block_connected(&self, block_header: &BlockHeader) -> anyhow::Result<bool> {
let _ghostdata = match self.storage.ghost_dag_store.get_data(block_header.id()) {
std::result::Result::Ok(data) => data,
Err(e) => {
warn!(
"failed to get ghostdata by hash: {:?}, the block should be re-executed",
e
);
return anyhow::Result::Ok(false);
}
};

let _dag_header = match self.storage.header_store.get_header(block_header.id()) {
std::result::Result::Ok(header) => header,
Err(e) => {
warn!(
"failed to get header by hash: {:?}, the block should be re-executed",
e
);
return anyhow::Result::Ok(false);
}
};

let parents = match self
.storage
.relations_store
.read()
.get_parents(block_header.id())
{
std::result::Result::Ok(parents) => parents,
Err(e) => {
warn!(
"failed to get parents by hash: {:?}, the block should be re-executed",
e
);
return anyhow::Result::Ok(false);
}
};

if !parents.iter().all(|parent| {
let children = match self.storage.relations_store.read().get_children(*parent) {
std::result::Result::Ok(children) => children,
Err(e) => {
warn!("failed to get children by hash: {:?}, the block should be re-executed", e);
return false;
}
};

if !children.contains(&block_header.id()) {
warn!("the parent: {:?} does not have the child: {:?}", parent, block_header.id());
return false;
}

match inquirer::is_dag_ancestor_of(&*self.storage.reachability_store.read(), *parent, block_header.id()) {
std::result::Result::Ok(pass) => {
if !pass {
warn!("failed to check ancestor, the block: {:?} is not the descendant of its parent: {:?}, the block should be re-executed", block_header.id(), *parent);
return false;
}
true
}
Err(e) => {
warn!("failed to check ancestor, the block: {:?} is not the descendant of its parent: {:?}, the block should be re-executed, error: {:?}", block_header.id(), *parent, e);
false
}
}
}) {
return anyhow::Result::Ok(false);
}

if block_header.pruning_point() == HashValue::zero() {
return anyhow::Result::Ok(true);
} else {
match inquirer::is_dag_ancestor_of(
&*self.storage.reachability_store.read(),
block_header.pruning_point(),
block_header.id(),
) {
std::result::Result::Ok(pass) => {
if !pass {
warn!("failed to check ancestor, the block: {:?} is not the descendant of the pruning: {:?}", block_header.id(), block_header.pruning_point());
return anyhow::Result::Ok(false);
}
}
Err(e) => {
warn!("failed to check ancestor, the block: {:?} is not the descendant of the pruning: {:?}, error: {:?}", block_header.id(), block_header.pruning_point(), e);
return anyhow::Result::Ok(false);
}
}
}

anyhow::Result::Ok(true)
}

pub fn check_ancestor_of(&self, ancestor: Hash, descendant: Hash) -> anyhow::Result<bool> {
// self.ghostdag_manager
// .check_ancestor_of(ancestor, descendant)
inquirer::is_dag_ancestor_of(
&*self.storage.reachability_store.read(),
ancestor,
@@ -239,11 +330,12 @@
);
}

info!("start to commit via batch, header id: {:?}", header.id());

// Create a DB batch writer
let mut batch = WriteBatch::default();

info!("start to commit via batch, header id: {:?}", header.id());
let lock_guard = self.commit_lock.lock();

// lock the dag data to write in batch
// the cache will be written at the same time
// when the batch is written before flush to the disk and
@@ -322,6 +414,7 @@ impl BlockDAG {
.write_batch(batch)
.expect("failed to write dag data in batch");

drop(lock_guard);
info!("finish writing the batch, head id: {:?}", header.id());

Ok(())
@@ -381,6 +474,9 @@ impl BlockDAG {
// Create a DB batch writer
let mut batch = WriteBatch::default();

info!("start to commit via batch, header id: {:?}", header.id());
let lock_guard = self.commit_lock.lock();

// lock the dag data to write in batch, read lock.
// the cache will be written at the same time
// when the batch is written before flush to the disk and
Expand Down Expand Up @@ -460,6 +556,7 @@ impl BlockDAG {
.write_batch(batch)
.expect("failed to write dag data in batch");

drop(lock_guard);
info!("finish writing the batch, head id: {:?}", header.id());

Ok(())
@@ -533,12 +630,12 @@ impl BlockDAG {
pruning_depth: u64,
pruning_finality: u64,
) -> anyhow::Result<MineNewDagBlockInfo> {
info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?}", previous_pruning_point, previous_ghostdata);
info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?} and its red block count: {:?}", previous_pruning_point, previous_ghostdata.to_compact(), previous_ghostdata.mergeset_reds.len());
let dag_state = self.get_dag_state(previous_pruning_point)?;
let next_ghostdata = self.ghostdata(&dag_state.tips)?;
info!(
"start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}",
dag_state.tips, previous_pruning_point, next_ghostdata,
"start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}, red block count: {:?}",
dag_state.tips, previous_pruning_point, next_ghostdata.to_compact(), next_ghostdata.mergeset_reds.len()
);
let next_pruning_point = self.pruning_point_manager().next_pruning_point(
previous_pruning_point,
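
has_block_connected, which replaces the simpler has_dag_block, only treats a block as connected when all of its DAG data is present and mutually consistent: ghostdata and header stored, every parent listing the block as a child, the block reachable as a descendant of each parent, and (unless the pruning point is zero) reachable from its pruning point. The rough model below illustrates those integrity checks with HashMap-backed stores and a naive recursive ancestor walk; the real code queries the ghostdag, relations, and reachability stores instead, so treat this purely as a sketch.

use std::collections::{HashMap, HashSet};

type BlockId = u32;

// Toy DAG data, standing in for the header/relations/reachability stores.
struct ToyDagData {
    headers: HashSet<BlockId>,
    parents: HashMap<BlockId, Vec<BlockId>>,
    children: HashMap<BlockId, Vec<BlockId>>,
}

impl ToyDagData {
    // Very naive ancestor check: walk parent links upwards from `descendant`.
    fn is_ancestor_of(&self, ancestor: BlockId, descendant: BlockId) -> bool {
        if ancestor == descendant {
            return true;
        }
        self.parents
            .get(&descendant)
            .map(|ps| ps.iter().any(|p| self.is_ancestor_of(ancestor, *p)))
            .unwrap_or(false)
    }

    // Mirrors the spirit of has_block_connected: every piece of DAG data for
    // the block must exist and be mutually consistent.
    fn has_block_connected(&self, block: BlockId, pruning_point: Option<BlockId>) -> bool {
        if !self.headers.contains(&block) {
            return false; // header missing
        }
        let Some(parents) = self.parents.get(&block) else {
            return false; // relations missing
        };
        let consistent = parents.iter().all(|p| {
            // each parent must list the block as a child, and the block must be
            // a descendant of that parent
            self.children.get(p).map_or(false, |cs| cs.contains(&block))
                && self.is_ancestor_of(*p, block)
        });
        if !consistent {
            return false;
        }
        // a zero/absent pruning point skips the pruning ancestry check
        pruning_point.map_or(true, |pp| self.is_ancestor_of(pp, block))
    }
}

fn main() {
    let data = ToyDagData {
        headers: [0, 1, 2].into_iter().collect(),
        parents: HashMap::from([(1, vec![0]), (2, vec![1])]),
        children: HashMap::from([(0, vec![1]), (1, vec![2])]),
    };
    assert!(data.has_block_connected(2, Some(0)));
    assert!(!data.has_block_connected(3, None)); // unknown block
    println!("integrity checks pass for block 2");
}

Any missing or inconsistent piece returns false, which the sync code interprets as "the block should be re-executed".
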
24 changes: 23 additions & 1 deletion flexidag/src/reachability/inquirer.rs
@@ -74,8 +74,30 @@ fn add_dag_block(
mergeset_iterator: HashIterator,
) -> Result<()> {
// Update the future covering set for blocks in the mergeset
let mut insert_future_set_result: Vec<std::result::Result<(), ReachabilityError>> = Vec::new();
for merged_block in mergeset_iterator {
insert_to_future_covering_set(store, merged_block, new_block)?;
let result = insert_to_future_covering_set(store, merged_block, new_block);
if result.is_err() {
match result {
Err(ReachabilityError::DataInconsistency) => {
// A data inconsistency means the block is already in the future covering set
// of the merged block. Record it and surface it after the remaining mergeset blocks are processed.
insert_future_set_result.push(Err(ReachabilityError::DataInconsistency));
}
Err(ReachabilityError::HashesNotOrdered) => {
// Hashes not ordered means the merged block is not in the future covering set
// of the new block. This is a serious error; propagate it immediately.
return Err(ReachabilityError::HashesNotOrdered);
}
_ => {
// This is an unexpected error, and we should propagate it.
return result;
}
}
}
}
for result in insert_future_set_result.into_iter() {
result?;
}
Ok(())
}
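
The change above distinguishes two failure modes when updating the future covering sets: DataInconsistency is recorded and only surfaced after the whole mergeset has been processed, while HashesNotOrdered aborts immediately. The stripped-down sketch below shows that control flow with a stand-in ToyError type rather than the real ReachabilityError; it is an illustration of the pattern, not the library code.

// Stand-in error type for illustration only; the real code uses ReachabilityError.
#[derive(Debug)]
enum ToyError {
    DataInconsistency,
    HashesNotOrdered,
}

// Process every item even if some report DataInconsistency, but abort at once
// on HashesNotOrdered; deferred errors are surfaced only after the loop ends.
fn add_all(items: &[u32], insert: impl Fn(u32) -> Result<(), ToyError>) -> Result<(), ToyError> {
    let mut deferred: Vec<Result<(), ToyError>> = Vec::new();
    for &item in items {
        match insert(item) {
            Ok(()) => {}
            Err(ToyError::DataInconsistency) => {
                // Recoverable for the remaining items: record and keep going.
                deferred.push(Err(ToyError::DataInconsistency));
            }
            Err(e @ ToyError::HashesNotOrdered) => {
                // Unrecoverable: stop processing immediately.
                return Err(e);
            }
        }
    }
    // Report the first deferred error, if any, once all items were attempted.
    for r in deferred {
        r?;
    }
    Ok(())
}

fn main() {
    // Pretend items equal to 2 hit a data inconsistency.
    let result = add_all(&[1, 2, 3], |i| {
        if i == 2 {
            Err(ToyError::DataInconsistency)
        } else {
            Ok(())
        }
    });
    println!("deferred error surfaced after the full pass: {result:?}");
    // Item 3 was still processed before the deferred error is returned.
    assert!(matches!(result, Err(ToyError::DataInconsistency)));
}

Deferring the recoverable case lets the remaining mergeset blocks still be processed before the error is reported.
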
2 changes: 1 addition & 1 deletion flexidag/tests/tests.rs
@@ -149,7 +149,7 @@ async fn test_with_spawn() {
std::result::Result::Ok(_) => break,
Err(e) => {
debug!("failed to commit error: {:?}, i: {:?}", e, i);
if dag_clone.has_dag_block(block_clone.id()).unwrap() {
if dag_clone.has_block_connected(&block_clone).unwrap() {
break;
}
count -= 1;
12 changes: 6 additions & 6 deletions network-p2p/src/service.rs
@@ -1308,12 +1308,12 @@ impl<T: BusinessLayerHandle + Send> Future for NetworkWorker<T> {
})) => {
if let Some(metrics) = this.metrics.as_ref() {
for (protocol, message) in &messages {
info!(
"[network-p2p] receive notification from {} {} {}",
remote,
protocol,
message.len()
);
// info!(
// "[network-p2p] receive notification from {} {} {}",
// remote,
// protocol,
// message.len()
// );
metrics
.notifications_sizes
.with_label_values(&["in", protocol])
20 changes: 10 additions & 10 deletions network/api/src/peer_provider.rs
@@ -276,18 +276,18 @@ impl PeerSelector {
peers
});
if best_peers.is_empty() || best_peers[0].total_difficulty() <= min_difficulty {
info!(
"best peer difficulty {:?} is smaller than min difficulty {:?}, return None",
best_peers[0].total_difficulty(),
min_difficulty
);
// info!(
// "best peer difficulty {:?} is smaller than min difficulty {:?}, return None",
// best_peers[0].total_difficulty(),
// min_difficulty
// );
None
} else {
info!(
"best peer difficulty {:?}, info: {:?} picked",
best_peers[0].total_difficulty(),
best_peers
);
// info!(
// "best peer difficulty {:?}, info: {:?} picked",
// best_peers[0].total_difficulty(),
// best_peers
// );
Some(best_peers)
}
}