Skip to content

Commit

Permalink
perf(jetsocat,dgw): limit number of syscalls in JMUX sender task (#976)
Browse files Browse the repository at this point in the history
Number of syscalls is reduced by using a `BufWriter` and waiting for
write operations to stop before calling `flush()`.

Performance is increased by ~28.4%.

Before this patch:

> 0.0000-17.1307 sec 28.8 GBytes 14.4 Gbits/sec

After this patch:

> 0.0000-13.8483 sec  29.9 GBytes  18.5 Gbits/sec
  • Loading branch information
CBenoit authored Aug 14, 2024
1 parent 8ebfd23 commit 11efaa5
Showing 1 changed file with 22 additions and 7 deletions.
29 changes: 22 additions & 7 deletions crates/jmux-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,28 +250,43 @@ impl<T: AsyncWrite + Unpin + Send + 'static> JmuxSenderTask<T> {
#[instrument("sender", skip_all)]
async fn run(self) -> anyhow::Result<()> {
let Self {
mut jmux_writer,
jmux_writer,
mut msg_to_send_rx,
} = self;

let mut jmux_writer = tokio::io::BufWriter::with_capacity(16 * 1024, jmux_writer);
let mut buf = bytes::BytesMut::new();
let mut needs_flush = false;

while let Some(msg) = msg_to_send_rx.recv().await {
trace!(?msg, "Send channel message");
loop {
tokio::select! {
msg = msg_to_send_rx.recv() => {
let Some(msg) = msg else {
break;
};

buf.clear();
msg.encode(&mut buf)?;
trace!(?msg, "Send channel message");

jmux_writer.write_all(&buf).await?;
buf.clear();
msg.encode(&mut buf)?;

jmux_writer.flush().await?;
jmux_writer.write_all(&buf).await?;
needs_flush = true;
}
_ = tokio::time::sleep(core::time::Duration::from_millis(10)), if needs_flush => {
jmux_writer.flush().await?;
needs_flush = false;
}
}
}

// TODO: send a signal to the main scheduler when we are done processing channel data messages
// and adjust windows for all the channels only then.

info!("Closing JMUX sender task...");

jmux_writer.flush().await?;

Ok(())
}
}
Expand Down

0 comments on commit 11efaa5

Please sign in to comment.