Skip to content

Commit

Permalink
Bugfix/explorer fixes (nymtech#477)
Browse files Browse the repository at this point in the history
* Using original std::thread::spawn

* websocket client reconnection upon broken metrics
  • Loading branch information
jstuczyn authored Dec 2, 2020
1 parent 5fc81db commit 46d3591
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 12 deletions.
10 changes: 5 additions & 5 deletions explorer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ async fn main() {
.to_owned();

let public_path = std::env::current_exe()
.expect("Failed to evaluate current exe path")
.parent()
.expect("the binary itself has no parent path?!")
.join("public");
.expect("Failed to evaluate current exe path")
.parent()
.expect("the binary itself has no parent path?!")
.join("public");

tokio::task::spawn_blocking(|| {
std::thread::spawn(|| {
rocket::ignite()
.mount("/", StaticFiles::from(public_path))
.launch()
Expand Down
55 changes: 48 additions & 7 deletions explorer/src/websockets/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ use log::*;
use tokio::net::TcpStream;
use tokio::stream::StreamExt;
use tokio::sync::broadcast;
use tokio::time::{delay_for, Duration};
use tokio_native_tls::TlsStream;
use tokio_tungstenite::tungstenite::{Error as WsError, Message};
use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::{connect_async, stream::Stream};

pub(crate) type WsItem = Result<Message, WsError>;
const MAX_RECONNECTION_ATTEMPTS: u32 = 10;
const RECONNECTION_BACKOFF: Duration = Duration::from_secs(10);

/// A websocket client which subscribes to the metrics centrally collected by the metrics server.
/// All metrics messages get copied out to this dashboard instance's clients.
pub(crate) struct MetricsWebsocketClient {
metrics_address: String,
metrics_upstream: WebSocketStream<Stream<TcpStream, TlsStream<TcpStream>>>,
broadcaster: broadcast::Sender<Message>,

reconnection_attempt: u32,
}

impl MetricsWebsocketClient {
Expand All @@ -30,33 +36,68 @@ impl MetricsWebsocketClient {
info!("Subscribed to metrics websocket at {}", metrics_address);

Ok(MetricsWebsocketClient {
metrics_address: metrics_address.into(),
metrics_upstream: ws_stream,
broadcaster,
reconnection_attempt: 0,
})
}

async fn attempt_reconnection(&mut self) {
info!("attempting reconnection to metrics websocket...");
if self.reconnection_attempt >= MAX_RECONNECTION_ATTEMPTS {
// kill the process and reset everything when service restarts
error!("failed to re-establish websocket connection to metrics server");
std::process::exit(1)
}

// use linear backoff to try to reconnect asap
delay_for(RECONNECTION_BACKOFF * self.reconnection_attempt).await;

let ws_stream = match connect_async(&self.metrics_address).await {
Ok((ws_stream, _)) => ws_stream,
Err(err) => {
self.reconnection_attempt += 1;
info!("reconnection failed... - {}", err);
return;
}
};

info!("reconnected!");

self.reconnection_attempt = 0;
self.metrics_upstream = ws_stream;
}

/// When the metrics server sends a message, it should be copied out to the server and distributed
/// to all connected clients.
fn on_message(&self, item: WsItem) {
fn on_message(&self, item: WsItem) -> Result<(), WsError> {
let ws_message = match item {
Ok(message) => message,
Err(err) => {
error!("failed to obtain valid websocket message - {}", err);
return;
return Err(err);
}
};

match self.broadcaster.send(ws_message) {
Ok(received) => info!("broadcasted websocket metrics data to {} clients", received),
Err(_) => info!("no clients are currently subscribed"),
Ok(received) => debug!("broadcasted websocket metrics data to {} clients", received),
Err(_) => debug!("no clients are currently subscribed"),
}

Ok(())
}

pub(crate) async fn run(&mut self) {
while let Some(incoming) = self.metrics_upstream.next().await {
self.on_message(incoming)
loop {
if let Some(incoming) = self.metrics_upstream.next().await {
if let Err(_) = self.on_message(incoming) {
self.attempt_reconnection().await;
}
} else {
self.attempt_reconnection().await;
}
}
info!("Our metrics server subscriber is finished!")
}
}

Expand Down

0 comments on commit 46d3591

Please sign in to comment.