From 29ec7a95c1858c24a64eb1d59b62925295764c97 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Sat, 9 Mar 2024 12:27:11 -0500 Subject: [PATCH] more features and ci --- .github/workflows/build_and_upload.yml | 40 +++++ Cargo.lock | 8 + binaries/geph5-bridge/src/listen_forward.rs | 36 +++- binaries/geph5-bridge/src/main.rs | 4 +- binaries/geph5-broker/Cargo.toml | 4 +- binaries/geph5-broker/src/database.rs | 21 +++ binaries/geph5-broker/src/main.rs | 1 + binaries/geph5-broker/src/routes.rs | 52 ++++++ binaries/geph5-broker/src/rpc_impl.rs | 25 ++- binaries/geph5-client/Cargo.toml | 1 + binaries/geph5-client/src/client/inner.rs | 33 +++- binaries/geph5-client/src/exit.rs | 65 ++++++- binaries/geph5-exit/Cargo.toml | 3 + binaries/geph5-exit/src/listen.rs | 45 ++++- binaries/geph5-exit/src/listen/b2e_process.rs | 41 +++++ binaries/geph5-exit/src/main.rs | 1 + libraries/geph5-broker-protocol/src/lib.rs | 4 +- libraries/geph5-broker-protocol/src/route.rs | 2 +- libraries/geph5-misc-rpc/src/bridge.rs | 4 +- libraries/geph5-misc-rpc/src/exit.rs | 6 +- libraries/picomux/Cargo.toml | 4 +- libraries/picomux/src/frame.rs | 5 +- libraries/picomux/src/lib.rs | 163 +++++++++++------- libraries/sillad-sosistab3/src/lib.rs | 6 +- libraries/sillad/Cargo.toml | 1 + libraries/sillad/src/dialer.rs | 109 +++++++++++- libraries/sillad/src/lib.rs | 20 ++- libraries/sillad/src/tcp.rs | 6 +- 28 files changed, 597 insertions(+), 113 deletions(-) create mode 100644 .github/workflows/build_and_upload.yml create mode 100644 binaries/geph5-broker/src/routes.rs create mode 100644 binaries/geph5-exit/src/listen/b2e_process.rs diff --git a/.github/workflows/build_and_upload.yml b/.github/workflows/build_and_upload.yml new file mode 100644 index 0000000..bac885b --- /dev/null +++ b/.github/workflows/build_and_upload.yml @@ -0,0 +1,40 @@ +# .github/workflows/build_and_upload.yml +name: Build and Upload + +on: [push] + +jobs: + build: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Install musl-tools + run: sudo apt-get install -y musl-tools + + - name: Install Rust + uses: actions-rs/toolchain@v1 + with: + profile: minimal + toolchain: stable + override: true + target: x86_64-unknown-linux-musl + + - name: Build geph5-bridge + run: cargo build --locked --release --target x86_64-unknown-linux-musl --manifest-path binaries/geph5-bridge/Cargo.toml + + - name: Build geph5-exit + run: cargo build --locked --release --target x86_64-unknown-linux-musl --manifest-path binaries/geph5-exit/Cargo.toml + + # - name: Upload to S3 + # uses: jakejarvis/s3-sync-action@master + # with: + # args: --acl public-read --follow-symlinks --delete + # env: + # AWS_S3_BUCKET: ${{ secrets.AWS_S3_BUCKET }} + # AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }} + # AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + # AWS_REGION: "us-west-1" + # SOURCE_DIR: "target/x86_64-unknown-linux-musl/release" diff --git a/Cargo.lock b/Cargo.lock index 1059ea4..3cb5882 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1463,6 +1463,7 @@ dependencies = [ "ed25519-dalek", "futures-util", "geph5-broker-protocol", + "geph5-misc-rpc", "isocountry", "moka", "nanorpc", @@ -1474,6 +1475,7 @@ dependencies = [ "serde_json", "serde_yaml", "sillad", + "sillad-sosistab3", "smolscale", "sqlx", "thiserror", @@ -1528,6 +1530,7 @@ dependencies = [ "serde_json", "serde_yaml", "sillad", + "sillad-sosistab3", "smol 2.0.0", "smol-timeout", "smolscale", @@ -1555,6 +1558,7 @@ dependencies = [ "geph5-misc-rpc", "hex", "isocountry", + "moka", "nanorpc", "nursery_macro", "once_cell", @@ -1565,9 +1569,11 @@ dependencies = [ "serde_json", "serde_yaml", "sillad", + "sillad-sosistab3", "smol 2.0.0", "smolscale", "stdcode", + "tachyonix", "tap", "thiserror", "tracing", @@ -2465,6 +2471,7 @@ dependencies = [ "bytes", "dashmap", "fastrand 2.0.1", + "futures-intrusive 0.5.0", "futures-lite 2.2.0", "futures-util", "oneshot", @@ -3173,6 +3180,7 @@ dependencies = [ "futures-util", "libc", "pin-project", + "smol-timeout", ] [[package]] diff --git a/binaries/geph5-bridge/src/listen_forward.rs b/binaries/geph5-bridge/src/listen_forward.rs index 6df0278..9a1638f 100644 --- a/binaries/geph5-bridge/src/listen_forward.rs +++ b/binaries/geph5-bridge/src/listen_forward.rs @@ -1,5 +1,10 @@ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + net::{IpAddr, SocketAddr}, + sync::Arc, + time::Duration, +}; +use anyhow::Context; use async_trait::async_trait; use deadpool::managed::Pool; use futures_util::AsyncReadExt as _; @@ -15,9 +20,11 @@ use sillad::{ use smol::future::FutureExt as _; use stdcode::StdcodeSerializeExt; +use tap::Tap; -pub async fn listen_forward_loop(listener: impl Listener) { +pub async fn listen_forward_loop(my_ip: IpAddr, listener: impl Listener) { let state = State { + my_ip, mapping: Cache::builder() .time_to_idle(Duration::from_secs(86400)) .build(), @@ -29,18 +36,22 @@ pub async fn listen_forward_loop(listener: impl Listener) { struct State { // b2e_dest => (metadata, task) - mapping: Cache>>)>, + my_ip: IpAddr, + mapping: Cache<(SocketAddr, B2eMetadata), (SocketAddr, Arc>>)>, } #[async_trait] impl BridgeControlProtocol for State { async fn tcp_forward(&self, b2e_dest: SocketAddr, metadata: B2eMetadata) -> SocketAddr { self.mapping - .get_with(b2e_dest, async { + .get_with((b2e_dest, metadata.clone()), async { let listener = TcpListener::bind("0.0.0.0:0".parse().unwrap()) .await .unwrap(); - let addr = listener.local_addr().await; + let addr = listener + .local_addr() + .await + .tap_mut(|s| s.set_ip(self.my_ip)); let task = smolscale::spawn(handle_one_listener(listener, b2e_dest, metadata)); (addr, Arc::new(task)) }) @@ -58,7 +69,9 @@ async fn handle_one_listener( let client_conn = listener.accept().await?; let metadata = metadata.clone(); smolscale::spawn(async move { - let exit_conn = dial_pooled(b2e_dest, &metadata.stdcode()).await?; + let exit_conn = dial_pooled(b2e_dest, &metadata.stdcode()) + .await + .inspect_err(|e| tracing::warn!("cannot dial pooled: {:?}", e))?; let (client_read, client_write) = client_conn.split(); let (exit_read, exit_write) = exit_conn.split(); smol::io::copy(exit_read, client_write) @@ -86,9 +99,13 @@ async fn dial_pooled(b2e_dest: SocketAddr, metadata: &[u8]) -> anyhow::Result deadpool::managed::RecycleResult { + tracing::debug!(alive = conn.is_alive(), "trying to recycle"); if !conn.is_alive() { let error = conn.open(b"").await.expect_err("dude"); return Err(error.into()); diff --git a/binaries/geph5-bridge/src/main.rs b/binaries/geph5-bridge/src/main.rs index 8901857..0fab527 100644 --- a/binaries/geph5-bridge/src/main.rs +++ b/binaries/geph5-bridge/src/main.rs @@ -46,7 +46,7 @@ fn main() { let control_cookie = format!("bridge-cookie-{}", rand::random::()); let control_listener = SosistabListener::new(listener, Cookie::new(&control_cookie)); let upload_loop = broker_upload_loop(control_listen, control_cookie); - let listen_loop = listen_forward_loop(control_listener); + let listen_loop = listen_forward_loop(my_ip, control_listener); upload_loop.race(listen_loop).await }) } @@ -88,6 +88,6 @@ async fn broker_upload_loop(control_listen: SocketAddr, control_cookie: String) .await .unwrap() .unwrap(); - async_io::Timer::after(Duration::from_secs(60)).await; + async_io::Timer::after(Duration::from_secs(10)).await; } } diff --git a/binaries/geph5-broker/Cargo.toml b/binaries/geph5-broker/Cargo.toml index facb6e2..464daad 100644 --- a/binaries/geph5-broker/Cargo.toml +++ b/binaries/geph5-broker/Cargo.toml @@ -18,6 +18,7 @@ serde_yaml = "0.9.32" smolscale = "0.4.4" sqlx = { version = "0.7", features = [ "runtime-tokio-rustls", "postgres", "chrono" ] } geph5-broker-protocol={path="../../libraries/geph5-broker-protocol"} +geph5-misc-rpc={path="../../libraries/geph5-misc-rpc"} async-trait = "0.1.77" nanorpc = "0.1.12" thiserror = "1.0.57" @@ -31,4 +32,5 @@ ed25519-dalek = "2.1.1" tokio = { version = "1.0", features = ["full"] } tracing-subscriber = "0.3.18" nanorpc-sillad={path="../../libraries/nanorpc-sillad"} -sillad={path="../../libraries/sillad"} \ No newline at end of file +sillad={path="../../libraries/sillad"} +sillad-sosistab3={path="../../libraries/sillad-sosistab3"} \ No newline at end of file diff --git a/binaries/geph5-broker/src/database.rs b/binaries/geph5-broker/src/database.rs index 02e5cb9..06f71cd 100644 --- a/binaries/geph5-broker/src/database.rs +++ b/binaries/geph5-broker/src/database.rs @@ -1,6 +1,7 @@ use std::{ops::Deref, str::FromStr, time::Duration}; use async_io::Timer; +use geph5_broker_protocol::BridgeDescriptor; use once_cell::sync::Lazy; use rand::Rng; use sqlx::{ @@ -40,6 +41,10 @@ pub async fn database_gc_loop() -> anyhow::Result<()> { .execute(POSTGRES.deref()) .await?; tracing::debug!(rows_affected = res.rows_affected(), "cleaned up exits"); + let res = sqlx::query("delete from bridges_new where expiry > extract(epoch from now())") + .execute(POSTGRES.deref()) + .await?; + tracing::debug!(rows_affected = res.rows_affected(), "cleaned up bridges"); } } @@ -78,3 +83,19 @@ pub async fn insert_exit(exit: &ExitRow) -> anyhow::Result<()> { .await?; Ok(()) } + +pub async fn query_bridges(key: &str) -> anyhow::Result> { + static RANDOM: Lazy = Lazy::new(|| format!("rando-{}", rand::random::())); + let raw: Vec<(String, String, String, i64)> = sqlx::query_as(r" + select distinct on (pool) listen, cookie, pool, expiry from bridges_new order by pool, encode(digest(listen || $1 || $2, 'sha256'), 'hex'); + ").bind(key).bind(RANDOM.deref()).fetch_all(POSTGRES.deref()).await?; + Ok(raw + .into_iter() + .map(|row| BridgeDescriptor { + control_listen: row.0.parse().unwrap(), + control_cookie: row.1, + pool: row.2, + expiry: row.3 as _, + }) + .collect()) +} diff --git a/binaries/geph5-broker/src/main.rs b/binaries/geph5-broker/src/main.rs index a66e37e..64865fe 100644 --- a/binaries/geph5-broker/src/main.rs +++ b/binaries/geph5-broker/src/main.rs @@ -11,6 +11,7 @@ use smolscale::immortal::{Immortal, RespawnStrategy}; use std::{fs, net::SocketAddr, path::PathBuf}; mod database; +mod routes; mod rpc_impl; /// The global config file. diff --git a/binaries/geph5-broker/src/routes.rs b/binaries/geph5-broker/src/routes.rs new file mode 100644 index 0000000..3a9386c --- /dev/null +++ b/binaries/geph5-broker/src/routes.rs @@ -0,0 +1,52 @@ +use std::{ + net::SocketAddr, + time::{Duration, SystemTime}, +}; + +use geph5_broker_protocol::{BridgeDescriptor, RouteDescriptor}; +use geph5_misc_rpc::bridge::{B2eMetadata, BridgeControlClient, ObfsProtocol}; +use moka::future::Cache; +use nanorpc_sillad::DialerTransport; +use once_cell::sync::Lazy; +use sillad::tcp::TcpDialer; +use sillad_sosistab3::{dialer::SosistabDialer, Cookie}; + +pub async fn bridge_to_leaf_route( + bridge: &BridgeDescriptor, + exit_b2e: SocketAddr, +) -> anyhow::Result { + static CACHE: Lazy> = Lazy::new(|| { + Cache::builder() + .time_to_live(Duration::from_secs(60)) + .build() + }); + + let cookie = Cookie::new(&bridge.control_cookie); + + CACHE + .try_get_with((bridge.control_listen, exit_b2e), async { + let dialer = SosistabDialer { + inner: TcpDialer { + dest_addr: bridge.control_listen, + }, + cookie, + }; + let cookie = format!("exit-cookie-{}", rand::random::()); + let control_client = BridgeControlClient(DialerTransport(dialer)); + let forwarded_listen = control_client + .tcp_forward( + exit_b2e, + B2eMetadata { + protocol: ObfsProtocol::Sosistab3(cookie.clone()), + expiry: SystemTime::now() + Duration::from_secs(86400), + }, + ) + .await?; + anyhow::Ok(RouteDescriptor::Sosistab3 { + cookie, + lower: RouteDescriptor::Tcp(forwarded_listen).into(), + }) + }) + .await + .map_err(|err| anyhow::anyhow!("bridge comms failed: {:?}", err)) +} diff --git a/binaries/geph5-broker/src/rpc_impl.rs b/binaries/geph5-broker/src/rpc_impl.rs index 2586247..f097593 100644 --- a/binaries/geph5-broker/src/rpc_impl.rs +++ b/binaries/geph5-broker/src/rpc_impl.rs @@ -1,4 +1,4 @@ -use std::{ops::Deref, sync::Arc, time::Duration}; +use std::{net::SocketAddr, ops::Deref, sync::Arc, time::Duration}; use async_trait::async_trait; use ed25519_dalek::{SigningKey, VerifyingKey}; @@ -11,7 +11,8 @@ use moka::future::Cache; use once_cell::sync::Lazy; use crate::{ - database::{insert_exit, ExitRow, POSTGRES}, + database::{insert_exit, query_bridges, ExitRow, POSTGRES}, + routes::bridge_to_leaf_route, CONFIG_FILE, }; @@ -72,9 +73,23 @@ impl BrokerProtocol for BrokerImpl { .map_err(|e: Arc| e.deref().clone()) } - async fn get_routes(&self, _exit: String) -> Result { - // Implement your logic here - unimplemented!(); + async fn get_routes(&self, exit: SocketAddr) -> Result { + // TODO auth + let raw_descriptors = query_bridges("").await?; + let mut routes = vec![]; + for desc in raw_descriptors { + match bridge_to_leaf_route(&desc, exit).await { + Ok(route) => routes.push(route), + Err(err) => { + tracing::warn!( + err = debug(err), + bridge = debug(desc), + "could not communicate" + ) + } + } + } + Ok(RouteDescriptor::Race(routes)) } async fn put_exit(&self, descriptor: Mac>) -> Result<(), GenericError> { diff --git a/binaries/geph5-client/Cargo.toml b/binaries/geph5-client/Cargo.toml index 3b245fb..e5b924c 100644 --- a/binaries/geph5-client/Cargo.toml +++ b/binaries/geph5-client/Cargo.toml @@ -17,6 +17,7 @@ geph5-broker-protocol={path="../../libraries/geph5-broker-protocol"} geph5-misc-rpc={path="../../libraries/geph5-misc-rpc"} picomux={path="../../libraries/picomux"} sillad={path="../../libraries/sillad"} +sillad-sosistab3={path="../../libraries/sillad-sosistab3"} nanorpc-sillad={path="../../libraries/nanorpc-sillad"} reqwest = {version="0.11.24", default-features=false, features=["rustls-tls"]} diff --git a/binaries/geph5-client/src/client/inner.rs b/binaries/geph5-client/src/client/inner.rs index ba48711..46409a5 100644 --- a/binaries/geph5-client/src/client/inner.rs +++ b/binaries/geph5-client/src/client/inner.rs @@ -10,7 +10,11 @@ use nursery_macro::nursery; use picomux::PicoMux; use sillad::{dialer::Dialer as _, Pipe}; use smol::future::FutureExt as _; -use std::{sync::Arc, time::Instant}; +use smol_timeout::TimeoutExt; +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; use stdcode::StdcodeSerializeExt; @@ -33,14 +37,25 @@ static CONN_REQ_CHAN: CtxField<( #[tracing::instrument(skip(ctx))] pub async fn client_inner(ctx: AnyCtx) -> anyhow::Result<()> { let start = Instant::now(); - let (pubkey, raw_dialer) = ctx.init().exit_constraint.dialer(&ctx).await?; - let raw_pipe = raw_dialer.dial().await?; - tracing::debug!(elapsed = debug(start.elapsed()), "raw dialer constructed"); - let authed_pipe = client_auth(raw_pipe, pubkey).await?; - tracing::debug!( - elapsed = debug(start.elapsed()), - "authentication done, starting mux system" - ); + let authed_pipe = async { + let (pubkey, raw_dialer) = ctx.init().exit_constraint.dialer(&ctx).await?; + tracing::debug!(elapsed = debug(start.elapsed()), "raw dialer constructed"); + let raw_pipe = raw_dialer.dial().await?; + tracing::debug!( + elapsed = debug(start.elapsed()), + protocol = raw_pipe.protocol(), + "dial completed" + ); + let authed_pipe = client_auth(raw_pipe, pubkey).await?; + tracing::debug!( + elapsed = debug(start.elapsed()), + "authentication done, starting mux system" + ); + anyhow::Ok(authed_pipe) + } + .timeout(Duration::from_secs(60)) + .await + .context("overall dial/mux/auth timeout")??; let (read, write) = authed_pipe.split(); let mux = Arc::new(PicoMux::new(read, write)); diff --git a/binaries/geph5-client/src/exit.rs b/binaries/geph5-client/src/exit.rs index 5cf3d09..3c0b782 100644 --- a/binaries/geph5-client/src/exit.rs +++ b/binaries/geph5-client/src/exit.rs @@ -1,15 +1,18 @@ +use std::time::Duration; + use anyctx::AnyCtx; use anyhow::Context; use ed25519_dalek::VerifyingKey; -use geph5_broker_protocol::{BrokerClient, DOMAIN_EXIT_DESCRIPTOR}; +use geph5_broker_protocol::{BrokerClient, RouteDescriptor, DOMAIN_EXIT_DESCRIPTOR}; use isocountry::CountryCode; use rand::seq::SliceRandom; use serde::{Deserialize, Serialize}; use sillad::{ - dialer::{DialerExt, DynDialer}, + dialer::{DialerExt, DynDialer, FailingDialer}, tcp::TcpDialer, }; +use sillad_sosistab3::{dialer::SosistabDialer, Cookie}; use crate::client::{Config, CtxField}; @@ -104,9 +107,63 @@ impl ExitConstraint { .min_by_key(|e| (e.1.load * 1000.0) as u64) .context("no exits that fit the criterion")?; tracing::debug!(exit = debug(&exit), "narrowed down choice of exit"); - let dialer = TcpDialer { + let direct_dialer = TcpDialer { dest_addr: exit.c2e_listen, }; - Ok((pubkey, dialer.dynamic())) + + // Also obtain the bridges + let bridge_routes = broker + .get_routes(exit.b2e_listen) + .await? + .map_err(|e| anyhow::anyhow!("broker refused to serve bridge routes: {e}"))?; + tracing::debug!( + bridge_routes = debug(&bridge_routes), + "bridge routes obtained too" + ); + + let bridge_dialer = route_to_dialer(&bridge_routes); + + Ok(( + pubkey, + direct_dialer + .race(bridge_dialer.delay(Duration::from_millis(200))) + .dynamic(), + )) + } +} + +fn route_to_dialer(route: &RouteDescriptor) -> DynDialer { + match route { + RouteDescriptor::Tcp(addr) => TcpDialer { dest_addr: *addr }.dynamic(), + RouteDescriptor::Sosistab3 { cookie, lower } => { + let inner = route_to_dialer(lower); + SosistabDialer { + inner, + cookie: Cookie::new(cookie), + } + .dynamic() + } + RouteDescriptor::Race(inside) => inside + .iter() + .map(route_to_dialer) + .reduce(|a, b| a.race(b).dynamic()) + .unwrap_or_else(|| FailingDialer.dynamic()), + RouteDescriptor::Fallback(a) => a + .iter() + .map(route_to_dialer) + .reduce(|a, b| a.fallback(b).dynamic()) + .unwrap_or_else(|| FailingDialer.dynamic()), + RouteDescriptor::Timeout { + milliseconds, + lower, + } => route_to_dialer(lower) + .timeout(Duration::from_millis(*milliseconds as _)) + .dynamic(), + RouteDescriptor::Delay { + milliseconds, + lower, + } => route_to_dialer(lower) + .delay(Duration::from_millis((*milliseconds).into())) + .dynamic(), } } diff --git a/binaries/geph5-exit/Cargo.toml b/binaries/geph5-exit/Cargo.toml index 503865a..576f6af 100644 --- a/binaries/geph5-exit/Cargo.toml +++ b/binaries/geph5-exit/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] geph5-broker-protocol={path="../../libraries/geph5-broker-protocol"} sillad={path="../../libraries/sillad"} +sillad-sosistab3={path="../../libraries/sillad-sosistab3"} picomux={path="../../libraries/picomux"} async-trait = "0.1.77" nanorpc = "0.1.12" @@ -35,3 +36,5 @@ stdcode = "0.1.14" x25519-dalek = "2.0.1" hex = "0.4.3" nursery_macro = "0.1.0" +moka = {version="0.12.5", features=["future"]} +tachyonix = "0.2.1" diff --git a/binaries/geph5-exit/src/listen.rs b/binaries/geph5-exit/src/listen.rs index a4451d3..1f80515 100644 --- a/binaries/geph5-exit/src/listen.rs +++ b/binaries/geph5-exit/src/listen.rs @@ -3,9 +3,11 @@ use ed25519_dalek::Signer; use futures_util::{AsyncReadExt, TryFutureExt}; use geph5_broker_protocol::{BrokerClient, ExitDescriptor, Mac, Signed, DOMAIN_EXIT_DESCRIPTOR}; use geph5_misc_rpc::{ + bridge::B2eMetadata, exit::{ClientCryptHello, ClientExitCryptPipe, ClientHello, ExitHello, ExitHelloInner}, read_prepend_length, write_prepend_length, }; +use moka::future::Cache; use picomux::PicoMux; use sillad::{listener::Listener, tcp::TcpListener, EitherPipe, Pipe}; use smol::future::FutureExt as _; @@ -15,15 +17,18 @@ use std::{ time::{Duration, SystemTime}, }; use stdcode::StdcodeSerializeExt; +use tachyonix::Sender; use tap::Tap; use x25519_dalek::{EphemeralSecret, PublicKey}; +mod b2e_process; use crate::{broker::BrokerRpcTransport, proxy::proxy_stream, CONFIG_FILE, SIGNING_SECRET}; pub async fn listen_main() -> anyhow::Result<()> { let c2e = c2e_loop(); + let b2e = b2e_loop(); let broker = broker_loop(); - c2e.race(broker).await + c2e.race(broker).race(b2e).await } #[tracing::instrument] @@ -74,7 +79,7 @@ async fn broker_loop() -> anyhow::Result<()> { .await? .map_err(|e| anyhow::anyhow!(e.0))?; - smol::Timer::after(Duration::from_secs(60)).await; + smol::Timer::after(Duration::from_secs(10)).await; } } None => { @@ -95,6 +100,42 @@ async fn c2e_loop() -> anyhow::Result<()> { } } +async fn b2e_loop() -> anyhow::Result<()> { + let mut listener = TcpListener::bind(CONFIG_FILE.wait().b2e_listen).await?; + let b2e_table: Cache> = Cache::builder() + .time_to_idle(Duration::from_secs(86400)) + .build(); + loop { + let b2e_raw = listener.accept().await?; + let (read, write) = b2e_raw.split(); + let mut b2e_mux = PicoMux::new(read, write); + let b2e_table = b2e_table.clone(); + smolscale::spawn::>(async move { + loop { + let lala = b2e_mux.accept().await?; + let b2e_metadata: B2eMetadata = stdcode::deserialize(lala.metadata())?; + tracing::debug!( + metadata = debug(&b2e_metadata), + "accepting b2e with metadata" + ); + let send = b2e_table + .get_with(b2e_metadata.clone(), async { + tracing::debug!( + metadata = debug(&b2e_metadata), + "this is a new table entry" + ); + let (send, recv) = tachyonix::channel(1); + smolscale::spawn(b2e_process::b2e_process(b2e_metadata, recv)).detach(); + send + }) + .await; + send.send(lala).await.ok().context("could not accept")?; + } + }) + .detach() + } +} + async fn handle_client(mut client: impl Pipe) -> anyhow::Result<()> { // execute the authentication let client_hello: ClientHello = stdcode::deserialize(&read_prepend_length(&mut client).await?)?; diff --git a/binaries/geph5-exit/src/listen/b2e_process.rs b/binaries/geph5-exit/src/listen/b2e_process.rs new file mode 100644 index 0000000..c782eab --- /dev/null +++ b/binaries/geph5-exit/src/listen/b2e_process.rs @@ -0,0 +1,41 @@ +use std::io::ErrorKind; + +use async_trait::async_trait; +use geph5_misc_rpc::bridge::{B2eMetadata, ObfsProtocol}; +use sillad_sosistab3::{listener::SosistabListener, Cookie}; +use tachyonix::Receiver; + +use super::handle_client; + +pub async fn b2e_process( + b2e_metadata: B2eMetadata, + recv: Receiver, +) -> anyhow::Result<()> { + let listener = ReceiverListener(recv); + match b2e_metadata.protocol { + ObfsProtocol::Sosistab3(cookie) => { + b2e_inner(SosistabListener::new(listener, Cookie::new(&cookie))).await + } + } +} + +async fn b2e_inner(mut listener: impl sillad::listener::Listener) -> anyhow::Result<()> { + loop { + let client = listener.accept().await?; + smolscale::spawn(handle_client(client)).detach(); + } +} + +struct ReceiverListener(Receiver); + +#[async_trait] +impl sillad::listener::Listener for ReceiverListener { + type P = picomux::Stream; + async fn accept(&mut self) -> std::io::Result { + if let Ok(val) = self.0.recv().await { + Ok(val) + } else { + Err(std::io::Error::new(ErrorKind::BrokenPipe, "channel closed")) + } + } +} diff --git a/binaries/geph5-exit/src/main.rs b/binaries/geph5-exit/src/main.rs index 9b5d8c6..95ddb76 100644 --- a/binaries/geph5-exit/src/main.rs +++ b/binaries/geph5-exit/src/main.rs @@ -6,6 +6,7 @@ use once_cell::sync::{Lazy, OnceCell}; use rand::Rng; use serde::Deserialize; use std::{net::SocketAddr, path::PathBuf}; + mod broker; mod listen; mod proxy; diff --git a/libraries/geph5-broker-protocol/src/lib.rs b/libraries/geph5-broker-protocol/src/lib.rs index 07319a4..cef6ee4 100644 --- a/libraries/geph5-broker-protocol/src/lib.rs +++ b/libraries/geph5-broker-protocol/src/lib.rs @@ -1,4 +1,4 @@ -use std::fmt::Display; +use std::{fmt::Display, net::SocketAddr}; use async_trait::async_trait; use nanorpc::nanorpc_derive; @@ -19,7 +19,7 @@ pub use bridge::*; #[async_trait] pub trait BrokerProtocol { async fn get_exits(&self) -> Result, GenericError>; - async fn get_routes(&self, exit: String) -> Result; + async fn get_routes(&self, exit_b2e: SocketAddr) -> Result; async fn put_exit(&self, descriptor: Mac>) -> Result<(), GenericError>; async fn put_bridge(&self, descriptor: Mac) -> Result<(), GenericError>; } diff --git a/libraries/geph5-broker-protocol/src/route.rs b/libraries/geph5-broker-protocol/src/route.rs index 3eda54a..7fe309d 100644 --- a/libraries/geph5-broker-protocol/src/route.rs +++ b/libraries/geph5-broker-protocol/src/route.rs @@ -2,7 +2,7 @@ use std::net::SocketAddr; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "snake_case")] /// This fully describes a route to a particular exit. pub enum RouteDescriptor { diff --git a/libraries/geph5-misc-rpc/src/bridge.rs b/libraries/geph5-misc-rpc/src/bridge.rs index 0565a0a..67d82c2 100644 --- a/libraries/geph5-misc-rpc/src/bridge.rs +++ b/libraries/geph5-misc-rpc/src/bridge.rs @@ -5,14 +5,14 @@ use nanorpc::nanorpc_derive; use serde::{Deserialize, Serialize}; /// The metadata object passed to the exit on every b2e link. -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash, PartialEq)] pub struct B2eMetadata { pub protocol: ObfsProtocol, pub expiry: SystemTime, } /// Initialization information for an obfuscation session. -#[derive(Serialize, Deserialize, Clone, Debug)] +#[derive(Serialize, Deserialize, Clone, Debug, Eq, Hash, PartialEq)] pub enum ObfsProtocol { Sosistab3(String), } diff --git a/libraries/geph5-misc-rpc/src/exit.rs b/libraries/geph5-misc-rpc/src/exit.rs index 3195e8c..561197a 100644 --- a/libraries/geph5-misc-rpc/src/exit.rs +++ b/libraries/geph5-misc-rpc/src/exit.rs @@ -158,4 +158,8 @@ impl ClientExitCryptPipe { } } -impl Pipe for ClientExitCryptPipe {} +impl Pipe for ClientExitCryptPipe { + fn protocol(&self) -> &str { + "client-exit" + } +} diff --git a/libraries/picomux/Cargo.toml b/libraries/picomux/Cargo.toml index bf7ab18..4438fc5 100644 --- a/libraries/picomux/Cargo.toml +++ b/libraries/picomux/Cargo.toml @@ -34,11 +34,13 @@ tap = "1.0.1" serde = {version="1.0.197", features=["derive"]} serde_json = "1.0.114" async-io = "2.3.1" +sillad={path="../sillad"} +futures-intrusive = "0.5.0" [dev-dependencies] argh = "0.1" smol="2" socksv5="0.3" -sillad={path="../sillad"} + sillad-sosistab3={path="../sillad-sosistab3"} tracing-subscriber="0.3" diff --git a/libraries/picomux/src/frame.rs b/libraries/picomux/src/frame.rs index 0cafdf4..3b27638 100644 --- a/libraries/picomux/src/frame.rs +++ b/libraries/picomux/src/frame.rs @@ -3,7 +3,7 @@ use bytes::Bytes; use futures_util::{AsyncRead, AsyncReadExt}; use serde::{Deserialize, Serialize}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Frame { pub header: Header, pub body: Bytes, @@ -46,7 +46,7 @@ impl Frame { } } -#[derive(Clone, Copy, Pod, PartialEq, Zeroable)] +#[derive(Clone, Copy, Pod, PartialEq, Zeroable, Debug)] #[repr(C)] pub struct Header { pub version: u8, @@ -59,6 +59,7 @@ pub const CMD_SYN: u8 = 0; pub const CMD_FIN: u8 = 1; pub const CMD_PSH: u8 = 2; pub const CMD_NOP: u8 = 3; +pub const CMD_MORE: u8 = 4; pub const CMD_PING: u8 = 0xa0; pub const CMD_PONG: u8 = 0xa1; diff --git a/libraries/picomux/src/lib.rs b/libraries/picomux/src/lib.rs index 2a75629..3e13a02 100644 --- a/libraries/picomux/src/lib.rs +++ b/libraries/picomux/src/lib.rs @@ -22,7 +22,8 @@ use async_task::Task; use bytes::Bytes; use dashmap::DashMap; -use frame::{Frame, CMD_FIN, CMD_NOP, CMD_PING, CMD_PONG, CMD_PSH, CMD_SYN}; +use frame::{Frame, CMD_FIN, CMD_MORE, CMD_NOP, CMD_PING, CMD_PONG, CMD_PSH, CMD_SYN}; +use futures_intrusive::sync::SharedSemaphore; use futures_lite::{Future, FutureExt as LiteExt}; use futures_util::{ future::{FusedFuture, Shared}, @@ -34,11 +35,13 @@ use async_io::Timer; use parking_lot::Mutex; use rand::Rng; use smol_timeout::TimeoutExt; -use tachyonix::{Receiver, Sender}; +use tachyonix::{Receiver, Sender, TrySendError}; use tap::Tap; use crate::frame::{Header, PingInfo}; +const MORE_INTERVAL: usize = 200; + #[derive(Clone, Copy, Debug)] pub struct LivenessConfig { pub ping_interval: Duration, @@ -90,7 +93,7 @@ impl PicoMux { /// Returns whether the mux is alive. pub fn is_alive(&self) -> bool { - !self.task.is_terminated() + self.task.peek().is_none() } /// Sets the liveness maintenance configuration for this session. @@ -145,10 +148,7 @@ impl PicoMux { async move { let res = res.await; match res.deref() { - Err(err) => Err(std::io::Error::new( - err.kind(), - err.get_ref().map(|e| e.to_string()).unwrap_or_default(), - )), + Err(err) => Err(std::io::Error::new(err.kind(), err.to_string())), _ => unreachable!(), } } @@ -169,7 +169,7 @@ async fn picomux_inner( let (send_outgoing, mut recv_outgoing) = tachyonix::channel(1); let (send_pong, mut recv_pong) = tachyonix::channel(1); - let buffer_table: DashMap<_, _, BuildHasherDefault> = DashMap::default(); + let buffer_table: DashMap> = DashMap::default(); // writes outgoing frames let outgoing_loop = async { loop { @@ -197,7 +197,7 @@ async fn picomux_inner( }; let create_stream = |stream_id, metadata: Bytes| { - let (send_incoming, mut recv_incoming) = tachyonix::channel(100); + let (send_incoming, mut recv_incoming) = tachyonix::channel(MORE_INTERVAL * 2); let (mut write_incoming, read_incoming) = bipe::bipe(32768); let (write_outgoing, mut read_outgoing) = bipe::bipe(32768); let stream = Stream { @@ -205,58 +205,77 @@ async fn picomux_inner( read_incoming, metadata, }; + + let send_more = SharedSemaphore::new(false, 2); // jelly bean movers - smolscale::spawn::>(async move { - loop { - let frame: Frame = recv_incoming.recv().await?; - write_incoming.write_all(&frame.body).await?; + smolscale::spawn::>({ + let send_outgoing = send_outgoing.clone(); + + async move { + loop { + for _ in 0..MORE_INTERVAL { + let frame: Frame = recv_incoming.recv().await?; + write_incoming.write_all(&frame.body).await?; + } + send_outgoing + .send(Frame::new_empty(stream_id, CMD_MORE)) + .await?; + tracing::debug!(stream_id, MORE_INTERVAL, "sending MORE"); + } } }) .detach(); - let send_outgoing = send_outgoing.clone(); - smolscale::spawn::>(async move { - scopeguard::defer!({ - let send_outgoing = send_outgoing.clone(); - smolscale::spawn(async move { - send_outgoing - .send(Frame { + + smolscale::spawn::>({ + let send_more = send_more.clone(); + let send_outgoing = send_outgoing.clone(); + async move { + scopeguard::defer!({ + let send_outgoing = send_outgoing.clone(); + smolscale::spawn(async move { + send_outgoing + .send(Frame { + header: Header { + version: 1, + command: CMD_FIN, + body_len: 0, + stream_id, + }, + body: Bytes::new(), + }) + .await + }) + .detach(); + }); + let mut buf = [0u8; 16384]; + loop { + send_more.acquire(1).await.disarm(); + for _ in 0..MORE_INTERVAL { + let n = read_outgoing.read(&mut buf).await?; + if n == 0 { + return Ok(()); + } + let frame = Frame { header: Header { version: 1, - command: CMD_FIN, - body_len: 0, + command: CMD_PSH, + body_len: n as _, stream_id, }, - body: Bytes::new(), - }) - .await - }) - .detach(); - }); - let mut buf = [0u8; 16384]; - loop { - let n = read_outgoing.read(&mut buf).await?; - if n == 0 { - return Ok(()); + body: Bytes::copy_from_slice(&buf[..n]), + }; + tracing::trace!(stream_id, n, "sending outgoing data into channel"); + send_outgoing + .send(frame) + .await + .ok() + .context("cannot send")?; + } } - let frame = Frame { - header: Header { - version: 1, - command: CMD_PSH, - body_len: n as _, - stream_id, - }, - body: Bytes::copy_from_slice(&buf[..n]), - }; - tracing::trace!(stream_id, n, "sending outgoing data into channel"); - send_outgoing - .send(frame) - .await - .ok() - .context("cannot send")?; } }) .detach(); - (stream, send_incoming) + (stream, send_incoming, send_more) }; // receive open requests @@ -277,9 +296,11 @@ async fn picomux_inner( f.header.body_len = metadata.len() as _; })) .await; - let (stream, send_incoming) = create_stream(stream_id, metadata); + let (stream, send_incoming, send_more) = create_stream(stream_id, metadata); // thread safety: there can be no race because we are racing the futures in the foreground and there's no await point between when we obtain the id and when we insert - assert!(buffer_table.insert(stream_id, send_incoming).is_none()); + assert!(buffer_table + .insert(stream_id, (send_incoming, send_more)) + .is_none()); let _ = request.send(stream); } }; @@ -356,30 +377,33 @@ async fn picomux_inner( "duplicate SYN", )); } - let (stream, send_incoming) = + let (stream, send_incoming, send_more) = create_stream(stream_id, frame.body.clone()); - if send_accepted.try_send(stream).is_err() { + if let Err(err) = send_accepted.try_send(stream) { + tracing::warn!(err = debug(err), "oh dead"); + return Err(std::io::Error::new(ErrorKind::NotConnected, "dead")); + } + buffer_table.insert(frame.header.stream_id, (send_incoming, send_more)); + } + CMD_MORE => { + let back = buffer_table.get(&stream_id); + if let Some(back) = back { + back.1.release(1); + } else { tracing::warn!( stream_id = frame.header.stream_id, - "dropping stream because the accept queue is full" + "MORE to a stream that is no longer here" ); } - buffer_table.insert(frame.header.stream_id, send_incoming); } CMD_PSH => { let back = buffer_table.get(&stream_id); if let Some(back) = back { - if back - .send(frame.clone()) - .timeout(Duration::from_millis(200)) - .await - .is_none() - { - tracing::warn!( - stream_id = frame.header.stream_id, - "dropping stream because the read queue is full" - ); - buffer_table.remove(&stream_id); + if let Err(TrySendError::Full(_)) = back.0.try_send(frame.clone()) { + tracing::error!( + stream_id, + "receive queue full --- this should NEVER happen" + ) } } else { tracing::warn!( @@ -391,6 +415,7 @@ async fn picomux_inner( CMD_FIN => { buffer_table.remove(&frame.header.stream_id); } + CMD_NOP => {} CMD_PING => { let ping_info: PingInfo = @@ -504,6 +529,12 @@ impl AsyncWrite for Stream { } } +impl sillad::Pipe for Stream { + fn protocol(&self) -> &str { + "sillad-stream" + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/libraries/sillad-sosistab3/src/lib.rs b/libraries/sillad-sosistab3/src/lib.rs index 528040e..72ce648 100644 --- a/libraries/sillad-sosistab3/src/lib.rs +++ b/libraries/sillad-sosistab3/src/lib.rs @@ -185,4 +185,8 @@ impl AsyncRead for SosistabPipe

{ } } -impl Pipe for SosistabPipe

{} +impl Pipe for SosistabPipe

{ + fn protocol(&self) -> &str { + "sosistab3" + } +} diff --git a/libraries/sillad/Cargo.toml b/libraries/sillad/Cargo.toml index 5790d8d..e4c8f1c 100644 --- a/libraries/sillad/Cargo.toml +++ b/libraries/sillad/Cargo.toml @@ -12,3 +12,4 @@ futures-lite = "2.2.0" futures-util = {version="0.3.30", features=["io"]} libc = "0.2.153" pin-project = "1.1.4" +smol-timeout = "0.6.0" diff --git a/libraries/sillad/src/dialer.rs b/libraries/sillad/src/dialer.rs index 18966c1..416594e 100644 --- a/libraries/sillad/src/dialer.rs +++ b/libraries/sillad/src/dialer.rs @@ -1,9 +1,9 @@ use std::{pin::Pin, sync::Arc}; +use crate::{EitherPipe, Pipe}; use async_trait::async_trait; use futures_lite::{Future, FutureExt}; - -use crate::{EitherPipe, Pipe}; +use smol_timeout::TimeoutExt; #[async_trait] /// Dialers create pipes by initiating a connection to some sort of "other side". Failures are indicated by the standard I/O error type. @@ -20,6 +20,27 @@ pub trait DialerExt: Dialer { fn dynamic(self) -> DynDialer { DynDialer::new(self) } + + fn timeout(self, duration: std::time::Duration) -> TimeoutDialer { + TimeoutDialer { + dialer: self, + timeout: duration, + } + } + + fn fallback(self, fallback: B) -> FallbackDialer { + FallbackDialer { + primary: self, + fallback, + } + } + + fn delay(self, duration: std::time::Duration) -> DelayDialer { + DelayDialer { + dialer: self, + delay: duration, + } + } } impl DialerExt for T {} @@ -69,10 +90,92 @@ pub struct RaceDialer(pub L, pub R); impl Dialer for RaceDialer { type P = EitherPipe; async fn dial(&self) -> std::io::Result { - futures_lite::future::race( + race_ok( async { Ok(EitherPipe::Left(self.0.dial().await?)) }, async { Ok(EitherPipe::Right(self.1.dial().await?)) }, ) .await } } + +async fn race_ok(f1: F1, f2: F2) -> Result +where + F1: Future>, + F2: Future>, +{ + // unfortunately we do need to box here :( + match futures_util::future::select(Box::pin(f1), Box::pin(f2)).await { + futures_util::future::Either::Left((Ok(val), _)) => Ok(val), + futures_util::future::Either::Right((Ok(val), _)) => Ok(val), + futures_util::future::Either::Left((Err(_), f2)) => f2.await, + futures_util::future::Either::Right((Err(_), f1)) => f1.await, + } +} + +/// FailingDialer is a dialer that always fails and never returns anything. +pub struct FailingDialer; + +#[async_trait] +impl Dialer for FailingDialer { + type P = Box; + + async fn dial(&self) -> std::io::Result { + return Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "failing dialer always fails", + )); + } +} + +pub struct TimeoutDialer { + dialer: D, + timeout: std::time::Duration, +} + +#[async_trait] +impl Dialer for TimeoutDialer { + type P = D::P; + + async fn dial(&self) -> std::io::Result { + self.dialer + .dial() + .timeout(self.timeout) + .await + .ok_or(std::io::Error::new( + std::io::ErrorKind::TimedOut, + format!("dial timed out after {:?}", self.timeout), + ))? + } +} + +pub struct FallbackDialer { + primary: A, + fallback: B, +} + +#[async_trait] +impl Dialer for FallbackDialer { + type P = EitherPipe; + + async fn dial(&self) -> std::io::Result { + match self.primary.dial().await { + Ok(pipe) => Ok(EitherPipe::Left(pipe)), + Err(_) => self.fallback.dial().await.map(EitherPipe::Right), + } + } +} + +pub struct DelayDialer { + dialer: D, + delay: std::time::Duration, +} + +#[async_trait] +impl Dialer for DelayDialer { + type P = D::P; + + async fn dial(&self) -> std::io::Result { + async_io::Timer::after(self.delay).await; + self.dialer.dial().await + } +} diff --git a/libraries/sillad/src/lib.rs b/libraries/sillad/src/lib.rs index b6b17ec..4f77203 100644 --- a/libraries/sillad/src/lib.rs +++ b/libraries/sillad/src/lib.rs @@ -13,9 +13,20 @@ pub trait Pipe: AsyncRead + AsyncWrite + Send + Unpin + 'static { fn shared_secret(&self) -> Option<&[u8]> { None } + + /// This must return a string that uniquely identifies the protocol type. + fn protocol(&self) -> &str; } -impl Pipe for Box {} +impl Pipe for Box { + fn shared_secret(&self) -> Option<&[u8]> { + (**self).shared_secret() + } + + fn protocol(&self) -> &str { + (**self).protocol() + } +} /// EitherPipe is a pipe that is either left or right. #[pin_project(project = EitherPipeProj)] @@ -77,4 +88,11 @@ impl Pipe for EitherPipe { EitherPipe::Right(r) => r.shared_secret(), } } + + fn protocol(&self) -> &str { + match self { + EitherPipe::Left(l) => l.protocol(), + EitherPipe::Right(r) => r.protocol(), + } + } } diff --git a/libraries/sillad/src/tcp.rs b/libraries/sillad/src/tcp.rs index bb2b119..beb725c 100644 --- a/libraries/sillad/src/tcp.rs +++ b/libraries/sillad/src/tcp.rs @@ -109,4 +109,8 @@ impl AsyncWrite for TcpPipe { } } -impl Pipe for TcpPipe {} +impl Pipe for TcpPipe { + fn protocol(&self) -> &str { + "tcp" + } +}