diff --git a/Cargo.lock b/Cargo.lock index 36c27eb..85b730b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,9 +98,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.149" +version = "0.2.153" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" +checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "memchr" diff --git a/Cargo.toml b/Cargo.toml index 651b0bf..ce648d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,29 +19,19 @@ rust-version = "1.75.0" # there are a few windows-specific ones autoexamples = false -[dependencies.tokio] -version = "1.33.0" -features = ["io-util", "macros", "process", "rt"] -optional = true - -[target.'cfg(unix)'.dependencies.nix] -version = "0.27.1" -default-features = false -features = ["fs", "poll", "signal"] -optional = true - -[target.'cfg(windows)'.dependencies.winapi] -version = "0.3.9" -features = [ - "impl-default", - "handleapi", - "ioapiset", - "jobapi2", - "processthreadsapi", - "tlhelp32", - "winbase", -] -optional = true +[dependencies] +indexmap = "2.2.5" +tokio = { version = "1.33.0", features = ["io-util", "macros", "process", "rt"], optional = true } +tracing = "0.1.40" + +[target.'cfg(unix)'.dependencies] +nix = { version = "0.27.1", default-features = false, features = ["fs", "poll", "signal"], optional = true } + +[target.'cfg(windows)'.dependencies] +winapi = { version = "0.3.9", features = ["impl-default", "handleapi", "ioapiset", "jobapi2", "processthreadsapi", "tlhelp32", "winbase"], optional = true } + +[dev-dependencies] +tokio = { version = "1.33.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] } [features] default = ["std", "tokio1", "creation-flags", "job-object", "process-group", "process-session"] @@ -67,13 +57,5 @@ process-session = ["dep:nix", "process-group"] ## Wrapper: Pty (TODO) pty = [] - -[dev-dependencies] -tokio = { version = "1.10.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] } - [package.metadata.docs.rs] all-features = true - -[dependencies] -indexmap = "2.2.5" -tracing = "0.1.40" diff --git a/src/tokio/core.rs b/src/tokio/core.rs index b48aeba..eb39f30 100644 --- a/src/tokio/core.rs +++ b/src/tokio/core.rs @@ -98,7 +98,7 @@ pub trait TokioCommandWrapper: std::fmt::Debug { } } -pub trait TokioChildWrapper: std::fmt::Debug { +pub trait TokioChildWrapper: std::fmt::Debug + Send + Sync { fn inner(&self) -> &Child; fn inner_mut(&mut self) -> &mut Child; fn into_inner(self: Box) -> Child; diff --git a/src/tokio/process_group.rs b/src/tokio/process_group.rs index fd15a8a..536fe3b 100644 --- a/src/tokio/process_group.rs +++ b/src/tokio/process_group.rs @@ -3,7 +3,6 @@ use std::{ io::{Error, Result}, ops::ControlFlow, os::unix::process::ExitStatusExt, - pin::pin, process::ExitStatus, }; @@ -21,6 +20,8 @@ use tokio::{ task::spawn_blocking, }; +use super::{TokioChildWrapper, TokioCommandWrapper}; + #[derive(Debug, Clone)] pub struct ProcessGroup { leader: Pid, @@ -40,11 +41,28 @@ impl ProcessGroup { #[derive(Debug)] pub struct ProcessGroupChild { - pub(crate) inner: Box, - pub(crate) pgid: Pid, + inner: Box, + exit_status: ChildExitStatus, + pgid: Pid, +} + +#[derive(Debug)] +enum ChildExitStatus { + Running, + Exited(ExitStatus), +} + +impl ProcessGroupChild { + pub(crate) fn new(inner: Box, pgid: Pid) -> Self { + Self { + inner, + exit_status: ChildExitStatus::Running, + pgid, + } + } } -impl super::core::TokioCommandWrapper for ProcessGroup { +impl TokioCommandWrapper for ProcessGroup { fn pre_spawn(&mut self, command: &mut Command) -> Result<()> { #[cfg(tokio_unstable)] { @@ -66,8 +84,8 @@ impl super::core::TokioCommandWrapper for ProcessGroup { fn wrap_child( &mut self, - inner: Box, - ) -> Result> { + inner: Box, + ) -> Result> { let pgid = Pid::from_raw( i32::try_from( inner @@ -77,7 +95,7 @@ impl super::core::TokioCommandWrapper for ProcessGroup { .expect("Command PID > i32::MAX"), ); - Ok(Box::new(ProcessGroupChild { inner, pgid })) + Ok(Box::new(ProcessGroupChild::new(inner, pgid))) } } @@ -87,28 +105,27 @@ impl ProcessGroupChild { } fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result>> { - // Wait for processes in a loop until every process in this - // process group has exited (this ensures that we reap any - // zombies that may have been created if the parent exited after - // spawning children, but didn't wait for those children to - // exit). + // wait for processes in a loop until every process in this group has + // exited (this ensures that we reap any zombies that may have been + // created if the parent exited after spawning children, but didn't wait + // for those children to exit) let mut parent_exit_status: Option = None; loop { - // we can't use the safe wrapper directly because it doesn't - // return the raw status, and we need it to convert to the - // std's ExitStatus. + // we can't use the safe wrapper directly because it doesn't return + // the raw status, and we need it to convert to the std's ExitStatus let mut status: i32 = 0; - match unsafe { libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits()) } { + match unsafe { + libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits()) + } { 0 => { - // Zero should only happen if WNOHANG was passed in, - // and means that no processes have yet to exit. + // zero should only happen if WNOHANG was passed in, + // and means that no processes have yet to exit return Ok(ControlFlow::Continue(())); } -1 => { match Errno::last() { Errno::ECHILD => { - // No more children to reap; this is a - // graceful exit. + // no more children to reap; this is a graceful exit return Ok(ControlFlow::Break(parent_exit_status)); } errno => { @@ -117,14 +134,13 @@ impl ProcessGroupChild { } } pid => { - // *A* process exited. Was it the parent process - // that we started? If so, collect the exit signal, - // otherwise we reaped a zombie process and should - // continue in the loop. + // a process exited. was it the parent process that we + // started? if so, collect the exit signal, otherwise we + // reaped a zombie process and should continue looping if pgid == Pid::from_raw(pid) { parent_exit_status = Some(ExitStatus::from_raw(status)); } else { - // Reaped a zombie child; keep looping. + // reaped a zombie child; keep looping } } }; @@ -132,7 +148,7 @@ impl ProcessGroupChild { } } -impl super::core::TokioChildWrapper for ProcessGroupChild { +impl TokioChildWrapper for ProcessGroupChild { fn inner(&self) -> &Child { self.inner.inner() } @@ -157,37 +173,51 @@ impl super::core::TokioChildWrapper for ProcessGroupChild { fn wait(&mut self) -> Box> + '_> { Box::new(async { + if let ChildExitStatus::Exited(status) = &self.exit_status { + return Ok(*status); + } + const MAX_RETRY_ATTEMPT: usize = 10; + let pgid = self.pgid; - // Always wait for parent to exit first. - // - // It's likely that all its children has already exited and reaped by - // the time the parent exits. + // always wait for parent to exit first, as by the time it does, + // it's likely that all its children have already been reaped. let status = Box::into_pin(self.inner.wait()).await?; + self.exit_status = ChildExitStatus::Exited(status); - // Try reaping all children, if there are some that are still alive after - // several attempts, then spawn a blocking task to reap them. - for retry_attempt in 1..=MAX_RETRY_ATTEMPT { - if Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)?.is_break() { - break; - } else if retry_attempt == MAX_RETRY_ATTEMPT { - let pgid = self.pgid; - pin!(spawn_blocking(move || Self::wait_imp( - pgid, - WaitPidFlag::empty() - ))) - .await??; + // nevertheless, now try reaping all children a few times... + for _ in 1..MAX_RETRY_ATTEMPT { + if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() { + return Ok(status); } } + // ...finally, if there are some that are still alive, + // block in the background to reap them fully. + spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??; Ok(status) }) } fn try_wait(&mut self) -> Result> { + if let ChildExitStatus::Exited(status) = &self.exit_status { + return Ok(Some(*status)); + } + match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? { - ControlFlow::Break(res) => Ok(res), - ControlFlow::Continue(()) => self.inner.try_wait(), + ControlFlow::Break(res) => { + if let Some(status) = res { + self.exit_status = ChildExitStatus::Exited(status); + } + Ok(res) + } + ControlFlow::Continue(()) => { + let exited = self.inner.try_wait()?; + if let Some(exited) = exited { + self.exit_status = ChildExitStatus::Exited(exited); + } + Ok(exited) + } } } diff --git a/src/tokio/process_session.rs b/src/tokio/process_session.rs index 3ac0c26..be73c93 100644 --- a/src/tokio/process_session.rs +++ b/src/tokio/process_session.rs @@ -28,6 +28,6 @@ impl super::core::TokioCommandWrapper for ProcessSession { .expect("Command PID > i32::MAX"), ); - Ok(Box::new(super::ProcessGroupChild { inner, pgid })) + Ok(Box::new(super::ProcessGroupChild::new(inner, pgid))) } } diff --git a/tests/tokio_unix.rs b/tests/tokio_unix.rs index 1dce420..f375b99 100644 --- a/tests/tokio_unix.rs +++ b/tests/tokio_unix.rs @@ -4,19 +4,16 @@ use std::{ io::Result, // os::unix::process::ExitStatusExt, process::Stdio, - // time::Duration, + time::Duration, }; use process_wrap::tokio::*; use tokio::{ - io::{ - AsyncReadExt, - // AsyncWriteExt - }, - // time::sleep, + io::{AsyncReadExt, AsyncWriteExt}, + time::sleep, }; -// const DIE_TIME: Duration = Duration::from_millis(100); +const DIE_TIME: Duration = Duration::from_millis(100); // each test has a _nowrap variant that uses the process-wrap API but doesn't apply any Wrappers for comparison/debugging. @@ -70,20 +67,19 @@ async fn inner_read_stdout_process_session() -> Result<()> { Ok(()) } -/* #[tokio::test] async fn into_inner_write_stdin_nowrap() -> Result<()> { - let mut child = Command::new("cat") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn()?; + let mut child = TokioCommandWrap::with_new("cat", |command| { + command.stdin(Stdio::piped()).stdout(Stdio::piped()); + }) + .spawn()?; - if let Some(mut din) = child.stdin.take() { + if let Some(mut din) = child.stdin().take() { din.write_all(b"hello").await?; } let mut output = String::new(); - if let Some(mut out) = child.stdout.take() { + if let Some(mut out) = child.stdout().take() { out.read_to_string(&mut output).await?; } @@ -92,19 +88,42 @@ async fn into_inner_write_stdin_nowrap() -> Result<()> { } #[tokio::test] -async fn into_inner_write_stdin_group() -> Result<()> { - let mut child = Command::new("cat") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .group_spawn()? - .into_inner(); +async fn into_inner_write_stdin_process_group() -> Result<()> { + let mut child = TokioCommandWrap::with_new("cat", |command| { + command.stdin(Stdio::piped()).stdout(Stdio::piped()); + }) + .wrap(ProcessGroup::leader()) + .spawn()? + .into_inner(); + + if let Some(mut din) = child.stdin().take() { + din.write_all(b"hello").await?; + } + + let mut output = String::new(); + if let Some(mut out) = child.stdout().take() { + out.read_to_string(&mut output).await?; + } + + assert_eq!(output.as_str(), "hello"); + Ok(()) +} - if let Some(mut din) = child.stdin.take() { +#[tokio::test] +async fn into_inner_write_stdin_process_session() -> Result<()> { + let mut child = TokioCommandWrap::with_new("cat", |command| { + command.stdin(Stdio::piped()).stdout(Stdio::piped()); + }) + .wrap(ProcessSession) + .spawn()? + .into_inner(); + + if let Some(mut din) = child.stdin().take() { din.write_all(b"hello").await?; } let mut output = String::new(); - if let Some(mut out) = child.stdout.take() { + if let Some(mut out) = child.stdout().take() { out.read_to_string(&mut output).await?; } @@ -112,30 +131,111 @@ async fn into_inner_write_stdin_group() -> Result<()> { Ok(()) } +#[tokio::test] +async fn wait_after_die_nowrap() -> Result<()> { + let mut child = TokioCommandWrap::with_new("echo", |command| { + command.stdout(Stdio::null()); + }) + .spawn()?; + sleep(DIE_TIME).await; + + let status = Box::into_pin(child.wait()).await?; + assert!(status.success()); + + Ok(()) +} + +#[tokio::test] +async fn wait_after_die_process_group() -> Result<()> { + let mut child = TokioCommandWrap::with_new("echo", |command| { + command.stdout(Stdio::null()); + }) + .wrap(ProcessGroup::leader()) + .spawn()?; + sleep(DIE_TIME).await; + + let status = Box::into_pin(child.wait()).await?; + assert!(status.success()); + + Ok(()) +} + +#[tokio::test] +async fn wait_after_die_process_session() -> Result<()> { + let mut child = TokioCommandWrap::with_new("echo", |command| { + command.stdout(Stdio::null()); + }) + .wrap(ProcessSession) + .spawn()?; + sleep(DIE_TIME).await; + + let status = Box::into_pin(child.wait()).await?; + assert!(status.success()); + + Ok(()) +} + #[tokio::test] async fn kill_and_try_wait_nowrap() -> Result<()> { - let mut child = Command::new("yes").stdout(Stdio::null()).spawn()?; - assert!(child.try_wait()?.is_none()); - child.kill().await?; + let mut child = TokioCommandWrap::with_new("yes", |command| { + command.stdout(Stdio::null()); + }) + .spawn()?; + assert!(child.try_wait()?.is_none(), "pre kill"); + + Box::into_pin(child.kill()).await?; sleep(DIE_TIME).await; - assert!(child.try_wait()?.is_some()); + assert!(child.try_wait()?.is_some(), "try_wait one"); + sleep(DIE_TIME).await; - assert!(child.try_wait()?.is_some()); + assert!(child.try_wait()?.is_some(), "try_wait two"); + Ok(()) } #[tokio::test] -async fn kill_and_try_wait_group() -> Result<()> { - let mut child = Command::new("yes").stdout(Stdio::null()).group_spawn()?; - assert!(child.try_wait()?.is_none()); - child.kill().await?; +async fn kill_and_try_wait_group_process_group() -> Result<()> { + let mut child = TokioCommandWrap::with_new("yes", |command| { + command.stdout(Stdio::null()); + }) + .wrap(ProcessGroup::leader()) + .spawn()?; + assert!(child.try_wait()?.is_none(), "pre kill"); + + Box::into_pin(child.kill()).await?; + + let status = Box::into_pin(child.wait()).await?; + assert!(!status.success()); + sleep(DIE_TIME).await; - assert!(child.try_wait()?.is_some()); + assert!(child.try_wait()?.is_some(), "try_wait one"); + sleep(DIE_TIME).await; - assert!(child.try_wait()?.is_some()); + assert!(child.try_wait()?.is_some(), "try_wait two"); + Ok(()) } +#[tokio::test] +async fn kill_and_try_wait_group_process_session() -> Result<()> { + let mut child = TokioCommandWrap::with_new("yes", |command| { + command.stdout(Stdio::null()); + }) + .wrap(ProcessSession) + .spawn()?; + assert!(child.try_wait()?.is_none(), "pre kill"); + + Box::into_pin(child.kill()).await?; + sleep(DIE_TIME).await; + assert!(child.try_wait()?.is_some(), "try_wait one"); + + sleep(DIE_TIME).await; + assert!(child.try_wait()?.is_some(), "try_wait two"); + + Ok(()) +} + +/* #[tokio::test] async fn try_wait_twice_after_sigterm_nowrap() -> Result<()> { let mut child = Command::new("yes").stdout(Stdio::null()).spawn()?; @@ -200,28 +300,6 @@ async fn wait_twice_after_sigterm_group() -> Result<()> { Ok(()) } -#[tokio::test] -async fn wait_after_die_nowrap() -> Result<()> { - let mut child = Command::new("echo").stdout(Stdio::null()).spawn()?; - sleep(DIE_TIME).await; - - let status = child.wait().await?; - assert!(status.success()); - - Ok(()) -} - -#[tokio::test] -async fn wait_after_die_group() -> Result<()> { - let mut child = Command::new("echo").stdout(Stdio::null()).group_spawn()?; - sleep(DIE_TIME).await; - - let status = child.wait().await?; - assert!(status.success()); - - Ok(()) -} - #[tokio::test] async fn try_wait_after_die_nowrap() -> Result<()> { let mut child = Command::new("echo").stdout(Stdio::null()).spawn()?;