diff --git a/sync/src/block_connector/block_connector_service.rs b/sync/src/block_connector/block_connector_service.rs index f87d1d8c29..11e1addffa 100644 --- a/sync/src/block_connector/block_connector_service.rs +++ b/sync/src/block_connector/block_connector_service.rs @@ -1,12 +1,16 @@ // Copyright (c) The Starcoin Core Contributors // SPDX-License-Identifier: Apache-2.0 +#[cfg(test)] +use super::CheckBlockConnectorHashValue; use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService}; use crate::sync::{CheckSyncEvent, SyncService}; use crate::tasks::{BlockConnectedEvent, BlockConnectedFinishEvent, BlockDiskCheckEvent}; +#[cfg(test)] +use anyhow::bail; use anyhow::{format_err, Result}; use network_api::PeerProvider; -use starcoin_chain_api::{ConnectBlockError, WriteableChainService, ChainReader}; +use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService}; use starcoin_config::{NodeConfig, G_CRATE_VERSION}; use starcoin_crypto::HashValue; use starcoin_executor::VMMetrics; @@ -26,8 +30,6 @@ use starcoin_types::sync_status::SyncStatus; use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown}; use std::sync::Arc; use sysinfo::{DiskExt, System, SystemExt}; -#[cfg(test)] -use super::CheckBlockConnectorHashValue; const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3; const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5; @@ -190,9 +192,7 @@ where } } -impl EventHandler - for BlockConnectorService -{ +impl EventHandler for BlockConnectorService { fn handle_event( &mut self, msg: BlockConnectedEvent, @@ -222,9 +222,7 @@ impl EventHandler } #[cfg(test)] -impl EventHandler - for BlockConnectorService -{ +impl EventHandler for BlockConnectorService { fn handle_event( &mut self, msg: BlockConnectedEvent, @@ -386,11 +384,10 @@ where ) -> Result<()> { if self.chain_service.get_main().status().head().id() == msg.head_hash { info!("the branch in chain service is the same as target's branch"); - return Ok(()); + Ok(()) + } else { + info!("mock branch in chain service is not the same as target's branch"); + bail!("blockchain in chain service is not the same as target!"); } - info!("mock branch in chain service is not the same as target's branch"); - bail!("blockchain in chain service is not the same as target!"); } } - - diff --git a/sync/src/block_connector/mod.rs b/sync/src/block_connector/mod.rs index c95df5a5a6..72ea8d3560 100644 --- a/sync/src/block_connector/mod.rs +++ b/sync/src/block_connector/mod.rs @@ -51,4 +51,3 @@ pub struct CheckBlockConnectorHashValue { impl ServiceRequest for CheckBlockConnectorHashValue { type Response = anyhow::Result<()>; } - diff --git a/sync/src/tasks/block_sync_task.rs b/sync/src/tasks/block_sync_task.rs index fbb517930c..3fe42d66cc 100644 --- a/sync/src/tasks/block_sync_task.rs +++ b/sync/src/tasks/block_sync_task.rs @@ -266,9 +266,9 @@ where // finally, if it is the last one, wait for the last block to be processed. if block_connect_event.feedback.is_some() && receiver.is_some() { - let mut count = 0; + let mut count: i32 = 0; while count < 3 { - count += 1; + count = count.saturating_add(1); match receiver.as_mut().unwrap().try_next() { Ok(_) => { break; @@ -280,7 +280,7 @@ where } } } - return Ok(state); + Ok(state) } fn apply_block(&mut self, block: Block, peer_id: Option) -> Result<()> { diff --git a/sync/src/tasks/inner_sync_task.rs b/sync/src/tasks/inner_sync_task.rs index dc80fc4415..8d0f70e953 100644 --- a/sync/src/tasks/inner_sync_task.rs +++ b/sync/src/tasks/inner_sync_task.rs @@ -118,7 +118,7 @@ where .and_then(move |(ancestor, accumulator), event_handle| { let check_local_store = ancestor_block_info.total_difficulty <= current_block_info.total_difficulty; - + let block_sync_task = BlockSyncTask::new( accumulator, ancestor, diff --git a/sync/src/tasks/mock.rs b/sync/src/tasks/mock.rs index c5c47d1099..6b9ddb3296 100644 --- a/sync/src/tasks/mock.rs +++ b/sync/src/tasks/mock.rs @@ -173,8 +173,7 @@ impl SyncNodeMocker { delay_milliseconds: u64, random_error_percent: u32, ) -> Result { - let chain = - MockChain::new_with_storage(net, storage, chain_info.head().id().clone(), miner)?; + let chain = MockChain::new_with_storage(net, storage, chain_info.head().id(), miner)?; let peer_id = PeerId::random(); let peer_info = PeerInfo::new( peer_id.clone(), diff --git a/sync/src/tasks/mod.rs b/sync/src/tasks/mod.rs index 60bb1079c5..7577beaa00 100644 --- a/sync/src/tasks/mod.rs +++ b/sync/src/tasks/mod.rs @@ -407,8 +407,7 @@ pub trait BlockConnectedEventHandle: Send + Clone + std::marker::Unpin { fn handle(&mut self, event: BlockConnectedEvent) -> Result<()>; } -impl BlockConnectedEventHandle for ServiceRef> -{ +impl BlockConnectedEventHandle for ServiceRef> { fn handle(&mut self, event: BlockConnectedEvent) -> Result<()> { self.notify(event)?; Ok(()) @@ -416,11 +415,10 @@ impl BlockConnectedEventHandle for ServiceRef> -{ +impl BlockConnectedEventHandle for ServiceRef> { fn handle(&mut self, event: BlockConnectedEvent) -> Result<()> { self.notify(event)?; - return Ok(()); + Ok(()) } } diff --git a/sync/src/tasks/tests.rs b/sync/src/tasks/tests.rs index 2e357ab652..bfa9b02e51 100644 --- a/sync/src/tasks/tests.rs +++ b/sync/src/tasks/tests.rs @@ -30,19 +30,17 @@ use starcoin_crypto::HashValue; use starcoin_genesis::Genesis; use starcoin_genesis::Genesis as StarcoinGenesis; use starcoin_logger::prelude::*; -use starcoin_service_registry::{RegistryAsyncService, RegistryService, ServiceRef, ActorService}; +use starcoin_service_registry::{RegistryAsyncService, RegistryService, ServiceRef}; use starcoin_storage::{BlockStore, Storage}; use starcoin_sync_api::SyncTarget; -use starcoin_txpool::{TxPoolActorService, TxPoolService}; -use starcoin_txpool_api::TxPoolSyncService; use starcoin_txpool_mock_service::MockTxPoolService; use starcoin_types::{ block::{Block, BlockBody, BlockHeaderBuilder, BlockIdAndNumber, BlockInfo}, U256, }; -use stest::actix_export::System; use std::collections::HashMap; use std::sync::{Arc, Mutex}; +use stest::actix_export::System; use stream_task::{ DefaultCustomErrorHandle, Generator, TaskError, TaskEventCounterHandle, TaskGenerator, }; @@ -1014,19 +1012,13 @@ fn sync_block_in_async_connection( let mut chain = MockChain::new_with_storage( thread_local_node.chain_mocker.net().clone(), storage.clone(), - thread_local_node - .chain_mocker - .head() - .status() - .head - .id() - .clone(), + thread_local_node.chain_mocker.head().status().head.id(), thread_local_node.chain_mocker.miner().clone(), ) .unwrap(); loop { - match receiver.try_next() { - std::result::Result::Ok(result) => match result { + if let std::result::Result::Ok(result) = receiver.try_next() { + match result { Some(event) => { chain .select_head(event.block) @@ -1042,8 +1034,7 @@ fn sync_block_in_async_connection( } } None => break, - }, - Err(_) => (), + } } } }; @@ -1061,7 +1052,7 @@ fn sync_block_in_async_connection( false, local_net.time_service(), storage.clone(), - sender.clone(), + sender, target_node.clone(), local_ancestor_sender, DummyNetworkService::default(), @@ -1090,9 +1081,9 @@ async fn test_sync_block_in_async_connection() -> Result<()> { let (storage, chain_info, _) = Genesis::init_storage_for_test(&net).expect("init storage by genesis fail."); let local_node = Arc::new(SyncNodeMocker::new_with_storage( - net.clone(), + net, storage.clone(), - chain_info.clone(), + chain_info, AccountInfo::random(), 1, 0, @@ -1100,7 +1091,7 @@ async fn test_sync_block_in_async_connection() -> Result<()> { target_node = sync_block_in_async_connection(target_node, local_node.clone(), storage.clone(), 10)?; - _ = sync_block_in_async_connection(target_node, local_node.clone(), storage.clone(), 20)?; + _ = sync_block_in_async_connection(target_node, local_node, storage, 20)?; Ok(()) } @@ -1119,20 +1110,23 @@ fn sync_block_in_block_connection_service_mock( let storage = local_node.chain().get_storage(); let startup_info = storage - .get_startup_info()? - .ok_or_else(|| format_err!("Startup info should exist."))?; + .get_startup_info()? + .ok_or_else(|| format_err!("Startup info should exist."))?; let current_block_id = startup_info.main; let local_net = local_node.chain_mocker.net(); let (local_ancestor_sender, _local_ancestor_receiver) = unbounded(); - + let (sync_task, _task_handle, task_event_counter) = full_sync_task( current_block_id, target.clone(), false, local_net.time_service(), storage.clone(), - async_std::task::block_on(registry.service_ref::>())?.clone(), + async_std::task::block_on( + registry.service_ref::>(), + )? + .clone(), target_node.clone(), local_ancestor_sender, DummyNetworkService::default(), @@ -1144,10 +1138,15 @@ fn sync_block_in_block_connection_service_mock( info!("checking branch in sync service is the same as target's branch"); assert_eq!(branch.current_header().id(), target.target_id.id()); - let block_connector_service = async_std::task::block_on(registry.service_ref::>())?.clone(); - let result = async_std::task::block_on(block_connector_service.send(CheckBlockConnectorHashValue { - head_hash: target.target_id.id(), - }))?; + let block_connector_service = async_std::task::block_on( + registry.service_ref::>(), + )? + .clone(); + let result = async_std::task::block_on(block_connector_service.send( + CheckBlockConnectorHashValue { + head_hash: target.target_id.id(), + }, + ))?; if result.is_ok() { break; } @@ -1165,7 +1164,7 @@ async fn test_sync_block_apply_failed_but_connect_success() -> Result<()> { let config = Arc::new(NodeConfig::random_for_test()); let (storage, chain_info, _) = StarcoinGenesis::init_storage_for_test(config.net()) .expect("init storage by genesis fail."); - + let target_node = Arc::new(SyncNodeMocker::new(config.net().clone(), 1, 0)?); let local_node = Arc::new(SyncNodeMocker::new_with_storage( config.net().clone(), @@ -1178,7 +1177,7 @@ async fn test_sync_block_apply_failed_but_connect_success() -> Result<()> { let (registry_sender, registry_receiver) = async_std::channel::unbounded(); - let _handle = timeout_join_handler::spawn(move|| { + let _handle = timeout_join_handler::spawn(move || { let system = System::with_tokio_rt(|| { tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -1198,19 +1197,29 @@ async fn test_sync_block_apply_failed_but_connect_success() -> Result<()> { registry .register::>() - .await.unwrap(); + .await + .unwrap(); registry_sender.send(registry).await.unwrap(); - }); - + system.run().unwrap(); }); let registry = registry_receiver.recv().await.unwrap(); - let target_node = sync_block_in_block_connection_service_mock(target_node, local_node.clone(), ®istry, 10)?; - _ = sync_block_in_block_connection_service_mock(target_node, local_node.clone(), ®istry, 20)?; + let target_node = sync_block_in_block_connection_service_mock( + target_node, + local_node.clone(), + ®istry, + 10, + )?; + _ = sync_block_in_block_connection_service_mock( + target_node, + local_node.clone(), + ®istry, + 20, + )?; Ok(()) }