Skip to content

Commit

Permalink
remove lock and put each connection in a task
Browse files Browse the repository at this point in the history
the new connection requests are handled through a new message
tip changes are handled with tokio::watch because we only care about the
latest state, but block changes use a broadcast to keep old ones too
  • Loading branch information
ecioppettini committed Sep 2, 2020
1 parent 2fa70b3 commit 9cef927
Show file tree
Hide file tree
Showing 6 changed files with 126 additions and 127 deletions.
1 change: 1 addition & 0 deletions jormungandr/src/intercom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,7 @@ pub enum ExplorerMsg {
pub enum NotifierMsg {
NewBlock(HeaderHash),
NewTip(HeaderHash),
NewConnection(warp::ws::WebSocket),
}

#[cfg(test)]
Expand Down
7 changes: 5 additions & 2 deletions jormungandr/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,12 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
.and_then(|settings| settings.notifier)
.and_then(|settings| settings.max_connections);

let mut notifier = notifier::Notifier::new(max_connections);
let blockchain_tip = blockchain_tip.clone();
let current_tip = block_on(async { blockchain_tip.get_ref().await.header().id() });

let notifier = notifier::Notifier::new(max_connections, current_tip);

let context = notifier.clone();
let context = notifier::NotifierContext(msgbox.clone());

services.spawn_future("notifier", move |info| async move {
notifier.start(info, queue).await
Expand Down
229 changes: 113 additions & 116 deletions jormungandr/src/notifier/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::intercom::NotifierMsg as Message;
use crate::utils::async_msg::{channel, MessageBox, MessageQueue};
use crate::utils::async_msg::{MessageBox, MessageQueue};
use crate::utils::task::TokioServiceInfo;
use chain_impl_mockchain::header::HeaderId;
use futures::{SinkExt, StreamExt};
use futures::{select, SinkExt, StreamExt};
use jormungandr_lib::interfaces::notifier::JsonMessage;
use serde::{Serialize, Serializer};
use slog::Logger;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::{broadcast, watch};

const MAX_CONNECTIONS_DEFAULT: usize = 255;

Expand All @@ -17,72 +17,135 @@ const MAX_CONNECTIONS_DEFAULT: usize = 255;
const MAX_CONNECTIONS_ERROR_CLOSE_CODE: u16 = 4000;
const MAX_CONNECTIONS_ERROR_REASON: &str = "MAX CONNECTIONS reached";

#[derive(Clone)]
pub struct Notifier {
next_user_id: Arc<AtomicUsize>,
clients: Arc<tokio::sync::RwLock<Clients>>,
connection_counter: Arc<AtomicUsize>,
max_connections: usize,
tip_sender: Arc<watch::Sender<HeaderId>>,
tip_receiver: watch::Receiver<HeaderId>,
block_sender: Arc<broadcast::Sender<HeaderId>>,
}

type Clients = std::collections::HashMap<usize, tokio::sync::Mutex<warp::ws::WebSocket>>;
#[derive(Clone)]
pub struct NotifierContext(pub MessageBox<Message>);

impl NotifierContext {
pub async fn new_connection(&mut self, ws: warp::ws::WebSocket) {
&mut self.0.send(Message::NewConnection(ws)).await;
}
}

impl Notifier {
pub fn new(max_connections: Option<usize>) -> Notifier {
pub fn new(max_connections: Option<usize>, current_tip: HeaderId) -> Notifier {
let (tip_sender, tip_receiver) = watch::channel(current_tip);
let (block_sender, _block_receiver) = broadcast::channel(16);

Notifier {
next_user_id: Arc::new(AtomicUsize::new(1)),
clients: Default::default(),
connection_counter: Arc::new(AtomicUsize::new(0)),
max_connections: max_connections.unwrap_or(MAX_CONNECTIONS_DEFAULT),
tip_sender: Arc::new(tip_sender),
tip_receiver,
block_sender: Arc::new(block_sender),
}
}

pub async fn start(&mut self, info: TokioServiceInfo, queue: MessageQueue<Message>) {
let clients1 = self.clients.clone();
let clients2 = self.clients.clone();
let logger = info.logger();

let (deleted_msgbox, deleted_queue) = channel::<usize>(32);

// TODO: it may be better to have a task that runs periodically instead of
// when a sender is detected to be disconected, but that would require
// reading the sockets besides from writing to them
info.spawn(
"clean disconnected notifier clients",
handle_disconnected(clients2.clone(), deleted_queue),
);
pub async fn start(&self, info: TokioServiceInfo, queue: MessageQueue<Message>) {
let info = Arc::new(info);

queue
.for_each(|input| {
info.spawn(
"notifier send new messages",
process_message(
logger.clone(),
clients1.clone(),
input,
deleted_msgbox.clone(),
),
);
.for_each(move |input| {
let tip_sender = Arc::clone(&self.tip_sender);
let block_sender = Arc::clone(&self.block_sender);
let logger = info.logger().clone();

match input {
Message::NewBlock(block_id) => {
info.spawn("notifier broadcast block", async move {
if let Err(_err) = block_sender.send(block_id) {
()
}
});
}
Message::NewTip(block_id) => {
info.spawn("notifier broadcast new tip", async move {
if let Err(_err) = tip_sender.broadcast(block_id) {
error!(logger, "notifier failed to broadcast tip {}", block_id);
}
});
}
Message::NewConnection(ws) => {
trace!(logger, "processing notifier new connection");
let info2 = Arc::clone(&info);

let connection_counter = Arc::clone(&self.connection_counter);
let max_connections = self.max_connections;
let tip_receiver = self.tip_receiver.clone();

info.spawn("notifier process new messages", async move {
Self::new_connection(
info2,
max_connections,
connection_counter,
tip_receiver,
block_sender,
ws,
)
.await;
});
}
}

futures::future::ready(())
})
.await;
}

pub async fn new_connection(&self, ws: warp::ws::WebSocket) {
let id = self.next_user_id.fetch_add(1, Ordering::Relaxed);

let clients = Arc::clone(&self.clients);

let rejected = async move {
let mut locked_clients = clients.write().await;
if locked_clients.len() < (self.max_connections as usize) {
locked_clients.insert(id, tokio::sync::Mutex::new(ws));
None
} else {
Some(ws)
}
}
.await;
pub async fn new_connection(
info: Arc<TokioServiceInfo>,
max_connections: usize,
connection_counter: Arc<AtomicUsize>,
tip_receiver: watch::Receiver<HeaderId>,
block_sender: Arc<broadcast::Sender<HeaderId>>,
mut ws: warp::ws::WebSocket,
) {
let counter = connection_counter.load(Ordering::Acquire);

if counter < max_connections {
connection_counter.store(counter + 1, Ordering::Release);

let mut tip_receiver = tip_receiver.fuse();
let mut block_receiver = block_sender.subscribe().fuse();

info.spawn("notifier connection", (move || async move {
loop {
select! {
msg = tip_receiver.next() => {
if let Some(msg) = msg {
let warp_msg = warp::ws::Message::text(JsonMessage::NewTip(msg.into()));

if let Err(_disconnected) = ws.send(warp_msg).await {
break;
}
}
},
msg = block_receiver.next() => {
// if this is an Err it means this receiver is lagging, in which case it will
// drop messages, I think ignoring that case and continuing with the rest is
// fine
if let Some(Ok(msg)) = msg {
let warp_msg = warp::ws::Message::text(JsonMessage::NewBlock(msg.into()));

if let Err(_disconnected) = ws.send(warp_msg).await {
break;
}
}
},
complete => break,
};
}

if let Some(mut ws) = rejected {
futures::future::ready(())
})().await);
} else {
let close_msg = warp::ws::Message::close_with(
MAX_CONNECTIONS_ERROR_CLOSE_CODE,
MAX_CONNECTIONS_ERROR_REASON,
Expand All @@ -93,69 +156,3 @@ impl Notifier {
}
}
}

async fn process_message(
logger: Logger,
clients: Arc<tokio::sync::RwLock<Clients>>,
msg: Message,
mut disconnected: MessageBox<usize>,
) {
let warp_msg = warp::ws::Message::text(JsonMessage::from(msg));

let dead = notify_all(clients, warp_msg).await;

for id in dead {
disconnected.send(id).await.unwrap_or_else(|err| {
error!(
logger,
"notifier error when adding id to disconnected: {}", err
);
});
}
}

async fn notify_all(
clients: Arc<tokio::sync::RwLock<Clients>>,
msg: warp::ws::Message,
) -> Vec<usize> {
let clients = clients.clone();
async move {
let mut disconnected = vec![];
let clients = clients.read().await;
for (client_id, channel) in clients.iter() {
if let Err(_disconnected) = channel.lock().await.send(msg.clone()).await {
disconnected.push(client_id.clone())
}
}
disconnected
}
.await
}

async fn handle_disconnected(
clients: Arc<tokio::sync::RwLock<Clients>>,
disconnected: MessageQueue<usize>,
) {
async move {
let clients2 = Arc::clone(&clients);
disconnected
.for_each(|id| {
let clients_handle = Arc::clone(&clients2);
async move {
let mut locked_clients = clients_handle.write().await;
locked_clients.remove(&id);
}
})
.await;
}
.await;
}

impl From<Message> for JsonMessage {
fn from(msg: Message) -> JsonMessage {
match msg {
Message::NewBlock(inner) => JsonMessage::NewBlock(inner.into()),
Message::NewTip(inner) => JsonMessage::NewTip(inner.into()),
}
}
}
2 changes: 1 addition & 1 deletion jormungandr/src/rest/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,5 @@ pub struct FullContext {
pub enclave: Enclave,
pub network_state: NetworkStateR,
pub explorer: Option<crate::explorer::Explorer>,
pub notifier: crate::notifier::Notifier,
pub notifier: crate::notifier::NotifierContext,
}
7 changes: 1 addition & 6 deletions jormungandr/src/rest/v1/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::rest::{context::Error, v1::logic, ContextLock};
use crate::rest::{v1::logic, ContextLock};
use warp::{reject::Reject, Rejection, Reply};

impl Reject for logic::Error {}
Expand Down Expand Up @@ -51,9 +51,4 @@ pub async fn handle_subscription(
logic::handle_subscription(ws, &context)
.await
.map_err(warp::reject::custom)
// let full_context = context.try_full().map_err(warp::reject::custom)?;

// let notifier: crate::notifier::Notifier = full_context.notifier.clone();

// Ok(ws.on_upgrade(move |socket| add_connection(notifier, socket)))
}
7 changes: 5 additions & 2 deletions jormungandr/src/rest/v1/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,14 @@ pub async fn handle_subscription(
context: &Context,
) -> Result<impl warp::Reply, Error> {
let full_context = context.try_full()?;
let notifier: crate::notifier::Notifier = full_context.notifier.clone();
let notifier: crate::notifier::NotifierContext = full_context.notifier.clone();

Ok(ws.on_upgrade(move |socket| add_connection(notifier, socket)))
}

async fn add_connection(notifier: crate::notifier::Notifier, socket: warp::ws::WebSocket) {
async fn add_connection(
mut notifier: crate::notifier::NotifierContext,
socket: warp::ws::WebSocket,
) {
notifier.new_connection(socket).await;
}

0 comments on commit 9cef927

Please sign in to comment.