Skip to content

Commit

Permalink
wip: fuse process-group child for correct tests
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Mar 9, 2024
1 parent 8355004 commit ecb7577
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 134 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 13 additions & 31 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,19 @@ rust-version = "1.75.0"
# there are a few windows-specific ones
autoexamples = false

[dependencies.tokio]
version = "1.33.0"
features = ["io-util", "macros", "process", "rt"]
optional = true

[target.'cfg(unix)'.dependencies.nix]
version = "0.27.1"
default-features = false
features = ["fs", "poll", "signal"]
optional = true

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.9"
features = [
"impl-default",
"handleapi",
"ioapiset",
"jobapi2",
"processthreadsapi",
"tlhelp32",
"winbase",
]
optional = true
[dependencies]
indexmap = "2.2.5"
tokio = { version = "1.33.0", features = ["io-util", "macros", "process", "rt"], optional = true }
tracing = "0.1.40"

[target.'cfg(unix)'.dependencies]
nix = { version = "0.27.1", default-features = false, features = ["fs", "poll", "signal"], optional = true }

[target.'cfg(windows)'.dependencies]
winapi = { version = "0.3.9", features = ["impl-default", "handleapi", "ioapiset", "jobapi2", "processthreadsapi", "tlhelp32", "winbase"], optional = true }

[dev-dependencies]
tokio = { version = "1.33.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] }

[features]
default = ["std", "tokio1", "creation-flags", "job-object", "process-group", "process-session"]
Expand All @@ -67,13 +57,5 @@ process-session = ["dep:nix", "process-group"]
## Wrapper: Pty (TODO)
pty = []


[dev-dependencies]
tokio = { version = "1.10.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] }

[package.metadata.docs.rs]
all-features = true

[dependencies]
indexmap = "2.2.5"
tracing = "0.1.40"
2 changes: 1 addition & 1 deletion src/tokio/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub trait TokioCommandWrapper: std::fmt::Debug {
}
}

pub trait TokioChildWrapper: std::fmt::Debug {
pub trait TokioChildWrapper: std::fmt::Debug + Send + Sync {
fn inner(&self) -> &Child;
fn inner_mut(&mut self) -> &mut Child;
fn into_inner(self: Box<Self>) -> Child;
Expand Down
118 changes: 74 additions & 44 deletions src/tokio/process_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
io::{Error, Result},
ops::ControlFlow,
os::unix::process::ExitStatusExt,
pin::pin,
process::ExitStatus,
};

Expand All @@ -21,6 +20,8 @@ use tokio::{
task::spawn_blocking,
};

use super::{TokioChildWrapper, TokioCommandWrapper};

#[derive(Debug, Clone)]
pub struct ProcessGroup {
leader: Pid,
Expand All @@ -40,11 +41,28 @@ impl ProcessGroup {

#[derive(Debug)]
pub struct ProcessGroupChild {
pub(crate) inner: Box<dyn super::core::TokioChildWrapper>,
pub(crate) pgid: Pid,
inner: Box<dyn TokioChildWrapper>,
exit_status: ChildExitStatus,
pgid: Pid,
}

#[derive(Debug)]
enum ChildExitStatus {
Running,
Exited(ExitStatus),
}

impl ProcessGroupChild {
pub(crate) fn new(inner: Box<dyn TokioChildWrapper>, pgid: Pid) -> Self {
Self {
inner,
exit_status: ChildExitStatus::Running,
pgid,
}
}
}

impl super::core::TokioCommandWrapper for ProcessGroup {
impl TokioCommandWrapper for ProcessGroup {
fn pre_spawn(&mut self, command: &mut Command) -> Result<()> {
#[cfg(tokio_unstable)]
{
Expand All @@ -66,8 +84,8 @@ impl super::core::TokioCommandWrapper for ProcessGroup {

fn wrap_child(
&mut self,
inner: Box<dyn super::core::TokioChildWrapper>,
) -> Result<Box<dyn super::core::TokioChildWrapper>> {
inner: Box<dyn TokioChildWrapper>,
) -> Result<Box<dyn TokioChildWrapper>> {
let pgid = Pid::from_raw(
i32::try_from(
inner
Expand All @@ -77,7 +95,7 @@ impl super::core::TokioCommandWrapper for ProcessGroup {
.expect("Command PID > i32::MAX"),
);

Ok(Box::new(ProcessGroupChild { inner, pgid }))
Ok(Box::new(ProcessGroupChild::new(inner, pgid)))
}
}

Expand All @@ -87,28 +105,27 @@ impl ProcessGroupChild {
}

fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
// Wait for processes in a loop until every process in this
// process group has exited (this ensures that we reap any
// zombies that may have been created if the parent exited after
// spawning children, but didn't wait for those children to
// exit).
// wait for processes in a loop until every process in this group has
// exited (this ensures that we reap any zombies that may have been
// created if the parent exited after spawning children, but didn't wait
// for those children to exit)
let mut parent_exit_status: Option<ExitStatus> = None;
loop {
// we can't use the safe wrapper directly because it doesn't
// return the raw status, and we need it to convert to the
// std's ExitStatus.
// we can't use the safe wrapper directly because it doesn't return
// the raw status, and we need it to convert to the std's ExitStatus
let mut status: i32 = 0;
match unsafe { libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits()) } {
match unsafe {
libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits())
} {
0 => {
// Zero should only happen if WNOHANG was passed in,
// and means that no processes have yet to exit.
// zero should only happen if WNOHANG was passed in,
// and means that no processes have yet to exit
return Ok(ControlFlow::Continue(()));
}
-1 => {
match Errno::last() {
Errno::ECHILD => {
// No more children to reap; this is a
// graceful exit.
// no more children to reap; this is a graceful exit
return Ok(ControlFlow::Break(parent_exit_status));
}
errno => {
Expand All @@ -117,22 +134,21 @@ impl ProcessGroupChild {
}
}
pid => {
// *A* process exited. Was it the parent process
// that we started? If so, collect the exit signal,
// otherwise we reaped a zombie process and should
// continue in the loop.
// a process exited. was it the parent process that we
// started? if so, collect the exit signal, otherwise we
// reaped a zombie process and should continue looping
if pgid == Pid::from_raw(pid) {
parent_exit_status = Some(ExitStatus::from_raw(status));
} else {
// Reaped a zombie child; keep looping.
// reaped a zombie child; keep looping
}
}
};
}
}
}

impl super::core::TokioChildWrapper for ProcessGroupChild {
impl TokioChildWrapper for ProcessGroupChild {
fn inner(&self) -> &Child {
self.inner.inner()
}
Expand All @@ -157,37 +173,51 @@ impl super::core::TokioChildWrapper for ProcessGroupChild {

fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + '_> {
Box::new(async {
if let ChildExitStatus::Exited(status) = &self.exit_status {
return Ok(*status);
}

const MAX_RETRY_ATTEMPT: usize = 10;
let pgid = self.pgid;

// Always wait for parent to exit first.
//
// It's likely that all its children has already exited and reaped by
// the time the parent exits.
// always wait for parent to exit first, as by the time it does,
// it's likely that all its children have already been reaped.
let status = Box::into_pin(self.inner.wait()).await?;
self.exit_status = ChildExitStatus::Exited(status);

// Try reaping all children, if there are some that are still alive after
// several attempts, then spawn a blocking task to reap them.
for retry_attempt in 1..=MAX_RETRY_ATTEMPT {
if Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)?.is_break() {
break;
} else if retry_attempt == MAX_RETRY_ATTEMPT {
let pgid = self.pgid;
pin!(spawn_blocking(move || Self::wait_imp(
pgid,
WaitPidFlag::empty()
)))
.await??;
// nevertheless, now try reaping all children a few times...
for _ in 1..MAX_RETRY_ATTEMPT {
if Self::wait_imp(pgid, WaitPidFlag::WNOHANG)?.is_break() {
return Ok(status);
}
}

// ...finally, if there are some that are still alive,
// block in the background to reap them fully.
spawn_blocking(move || Self::wait_imp(pgid, WaitPidFlag::empty())).await??;
Ok(status)
})
}

fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
if let ChildExitStatus::Exited(status) = &self.exit_status {
return Ok(Some(*status));
}

match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? {
ControlFlow::Break(res) => Ok(res),
ControlFlow::Continue(()) => self.inner.try_wait(),
ControlFlow::Break(res) => {
if let Some(status) = res {
self.exit_status = ChildExitStatus::Exited(status);
}
Ok(res)
}
ControlFlow::Continue(()) => {
let exited = self.inner.try_wait()?;
if let Some(exited) = exited {
self.exit_status = ChildExitStatus::Exited(exited);
}
Ok(exited)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/tokio/process_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,6 @@ impl super::core::TokioCommandWrapper for ProcessSession {
.expect("Command PID > i32::MAX"),
);

Ok(Box::new(super::ProcessGroupChild { inner, pgid }))
Ok(Box::new(super::ProcessGroupChild::new(inner, pgid)))
}
}
Loading

0 comments on commit ecb7577

Please sign in to comment.