From 48ff2dbedb474daca4639ae9e0ca95d5371fd7bd Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Mon, 11 Jan 2021 14:15:23 -0300 Subject: [PATCH 1/2] send new-tip message to explorer so the logic will always be the same. --- jormungandr/src/blockchain/process.rs | 41 ++++++++++++++++-- jormungandr/src/explorer/mod.rs | 62 +++++++++++---------------- jormungandr/src/intercom.rs | 1 + jormungandr/src/network/bootstrap.rs | 3 +- 4 files changed, 64 insertions(+), 43 deletions(-) diff --git a/jormungandr/src/blockchain/process.rs b/jormungandr/src/blockchain/process.rs index 5a83259fee..b261390db6 100644 --- a/jormungandr/src/blockchain/process.rs +++ b/jormungandr/src/blockchain/process.rs @@ -188,12 +188,20 @@ impl Process { fn start_branch_reprocessing(&self, info: &TokioServiceInfo) { let tip = self.blockchain_tip.clone(); let blockchain = self.blockchain.clone(); + let explorer = self.explorer_msgbox.clone(); let logger = info.logger().clone(); info.run_periodic_fallible( "branch reprocessing", BRANCH_REPROCESSING_INTERVAL, - move || reprocess_tip(logger.clone(), blockchain.clone(), tip.clone()), + move || { + reprocess_tip( + logger.clone(), + blockchain.clone(), + tip.clone(), + explorer.clone(), + ) + }, ) } @@ -279,7 +287,12 @@ fn try_request_fragment_removal( /// this function will re-process the tip against the different branches /// this is because a branch may have become more interesting with time /// moving forward and branches may have been dismissed -async fn reprocess_tip(logger: Logger, mut blockchain: Blockchain, tip: Tip) -> Result<(), Error> { +async fn reprocess_tip( + logger: Logger, + mut blockchain: Blockchain, + tip: Tip, + explorer_msg_box: Option>, +) -> Result<(), Error> { let branches: Vec> = blockchain.branches().branches().await; let tip_as_ref = tip.get_ref().await; @@ -290,7 +303,14 @@ async fn reprocess_tip(logger: Logger, mut blockchain: Blockchain, tip: Tip) -> .collect::>(); for other in others { - process_new_ref(&logger, &mut blockchain, tip.clone(), Arc::clone(other)).await? + process_new_ref( + &logger, + &mut blockchain, + tip.clone(), + Arc::clone(other), + explorer_msg_box.clone(), + ) + .await? } Ok(()) @@ -310,6 +330,7 @@ pub async fn process_new_ref( blockchain: &mut Blockchain, mut tip: Tip, candidate: Arc, + explorer_msg_box: Option>, ) -> Result<(), Error> { let candidate_hash = candidate.hash(); let tip_ref = tip.get_ref().await; @@ -354,6 +375,15 @@ pub async fn process_new_ref( let branch = blockchain.branches_mut().apply_or_create(candidate).await; tip.swap(branch).await; } + + if let Some(mut msg_box) = explorer_msg_box { + msg_box + .send(ExplorerMsg::NewTip(candidate_hash)) + .await + .unwrap_or_else(|err| { + error!(logger, "cannot send new tip to explorer: {}", err) + }); + } } } @@ -366,11 +396,12 @@ async fn process_and_propagate_new_ref( tip: Tip, new_block_ref: Arc, mut network_msg_box: MessageBox, + explorer_msg_box: Option>, ) -> Result<(), Error> { let header = new_block_ref.header().clone(); debug!(logger, "processing the new block and propagating"); - process_new_ref(logger, blockchain, tip, new_block_ref).await?; + process_new_ref(logger, blockchain, tip, new_block_ref, explorer_msg_box).await?; debug!(logger, "propagating block to the network"); network_msg_box @@ -406,6 +437,7 @@ async fn process_leadership_block( blockchain_tip, Arc::clone(&new_block_ref), network_msg_box, + explorer_msg_box.clone(), ) .await?; @@ -576,6 +608,7 @@ async fn process_network_blocks( blockchain_tip, Arc::clone(&new_block_ref), network_msg_box, + explorer_msg_box, ) .await?; diff --git a/jormungandr/src/explorer/mod.rs b/jormungandr/src/explorer/mod.rs index 03eadfb8e7..6252558326 100644 --- a/jormungandr/src/explorer/mod.rs +++ b/jormungandr/src/explorer/mod.rs @@ -37,13 +37,8 @@ pub struct Explorer { pub schema: Arc, } -struct Branch { - state_ref: multiverse::Ref, - length: ChainLength, -} - #[derive(Clone)] -struct Tip(Arc>); +struct Tip(Arc>); #[derive(Clone)] pub struct ExplorerDB { @@ -117,7 +112,7 @@ impl Explorer { .for_each(|input| async { match input { ExplorerMsg::NewBlock(block) => { - let mut explorer_db = self.db.clone(); + let explorer_db = self.db.clone(); let logger = info.logger().clone(); info.spawn_fallible("apply block", async move { explorer_db @@ -133,6 +128,13 @@ impl Explorer { .await }); } + ExplorerMsg::NewTip(hash) => { + let explorer_db = self.db.clone(); + info.spawn( + "apply block", + async move { explorer_db.set_tip(hash).await }, + ); + } } }) .await; @@ -192,16 +194,15 @@ impl ExplorerDB { let multiverse = Multiverse::::new(); let block0_id = block0.id(); - let initial_state_ref = multiverse + multiverse .insert(block0.chain_length(), block0_id, initial_state) .await; + let block0_id = block0.id(); + let bootstraped_db = ExplorerDB { multiverse, - longest_chain_tip: Tip::new(Branch { - state_ref: initial_state_ref, - length: block0.header.chain_length(), - }), + longest_chain_tip: Tip::new(block0.header.id()), blockchain_config, blockchain: blockchain.clone(), blockchain_tip, @@ -217,9 +218,9 @@ impl ExplorerDB { } }; - let mut db = stream + let db = stream .map_err(Error::from) - .try_fold(bootstraped_db, |mut db, block| async move { + .try_fold(bootstraped_db, |db, block| async move { db.apply_block(block).await?; Ok(db) }) @@ -254,12 +255,11 @@ impl ExplorerDB { /// chain length is greater than the current. /// This doesn't perform any validation on the given block and the previous state, it /// is assumed that the Block is valid - async fn apply_block(&mut self, block: Block) -> Result> { + async fn apply_block(&self, block: Block) -> Result> { let previous_block = block.header.block_parent_hash(); let chain_length = block.header.chain_length(); let block_id = block.header.hash(); let multiverse = self.multiverse.clone(); - let current_tip = self.longest_chain_tip.clone(); let discrimination = self.blockchain_config.discrimination; let previous_state = multiverse @@ -311,16 +311,14 @@ impl ExplorerDB { ) .await; - current_tip - .compare_and_replace(Branch { - state_ref: state_ref.clone(), - length: chain_length, - }) - .await; - Ok(state_ref) } + pub async fn set_tip(&self, hash: HeaderHash) { + let mut guard = self.longest_chain_tip.0.write().await; + *guard = hash; + } + pub async fn get_latest_block_hash(&self) -> HeaderHash { self.longest_chain_tip.get_block_id().await } @@ -812,23 +810,11 @@ impl BlockchainConfig { } impl Tip { - fn new(branch: Branch) -> Tip { - Tip(Arc::new(RwLock::new(branch))) - } - - async fn compare_and_replace(&self, other: Branch) { - let mut current = self.0.write().await; - - if other.length > (*current).length { - *current = Branch { - state_ref: other.state_ref, - length: other.length, - }; - } + fn new(block0_hash: HeaderHash) -> Tip { + Tip(Arc::new(RwLock::new(block0_hash))) } async fn get_block_id(&self) -> HeaderHash { - let guard = self.0.read().await; - *guard.state_ref.id() + self.0.read().await.clone() } } diff --git a/jormungandr/src/intercom.rs b/jormungandr/src/intercom.rs index bc5a9e4e33..147e7455d5 100644 --- a/jormungandr/src/intercom.rs +++ b/jormungandr/src/intercom.rs @@ -619,6 +619,7 @@ pub enum NetworkMsg { /// Messages to the explorer task pub enum ExplorerMsg { NewBlock(Block), + NewTip(HeaderHash), } #[cfg(test)] diff --git a/jormungandr/src/network/bootstrap.rs b/jormungandr/src/network/bootstrap.rs index 5939905b67..c0e441bf22 100644 --- a/jormungandr/src/network/bootstrap.rs +++ b/jormungandr/src/network/bootstrap.rs @@ -274,6 +274,7 @@ where &mut blockchain, branch.clone(), parent_tip.clone(), + None, ) .await { @@ -286,7 +287,7 @@ where } if let Some(parent_tip) = maybe_parent_tip { - blockchain::process_new_ref(&logger, &mut blockchain, branch, parent_tip) + blockchain::process_new_ref(&logger, &mut blockchain, branch, parent_tip, None) .await .map_err(Error::ChainSelectionFailed) } else { From 8f363c6156f9e7d11481d660ecffb0b4c37caaa7 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Mon, 11 Jan 2021 15:12:28 -0300 Subject: [PATCH 2/2] fix clone copy lint --- jormungandr/src/explorer/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jormungandr/src/explorer/mod.rs b/jormungandr/src/explorer/mod.rs index 6252558326..d9d4a85a7d 100644 --- a/jormungandr/src/explorer/mod.rs +++ b/jormungandr/src/explorer/mod.rs @@ -815,6 +815,6 @@ impl Tip { } async fn get_block_id(&self) -> HeaderHash { - self.0.read().await.clone() + *self.0.read().await } }