Skip to content

Commit

Permalink
Merge pull request #2903 from input-output-hk/send-new-tip-message-to…
Browse files Browse the repository at this point in the history
…-explorer

send new-tip message to explorer
  • Loading branch information
Mikhail Zabaluev committed Jan 25, 2021
2 parents ab08fb0 + 8f363c6 commit 3e470c2
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 43 deletions.
41 changes: 37 additions & 4 deletions jormungandr/src/blockchain/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,20 @@ impl Process {
fn start_branch_reprocessing(&self, info: &TokioServiceInfo) {
let tip = self.blockchain_tip.clone();
let blockchain = self.blockchain.clone();
let explorer = self.explorer_msgbox.clone();
let logger = info.logger().clone();

info.run_periodic_fallible(
"branch reprocessing",
BRANCH_REPROCESSING_INTERVAL,
move || reprocess_tip(logger.clone(), blockchain.clone(), tip.clone()),
move || {
reprocess_tip(
logger.clone(),
blockchain.clone(),
tip.clone(),
explorer.clone(),
)
},
)
}

Expand Down Expand Up @@ -279,7 +287,12 @@ fn try_request_fragment_removal(
/// this function will re-process the tip against the different branches
/// this is because a branch may have become more interesting with time
/// moving forward and branches may have been dismissed
async fn reprocess_tip(logger: Logger, mut blockchain: Blockchain, tip: Tip) -> Result<(), Error> {
async fn reprocess_tip(
logger: Logger,
mut blockchain: Blockchain,
tip: Tip,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
) -> Result<(), Error> {
let branches: Vec<Arc<Ref>> = blockchain.branches().branches().await;

let tip_as_ref = tip.get_ref().await;
Expand All @@ -290,7 +303,14 @@ async fn reprocess_tip(logger: Logger, mut blockchain: Blockchain, tip: Tip) ->
.collect::<Vec<_>>();

for other in others {
process_new_ref(&logger, &mut blockchain, tip.clone(), Arc::clone(other)).await?
process_new_ref(
&logger,
&mut blockchain,
tip.clone(),
Arc::clone(other),
explorer_msg_box.clone(),
)
.await?
}

Ok(())
Expand All @@ -310,6 +330,7 @@ pub async fn process_new_ref(
blockchain: &mut Blockchain,
mut tip: Tip,
candidate: Arc<Ref>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
) -> Result<(), Error> {
let candidate_hash = candidate.hash();
let tip_ref = tip.get_ref().await;
Expand Down Expand Up @@ -354,6 +375,15 @@ pub async fn process_new_ref(
let branch = blockchain.branches_mut().apply_or_create(candidate).await;
tip.swap(branch).await;
}

if let Some(mut msg_box) = explorer_msg_box {
msg_box
.send(ExplorerMsg::NewTip(candidate_hash))
.await
.unwrap_or_else(|err| {
error!(logger, "cannot send new tip to explorer: {}", err)
});
}
}
}

Expand All @@ -366,11 +396,12 @@ async fn process_and_propagate_new_ref(
tip: Tip,
new_block_ref: Arc<Ref>,
mut network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
) -> Result<(), Error> {
let header = new_block_ref.header().clone();
debug!(logger, "processing the new block and propagating");

process_new_ref(logger, blockchain, tip, new_block_ref).await?;
process_new_ref(logger, blockchain, tip, new_block_ref, explorer_msg_box).await?;

debug!(logger, "propagating block to the network");
network_msg_box
Expand Down Expand Up @@ -406,6 +437,7 @@ async fn process_leadership_block(
blockchain_tip,
Arc::clone(&new_block_ref),
network_msg_box,
explorer_msg_box.clone(),
)
.await?;

Expand Down Expand Up @@ -576,6 +608,7 @@ async fn process_network_blocks(
blockchain_tip,
Arc::clone(&new_block_ref),
network_msg_box,
explorer_msg_box,
)
.await?;

Expand Down
62 changes: 24 additions & 38 deletions jormungandr/src/explorer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ pub struct Explorer {
pub db: ExplorerDB,
}

struct Branch {
state_ref: multiverse::Ref<State>,
length: ChainLength,
}

#[derive(Clone)]
struct Tip(Arc<RwLock<Branch>>);
struct Tip(Arc<RwLock<HeaderHash>>);

#[derive(Clone)]
pub struct ExplorerDB {
Expand Down Expand Up @@ -114,7 +109,7 @@ impl Explorer {
.for_each(|input| async {
match input {
ExplorerMsg::NewBlock(block) => {
let mut explorer_db = self.db.clone();
let explorer_db = self.db.clone();
let logger = info.logger().clone();
info.spawn_fallible("apply block", async move {
explorer_db
Expand All @@ -130,6 +125,13 @@ impl Explorer {
.await
});
}
ExplorerMsg::NewTip(hash) => {
let explorer_db = self.db.clone();
info.spawn(
"apply block",
async move { explorer_db.set_tip(hash).await },
);
}
}
})
.await;
Expand Down Expand Up @@ -189,16 +191,15 @@ impl ExplorerDB {

let multiverse = Multiverse::<State>::new();
let block0_id = block0.id();
let initial_state_ref = multiverse
multiverse
.insert(block0.chain_length(), block0_id, initial_state)
.await;

let block0_id = block0.id();

let bootstraped_db = ExplorerDB {
multiverse,
longest_chain_tip: Tip::new(Branch {
state_ref: initial_state_ref,
length: block0.header.chain_length(),
}),
longest_chain_tip: Tip::new(block0.header.id()),
blockchain_config,
blockchain: blockchain.clone(),
blockchain_tip,
Expand All @@ -214,9 +215,9 @@ impl ExplorerDB {
}
};

let mut db = stream
let db = stream
.map_err(Error::from)
.try_fold(bootstraped_db, |mut db, block| async move {
.try_fold(bootstraped_db, |db, block| async move {
db.apply_block(block).await?;
Ok(db)
})
Expand Down Expand Up @@ -251,12 +252,11 @@ impl ExplorerDB {
/// chain length is greater than the current.
/// This doesn't perform any validation on the given block and the previous state, it
/// is assumed that the Block is valid
async fn apply_block(&mut self, block: Block) -> Result<multiverse::Ref<State>> {
async fn apply_block(&self, block: Block) -> Result<multiverse::Ref<State>> {
let previous_block = block.header.block_parent_hash();
let chain_length = block.header.chain_length();
let block_id = block.header.hash();
let multiverse = self.multiverse.clone();
let current_tip = self.longest_chain_tip.clone();
let discrimination = self.blockchain_config.discrimination;

let previous_state = multiverse
Expand Down Expand Up @@ -308,16 +308,14 @@ impl ExplorerDB {
)
.await;

current_tip
.compare_and_replace(Branch {
state_ref: state_ref.clone(),
length: chain_length,
})
.await;

Ok(state_ref)
}

pub async fn set_tip(&self, hash: HeaderHash) {
let mut guard = self.longest_chain_tip.0.write().await;
*guard = hash;
}

pub async fn get_latest_block_hash(&self) -> HeaderHash {
self.longest_chain_tip.get_block_id().await
}
Expand Down Expand Up @@ -809,23 +807,11 @@ impl BlockchainConfig {
}

impl Tip {
fn new(branch: Branch) -> Tip {
Tip(Arc::new(RwLock::new(branch)))
}

async fn compare_and_replace(&self, other: Branch) {
let mut current = self.0.write().await;

if other.length > (*current).length {
*current = Branch {
state_ref: other.state_ref,
length: other.length,
};
}
fn new(block0_hash: HeaderHash) -> Tip {
Tip(Arc::new(RwLock::new(block0_hash)))
}

async fn get_block_id(&self) -> HeaderHash {
let guard = self.0.read().await;
*guard.state_ref.id()
*self.0.read().await
}
}
1 change: 1 addition & 0 deletions jormungandr/src/intercom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ pub enum NetworkMsg {
/// Messages to the explorer task
pub enum ExplorerMsg {
NewBlock(Block),
NewTip(HeaderHash),
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion jormungandr/src/network/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ where
&mut blockchain,
branch.clone(),
parent_tip.clone(),
None,
)
.await
{
Expand All @@ -286,7 +287,7 @@ where
}

if let Some(parent_tip) = maybe_parent_tip {
blockchain::process_new_ref(&logger, &mut blockchain, branch, parent_tip)
blockchain::process_new_ref(&logger, &mut blockchain, branch, parent_tip, None)
.await
.map_err(Error::ChainSelectionFailed)
} else {
Expand Down

0 comments on commit 3e470c2

Please sign in to comment.