From 34ed48c6b8bbd477e6d980f8898729dd5a4c0bc1 Mon Sep 17 00:00:00 2001
From: Jack Huang
Date: Wed, 8 Jan 2025 09:02:26 +0800
Subject: [PATCH] Enhance parallel (#4369)

* print less info about ghost data
* no delete in reset
* get the block diligently when calling the rpc method
* add syncing the specific block
* break if a block in the waiting list executes successfully, to restart the
  execution process for previous blocks that may have failed to execute
* use heap for execution waiting list
* use compact ghostdata
* add/remove some info
* save the block into local storage as a cache
* close sync
* no processing for the future block
* wait 500ms to fetch the dag block
* add use get block
* add fetch_blocks
* add fetch block
* use ctx spawn to re-execute the dag block
* connect the specific block at the last step
* use BTreeSet
* fix fmt
* 1, use event handle; 2, sync process will hold a mutex
* allow parallel sync
* remove the dirty tips
* check the dag data integrity
* 1, re-insert the reachability data; 2, add mutex when dag committing
* fix typo
* remove unused if block
---
 block-relayer/src/block_relayer.rs            |   6 -
 chain/src/chain.rs                            |  11 +-
 flexidag/src/blockdag.rs                      | 117 ++++++-
 flexidag/src/reachability/inquirer.rs         |  24 +-
 flexidag/tests/tests.rs                       |   2 +-
 network-p2p/src/service.rs                    |  12 +-
 network/api/src/peer_provider.rs              |  20 +-
 node/src/node.rs                              |  49 +--
 sync/api/src/lib.rs                           |  45 +++
 .../block_connector_service.rs                |  30 +-
 sync/src/block_connector/write_block_chain.rs | 102 +++---
 sync/src/parallel/executor.rs                 |   2 +-
 sync/src/sync.rs                              | 293 ++++++++++++++++--
 sync/src/tasks/block_sync_task.rs             |   1 -
 sync/src/verified_rpc_client.rs               |  89 ++++++
 15 files changed, 643 insertions(+), 160 deletions(-)

diff --git a/block-relayer/src/block_relayer.rs b/block-relayer/src/block_relayer.rs
index 40e88c6f71..115fb1b882 100644
--- a/block-relayer/src/block_relayer.rs
+++ b/block-relayer/src/block_relayer.rs
@@ -341,12 +341,6 @@ impl EventHandler for BlockRelayer {
         if let Some(metrics) = self.metrics.as_ref() {
             metrics.block_relay_time.observe(time_sec);
         }
-        sl_info!(
-            "{action} {hash} {time_sec}",
-            time_sec = time_sec,
-            hash = compact_block_msg.message.compact_block.header.id().to_hex(),
-            action = "block_relay_time",
-        );
 
         //TODO should filter too old block?
         if let Err(e) = self.handle_block_event(compact_block_msg, ctx) {
diff --git a/chain/src/chain.rs b/chain/src/chain.rs
index 99886657f0..4b72772df0 100644
--- a/chain/src/chain.rs
+++ b/chain/src/chain.rs
@@ -384,6 +384,15 @@ impl BlockChain {
             .ok_or_else(|| format_err!("Can not find block hash by number {}", number))
     }
 
+    pub fn check_parents_ready(&self, header: &BlockHeader) -> bool {
+        header.parents_hash().into_iter().all(|parent| {
+            self.has_dag_block(parent).unwrap_or_else(|e| {
+                warn!("check_parents_ready error: {:?}", e);
+                false
+            })
+        })
+    }
+
     fn check_exist_block(&self, block_id: HashValue, block_number: BlockNumber) -> Result<bool> {
         Ok(self
             .get_hash_by_number(block_number)?
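The new `check_parents_ready` helper above is the gate the sync code uses before executing a DAG block: a block only runs once every parent listed in its header has been connected. A minimal sketch of the intended call pattern (illustrative only; the deferral queue and the `exec` callback are hypothetical, not part of this patch):

// Illustration only: defer blocks whose parents are not yet connected and
// retry them once more blocks have been executed. `BlockChain` and `Block`
// come from the starcoin crates; `exec` is a hypothetical execution callback.
use starcoin_chain::BlockChain;
use starcoin_types::block::Block;
use std::collections::VecDeque;

fn drain_ready(
    chain: &BlockChain,
    pending: &mut VecDeque<Block>,
    mut exec: impl FnMut(Block) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    let mut deferred = VecDeque::new();
    while let Some(block) = pending.pop_front() {
        if chain.check_parents_ready(block.header()) {
            // Every parent is already executed and connected; run the block now.
            exec(block)?;
        } else {
            // At least one parent is missing locally; keep the block for a later pass.
            deferred.push_back(block);
        }
    }
    pending.append(&mut deferred);
    Ok(())
}

Blocks that fail the check are simply retried on a later pass, which matches how the specific-target sync further down re-queues blocks whose execution failed.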
@@ -1360,7 +1369,7 @@ impl ChainReader for BlockChain { return Ok(false); } - self.dag.has_dag_block(header.id()) + self.dag.has_block_connected(&header) } fn check_chain_type(&self) -> Result { diff --git a/flexidag/src/blockdag.rs b/flexidag/src/blockdag.rs index ce59648f47..6ee2cf2404 100644 --- a/flexidag/src/blockdag.rs +++ b/flexidag/src/blockdag.rs @@ -19,6 +19,7 @@ use crate::process_key_already_error; use crate::prune::pruning_point_manager::PruningPointManagerT; use crate::reachability::ReachabilityError; use anyhow::{bail, ensure, Ok}; +use parking_lot::Mutex; use rocksdb::WriteBatch; use starcoin_config::temp_dir; use starcoin_crypto::{HashValue as Hash, HashValue}; @@ -54,6 +55,7 @@ pub struct BlockDAG { pub storage: FlexiDagStorage, ghostdag_manager: DbGhostdagManager, pruning_point_manager: PruningPointManager, + commit_lock: Arc>, } impl BlockDAG { @@ -75,11 +77,12 @@ impl BlockDAG { reachability_service.clone(), ); let pruning_point_manager = PruningPointManager::new(reachability_service, ghostdag_store); - + let commit_lock = Arc::new(Mutex::new(db.clone())); Self { ghostdag_manager, storage: db, pruning_point_manager, + commit_lock, } } @@ -98,13 +101,101 @@ impl BlockDAG { Ok(Self::new(k, dag_storage)) } - pub fn has_dag_block(&self, hash: Hash) -> anyhow::Result { - Ok(self.storage.header_store.has(hash)?) + pub fn has_block_connected(&self, block_header: &BlockHeader) -> anyhow::Result { + let _ghostdata = match self.storage.ghost_dag_store.get_data(block_header.id()) { + std::result::Result::Ok(data) => data, + Err(e) => { + warn!( + "failed to get ghostdata by hash: {:?}, the block should be re-executed", + e + ); + return anyhow::Result::Ok(false); + } + }; + + let _dag_header = match self.storage.header_store.get_header(block_header.id()) { + std::result::Result::Ok(header) => header, + Err(e) => { + warn!( + "failed to get header by hash: {:?}, the block should be re-executed", + e + ); + return anyhow::Result::Ok(false); + } + }; + + let parents = match self + .storage + .relations_store + .read() + .get_parents(block_header.id()) + { + std::result::Result::Ok(parents) => parents, + Err(e) => { + warn!( + "failed to get parents by hash: {:?}, the block should be re-executed", + e + ); + return anyhow::Result::Ok(false); + } + }; + + if !parents.iter().all(|parent| { + let children = match self.storage.relations_store.read().get_children(*parent) { + std::result::Result::Ok(children) => children, + Err(e) => { + warn!("failed to get children by hash: {:?}, the block should be re-executed", e); + return false; + } + }; + + if !children.contains(&block_header.id()) { + warn!("the parent: {:?} does not have the child: {:?}", parent, block_header.id()); + return false; + } + + match inquirer::is_dag_ancestor_of(&*self.storage.reachability_store.read(), *parent, block_header.id()) { + std::result::Result::Ok(pass) => { + if !pass { + warn!("failed to check ancestor, the block: {:?} is not the descendant of its parent: {:?}, the block should be re-executed", block_header.id(), *parent); + return false; + } + true + } + Err(e) => { + warn!("failed to check ancestor, the block: {:?} is not the descendant of its parent: {:?}, the block should be re-executed, error: {:?}", block_header.id(), *parent, e); + false + } + } + }) { + return anyhow::Result::Ok(false); + } + + if block_header.pruning_point() == HashValue::zero() { + return anyhow::Result::Ok(true); + } else { + match inquirer::is_dag_ancestor_of( + &*self.storage.reachability_store.read(), + 
block_header.pruning_point(), + block_header.id(), + ) { + std::result::Result::Ok(pass) => { + if !pass { + warn!("failed to check ancestor, the block: {:?} is not the descendant of the pruning: {:?}", block_header.id(), block_header.pruning_point()); + return anyhow::Result::Ok(false); + } + } + Err(e) => { + warn!("failed to check ancestor, the block: {:?} is not the descendant of the pruning: {:?}, error: {:?}", block_header.id(), block_header.pruning_point(), e); + return anyhow::Result::Ok(false); + } + } + } + + anyhow::Result::Ok(true) } pub fn check_ancestor_of(&self, ancestor: Hash, descendant: Hash) -> anyhow::Result { - // self.ghostdag_manager - // .check_ancestor_of(ancestor, descendant) inquirer::is_dag_ancestor_of( &*self.storage.reachability_store.read(), ancestor, @@ -239,11 +330,12 @@ impl BlockDAG { ); } - info!("start to commit via batch, header id: {:?}", header.id()); - // Create a DB batch writer let mut batch = WriteBatch::default(); + info!("start to commit via batch, header id: {:?}", header.id()); + let lock_guard = self.commit_lock.lock(); + // lock the dag data to write in batch // the cache will be written at the same time // when the batch is written before flush to the disk and @@ -322,6 +414,7 @@ impl BlockDAG { .write_batch(batch) .expect("failed to write dag data in batch"); + drop(lock_guard); info!("finish writing the batch, head id: {:?}", header.id()); Ok(()) @@ -381,6 +474,9 @@ impl BlockDAG { // Create a DB batch writer let mut batch = WriteBatch::default(); + info!("start to commit via batch, header id: {:?}", header.id()); + let lock_guard = self.commit_lock.lock(); + // lock the dag data to write in batch, read lock. // the cache will be written at the same time // when the batch is written before flush to the disk and @@ -460,6 +556,7 @@ impl BlockDAG { .write_batch(batch) .expect("failed to write dag data in batch"); + drop(lock_guard); info!("finish writing the batch, head id: {:?}", header.id()); Ok(()) @@ -533,12 +630,12 @@ impl BlockDAG { pruning_depth: u64, pruning_finality: u64, ) -> anyhow::Result { - info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?}", previous_pruning_point, previous_ghostdata); + info!("start to calculate the mergeset and tips, previous pruning point: {:?}, previous ghostdata: {:?} and its red block count: {:?}", previous_pruning_point, previous_ghostdata.to_compact(), previous_ghostdata.mergeset_reds.len()); let dag_state = self.get_dag_state(previous_pruning_point)?; let next_ghostdata = self.ghostdata(&dag_state.tips)?; info!( - "start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}", - dag_state.tips, previous_pruning_point, next_ghostdata, + "start to calculate the mergeset and tips for tips: {:?}, and last pruning point: {:?} and next ghostdata: {:?}, red block count: {:?}", + dag_state.tips, previous_pruning_point, next_ghostdata.to_compact(), next_ghostdata.mergeset_reds.len() ); let next_pruning_point = self.pruning_point_manager().next_pruning_point( previous_pruning_point, diff --git a/flexidag/src/reachability/inquirer.rs b/flexidag/src/reachability/inquirer.rs index fa6f8cbd68..8dfca4746e 100644 --- a/flexidag/src/reachability/inquirer.rs +++ b/flexidag/src/reachability/inquirer.rs @@ -74,8 +74,30 @@ fn add_dag_block( mergeset_iterator: HashIterator, ) -> Result<()> { // Update the future covering set for blocks in the mergeset + let mut insert_future_set_result: Vec> = Vec::new(); for 
merged_block in mergeset_iterator { - insert_to_future_covering_set(store, merged_block, new_block)?; + let result = insert_to_future_covering_set(store, merged_block, new_block); + if result.is_err() { + match result { + Err(ReachabilityError::DataInconsistency) => { + // This is a data inconsistency error, which means that the block is already in the future covering set + // of the merged block. This is a serious error, and we should propagate it. + insert_future_set_result.push(Err(ReachabilityError::DataInconsistency)); + } + Err(ReachabilityError::HashesNotOrdered) => { + // This is a hashes not ordered error, which means that the merged block is not in the future covering set + // of the new block. This is a serious error, and we should propagate it. + return Err(ReachabilityError::HashesNotOrdered); + } + _ => { + // This is an unexpected error, and we should propagate it. + return result; + } + } + } + } + for result in insert_future_set_result.into_iter() { + result?; } Ok(()) } diff --git a/flexidag/tests/tests.rs b/flexidag/tests/tests.rs index ffc402708f..d9ddde5b34 100644 --- a/flexidag/tests/tests.rs +++ b/flexidag/tests/tests.rs @@ -149,7 +149,7 @@ async fn test_with_spawn() { std::result::Result::Ok(_) => break, Err(e) => { debug!("failed to commit error: {:?}, i: {:?}", e, i); - if dag_clone.has_dag_block(block_clone.id()).unwrap() { + if dag_clone.has_block_connected(&block_clone).unwrap() { break; } count -= 1; diff --git a/network-p2p/src/service.rs b/network-p2p/src/service.rs index 90f85fe55b..0e7d4bde5d 100644 --- a/network-p2p/src/service.rs +++ b/network-p2p/src/service.rs @@ -1308,12 +1308,12 @@ impl Future for NetworkWorker { })) => { if let Some(metrics) = this.metrics.as_ref() { for (protocol, message) in &messages { - info!( - "[network-p2p] receive notification from {} {} {}", - remote, - protocol, - message.len() - ); + // info!( + // "[network-p2p] receive notification from {} {} {}", + // remote, + // protocol, + // message.len() + // ); metrics .notifications_sizes .with_label_values(&["in", protocol]) diff --git a/network/api/src/peer_provider.rs b/network/api/src/peer_provider.rs index 37ded13c1c..b6c17eddf9 100644 --- a/network/api/src/peer_provider.rs +++ b/network/api/src/peer_provider.rs @@ -276,18 +276,18 @@ impl PeerSelector { peers }); if best_peers.is_empty() || best_peers[0].total_difficulty() <= min_difficulty { - info!( - "best peer difficulty {:?} is smaller than min difficulty {:?}, return None", - best_peers[0].total_difficulty(), - min_difficulty - ); + // info!( + // "best peer difficulty {:?} is smaller than min difficulty {:?}, return None", + // best_peers[0].total_difficulty(), + // min_difficulty + // ); None } else { - info!( - "best peer difficulty {:?}, info: {:?} picked", - best_peers[0].total_difficulty(), - best_peers - ); + // info!( + // "best peer difficulty {:?}, info: {:?} picked", + // best_peers[0].total_difficulty(), + // best_peers + // ); Some(best_peers) } } diff --git a/node/src/node.rs b/node/src/node.rs index 6041b0ecf4..dc6ca390c7 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -7,11 +7,10 @@ use crate::peer_message_handler::NodePeerMessageHandler; use crate::rpc_service_factory::RpcServiceFactory; use crate::NodeHandle; use actix::prelude::*; -use anyhow::{format_err, Result}; +use anyhow::Result; use futures::channel::oneshot; use futures::executor::block_on; use futures_timer::Delay; -use network_api::{PeerProvider, PeerSelector, PeerStrategy}; use starcoin_account_service::{AccountEventService, 
AccountService, AccountStorage}; use starcoin_block_relayer::BlockRelayer; use starcoin_chain_notify::ChainNotifyHandlerService; @@ -26,7 +25,7 @@ use starcoin_miner::generate_block_event_pacemaker::GenerateBlockEventPacemaker; use starcoin_miner::{BlockBuilderService, MinerService}; use starcoin_miner_client::job_bus_client::JobBusClient; use starcoin_miner_client::miner::MinerClientService; -use starcoin_network::{NetworkActorService, NetworkServiceRef}; +use starcoin_network::NetworkActorService; use starcoin_network_rpc::NetworkRpcService; use starcoin_node_api::errors::NodeStartError; use starcoin_node_api::message::{NodeRequest, NodeResponse}; @@ -48,10 +47,10 @@ use starcoin_storage::{BlockStore, Storage}; use starcoin_stratum::service::{StratumService, StratumServiceFactory}; use starcoin_stratum::stratum::{Stratum, StratumFactory}; use starcoin_sync::announcement::AnnouncementService; -use starcoin_sync::block_connector::{BlockConnectorService, ExecuteRequest, ResetRequest}; +use starcoin_sync::block_connector::{BlockConnectorService, ResetRequest}; use starcoin_sync::sync::SyncService; use starcoin_sync::txn_sync::TxnSyncService; -use starcoin_sync::verified_rpc_client::VerifiedRpcClient; +use starcoin_sync_api::SyncSpecificTargretRequest; use starcoin_txpool::{TxPoolActorService, TxPoolService}; use starcoin_types::blockhash::KType; use starcoin_types::system_events::{SystemShutdown, SystemStarted}; @@ -142,44 +141,14 @@ impl ServiceHandler for NodeService { NodeResponse::AsyncResult(receiver) } NodeRequest::ReExecuteBlock(block_hash) => { - let storage = self - .registry - .get_shared_sync::>() - .expect("Storage must exist."); - - let connect_service = ctx - .service_ref::>()? - .clone(); - let network = ctx.get_shared::()?; + let sync_service = ctx.service_ref::()?.clone(); let fut = async move { info!("Prepare to re execute block {}", block_hash); - let block = match storage.get_block(block_hash)? { - Some(block) => Some(block), - None => { - info!("Get block from peer to peer network"); - //get block from peer to peer network. 
- let peer_set = network.peer_set().await?; - if peer_set.is_empty() { - info!("Peers is empty."); - None - } else { - let peer_selector = - PeerSelector::new(peer_set, PeerStrategy::Best, None); - peer_selector.retain_rpc_peers(); - let rpc_client = VerifiedRpcClient::new(peer_selector, network); - let mut blocks = rpc_client.get_blocks(vec![block_hash]).await?; - blocks.pop().flatten().map(|(block, _peer)| block) - } - } - }; - let block = block.ok_or_else(|| { - format_err!( - "Can not find block by {} from local and peer to peer network.", - block_hash - ) + sync_service.notify(SyncSpecificTargretRequest { + block: None, + block_id: block_hash, + peer_id: None, })?; - let result = connect_service.send(ExecuteRequest { block }).await??; - info!("Re execute result: {:?}", result); Ok(()) }; let receiver = ctx.exec(fut); diff --git a/sync/api/src/lib.rs b/sync/api/src/lib.rs index 1162b2954f..602ff0887c 100644 --- a/sync/api/src/lib.rs +++ b/sync/api/src/lib.rs @@ -1,6 +1,8 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 +use std::cmp::Ordering; + use anyhow::Result; use network_api::PeerId; use network_api::PeerStrategy; @@ -16,6 +18,38 @@ pub use stream_task::TaskProgressReport; mod service; +#[derive(Clone, Debug, Eq)] +pub struct SyncBlockSort { + pub block: Block, +} + +impl PartialEq for SyncBlockSort { + fn eq(&self, other: &Self) -> bool { + self.block.header().id() == other.block.header().id() + } +} + +impl PartialOrd for SyncBlockSort { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for SyncBlockSort { + fn cmp(&self, other: &Self) -> Ordering { + let result = self + .block + .header() + .number() + .cmp(&other.block.header().number()); + if Ordering::Equal == result { + self.block.header().id().cmp(&other.block.header().id()) + } else { + result + } + } +} + #[derive(Clone, Debug)] pub struct StartSyncTxnEvent; @@ -60,6 +94,17 @@ impl ServiceRequest for SyncStatusRequest { type Response = SyncStatus; } +#[derive(Debug, Clone)] +pub struct SyncSpecificTargretRequest { + pub block: Option, + pub block_id: HashValue, + pub peer_id: Option, +} + +impl ServiceRequest for SyncSpecificTargretRequest { + type Response = Result<()>; +} + #[derive(Debug, Clone)] pub struct SyncProgressRequest; diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index 600c1dd885..df3162e8ff 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -16,6 +16,7 @@ use anyhow::{bail, format_err, Ok, Result}; use network_api::PeerProvider; use starcoin_chain::BlockChain; use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; +use starcoin_config::genesis_config::G_BASE_MAX_UNCLES_PER_BLOCK; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; use starcoin_consensus::Consensus; use starcoin_crypto::HashValue; @@ -30,10 +31,12 @@ use starcoin_service_registry::{ }; use starcoin_storage::{BlockStore, Storage}; use starcoin_sync_api::PeerNewBlock; +use starcoin_sync_api::SyncSpecificTargretRequest; use starcoin_txpool::TxPoolService; use starcoin_txpool_api::TxPoolSyncService; #[cfg(test)] use starcoin_txpool_mock_service::MockTxPoolService; +use starcoin_types::block::BlockHeader; use starcoin_types::block::ExecutedBlock; use starcoin_types::sync_status::SyncStatus; use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown}; @@ -122,6 
+125,23 @@ where None } + + // return false if the number of the block is larger than the current number of the chain. + // or return false if the gap of those two blocks is larger than 2 * G_BASE_MAX_UNCLES_PER_BLOCK + // else return true. + // return false will trigger the burden sync operation. + // return true will trigger the specific(light) sync operation. + fn is_near_block(&self, block_header: &BlockHeader) -> bool { + let current_number = self.chain_service.get_main().status().head().number(); + if current_number <= block_header.number() { + return false; + } + let gap = current_number.saturating_sub(block_header.number()); + if gap <= G_BASE_MAX_UNCLES_PER_BLOCK.saturating_mul(2) { + return true; + } + false + } } impl ServiceFactory @@ -308,7 +328,15 @@ where block.header().number(), peer_id ); - let _ = sync_service.notify(CheckSyncEvent::default()); + if !self.is_near_block(block.as_ref().header()) { + let _ = sync_service.notify(CheckSyncEvent::default()); + } else { + let _ = sync_service.notify(SyncSpecificTargretRequest { + block: Some(block.as_ref().clone()), + block_id: block.id(), + peer_id: Some(peer_id), + }); + } } } e => { diff --git a/sync/src/block_connector/write_block_chain.rs b/sync/src/block_connector/write_block_chain.rs index 4569bdba7b..509dfcac66 100644 --- a/sync/src/block_connector/write_block_chain.rs +++ b/sync/src/block_connector/write_block_chain.rs @@ -28,7 +28,7 @@ use starcoin_types::{ }; #[cfg(test)] use starcoin_vm_types::{account_address::AccountAddress, transaction::SignedUserTransaction}; -use std::collections::HashSet; +// use std::collections::HashSet; use std::{fmt::Formatter, sync::Arc}; use super::BlockConnectorService; @@ -380,55 +380,55 @@ where self.dag.clone(), )?; - let start = new_head_block.header().id(); - let lastest = self.main.status().head.clone(); - - let lastest_dag_state = if lastest.pruning_point() == HashValue::zero() { - let genesis = self - .main - .get_storage() - .get_genesis()? - .ok_or_else(|| format_err!("Cannot get the genesis in storage!"))?; - self.main.dag().get_dag_state(genesis)? - } else { - self.main.dag().get_dag_state(lastest.pruning_point())? - }; - - let mut deleted_chain = lastest_dag_state.tips.into_iter().collect::>(); - let mut ready_to_delete = HashSet::new(); - loop { - let loop_to_delete = deleted_chain.clone(); - deleted_chain.clear(); - for descendant in loop_to_delete.into_iter() { - if descendant == start { - continue; - } - if self.main.dag().check_ancestor_of(descendant, start)? { - continue; - } - - let descendant_header = self - .storage - .get_block_header_by_hash(descendant)? - .ok_or_else(|| { - format_err!( - "in resetting, cannot find the block header for {:?}", - descendant - ) - })?; - deleted_chain.extend(descendant_header.parents_hash()); - - ready_to_delete.insert(descendant); - } - - if deleted_chain.is_empty() { - for candidate in ready_to_delete.into_iter() { - self.storage.delete_block(candidate)?; - self.storage.delete_block_info(candidate)?; - } - break; - } - } + // let start = new_head_block.header().id(); + // let lastest = self.main.status().head.clone(); + + // let lastest_dag_state = if lastest.pruning_point() == HashValue::zero() { + // let genesis = self + // .main + // .get_storage() + // .get_genesis()? + // .ok_or_else(|| format_err!("Cannot get the genesis in storage!"))?; + // self.main.dag().get_dag_state(genesis)? + // } else { + // self.main.dag().get_dag_state(lastest.pruning_point())? 
+ // }; + + // let mut deleted_chain = lastest_dag_state.tips.into_iter().collect::>(); + // let mut ready_to_delete = HashSet::new(); + // loop { + // let loop_to_delete = deleted_chain.clone(); + // deleted_chain.clear(); + // for descendant in loop_to_delete.into_iter() { + // if descendant == start { + // continue; + // } + // if self.main.dag().check_ancestor_of(descendant, start)? { + // continue; + // } + + // let descendant_header = self + // .storage + // .get_block_header_by_hash(descendant)? + // .ok_or_else(|| { + // format_err!( + // "in resetting, cannot find the block header for {:?}", + // descendant + // ) + // })?; + // deleted_chain.extend(descendant_header.parents_hash()); + + // ready_to_delete.insert(descendant); + // } + + // if deleted_chain.is_empty() { + // for candidate in ready_to_delete.into_iter() { + // self.storage.delete_block(candidate)?; + // self.storage.delete_block_info(candidate)?; + // } + // break; + // } + // } if new_head_block.header().pruning_point() == HashValue::zero() { let genesis = self @@ -657,7 +657,7 @@ where .header() .parents_hash() .iter() - .all(|parent_hash| self.main.dag().has_dag_block(*parent_hash).unwrap_or(false)) + .all(|parent_hash| self.main.has_dag_block(*parent_hash).unwrap_or(false)) { debug!( "block: {:?} is a future dag block, trigger sync to pull other dag blocks", diff --git a/sync/src/parallel/executor.rs b/sync/src/parallel/executor.rs index 4735f2a769..251f08ca64 100644 --- a/sync/src/parallel/executor.rs +++ b/sync/src/parallel/executor.rs @@ -69,7 +69,7 @@ impl DagBlockExecutor { return Ok(false); } - if !chain.has_dag_block(parent_id)? { + if !chain.has_block_connected(&header)? { return Ok(false); } } diff --git a/sync/src/sync.rs b/sync/src/sync.rs index b57e5590e3..cf21b22d50 100644 --- a/sync/src/sync.rs +++ b/sync/src/sync.rs @@ -4,15 +4,16 @@ use crate::block_connector::BlockConnectorService; use crate::store::sync_dag_store::{SyncDagStore, SyncDagStoreConfig}; use crate::sync_metrics::SyncMetrics; -use crate::tasks::{full_sync_task, AncestorEvent, SyncFetcher}; +use crate::tasks::{full_sync_task, AncestorEvent, BlockFetcher, SyncFetcher}; use crate::verified_rpc_client::{RpcVerifyError, VerifiedRpcClient}; use anyhow::{format_err, Result}; use futures::FutureExt; use futures_timer::Delay; use network_api::peer_score::PeerScoreMetrics; use network_api::{PeerId, PeerProvider, PeerSelector, PeerStrategy, ReputationChange}; -use starcoin_chain::BlockChain; -use starcoin_chain_api::ChainReader; +use starcoin_chain::verifier::DagVerifier; +use starcoin_chain::{BlockChain, ChainWriter}; +use starcoin_chain_api::{ChainReader, ExecutedBlock}; use starcoin_config::{NodeConfig, RocksdbConfig}; use starcoin_dag::blockdag::BlockDAG; use starcoin_executor::VMMetrics; @@ -22,17 +23,20 @@ use starcoin_network::PeerEvent; use starcoin_service_registry::{ ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler, }; +use starcoin_storage::block::DagSyncBlock; use starcoin_storage::block_info::BlockInfoStore; use starcoin_storage::{BlockStore, Storage}; use starcoin_sync_api::{ - PeerScoreRequest, PeerScoreResponse, SyncCancelRequest, SyncProgressReport, - SyncProgressRequest, SyncServiceHandler, SyncStartRequest, SyncStatusRequest, SyncTarget, + PeerScoreRequest, PeerScoreResponse, SyncBlockSort, SyncCancelRequest, SyncProgressReport, + SyncProgressRequest, SyncServiceHandler, SyncSpecificTargretRequest, SyncStartRequest, + SyncStatusRequest, SyncTarget, }; use starcoin_txpool::TxPoolService; -use 
starcoin_types::block::BlockIdAndNumber; +use starcoin_types::block::{Block, BlockIdAndNumber}; use starcoin_types::startup_info::ChainStatus; use starcoin_types::sync_status::SyncStatus; use starcoin_types::system_events::{NewHeadBlock, SyncStatusChangeEvent, SystemStarted}; +use std::collections::{BTreeSet, HashSet}; use std::result::Result::Ok; use std::sync::Arc; use std::time::Duration; @@ -182,6 +186,242 @@ impl SyncService { ))) } + fn try_to_start_sync(&mut self) -> Result { + let previous_stage = std::mem::replace(&mut self.stage, SyncStage::Checking); + match previous_stage { + SyncStage::NotStart | SyncStage::Done => Ok(true), + SyncStage::Checking => { + info!("[sync] Sync stage is already in Checking"); + Ok(false) + } + SyncStage::Synchronizing(task_handle) => { + info!("[sync] Sync stage is already in Synchronizing"); + if let Some(report) = task_handle.task_event_handle.get_report() { + info!("[sync] report: {}", report); + } + //restore to Synchronizing + self.stage = SyncStage::Synchronizing(task_handle); + Ok(false) + } + SyncStage::Canceling => { + info!("[sync] Sync task is in canceling."); + Ok(false) + } + } + } + + fn check_and_start_light_sync( + &mut self, + msg: SyncSpecificTargretRequest, + ctx: &mut ServiceContext, + ) -> Result<()> { + let network = ctx.get_shared::()?; + let config = self.config.clone(); + let storage = self.storage.clone(); + let dag = ctx.get_shared::()?; + + let fut = async move { + let verified_rpc_client = Self::create_verified_client( + network, + config.clone(), + Some(PeerStrategy::Best), + msg.peer_id + .map_or_else(std::vec::Vec::new, |peer_id| vec![peer_id]), + None, + ) + .await?; + let startup_info = storage + .get_startup_info()? + .ok_or_else(|| format_err!("Startup info should exist."))?; + let mut chain = BlockChain::new( + config.net().time_service(), + startup_info.main, + storage.clone(), + None, + dag, + )?; + + let specific_block = match msg.block { + Some(block) => block, + None => { + if let Some(block) = storage.get_block(msg.block_id)? { + block + } else if let Some(sync_dag_block) = storage.get_dag_sync_block(msg.block_id)? { + sync_dag_block.block + } else { + let block_from_remote = + verified_rpc_client.fetch_blocks(vec![msg.block_id]).await?; + if block_from_remote.len() != 1 { + return Err(format_err!( + "Get block by id failed, block id: {:?}", + msg.block_id + )); + } + let block = block_from_remote + .first() + .expect("should not be none") + .0 + .clone(); + storage.save_dag_sync_block(DagSyncBlock { + block: block.clone(), + children: vec![], + })?; + block + } + } + }; + + // ensure the previous blocks are ready to be executed or were executed already + info!( + "[sync specific] Start to sync specific block: {:?}", + specific_block.id() + ); + + let mut current_round = specific_block.header().parents_hash(); + let mut next_round = vec![]; + let mut blocks_to_be_executed = vec![specific_block.clone()]; + + while !current_round.is_empty() { + for block_id in current_round { + // already executed + if chain.has_dag_block(block_id)? { + continue; + } + + // fetch from the local + match storage.get_block(block_id)? { + Some(block_in_local) => next_round.push(block_in_local), + None => { + if let Some(sync_dag_block) = storage.get_dag_sync_block(block_id)? 
{ + next_round.push(sync_dag_block.block); + } else { + // fetch from the remote + let parents_in_remote = + verified_rpc_client.fetch_blocks(vec![block_id]).await?; + if parents_in_remote.len() != 1 { + return Err(format_err!( + "Get block by id failed, block id: {:?}", + block_id + )); + } + let block = parents_in_remote + .first() + .expect("should not be none") + .0 + .clone(); + next_round.push(block.clone()); + storage.save_dag_sync_block(DagSyncBlock { + block: next_round + .last() + .expect("impossible to be none") + .clone(), + children: vec![], + })?; + } + } + } + } + if next_round.is_empty() { + break; + } + current_round = next_round + .iter() + .flat_map(|block| block.header().parents_hash()) + .collect::>() + .into_iter() + .collect::>(); + blocks_to_be_executed.extend(next_round); + next_round = vec![]; + info!( + "[sync specific] Fetch parents blocks, current_round: {:?}", + current_round + ); + } + let mut waiting_for_execution_heap = blocks_to_be_executed + .into_iter() + .map(|block| SyncBlockSort { block }) + .collect::>(); + + let mut failed_blocks: HashSet = HashSet::new(); + info!("[sync specific] Start to execute blocks"); + while let Some(SyncBlockSort { block }) = + waiting_for_execution_heap.iter().next().cloned() + { + if chain.has_dag_block(block.id())? { + waiting_for_execution_heap.remove(&SyncBlockSort { + block: block.clone(), + }); + continue; + } + if !chain.check_parents_ready(block.header()) { + failed_blocks.insert(block.clone()); + waiting_for_execution_heap.remove(&SyncBlockSort { + block: block.clone(), + }); + continue; + } + match chain.verify_with_verifier::(block.clone()) { + Ok(verified_executed_block) => match chain.execute(verified_executed_block) { + Ok(_) => { + waiting_for_execution_heap.extend(failed_blocks.iter().map(|block| { + SyncBlockSort { + block: block.clone(), + } + })); + waiting_for_execution_heap.remove(&SyncBlockSort { + block: block.clone(), + }); + failed_blocks.clear(); + continue; + } + Err(e) => { + warn!( + "[sync specific] Execute block failed, block id: {:?}, error: {:?}", + block.id(), + e + ); + waiting_for_execution_heap.remove(&SyncBlockSort { + block: block.clone(), + }); + failed_blocks.insert(block.clone()); + continue; + } + }, + Err(_) => { + return Err(format_err!( + "Verify block failed, block id: {:?}", + block.id() + )) + } + } + } + + if chain.has_dag_block(msg.block_id)? 
{ + chain.connect(ExecutedBlock { + block: specific_block, + block_info: storage.get_block_info(msg.block_id)?.ok_or_else(|| { + format_err!("failed to get the block info for id: {:?}", msg.block_id) + })?, + })?; + info!("[sync specific] Sync specific block done"); + } else { + return Err(format_err!( + "Sync specific block failed, block id: {:?}", + specific_block.id() + )); + } + info!("[sync specific] Sync specific block done"); + Ok(()) + }; + + ctx.spawn(fut.then(|result| async move { + if let Err(e) = result { + error!("[sync specific] Sync specific block failed, error: {:?}", e); + } + })); + Ok(()) + } + pub fn check_and_start_sync( &mut self, peers: Vec, @@ -197,32 +437,14 @@ impl SyncService { if let Some(sync_task_total) = sync_task_total.as_ref() { sync_task_total.with_label_values(&["check"]).inc(); } - match std::mem::replace(&mut self.stage, SyncStage::Checking) { - SyncStage::NotStart | SyncStage::Done => { - //continue - info!( - "[sync] Start checking sync,skip_pow_verify:{}, special peers: {:?}", - skip_pow_verify, peers - ); - } - SyncStage::Checking => { - info!("[sync] Sync stage is already in Checking"); - return Ok(()); - } - SyncStage::Synchronizing(task_handle) => { - info!("[sync] Sync stage is already in Synchronizing"); - if let Some(report) = task_handle.task_event_handle.get_report() { - info!("[sync] report: {}", report); - } - //restore to Synchronizing - self.stage = SyncStage::Synchronizing(task_handle); - return Ok(()); - } - SyncStage::Canceling => { - info!("[sync] Sync task is in canceling."); - return Ok(()); - } + + if !self.try_to_start_sync()? { + return Ok(()); } + info!( + "[sync] Start checking sync,skip_pow_verify:{}, special peers: {:?}", + skip_pow_verify, peers + ); let network = ctx.get_shared::()?; let storage = self.storage.clone(); @@ -621,6 +843,15 @@ impl EventHandler for SyncService { } } +impl EventHandler for SyncService { + fn handle_event(&mut self, msg: SyncSpecificTargretRequest, ctx: &mut ServiceContext) { + match self.check_and_start_light_sync(msg, ctx) { + Ok(()) => (), + Err(e) => warn!("[sync] Check and start light sync failed: {:?}", e), + } + } +} + impl ServiceHandler for SyncService { fn handle(&mut self, _msg: SyncStatusRequest, _ctx: &mut ServiceContext) -> SyncStatus { self.sync_status.clone() diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index 95c03ad8ce..68409b1aa9 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -224,7 +224,6 @@ where { fn has_dag_block(&self, block_id: HashValue) -> anyhow::Result { self.chain - .dag() .has_dag_block(block_id) .context("Failed to check if DAG block exists") } diff --git a/sync/src/verified_rpc_client.rs b/sync/src/verified_rpc_client.rs index dd666eda66..eaf5ce2d74 100644 --- a/sync/src/verified_rpc_client.rs +++ b/sync/src/verified_rpc_client.rs @@ -22,6 +22,7 @@ use starcoin_types::{ block::{BlockHeader, BlockInfo, BlockNumber}, transaction::TransactionInfo, }; +use std::collections::HashMap; use std::fmt::Debug; use std::time::Instant; use thiserror::Error; @@ -734,6 +735,94 @@ impl VerifiedRpcClient { .into()) } + pub async fn get_block_diligently( + &self, + ids: Vec, + ) -> Result)>>> { + let peer_infos = self + .peer_selector + .bests(0.into()) + .ok_or_else(|| format_err!("No peers for send request."))?; + + let mut result: HashMap = HashMap::new(); + let mut waiting_list = ids.clone(); + for peer_info in peer_infos { + let blocks = match self + .get_blocks_inner(peer_info.peer_id(), 
waiting_list.clone())
+                .await
+            {
+                Ok(blocks) => blocks,
+                Err(err) => {
+                    warn!("get blocks failed:{}, call get blocks legacy", err);
+                    vec![]
+                }
+            };
+            if blocks.is_empty() {
+                continue;
+            }
+
+            let rpc_result = waiting_list
+                .into_iter()
+                .zip(blocks)
+                .map(|(id, block)| {
+                    if let Some(block) = block {
+                        let actual_id = block.id();
+                        if actual_id != id {
+                            warn!(
+                                "Get block by id: {:?} from peer: {:?}, but got block: {:?}",
+                                id,
+                                peer_info.peer_id(),
+                                actual_id
+                            );
+                            (id, None)
+                        } else {
+                            (id, Some(block))
+                        }
+                    } else {
+                        (id, None)
+                    }
+                })
+                .collect::<Vec<_>>();
+
+            waiting_list = vec![];
+
+            result.extend(
+                rpc_result
+                    .iter()
+                    .filter(|(id, block)| {
+                        if block.is_none() {
+                            waiting_list.push(*id);
+                        }
+                        block.is_some()
+                    })
+                    .cloned()
+                    .map(|(id, block)| {
+                        (
+                            id,
+                            (
+                                block.expect("block should not be none"),
+                                peer_info.peer_id(),
+                            ),
+                        )
+                    })
+                    .collect::<Vec<_>>(),
+            );
+
+            if waiting_list.is_empty() {
+                break;
+            }
+        }
+
+        Ok(ids
+            .into_iter()
+            .map(|id| {
+                result.get(&id).map(|block_and_peerid| {
+                    (block_and_peerid.0.clone(), Some(block_and_peerid.1.clone()))
+                })
+            })
+            .collect())
+    }
+
     pub async fn get_blocks(
         &self,
         ids: Vec<HashValue>,
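Taken together, `get_block_diligently` walks the selected peers in order, keeps every block a peer actually serves (verifying that the returned id matches the requested one), and re-requests only the still-missing ids from the next peer. Condensed to its core, the loop looks like this (a simplified sketch: the types and `fetch_from_peer` are stand-ins for the real RPC surface, not part of this patch):

// Condensed model of the get_block_diligently loop above: ask the best peer
// for every id, keep the hits, and re-request only the misses from the next
// peer. Types are simplified stand-ins and `fetch_from_peer` is hypothetical;
// the real code also verifies returned block ids before accepting them.
use std::collections::HashMap;

type Id = u64; // stand-in for HashValue
type Block = String; // stand-in for the real Block type
type PeerId = String;

fn fetch_diligently(
    peers: &[PeerId],
    ids: &[Id],
    mut fetch_from_peer: impl FnMut(&PeerId, &[Id]) -> Vec<Option<Block>>,
) -> Vec<Option<(Block, PeerId)>> {
    let mut found: HashMap<Id, (Block, PeerId)> = HashMap::new();
    let mut waiting: Vec<Id> = ids.to_vec();
    for peer in peers {
        let blocks = fetch_from_peer(peer, &waiting);
        if blocks.len() != waiting.len() {
            // Treat a failed or short reply like the `vec![]` error arm above:
            // leave the waiting list untouched and try the next peer.
            continue;
        }
        let mut still_missing = Vec::new();
        for (id, block) in waiting.iter().zip(blocks) {
            match block {
                Some(b) => {
                    found.insert(*id, (b, peer.clone()));
                }
                None => still_missing.push(*id),
            }
        }
        waiting = still_missing;
        if waiting.is_empty() {
            break; // every requested id has been served
        }
    }
    // Preserve the caller's ordering; None marks ids no peer could serve.
    ids.iter().map(|id| found.get(id).cloned()).collect()
}

The final map back over `ids` preserves the caller's ordering, with `None` marking ids that none of the polled peers could serve.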