From 11ea2708a7e535319f668b9bee2b436aee4430c4 Mon Sep 17 00:00:00 2001 From: nullchinchilla Date: Fri, 26 Apr 2024 13:01:34 -0400 Subject: [PATCH] Add http dependency, update l10n and zoom factor management, implement HTTPS proxy support --- Cargo.lock | 8 + binaries/geph5-client-gui/Cargo.toml | 1 + binaries/geph5-client-gui/src/l10n.csv | 2 +- binaries/geph5-client-gui/src/main.rs | 10 +- binaries/geph5-client-gui/src/pac/linux.rs | 22 + binaries/geph5-client-gui/src/settings.rs | 28 +- .../src/settings_default.yaml | 1 + .../geph5-client-gui/src/tabs/dashboard.rs | 2 +- binaries/geph5-client/Cargo.toml | 6 + binaries/geph5-client/src/client.rs | 7 +- binaries/geph5-client/src/client_inner.rs | 17 +- .../geph5-client/src/http_proxy/address.rs | 125 +++++ .../src/http_proxy/http_client.rs | 107 +++++ binaries/geph5-client/src/http_proxy/mod.rs | 440 ++++++++++++++++++ binaries/geph5-client/src/lib.rs | 2 + binaries/geph5-client/src/socks5.rs | 32 +- libraries/picomux/Cargo.toml | 1 + libraries/picomux/src/lib.rs | 38 +- 18 files changed, 789 insertions(+), 60 deletions(-) create mode 100644 binaries/geph5-client/src/http_proxy/address.rs create mode 100644 binaries/geph5-client/src/http_proxy/http_client.rs create mode 100644 binaries/geph5-client/src/http_proxy/mod.rs diff --git a/Cargo.lock b/Cargo.lock index e3e26c9..0baab5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2170,9 +2170,11 @@ dependencies = [ "anyctx", "anyhow", "argh", + "async-compat", "async-trait", "blake3", "blind-rsa-signatures", + "bytes", "clone-macro", "dirs", "ed25519-dalek", @@ -2181,6 +2183,8 @@ dependencies = [ "geph5-broker-protocol", "geph5-misc-rpc", "hex", + "http 0.2.12", + "hyper 0.14.28", "isahc", "isocountry", "mizaru2", @@ -2190,6 +2194,7 @@ dependencies = [ "nursery_macro", "oneshot", "picomux", + "pin-project", "rand", "reqwest", "serde", @@ -2206,6 +2211,7 @@ dependencies = [ "tachyonix", "tap", "thiserror", + "tokio", "tracing", "tracing-subscriber", "x25519-dalek", @@ -2224,6 +2230,7 @@ dependencies = [ "futures-util", "geph5-broker-protocol", "geph5-client", + "http 1.1.0", "moka", "native-dialog", "once_cell", @@ -3799,6 +3806,7 @@ dependencies = [ "futures-util", "oneshot", "parking_lot 0.12.1", + "pin-project", "rand", "recycle-box", "scopeguard", diff --git a/binaries/geph5-client-gui/Cargo.toml b/binaries/geph5-client-gui/Cargo.toml index fdb6067..55f8375 100644 --- a/binaries/geph5-client-gui/Cargo.toml +++ b/binaries/geph5-client-gui/Cargo.toml @@ -30,6 +30,7 @@ tracing-subscriber = "0.3.18" serde_json = "1.0.115" oneshot = "0.1.6" chrono = "0.4.38" +http = "1.1.0" [target.'cfg(windows)'.dependencies] winreg = "0.52.0" diff --git a/binaries/geph5-client-gui/src/l10n.csv b/binaries/geph5-client-gui/src/l10n.csv index c1f5c55..ff52c39 100644 --- a/binaries/geph5-client-gui/src/l10n.csv +++ b/binaries/geph5-client-gui/src/l10n.csv @@ -8,7 +8,7 @@ connect,Connect,连接,Подключить,Etesāl connected,Connected,已连接,Подключено,Mottasel connection_time,Connection time,连接时间,Время соединения,Zamān-e etesāl dashboard,Dashboard,仪表盘,Приборная панель,Dāšbord -data_used,Data used,已用流量,Использованные данные,Dādehā-ye maṣraf-šode +data_used,Data used,已用流量,Использ. данные,Dādehā-ye maṣraf-šode disconnect,Disconnect,断开连接,Отключить,Qat'-e etesāl disconnected,Disconnected,已断开连接,Отключено,Qat' šode ast download_speed,Download speed,下载速度,Скорость загрузки,Sor'at-e dānlod diff --git a/binaries/geph5-client-gui/src/main.rs b/binaries/geph5-client-gui/src/main.rs index 8b00231..97ee200 100644 --- a/binaries/geph5-client-gui/src/main.rs +++ b/binaries/geph5-client-gui/src/main.rs @@ -17,7 +17,7 @@ use l10n::l10n; use logs::LogLayer; use native_dialog::MessageType; use prefs::{pref_read, pref_write}; -use settings::render_settings; +use settings::{render_settings, ZOOM_FACTOR}; use tabs::{dashboard::Dashboard, logs::Logs}; use tracing_subscriber::{layer::SubscriberExt as _, util::SubscriberInitExt, EnvFilter}; @@ -46,9 +46,10 @@ fn main() { let native_options = eframe::NativeOptions { viewport: egui::ViewportBuilder::default() - .with_inner_size([350.0, 350.0]) - .with_min_inner_size([350.0, 350.0]) - .with_max_inner_size([350.0, 350.0]), + .with_inner_size([320.0, 320.0]) + .with_min_inner_size([320.0, 320.0]) + // .with_max_inner_size([320.0, 320.0]) + , // shader_version: Some(ShaderVersion::Es100), ..Default::default() }; @@ -116,6 +117,7 @@ impl App { impl eframe::App for App { fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { + ctx.set_zoom_factor(ZOOM_FACTOR.get()); ctx.request_repaint_after(Duration::from_millis(300)); // ctx.request_repaint(); diff --git a/binaries/geph5-client-gui/src/pac/linux.rs b/binaries/geph5-client-gui/src/pac/linux.rs index d5fd5f6..1a7817b 100644 --- a/binaries/geph5-client-gui/src/pac/linux.rs +++ b/binaries/geph5-client-gui/src/pac/linux.rs @@ -30,6 +30,28 @@ pub fn set_http_proxy(proxy: SocketAddr) -> anyhow::Result<()> { ]) .output() .context("Failed to set HTTP proxy port")?; + Command::new("gsettings") + .args(["set", "org.gnome.system.proxy.https", "enabled", "true"]) + .output() + .context("Failed to enable HTTPS proxy setting")?; + Command::new("gsettings") + .args([ + "set", + "org.gnome.system.proxy.https", + "host", + proxy.ip().to_string().as_str(), + ]) + .output() + .context("Failed to set HTTPS proxy host")?; + Command::new("gsettings") + .args([ + "set", + "org.gnome.system.proxy.https", + "port", + &proxy.port().to_string(), + ]) + .output() + .context("Failed to set HTTPS proxy port")?; Command::new("gsettings") .args(["set", "org.gnome.system.proxy", "mode", "manual"]) .output() diff --git a/binaries/geph5-client-gui/src/settings.rs b/binaries/geph5-client-gui/src/settings.rs index 60c2d9b..fe0ed18 100644 --- a/binaries/geph5-client-gui/src/settings.rs +++ b/binaries/geph5-client-gui/src/settings.rs @@ -22,7 +22,7 @@ static USERNAME: Lazy> = static PASSWORD: Lazy> = Lazy::new(|| StoreCell::new_persistent("password", || "".to_string())); -static ZOOM_FACTOR: Lazy> = +pub static ZOOM_FACTOR: Lazy> = Lazy::new(|| StoreCell::new_persistent("zoom_factor", || 1.0)); pub static LANG_CODE: Lazy> = @@ -32,8 +32,6 @@ pub static PROXY_AUTOCONF: Lazy> = Lazy::new(|| StoreCell::new_persistent("proxy_autoconf", || false)); pub fn render_settings(ctx: &egui::Context, ui: &mut egui::Ui) -> anyhow::Result<()> { - ctx.set_zoom_factor(ZOOM_FACTOR.get()); - // Account settings // ui.heading(l10n("account_info")); USERNAME.modify(|username| { @@ -55,7 +53,15 @@ pub fn render_settings(ctx: &egui::Context, ui: &mut egui::Ui) -> anyhow::Result ZOOM_FACTOR.modify(|zoom_factor| { ui.horizontal(|ui| { ui.label(l10n("zoom_factor")); - ui.add(egui::Slider::new(zoom_factor, 0.5..=3.0)); // Adjusted range for better control + egui::ComboBox::from_id_source("zoom_factor_cmbx") + .selected_text(format!("{:.2}", zoom_factor)) + .show_ui(ui, |ui| { + ui.selectable_value(zoom_factor, 1.0, "1.0"); + ui.selectable_value(zoom_factor, 1.25, "1.25"); + ui.selectable_value(zoom_factor, 1.5, "1.5"); + ui.selectable_value(zoom_factor, 1.75, "1.75"); + ui.selectable_value(zoom_factor, 2.0, "2.0"); + }); }) }); @@ -89,14 +95,14 @@ pub fn render_settings(ctx: &egui::Context, ui: &mut egui::Ui) -> anyhow::Result }) }); - // Configuration file - ui.separator(); - // ui.heading(l10n("Configuration File")); - let config = get_config()?; - let config_json = serde_json::to_value(config)?; - let config_yaml = serde_yaml::to_string(&config_json)?; + // // Configuration file + // ui.separator(); + // // ui.heading(l10n("Configuration File")); + // let config = get_config()?; + // let config_json = serde_json::to_value(config)?; + // let config_yaml = serde_yaml::to_string(&config_json)?; - egui::ScrollArea::vertical().show(ui, |ui| ui.code_editor(&mut config_yaml.as_str())); + // egui::ScrollArea::vertical().show(ui, |ui| ui.code_editor(&mut config_yaml.as_str())); Ok(()) } diff --git a/binaries/geph5-client-gui/src/settings_default.yaml b/binaries/geph5-client-gui/src/settings_default.yaml index 29cca21..b839e6a 100644 --- a/binaries/geph5-client-gui/src/settings_default.yaml +++ b/binaries/geph5-client-gui/src/settings_default.yaml @@ -1,4 +1,5 @@ socks5_listen: 127.0.0.1:9999 +http_proxy_listen: 127.0.0.1:19999 exit_constraint: auto broker: diff --git a/binaries/geph5-client-gui/src/tabs/dashboard.rs b/binaries/geph5-client-gui/src/tabs/dashboard.rs index 1385a29..217ce54 100644 --- a/binaries/geph5-client-gui/src/tabs/dashboard.rs +++ b/binaries/geph5-client-gui/src/tabs/dashboard.rs @@ -40,7 +40,7 @@ impl Dashboard { if ui.button(l10n("connect")).clicked() { tracing::warn!("connect clicked"); if PROXY_AUTOCONF.get() { - set_http_proxy("127.0.0.1:11111".parse()?)?; + set_http_proxy(get_config()?.http_proxy_listen)?; } *daemon = Some(geph5_client::Client::start(get_config()?)); } diff --git a/binaries/geph5-client/Cargo.toml b/binaries/geph5-client/Cargo.toml index 6f10596..79edd12 100644 --- a/binaries/geph5-client/Cargo.toml +++ b/binaries/geph5-client/Cargo.toml @@ -49,3 +49,9 @@ tap = "1.0.1" dirs = "5.0.1" moka = {version="0.12.5", features=["future"]} isahc = {version="1.7.2", features=["static-ssl"]} +hyper = { version = "0.14.27", features = ["http1", "client", "server", "tcp", "stream"] } +http = "0.2.9" +tokio = { version = "1.33.0", features = ["rt", "net", "io-util"] } +bytes = "1.6.0" +pin-project = "1.1.5" +async-compat = "0.2.3" diff --git a/binaries/geph5-client/src/client.rs b/binaries/geph5-client/src/client.rs index 4edb28e..b3e0331 100644 --- a/binaries/geph5-client/src/client.rs +++ b/binaries/geph5-client/src/client.rs @@ -19,6 +19,7 @@ use crate::{ broker::{broker_client, BrokerSource}, client_inner::client_once, database::db_read_or_wait, + http_proxy::run_http_proxy, route::ExitConstraint, socks5::socks5_loop, stats::STAT_TOTAL_BYTES, @@ -27,6 +28,7 @@ use crate::{ #[derive(Serialize, Deserialize, Clone)] pub struct Config { pub socks5_listen: SocketAddr, + pub http_proxy_listen: SocketAddr, pub exit_constraint: ExitConstraint, pub cache: Option, pub broker: Option, @@ -120,6 +122,9 @@ async fn client_main(ctx: AnyCtx) -> anyhow::Result<()> { |e| tracing::warn!("client died and restarted: {:?}", e) )), ); - socks5_loop(&ctx).race(auth_loop(&ctx)).await + socks5_loop(&ctx) + .race(run_http_proxy(&ctx)) + .race(auth_loop(&ctx)) + .await } } diff --git a/binaries/geph5-client/src/client_inner.rs b/binaries/geph5-client/src/client_inner.rs index a196131..8105b0f 100644 --- a/binaries/geph5-client/src/client_inner.rs +++ b/binaries/geph5-client/src/client_inner.rs @@ -1,5 +1,6 @@ use anyctx::AnyCtx; use anyhow::Context; +use clone_macro::clone; use ed25519_dalek::VerifyingKey; use futures_util::{future::try_join_all, AsyncReadExt as _}; use geph5_misc_rpc::{ @@ -21,7 +22,9 @@ use std::{ use stdcode::StdcodeSerializeExt; -use crate::{auth::get_connect_token, client::CtxField, route::get_dialer}; +use crate::{ + auth::get_connect_token, client::CtxField, route::get_dialer, stats::STAT_TOTAL_BYTES, +}; use super::Config; @@ -29,7 +32,17 @@ pub async fn open_conn(ctx: &AnyCtx, dest_addr: &str) -> anyhow::Result< let (send, recv) = oneshot::channel(); let elem = (dest_addr.to_string(), send); let _ = ctx.get(CONN_REQ_CHAN).0.send(elem).await; - Ok(recv.await?) + let mut conn = recv.await?; + let ctx = ctx.clone(); + conn.set_on_read(clone!([ctx], move |n| { + ctx.get(STAT_TOTAL_BYTES) + .fetch_add(n as _, Ordering::Relaxed); + })); + conn.set_on_write(clone!([ctx], move |n| { + ctx.get(STAT_TOTAL_BYTES) + .fetch_sub(n as _, Ordering::Relaxed); + })); + Ok(conn) } type ChanElem = (String, oneshot::Sender); diff --git a/binaries/geph5-client/src/http_proxy/address.rs b/binaries/geph5-client/src/http_proxy/address.rs new file mode 100644 index 0000000..3933536 --- /dev/null +++ b/binaries/geph5-client/src/http_proxy/address.rs @@ -0,0 +1,125 @@ +use bytes::{Buf, BufMut, BytesMut}; +use std::{ + fmt::{self, Debug}, + io::Cursor, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, +}; +use tokio::io::{self, AsyncRead, AsyncReadExt}; +/// SOCKS5 protocol error + +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum Address { + SocketAddress(SocketAddr), + DomainNameAddress(String, u16), +} + +impl Debug for Address { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result { + match *self { + Address::SocketAddress(ref addr) => write!(f, "{}", addr), + Address::DomainNameAddress(ref addr, ref port) => write!(f, "{} {}", addr, port), + } + } +} +impl fmt::Display for Address { + #[inline] + fn fmt(&self, f: &mut std::fmt::Formatter) -> fmt::Result { + match *self { + Address::SocketAddress(ref addr) => write!(f, "{}", addr), + Address::DomainNameAddress(ref addr, ref port) => write!(f, "{}:{}", addr, port), + } + } +} +impl std::net::ToSocketAddrs for Address { + type Iter = std::vec::IntoIter; + fn to_socket_addrs(&self) -> io::Result> { + match self.clone() { + Address::SocketAddress(addr) => Ok(vec![addr].into_iter()), + Address::DomainNameAddress(addr, port) => (&addr[..], port).to_socket_addrs(), + } + } +} +impl From for Address { + fn from(s: SocketAddr) -> Address { + Address::SocketAddress(s) + } +} +impl From<(String, u16)> for Address { + fn from((dn, port): (String, u16)) -> Address { + Address::DomainNameAddress(dn, port) + } +} +#[inline] +fn get_addr_len(atyp: &Address) -> usize { + match *atyp { + Address::SocketAddress(SocketAddr::V4(..)) => 1 + 4 + 2, + Address::SocketAddress(SocketAddr::V6(..)) => 1 + 8 * 2 + 2, + Address::DomainNameAddress(ref dmname, _) => 1 + 1 + dmname.len() + 2, + } +} + +pub fn host_addr(uri: &hyper::Uri) -> Option
{ + match uri.authority() { + None => None, + Some(authority) => { + // NOTE: Authority may include authentication info (user:password) + // Although it is already deprecated, but some very old application may still depending on it + // + // But ... We won't be compatible with it. :) + + // Check if URI has port + match authority.port_u16() { + Some(port) => { + // Well, it has port! + // 1. Maybe authority is a SocketAddr (127.0.0.1:1234, [::1]:1234) + // 2. Otherwise, it must be a domain name (google.com:443) + + match authority.as_str().parse::() { + Ok(saddr) => Some(Address::from(saddr)), + Err(..) => Some(Address::DomainNameAddress( + authority.host().to_owned(), + port, + )), + } + } + None => { + // Ok, we don't have port + // 1. IPv4 Address 127.0.0.1 + // 2. IPv6 Address: https://tools.ietf.org/html/rfc2732 , [::1] + // 3. Domain name + + // Uses default port + let port = match uri.scheme_str() { + None => 80, // Assume it is http + Some("http") => 80, + Some("https") => 443, + _ => return None, // Not supported + }; + + // RFC2732 indicates that IPv6 address should be wrapped in [ and ] + let authority_str = authority.as_str(); + if authority_str.starts_with('[') && authority_str.ends_with(']') { + // Must be a IPv6 address + let addr = authority_str.trim_start_matches('[').trim_end_matches(']'); + match addr.parse::() { + Ok(a) => Some(Address::from(SocketAddr::new(a, port))), + // Ignore invalid IPv6 address + Err(..) => None, + } + } else { + // Maybe it is a IPv4 address, or a non-standard IPv6 + match authority_str.parse::() { + Ok(a) => Some(Address::from(SocketAddr::new(a, port))), + // Should be a domain name, or a invalid IP address. + // Let DNS deal with it. + Err(..) => { + Some(Address::DomainNameAddress(authority_str.to_owned(), port)) + } + } + } + } + } + } + } +} diff --git a/binaries/geph5-client/src/http_proxy/http_client.rs b/binaries/geph5-client/src/http_proxy/http_client.rs new file mode 100644 index 0000000..9eeca63 --- /dev/null +++ b/binaries/geph5-client/src/http_proxy/http_client.rs @@ -0,0 +1,107 @@ +use anyctx::AnyCtx; +use async_compat::{Compat, CompatExt}; +use futures_util::{future::BoxFuture, FutureExt}; +use hyper::{client::connect::Connection, Uri}; +use pin_project::pin_project; +use std::future::Future; + +use std::pin::Pin; +use std::task::{self, Poll}; + +use crate::{client_inner::open_conn, Config}; + +use super::address::host_addr; + +#[derive(Clone)] +pub struct Connector { + ctx: AnyCtx, +} + +impl Connector { + pub fn new(ctx: AnyCtx) -> Connector { + Connector { ctx } + } +} + +impl hyper::service::Service for Connector { + type Error = std::io::Error; + type Future = SocksConnecting; + type Response = PicomuxConnection; + fn poll_ready(&mut self, _cx: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, dst: Uri) -> Self::Future { + let ctx = self.ctx.clone(); + SocksConnecting { + fut: async move { + match host_addr(&dst) { + None => { + use std::io::{Error, ErrorKind}; + let err = Error::new(ErrorKind::Other, "URI must be a valid Address"); + Err(err) + } + Some(addr) => open_conn(&ctx, &addr.to_string()) + .await + .map_err(|e| std::io::Error::new(std::io::ErrorKind::ConnectionRefused, e)) + .map(|c| PicomuxConnection(c.compat())), + } + } + .boxed(), + } + } +} +#[pin_project] +pub struct SocksConnecting { + #[pin] + fut: BoxFuture<'static, std::io::Result>, +} + +impl Future for SocksConnecting { + type Output = std::io::Result; + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + self.project().fut.poll(cx) + } +} +pub type CtxClient = hyper::Client; + +pub struct PicomuxConnection(Compat); + +impl Connection for PicomuxConnection { + fn connected(&self) -> hyper::client::connect::Connected { + hyper::client::connect::Connected::new() + } +} + +impl tokio::io::AsyncRead for PicomuxConnection { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_read(cx, buf) + } +} + +impl tokio::io::AsyncWrite for PicomuxConnection { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.0).poll_write(cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_flush(cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.0).poll_shutdown(cx) + } +} diff --git a/binaries/geph5-client/src/http_proxy/mod.rs b/binaries/geph5-client/src/http_proxy/mod.rs new file mode 100644 index 0000000..58b0274 --- /dev/null +++ b/binaries/geph5-client/src/http_proxy/mod.rs @@ -0,0 +1,440 @@ +mod address; +mod http_client; + +use std::{convert::Infallible, net::SocketAddr, str::FromStr as _}; + +pub async fn run_http_proxy(ctx: &AnyCtx) -> anyhow::Result<()> { + let shared_server: SharedProxyServer = ProxyServer::new_shared(ctx.clone()); + let listen = ctx.init().http_proxy_listen; + let make_service = make_service_fn(move |socket: &AddrStream| { + let client_addr = socket.remote_addr(); + let cloned_server = shared_server.clone(); + let ctx = ctx.clone(); + async move { + Ok::<_, Infallible>(service_fn(move |req: Request| { + server_dispatch(req, client_addr, cloned_server.clone(), ctx.clone()) + })) + } + }); + let server = hyper::Server::bind(&listen) + .http1_only(true) + .serve(make_service); + server.await?; + Ok(()) +} +type SharedProxyServer = std::sync::Arc; + +async fn server_dispatch( + mut req: Request, + client_addr: SocketAddr, + proxy_server: SharedProxyServer, + ctx: AnyCtx, +) -> std::io::Result> { + let host = match host_addr(req.uri()) { + None => { + if req.uri().authority().is_some() { + tracing::trace!( + method = %req.method(), + uri = %req.uri(), + "HTTP URI doesn't have a valid host" + ); + return Ok(make_bad_request()); + } else { + tracing::trace!( + method = %req.method(), + uri = %req.uri(), + "HTTP URI doesn't have a valid host" + ); + } + match req.headers().get("Host") { + None => { + return Ok(make_bad_request()); + } + Some(hhost) => match hhost.to_str() { + Err(..) => { + return Ok(make_bad_request()); + } + Ok(shost) => { + match Authority::from_str(shost) { + Ok(authority) => { + match authority_addr(req.uri().scheme_str(), &authority) { + Some(host) => { + tracing::trace!( + method = %req.method(), + uri = %req.uri(), + host = %host, + "HTTP URI got host from header" + ); + + // Reassemble URI + let mut parts = req.uri().clone().into_parts(); + if parts.scheme.is_none() { + // Use http as default. + parts.scheme = Some(Scheme::HTTP); + } + parts.authority = Some(authority); + + // Replaces URI + *req.uri_mut() = + Uri::from_parts(parts).expect("Reassemble URI failed"); + + tracing::trace!( + method = %req.method(), + uri = %req.uri(), + "Reassembled URI from \"Host\"" + ); + + host + } + None => { + tracing::trace!( + method = %req.method(), + uri = %req.uri(), + host_header_value = %shost, + "HTTP \"Host\" header invalid" + ); + + return Ok(make_bad_request()); + } + } + } + Err(..) => { + tracing::trace!( + method = %req.method(), + uri = %req.uri(), + host_header_value = ?hhost, + "HTTP \"Host\" header is not an Authority" + ); + + return Ok(make_bad_request()); + } + } + } + }, + } + } + Some(h) => h, + }; + if Method::CONNECT == req.method() { + tracing::trace!( + method = %req.method(), + client_addr = %client_addr, + host = %host, + "CONNECT relay connected" + ); + tokio::spawn(async move { + match hyper::upgrade::on(req).await { + Ok(upgraded) => { + tracing::trace!( + + client_addr = %client_addr, + host = %host, + "CONNECT tunnel upgrade success" + ); + let stream = open_conn(&ctx, &host.to_string()).await; + if let Ok(stream) = stream { + establish_connect_tunnel(upgraded, stream, client_addr).await + } + } + Err(e) => { + tracing::trace!( + + client_addr = %client_addr, + host = %host, + error = %e, + "Failed to upgrade TCP tunnel" + ); + } + } + }); + let resp = Response::builder().body(Body::empty()).unwrap(); + Ok(resp) + } else { + let method = req.method().clone(); + tracing::trace!(method = %method, host = %host, "HTTP request received"); + let conn_keep_alive = check_keep_alive(req.version(), req.headers(), true); + clear_hop_headers(req.headers_mut()); + set_conn_keep_alive(req.version(), req.headers_mut(), conn_keep_alive); + let mut res: Response = match proxy_server.client.request(req).await { + Ok(res) => res, + Err(err) => { + tracing::trace!( + method = %method, + client_addr = %client_addr, + proxy_addr = "127.0.0.1:1080", + host = %host, + error = %err, + "HTTP relay failed" + ); + let mut resp = Response::new(Body::from(format!("Relay failed to {}", host))); + *resp.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + return Ok(resp); + } + }; + let res_keep_alive = + conn_keep_alive && check_keep_alive(res.version(), res.headers(), false); + clear_hop_headers(res.headers_mut()); + set_conn_keep_alive(res.version(), res.headers_mut(), res_keep_alive); + Ok(res) + } +} +use anyctx::AnyCtx; +use async_compat::{Compat, CompatExt}; +use futures_util::{ + future::{self, Either}, + FutureExt, +}; +use http::{ + uri::{Authority, Scheme}, + HeaderMap, HeaderValue, Method, Uri, Version, +}; +use hyper::{ + server::conn::AddrStream, + service::{make_service_fn, service_fn}, + upgrade::Upgraded, + Body, Request, Response, StatusCode, +}; + +async fn establish_connect_tunnel( + upgraded: Upgraded, + stream: picomux::Stream, + client_addr: SocketAddr, +) { + use tokio::io::{copy, split}; + + let (mut r, mut w) = split(upgraded); + let (mut svr_r, mut svr_w) = split(stream.compat()); + + let rhalf = copy(&mut r, &mut svr_w); + let whalf = copy(&mut svr_r, &mut w); + + tracing::trace!( + client_addr = %client_addr, + "CONNECT relay established" + ); + + match future::select(rhalf.boxed(), whalf.boxed()).await { + Either::Left((Ok(..), _)) => tracing::trace!( + client_addr = %client_addr, + "CONNECT relay closed" + ), + Either::Left((Err(err), _)) => { + tracing::trace!( + client_addr = %client_addr, + error = %err, + "CONNECT relay closed with error" + ); + } + Either::Right((Ok(..), _)) => tracing::trace!( + client_addr = %client_addr, + "CONNECT relay closed" + ), + Either::Right((Err(err), _)) => { + tracing::trace!( + client_addr = %client_addr, + error = %err, + "CONNECT relay closed with error" + ); + } + } + + tracing::trace!( + client_addr = %client_addr, + "CONNECT relay closed" + ); +} + +fn make_bad_request() -> Response { + let mut resp = Response::new(Body::empty()); + *resp.status_mut() = StatusCode::BAD_REQUEST; + resp +} + +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; + +use crate::{client_inner::open_conn, Config}; + +use self::address::{host_addr, Address}; +fn authority_addr(scheme_str: Option<&str>, authority: &Authority) -> Option
{ + // RFC7230 indicates that we should ignore userinfo + // https://tools.ietf.org/html/rfc7230#section-5.3.3 + + // Check if URI has port + let port = match authority.port_u16() { + Some(port) => port, + None => { + match scheme_str { + None => 80, // Assume it is http + Some("http") => 80, + Some("https") => 443, + _ => return None, // Not supported + } + } + }; + + let host_str = authority.host(); + + // RFC3986 indicates that IPv6 address should be wrapped in [ and ] + // https://tools.ietf.org/html/rfc3986#section-3.2.2 + // + // Example: [::1] without port + if host_str.starts_with('[') && host_str.ends_with(']') { + // Must be a IPv6 address + let addr = &host_str[1..host_str.len() - 1]; + match addr.parse::() { + Ok(a) => Some(Address::from(SocketAddr::new(IpAddr::V6(a), port))), + // Ignore invalid IPv6 address + Err(..) => None, + } + } else { + // It must be a IPv4 address + match host_str.parse::() { + Ok(a) => Some(Address::from(SocketAddr::new(IpAddr::V4(a), port))), + // Should be a domain name, or a invalid IP address. + // Let DNS deal with it. + Err(..) => Some(Address::DomainNameAddress(host_str.to_owned(), port)), + } + } +} + +fn check_keep_alive(version: Version, headers: &HeaderMap, check_proxy: bool) -> bool { + let mut conn_keep_alive = match version { + Version::HTTP_10 => false, + Version::HTTP_11 => true, + _ => unimplemented!("HTTP Proxy only supports 1.0 and 1.1"), + }; + + if check_proxy { + // Modern browers will send Proxy-Connection instead of Connection + // for HTTP/1.0 proxies which blindly forward Connection to remote + // + // https://tools.ietf.org/html/rfc7230#appendix-A.1.2 + for value in headers.get_all("Proxy-Connection") { + if let Ok(value) = value.to_str() { + if value.eq_ignore_ascii_case("close") { + conn_keep_alive = false; + } else { + for part in value.split(',') { + let part = part.trim(); + if part.eq_ignore_ascii_case("keep-alive") { + conn_keep_alive = true; + break; + } + } + } + } + } + } + + // Connection will replace Proxy-Connection + // + // But why client sent both Connection and Proxy-Connection? That's not standard! + for value in headers.get_all("Connection") { + if let Ok(value) = value.to_str() { + if value.eq_ignore_ascii_case("close") { + conn_keep_alive = false; + } else { + for part in value.split(',') { + let part = part.trim(); + + if part.eq_ignore_ascii_case("keep-alive") { + conn_keep_alive = true; + break; + } + } + } + } + } + + conn_keep_alive +} + +fn clear_hop_headers(headers: &mut HeaderMap) { + // Clear headers indicated by Connection and Proxy-Connection + let mut extra_headers = Vec::new(); + + for connection in headers.get_all("Connection") { + if let Ok(conn) = connection.to_str() { + if !conn.eq_ignore_ascii_case("close") { + for header in conn.split(',') { + let header = header.trim(); + + if !header.eq_ignore_ascii_case("keep-alive") { + extra_headers.push(header.to_owned()); + } + } + } + } + } + + for connection in headers.get_all("Proxy-Connection") { + if let Ok(conn) = connection.to_str() { + if !conn.eq_ignore_ascii_case("close") { + for header in conn.split(',') { + let header = header.trim(); + + if !header.eq_ignore_ascii_case("keep-alive") { + extra_headers.push(header.to_owned()); + } + } + } + } + } + + for header in extra_headers { + while let Some(..) = headers.remove(&header) {} + } + + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection + const HOP_BY_HOP_HEADERS: [&str; 9] = [ + "Keep-Alive", + "Transfer-Encoding", + "TE", + "Connection", + "Trailer", + "Upgrade", + "Proxy-Authorization", + "Proxy-Authenticate", + "Proxy-Connection", // Not standard, but many implementations do send this header + ]; + + for header in &HOP_BY_HOP_HEADERS { + while let Some(..) = headers.remove(*header) {} + } +} + +fn set_conn_keep_alive(version: Version, headers: &mut HeaderMap, keep_alive: bool) { + match version { + Version::HTTP_10 => { + // HTTP/1.0 close connection by default + if keep_alive { + headers.insert("Connection", HeaderValue::from_static("keep-alive")); + } + } + Version::HTTP_11 => { + // HTTP/1.1 keep-alive connection by default + if !keep_alive { + headers.insert("Connection", HeaderValue::from_static("close")); + } + } + _ => unimplemented!("HTTP Proxy only supports 1.0 and 1.1"), + } +} + +#[derive(Clone)] +pub struct ProxyServer { + client: http_client::CtxClient, +} + +impl ProxyServer { + fn new(ctx: AnyCtx) -> ProxyServer { + let connector = http_client::Connector::new(ctx); + let proxy_client: http_client::CtxClient = hyper::Client::builder().build(connector); + ProxyServer { + client: proxy_client, + } + } + fn new_shared(ctx: AnyCtx) -> SharedProxyServer { + std::sync::Arc::new(ProxyServer::new(ctx)) + } +} diff --git a/binaries/geph5-client/src/lib.rs b/binaries/geph5-client/src/lib.rs index bbafc65..74dd055 100644 --- a/binaries/geph5-client/src/lib.rs +++ b/binaries/geph5-client/src/lib.rs @@ -1,12 +1,14 @@ pub use broker::broker_client; pub use client::Client; pub use client::Config; +use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; mod auth; mod broker; mod client; mod client_inner; mod database; +mod http_proxy; mod route; mod socks5; mod stats; diff --git a/binaries/geph5-client/src/socks5.rs b/binaries/geph5-client/src/socks5.rs index 2840a9d..02a311e 100644 --- a/binaries/geph5-client/src/socks5.rs +++ b/binaries/geph5-client/src/socks5.rs @@ -50,38 +50,12 @@ pub async fn socks5_loop(ctx: &AnyCtx) -> anyhow::Result<()> { .await?; tracing::trace!(remote_addr = display(&remote_addr), "connection opened"); let (read_stream, write_stream) = stream.split(); - io_copy_with(read_stream, write_client, |n| { - ctx.get(STAT_TOTAL_BYTES) - .fetch_add(n as u64, Ordering::Relaxed); - }) - .race(io_copy_with(read_client, write_stream, |n| { - ctx.get(STAT_TOTAL_BYTES) - .fetch_add(n as u64, Ordering::Relaxed); - })) - .await?; + smol::io::copy(read_stream, write_client) + .race(smol::io::copy(read_client, write_stream)) + .await?; anyhow::Ok(()) }) .detach(); } }) } - -async fn io_copy_with( - mut read: impl AsyncRead + Unpin, - mut write: impl AsyncWrite + Unpin, - mut on_copy: impl FnMut(usize), -) -> anyhow::Result<()> { - let mut buffer = vec![0; 8192]; // Buffer size can be adjusted based on expected data sizes or performance testing - - loop { - let bytes_read = read.read(&mut buffer).await?; - - if bytes_read == 0 { - break; // End of input stream - } - - write.write_all(&buffer[..bytes_read]).await?; - on_copy(bytes_read); // Invoke the callback - } - Ok(()) -} diff --git a/libraries/picomux/Cargo.toml b/libraries/picomux/Cargo.toml index 28d86a1..6c07fd3 100644 --- a/libraries/picomux/Cargo.toml +++ b/libraries/picomux/Cargo.toml @@ -40,6 +40,7 @@ async-io = "2.3.1" sillad={version="0.1", path="../sillad"} futures-intrusive = "0.5.0" async-channel = "2.2.0" +pin-project = "1.1.5" [dev-dependencies] argh = "0.1" diff --git a/libraries/picomux/src/lib.rs b/libraries/picomux/src/lib.rs index 345fbc1..b8982ce 100644 --- a/libraries/picomux/src/lib.rs +++ b/libraries/picomux/src/lib.rs @@ -31,6 +31,7 @@ use futures_util::{ use async_io::Timer; use parking_lot::Mutex; +use pin_project::pin_project; use rand::Rng; use smol_timeout::TimeoutExt; use tachyonix::{Receiver, Sender, TrySendError}; @@ -209,6 +210,8 @@ async fn picomux_inner( write_outgoing, read_incoming, metadata, + on_write: Box::new(|_| {}), + on_read: Box::new(|_| {}), }; let send_more = SharedSemaphore::new(false, INIT_WINDOW); @@ -502,10 +505,15 @@ async fn picomux_inner( .await } +#[pin_project] pub struct Stream { + #[pin] read_incoming: bipe::BipeReader, + #[pin] write_outgoing: bipe::BipeWriter, metadata: Bytes, + on_write: Box, + on_read: Box, } impl Debug for Stream { @@ -519,14 +527,12 @@ impl Stream { &self.metadata } - fn pin_project_read(self: std::pin::Pin<&mut Self>) -> Pin<&mut bipe::BipeReader> { - // SAFETY: this is a safe pin-projection, since we never get a &mut sosistab2::Stream from a Pin<&mut Stream> elsewhere. - // Safety requires that we either consistently lose Pin or keep it. - // We could use the "pin_project" crate but I'm too lazy. - unsafe { self.map_unchecked_mut(|s| &mut s.read_incoming) } + pub fn set_on_write(&mut self, on_write: impl Fn(usize) + Send + Sync + 'static) { + self.on_write = Box::new(on_write); } - fn pin_project_write(self: std::pin::Pin<&mut Self>) -> Pin<&mut bipe::BipeWriter> { - unsafe { self.map_unchecked_mut(|s| &mut s.write_outgoing) } + + pub fn set_on_read(&mut self, on_read: impl Fn(usize) + Send + Sync + 'static) { + self.on_read = Box::new(on_read); } } @@ -540,7 +546,12 @@ impl AsyncRead for Stream { cx.waker().wake_by_ref(); Poll::Pending } else { - self.pin_project_read().poll_read(cx, buf) + let this = self.project(); + let r = this.read_incoming.poll_read(cx, buf); + if r.is_ready() { + (this.on_read)(buf.len()); + } + r } } } @@ -557,7 +568,12 @@ impl AsyncWrite for Stream { cx.waker().wake_by_ref(); Poll::Pending } else { - self.pin_project_write().poll_write(cx, buf) + let this = self.project(); + let r = this.write_outgoing.poll_write(cx, buf); + if r.is_ready() { + (this.on_write)(buf.len()); + } + r } } @@ -565,14 +581,14 @@ impl AsyncWrite for Stream { self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.pin_project_write().poll_flush(cx) + self.project().write_outgoing.poll_flush(cx) } fn poll_close( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll> { - self.pin_project_write().poll_close(cx) + self.project().write_outgoing.poll_close(cx) } }