Skip to content

Commit

Permalink
rewrite for better detection of connection loss
Browse files Browse the repository at this point in the history
  • Loading branch information
muhamadazmy committed Dec 11, 2023
1 parent 23ab5c1 commit 4a3e8a6
Showing 1 changed file with 78 additions and 83 deletions.
161 changes: 78 additions & 83 deletions src/peer/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use crate::token;
use anyhow::{Context, Result};
use futures_util::sink::SinkExt;
use futures_util::stream::StreamExt;
use std::time::Duration;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use tokio::sync::mpsc;
use tokio::time::Instant;
use tokio_tungstenite::tungstenite::{Error, Message};
use tokio::time::timeout;
use tokio_tungstenite::tungstenite::Message;
use url::Url;

const PING_INTERVAL: Duration = Duration::from_secs(20);
const READ_TIMEOUT: Duration = Duration::from_secs(40);
const READ_TIMEOUT: u64 = 40; // seconds

pub struct Socket {
rx: mpsc::Receiver<Message>,
Expand Down Expand Up @@ -71,19 +73,9 @@ impl Socket {

let u = u.into();
let builder = token::TokenBuilder::new(id, None, signer);
log::info!("connecting to relay {:?} ....", &u.domain());
tokio::spawn(retainer(u, builder, up_rx, down_tx));
// we also automatically send ping messages to detect stalled connections
let pinger = connection.writer();
tokio::spawn(async move {
loop {
log::trace!("sending a ping");
if let Err(err) = pinger.write(Message::Ping(Vec::default()), None).await {
log::error!("ping error: {}", err);
}
tokio::time::sleep(PING_INTERVAL).await;
}
});

connection
}
}
Expand All @@ -99,103 +91,106 @@ async fn retainer<S: Signer>(

u.set_query(Some(token.as_str()));

log::info!("connecting to relay {:?}", &u.domain());
let (ws, _) = match tokio_tungstenite::connect_async(&u).await {
Ok(v) => v,
Err(err) => {
log::trace!(
log::error!(
"failed to establish connection to {:?} : {:#}",
u.domain(),
err
);
tokio::time::sleep(Duration::from_secs(2)).await;
log::trace!("retrying connection");
log::info!("retrying connection to {:?}", &u.domain());
continue;
}
};
log::info!("now connected to relay {:?}", &u.domain());

let (mut write, read) = ws.split();
let mut read = read_stream(read);
let mut last = Instant::now();
let (mut write, mut read) = ws.split();
let ts = Arc::new(AtomicU64::new(timestamp()));

let down_tx = down_tx.clone();
let (close, mut closed) = mpsc::channel(1);

let ts_clone = Arc::clone(&ts);
let handler = tokio::spawn(async move {
let ts = ts_clone;
while let Some(message) = read.next().await {
let message = match message {
Ok(message) => message,
Err(err) => {
if let Err(_) = close.send(err).await {

Check warning on line 123 in src/peer/socket.rs

View workflow job for this annotation

GitHub Actions / Test-Clippy-Build

redundant pattern matching, consider using `is_err()`
log::error!("failed to notify of socket connection loss");
}
return;
}
};

if message.is_pong() {
ts.store(timestamp(), Ordering::Relaxed);
log::debug!("received a pong");
continue;
}

'receive: loop {
// we check here when the last message was received from the relay.
// we expect to receive PONG answers (because we send a PING every
// PING_INTERVAL).
// hence, if for some reason NO messages are received for a period of
// READ_TIMEOUT, we can safely assume the connection is stalled
// and we can try to reconnect
if Instant::now().duration_since(last) > READ_TIMEOUT {
log::error!("reading timeout trying to reconnect");
// the problem is on break this message will be lost!
break 'receive;
if let Err(_err) = down_tx.send(message).await {
log::error!("failed to push received message");
}
}
});

'receive: loop {
tokio::select! {
Some(message) = up_rx.recv() => {
log::trace!("sending message to relay {:?}", &u.domain());
if let Err(err) = write.send(message).await {
// probably connection closed as well, we need to renew!
log::error!("disconnected: error while sending message: {}", err);
break 'receive;
read_err = closed.recv() => {
if let Some(err) = read_err {
log::error!("read error: {}", err);
}
log::debug!("read routine exited, reconnecting");
// force reconnecting
break 'receive;
},
message = read.recv() => {
let message = match message {
None=> {
log::error!("disconnected: read stream ended") ;
break 'receive;
},
Some(message) => message,
};
message = timeout(PING_INTERVAL, up_rx.recv()) => {
// first check if we have timed out
let now = timestamp();
if now - ts.load(Ordering::Relaxed) > READ_TIMEOUT {
// we have timed out! we need to reconnect then
handler.abort();
log::info!("connection timeout! retrying");
break 'receive
}

// we take a note with when a message was received
last = Instant::now();
log::trace!("received a message from relay {:?}", &u.domain());
let message = match message {
Ok(Message::Pong(_)) => {
log::trace!("received a pong");
continue 'receive;
}
Ok(message) => message,
Err(err) => {
// todo: those errors probably mean we need to re-connect
// we will see what to do later.
log::error!("disconnected: error while receiving message: {}", err);
break 'receive;
Ok(Some(message)) => message,
Ok(None) => {
// recv() yielded None: the upstream channel was closed, so there is nothing left to send
log::error!("socket closed!");
handler.abort();
return
},
Err(_) => {
// receive timeout (on upstream message)
// we can then send a ping to keep the connection alive
log::debug!("sending a ping");
Message::Ping(Vec::default())
}
};

if let Err(err) = down_tx.send(message).await {
log::error!("failed to queue received message for processing: {}", err);
log::trace!("sending message to relay {:?}", &u.domain());
if let Err(err) = write.send(message).await {
// probably connection closed as well, we need to renew!
log::error!("disconnected: error while sending message: {}", err);
break 'receive;
}
}

}
}
tokio::time::sleep(Duration::from_secs(2)).await;
}
}

fn read_stream(
mut stream: futures_util::stream::SplitStream<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
>,
) -> mpsc::Receiver<Result<Message, Error>> {
let (sender, receiver) = mpsc::channel(1);
tokio::spawn(async move {
loop {
match stream.next().await {
None => return,
Some(result) => {
if sender.send(result).await.is_err() {
return;
}
}
}
}
});

receiver
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// The value is derived from [`SystemTime`], which is not monotonic: it can
/// jump forwards or backwards when the system clock is adjusted, so two
/// consecutive calls are not guaranteed to be non-decreasing.
///
/// # Panics
///
/// Panics if the system clock is set to a time before the Unix epoch.
fn timestamp() -> u64 {
    SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // A clock earlier than 1970 is a broken host configuration, not a
        // recoverable runtime condition; state the invariant explicitly.
        .expect("system clock is set to a time before the Unix epoch")
        .as_secs()
}

0 comments on commit 4a3e8a6

Please sign in to comment.