From 9cef927be73a4bf25621ae092e8e7cba79993b57 Mon Sep 17 00:00:00 2001 From: Enzo Cioppettini Date: Tue, 7 Jul 2020 15:08:44 -0300 Subject: [PATCH] remove lock and put each connection in a task the new connection requests are handled through a new message tip changes are handled with tokio::watch because we only care about the latest state, but block changes use a broadcast to keep old ones too --- jormungandr/src/intercom.rs | 1 + jormungandr/src/main.rs | 7 +- jormungandr/src/notifier/mod.rs | 229 ++++++++++++++-------------- jormungandr/src/rest/context.rs | 2 +- jormungandr/src/rest/v1/handlers.rs | 7 +- jormungandr/src/rest/v1/logic.rs | 7 +- 6 files changed, 126 insertions(+), 127 deletions(-) diff --git a/jormungandr/src/intercom.rs b/jormungandr/src/intercom.rs index c2a19cc581..2801941a9a 100644 --- a/jormungandr/src/intercom.rs +++ b/jormungandr/src/intercom.rs @@ -616,6 +616,7 @@ pub enum ExplorerMsg { pub enum NotifierMsg { NewBlock(HeaderHash), NewTip(HeaderHash), + NewConnection(warp::ws::WebSocket), } #[cfg(test)] diff --git a/jormungandr/src/main.rs b/jormungandr/src/main.rs index d06215e556..9237ad64ce 100644 --- a/jormungandr/src/main.rs +++ b/jormungandr/src/main.rs @@ -153,9 +153,12 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E .and_then(|settings| settings.notifier) .and_then(|settings| settings.max_connections); - let mut notifier = notifier::Notifier::new(max_connections); + let blockchain_tip = blockchain_tip.clone(); + let current_tip = block_on(async { blockchain_tip.get_ref().await.header().id() }); + + let notifier = notifier::Notifier::new(max_connections, current_tip); - let context = notifier.clone(); + let context = notifier::NotifierContext(msgbox.clone()); services.spawn_future("notifier", move |info| async move { notifier.start(info, queue).await diff --git a/jormungandr/src/notifier/mod.rs b/jormungandr/src/notifier/mod.rs index 108513e117..e49cbe9edf 100644 --- a/jormungandr/src/notifier/mod.rs +++ b/jormungandr/src/notifier/mod.rs @@ -1,13 +1,13 @@ use crate::intercom::NotifierMsg as Message; -use crate::utils::async_msg::{channel, MessageBox, MessageQueue}; +use crate::utils::async_msg::{MessageBox, MessageQueue}; use crate::utils::task::TokioServiceInfo; use chain_impl_mockchain::header::HeaderId; -use futures::{SinkExt, StreamExt}; +use futures::{select, SinkExt, StreamExt}; use jormungandr_lib::interfaces::notifier::JsonMessage; -use serde::{Serialize, Serializer}; use slog::Logger; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; +use tokio::sync::{broadcast, watch}; const MAX_CONNECTIONS_DEFAULT: usize = 255; @@ -17,72 +17,135 @@ const MAX_CONNECTIONS_DEFAULT: usize = 255; const MAX_CONNECTIONS_ERROR_CLOSE_CODE: u16 = 4000; const MAX_CONNECTIONS_ERROR_REASON: &str = "MAX CONNECTIONS reached"; -#[derive(Clone)] pub struct Notifier { - next_user_id: Arc, - clients: Arc>, + connection_counter: Arc, max_connections: usize, + tip_sender: Arc>, + tip_receiver: watch::Receiver, + block_sender: Arc>, } -type Clients = std::collections::HashMap>; +#[derive(Clone)] +pub struct NotifierContext(pub MessageBox); + +impl NotifierContext { + pub async fn new_connection(&mut self, ws: warp::ws::WebSocket) { + &mut self.0.send(Message::NewConnection(ws)).await; + } +} impl Notifier { - pub fn new(max_connections: Option) -> Notifier { + pub fn new(max_connections: Option, current_tip: HeaderId) -> Notifier { + let (tip_sender, tip_receiver) = watch::channel(current_tip); + let (block_sender, _block_receiver) = broadcast::channel(16); + Notifier { - next_user_id: Arc::new(AtomicUsize::new(1)), - clients: Default::default(), + connection_counter: Arc::new(AtomicUsize::new(0)), max_connections: max_connections.unwrap_or(MAX_CONNECTIONS_DEFAULT), + tip_sender: Arc::new(tip_sender), + tip_receiver, + block_sender: Arc::new(block_sender), } } - pub async fn start(&mut self, info: TokioServiceInfo, queue: MessageQueue) { - let clients1 = self.clients.clone(); - let clients2 = self.clients.clone(); - let logger = info.logger(); - - let (deleted_msgbox, deleted_queue) = channel::(32); - - // TODO: it may be better to have a task that runs periodically instead of - // when a sender is detected to be disconected, but that would require - // reading the sockets besides from writing to them - info.spawn( - "clean disconnected notifier clients", - handle_disconnected(clients2.clone(), deleted_queue), - ); + pub async fn start(&self, info: TokioServiceInfo, queue: MessageQueue) { + let info = Arc::new(info); queue - .for_each(|input| { - info.spawn( - "notifier send new messages", - process_message( - logger.clone(), - clients1.clone(), - input, - deleted_msgbox.clone(), - ), - ); + .for_each(move |input| { + let tip_sender = Arc::clone(&self.tip_sender); + let block_sender = Arc::clone(&self.block_sender); + let logger = info.logger().clone(); + + match input { + Message::NewBlock(block_id) => { + info.spawn("notifier broadcast block", async move { + if let Err(_err) = block_sender.send(block_id) { + () + } + }); + } + Message::NewTip(block_id) => { + info.spawn("notifier broadcast new tip", async move { + if let Err(_err) = tip_sender.broadcast(block_id) { + error!(logger, "notifier failed to broadcast tip {}", block_id); + } + }); + } + Message::NewConnection(ws) => { + trace!(logger, "processing notifier new connection"); + let info2 = Arc::clone(&info); + + let connection_counter = Arc::clone(&self.connection_counter); + let max_connections = self.max_connections; + let tip_receiver = self.tip_receiver.clone(); + + info.spawn("notifier process new messages", async move { + Self::new_connection( + info2, + max_connections, + connection_counter, + tip_receiver, + block_sender, + ws, + ) + .await; + }); + } + } + futures::future::ready(()) }) .await; } - pub async fn new_connection(&self, ws: warp::ws::WebSocket) { - let id = self.next_user_id.fetch_add(1, Ordering::Relaxed); - - let clients = Arc::clone(&self.clients); - - let rejected = async move { - let mut locked_clients = clients.write().await; - if locked_clients.len() < (self.max_connections as usize) { - locked_clients.insert(id, tokio::sync::Mutex::new(ws)); - None - } else { - Some(ws) - } - } - .await; + pub async fn new_connection( + info: Arc, + max_connections: usize, + connection_counter: Arc, + tip_receiver: watch::Receiver, + block_sender: Arc>, + mut ws: warp::ws::WebSocket, + ) { + let counter = connection_counter.load(Ordering::Acquire); + + if counter < max_connections { + connection_counter.store(counter + 1, Ordering::Release); + + let mut tip_receiver = tip_receiver.fuse(); + let mut block_receiver = block_sender.subscribe().fuse(); + + info.spawn("notifier connection", (move || async move { + loop { + select! { + msg = tip_receiver.next() => { + if let Some(msg) = msg { + let warp_msg = warp::ws::Message::text(JsonMessage::NewTip(msg.into())); + + if let Err(_disconnected) = ws.send(warp_msg).await { + break; + } + } + }, + msg = block_receiver.next() => { + // if this is an Err it means this receiver is lagging, in which case it will + // drop messages, I think ignoring that case and continuing with the rest is + // fine + if let Some(Ok(msg)) = msg { + let warp_msg = warp::ws::Message::text(JsonMessage::NewBlock(msg.into())); + + if let Err(_disconnected) = ws.send(warp_msg).await { + break; + } + } + }, + complete => break, + }; + } - if let Some(mut ws) = rejected { + futures::future::ready(()) + })().await); + } else { let close_msg = warp::ws::Message::close_with( MAX_CONNECTIONS_ERROR_CLOSE_CODE, MAX_CONNECTIONS_ERROR_REASON, @@ -93,69 +156,3 @@ impl Notifier { } } } - -async fn process_message( - logger: Logger, - clients: Arc>, - msg: Message, - mut disconnected: MessageBox, -) { - let warp_msg = warp::ws::Message::text(JsonMessage::from(msg)); - - let dead = notify_all(clients, warp_msg).await; - - for id in dead { - disconnected.send(id).await.unwrap_or_else(|err| { - error!( - logger, - "notifier error when adding id to disconnected: {}", err - ); - }); - } -} - -async fn notify_all( - clients: Arc>, - msg: warp::ws::Message, -) -> Vec { - let clients = clients.clone(); - async move { - let mut disconnected = vec![]; - let clients = clients.read().await; - for (client_id, channel) in clients.iter() { - if let Err(_disconnected) = channel.lock().await.send(msg.clone()).await { - disconnected.push(client_id.clone()) - } - } - disconnected - } - .await -} - -async fn handle_disconnected( - clients: Arc>, - disconnected: MessageQueue, -) { - async move { - let clients2 = Arc::clone(&clients); - disconnected - .for_each(|id| { - let clients_handle = Arc::clone(&clients2); - async move { - let mut locked_clients = clients_handle.write().await; - locked_clients.remove(&id); - } - }) - .await; - } - .await; -} - -impl From for JsonMessage { - fn from(msg: Message) -> JsonMessage { - match msg { - Message::NewBlock(inner) => JsonMessage::NewBlock(inner.into()), - Message::NewTip(inner) => JsonMessage::NewTip(inner.into()), - } - } -} diff --git a/jormungandr/src/rest/context.rs b/jormungandr/src/rest/context.rs index 0acc83476a..5cdbc1489b 100644 --- a/jormungandr/src/rest/context.rs +++ b/jormungandr/src/rest/context.rs @@ -140,5 +140,5 @@ pub struct FullContext { pub enclave: Enclave, pub network_state: NetworkStateR, pub explorer: Option, - pub notifier: crate::notifier::Notifier, + pub notifier: crate::notifier::NotifierContext, } diff --git a/jormungandr/src/rest/v1/handlers.rs b/jormungandr/src/rest/v1/handlers.rs index 0a14eb9677..f507f1b98f 100644 --- a/jormungandr/src/rest/v1/handlers.rs +++ b/jormungandr/src/rest/v1/handlers.rs @@ -1,4 +1,4 @@ -use crate::rest::{context::Error, v1::logic, ContextLock}; +use crate::rest::{v1::logic, ContextLock}; use warp::{reject::Reject, Rejection, Reply}; impl Reject for logic::Error {} @@ -51,9 +51,4 @@ pub async fn handle_subscription( logic::handle_subscription(ws, &context) .await .map_err(warp::reject::custom) - // let full_context = context.try_full().map_err(warp::reject::custom)?; - - // let notifier: crate::notifier::Notifier = full_context.notifier.clone(); - - // Ok(ws.on_upgrade(move |socket| add_connection(notifier, socket))) } diff --git a/jormungandr/src/rest/v1/logic.rs b/jormungandr/src/rest/v1/logic.rs index 16bf7d63a2..15133bff67 100644 --- a/jormungandr/src/rest/v1/logic.rs +++ b/jormungandr/src/rest/v1/logic.rs @@ -109,11 +109,14 @@ pub async fn handle_subscription( context: &Context, ) -> Result { let full_context = context.try_full()?; - let notifier: crate::notifier::Notifier = full_context.notifier.clone(); + let notifier: crate::notifier::NotifierContext = full_context.notifier.clone(); Ok(ws.on_upgrade(move |socket| add_connection(notifier, socket))) } -async fn add_connection(notifier: crate::notifier::Notifier, socket: warp::ws::WebSocket) { +async fn add_connection( + mut notifier: crate::notifier::NotifierContext, + socket: warp::ws::WebSocket, +) { notifier.new_connection(socket).await; }