diff --git a/Cargo.lock b/Cargo.lock index bf01db2794..512a1bda73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -588,7 +588,7 @@ name = "chain-vote" version = "0.1.0" source = "git+https://github.com/input-output-hk/chain-libs.git?branch=master#8bdcd2c8369e120a80ec5b40846632c5d295c1f6" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "chain-core", "chain-crypto", "const_format", @@ -599,6 +599,15 @@ dependencies = [ "thiserror", ] +[[package]] +name = "chain-watch" +version = "0.1.0" +dependencies = [ + "prost", + "tonic", + "tonic-build", +] + [[package]] name = "chrono" version = "0.4.19" @@ -1867,6 +1876,7 @@ dependencies = [ "chain-storage", "chain-time", "chain-vote", + "chain-watch", "chrono", "futures", "hex", diff --git a/jormungandr/Cargo.toml b/jormungandr/Cargo.toml index 2c37de9404..e4c3f4d87e 100644 --- a/jormungandr/Cargo.toml +++ b/jormungandr/Cargo.toml @@ -22,7 +22,7 @@ chain-time = { git = "https://github.com/input-output-hk/chain-libs.git", b chain-vote = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" } cardano-legacy-address = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" } imhamt = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "master" } - +chain-watch = { git = "https://github.com/input-output-hk/chain-libs.git", branch = "heterogeneous-client-api" } arc-swap = "^1.1.0" async-trait = "0.1.50" async-graphql = "2.5.1" diff --git a/jormungandr/src/blockchain/process.rs b/jormungandr/src/blockchain/process.rs index 75a9c1cd57..e91a44ad62 100644 --- a/jormungandr/src/blockchain/process.rs +++ b/jormungandr/src/blockchain/process.rs @@ -7,7 +7,9 @@ use super::{ use crate::{ blockcfg::{Block, FragmentId, Header, HeaderHash}, blockchain::Checkpoints, - intercom::{self, BlockMsg, ExplorerMsg, NetworkMsg, PropagateMsg, TransactionMsg}, + intercom::{ + self, BlockMsg, ExplorerMsg, NetworkMsg, NotifierMsg, PropagateMsg, TransactionMsg, + }, metrics::{Metrics, MetricsBackend}, network::p2p::Address, utils::{ @@ -58,6 +60,7 @@ pub struct Process { pub network_msgbox: MessageBox, pub fragment_msgbox: MessageBox, pub explorer_msgbox: Option>, + pub notifier_msgbox: MessageBox, pub garbage_collection_interval: Duration, } @@ -92,6 +95,7 @@ impl Process { let blockchain_tip = self.blockchain_tip.clone(); let network_msg_box = self.network_msgbox.clone(); let explorer_msg_box = self.explorer_msgbox.clone(); + let event_notifier_msg_box = self.notifier_msgbox.clone(); let tx_msg_box = self.fragment_msgbox.clone(); let stats_counter = self.stats_counter.clone(); @@ -117,6 +121,7 @@ impl Process { tx_msg_box, network_msg_box, explorer_msg_box, + event_notifier_msg_box, leadership_block, stats_counter, ) @@ -163,6 +168,7 @@ impl Process { tx_msg_box, network_msg_box, explorer_msg_box, + event_notifier_msg_box, get_next_block_scheduler, handle, stats_counter, @@ -195,6 +201,8 @@ impl Process { let explorer = self.explorer_msgbox.clone(); let tx_msg_box = self.fragment_msgbox.clone(); + let notifier = self.notifier_msgbox.clone(); + info.run_periodic_fallible( "branch reprocessing", BRANCH_REPROCESSING_INTERVAL, @@ -204,6 +212,7 @@ impl Process { tip.clone(), explorer.clone(), tx_msg_box.clone(), + notifier.clone(), ) }, ) @@ -294,6 +303,7 @@ async fn reprocess_tip( tip: Tip, explorer_msg_box: Option>, tx_msg_box: MessageBox, + notifier_msg_box: MessageBox, ) -> Result<(), Error> { let branches: Vec> = blockchain.branches().branches().await; @@ -311,6 +321,7 @@ async fn reprocess_tip( Arc::clone(other), explorer_msg_box.clone(), Some(tx_msg_box.clone()), + Some(notifier_msg_box.clone()), ) .await? } @@ -333,6 +344,7 @@ pub async fn process_new_ref( candidate: Arc, explorer_msg_box: Option>, mut tx_msg_box: Option>, + notifier_msg_box: Option>, ) -> Result<(), Error> { let candidate_hash = candidate.hash(); let tip_ref = tip.get_ref().await; @@ -411,6 +423,15 @@ pub async fn process_new_ref( .await .unwrap_or_else(|err| { tracing::error!("cannot send new tip to explorer: {}", err) + }) + } + + if let Some(mut msg_box) = notifier_msg_box { + msg_box + .send(NotifierMsg::NewTip(candidate_hash)) + .await + .unwrap_or_else(|err| { + tracing::error!("cannot notify new block to subscribers: {}", err) }); } } @@ -426,15 +447,18 @@ async fn process_and_propagate_new_ref( mut network_msg_box: MessageBox, explorer_msg_box: Option>, tx_msg_box: MessageBox, + notifier_msg_box: MessageBox, ) -> chain::Result<()> { let header = new_block_ref.header().clone(); tracing::debug!("processing the new block and propagating"); + process_new_ref( blockchain, tip, new_block_ref, explorer_msg_box, Some(tx_msg_box), + Some(notifier_msg_box), ) .await?; @@ -454,6 +478,7 @@ async fn process_leadership_block( tx_msg_box: MessageBox, network_msg_box: MessageBox, explorer_msg_box: Option>, + mut notifier_msg_box: MessageBox, leadership_block: LeadershipBlock, stats_counter: Metrics, ) -> chain::Result<()> { @@ -467,12 +492,23 @@ async fn process_leadership_block( network_msg_box, explorer_msg_box.clone(), tx_msg_box, + notifier_msg_box.clone(), ) .await?; // Track block as new new tip block stats_counter.set_tip_block(&block, &new_block_ref); + if let Err(err) = notifier_msg_box + .send(NotifierMsg::NewBlock(block.clone())) + .await + { + tracing::error!( + "Cannot propagate block to blockchain event notifier: {}", + err + ) + } + if let Some(mut msg_box) = explorer_msg_box { msg_box.send(ExplorerMsg::NewBlock(block)).await?; } @@ -547,6 +583,7 @@ async fn process_network_blocks( tx_msg_box: MessageBox, network_msg_box: MessageBox, mut explorer_msg_box: Option>, + mut notifier_msg_box: MessageBox, mut get_next_block_scheduler: GetNextBlockScheduler, handle: intercom::RequestStreamHandle, stats_counter: Metrics, @@ -564,6 +601,7 @@ async fn process_network_blocks( &blockchain, block.clone(), explorer_msg_box.as_mut(), + &mut notifier_msg_box, &mut get_next_block_scheduler, ) .await; @@ -603,6 +641,7 @@ async fn process_network_blocks( network_msg_box, explorer_msg_box, tx_msg_box, + notifier_msg_box, ) .await?; @@ -620,6 +659,7 @@ async fn process_network_block( blockchain: &Blockchain, block: Block, explorer_msg_box: Option<&mut MessageBox>, + event_notifier_msg_box: &mut MessageBox, get_next_block_scheduler: &mut GetNextBlockScheduler, ) -> Result>, chain::Error> { get_next_block_scheduler @@ -650,7 +690,14 @@ async fn process_network_block( Err(Error::MissingParentBlock(parent_hash)) } PreCheckedHeader::HeaderWithCache { parent_ref, .. } => { - let r = check_and_apply_block(blockchain, parent_ref, block, explorer_msg_box).await; + let r = check_and_apply_block( + blockchain, + parent_ref, + block, + explorer_msg_box, + event_notifier_msg_box, + ) + .await; r } } @@ -661,6 +708,7 @@ async fn check_and_apply_block( parent_ref: Arc, block: Block, explorer_msg_box: Option<&mut MessageBox>, + event_notifier_msg_box: &mut MessageBox, ) -> Result>, chain::Error> { let explorer_enabled = explorer_msg_box.is_some(); let post_checked = blockchain @@ -679,6 +727,8 @@ async fn check_and_apply_block( } else { None }; + let block_for_subscribers = block.clone(); + let applied_block = blockchain .apply_and_store_block(post_checked, block) .await?; @@ -695,6 +745,13 @@ async fn check_and_apply_block( .try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap())) .unwrap_or_else(|err| tracing::error!("cannot add block to explorer: {}", err)); } + + event_notifier_msg_box + .try_send(NotifierMsg::NewBlock(block_for_subscribers)) + .unwrap_or_else(|err| { + tracing::error!("cannot notify new block to subscribers: {}", err) + }); + Ok(Some(block_ref)) } else { tracing::debug!( diff --git a/jormungandr/src/fragment/logs.rs b/jormungandr/src/fragment/logs.rs index 58ef03dd49..1c17580216 100644 --- a/jormungandr/src/fragment/logs.rs +++ b/jormungandr/src/fragment/logs.rs @@ -33,10 +33,12 @@ impl Logs { pub fn insert_pending(&mut self, log: FragmentLog) -> bool { assert!(log.is_pending()); let fragment_id = *log.fragment_id(); + if self.entries.contains(&fragment_id) { false } else { self.entries.put(fragment_id, (log, None)); + true } } diff --git a/jormungandr/src/intercom.rs b/jormungandr/src/intercom.rs index 8efb04c45a..8fb8ae071e 100644 --- a/jormungandr/src/intercom.rs +++ b/jormungandr/src/intercom.rs @@ -628,5 +628,12 @@ pub enum ExplorerMsg { NewTip(HeaderHash), } +/// Messages to the notifier task +pub enum NotifierMsg { + NewBlock(Block), + NewTip(HeaderHash), + FragmentLog(FragmentId, FragmentStatus), +} + #[cfg(test)] mod tests {} diff --git a/jormungandr/src/main.rs b/jormungandr/src/main.rs index 5ea82261dc..83b100a4d5 100644 --- a/jormungandr/src/main.rs +++ b/jormungandr/src/main.rs @@ -39,6 +39,7 @@ pub mod leadership; pub mod log; pub mod metrics; pub mod network; +pub mod notifier; pub mod rest; pub mod secure; pub mod settings; @@ -77,6 +78,7 @@ const NETWORK_TASK_QUEUE_LEN: usize = 64; const EXPLORER_TASK_QUEUE_LEN: usize = 32; const CLIENT_TASK_QUEUE_LEN: usize = 32; const TOPOLOGY_TASK_QUEUE_LEN: usize = 32; +const NOTIFIER_TASK_QUEUE_LEN: usize = 32; const BOOTSTRAP_RETRY_WAIT: Duration = Duration::from_secs(5); fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::Error> { @@ -148,12 +150,29 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E } }; + let (notifier_msgbox, notifier) = { + let (msgbox, queue) = async_msg::channel(NOTIFIER_TASK_QUEUE_LEN); + + let blockchain_tip = blockchain_tip.clone(); + let current_tip = block_on(async { blockchain_tip.get_ref().await.header().id() }); + + let (notifier, message_processor) = + notifier::Notifier::new(current_tip, blockchain.storage().clone()); + + services.spawn_future("notifier", move |info| async move { + message_processor.start(info, queue).await + }); + + (msgbox, notifier) + }; + { let blockchain = blockchain.clone(); let blockchain_tip = blockchain_tip.clone(); let network_msgbox = network_msgbox.clone(); let fragment_msgbox = fragment_msgbox.clone(); let explorer_msgbox = explorer.as_ref().map(|(msg_box, _context)| msg_box.clone()); + let notifier_msgbox = notifier_msgbox.clone(); // TODO: we should get this value from the configuration let block_cache_ttl: Duration = Duration::from_secs(120); let stats_counter = stats_counter.clone(); @@ -165,6 +184,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E network_msgbox, fragment_msgbox, explorer_msgbox, + notifier_msgbox, garbage_collection_interval: block_cache_ttl, }; process.start(info, block_queue) @@ -206,6 +226,7 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E global_state, input: network_queue, channels, + notification_service: notifier, }; network::start(params) }); diff --git a/jormungandr/src/network/bootstrap.rs b/jormungandr/src/network/bootstrap.rs index 0da0a63145..1241ffba7b 100644 --- a/jormungandr/src/network/bootstrap.rs +++ b/jormungandr/src/network/bootstrap.rs @@ -272,6 +272,7 @@ where parent_tip.clone(), None, None, + None, ) .await { @@ -284,7 +285,7 @@ where } if let Some(parent_tip) = maybe_parent_tip { - blockchain::process_new_ref(&mut blockchain, branch, parent_tip, None, None) + blockchain::process_new_ref(&mut blockchain, branch, parent_tip, None, None, None) .await .map_err(Error::ChainSelectionFailed) } else { diff --git a/jormungandr/src/network/grpc/server.rs b/jormungandr/src/network/grpc/server.rs index ae38375829..41ca7cd796 100644 --- a/jormungandr/src/network/grpc/server.rs +++ b/jormungandr/src/network/grpc/server.rs @@ -14,6 +14,7 @@ pub async fn run_listen_socket( listen: &Listen, state: GlobalStateR, channels: Channels, + notification_service: crate::notifier::Notifier, ) -> Result<(), ListenError> { let sockaddr = listen.address(); let span = span!(parent: &state.span, Level::TRACE, "listen_socket", local_addr = %sockaddr.to_string()); @@ -25,6 +26,7 @@ pub async fn run_listen_socket( .concurrency_limit_per_connection(concurrency_limits::SERVER_REQUESTS) .tcp_keepalive(Some(keepalive_durations::TCP)) .add_service(service) + .add_service(notification_service.into_server()) .serve(sockaddr) .await .map_err(|cause| ListenError { cause, sockaddr }) diff --git a/jormungandr/src/network/mod.rs b/jormungandr/src/network/mod.rs index 145531a6c3..e0be1320ab 100644 --- a/jormungandr/src/network/mod.rs +++ b/jormungandr/src/network/mod.rs @@ -269,6 +269,7 @@ pub struct TaskParams { pub global_state: GlobalStateR, pub input: MessageQueue, pub channels: Channels, + pub notification_service: crate::notifier::Notifier, } pub async fn start(params: TaskParams) { @@ -278,6 +279,7 @@ pub async fn start(params: TaskParams) { let input = params.input; let channels = params.channels; let global_state = params.global_state; + let notification_service = params.notification_service; // open the port for listening/accepting other peers to connect too let listen_state = global_state.clone(); @@ -286,14 +288,19 @@ pub async fn start(params: TaskParams) { if let Some(listen) = listen_state.config.listen() { match listen.protocol { Protocol::Grpc => { - grpc::run_listen_socket(&listen, listen_state, listen_channels) - .await - .unwrap_or_else(|e| { - tracing::error!( - reason = %e, - "failed to listen for P2P connections at {}", listen.connection - ); - }); + grpc::run_listen_socket( + &listen, + listen_state, + listen_channels, + notification_service, + ) + .await + .unwrap_or_else(|e| { + tracing::error!( + reason = %e, + "failed to listen for P2P connections at {}", listen.connection + ); + }); } Protocol::Ntt => unimplemented!(), } diff --git a/jormungandr/src/notifier/mod.rs b/jormungandr/src/notifier/mod.rs new file mode 100644 index 0000000000..4a069219a7 --- /dev/null +++ b/jormungandr/src/notifier/mod.rs @@ -0,0 +1,249 @@ +pub use crate::intercom::NotifierMsg as Message; +use crate::{ + blockchain::Storage, + intercom::{self, ReplyStream}, + utils::async_msg::MessageQueue, +}; +use crate::{ + intercom::ReplyStreamHandle, + utils::{async_msg::MessageBox, task::TokioServiceInfo}, +}; +use chain_core::property::Serialize; +use chain_impl_mockchain::header::HeaderId; +use chain_watch::{ + mempool_event, + subscription_service_server::{self, SubscriptionServiceServer}, + Block, BlockId, BlockSubscriptionRequest, MempoolEvent, MempoolFragmentInABlock, + MempoolFragmentInserted, MempoolFragmentRejected, MempoolSubscriptionRequest, + SyncMultiverseRequest, TipSubscriptionRequest, +}; +use futures::Stream; +use futures::{ + stream::{Map, MapErr}, + SinkExt, StreamExt, TryStream, TryStreamExt, +}; +use jormungandr_lib::interfaces::FragmentStatus; +use std::sync::Arc; +use tokio::sync::{broadcast, watch, Mutex}; +use tokio_stream::wrappers::{BroadcastStream, WatchStream}; +use tonic::{Request, Response, Status}; + +#[derive(Clone)] +pub struct Notifier { + tip_receiver: watch::Receiver, + block_sender: Arc>, + mempool_sender: Arc>, + request_tx: Arc>>, +} + +pub struct MessageProcessor { + tip_sender: Arc>, + block_sender: Arc>, + mempool_sender: Arc>, + requests: MessageQueue, + storage: Storage, +} + +enum RequestMsg { + PullBlocks { + from: u32, + handle: ReplyStreamHandle, + }, +} + +impl MessageProcessor { + pub async fn start(self, info: TokioServiceInfo, mut queue: MessageQueue) { + let storage = self.storage; + let requests = self.requests; + info.spawn("notifier client", async move { + let storage = storage.clone(); + requests + .for_each(|msg| async { + match msg { + RequestMsg::PullBlocks { from, handle } => { + let mut from = from; + let mut sink = handle.start_sending(); + + loop { + tracing::debug!("sending block with chainlength"); + let blocks = storage.get_blocks_by_chain_length(from).unwrap(); + + for block in &blocks { + let _ = sink + .feed(Ok(Block { + content: block.serialize_as_vec().unwrap(), + })) + .await; + } + + from += 1; + + if blocks.is_empty() { + break; + } + } + } + } + }) + .await; + }); + + while let Some(input) = queue.next().await { + match input { + Message::NewBlock(block) => { + let block_sender = Arc::clone(&self.block_sender); + info.spawn("notifier broadcast block", async move { + if let Err(_err) = block_sender.send(Block { + content: block.serialize_as_vec().unwrap(), + }) {} + }); + } + Message::NewTip(block_id) => { + let tip_sender = Arc::clone(&self.tip_sender); + info.spawn("notifier broadcast new tip", async move { + if let Err(_err) = tip_sender.send(BlockId { + content: block_id.serialize_as_vec().unwrap(), + }) { + tracing::error!("notifier failed to broadcast tip {}", block_id); + } + }); + } + Message::FragmentLog(fragment_id, status) => { + let mempool_sender = Arc::clone(&self.mempool_sender); + info.spawn("notifier broadcast mempool update", async move { + let event = match status { + FragmentStatus::Pending => { + mempool_event::Event::Inserted(MempoolFragmentInserted {}) + } + FragmentStatus::Rejected { reason } => { + mempool_event::Event::Rejected(MempoolFragmentRejected { reason }) + } + FragmentStatus::InABlock { block, .. } => { + mempool_event::Event::InABlock(MempoolFragmentInABlock { + block: Some(BlockId { + content: block.into_hash().serialize_as_vec().unwrap(), + }), + }) + } + }; + + if let Err(_err) = mempool_sender.send(MempoolEvent { + fragment_id: fragment_id.serialize_as_vec().unwrap(), + event: Some(event), + }) {} + }); + } + } + } + } +} + +impl Notifier { + pub fn new(current_tip: HeaderId, storage: Storage) -> (Notifier, MessageProcessor) { + let (tip_sender, tip_receiver) = watch::channel(BlockId { + content: current_tip.serialize_as_vec().unwrap(), + }); + let (block_sender, _block_receiver) = broadcast::channel(16); + let (mempool_sender, _mempool_receiver) = broadcast::channel(16); + + let tip_sender = Arc::new(tip_sender); + let block_sender = Arc::new(block_sender); + let mempool_sender = Arc::new(mempool_sender); + + let (request_tx, requests) = crate::utils::async_msg::channel(16); + + let notifier = Notifier { + tip_receiver, + block_sender: Arc::clone(&block_sender), + mempool_sender: Arc::clone(&mempool_sender), + request_tx: Arc::new(Mutex::new(request_tx)), + }; + + let message_processor = MessageProcessor { + tip_sender, + block_sender: Arc::clone(&block_sender), + mempool_sender: Arc::clone(&mempool_sender), + storage, + requests, + }; + + (notifier, message_processor) + } + + pub fn into_server(self) -> SubscriptionServiceServer { + SubscriptionServiceServer::new(self) + } +} + +type SubscriptionTryStream = MapErr::Error) -> Status>; +type SubscriptionStream = Map::Item) -> Result<::Item, Status>>; + +#[tonic::async_trait] +impl subscription_service_server::SubscriptionService for Notifier { + type BlockSubscriptionStream = SubscriptionTryStream>; + type TipSubscriptionStream = SubscriptionStream>; + type MempoolSubscriptionStream = SubscriptionTryStream>; + type SyncMultiverseStream = SubscriptionTryStream>; + + async fn block_subscription( + &self, + _request: Request, + ) -> Result, Status> { + let block_receiver = BroadcastStream::new(self.block_sender.subscribe()); + + // there are two possible errors for the block_receiver. + // one occurs when there are no more senders, but that won't happen here. + // the other is when the receiver is lagging. I'm actually not sure + // what would be a sensible choice, so I just put some arbitrary error + // for now + let live_stream: SubscriptionTryStream> = + block_receiver.map_err(|_e| Status::deadline_exceeded("some updates were dropped")); + + Ok(Response::new(live_stream)) + } + + async fn tip_subscription( + &self, + _request: Request, + ) -> Result, Status> { + let tip_receiver: SubscriptionStream<_> = + WatchStream::new(self.tip_receiver.clone()).map::, _>(Ok); + + Ok(Response::new(tip_receiver)) + } + + async fn mempool_subscription( + &self, + _request: Request, + ) -> Result, Status> { + let mempool_receiver = BroadcastStream::new(self.mempool_sender.subscribe()); + + // see comment in `block_subscription` + Ok(Response::new(mempool_receiver.map_err(|_e| { + Status::deadline_exceeded("some updates were dropped") + }))) + } + + async fn sync_multiverse( + &self, + request: Request, + ) -> Result, Status> { + let from = request.get_ref().from; + + let (handle, future) = intercom::stream_reply(32); + + self.request_tx + .lock() + .await + .send(RequestMsg::PullBlocks { from, handle }) + .await + .map_err(|_| Status::internal("can't process request"))?; + + let stream = future + .await + .unwrap() + .map_err((|_| Status::internal("error")) as fn(intercom::Error) -> Status); + + Ok(Response::new(stream)) + } +}