diff --git a/Cargo.lock b/Cargo.lock index c398c67f7a5d..c929ce575478 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3491,9 +3491,7 @@ dependencies = [ "futures", "log", "once_cell", - "os_pipe", "parity-tokio-ipc", - "parking_lot", "prost", "shadowsocks-service", "shell-escape", diff --git a/talpid-core/src/tunnel/mod.rs b/talpid-core/src/tunnel/mod.rs index d254ec93d58c..4ab6bb8d3d97 100644 --- a/talpid-core/src/tunnel/mod.rs +++ b/talpid-core/src/tunnel/mod.rs @@ -224,11 +224,18 @@ impl TunnelMonitor { config, log, resource_dir, - tunnel_close_rx, #[cfg(target_os = "linux")] route_manager, ) .await?; + + let close_handle = monitor.close_handle(); + tokio::spawn(async move { + if tunnel_close_rx.await.is_ok() { + close_handle.close(); + } + }); + Ok(TunnelMonitor { monitor: InternalTunnelMonitor::OpenVpn(monitor), }) @@ -298,9 +305,12 @@ enum InternalTunnelMonitor { impl InternalTunnelMonitor { fn wait(self) -> Result<()> { + #[cfg(not(target_os = "android"))] + let handle = tokio::runtime::Handle::current(); + match self { #[cfg(not(target_os = "android"))] - InternalTunnelMonitor::OpenVpn(tun) => tun.wait()?, + InternalTunnelMonitor::OpenVpn(tun) => handle.block_on(tun.wait())?, InternalTunnelMonitor::Wireguard(tun) => tun.wait()?, } diff --git a/talpid-openvpn/Cargo.toml b/talpid-openvpn/Cargo.toml index 8106c4816d1f..6bfdb0c57741 100644 --- a/talpid-openvpn/Cargo.toml +++ b/talpid-openvpn/Cargo.toml @@ -15,8 +15,6 @@ err-derive = { workspace = true } futures = "0.3.15" once_cell = { workspace = true } log = { workspace = true } -os_pipe = "1.1.4" -parking_lot = "0.12.0" shell-escape = "0.1" talpid-routing = { path = "../talpid-routing" } talpid-tunnel = { path = "../talpid-tunnel" } diff --git a/talpid-openvpn/src/lib.rs b/talpid-openvpn/src/lib.rs index e6080044cb05..48e3b20b20ea 100644 --- a/talpid-openvpn/src/lib.rs +++ b/talpid-openvpn/src/lib.rs @@ -4,30 +4,25 @@ #![deny(rust_2018_idioms)] use crate::proxy::{ProxyMonitor, ProxyResourceData}; -use futures::channel::oneshot; #[cfg(windows)] use once_cell::sync::Lazy; use process::openvpn::{OpenVpnCommand, OpenVpnProcHandle}; #[cfg(target_os = "linux")] use std::collections::{HashMap, HashSet}; -#[cfg(windows)] -use std::ffi::OsString; +#[cfg(target_os = "windows")] +use std::{ffi::OsString, sync::Arc}; use std::{ fs, io::{self, Write}, path::{Path, PathBuf}, process::ExitStatus, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc, Arc, - }, time::Duration, }; #[cfg(target_os = "linux")] use talpid_routing::{self, RequiredRoute}; use talpid_tunnel::TunnelEvent; use talpid_types::{net::openvpn, ErrorExt}; -use tokio::{sync::Mutex, task}; +use tokio::task; #[cfg(windows)] use widestring::U16CString; @@ -109,19 +104,8 @@ pub enum Error { CredentialsWriteError(#[error(source)] io::Error), /// Failures related to the proxy service. - #[error(display = "Unable to start the proxy service")] - StartProxyError(#[error(source)] proxy::Error), - - /// Error while monitoring proxy service - #[error(display = "Error while monitoring proxy service")] - MonitorProxyError(#[error(source)] io::Error), - - /// The proxy exited unexpectedly - #[error( - display = "The proxy exited unexpectedly providing these details: {}", - _0 - )] - ProxyExited(String), + #[error(display = "Proxy service failed")] + ProxyError(#[error(source)] proxy::Error), /// The map is missing 'dev' #[cfg(target_os = "linux")] @@ -159,24 +143,19 @@ const OPENVPN_BIN_FILENAME: &str = "openvpn.exe"; /// Struct for monitoring an OpenVPN process. #[derive(Debug)] pub struct OpenVpnMonitor { - spawn_task: Option< - tokio::task::JoinHandle< - std::result::Result, futures::future::Aborted>, - >, - >, - abort_spawn: futures::future::AbortHandle, - - child: Arc>>, + prepare_task: tokio::task::JoinHandle>, + proxy_monitor: Option>, - closed: Arc, /// Keep the `TempFile` for the user-pass file in the struct, so it's removed on drop. _user_pass_file: mktemp::TempFile, /// Keep the 'TempFile' for the proxy user-pass file in the struct, so it's removed on drop. _proxy_auth_file: Option, - runtime: tokio::runtime::Handle, event_server_abort_tx: triggered::Trigger, - server_join_handle: Option>>, + server_join_handle: task::JoinHandle>, + + monitor_abort_tx: triggered::Trigger, + monitor_abort_rx: triggered::Listener, #[cfg(windows)] _wintun: Arc>, @@ -247,7 +226,6 @@ impl OpenVpnMonitor { params: &openvpn::TunnelParameters, log_path: Option, resource_dir: &Path, - tunnel_close_rx: oneshot::Receiver<()>, #[cfg(target_os = "linux")] route_manager: talpid_routing::RouteManagerHandle, ) -> Result where @@ -306,7 +284,6 @@ impl OpenVpnMonitor { user_pass_file, proxy_auth_file, proxy_monitor, - tunnel_close_rx, #[cfg(target_os = "linux")] fwmark: params.fwmark, }; @@ -371,7 +348,6 @@ struct OpenVpnTunnelInitArgs { user_pass_file: mktemp::TempFile, proxy_auth_file: Option, proxy_monitor: Option>, - tunnel_close_rx: oneshot::Receiver<()>, #[cfg(target_os = "linux")] fwmark: u32, } @@ -393,7 +369,6 @@ impl OpenVpnMonitor { let user_pass_file = init_args.user_pass_file; let proxy_auth_file = init_args.proxy_auth_file; let proxy_monitor = init_args.proxy_monitor; - let tunnel_close_rx = init_args.tunnel_close_rx; let (server_join_handle, ipc_path) = event_server::start(on_event, event_server_abort_rx) .map_err(Error::EventDispatcherError)?; @@ -406,42 +381,30 @@ impl OpenVpnMonitor { cmd.plugin(plugin_path, vec![ipc_path]) .log(log_path.as_deref()); - let (spawn_task, abort_spawn) = futures::future::abortable(Self::prepare_process( + let prepare_task = tokio::spawn(Self::prepare_process( cmd, #[cfg(windows)] wintun.clone(), )); - let spawn_task = tokio::spawn(spawn_task); + + let (monitor_abort_tx, monitor_abort_rx) = triggered::trigger(); let monitor = OpenVpnMonitor { - spawn_task: Some(spawn_task), - abort_spawn, - child: Arc::new(Mutex::new(None)), + prepare_task, proxy_monitor, - closed: Arc::new(AtomicBool::new(false)), _user_pass_file: user_pass_file, _proxy_auth_file: proxy_auth_file, - runtime: tokio::runtime::Handle::current(), event_server_abort_tx, - server_join_handle: Some(server_join_handle), + server_join_handle, + + monitor_abort_tx, + monitor_abort_rx, #[cfg(windows)] _wintun: wintun, }; - let close_handle = monitor.close_handle(); - tokio::spawn(async move { - if tunnel_close_rx.await.is_ok() { - if let Err(error) = close_handle.close().await { - log::error!( - "{}", - error.display_chain_with_msg("Failed to close the tunnel") - ); - } - } - }); - Ok(monitor) } @@ -461,68 +424,41 @@ impl OpenVpnMonitor { /// Creates a handle to this monitor, allowing the tunnel to be closed while some other /// thread is blocked in `wait`. - fn close_handle(&self) -> OpenVpnCloseHandle { + pub fn close_handle(&self) -> OpenVpnCloseHandle { OpenVpnCloseHandle { - child: self.child.clone(), - abort_spawn: self.abort_spawn.clone(), - closed: self.closed.clone(), + monitor_abort_tx: self.monitor_abort_tx.clone(), + prepare_task: self.prepare_task.abort_handle(), } } /// Consumes the monitor and waits for both proxy and tunnel, as applicable. - pub fn wait(mut self) -> Result<()> { + pub async fn wait(mut self) -> Result<()> { if let Some(mut proxy_monitor) = self.proxy_monitor.take() { - let (tx_tunnel, rx) = mpsc::channel(); - let tx_proxy = tx_tunnel.clone(); let tunnel_close_handle = self.close_handle(); let proxy_close_handle = proxy_monitor.close_handle(); - enum Stopped { - Tunnel(Result<()>), - Proxy(proxy::Result<()>), - } - - let handle = self.runtime.clone(); - handle.spawn(async move { - tx_tunnel - .send(Stopped::Tunnel(self.wait_tunnel().await)) - .unwrap(); + let tunnel_task = async move { + let result = self.wait_tunnel().await; let _ = proxy_close_handle.close(); - }); - - handle.spawn(async move { - tx_proxy - .send(Stopped::Proxy(proxy_monitor.wait().await)) - .unwrap(); - tunnel_close_handle.close().await - }); - - let result = rx.recv().expect("wait got no result"); - let _ = rx.recv(); - - match result { - Stopped::Tunnel(tunnel_result) => tunnel_result, - Stopped::Proxy(proxy_result) => proxy_result.map_err(|error| match error { - proxy::Error::UnexpectedExit(details) => Error::ProxyExited(details), - proxy::Error::Io(error) => Error::MonitorProxyError(error), - }), - } + result + }; + + let proxy_task = async move { + let result = proxy_monitor.wait().await; + tunnel_close_handle.close(); + result.map_err(Error::ProxyError) + }; + + join_return_first(tunnel_task, proxy_task).await } else { // No proxy active, wait only for the tunnel. - let handle = self.runtime.clone(); - let (tx_tunnel, rx) = mpsc::channel(); - handle.spawn(async move { - let x = self.wait_tunnel(); - tx_tunnel.send(x.await).unwrap(); - }); - rx.recv().expect("wait_tunnel got no result") + self.wait_tunnel().await } } /// Supplement `inner_wait_tunnel()` with logging and error handling. async fn wait_tunnel(self) -> Result<()> { - let result = self.inner_wait_tunnel().await; - match result { + match self.inner_wait_tunnel().await { WaitResult::Preparation(result) => match result { Err(error) => { log::debug!( @@ -533,8 +469,8 @@ impl OpenVpnMonitor { } _ => Ok(()), }, - WaitResult::Child(Ok(exit_status), closed) => { - if exit_status.success() || closed { + WaitResult::Child(Ok(exit_status)) => { + if exit_status.success() { log::debug!( "OpenVPN exited, as expected, with exit status: {}", exit_status @@ -545,7 +481,7 @@ impl OpenVpnMonitor { Err(Error::ChildProcessDied) } } - WaitResult::Child(Err(e), _) => { + WaitResult::Child(Err(e)) => { log::error!("OpenVPN process wait error: {}", e); Err(Error::ChildProcessError("Error when waiting", e)) } @@ -558,51 +494,37 @@ impl OpenVpnMonitor { /// Waits for both the child process and the event dispatcher in parallel. After both have /// returned this returns the earliest result. - async fn inner_wait_tunnel(mut self) -> WaitResult { - let child = match self - .spawn_task - .take() - .unwrap() - .await - .expect("spawn task panicked") - { + async fn inner_wait_tunnel(self) -> WaitResult { + let mut child = match self.prepare_task.await { Ok(Ok(child)) => child, Ok(Err(error)) => { - self.closed.swap(true, Ordering::SeqCst); return WaitResult::Preparation(Err(error)); } Err(_) => return WaitResult::Preparation(Ok(())), }; - if self.closed.load(Ordering::SeqCst) { - let _ = child.kill().await; - return WaitResult::Preparation(Ok(())); - } - - { - self.child.lock().await.replace(child); - } - - let event_server_abort_tx = self.event_server_abort_tx.clone(); - let kill_child = async move { - let result = self.child.lock().await.as_ref().unwrap().wait().await; - let closed = self.closed.load(Ordering::SeqCst); - let result = WaitResult::Child(result, closed); - event_server_abort_tx.trigger(); - result + let result = tokio::select! { + result = child.wait() => { + log::debug!("OpenVPN process exited"); + result + } + _ = self.monitor_abort_rx => { + log::debug!("Killing OpenVPN process"); + child.kill(); + child.wait().await + } + }; + + self.event_server_abort_tx.trigger(); + WaitResult::Child(result) }; let kill_event_dispatcher = async move { - let server_join_handle = self - .server_join_handle - .take() - .expect("No event server quit handle"); - let _ = server_join_handle.await; + let _ = self.server_join_handle.await; WaitResult::EventDispatcher }; - let (result, _) = tokio::join!(kill_child, kill_event_dispatcher); - result + join_return_first(kill_child, kill_event_dispatcher).await } fn create_proxy_auth_file( @@ -627,7 +549,7 @@ impl OpenVpnMonitor { if let Some(ref settings) = proxy_settings { let proxy_monitor = proxy::start_proxy(settings, proxy_resources) .await - .map_err(Error::StartProxyError)?; + .map_err(Error::ProxyError)?; return Ok(Some(proxy_monitor)); } Ok(None) @@ -717,26 +639,17 @@ impl OpenVpnMonitor { } /// A handle to an `OpenVpnMonitor` for closing it. -#[derive(Debug, Clone)] -pub struct OpenVpnCloseHandle { - child: Arc>>, - abort_spawn: futures::future::AbortHandle, - closed: Arc, +#[derive(Debug)] +pub struct OpenVpnCloseHandle { + monitor_abort_tx: triggered::Trigger, + prepare_task: tokio::task::AbortHandle, } -impl OpenVpnCloseHandle { - /// Kills the underlying OpenVPN process, making the `OpenVpnMonitor::wait` method return. - pub async fn close(self) -> io::Result<()> { - if !self.closed.swap(true, Ordering::SeqCst) { - self.abort_spawn.abort(); - if let Some(child) = self.child.lock().await.as_ref() { - child.kill().await - } else { - Ok(()) - } - } else { - Ok(()) - } +impl OpenVpnCloseHandle { + /// Begin killing the OpenVPN monitor, making the `OpenVpnMonitor::wait` method return. + pub fn close(self) { + self.prepare_task.abort(); + self.monitor_abort_tx.trigger(); } } @@ -744,7 +657,7 @@ impl OpenVpnCloseHandle { #[derive(Debug)] enum WaitResult { Preparation(io::Result<()>), - Child(io::Result, bool), + Child(io::Result), EventDispatcher, } @@ -771,10 +684,10 @@ pub trait OpenVpnBuilder { #[async_trait::async_trait] pub trait ProcessHandle: Send + Sync + 'static { /// Block until the subprocess exits or there is an error in the wait syscall. - async fn wait(&self) -> io::Result; + async fn wait(&mut self) -> io::Result; - /// Kill the subprocess. - async fn kill(&self) -> io::Result<()>; + /// Kill the subprocess without waiting for it to complete. + fn kill(&mut self); } impl OpenVpnBuilder for OpenVpnCommand { @@ -805,12 +718,32 @@ impl OpenVpnBuilder for OpenVpnCommand { #[async_trait::async_trait] impl ProcessHandle for OpenVpnProcHandle { - async fn wait(&self) -> io::Result { - self.wait().await + async fn wait(&mut self) -> io::Result { + OpenVpnProcHandle::wait(self).await } - async fn kill(&self) -> io::Result<()> { - self.nice_kill(OPENVPN_DIE_TIMEOUT).await + fn kill(&mut self) { + OpenVpnProcHandle::kill(self, OPENVPN_DIE_TIMEOUT) + } +} + +/// Join two futures and return the result of the first one to complete. +async fn join_return_first( + future1: impl std::future::Future, + future2: impl std::future::Future, +) -> R { + futures::pin_mut!(future1); + futures::pin_mut!(future2); + + match futures::future::select(future1, future2).await { + futures::future::Either::Left((result, other)) => { + let _ = other.await; + result + } + futures::future::Either::Right((result, other)) => { + let _ = other.await; + result + } } } @@ -1126,10 +1059,9 @@ mod event_server { mod tests { use super::*; use crate::mktemp::TempFile; - use parking_lot::Mutex; use std::{ path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Mutex}, }; #[cfg(windows)] @@ -1191,12 +1123,12 @@ mod tests { type ProcessHandle = TestProcessHandle; fn plugin(&mut self, path: impl AsRef, _args: Vec) -> &mut Self { - *self.plugin.lock() = Some(path.as_ref().to_path_buf()); + *self.plugin.lock().unwrap() = Some(path.as_ref().to_path_buf()); self } fn log(&mut self, log: Option>) -> &mut Self { - *self.log.lock() = log.as_ref().map(|path| path.as_ref().to_path_buf()); + *self.log.lock().unwrap() = log.as_ref().map(|path| path.as_ref().to_path_buf()); self } @@ -1217,27 +1149,24 @@ mod tests { #[async_trait::async_trait] impl ProcessHandle for TestProcessHandle { #[cfg(unix)] - async fn wait(&self) -> io::Result { + async fn wait(&mut self) -> io::Result { use std::os::unix::process::ExitStatusExt; Ok(ExitStatus::from_raw(self.0)) } #[cfg(windows)] - async fn wait(&self) -> io::Result { + async fn wait(&mut self) -> io::Result { use std::os::windows::process::ExitStatusExt; Ok(ExitStatus::from_raw(self.0 as u32)) } - async fn kill(&self) -> io::Result<()> { - Ok(()) - } + fn kill(&mut self) {} } fn create_init_args_plugin_log( plugin_path: PathBuf, log_path: Option, ) -> OpenVpnTunnelInitArgs { - let (_close_tx, close_rx) = oneshot::channel(); let (event_server_abort_tx, event_server_abort_rx) = triggered::trigger(); OpenVpnTunnelInitArgs { event_server_abort_tx, @@ -1247,7 +1176,6 @@ mod tests { user_pass_file: TempFile::new(), proxy_auth_file: None, proxy_monitor: None, - tunnel_close_rx: close_rx, #[cfg(target_os = "linux")] fwmark: 0, } @@ -1270,7 +1198,7 @@ mod tests { ); assert_eq!( Some(PathBuf::from("./my_test_plugin")), - *builder.plugin.lock() + *builder.plugin.lock().unwrap() ); } @@ -1288,7 +1216,7 @@ mod tests { ); assert_eq!( Some(PathBuf::from("./my_test_log_file")), - *builder.log.lock() + *builder.log.lock().unwrap() ); } @@ -1307,7 +1235,7 @@ mod tests { Box::new(TestWintunContext {}), ) .unwrap(); - assert!(testee.wait().is_ok()); + assert!(testee.wait().await.is_ok()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1325,7 +1253,7 @@ mod tests { Box::new(TestWintunContext {}), ) .unwrap(); - assert!(testee.wait().is_err()); + assert!(testee.wait().await.is_err()); } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -1344,8 +1272,8 @@ mod tests { ) .unwrap(); - testee.close_handle().close().await.unwrap(); - let result = testee.wait(); + testee.close_handle().close(); + let result = testee.wait().await; println!("[testee.wait(): {:?}]", result); assert!(result.is_ok()); } @@ -1362,7 +1290,7 @@ mod tests { Box::new(TestWintunContext {}), ) .unwrap(); - match result.wait() { + match result.wait().await { Err(Error::StartProcessError) => (), _ => panic!("Wrong error"), } diff --git a/talpid-openvpn/src/process/openvpn.rs b/talpid-openvpn/src/process/openvpn.rs index 44e00d0eda0c..8d969d196cf1 100644 --- a/talpid-openvpn/src/process/openvpn.rs +++ b/talpid-openvpn/src/process/openvpn.rs @@ -1,12 +1,13 @@ -use os_pipe::{pipe, PipeWriter}; -use parking_lot::Mutex; +use futures::channel::oneshot; use shell_escape; use std::{ ffi::{OsStr, OsString}, fmt, io, path::{Path, PathBuf}, + process::Stdio, + time::Duration, }; -use talpid_types::{net, ErrorExt}; +use talpid_types::net; static BASE_ARGUMENTS: &[&[&str]] = &[ &["--client"], @@ -364,16 +365,8 @@ impl fmt::Display for OpenVpnCommand { /// Handle to a running OpenVPN process. pub struct OpenVpnProcHandle { - /// Handle to the child process running OpenVPN. - /// - /// This handle is acquired by calling [`OpenVpnCommand::build`] (or - /// [`tokio::process::Command::spawn`]). - pub inner: std::sync::Arc>, - /// Pipe handle to stdin of the OpenVPN process. Our custom fork of OpenVPN - /// has been changed so that it exits cleanly when stdin is closed. This is a hack - /// solution to cleanly shut OpenVPN down without using the - /// management interface (which would be the correct thing to do). - pub stdin: Mutex>, + stop_tx: Option>, + proc: tokio::task::JoinHandle>, } impl OpenVpnProcHandle { @@ -390,81 +383,75 @@ impl OpenVpnProcHandle { cmd = cmd.stderr(std::process::Stdio::null()) } - let (reader, writer) = pipe()?; - let proc_handle = cmd.stdin(reader).spawn()?; + let mut proc_handle = cmd.stdin(Stdio::piped()).spawn()?; - Ok(Self { - inner: std::sync::Arc::new(tokio::sync::Mutex::new(proc_handle)), - stdin: Mutex::new(Some(writer)), - }) - } + let (stop_tx, mut stop_rx) = oneshot::channel(); - /// Attempts to stop the OpenVPN process gracefully in the given time - /// period, otherwise kills the process. - pub async fn nice_kill(&self, timeout: std::time::Duration) -> io::Result<()> { - log::debug!("Trying to stop child process gracefully"); - self.stop().await; - - // Wait for the process to die for a maximum of `timeout`. - let wait_result = tokio::time::timeout(timeout, self.wait()).await; - match wait_result { - Ok(_) => log::debug!("Child process terminated gracefully"), - Err(_) => { - log::warn!( - "Child process did not terminate gracefully within timeout, forcing termination" - ); - self.kill().await?; - } - } - Ok(()) - } + let proc = tokio::spawn(async move { + let stdin = proc_handle.stdin.take().expect("expected stdin handle"); - /// Waits for the child to exit completely, returning the status that it - /// exited with. See [tokio::process::Child::wait] for in-depth - /// documentation. - async fn wait(&self) -> io::Result { - self.inner.lock().await.wait().await - } + tokio::select! { + timeout = &mut stop_rx => { + // Dropping our stdin handle so that it is closed once. Closing the handle should + // gracefully stop our OpenVPN child process. This only works because our OpenVPN + // fork expects this. + drop(stdin); - /// Kill the OpenVPN process and drop its stdin handle. - async fn stop(&self) { - // Dropping our stdin handle so that it is closed once. Closing the handle should - // gracefully stop our OpenVPN child process. - if self.stdin.lock().take().is_none() { - log::warn!("Tried to close OpenVPN stdin handle twice, this is a bug"); - } - self.clean_up().await - } + if let Ok(timeout) = timeout { + // + // Controlled shutdown using nice_kill() + // - async fn kill(&self) -> io::Result<()> { - log::warn!("Killing OpenVPN process"); - self.inner.lock().await.kill().await?; - log::debug!("OpenVPN forcefully killed"); - Ok(()) - } + log::debug!("Trying to stop child process gracefully"); - async fn has_stopped(&self) -> io::Result { - let exit_status = self.inner.lock().await.try_wait()?; - Ok(exit_status.is_some()) - } + match tokio::time::timeout(timeout, proc_handle.wait()).await { + Ok(_) => log::debug!("Child process terminated gracefully"), + Err(_) => { + log::warn!( + "Child process did not terminate gracefully within timeout, forcing termination" + ); + proc_handle.kill().await?; + } + } + } else { + // + // If the abort channel is just dropped, kill the process immediately. + // + log::debug!("Killing OpenVPN process forcefully"); + let _ = proc_handle.kill().await; + } + + proc_handle.wait().await + } - /// Try to kill the OpenVPN process. - async fn clean_up(&self) { - let result = match self.has_stopped().await { - Ok(false) => self.kill().await, - Err(e) => { - log::error!( - "{}", - e.display_chain_with_msg("Failed to check if OpenVPN is running") - ); - self.kill().await + // + // If the process exits on its own, we're also done. + // + result = proc_handle.wait() => { + log::debug!("OpenVPN process terminated"); + result + } } - _ => Ok(()), - }; - if let Err(error) = result { - log::error!("{}", error.display_chain_with_msg("Failed to kill OpenVPN")); + }); + + Ok(Self { + stop_tx: Some(stop_tx), + proc, + }) + } + + /// Begins to kill the process, causing `wait()` to return. This function does not wait for the operation + /// to complete. + pub fn kill(&mut self, timeout: std::time::Duration) { + if let Some(tx) = self.stop_tx.take() { + let _ = tx.send(timeout); } } + + /// Waits for the child to exit completely. + pub async fn wait(&mut self) -> io::Result { + (&mut self.proc).await.expect("openvpn task panicked") + } } #[cfg(test)]