Skip to content

Commit

Permalink
Update ipstack-geph to v0.2.6, add moka dependency, refactor VPN tu…
Browse files Browse the repository at this point in the history
…nneling, use AtomicUsize in picomux, and log picomux stream counts
  • Loading branch information
nullchinchilla committed Sep 11, 2024
1 parent 25fcd29 commit aaa3b20
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions binaries/geph5-client/src/vpn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,8 @@ pub async fn vpn_loop(ctx: &AnyCtx<Config>) -> anyhow::Result<()> {
}
} else {
let tunneled = open_conn(&ctx, "udp", &peer_addr.to_string()).await?;
let (read_tunneled, write_tunneled) = tunneled.split();
let (mut read_tunneled, mut write_tunneled) = tunneled.split();
let up_loop = async {
let mut write_tunneled = BufWriter::new(write_tunneled);
loop {
let to_up = captured.recv().await?;
write_tunneled
Expand All @@ -230,7 +229,6 @@ pub async fn vpn_loop(ctx: &AnyCtx<Config>) -> anyhow::Result<()> {
}
};
let dn_loop = async {
let mut read_tunneled = BufReader::new(read_tunneled);
loop {
let mut len_buf = [0u8; 2];
read_tunneled.read_exact(&mut len_buf).await?;
Expand Down
16 changes: 12 additions & 4 deletions libraries/picomux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
ops::Deref,
pin::Pin,
sync::{
atomic::{AtomicU64, Ordering},
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc,
},
task::Poll,
Expand Down Expand Up @@ -203,7 +203,8 @@ async fn picomux_inner(
};

let create_stream = |stream_id, metadata: Bytes| {
let (send_incoming, mut recv_incoming) = tachyonix::channel(MAX_WINDOW);
let (send_incoming, mut recv_incoming) =
tachyonix::channel::<Box<(Frame, Instant)>>(MAX_WINDOW);
let (mut write_incoming, read_incoming) = bipe::bipe(MSS * 2);
let (write_outgoing, mut read_outgoing) = bipe::bipe(MSS * 2);
let stream = Stream {
Expand All @@ -217,15 +218,22 @@ async fn picomux_inner(
let send_more = SharedSemaphore::new(false, INIT_WINDOW);
// jelly bean movers
smolscale::spawn::<anyhow::Result<()>>({
static COUNT: AtomicUsize = AtomicUsize::new(0);

let send_outgoing = send_outgoing.clone();

async move {
let count = COUNT.fetch_add(1, Ordering::Relaxed);
eprintln!("opened {count} picomux streams");
scopeguard::defer!({
COUNT.fetch_sub(1, Ordering::Relaxed);
});
let mut remote_window = INIT_WINDOW;
let mut target_remote_window = MAX_WINDOW;
let mut last_window_adjust = Instant::now();
loop {
let min_quantum = (target_remote_window / 10).clamp(3, 50);
let (frame, enqueued_time): (Frame, Instant) = recv_incoming.recv().await?;
let (frame, enqueued_time): (Frame, Instant) = *recv_incoming.recv().await?;
let queue_delay = enqueued_time.elapsed();
tracing::trace!(
stream_id,
Expand Down Expand Up @@ -462,7 +470,7 @@ async fn picomux_inner(
let back = buffer_table.get(&stream_id);
if let Some(back) = back {
if let Err(TrySendError::Full(_)) =
back.0.try_send((frame.clone(), Instant::now()))
back.0.try_send(Box::new((frame.clone(), Instant::now())))
{
tracing::error!(
stream_id,
Expand Down

0 comments on commit aaa3b20

Please sign in to comment.