diff --git a/sync/src/parallel/executor.rs b/sync/src/parallel/executor.rs index 4b61ff3980..4735f2a769 100644 --- a/sync/src/parallel/executor.rs +++ b/sync/src/parallel/executor.rs @@ -14,6 +14,8 @@ use tokio::{ task::JoinHandle, }; +const MAX_TOTAL_WAITING_TIME: u64 = 3600000; // an hour + #[derive(Debug)] pub enum ExecuteState { Executing(HashValue), @@ -96,6 +98,8 @@ impl DagBlockExecutor { block.header().id() ); + let mut total_waiting_time: u64 = 0; + let waiting_per_time: u64 = 100; loop { match Self::waiting_for_parents( &self.dag, @@ -104,9 +108,34 @@ impl DagBlockExecutor { ) { Ok(true) => break, Ok(false) => { + if total_waiting_time >= MAX_TOTAL_WAITING_TIME { + error!( + "failed to check parents: {:?}, for reason: timeout", + header + ); + match self + .sender + .send(ExecuteState::Error(Box::new(header.clone()))) + .await + { + Ok(_) => (), + Err(e) => { + error!( + "failed to send error state: {:?}, for reason: {:?}", + header, e + ); + return; + } + } + return; + } tokio::task::yield_now().await; - tokio::time::sleep(tokio::time::Duration::from_millis(100)) - .await + tokio::time::sleep(tokio::time::Duration::from_millis( + waiting_per_time, + )) + .await; + total_waiting_time = + total_waiting_time.saturating_add(waiting_per_time); } Err(e) => { error!( diff --git a/sync/src/parallel/sender.rs b/sync/src/parallel/sender.rs index 4d30510b01..e0fd9316cb 100644 --- a/sync/src/parallel/sender.rs +++ b/sync/src/parallel/sender.rs @@ -26,7 +26,7 @@ struct DagBlockWorker { } pub struct DagBlockSender<'a> { - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, executors: Vec, queue_size: usize, time_service: Arc, @@ -38,7 +38,7 @@ pub struct DagBlockSender<'a> { impl<'a> DagBlockSender<'a> { pub fn new( - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, queue_size: usize, time_service: Arc, storage: Arc, diff --git a/sync/src/sync.rs b/sync/src/sync.rs index 4778a9a566..b57e5590e3 100644 --- a/sync/src/sync.rs +++ b/sync/src/sync.rs @@ -62,6 +62,7 @@ pub struct SyncService { stage: SyncStage, config: Arc, storage: Arc, + sync_dag_store: Arc, metrics: Option, peer_score_metrics: Option, vm_metrics: Option, @@ -83,6 +84,13 @@ impl SyncService { let head_block_info = storage .get_block_info(head_block_hash)? .ok_or_else(|| format_err!("can't get block info by hash {}", head_block_hash))?; + let sync_dag_store = Arc::new(SyncDagStore::create_from_path( + config.storage.sync_dir(), + SyncDagStoreConfig::create_with_params( + config.storage.cache_size(), + RocksdbConfig::default(), + ), + )?); //TODO bail PrometheusError after use custom metrics registry. let metrics = config .metrics @@ -97,6 +105,7 @@ impl SyncService { stage: SyncStage::NotStart, config, storage, + sync_dag_store, metrics, peer_score_metrics, vm_metrics, @@ -226,13 +235,7 @@ impl SyncService { let sync_metrics = self.metrics.clone(); let vm_metrics = self.vm_metrics.clone(); let dag = ctx.get_shared::()?; - let sync_dag_store = SyncDagStore::create_from_path( - config.storage.sync_dir(), - SyncDagStoreConfig::create_with_params( - config.storage.cache_size(), - RocksdbConfig::default(), - ), - )?; + let sync_dag_store = self.sync_dag_store.clone(); let fut = async move { let startup_info = storage .get_startup_info()? diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index e68ae73db8..95c03ad8ce 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -214,7 +214,7 @@ pub struct BlockCollector { local_store: Arc, fetcher: Arc, latest_block_id: HashValue, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, } impl ContinueChainOperator for BlockCollector @@ -264,7 +264,7 @@ where skip_pow_verify: bool, local_store: Arc, fetcher: Arc, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, ) -> Self { let latest_block_id = chain.current_header().id(); Self { diff --git a/sync/src/tasks/continue_execute_absent_block.rs b/sync/src/tasks/continue_execute_absent_block.rs index 0e3c3bebce..18c288af84 100644 --- a/sync/src/tasks/continue_execute_absent_block.rs +++ b/sync/src/tasks/continue_execute_absent_block.rs @@ -19,14 +19,14 @@ pub trait ContinueChainOperator { pub struct ContinueExecuteAbsentBlock<'a> { operator: &'a mut dyn ContinueChainOperator, local_store: Arc, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, } impl<'a> ContinueExecuteAbsentBlock<'a> { pub fn new( operator: &'a mut dyn ContinueChainOperator, local_store: Arc, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, ) -> anyhow::Result> { anyhow::Result::Ok(ContinueExecuteAbsentBlock { operator, diff --git a/sync/src/tasks/inner_sync_task.rs b/sync/src/tasks/inner_sync_task.rs index 7d91778143..8b0434961d 100644 --- a/sync/src/tasks/inner_sync_task.rs +++ b/sync/src/tasks/inner_sync_task.rs @@ -38,7 +38,7 @@ where peer_provider: N, custom_error_handle: Arc, dag: BlockDAG, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, } impl InnerSyncTask @@ -58,7 +58,7 @@ where peer_provider: N, custom_error_handle: Arc, dag: BlockDAG, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, ) -> Self { Self { ancestor, diff --git a/sync/src/tasks/mock.rs b/sync/src/tasks/mock.rs index 96b9822d1b..ac22c1112f 100644 --- a/sync/src/tasks/mock.rs +++ b/sync/src/tasks/mock.rs @@ -135,7 +135,7 @@ pub struct SyncNodeMocker { pub peer_id: PeerId, pub chain_mocker: MockChain, pub err_mocker: ErrorMocker, - pub sync_dag_store: SyncDagStore, + pub sync_dag_store: Arc, peer_selector: PeerSelector, } @@ -155,8 +155,10 @@ impl SyncNodeMocker { None, ); let peer_selector = PeerSelector::new(vec![peer_info], PeerStrategy::default(), None); - let sync_dag_store = SyncDagStore::create_for_testing() - .context("Failed to create SyncDagStore for testing")?; + let sync_dag_store = Arc::new( + SyncDagStore::create_for_testing() + .context("Failed to create SyncDagStore for testing")?, + ); Ok(Self::new_inner( peer_id, chain, @@ -186,7 +188,7 @@ impl SyncNodeMocker { None, ); let peer_selector = PeerSelector::new(vec![peer_info], PeerStrategy::default(), None); - let sync_dag_store = SyncDagStore::create_for_testing()?; + let sync_dag_store = Arc::new(SyncDagStore::create_for_testing()?); Ok(Self::new_inner( peer_id, chain, @@ -206,7 +208,7 @@ impl SyncNodeMocker { let peer_id = PeerId::random(); let peer_info = PeerInfo::new(peer_id.clone(), chain.chain_info(), vec![], vec![], None); let peer_selector = PeerSelector::new(vec![peer_info], PeerStrategy::default(), None); - let sync_dag_store = SyncDagStore::create_for_testing()?; + let sync_dag_store = Arc::new(SyncDagStore::create_for_testing()?); Ok(Self::new_inner( peer_id, chain, @@ -223,7 +225,7 @@ impl SyncNodeMocker { delay_milliseconds: u64, random_error_percent: u32, peer_selector: PeerSelector, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, ) -> Self { Self::new_inner( peer_id, @@ -241,7 +243,7 @@ impl SyncNodeMocker { error_strategy: ErrorStrategy, random_error_percent: u32, peer_selector: PeerSelector, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, ) -> Self { Self { peer_id: peer_id.clone(), diff --git a/sync/src/tasks/mod.rs b/sync/src/tasks/mod.rs index 48abe6d18d..51a4cf844b 100644 --- a/sync/src/tasks/mod.rs +++ b/sync/src/tasks/mod.rs @@ -622,7 +622,7 @@ pub fn full_sync_task( sync_metrics: Option, vm_metrics: Option, dag: BlockDAG, - sync_dag_store: SyncDagStore, + sync_dag_store: Arc, ) -> Result<( BoxFuture<'static, Result>, TaskHandle, diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index fee80c643f..797d0a7095 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -54,7 +54,7 @@ which is no longer suitable for the dag"] pub async fn test_failed_block() -> Result<()> { let net = ChainNetwork::new_builtin(BuiltinNetworkID::Halley); let (storage, chain_info, _, dag) = Genesis::init_storage_for_test(&net)?; - let sync_dag_store = SyncDagStore::create_for_testing()?; + let sync_dag_store = Arc::new(SyncDagStore::create_for_testing()?); let chain = BlockChain::new( net.time_service(), @@ -917,7 +917,7 @@ async fn test_sync_target() { 300, 0, peer_selector, - SyncDagStore::create_for_testing().expect("failed to create the sync dag store"), + Arc::new(SyncDagStore::create_for_testing().expect("failed to create the sync dag store")), )); let full_target = node2 .get_best_target(genesis_chain_info.total_difficulty())