Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notifier grpc implementation #3249

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion jormungandr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ chain-time = { git = "https://github.com/input-output-hk/chain-libs.git", b
chain-vote = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
cardano-legacy-address = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
imhamt = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }

chain-watch = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "heterogeneous-client-api" }
arc-swap = "^1.1.0"
async-trait = "0.1.50"
async-graphql = "2.5.1"
Expand Down
61 changes: 59 additions & 2 deletions jormungandr/src/blockchain/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use super::{
use crate::{
blockcfg::{Block, FragmentId, Header, HeaderHash},
blockchain::Checkpoints,
intercom::{self, BlockMsg, ExplorerMsg, NetworkMsg, PropagateMsg, TransactionMsg},
intercom::{
self, BlockMsg, ExplorerMsg, NetworkMsg, NotifierMsg, PropagateMsg, TransactionMsg,
},
metrics::{Metrics, MetricsBackend},
network::p2p::Address,
utils::{
Expand Down Expand Up @@ -58,6 +60,7 @@ pub struct Process {
pub network_msgbox: MessageBox<NetworkMsg>,
pub fragment_msgbox: MessageBox<TransactionMsg>,
pub explorer_msgbox: Option<MessageBox<ExplorerMsg>>,
pub notifier_msgbox: MessageBox<NotifierMsg>,
pub garbage_collection_interval: Duration,
}

Expand Down Expand Up @@ -92,6 +95,7 @@ impl Process {
let blockchain_tip = self.blockchain_tip.clone();
let network_msg_box = self.network_msgbox.clone();
let explorer_msg_box = self.explorer_msgbox.clone();
let event_notifier_msg_box = self.notifier_msgbox.clone();
let tx_msg_box = self.fragment_msgbox.clone();
let stats_counter = self.stats_counter.clone();

Expand All @@ -117,6 +121,7 @@ impl Process {
tx_msg_box,
network_msg_box,
explorer_msg_box,
event_notifier_msg_box,
leadership_block,
stats_counter,
)
Expand Down Expand Up @@ -163,6 +168,7 @@ impl Process {
tx_msg_box,
network_msg_box,
explorer_msg_box,
event_notifier_msg_box,
get_next_block_scheduler,
handle,
stats_counter,
Expand Down Expand Up @@ -195,6 +201,8 @@ impl Process {
let explorer = self.explorer_msgbox.clone();
let tx_msg_box = self.fragment_msgbox.clone();

let notifier = self.notifier_msgbox.clone();

info.run_periodic_fallible(
"branch reprocessing",
BRANCH_REPROCESSING_INTERVAL,
Expand All @@ -204,6 +212,7 @@ impl Process {
tip.clone(),
explorer.clone(),
tx_msg_box.clone(),
notifier.clone(),
)
},
)
Expand Down Expand Up @@ -294,6 +303,7 @@ async fn reprocess_tip(
tip: Tip,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
tx_msg_box: MessageBox<TransactionMsg>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> Result<(), Error> {
let branches: Vec<Arc<Ref>> = blockchain.branches().branches().await;

Expand All @@ -311,6 +321,7 @@ async fn reprocess_tip(
Arc::clone(other),
explorer_msg_box.clone(),
Some(tx_msg_box.clone()),
Some(notifier_msg_box.clone()),
)
.await?
}
Expand All @@ -333,6 +344,7 @@ pub async fn process_new_ref(
candidate: Arc<Ref>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut tx_msg_box: Option<MessageBox<TransactionMsg>>,
notifier_msg_box: Option<MessageBox<NotifierMsg>>,
) -> Result<(), Error> {
let candidate_hash = candidate.hash();
let tip_ref = tip.get_ref().await;
Expand Down Expand Up @@ -411,6 +423,15 @@ pub async fn process_new_ref(
.await
.unwrap_or_else(|err| {
tracing::error!("cannot send new tip to explorer: {}", err)
})
}

if let Some(mut msg_box) = notifier_msg_box {
msg_box
.send(NotifierMsg::NewTip(candidate_hash))
.await
.unwrap_or_else(|err| {
tracing::error!("cannot notify new block to subscribers: {}", err)
});
}
}
Expand All @@ -426,15 +447,18 @@ async fn process_and_propagate_new_ref(
mut network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
tx_msg_box: MessageBox<TransactionMsg>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> chain::Result<()> {
let header = new_block_ref.header().clone();
tracing::debug!("processing the new block and propagating");

process_new_ref(
blockchain,
tip,
new_block_ref,
explorer_msg_box,
Some(tx_msg_box),
Some(notifier_msg_box),
)
.await?;

Expand All @@ -454,6 +478,7 @@ async fn process_leadership_block(
tx_msg_box: MessageBox<TransactionMsg>,
network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut notifier_msg_box: MessageBox<crate::notifier::Message>,
leadership_block: LeadershipBlock,
stats_counter: Metrics,
) -> chain::Result<()> {
Expand All @@ -467,12 +492,23 @@ async fn process_leadership_block(
network_msg_box,
explorer_msg_box.clone(),
tx_msg_box,
notifier_msg_box.clone(),
)
.await?;

// Track block as new new tip block
stats_counter.set_tip_block(&block, &new_block_ref);

if let Err(err) = notifier_msg_box
.send(NotifierMsg::NewBlock(block.clone()))
.await
{
tracing::error!(
"Cannot propagate block to blockchain event notifier: {}",
err
)
}

if let Some(mut msg_box) = explorer_msg_box {
msg_box.send(ExplorerMsg::NewBlock(block)).await?;
}
Expand Down Expand Up @@ -547,6 +583,7 @@ async fn process_network_blocks(
tx_msg_box: MessageBox<TransactionMsg>,
network_msg_box: MessageBox<NetworkMsg>,
mut explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut notifier_msg_box: MessageBox<NotifierMsg>,
mut get_next_block_scheduler: GetNextBlockScheduler,
handle: intercom::RequestStreamHandle<Block, ()>,
stats_counter: Metrics,
Expand All @@ -564,6 +601,7 @@ async fn process_network_blocks(
&blockchain,
block.clone(),
explorer_msg_box.as_mut(),
&mut notifier_msg_box,
&mut get_next_block_scheduler,
)
.await;
Expand Down Expand Up @@ -603,6 +641,7 @@ async fn process_network_blocks(
network_msg_box,
explorer_msg_box,
tx_msg_box,
notifier_msg_box,
)
.await?;

Expand All @@ -620,6 +659,7 @@ async fn process_network_block(
blockchain: &Blockchain,
block: Block,
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
event_notifier_msg_box: &mut MessageBox<NotifierMsg>,
get_next_block_scheduler: &mut GetNextBlockScheduler,
) -> Result<Option<Arc<Ref>>, chain::Error> {
get_next_block_scheduler
Expand Down Expand Up @@ -650,7 +690,14 @@ async fn process_network_block(
Err(Error::MissingParentBlock(parent_hash))
}
PreCheckedHeader::HeaderWithCache { parent_ref, .. } => {
let r = check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await;
let r = check_and_apply_block(
blockchain,
parent_ref,
block,
explorer_msg_box,
event_notifier_msg_box,
)
.await;
r
}
}
Expand All @@ -661,6 +708,7 @@ async fn check_and_apply_block(
parent_ref: Arc<Ref>,
block: Block,
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
event_notifier_msg_box: &mut MessageBox<NotifierMsg>,
) -> Result<Option<Arc<Ref>>, chain::Error> {
let explorer_enabled = explorer_msg_box.is_some();
let post_checked = blockchain
Expand All @@ -679,6 +727,8 @@ async fn check_and_apply_block(
} else {
None
};
let block_for_subscribers = block.clone();

let applied_block = blockchain
.apply_and_store_block(post_checked, block)
.await?;
Expand All @@ -695,6 +745,13 @@ async fn check_and_apply_block(
.try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap()))
.unwrap_or_else(|err| tracing::error!("cannot add block to explorer: {}", err));
}

event_notifier_msg_box
.try_send(NotifierMsg::NewBlock(block_for_subscribers))
.unwrap_or_else(|err| {
tracing::error!("cannot notify new block to subscribers: {}", err)
});

Ok(Some(block_ref))
} else {
tracing::debug!(
Expand Down
2 changes: 2 additions & 0 deletions jormungandr/src/fragment/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ impl Logs {
pub fn insert_pending(&mut self, log: FragmentLog) -> bool {
assert!(log.is_pending());
let fragment_id = *log.fragment_id();

if self.entries.contains(&fragment_id) {
false
} else {
self.entries.put(fragment_id, (log, None));

true
}
}
Expand Down
7 changes: 7 additions & 0 deletions jormungandr/src/intercom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,5 +628,12 @@ pub enum ExplorerMsg {
NewTip(HeaderHash),
}

/// Messages to the notifier task
pub enum NotifierMsg {
NewBlock(Block),
NewTip(HeaderHash),
FragmentLog(FragmentId, FragmentStatus),
}

#[cfg(test)]
mod tests {}
21 changes: 21 additions & 0 deletions jormungandr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod leadership;
pub mod log;
pub mod metrics;
pub mod network;
pub mod notifier;
pub mod rest;
pub mod secure;
pub mod settings;
Expand Down Expand Up @@ -77,6 +78,7 @@ const NETWORK_TASK_QUEUE_LEN: usize = 64;
const EXPLORER_TASK_QUEUE_LEN: usize = 32;
const CLIENT_TASK_QUEUE_LEN: usize = 32;
const TOPOLOGY_TASK_QUEUE_LEN: usize = 32;
const NOTIFIER_TASK_QUEUE_LEN: usize = 32;
const BOOTSTRAP_RETRY_WAIT: Duration = Duration::from_secs(5);

fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::Error> {
Expand Down Expand Up @@ -148,12 +150,29 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
}
};

let (notifier_msgbox, notifier) = {
let (msgbox, queue) = async_msg::channel(NOTIFIER_TASK_QUEUE_LEN);

let blockchain_tip = blockchain_tip.clone();
let current_tip = block_on(async { blockchain_tip.get_ref().await.header().id() });

let (notifier, message_processor) =
notifier::Notifier::new(current_tip, blockchain.storage().clone());

services.spawn_future("notifier", move |info| async move {
message_processor.start(info, queue).await
});

(msgbox, notifier)
};

{
let blockchain = blockchain.clone();
let blockchain_tip = blockchain_tip.clone();
let network_msgbox = network_msgbox.clone();
let fragment_msgbox = fragment_msgbox.clone();
let explorer_msgbox = explorer.as_ref().map(|(msg_box, _context)| msg_box.clone());
let notifier_msgbox = notifier_msgbox.clone();
// TODO: we should get this value from the configuration
let block_cache_ttl: Duration = Duration::from_secs(120);
let stats_counter = stats_counter.clone();
Expand All @@ -165,6 +184,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
network_msgbox,
fragment_msgbox,
explorer_msgbox,
notifier_msgbox,
garbage_collection_interval: block_cache_ttl,
};
process.start(info, block_queue)
Expand Down Expand Up @@ -206,6 +226,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
global_state,
input: network_queue,
channels,
notification_service: notifier,
};
network::start(params)
});
Expand Down
3 changes: 2 additions & 1 deletion jormungandr/src/network/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ where
parent_tip.clone(),
None,
None,
None,
)
.await
{
Expand All @@ -284,7 +285,7 @@ where
}

if let Some(parent_tip) = maybe_parent_tip {
blockchain::process_new_ref(&mut blockchain, branch, parent_tip, None, None)
blockchain::process_new_ref(&mut blockchain, branch, parent_tip, None, None, None)
.await
.map_err(Error::ChainSelectionFailed)
} else {
Expand Down
2 changes: 2 additions & 0 deletions jormungandr/src/network/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub async fn run_listen_socket(
listen: &Listen,
state: GlobalStateR,
channels: Channels,
notification_service: crate::notifier::Notifier,
) -> Result<(), ListenError> {
let sockaddr = listen.address();
let span = span!(parent: &state.span, Level::TRACE, "listen_socket", local_addr = %sockaddr.to_string());
Expand All @@ -25,6 +26,7 @@ pub async fn run_listen_socket(
.concurrency_limit_per_connection(concurrency_limits::SERVER_REQUESTS)
.tcp_keepalive(Some(keepalive_durations::TCP))
.add_service(service)
.add_service(notification_service.into_server())
.serve(sockaddr)
.await
.map_err(|cause| ListenError { cause, sockaddr })
Expand Down
Loading