Skip to content

Commit

Permalink
basic grpc-based subscription re-implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
ecioppettini committed Sep 11, 2021
1 parent 5cb6430 commit 631ca8a
Show file tree
Hide file tree
Showing 10 changed files with 369 additions and 13 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion jormungandr/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ chain-time = { git = "https://github.com/input-output-hk/chain-libs.git", b
chain-vote = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
cardano-legacy-address = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }
imhamt = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" }

chain-watch = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "heterogeneous-client-api" }
arc-swap = "^1.1.0"
async-trait = "0.1.50"
async-graphql = "2.5.1"
Expand Down
61 changes: 59 additions & 2 deletions jormungandr/src/blockchain/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use super::{
use crate::{
blockcfg::{Block, FragmentId, Header, HeaderHash},
blockchain::Checkpoints,
intercom::{self, BlockMsg, ExplorerMsg, NetworkMsg, PropagateMsg, TransactionMsg},
intercom::{
self, BlockMsg, ExplorerMsg, NetworkMsg, NotifierMsg, PropagateMsg, TransactionMsg,
},
metrics::{Metrics, MetricsBackend},
network::p2p::Address,
utils::{
Expand Down Expand Up @@ -58,6 +60,7 @@ pub struct Process {
pub network_msgbox: MessageBox<NetworkMsg>,
pub fragment_msgbox: MessageBox<TransactionMsg>,
pub explorer_msgbox: Option<MessageBox<ExplorerMsg>>,
pub notifier_msgbox: MessageBox<NotifierMsg>,
pub garbage_collection_interval: Duration,
}

Expand Down Expand Up @@ -92,6 +95,7 @@ impl Process {
let blockchain_tip = self.blockchain_tip.clone();
let network_msg_box = self.network_msgbox.clone();
let explorer_msg_box = self.explorer_msgbox.clone();
let event_notifier_msg_box = self.notifier_msgbox.clone();
let tx_msg_box = self.fragment_msgbox.clone();
let stats_counter = self.stats_counter.clone();

Expand All @@ -117,6 +121,7 @@ impl Process {
tx_msg_box,
network_msg_box,
explorer_msg_box,
event_notifier_msg_box,
leadership_block,
stats_counter,
)
Expand Down Expand Up @@ -163,6 +168,7 @@ impl Process {
tx_msg_box,
network_msg_box,
explorer_msg_box,
event_notifier_msg_box,
get_next_block_scheduler,
handle,
stats_counter,
Expand Down Expand Up @@ -195,6 +201,8 @@ impl Process {
let explorer = self.explorer_msgbox.clone();
let tx_msg_box = self.fragment_msgbox.clone();

let notifier = self.notifier_msgbox.clone();

info.run_periodic_fallible(
"branch reprocessing",
BRANCH_REPROCESSING_INTERVAL,
Expand All @@ -204,6 +212,7 @@ impl Process {
tip.clone(),
explorer.clone(),
tx_msg_box.clone(),
notifier.clone(),
)
},
)
Expand Down Expand Up @@ -294,6 +303,7 @@ async fn reprocess_tip(
tip: Tip,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
tx_msg_box: MessageBox<TransactionMsg>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> Result<(), Error> {
let branches: Vec<Arc<Ref>> = blockchain.branches().branches().await;

Expand All @@ -311,6 +321,7 @@ async fn reprocess_tip(
Arc::clone(other),
explorer_msg_box.clone(),
Some(tx_msg_box.clone()),
Some(notifier_msg_box.clone()),
)
.await?
}
Expand All @@ -333,6 +344,7 @@ pub async fn process_new_ref(
candidate: Arc<Ref>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut tx_msg_box: Option<MessageBox<TransactionMsg>>,
notifier_msg_box: Option<MessageBox<NotifierMsg>>,
) -> Result<(), Error> {
let candidate_hash = candidate.hash();
let tip_ref = tip.get_ref().await;
Expand Down Expand Up @@ -411,6 +423,15 @@ pub async fn process_new_ref(
.await
.unwrap_or_else(|err| {
tracing::error!("cannot send new tip to explorer: {}", err)
})
}

if let Some(mut msg_box) = notifier_msg_box {
msg_box
.send(NotifierMsg::NewTip(candidate_hash))
.await
.unwrap_or_else(|err| {
tracing::error!("cannot notify new block to subscribers: {}", err)
});
}
}
Expand All @@ -426,15 +447,18 @@ async fn process_and_propagate_new_ref(
mut network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
tx_msg_box: MessageBox<TransactionMsg>,
notifier_msg_box: MessageBox<NotifierMsg>,
) -> chain::Result<()> {
let header = new_block_ref.header().clone();
tracing::debug!("processing the new block and propagating");

process_new_ref(
blockchain,
tip,
new_block_ref,
explorer_msg_box,
Some(tx_msg_box),
Some(notifier_msg_box),
)
.await?;

Expand All @@ -454,6 +478,7 @@ async fn process_leadership_block(
tx_msg_box: MessageBox<TransactionMsg>,
network_msg_box: MessageBox<NetworkMsg>,
explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut notifier_msg_box: MessageBox<crate::notifier::Message>,
leadership_block: LeadershipBlock,
stats_counter: Metrics,
) -> chain::Result<()> {
Expand All @@ -467,12 +492,23 @@ async fn process_leadership_block(
network_msg_box,
explorer_msg_box.clone(),
tx_msg_box,
notifier_msg_box.clone(),
)
.await?;

// Track block as new new tip block
stats_counter.set_tip_block(&block, &new_block_ref);

if let Err(err) = notifier_msg_box
.send(NotifierMsg::NewBlock(block.clone()))
.await
{
tracing::error!(
"Cannot propagate block to blockchain event notifier: {}",
err
)
}

if let Some(mut msg_box) = explorer_msg_box {
msg_box.send(ExplorerMsg::NewBlock(block)).await?;
}
Expand Down Expand Up @@ -547,6 +583,7 @@ async fn process_network_blocks(
tx_msg_box: MessageBox<TransactionMsg>,
network_msg_box: MessageBox<NetworkMsg>,
mut explorer_msg_box: Option<MessageBox<ExplorerMsg>>,
mut notifier_msg_box: MessageBox<NotifierMsg>,
mut get_next_block_scheduler: GetNextBlockScheduler,
handle: intercom::RequestStreamHandle<Block, ()>,
stats_counter: Metrics,
Expand All @@ -564,6 +601,7 @@ async fn process_network_blocks(
&blockchain,
block.clone(),
explorer_msg_box.as_mut(),
&mut notifier_msg_box,
&mut get_next_block_scheduler,
)
.await;
Expand Down Expand Up @@ -603,6 +641,7 @@ async fn process_network_blocks(
network_msg_box,
explorer_msg_box,
tx_msg_box,
notifier_msg_box,
)
.await?;

Expand All @@ -620,6 +659,7 @@ async fn process_network_block(
blockchain: &Blockchain,
block: Block,
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
event_notifier_msg_box: &mut MessageBox<NotifierMsg>,
get_next_block_scheduler: &mut GetNextBlockScheduler,
) -> Result<Option<Arc<Ref>>, chain::Error> {
get_next_block_scheduler
Expand Down Expand Up @@ -650,7 +690,14 @@ async fn process_network_block(
Err(Error::MissingParentBlock(parent_hash))
}
PreCheckedHeader::HeaderWithCache { parent_ref, .. } => {
let r = check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await;
let r = check_and_apply_block(
blockchain,
parent_ref,
block,
explorer_msg_box,
event_notifier_msg_box,
)
.await;
r
}
}
Expand All @@ -661,6 +708,7 @@ async fn check_and_apply_block(
parent_ref: Arc<Ref>,
block: Block,
explorer_msg_box: Option<&mut MessageBox<ExplorerMsg>>,
event_notifier_msg_box: &mut MessageBox<NotifierMsg>,
) -> Result<Option<Arc<Ref>>, chain::Error> {
let explorer_enabled = explorer_msg_box.is_some();
let post_checked = blockchain
Expand All @@ -679,6 +727,8 @@ async fn check_and_apply_block(
} else {
None
};
let block_for_subscribers = block.clone();

let applied_block = blockchain
.apply_and_store_block(post_checked, block)
.await?;
Expand All @@ -695,6 +745,13 @@ async fn check_and_apply_block(
.try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap()))
.unwrap_or_else(|err| tracing::error!("cannot add block to explorer: {}", err));
}

event_notifier_msg_box
.try_send(NotifierMsg::NewBlock(block_for_subscribers))
.unwrap_or_else(|err| {
tracing::error!("cannot notify new block to subscribers: {}", err)
});

Ok(Some(block_ref))
} else {
tracing::debug!(
Expand Down
2 changes: 2 additions & 0 deletions jormungandr/src/fragment/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ impl Logs {
pub fn insert_pending(&mut self, log: FragmentLog) -> bool {
assert!(log.is_pending());
let fragment_id = *log.fragment_id();

if self.entries.contains(&fragment_id) {
false
} else {
self.entries.put(fragment_id, (log, None));

true
}
}
Expand Down
7 changes: 7 additions & 0 deletions jormungandr/src/intercom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,5 +628,12 @@ pub enum ExplorerMsg {
NewTip(HeaderHash),
}

/// Messages to the notifier task
pub enum NotifierMsg {
NewBlock(Block),
NewTip(HeaderHash),
FragmentLog(FragmentId, FragmentStatus),
}

#[cfg(test)]
mod tests {}
21 changes: 21 additions & 0 deletions jormungandr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub mod leadership;
pub mod log;
pub mod metrics;
pub mod network;
pub mod notifier;
pub mod rest;
pub mod secure;
pub mod settings;
Expand Down Expand Up @@ -77,6 +78,7 @@ const NETWORK_TASK_QUEUE_LEN: usize = 64;
const EXPLORER_TASK_QUEUE_LEN: usize = 32;
const CLIENT_TASK_QUEUE_LEN: usize = 32;
const TOPOLOGY_TASK_QUEUE_LEN: usize = 32;
const NOTIFIER_TASK_QUEUE_LEN: usize = 32;
const BOOTSTRAP_RETRY_WAIT: Duration = Duration::from_secs(5);

fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::Error> {
Expand Down Expand Up @@ -148,12 +150,29 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
}
};

let (notifier_msgbox, notifier) = {
let (msgbox, queue) = async_msg::channel(NOTIFIER_TASK_QUEUE_LEN);

let blockchain_tip = blockchain_tip.clone();
let current_tip = block_on(async { blockchain_tip.get_ref().await.header().id() });

let (notifier, message_processor) =
notifier::Notifier::new(current_tip, blockchain.storage().clone());

services.spawn_future("notifier", move |info| async move {
message_processor.start(info, queue).await
});

(msgbox, notifier)
};

{
let blockchain = blockchain.clone();
let blockchain_tip = blockchain_tip.clone();
let network_msgbox = network_msgbox.clone();
let fragment_msgbox = fragment_msgbox.clone();
let explorer_msgbox = explorer.as_ref().map(|(msg_box, _context)| msg_box.clone());
let notifier_msgbox = notifier_msgbox.clone();
// TODO: we should get this value from the configuration
let block_cache_ttl: Duration = Duration::from_secs(120);
let stats_counter = stats_counter.clone();
Expand All @@ -165,6 +184,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
network_msgbox,
fragment_msgbox,
explorer_msgbox,
notifier_msgbox,
garbage_collection_interval: block_cache_ttl,
};
process.start(info, block_queue)
Expand Down Expand Up @@ -206,6 +226,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
global_state,
input: network_queue,
channels,
notification_service: notifier,
};
network::start(params)
});
Expand Down
3 changes: 2 additions & 1 deletion jormungandr/src/network/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ where
parent_tip.clone(),
None,
None,
None,
)
.await
{
Expand All @@ -284,7 +285,7 @@ where
}

if let Some(parent_tip) = maybe_parent_tip {
blockchain::process_new_ref(&mut blockchain, branch, parent_tip, None, None)
blockchain::process_new_ref(&mut blockchain, branch, parent_tip, None, None, None)
.await
.map_err(Error::ChainSelectionFailed)
} else {
Expand Down
2 changes: 2 additions & 0 deletions jormungandr/src/network/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub async fn run_listen_socket(
listen: &Listen,
state: GlobalStateR,
channels: Channels,
notification_service: crate::notifier::Notifier,
) -> Result<(), ListenError> {
let sockaddr = listen.address();
let span = span!(parent: &state.span, Level::TRACE, "listen_socket", local_addr = %sockaddr.to_string());
Expand All @@ -25,6 +26,7 @@ pub async fn run_listen_socket(
.concurrency_limit_per_connection(concurrency_limits::SERVER_REQUESTS)
.tcp_keepalive(Some(keepalive_durations::TCP))
.add_service(service)
.add_service(notification_service.into_server())
.serve(sockaddr)
.await
.map_err(|cause| ListenError { cause, sockaddr })
Expand Down
Loading

0 comments on commit 631ca8a

Please sign in to comment.