From f94d31a75bcfc0228dee7835cb8c1a992cf29f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Tue, 6 Feb 2024 11:06:49 +0100 Subject: [PATCH 1/3] Add SOCKS5 server to test-manager --- test/Cargo.lock | 15 ++++++++++++ test/test-manager/Cargo.toml | 1 + test/test-manager/src/main.rs | 31 +++++++++++++++++++++++++ test/test-manager/src/vm/network/mod.rs | 3 +++ 4 files changed, 50 insertions(+) diff --git a/test/Cargo.lock b/test/Cargo.lock index f48fec100d0e..725d56952ef2 100644 --- a/test/Cargo.lock +++ b/test/Cargo.lock @@ -827,6 +827,20 @@ dependencies = [ "libc", ] +[[package]] +name = "fast-socks5" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbcc731f3c17a5053e07e6a2290918da75cd8b9b1217b419721f715674ac520c" +dependencies = [ + "anyhow", + "async-trait", + "log", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -3095,6 +3109,7 @@ dependencies = [ "dirs", "env_logger", "err-derive", + "fast-socks5", "futures", "inventory", "ipnetwork 0.20.0", diff --git a/test/test-manager/Cargo.toml b/test/test-manager/Cargo.toml index 53c1c355f16f..f7a57fab1cad 100644 --- a/test/test-manager/Cargo.toml +++ b/test/test-manager/Cargo.toml @@ -11,6 +11,7 @@ rust-version.workspace = true workspace = true [dependencies] +fast-socks5 = "0.9.5" anyhow = { version = "1", features = ["backtrace"] } futures = { workspace = true } regex = "1" diff --git a/test/test-manager/src/main.rs b/test/test-manager/src/main.rs index f5e7a70d9f8f..a06b2f025751 100644 --- a/test/test-manager/src/main.rs +++ b/test/test-manager/src/main.rs @@ -14,6 +14,8 @@ use std::path::PathBuf; use anyhow::Context; use anyhow::Result; use clap::Parser; +use futures::StreamExt; +use std::net::SocketAddr; use tests::config::DEFAULT_MULLVAD_HOST; /// Test manager for Mullvad VPN app @@ -248,6 +250,34 @@ async fn main() -> Result<()> { .await .context("Failed to run provisioning for VM")?; + let socks_server: fast_socks5::server::Socks5Server = + fast_socks5::server::Socks5Server::bind(SocketAddr::new( + crate::vm::network::NON_TUN_GATEWAY.into(), + crate::vm::network::SOCKS5_PORT, + )) + .await + .context("Failed to start SOCKS5 server")?; + let socks_server = tokio::spawn(async move { + let mut incoming = socks_server.incoming(); + + while let Some(new_client) = incoming.next().await { + match new_client { + Ok(socket) => { + let fut = socket.upgrade_to_socks5(); + tokio::spawn(async move { + match fut.await { + Ok(_socket) => log::info!("socks client disconnected"), + Err(error) => log::error!("socks client failed: {error}"), + } + }); + } + Err(error) => { + log::error!("failed to accept socks client: {error}"); + } + } + } + }); + let skip_wait = vm_config.provisioner != config::Provisioner::Noop; let result = run_tests::run( @@ -291,6 +321,7 @@ async fn main() -> Result<()> { if display { instance.wait().await; } + socks_server.abort(); result } Commands::FormatTestReports { reports } => { diff --git a/test/test-manager/src/vm/network/mod.rs b/test/test-manager/src/vm/network/mod.rs index e12a95c713f0..944e24101302 100644 --- a/test/test-manager/src/vm/network/mod.rs +++ b/test/test-manager/src/vm/network/mod.rs @@ -15,3 +15,6 @@ pub use platform::{ CUSTOM_TUN_REMOTE_REAL_PORT, CUSTOM_TUN_REMOTE_TUN_ADDR, DUMMY_LAN_INTERFACE_IP, NON_TUN_GATEWAY, }; + +/// Port on NON_TUN_GATEWAY that hosts a SOCKS5 server +pub const SOCKS5_PORT: u16 = 54321; From 0bfaddec42d4a2db516d99bc572b27e394427aa8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Tue, 6 Feb 2024 14:22:59 +0100 Subject: [PATCH 2/3] Move testing SOCKS server to own crate --- test/Cargo.lock | 13 +++++++++++- test/Cargo.toml | 1 + test/socks-server/Cargo.toml | 18 +++++++++++++++++ test/socks-server/src/lib.rs | 38 +++++++++++++++++++++++++++++++++++ test/test-manager/Cargo.toml | 2 +- test/test-manager/src/main.rs | 36 +++++++-------------------------- 6 files changed, 77 insertions(+), 31 deletions(-) create mode 100644 test/socks-server/Cargo.toml create mode 100644 test/socks-server/src/lib.rs diff --git a/test/Cargo.lock b/test/Cargo.lock index 725d56952ef2..64d355254034 100644 --- a/test/Cargo.lock +++ b/test/Cargo.lock @@ -2889,6 +2889,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "socks-server" +version = "0.0.0" +dependencies = [ + "err-derive", + "fast-socks5", + "futures", + "log", + "tokio", +] + [[package]] name = "spin" version = "0.5.2" @@ -3109,7 +3120,6 @@ dependencies = [ "dirs", "env_logger", "err-derive", - "fast-socks5", "futures", "inventory", "ipnetwork 0.20.0", @@ -3126,6 +3136,7 @@ dependencies = [ "regex", "serde", "serde_json", + "socks-server", "ssh2", "talpid-types", "tarpc", diff --git a/test/Cargo.toml b/test/Cargo.toml index a6653530b5ce..27c0f94469fc 100644 --- a/test/Cargo.toml +++ b/test/Cargo.toml @@ -11,6 +11,7 @@ members = [ "test-manager", "test-runner", "test-rpc", + "socks-server", ] [workspace.lints.rust] diff --git a/test/socks-server/Cargo.toml b/test/socks-server/Cargo.toml new file mode 100644 index 000000000000..ba6d1ba4f860 --- /dev/null +++ b/test/socks-server/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "socks-server" +description = "Contains a simple SOCKS5 server" +authors.workspace = true +repository.workspace = true +license.workspace = true +edition.workspace = true +rust-version.workspace = true + +[lints] +workspace = true + +[dependencies] +fast-socks5 = "0.9.5" +err-derive = { workspace = true } +tokio = { workspace = true } +log = { workspace = true } +futures = { workspace = true } diff --git a/test/socks-server/src/lib.rs b/test/socks-server/src/lib.rs new file mode 100644 index 000000000000..19638efe390d --- /dev/null +++ b/test/socks-server/src/lib.rs @@ -0,0 +1,38 @@ +use futures::StreamExt; +use std::io; +use std::net::SocketAddr; + +#[derive(err_derive::Error, Debug)] +pub enum Error { + #[error(display = "Failed to start SOCKS5 server")] + StartSocksServer(#[error(source)] io::Error), +} + +pub async fn spawn(bind_addr: SocketAddr) -> Result, Error> { + let socks_server: fast_socks5::server::Socks5Server = + fast_socks5::server::Socks5Server::bind(bind_addr) + .await + .map_err(Error::StartSocksServer)?; + + let handle = tokio::spawn(async move { + let mut incoming = socks_server.incoming(); + + while let Some(new_client) = incoming.next().await { + match new_client { + Ok(socket) => { + let fut = socket.upgrade_to_socks5(); + tokio::spawn(async move { + match fut.await { + Ok(_socket) => log::info!("socks client disconnected"), + Err(error) => log::error!("socks client failed: {error}"), + } + }); + } + Err(error) => { + log::error!("failed to accept socks client: {error}"); + } + } + } + }); + Ok(handle) +} diff --git a/test/test-manager/Cargo.toml b/test/test-manager/Cargo.toml index f7a57fab1cad..72230a2eddbe 100644 --- a/test/test-manager/Cargo.toml +++ b/test/test-manager/Cargo.toml @@ -11,7 +11,6 @@ rust-version.workspace = true workspace = true [dependencies] -fast-socks5 = "0.9.5" anyhow = { version = "1", features = ["backtrace"] } futures = { workspace = true } regex = "1" @@ -43,6 +42,7 @@ pcap = { version = "0.10.1", features = ["capture-stream"] } pnet_packet = "0.31.0" test-rpc = { path = "../test-rpc" } +socks-server = { path = "../socks-server" } env_logger = { workspace = true } diff --git a/test/test-manager/src/main.rs b/test/test-manager/src/main.rs index a06b2f025751..d09dab0f53ab 100644 --- a/test/test-manager/src/main.rs +++ b/test/test-manager/src/main.rs @@ -14,7 +14,6 @@ use std::path::PathBuf; use anyhow::Context; use anyhow::Result; use clap::Parser; -use futures::StreamExt; use std::net::SocketAddr; use tests::config::DEFAULT_MULLVAD_HOST; @@ -250,33 +249,12 @@ async fn main() -> Result<()> { .await .context("Failed to run provisioning for VM")?; - let socks_server: fast_socks5::server::Socks5Server = - fast_socks5::server::Socks5Server::bind(SocketAddr::new( - crate::vm::network::NON_TUN_GATEWAY.into(), - crate::vm::network::SOCKS5_PORT, - )) - .await - .context("Failed to start SOCKS5 server")?; - let socks_server = tokio::spawn(async move { - let mut incoming = socks_server.incoming(); - - while let Some(new_client) = incoming.next().await { - match new_client { - Ok(socket) => { - let fut = socket.upgrade_to_socks5(); - tokio::spawn(async move { - match fut.await { - Ok(_socket) => log::info!("socks client disconnected"), - Err(error) => log::error!("socks client failed: {error}"), - } - }); - } - Err(error) => { - log::error!("failed to accept socks client: {error}"); - } - } - } - }); + // For convenience, spawn a SOCKS5 server that is reachable for tests that need it + let socks = socks_server::spawn(SocketAddr::new( + crate::vm::network::NON_TUN_GATEWAY.into(), + crate::vm::network::SOCKS5_PORT, + )) + .await?; let skip_wait = vm_config.provisioner != config::Provisioner::Noop; @@ -321,7 +299,7 @@ async fn main() -> Result<()> { if display { instance.wait().await; } - socks_server.abort(); + socks.abort(); result } Commands::FormatTestReports { reports } => { From eed7234599253f3d742be8bb4b6b1ecbf1299dc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?David=20L=C3=B6nnhager?= Date: Tue, 6 Feb 2024 17:33:10 +0100 Subject: [PATCH 3/3] Add RPCs for running TCP forwarder on test runner --- test/socks-server/src/lib.rs | 24 +++++- test/test-manager/src/main.rs | 2 +- test/test-rpc/src/client.rs | 10 +++ test/test-rpc/src/lib.rs | 12 +++ test/test-rpc/src/net.rs | 57 +++++++++++++- test/test-runner/src/forward.rs | 127 ++++++++++++++++++++++++++++++++ test/test-runner/src/main.rs | 19 +++++ 7 files changed, 247 insertions(+), 4 deletions(-) create mode 100644 test/test-runner/src/forward.rs diff --git a/test/socks-server/src/lib.rs b/test/socks-server/src/lib.rs index 19638efe390d..eed676ac8ec8 100644 --- a/test/socks-server/src/lib.rs +++ b/test/socks-server/src/lib.rs @@ -3,12 +3,18 @@ use std::io; use std::net::SocketAddr; #[derive(err_derive::Error, Debug)] +#[error(no_from)] pub enum Error { #[error(display = "Failed to start SOCKS5 server")] StartSocksServer(#[error(source)] io::Error), } -pub async fn spawn(bind_addr: SocketAddr) -> Result, Error> { +pub struct Handle { + handle: tokio::task::JoinHandle<()>, +} + +/// Spawn a SOCKS server bound to `bind_addr` +pub async fn spawn(bind_addr: SocketAddr) -> Result { let socks_server: fast_socks5::server::Socks5Server = fast_socks5::server::Socks5Server::bind(bind_addr) .await @@ -21,6 +27,8 @@ pub async fn spawn(bind_addr: SocketAddr) -> Result, match new_client { Ok(socket) => { let fut = socket.upgrade_to_socks5(); + + // Act as normal SOCKS server tokio::spawn(async move { match fut.await { Ok(_socket) => log::info!("socks client disconnected"), @@ -34,5 +42,17 @@ pub async fn spawn(bind_addr: SocketAddr) -> Result, } } }); - Ok(handle) + Ok(Handle { handle }) +} + +impl Handle { + pub fn close(&self) { + self.handle.abort(); + } +} + +impl Drop for Handle { + fn drop(&mut self) { + self.close(); + } } diff --git a/test/test-manager/src/main.rs b/test/test-manager/src/main.rs index d09dab0f53ab..f81bf775947d 100644 --- a/test/test-manager/src/main.rs +++ b/test/test-manager/src/main.rs @@ -299,7 +299,7 @@ async fn main() -> Result<()> { if display { instance.wait().await; } - socks.abort(); + socks.close(); result } Commands::FormatTestReports { reports } => { diff --git a/test/test-rpc/src/client.rs b/test/test-rpc/src/client.rs index 2c47328e00f9..4d103ed44ec5 100644 --- a/test/test-rpc/src/client.rs +++ b/test/test-rpc/src/client.rs @@ -213,6 +213,16 @@ impl ServiceClient { .await? } + /// Start forwarding TCP from a server listening on `bind_addr` to the given address, and return a handle that closes the + /// server when dropped + pub async fn start_tcp_forward( + &self, + bind_addr: SocketAddr, + via_addr: SocketAddr, + ) -> Result { + crate::net::SockHandle::start_tcp_forward(self.client.clone(), bind_addr, via_addr).await + } + /// Restarts the app. /// /// Shuts down a running app, making it disconnect from any current tunnel diff --git a/test/test-rpc/src/lib.rs b/test/test-rpc/src/lib.rs index 5919a894d12c..d2bee40dbb5e 100644 --- a/test/test-rpc/src/lib.rs +++ b/test/test-rpc/src/lib.rs @@ -53,6 +53,8 @@ pub enum Error { InvalidUrl, #[error(display = "Timeout")] Timeout, + #[error(display = "TCP forward error")] + TcpForward, } /// Response from am.i.mullvad.net @@ -148,6 +150,16 @@ mod service { /// Perform DNS resolution. async fn resolve_hostname(hostname: String) -> Result, Error>; + /// Start forwarding TCP bound to the given address. Return an ID that can be used with + /// `stop_tcp_forward`, and the address that the listening socket was actually bound to. + async fn start_tcp_forward( + bind_addr: SocketAddr, + via_addr: SocketAddr, + ) -> Result<(net::SockHandleId, SocketAddr), Error>; + + /// Stop forwarding TCP that was previously started with `start_tcp_forward`. + async fn stop_tcp_forward(id: net::SockHandleId) -> Result<(), Error>; + /// Restart the Mullvad VPN application. async fn restart_mullvad_daemon() -> Result<(), Error>; diff --git a/test/test-rpc/src/net.rs b/test/test-rpc/src/net.rs index b4e114ea47d1..77aa5c938abf 100644 --- a/test/test-rpc/src/net.rs +++ b/test/test-rpc/src/net.rs @@ -1,6 +1,8 @@ +use futures::channel::oneshot; use hyper::{Client, Uri}; use once_cell::sync::Lazy; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use std::net::SocketAddr; use tokio_rustls::rustls::ClientConfig; use crate::{AmIMullvad, Error}; @@ -17,6 +19,59 @@ static CLIENT_CONFIG: Lazy = Lazy::new(|| { .with_no_client_auth() }); +#[derive(Debug, Serialize, Deserialize, Clone, Copy, Hash, PartialEq, Eq)] +pub struct SockHandleId(pub usize); + +pub struct SockHandle { + stop_tx: Option>, + bind_addr: SocketAddr, +} + +impl SockHandle { + pub(crate) async fn start_tcp_forward( + client: crate::service::ServiceClient, + bind_addr: SocketAddr, + via_addr: SocketAddr, + ) -> Result { + let (stop_tx, stop_rx) = oneshot::channel(); + + let (id, bind_addr) = client + .start_tcp_forward(tarpc::context::current(), bind_addr, via_addr) + .await??; + + tokio::spawn(async move { + let _ = stop_rx.await; + + log::trace!("Stopping TCP forward"); + + if let Err(error) = client.stop_tcp_forward(tarpc::context::current(), id).await { + log::error!("Failed to stop TCP forward: {error}"); + } + }); + + Ok(SockHandle { + stop_tx: Some(stop_tx), + bind_addr, + }) + } + + pub fn stop(&mut self) { + if let Some(stop_tx) = self.stop_tx.take() { + let _ = stop_tx.send(()); + } + } + + pub fn bind_addr(&self) -> SocketAddr { + self.bind_addr + } +} + +impl Drop for SockHandle { + fn drop(&mut self) { + self.stop() + } +} + pub async fn geoip_lookup(mullvad_host: String) -> Result { let uri = Uri::try_from(format!("https://ipv4.am.i.{mullvad_host}/json")) .map_err(|_| Error::InvalidUrl)?; diff --git a/test/test-runner/src/forward.rs b/test/test-runner/src/forward.rs new file mode 100644 index 000000000000..ec9e8a98f10b --- /dev/null +++ b/test/test-runner/src/forward.rs @@ -0,0 +1,127 @@ +use once_cell::sync::Lazy; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use test_rpc::net::SockHandleId; +use tokio::net::TcpListener; +use tokio::net::TcpStream; + +static SERVERS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); + +/// Spawn a TCP forwarder that sends TCP via `via_addr` +pub async fn start_server( + bind_addr: SocketAddr, + via_addr: SocketAddr, +) -> Result<(SockHandleId, SocketAddr), test_rpc::Error> { + let next_nonce = { + static NONCE: AtomicUsize = AtomicUsize::new(0); + || NONCE.fetch_add(1, Ordering::Relaxed) + }; + let id = SockHandleId(next_nonce()); + + let handle = tcp_forward(bind_addr, via_addr).await.map_err(|error| { + log::error!("Failed to start TCP forwarder listener: {error}"); + test_rpc::Error::TcpForward + })?; + + let bind_addr = handle.local_addr(); + + let mut servers = SERVERS.lock().unwrap(); + servers.insert(id, handle); + + Ok((id, bind_addr)) +} + +/// Stop TCP forwarder given some ID returned by `start_server` +pub fn stop_server(id: SockHandleId) -> Result<(), test_rpc::Error> { + let handle = { + let mut servers = SERVERS.lock().unwrap(); + servers.remove(&id) + }; + + if let Some(handle) = handle { + handle.close(); + } + Ok(()) +} + +struct Handle { + handle: tokio::task::JoinHandle<()>, + bind_addr: SocketAddr, + clients: Arc>>>, +} + +impl Handle { + pub fn close(&self) { + self.handle.abort(); + + let mut clients = self.clients.lock().unwrap(); + for client in clients.drain(..) { + client.abort(); + } + } + + pub fn local_addr(&self) -> SocketAddr { + self.bind_addr + } +} + +impl Drop for Handle { + fn drop(&mut self) { + self.close(); + } +} + +/// Forward TCP traffic via `proxy_addr` +async fn tcp_forward( + bind_addr: SocketAddr, + proxy_addr: SocketAddr, +) -> Result { + let listener = TcpListener::bind(&bind_addr).await.map_err(|error| { + log::error!("Failed to bind TCP forward socket: {error}"); + test_rpc::Error::TcpForward + })?; + let bind_addr = listener.local_addr().map_err(|error| { + log::error!("Failed to get TCP socket addr: {error}"); + test_rpc::Error::TcpForward + })?; + + let clients = Arc::new(Mutex::new(vec![])); + + let clients_copy = clients.clone(); + + let handle = tokio::spawn(async move { + loop { + match listener.accept().await { + Ok((mut client, _addr)) => { + let client_handle = tokio::spawn(async move { + let mut proxy = match TcpStream::connect(proxy_addr).await { + Ok(proxy) => proxy, + Err(error) => { + log::error!("failed to connect to TCP proxy: {error}"); + return; + } + }; + + if let Err(error) = + tokio::io::copy_bidirectional(&mut client, &mut proxy).await + { + log::error!("copy_directional failed: {error}"); + } + }); + clients_copy.lock().unwrap().push(client_handle); + } + Err(error) => { + log::error!("failed to accept TCP client: {error}"); + } + } + } + }); + Ok(Handle { + handle, + bind_addr, + clients, + }) +} diff --git a/test/test-runner/src/main.rs b/test/test-runner/src/main.rs index 1c2c301b27c2..74f7761cc20d 100644 --- a/test/test-runner/src/main.rs +++ b/test/test-runner/src/main.rs @@ -10,6 +10,7 @@ use tarpc::context; use tarpc::server::Channel; use test_rpc::{ mullvad_daemon::{ServiceStatus, SOCKET_PATH}, + net::SockHandleId, package::Package, transport::GrpcForwarder, AppTrace, Service, @@ -22,6 +23,7 @@ use tokio::{ use tokio_util::codec::{Decoder, LengthDelimitedCodec}; mod app; +mod forward; mod logging; mod net; mod package; @@ -167,6 +169,23 @@ impl Service for TestServer { .collect()) } + async fn start_tcp_forward( + self, + _: context::Context, + bind_addr: SocketAddr, + via_addr: SocketAddr, + ) -> Result<(SockHandleId, SocketAddr), test_rpc::Error> { + forward::start_server(bind_addr, via_addr).await + } + + async fn stop_tcp_forward( + self, + _: context::Context, + id: SockHandleId, + ) -> Result<(), test_rpc::Error> { + forward::stop_server(id) + } + async fn get_interface_ip( self, _: context::Context,