Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send new-tip message to explorer #2903

Merged
merged 2 commits into from
Jan 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions jormungandr/src/blockchain/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,20 @@ impl Process {
fn start_branch_reprocessing(&self, info: &TokioServiceInfo) {
let tip = self.blockchain_tip.clone();
let blockchain = self.blockchain.clone();
let explorer = self.explorer_msgbox.clone();
let logger = info.logger().clone();

info.run_periodic_fallible(
"branch reprocessing",
BRANCH_REPROCESSING_INTERVAL,
move || reprocess_tip(logger.clone(), blockchain.clone(), tip.clone()),
move || {
reprocess_tip(
logger.clone(),
blockchain.clone(),
tip.clone(),
explorer.clone(),
)
},
)
}

Expand Down Expand Up @@ -279,7 +287,12 @@ fn try_request_fragment_removal(
/// this function will re-process the tip against the different branches
/// this is because a branch may have become more interesting with time
/// moving forward and branches may have been dismissed
async fn reprocess_tip(logger: Logger, mut blockchain: Blockchain, tip: Tip) -> Result<(), Error> {
async fn reprocess_tip(
logger: Logger,
mut blockchain: Blockchain,
tip: Tip,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
) -> Result<(), Error> {
let branches: Vec<Arc<Ref>> = blockchain.branches().branches().await;

let tip_as_ref = tip.get_ref().await;
Expand All @@ -290,7 +303,14 @@ async fn reprocess_tip(logger: Logger, mut blockchain: Blockchain, tip: Tip) ->
.collect::<Vec<_>>();

for other in others {
process_new_ref(&logger, &mut blockchain, tip.clone(), Arc::clone(other)).await?
process_new_ref(
&logger,
&mut blockchain,
tip.clone(),
Arc::clone(other),
explorer_msg_box.clone(),
)
.await?
}

Ok(())
Expand All @@ -310,6 +330,7 @@ pub async fn process_new_ref(
blockchain: &mut Blockchain,
mut tip: Tip,
candidate: Arc<Ref>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
) -> Result<(), Error> {
let candidate_hash = candidate.hash();
let tip_ref = tip.get_ref().await;
Expand Down Expand Up @@ -354,6 +375,15 @@ pub async fn process_new_ref(
let branch = blockchain.branches_mut().apply_or_create(candidate).await;
tip.swap(branch).await;
}

if let Some(mut msg_box) = explorer_msg_box {
msg_box
.send(ExplorerMsg::NewTip(candidate_hash))
.await
.unwrap_or_else(|err| {
error!(logger, "cannot send new tip to explorer: {}", err)
});
}
}
}

Expand All @@ -366,11 +396,12 @@ async fn process_and_propagate_new_ref(
tip: Tip,
new_block_ref: Arc<Ref>,
mut network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
) -> Result<(), Error> {
let header = new_block_ref.header().clone();
debug!(logger, "processing the new block and propagating");

process_new_ref(logger, blockchain, tip, new_block_ref).await?;
process_new_ref(logger, blockchain, tip, new_block_ref, explorer_msg_box).await?;

debug!(logger, "propagating block to the network");
network_msg_box
Expand Down Expand Up @@ -406,6 +437,7 @@ async fn process_leadership_block(
blockchain_tip,
Arc::clone(&new_block_ref),
network_msg_box,
explorer_msg_box.clone(),
)
.await?;

Expand Down Expand Up @@ -576,6 +608,7 @@ async fn process_network_blocks(
blockchain_tip,
Arc::clone(&new_block_ref),
network_msg_box,
explorer_msg_box,
)
.await?;

Expand Down
62 changes: 24 additions & 38 deletions jormungandr/src/explorer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ pub struct Explorer {
pub schema: Arc<graphql::Schema>,
}

struct Branch {
state_ref: multiverse::Ref<State>,
length: ChainLength,
}

#[derive(Clone)]
struct Tip(Arc<RwLock<Branch>>);
struct Tip(Arc<RwLock<HeaderHash>>);

#[derive(Clone)]
pub struct ExplorerDB {
Expand Down Expand Up @@ -117,7 +112,7 @@ impl Explorer {
.for_each(|input| async {
match input {
ExplorerMsg::NewBlock(block) => {
let mut explorer_db = self.db.clone();
let explorer_db = self.db.clone();
let logger = info.logger().clone();
info.spawn_fallible("apply block", async move {
explorer_db
Expand All @@ -133,6 +128,13 @@ impl Explorer {
.await
});
}
ExplorerMsg::NewTip(hash) => {
let explorer_db = self.db.clone();
info.spawn(
"apply block",
async move { explorer_db.set_tip(hash).await },
);
}
}
})
.await;
Expand Down Expand Up @@ -192,16 +194,15 @@ impl ExplorerDB {

let multiverse = Multiverse::<State>::new();
let block0_id = block0.id();
let initial_state_ref = multiverse
multiverse
.insert(block0.chain_length(), block0_id, initial_state)
.await;

let block0_id = block0.id();

let bootstraped_db = ExplorerDB {
multiverse,
longest_chain_tip: Tip::new(Branch {
state_ref: initial_state_ref,
length: block0.header.chain_length(),
}),
longest_chain_tip: Tip::new(block0.header.id()),
blockchain_config,
blockchain: blockchain.clone(),
blockchain_tip,
Expand All @@ -217,9 +218,9 @@ impl ExplorerDB {
}
};

let mut db = stream
let db = stream
.map_err(Error::from)
.try_fold(bootstraped_db, |mut db, block| async move {
.try_fold(bootstraped_db, |db, block| async move {
db.apply_block(block).await?;
Ok(db)
})
Expand Down Expand Up @@ -254,12 +255,11 @@ impl ExplorerDB {
/// chain length is greater than the current.
/// This doesn't perform any validation on the given block and the previous state, it
/// is assumed that the Block is valid
async fn apply_block(&mut self, block: Block) -> Result<multiverse::Ref<State>> {
async fn apply_block(&self, block: Block) -> Result<multiverse::Ref<State>> {
let previous_block = block.header.block_parent_hash();
let chain_length = block.header.chain_length();
let block_id = block.header.hash();
let multiverse = self.multiverse.clone();
let current_tip = self.longest_chain_tip.clone();
let discrimination = self.blockchain_config.discrimination;

let previous_state = multiverse
Expand Down Expand Up @@ -311,16 +311,14 @@ impl ExplorerDB {
)
.await;

current_tip
.compare_and_replace(Branch {
state_ref: state_ref.clone(),
length: chain_length,
})
.await;

Ok(state_ref)
}

pub async fn set_tip(&self, hash: HeaderHash) {
let mut guard = self.longest_chain_tip.0.write().await;
*guard = hash;
}

pub async fn get_latest_block_hash(&self) -> HeaderHash {
self.longest_chain_tip.get_block_id().await
}
Expand Down Expand Up @@ -812,23 +810,11 @@ impl BlockchainConfig {
}

impl Tip {
fn new(branch: Branch) -> Tip {
Tip(Arc::new(RwLock::new(branch)))
}

async fn compare_and_replace(&self, other: Branch) {
let mut current = self.0.write().await;

if other.length > (*current).length {
*current = Branch {
state_ref: other.state_ref,
length: other.length,
};
}
fn new(block0_hash: HeaderHash) -> Tip {
Tip(Arc::new(RwLock::new(block0_hash)))
}

async fn get_block_id(&self) -> HeaderHash {
let guard = self.0.read().await;
*guard.state_ref.id()
*self.0.read().await
}
}
1 change: 1 addition & 0 deletions jormungandr/src/intercom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,7 @@ pub enum NetworkMsg {
/// Messages to the explorer task
pub enum ExplorerMsg {
NewBlock(Block),
NewTip(HeaderHash),
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion jormungandr/src/network/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ where
&mut blockchain,
branch.clone(),
parent_tip.clone(),
None,
)
.await
{
Expand All @@ -286,7 +287,7 @@ where
}

if let Some(parent_tip) = maybe_parent_tip {
blockchain::process_new_ref(&logger, &mut blockchain, branch, parent_tip)
blockchain::process_new_ref(&logger, &mut blockchain, branch, parent_tip, None)
.await
.map_err(Error::ChainSelectionFailed)
} else {
Expand Down