Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconstruct and fmt the sync codes #3969

Merged
merged 9 commits into from
Sep 20, 2023
29 changes: 16 additions & 13 deletions chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use starcoin_open_block::OpenedBlock;
use starcoin_state_api::{AccountStateReader, ChainStateReader, ChainStateWriter};
use starcoin_statedb::ChainStateDB;
use starcoin_storage::flexi_dag::SyncFlexiDagSnapshot;
use starcoin_storage::Store;
use starcoin_storage::storage::CodecKVStore;
use starcoin_storage::Store;
use starcoin_time_service::TimeService;
use starcoin_types::block::BlockIdAndNumber;
use starcoin_types::contract_event::ContractEventInfo;
Expand Down Expand Up @@ -108,7 +108,10 @@ impl BlockChain {
)),
None => None,
};
let dag_snapshot_tips = storage.get_accumulator_snapshot_storage().get(head_id)?.map(|snapshot| snapshot.child_hashes);
let dag_snapshot_tips = storage
.get_accumulator_snapshot_storage()
.get(head_id)?
.map(|snapshot| snapshot.child_hashes);
let mut chain = Self {
genesis_hash: genesis,
time_service,
Expand All @@ -123,11 +126,7 @@ impl BlockChain {
storage.as_ref(),
),
status: ChainStatusWithBlock {
status: ChainStatus::new(
head_block.header.clone(),
block_info,
dag_snapshot_tips,
),
status: ChainStatus::new(head_block.header.clone(), block_info, dag_snapshot_tips),
head: head_block,
},
statedb: chain_state,
Expand Down Expand Up @@ -638,21 +637,25 @@ impl BlockChain {
);
Ok(())
}

pub fn dag_parents_in_tips(&self, dag_parents: Vec<HashValue>) -> Result<bool> {
Ok(dag_parents.into_iter().all(|parent| {
match &self.status.status.tips_hash {
Ok(dag_parents
.into_iter()
.all(|parent| match &self.status.status.tips_hash {
Some(tips) => tips.contains(&parent),
None => false,
}
}))
}))
}

pub fn is_head_of_dag_accumulator(&self, next_tips: Vec<HashValue>) -> Result<bool> {
let key = Self::calculate_dag_accumulator_key(next_tips)?;
let next_tips_info = self.storage.get_dag_accumulator_info(key)?;

return Ok(next_tips_info == self.dag_accumulator.as_ref().map(|accumulator| accumulator.get_info()));
return Ok(next_tips_info
== self
.dag_accumulator
.as_ref()
.map(|accumulator| accumulator.get_info()));
}
}

Expand Down
2 changes: 1 addition & 1 deletion commons/stream-task/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;

#[derive(Clone, Copy, Debug)]
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum CollectorState {
/// Collector is enough, do not feed more item, finish task.
Enough,
Expand Down
3 changes: 1 addition & 2 deletions node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use futures::executor::block_on;
use futures_timer::Delay;
use network_api::{PeerProvider, PeerSelector, PeerStrategy};
use starcoin_account_service::{AccountEventService, AccountService, AccountStorage};
use starcoin_accumulator::node::AccumulatorStoreType;
use starcoin_block_relayer::BlockRelayer;
use starcoin_chain_notify::ChainNotifyHandlerService;
use starcoin_chain_service::ChainReaderService;
Expand Down Expand Up @@ -46,7 +45,7 @@ use starcoin_storage::db_storage::DBStorage;
use starcoin_storage::errors::StorageInitError;
use starcoin_storage::metrics::StorageMetrics;
use starcoin_storage::storage::StorageInstance;
use starcoin_storage::{BlockStore, Storage, Store};
use starcoin_storage::{BlockStore, Storage};
use starcoin_stratum::service::{StratumService, StratumServiceFactory};
use starcoin_stratum::stratum::{Stratum, StratumFactory};
use starcoin_sync::announcement::AnnouncementService;
Expand Down
5 changes: 4 additions & 1 deletion storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,10 @@ impl SyncFlexiDagStore for Storage {

// for block chain
new_tips.iter().try_fold((), |_, block_id| {
if let Some(t) = self.flexi_dag_storage.get_hashes_by_hash(block_id.clone())? {
if let Some(t) = self
.flexi_dag_storage
.get_hashes_by_hash(block_id.clone())?
{
if t != snapshot {
bail!("the key {} should not exists", block_id);
}
Expand Down
9 changes: 0 additions & 9 deletions sync/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,6 @@ pub struct SyncTarget {
pub peers: Vec<PeerId>,
}

#[derive(Debug, Clone)]
pub struct NewBlockChainRequest {
pub new_head_block: HashValue,
}

impl ServiceRequest for NewBlockChainRequest {
type Response = anyhow::Result<()>;
}

#[derive(Debug, Clone)]
pub struct SyncStatusRequest;

Expand Down
62 changes: 24 additions & 38 deletions sync/src/block_connector/block_connector_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@

use crate::block_connector::{ExecuteRequest, ResetRequest, WriteBlockChainService};
use crate::sync::{CheckSyncEvent, SyncService};
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent};
use anyhow::{format_err, Result, Ok};
use crate::tasks::{BlockConnectedEvent, BlockDiskCheckEvent, BlockConnectedFinishEvent};
use anyhow::{format_err, Ok, Result};
use network_api::PeerProvider;
use starcoin_chain_api::{ConnectBlockError, WriteableChainService, ChainReader};
use starcoin_chain_api::{ChainReader, ConnectBlockError, WriteableChainService};
use starcoin_config::{NodeConfig, G_CRATE_VERSION};
use starcoin_consensus::BlockDAG;
use starcoin_executor::VMMetrics;
Expand All @@ -16,16 +16,14 @@ use starcoin_service_registry::{
ActorService, EventHandler, ServiceContext, ServiceFactory, ServiceHandler,
};
use starcoin_storage::{BlockStore, Storage};
use starcoin_sync_api::{PeerNewBlock, NewBlockChainRequest};
use starcoin_sync_api::PeerNewBlock;
use starcoin_txpool::TxPoolService;
use starcoin_types::block::ExecutedBlock;
use starcoin_types::sync_status::SyncStatus;
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown, NewHeadBlock};
use starcoin_types::system_events::{MinedBlock, SyncStatusChangeEvent, SystemShutdown};
use std::sync::{Arc, Mutex};
use sysinfo::{DiskExt, System, SystemExt};

use super::BlockConnectedRequest;

const DISK_CHECKPOINT_FOR_PANIC: u64 = 1024 * 1024 * 1024 * 3;
const DISK_CHECKPOINT_FOR_WARN: u64 = 1024 * 1024 * 1024 * 5;

Expand Down Expand Up @@ -170,15 +168,28 @@ impl EventHandler<Self, BlockConnectedEvent> for BlockConnectorService {
fn handle_event(
&mut self,
msg: BlockConnectedEvent,
_ctx: &mut ServiceContext<BlockConnectorService>,
ctx: &mut ServiceContext<BlockConnectorService>,
) {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
if let Err(e) = self.chain_service.try_connect(block, msg.dag_parents) {
error!("Process connected block error: {:?}", e);
let feedback = msg.feedback;

match msg.action {
crate::tasks::BlockConnectAction::ConnectNewBlock => {
if let Err(e) = self.chain_service.try_connect(block, msg.dag_parents) {
error!("Process connected new block from sync error: {:?}", e);
}
}
crate::tasks::BlockConnectAction::ConnectExecutedBlock => {
if let Err(e) = self.chain_service.switch_new_main(block.header().id(), ctx) {
error!("Process connected executed block from sync error: {:?}", e);
}
}
}

feedback.map(|f| f.unbounded_send(BlockConnectedFinishEvent));
}
}

Expand Down Expand Up @@ -222,7 +233,9 @@ impl EventHandler<Self, PeerNewBlock> for BlockConnectorService {
match connect_error {
ConnectBlockError::FutureBlock(block) => {
//TODO cache future block
if let std::result::Result::Ok(sync_service) = ctx.service_ref::<SyncService>() {
if let std::result::Result::Ok(sync_service) =
ctx.service_ref::<SyncService>()
{
info!(
"BlockConnector try connect future block ({:?},{}), peer_id:{:?}, notify Sync service check sync.",
block.id(),
Expand Down Expand Up @@ -279,18 +292,6 @@ impl ServiceHandler<Self, ResetRequest> for BlockConnectorService {
}
}

impl ServiceHandler<Self, NewBlockChainRequest> for BlockConnectorService {
fn handle(
&mut self,
msg: NewBlockChainRequest,
ctx: &mut ServiceContext<BlockConnectorService>,
) -> Result<()> {
let (new_branch, dag_parents, next_tips) = self.chain_service.switch_new_main(msg.new_head_block)?;
ctx.broadcast(NewHeadBlock(Arc::new(new_branch.head_block()), Some(dag_parents), Some(next_tips)));
Ok(())
}
}

impl ServiceHandler<Self, ExecuteRequest> for BlockConnectorService {
fn handle(
&mut self,
Expand All @@ -301,18 +302,3 @@ impl ServiceHandler<Self, ExecuteRequest> for BlockConnectorService {
.execute(msg.block, msg.dag_transaction_parent)
}
}

impl ServiceHandler<Self, BlockConnectedRequest> for BlockConnectorService {
fn handle(
&mut self,
msg: BlockConnectedRequest,
_ctx: &mut ServiceContext<BlockConnectorService>,
) -> Result<()> {
//because this block has execute at sync task, so just try connect to select head chain.
//TODO refactor connect and execute

let block = msg.block;
let result = self.chain_service.try_connect(block, msg.dag_parents);
result
}
}
10 changes: 0 additions & 10 deletions sync/src/block_connector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,3 @@ pub struct ExecuteRequest {
impl ServiceRequest for ExecuteRequest {
type Response = anyhow::Result<ExecutedBlock>;
}

#[derive(Debug, Clone)]
pub struct BlockConnectedRequest {
pub block: Block,
pub dag_parents: Option<Vec<HashValue>>,
}

impl ServiceRequest for BlockConnectedRequest {
type Response = anyhow::Result<()>;
}
5 changes: 2 additions & 3 deletions sync/src/block_connector/test_write_dag_block_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ pub fn gen_dag_blocks(
}
}

let result = writeable_block_chain_service
.execute_dag_block_pool();
let result = writeable_block_chain_service.execute_dag_block_pool();
let result = result.unwrap();
match result {
super::write_block_chain::ConnectOk::Duplicate(block)
Expand Down Expand Up @@ -159,7 +158,7 @@ async fn test_block_chain_switch_main() {
.get_main()
.current_header()
.id(),
last_block.unwrap()
last_block.unwrap()
);

last_block = gen_fork_dag_block_chain(
Expand Down
Loading