diff --git a/mullvad-management-interface/src/client.rs b/mullvad-management-interface/src/client.rs index 0020ca696989..fdf33ab66548 100644 --- a/mullvad-management-interface/src/client.rs +++ b/mullvad-management-interface/src/client.rs @@ -631,7 +631,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn add_split_tunnel_app>(&mut self, path: P) -> Result<()> { let path = path.as_ref().to_str().ok_or(Error::PathMustBeUtf8)?; self.0 @@ -641,7 +640,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn remove_split_tunnel_app>(&mut self, path: P) -> Result<()> { let path = path.as_ref().to_str().ok_or(Error::PathMustBeUtf8)?; self.0 @@ -651,7 +649,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn clear_split_tunnel_apps(&mut self) -> Result<()> { self.0 .clear_split_tunnel_apps(()) @@ -660,7 +657,6 @@ impl MullvadProxyClient { Ok(()) } - //#[cfg(target_os = "windows")] pub async fn set_split_tunnel_state(&mut self, state: bool) -> Result<()> { self.0 .set_split_tunnel_state(state) diff --git a/test/Cargo.lock b/test/Cargo.lock index 09ec93e782c4..be5449a75fdd 100644 --- a/test/Cargo.lock +++ b/test/Cargo.lock @@ -65,10 +65,13 @@ dependencies = [ name = "am-i-mullvad" version = "0.0.0" dependencies = [ + "clap", "color-eyre", "eyre", + "ping", "reqwest", "serde", + "socket2 0.5.3", ] [[package]] @@ -1393,7 +1396,7 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b58db92f96b720de98181bbbe63c831e87005ab460c1bf306eb2622b4707997f" dependencies = [ - "socket2 0.5.4", + "socket2 0.5.3", "widestring", "windows-sys 0.48.0", "winreg", @@ -2157,6 +2160,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ping" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "122ee1f5a6843bec84fcbd5c6ba3622115337a6b8965b93a61aad347648f4e8d" +dependencies = [ + "rand 0.8.5", + "socket2 0.4.9", + "thiserror", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -2914,7 +2928,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadowsocks-crypto", - "socket2 0.5.4", + "socket2 0.5.3", "spin 0.9.8", "thiserror", "tokio", @@ -2997,9 +3011,9 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "2538b18701741680e0322a2302176d3253a35388e2e62f172f64f4f16605f877" dependencies = [ "libc", "windows-sys 0.48.0", @@ -3081,7 +3095,7 @@ dependencies = [ "parking_lot 0.12.1", "pnet_packet 0.33.0", "rand 0.8.5", - "socket2 0.5.4", + "socket2 0.5.3", "thiserror", "tokio", "tracing", @@ -3172,6 +3186,7 @@ dependencies = [ "base64 0.13.1", "ipnetwork 0.16.0", "jnix", + "log", "serde", "thiserror", "x25519-dalek", @@ -3183,7 +3198,7 @@ name = "talpid-windows" version = "0.0.0" dependencies = [ "futures", - "socket2 0.5.4", + "socket2 0.5.3", "talpid-types", "thiserror", "windows-sys 0.48.0", @@ -3336,7 +3351,7 @@ dependencies = [ "rs-release", "serde", "serde_json", - "socket2 0.5.4", + "socket2 0.5.3", "surge-ping", "talpid-platform-metadata", "talpid-windows", @@ -3449,7 +3464,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.3", "tokio-macros", "windows-sys 0.48.0", ] @@ -3548,7 +3563,7 @@ dependencies = [ "log", "once_cell", "pin-project", - "socket2 0.5.4", + "socket2 0.5.3", "tokio", "windows-sys 0.48.0", ] diff --git a/test/am-i-mullvad/Cargo.toml b/test/am-i-mullvad/Cargo.toml index c3bda1b1cbc7..760c2a219886 100644 --- a/test/am-i-mullvad/Cargo.toml +++ b/test/am-i-mullvad/Cargo.toml @@ -11,7 +11,10 @@ rust-version.workspace = true workspace = true [dependencies] +clap = { workspace = true, features = ["derive"] } color-eyre = "0.6.2" eyre = "0.6.12" +ping = "0.5.2" reqwest = { version = "0.11.24", default-features = false, features = ["blocking", "rustls-tls", "json"] } serde = { version = "1.0.197", features = ["derive"] } +socket2 = { version = "0.5.3", features = ["all"] } diff --git a/test/am-i-mullvad/src/cli.rs b/test/am-i-mullvad/src/cli.rs new file mode 100644 index 000000000000..a92288573791 --- /dev/null +++ b/test/am-i-mullvad/src/cli.rs @@ -0,0 +1,32 @@ +use std::net::SocketAddr; + +use clap::Parser; + +/// CLI tool that queries to check if the machine is connected to +/// Mullvad VPN. +#[derive(Parser)] +pub struct Opt { + /// Interactive mode, press enter to check if you are Mullvad. + #[clap(short, long)] + pub interactive: bool, + + /// Timeout for network connections (in millis). + #[clap(short, long, default_value = "2000")] + pub timeout: u64, + + /// Try to send some junk data over TCP to . + #[clap(long, requires = "send_destination")] + pub send_tcp: bool, + + /// Try to send some junk data over UDP to . + #[clap(long, requires = "send_destination")] + pub send_udp: bool, + + /// Try to send ICMP request to . + #[clap(long, requires = "send_destination")] + pub send_icmp: bool, + + /// Target of , or . + #[clap(short = 'd', long)] + pub send_destination: Option, +} diff --git a/test/am-i-mullvad/src/lib.rs b/test/am-i-mullvad/src/lib.rs new file mode 100644 index 000000000000..cb36c236b0be --- /dev/null +++ b/test/am-i-mullvad/src/lib.rs @@ -0,0 +1,2 @@ +pub mod cli; +pub mod net; diff --git a/test/am-i-mullvad/src/main.rs b/test/am-i-mullvad/src/main.rs index c6cc272d308a..ef83bd2b8cee 100644 --- a/test/am-i-mullvad/src/main.rs +++ b/test/am-i-mullvad/src/main.rs @@ -1,19 +1,59 @@ +use clap::Parser; use eyre::{eyre, Context}; -use reqwest::blocking::get; +use reqwest::blocking::Client; use serde::Deserialize; -use std::process; +use std::{io::stdin, time::Duration}; -#[derive(Debug, Deserialize)] -struct Response { - ip: String, - mullvad_exit_ip_hostname: Option, -} +use am_i_mullvad::cli::Opt; +use am_i_mullvad::net::{send_ping, send_tcp, send_udp}; fn main() -> eyre::Result<()> { + let opt = Opt::parse(); color_eyre::install()?; + if opt.interactive { + let stdin = stdin(); + for line in stdin.lines() { + let _ = line.wrap_err("Failed to read from stdin")?; + test_connection(&opt)?; + } + } else { + test_connection(&opt)?; + } + + Ok(()) +} + +fn test_connection(opt: &Opt) -> eyre::Result { + if let Some(destination) = opt.send_destination { + if opt.send_tcp { + let _ = send_tcp(opt, destination); + } + if opt.send_udp { + let _ = send_udp(opt, destination); + } + if opt.send_icmp { + let _ = send_ping(opt, destination.ip()); + } + } + am_i_mullvad(opt) +} + +/// Check if connected to Mullvad and print the result to stdout +fn am_i_mullvad(opt: &Opt) -> eyre::Result { + #[derive(Debug, Deserialize)] + struct Response { + ip: String, + mullvad_exit_ip_hostname: Option, + } + let url = "https://am.i.mullvad.net/json"; - let response: Response = get(url) + + let client = Client::new(); + let response: Response = client + .get(url) + .timeout(Duration::from_millis(opt.timeout)) + .send() .and_then(|r| r.json()) .wrap_err_with(|| eyre!("Failed to GET {url}"))?; @@ -22,12 +62,12 @@ fn main() -> eyre::Result<()> { "You are connected to Mullvad (server {}). Your IP address is {}", server, response.ip ); - Ok(()) + Ok(true) } else { println!( "You are not connected to Mullvad. Your IP address is {}", response.ip ); - process::exit(1) + Ok(false) } } diff --git a/test/am-i-mullvad/src/net.rs b/test/am-i-mullvad/src/net.rs new file mode 100644 index 000000000000..4e0e09e3514c --- /dev/null +++ b/test/am-i-mullvad/src/net.rs @@ -0,0 +1,76 @@ +use eyre::{eyre, Context}; +use std::{ + io::Write, + net::{IpAddr, Ipv4Addr, SocketAddr}, + time::Duration, +}; + +use crate::cli::Opt; + +pub fn send_tcp(opt: &Opt, destination: SocketAddr) -> eyre::Result<()> { + let bind_addr: SocketAddr = SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0); + + let family = match &destination { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }; + let sock = socket2::Socket::new(family, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) + .wrap_err(eyre!("Failed to create TCP socket"))?; + + eprintln!("Connecting from {bind_addr} to {destination}/TCP"); + + sock.bind(&socket2::SockAddr::from(bind_addr)) + .wrap_err(eyre!("Failed to bind TCP socket to {bind_addr}"))?; + + let timeout = Duration::from_millis(opt.timeout); + sock.set_write_timeout(Some(timeout))?; + sock.set_read_timeout(Some(timeout))?; + + sock.connect_timeout(&socket2::SockAddr::from(destination), timeout) + .wrap_err(eyre!("Failed to connect to {destination}"))?; + + let mut stream = std::net::TcpStream::from(sock); + stream + .write_all(b"hello there") + .wrap_err(eyre!("Failed to send message to {destination}"))?; + + Ok(()) +} + +pub fn send_udp(_opt: &Opt, destination: SocketAddr) -> Result<(), eyre::Error> { + let bind_addr: SocketAddr = SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0); + + eprintln!("Connecting from {bind_addr} to {destination}/UDP"); + + let family = match &destination { + SocketAddr::V4(_) => socket2::Domain::IPV4, + SocketAddr::V6(_) => socket2::Domain::IPV6, + }; + let sock = socket2::Socket::new(family, socket2::Type::DGRAM, Some(socket2::Protocol::UDP)) + .wrap_err("Failed to create UDP socket")?; + + sock.bind(&socket2::SockAddr::from(bind_addr)) + .wrap_err(eyre!("Failed to bind UDP socket to {bind_addr}"))?; + + //log::debug!("Send message from {bind_addr} to {destination}/UDP"); + + let std_socket = std::net::UdpSocket::from(sock); + std_socket + .send_to(b"Hello there!", destination) + .wrap_err(eyre!("Failed to send message to {destination}"))?; + + Ok(()) +} + +pub fn send_ping(opt: &Opt, destination: IpAddr) -> eyre::Result<()> { + ping::ping( + destination, + Some(Duration::from_millis(opt.timeout)), + None, + None, + None, + None, + )?; + + Ok(()) +} diff --git a/test/test-manager/src/tests/split_tunnel.rs b/test/test-manager/src/tests/split_tunnel.rs index 9902dec231dd..dc47d139663a 100644 --- a/test/test-manager/src/tests/split_tunnel.rs +++ b/test/test-manager/src/tests/split_tunnel.rs @@ -1,145 +1,343 @@ +use anyhow::{anyhow, bail, ensure, Context}; use mullvad_management_interface::MullvadProxyClient; -use std::str; +use pcap::Direction; +use pnet_packet::ip::IpNextHeaderProtocols; +use std::{net::Ipv4Addr, str, time::Duration}; use test_macro::test_function; -use test_rpc::{meta::Os, ExecResult, ServiceClient}; +use test_rpc::{meta::Os, ServiceClient, SpawnOpts}; +use tokio::time::{sleep, timeout}; + +use crate::network_monitor::{start_packet_monitor, MonitorOptions}; use super::{config::TEST_CONFIG, helpers, TestContext}; +const AM_I_MULLVAD_PATH_WINDOWS: &str = "E:\\am-i-mullvad.exe"; +const AM_I_MULLVAD_PATH_LINUX: &str = "/tmp/am-i-mullvad"; +/// Test that split tunneling works by asserting the following: +/// - Splitting a process shouldn't do anything if tunnel is not connected. +/// - A split process should never push traffic through the tunnel. +/// - Splitting/unsplitting should work regardless if process is running. #[test_function] pub async fn test_split_tunnel( - ctx: TestContext, + _ctx: TestContext, rpc: ServiceClient, - mullvad_client: MullvadProxyClient, + mut mullvad_client: MullvadProxyClient, ) -> anyhow::Result<()> { - match TEST_CONFIG.os { - Os::Linux => test_split_tunnel_linux(ctx, rpc, mullvad_client).await, - Os::Windows => test_split_tunnel_windows(ctx, rpc, mullvad_client).await, - Os::Macos => todo!("MacOS"), - } + helpers::disconnect_and_wait(&mut mullvad_client).await?; + + let mut checker = ConnChecker::new(rpc.clone(), mullvad_client.clone()); + + // test that program is behaving when we are disconnected + (checker.spawn().await?.assert_insecure().await) + .with_context(|| "Test disconnected and unsplit")?; + checker.split().await?; + (checker.spawn().await?.assert_insecure().await) + .with_context(|| "Test disconnected and split")?; + checker.unsplit().await?; + + // TODO: Overkill? + // test that program is behaving being split/unsplit while running and we are disconnected + let mut handle = checker.spawn().await?; + handle.split().await?; + (handle.assert_insecure().await) + .with_context(|| "Test disconnected and being split while running")?; + handle.unsplit().await?; + (handle.assert_insecure().await) + .with_context(|| "Test disconnected and being unsplit while running")?; + drop(handle); + + log::info!("connecting"); + helpers::connect_and_wait(&mut mullvad_client).await?; + + // test running an unsplit program + checker + .spawn() + .await? + .assert_secure() + .await + .with_context(|| "Test connected and unsplit")?; + + // test running a split program + checker.split().await?; + checker + .spawn() + .await? + .assert_insecure() + .await + .with_context(|| "Test connected and split")?; + + checker.unsplit().await?; + + // test splitting and unsplitting a program while it's running + let mut handle = checker.spawn().await?; + (handle.assert_secure().await).with_context(|| "Test connected and unsplit (again)")?; + handle.split().await?; + (handle.assert_insecure().await) + .with_context(|| "Test connected and being split while running")?; + handle.unsplit().await?; + (handle.assert_secure().await) + .with_context(|| "Test connected and being unsplit while running")?; + drop(handle); + + Ok(()) } -pub async fn test_split_tunnel_windows( - _: TestContext, +struct ConnChecker { rpc: ServiceClient, - mut mullvad_client: MullvadProxyClient, -) -> anyhow::Result<()> { - const AM_I_MULLVAD_EXE: &str = "E:\\am-i-mullvad.exe"; + mullvad_client: MullvadProxyClient, + executable_path: &'static str, + split: bool, +} - async fn am_i_mullvad(rpc: &ServiceClient) -> anyhow::Result { - parse_am_i_mullvad(rpc.exec(AM_I_MULLVAD_EXE, []).await?) - } +struct ConnCheckerHandle<'a> { + checker: &'a mut ConnChecker, + pid: u32, +} - let mut errored = false; +struct ConnectonStatus { + /// True if reported we are connected. + am_i_mullvad: bool, - helpers::disconnect_and_wait(&mut mullvad_client).await?; + /// True if we sniffed TCP packets going outside the tunnel. + leaked_tcp: bool, - if am_i_mullvad(&rpc).await? { - log::error!("We should be disconnected, but `{AM_I_MULLVAD_EXE}` reported that it was connected to Mullvad."); - log::error!("Host machine is probably connected to Mullvad, this will throw off results"); - errored = true - } + /// True if we sniffed UDP packets going outside the tunnel. + leaked_udp: bool, - helpers::connect_and_wait(&mut mullvad_client).await?; + /// True if we sniffed ICMP packets going outside the tunnel. + leaked_icmp: bool, +} - if !am_i_mullvad(&rpc).await? { - log::error!( - "We should be connected, but `{AM_I_MULLVAD_EXE}` reported no connection to Mullvad." - ); - errored = true +impl ConnChecker { + pub fn new(rpc: ServiceClient, mullvad_client: MullvadProxyClient) -> Self { + Self { + rpc, + mullvad_client, + split: false, + + executable_path: match TEST_CONFIG.os { + Os::Windows => AM_I_MULLVAD_PATH_WINDOWS, + Os::Linux => AM_I_MULLVAD_PATH_LINUX, + Os::Macos => todo!("MacOS"), + }, + } } - mullvad_client - .add_split_tunnel_app(AM_I_MULLVAD_EXE) - .await?; - mullvad_client.set_split_tunnel_state(true).await?; + /// Spawn the connecton checker process and return a handle to it. + /// + /// Dropping the handle will stop the process. + /// **NOTE**: The handle must be dropped from a tokio runtime context. + pub async fn spawn(&mut self) -> anyhow::Result> { + let opts = SpawnOpts { + attach_stdin: true, + attach_stdout: true, + args: [ + "--interactive", + "--timeout", + "5000", + "-d", + "1.1.1.1:1337", + "--send-tcp", + "--send-udp", + "--send-icmp", + ] + .map(String::from) + .to_vec(), + ..SpawnOpts::new(self.executable_path) + }; - if am_i_mullvad(&rpc).await? { - log::error!( - "`{AM_I_MULLVAD_EXE}` should have been split, but it reported a connection to Mullvad" - ); - errored = true - } + let pid = self.rpc.spawn(opts).await?; - helpers::disconnect_and_wait(&mut mullvad_client).await?; + if self.split && TEST_CONFIG.os == Os::Linux { + self.mullvad_client + .add_split_tunnel_process(pid as i32) + .await?; + } - if am_i_mullvad(&rpc).await? { - log::error!( - "`{AM_I_MULLVAD_EXE}` reported a connection to Mullvad while split and disconnected" - ); - errored = true + Ok(ConnCheckerHandle { pid, checker: self }) } - mullvad_client.set_split_tunnel_state(false).await?; - mullvad_client - .remove_split_tunnel_app(AM_I_MULLVAD_EXE) - .await?; + /// Enable split tunneling for the connection checker. + pub async fn split(&mut self) -> anyhow::Result<()> { + log::info!("enable split tunnel"); + self.split = true; + + match TEST_CONFIG.os { + Os::Linux => { /* linux programs can't be split until they are spawned */ } + Os::Windows => { + self.mullvad_client + .add_split_tunnel_app(self.executable_path) + .await?; + self.mullvad_client.set_split_tunnel_state(true).await?; + } + Os::Macos => todo!("MacOS"), + } - if errored { - anyhow::bail!("test_split_tunnel failed, see log output for details."); + Ok(()) } - Ok(()) + /// Disable split tunneling for the connection checker. + pub async fn unsplit(&mut self) -> anyhow::Result<()> { + log::info!("disable split tunnel"); + self.split = false; + + match TEST_CONFIG.os { + Os::Linux => {} + Os::Windows => { + self.mullvad_client.set_split_tunnel_state(false).await?; + self.mullvad_client + .remove_split_tunnel_app(self.executable_path) + .await?; + } + Os::Macos => todo!("MacOS"), + } + + Ok(()) + } } -pub async fn test_split_tunnel_linux( - _: TestContext, - rpc: ServiceClient, - mut mullvad_client: MullvadProxyClient, -) -> anyhow::Result<()> { - const AM_I_MULLVAD_URL: &str = "https://am.i.mullvad.net/connected"; - - async fn am_i_mullvad(rpc: &ServiceClient, split_tunnel: bool) -> anyhow::Result { - let result = if split_tunnel { - rpc.exec("mullvad-exclude", ["curl", AM_I_MULLVAD_URL]) - .await? - } else { - rpc.exec("curl", [AM_I_MULLVAD_URL]).await? - }; +impl ConnCheckerHandle<'_> { + pub async fn split(&mut self) -> anyhow::Result<()> { + if TEST_CONFIG.os == Os::Linux { + self.checker + .mullvad_client + .add_split_tunnel_process(self.pid as i32) + .await?; + } - parse_am_i_mullvad(result) + self.checker.split().await } - let mut errored = false; + pub async fn unsplit(&mut self) -> anyhow::Result<()> { + if TEST_CONFIG.os == Os::Linux { + self.checker + .mullvad_client + .remove_split_tunnel_process(self.pid as i32) + .await?; + } - helpers::connect_and_wait(&mut mullvad_client).await?; + self.checker.unsplit().await + } - if !am_i_mullvad(&rpc, false).await? { - log::error!("We should be connected, but `am.i.mullvad` reported that it was not connected to Mullvad."); - errored = true; + /// Assert that traffic is flowing through the Mullvad tunnel and that no packets are leaked. + pub async fn assert_secure(&mut self) -> anyhow::Result<()> { + log::info!("checking that connection is secure"); + let status = self.check_connection().await?; + ensure!(status.am_i_mullvad); + ensure!(!status.leaked_tcp); + ensure!(!status.leaked_udp); + ensure!(!status.leaked_icmp); + + Ok(()) } - if am_i_mullvad(&rpc, true).await? { - log::error!( - "`mullvad-exclude curl {AM_I_MULLVAD_URL}` reported that it was connected to Mullvad." - ); - log::error!("`curl` does not appear to have been split correctly."); - errored = true; + /// Assert that traffic is NOT flowing through the Mullvad tunnel and that packets ARE leaked. + pub async fn assert_insecure(&mut self) -> anyhow::Result<()> { + log::info!("checking that connection is not secure"); + let status = self.check_connection().await?; + ensure!(!status.am_i_mullvad); + ensure!(status.leaked_tcp); + ensure!(status.leaked_udp); + ensure!(status.leaked_icmp); + + Ok(()) } - helpers::disconnect_and_wait(&mut mullvad_client).await?; + async fn check_connection(&mut self) -> anyhow::Result { + //let nontun_iface = self + // .rpc + // .get_default_interface() + // .await + // .with_context(|| "failed to find non-tun interface")?; + //println!("nontun_iface: {nontun_iface:?}"); + + let monitor = start_packet_monitor( + |packet| packet.destination.ip() == Ipv4Addr::new(1, 1, 1, 1), + MonitorOptions { + direction: Some(Direction::In), + ..MonitorOptions::default() + }, + ) + .await; + + self.checker + .rpc + .write_child_stdin(self.pid, "Say the line, Bart!\r\n".into()) + .await?; - if am_i_mullvad(&rpc, false).await? { - log::error!("We should be disconnected, but `curl {AM_I_MULLVAD_URL}` reported that it was connected to Mullvad."); - log::error!("Host machine is probably connected to Mullvad. This may affect test results."); - errored = true; + let line = self.read_stdout_line().await?; + + let monitor_result = monitor + .into_result() + .await + .map_err(|_e| anyhow!("Packet monitor unexpectedly stopped"))?; + + Ok(ConnectonStatus { + am_i_mullvad: parse_am_i_mullvad(line)?, + + leaked_tcp: (monitor_result.packets.iter()) + .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Tcp), + + leaked_udp: (monitor_result.packets.iter()) + .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Udp), + + leaked_icmp: (monitor_result.packets.iter()) + .any(|pkt| pkt.protocol == IpNextHeaderProtocols::Icmp), + }) } - if errored { - anyhow::bail!("test_split_tunnel failed, see log output for details."); + /// Try to a single line of output from the spawned process + async fn read_stdout_line(&mut self) -> anyhow::Result { + timeout(Duration::from_secs(5), async { + let mut line = String::new(); + loop { + let Some(output) = self.checker.rpc.read_child_stdout(self.pid).await? else { + bail!("got EOF from connection checker process"); + }; + + if output.is_empty() { + sleep(Duration::from_millis(500)).await; + continue; + } + + line.push_str(&output); + + if line.ends_with('\n') { + log::info!("output from child process: {output:?}"); + return Ok(line); + } + } + }) + .await + .with_context(|| "Timeout reading stdout from connection checker")? } +} - Ok(()) +impl Drop for ConnCheckerHandle<'_> { + fn drop(&mut self) { + let rpc = self.checker.rpc.clone(); + let pid = self.pid; + + let Ok(runtime_handle) = tokio::runtime::Handle::try_current() else { + log::error!("ConnCheckerHandle dropped outside of a tokio runtime."); + return; + }; + + runtime_handle.spawn(async move { + // Make sure child process is stopped when this handle is dropped. + // Closing stdin does the trick. + let _ = rpc.close_child_stdin(pid).await; + }); + } } /// Parse output from am-i-mullvad. Returns true if connected to Mullvad. -fn parse_am_i_mullvad(result: ExecResult) -> anyhow::Result { - let stdout = str::from_utf8(&result.stdout).expect("curl output is UTF-8"); - - Ok(if stdout.contains("You are connected") { +fn parse_am_i_mullvad(result: String) -> anyhow::Result { + Ok(if result.contains("You are connected") { true - } else if stdout.contains("You are not connected") { + } else if result.contains("You are not connected") { false } else { - anyhow::bail!("Unexpected output from am-i-mullvad: {stdout:?}") + anyhow::bail!("Unexpected output from am-i-mullvad: {result:?}") }) } diff --git a/test/test-manager/src/vm/provision.rs b/test/test-manager/src/vm/provision.rs index 5f01e8f192b9..fbd8024f2466 100644 --- a/test/test-manager/src/vm/provision.rs +++ b/test/test-manager/src/vm/provision.rs @@ -106,6 +106,11 @@ fn blocking_ssh( ssh_send_file_path(&session, &source, temp_dir) .context("Failed to send test runner to remote")?; + // Transfer am-i-mullvad + let source = local_runner_dir.join("am-i-mullvad"); + ssh_send_file_path(&session, &source, temp_dir) + .context("Failed to send am-i-mullvad to remote")?; + // Transfer app packages ssh_send_file_path(&session, &local_app_manifest.current_app_path, temp_dir) .context("Failed to send current app package to remote")?; diff --git a/test/test-rpc/src/client.rs b/test/test-rpc/src/client.rs index b4fb67f5c069..324669de3fc6 100644 --- a/test/test-rpc/src/client.rs +++ b/test/test-rpc/src/client.rs @@ -351,4 +351,26 @@ impl ServiceClient { .make_device_json_old(tarpc::context::current()) .await? } + + pub async fn spawn(&self, opts: SpawnOpts) -> Result { + self.client.spawn(tarpc::context::current(), opts).await? + } + + pub async fn read_child_stdout(&self, pid: u32) -> Result, Error> { + self.client + .read_child_stdout(tarpc::context::current(), pid) + .await? + } + + pub async fn write_child_stdin(&self, pid: u32, data: String) -> Result<(), Error> { + self.client + .write_child_stdin(tarpc::context::current(), pid, data) + .await? + } + + pub async fn close_child_stdin(&self, pid: u32) -> Result<(), Error> { + self.client + .close_child_stdin(tarpc::context::current(), pid) + .await? + } } diff --git a/test/test-rpc/src/lib.rs b/test/test-rpc/src/lib.rs index d1515206015f..9e2d1571ead8 100644 --- a/test/test-rpc/src/lib.rs +++ b/test/test-rpc/src/lib.rs @@ -57,6 +57,8 @@ pub enum Error { Timeout, #[error("TCP forward error")] TcpForward, + #[error("{0}")] + Other(String), } /// Response from am.i.mullvad.net @@ -80,6 +82,27 @@ impl ExecResult { } } +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SpawnOpts { + pub path: String, + pub args: Vec, + pub env: BTreeMap, + pub attach_stdin: bool, + pub attach_stdout: bool, +} + +impl SpawnOpts { + pub fn new(path: impl Into) -> SpawnOpts { + SpawnOpts { + path: path.into(), + args: Default::default(), + env: Default::default(), + attach_stdin: Default::default(), + attach_stdout: Default::default(), + } + } +} + #[derive(Debug, Serialize, Deserialize)] pub enum AppTrace { Path(PathBuf), @@ -197,6 +220,25 @@ mod service { async fn reboot() -> Result<(), Error>; async fn make_device_json_old() -> Result<(), Error>; + + /// Spawn a child process and return the PID. + async fn spawn(opts: SpawnOpts) -> Result; + + /// Read from stdout of a process spawned through [Service::spawn]. + /// + /// Process must have been spawned with `attach_stdout`. + /// Returns `None` if process stdout is closed. + async fn read_child_stdout(pid: u32) -> Result, Error>; + + /// Write to stdin of a process spawned through [Service::spawn]. + /// + /// Process must have been spawned with `attach_stdin`. + async fn write_child_stdin(pid: u32, data: String) -> Result<(), Error>; + + /// Close stdin of a process spawned through [Service::spawn]. + /// + /// Process must have been spawned with `attach_stdin`. + async fn close_child_stdin(pid: u32) -> Result<(), Error>; } } diff --git a/test/test-runner/src/main.rs b/test/test-runner/src/main.rs index 3511d78cec55..e21f1d2fe059 100644 --- a/test/test-runner/src/main.rs +++ b/test/test-runner/src/main.rs @@ -1,9 +1,12 @@ -use futures::{pin_mut, SinkExt, StreamExt}; +use futures::{pin_mut, select_biased, FutureExt, SinkExt, StreamExt}; use logging::LOGGER; use std::{ collections::{BTreeMap, HashMap}, net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, + process::Stdio, + sync::Arc, + time::Duration, }; use tarpc::{context, server::Channel}; @@ -12,12 +15,13 @@ use test_rpc::{ net::SockHandleId, package::Package, transport::GrpcForwarder, - AppTrace, Service, + AppTrace, Service, SpawnOpts, }; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - process::Command, - sync::broadcast::error::TryRecvError, + process::{Child, Command}, + sync::{broadcast::error::TryRecvError, Mutex}, + time::sleep, }; use tokio_util::codec::{Decoder, LengthDelimitedCodec}; @@ -28,8 +32,13 @@ mod net; mod package; mod sys; -#[derive(Clone)] -pub struct TestServer(pub ()); +#[derive(Clone, Default)] +pub struct TestServer(Arc>); + +#[derive(Default)] +struct State { + spawned_procs: HashMap, +} #[tarpc::server] impl Service for TestServer { @@ -319,6 +328,114 @@ impl Service for TestServer { async fn make_device_json_old(self, _: context::Context) -> Result<(), test_rpc::Error> { app::make_device_json_old().await } + + async fn spawn(self, _: context::Context, opts: SpawnOpts) -> Result { + log::info!("Spawn {} (args: {:?})", opts.path, opts.args); + + let mut cmd = Command::new(&opts.path); + cmd.args(opts.args); + + // Make sure that PATH is updated + // TODO: We currently do not need this on non-Windows + #[cfg(target_os = "windows")] + cmd.env("PATH", sys::get_system_path_var()?); + + cmd.envs(opts.env); + + if opts.attach_stdin { + cmd.stdin(Stdio::piped()); + } else { + cmd.stdin(Stdio::null()); + } + + if opts.attach_stdout { + cmd.stdout(Stdio::piped()); + } + + let child = cmd.spawn().map_err(|error| { + log::error!("Failed to exec {}: {error}", opts.path); + test_rpc::Error::Syscall + })?; + + let pid = child + .id() + .expect("Child hasn't been polled to completion yet"); + + let mut state = self.0.lock().await; + state.spawned_procs.insert(pid, child); + + Ok(pid) + } + + async fn read_child_stdout( + self, + _: context::Context, + pid: u32, + ) -> Result, test_rpc::Error> { + let mut state = self.0.lock().await; + let child = state + .spawned_procs + .get_mut(&pid) + .expect("TODO: unknown pid error"); + + let Some(stdout) = child.stdout.as_mut() else { + return Ok(None); + }; + + let mut buf = vec![0u8; 512]; + + let n = select_biased! { + result = stdout.read(&mut buf).fuse() => result.expect("todo: read error"), + _ = sleep(Duration::from_millis(500)).fuse() => return Ok(Some(String::new())), + }; + + // check for EOF + if n == 0 { + child.stdout = None; + return Ok(None); + } + + buf.truncate(n); + let output = String::from_utf8(buf).expect("TODO: utf8 error"); + + Ok(Some(output)) + } + + async fn write_child_stdin( + self, + _: context::Context, + pid: u32, + data: String, + ) -> Result<(), test_rpc::Error> { + let mut state = self.0.lock().await; + let child = state + .spawned_procs + .get_mut(&pid) + .expect("TODO: unknown pid error"); + + let Some(stdin) = child.stdin.as_mut() else { + todo!("error on no stdin?") + }; + + stdin + .write_all(data.as_bytes()) + .await + .expect("todo: write error"); + log::debug!("wrote {} bytes to pid {pid}", data.len()); + + Ok(()) + } + + async fn close_child_stdin(self, _: context::Context, pid: u32) -> Result<(), test_rpc::Error> { + let mut state = self.0.lock().await; + let child = state + .spawned_procs + .get_mut(&pid) + .expect("TODO: unknown pid error"); + + child.stdin = None; + Ok(()) + } } fn get_pipe_status() -> ServiceStatus { @@ -364,7 +481,7 @@ async fn main() -> Result<(), Error> { )); let server = tarpc::server::BaseChannel::with_defaults(runner_transport); - server.execute(TestServer(()).serve()).await; + server.execute(TestServer::default().serve()).await; log::error!("Restarting server since it stopped"); }