Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
neonphog committed May 14, 2024
1 parent 9cc3736 commit e4171fe
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 135 deletions.
82 changes: 41 additions & 41 deletions crates/tx5-connection/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl ConnRecv {
pub struct Conn {
ready: Arc<tokio::sync::Semaphore>,
pub_key: PubKey,
cmd_send: Weak<tokio::sync::mpsc::Sender<ConnCmd>>,
cmd_send: CloseSend<ConnCmd>,
conn_task: tokio::task::JoinHandle<()>,
keepalive_task: tokio::task::JoinHandle<()>,

Expand All @@ -42,19 +42,19 @@ impl Conn {
}

pub(crate) fn priv_new(
is_polite: bool,
pub_key: PubKey,
client: Weak<tx5_signal::SignalConnection>,
config: Arc<tx5_signal::SignalConfig>,
) -> (Arc<Self>, ConnRecv, Arc<tokio::sync::mpsc::Sender<ConnCmd>>) {
) -> (Arc<Self>, ConnRecv, CloseSend<ConnCmd>) {
// zero len semaphore.. we actually just wait for the close
let ready = Arc::new(tokio::sync::Semaphore::new(0));

#[cfg(test)]
let webrtc_ready = Arc::new(tokio::sync::Semaphore::new(0));

let (msg_send, msg_recv) = tokio::sync::mpsc::channel(32);
let (cmd_send, mut cmd_recv) = tokio::sync::mpsc::channel(32);
let cmd_send = Arc::new(cmd_send);
let (mut msg_send, msg_recv) = CloseSend::channel();
let (cmd_send, mut cmd_recv) = CloseSend::channel();

let keepalive_dur = config.max_idle / 2;
let client2 = client.clone();
Expand All @@ -79,7 +79,7 @@ impl Conn {
let ready2 = ready.clone();
let client2 = client.clone();
let pub_key2 = pub_key.clone();
let cmd_send3 = Arc::downgrade(&cmd_send);
let cmd_send3 = cmd_send.clone();
let conn_task = tokio::task::spawn(async move {
let client = match client2.upgrade() {
Some(client) => client,
Expand Down Expand Up @@ -148,8 +148,7 @@ impl Conn {
ready2.close();

let (webrtc, mut webrtc_recv) = webrtc::Webrtc::new(
// TODO - we need the *this* pub_key to determine politeness
false,
is_polite,
// TODO - pass stun server config here
b"{}".to_vec(),
// TODO - make this configurable
Expand All @@ -158,8 +157,10 @@ impl Conn {

let client3 = client2.clone();
let pub_key3 = pub_key2.clone();
let msg_send2 = msg_send.clone();
let mut msg_send2 = msg_send.clone();
let _webrtc_task = AbortTask(tokio::task::spawn(async move {
msg_send2.set_close_on_drop(true);

use webrtc::WebrtcEvt::*;
while let Some(evt) = webrtc_recv.recv().await {
match evt {
Expand Down Expand Up @@ -208,22 +209,22 @@ impl Conn {
}
}
Ready => {
if let Some(cmd_send) = cmd_send3.upgrade() {
if cmd_send
.send(ConnCmd::WebrtcReady)
.await
.is_err()
{
break;
}
} else {
if cmd_send3
.send(ConnCmd::WebrtcReady)
.await
.is_err()
{
break;
}
}
}
}
}));

msg_send.set_close_on_drop(true);

let mut send_over_webrtc = false;

while let Ok(Some(cmd)) =
tokio::time::timeout(config.max_idle, cmd_recv.recv()).await
{
Expand Down Expand Up @@ -260,7 +261,11 @@ impl Conn {
}
}
ConnCmd::SendMessage(msg) => {
if let Some(client) = client2.upgrade() {
if send_over_webrtc {
if webrtc.message(msg).await.is_err() {
break;
}
} else if let Some(client) = client2.upgrade() {
if client
.send_message(&pub_key2, msg)
.await
Expand All @@ -273,6 +278,8 @@ impl Conn {
}
}
ConnCmd::WebrtcReady => {
send_over_webrtc = true;

#[cfg(test)]
webrtc_ready2.close();
}
Expand All @@ -287,20 +294,20 @@ impl Conn {
// the receiver side is closed because msg_send is dropped.
});

(
Arc::new(Self {
ready,
pub_key,
cmd_send: Arc::downgrade(&cmd_send),
conn_task,
keepalive_task,

#[cfg(test)]
webrtc_ready,
}),
ConnRecv(msg_recv),
cmd_send,
)
let mut cmd_send2 = cmd_send.clone();
cmd_send2.set_close_on_drop(true);
let this = Self {
ready,
pub_key,
cmd_send: cmd_send2,
conn_task,
keepalive_task,

#[cfg(test)]
webrtc_ready,
};

(Arc::new(this), ConnRecv(msg_recv), cmd_send)
}

/// Wait until this connection is ready to send / receive data.
Expand All @@ -322,13 +329,6 @@ impl Conn {

/// Send up to 16KiB of message data.
pub async fn send(&self, msg: Vec<u8>) -> Result<()> {
if let Some(cmd_send) = self.cmd_send.upgrade() {
cmd_send
.send(ConnCmd::SendMessage(msg))
.await
.map_err(|_| Error::other("closed"))
} else {
Err(Error::other("closed"))
}
self.cmd_send.send(ConnCmd::SendMessage(msg)).await
}
}
44 changes: 30 additions & 14 deletions crates/tx5-connection/src/hub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub use super::*;

type HubMapT =
HashMap<PubKey, (Weak<Conn>, Arc<tokio::sync::mpsc::Sender<ConnCmd>>)>;
type HubMapT = HashMap<PubKey, (Weak<Conn>, CloseSend<ConnCmd>)>;
struct HubMap(HubMapT);

impl std::ops::Deref for HubMap {
Expand All @@ -25,15 +24,12 @@ impl HubMap {
}

async fn hub_map_assert(
is_polite: bool,
pub_key: PubKey,
map: &mut HubMap,
client: &Arc<tx5_signal::SignalConnection>,
config: &Arc<tx5_signal::SignalConfig>,
) -> Result<(
Option<ConnRecv>,
Arc<Conn>,
Arc<tokio::sync::mpsc::Sender<ConnCmd>>,
)> {
) -> Result<(Option<ConnRecv>, Arc<Conn>, CloseSend<ConnCmd>)> {
let mut found_during_prune = None;

map.retain(|_, c| {
Expand All @@ -56,12 +52,19 @@ async fn hub_map_assert(

// we're connected to the peer, create a connection

let (conn, recv, cmd_send) =
Conn::priv_new(pub_key.clone(), Arc::downgrade(client), config.clone());
let (conn, recv, cmd_send) = Conn::priv_new(
is_polite,
pub_key.clone(),
Arc::downgrade(client),
config.clone(),
);

let weak_conn = Arc::downgrade(&conn);

map.insert(pub_key, (weak_conn, cmd_send.clone()));
let mut store_cmd_send = cmd_send.clone();
store_cmd_send.set_close_on_drop(true);

map.insert(pub_key, (weak_conn, store_cmd_send));

Ok((Some(recv), conn, cmd_send))
}
Expand Down Expand Up @@ -140,15 +143,20 @@ impl Hub {
let (conn_send, conn_recv) = tokio::sync::mpsc::channel(32);
let weak_client = Arc::downgrade(&client);
let url = url.to_string();
let pub_key = client.pub_key().clone();
let this_pub_key = client.pub_key().clone();
task_list.push(tokio::task::spawn(async move {
let mut map = HubMap::new();
while let Some(cmd) = cmd_recv.recv().await {
match cmd {
HubCmd::CliRecv { pub_key, msg } => {
if let Some(client) = weak_client.upgrade() {
if pub_key == this_pub_key {
// ignore self messages
continue;
}
let is_polite = pub_key > this_pub_key;
let (recv, conn, cmd_send) = match hub_map_assert(
pub_key, &mut map, &client, &config,
is_polite, pub_key, &mut map, &client, &config,
)
.await
{
Expand All @@ -170,10 +178,18 @@ impl Hub {
}
}
HubCmd::Connect { pub_key, resp } => {
if pub_key == this_pub_key {
let _ = resp.send(Err(Error::other(
"cannot connect to self",
)));
continue;
}
let is_polite = pub_key > this_pub_key;
if let Some(client) = weak_client.upgrade() {
let _ = resp.send(
hub_map_assert(
pub_key, &mut map, &client, &config,
is_polite, pub_key, &mut map, &client,
&config,
)
.await
.map(|(recv, conn, _)| (recv, conn)),
Expand All @@ -190,7 +206,7 @@ impl Hub {
client.close().await;
}

tracing::debug!(%url, ?pub_key, "hub close");
tracing::debug!(%url, ?this_pub_key, "hub close");
}));

Ok((
Expand Down
87 changes: 85 additions & 2 deletions crates/tx5-connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ compile_error!("Must specify exactly 1 webrtc backend");
pub use tx5_go_pion::Tx5InitConfig;

use std::collections::HashMap;
use std::io::{Error, Result};
use std::sync::{Arc, Weak};
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::sync::{Arc, Mutex, Weak};

pub use tx5_signal;
use tx5_signal::PubKey;
Expand All @@ -48,6 +49,88 @@ impl<R> Drop for AbortTask<R> {
}
}

struct CloseSend<T: 'static + Send> {
sender: Arc<Mutex<Option<tokio::sync::mpsc::Sender<T>>>>,
close_on_drop: bool,
}

impl<T: 'static + Send> Clone for CloseSend<T> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
close_on_drop: false,
}
}
}

impl<T: 'static + Send> Drop for CloseSend<T> {
fn drop(&mut self) {
if self.close_on_drop {
self.sender.lock().unwrap().take();
}
}
}

impl<T: 'static + Send> CloseSend<T> {
pub fn channel() -> (Self, tokio::sync::mpsc::Receiver<T>) {
let (s, r) = tokio::sync::mpsc::channel(32);
(
Self {
sender: Arc::new(Mutex::new(Some(s))),
close_on_drop: false,
},
r,
)
}

pub fn set_close_on_drop(&mut self, close_on_drop: bool) {
self.close_on_drop = close_on_drop;
}

pub fn send(
&self,
t: T,
) -> impl Future<Output = Result<()>> + 'static + Send {
let s = self.sender.lock().unwrap().clone();
async move {
match s {
Some(s) => {
s.send(t).await.map_err(|_| ErrorKind::BrokenPipe.into())
}
None => Err(ErrorKind::BrokenPipe.into()),
}
}
}

pub fn send_slow_app(
&self,
t: T,
) -> impl Future<Output = Result<()>> + 'static + Send {
// Grace time to allow a slow app to catch up before we close a
// connection to prevent our memory from filling up with backlogged
// message data.
const SLOW_APP_TO: std::time::Duration =
std::time::Duration::from_millis(99);

let s = self.sender.lock().unwrap().clone();
async move {
match s {
Some(s) => match s.send_timeout(t, SLOW_APP_TO).await {
Err(
tokio::sync::mpsc::error::SendTimeoutError::Timeout(_),
) => {
tracing::warn!("Closing connection due to slow app");
Err(ErrorKind::TimedOut.into())
}
Err(_) => Err(ErrorKind::BrokenPipe.into()),
Ok(_) => Ok(()),
},
None => Err(ErrorKind::BrokenPipe.into()),
}
}
}
}

mod webrtc;

mod hub;
Expand Down
Loading

0 comments on commit e4171fe

Please sign in to comment.