From 2d305719132adfe85ea2cdbae9533034971012d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fe=CC=81lix=20Saparelli?= Date: Mon, 11 Mar 2024 21:22:41 +1300 Subject: [PATCH] wip: impl the std side --- src/lib.rs | 6 +- src/std.rs | 29 ++++++ src/std/core.rs | 104 +++++++++++++++++++ src/std/creation_flags.rs | 15 +++ src/std/job_object.rs | 142 ++++++++++++++++++++++++++ src/std/kill_on_drop.rs | 13 +++ src/std/process_group.rs | 200 +++++++++++++++++++++++++++++++++++++ src/std/process_session.rs | 35 +++++++ 8 files changed, 541 insertions(+), 3 deletions(-) create mode 100644 src/std.rs create mode 100644 src/std/core.rs create mode 100644 src/std/creation_flags.rs create mode 100644 src/std/job_object.rs create mode 100644 src/std/kill_on_drop.rs create mode 100644 src/std/process_group.rs create mode 100644 src/std/process_session.rs diff --git a/src/lib.rs b/src/lib.rs index 3c10981..a887b67 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,8 +5,8 @@ pub(crate) mod generic_wrap; -// #[cfg(feature = "std")] -// pub mod std; +#[cfg(feature = "std")] +pub mod std; #[cfg(feature = "tokio1")] pub mod tokio; @@ -23,5 +23,5 @@ mod windows; #[derive(Debug)] pub(crate) enum ChildExitStatus { Running, - Exited(std::process::ExitStatus), + Exited(::std::process::ExitStatus), } diff --git a/src/std.rs b/src/std.rs new file mode 100644 index 0000000..f0160a5 --- /dev/null +++ b/src/std.rs @@ -0,0 +1,29 @@ +#[doc(inline)] +pub use core::{StdChildWrapper, StdCommandWrap, StdCommandWrapper}; +#[cfg(all(windows, feature = "creation-flags"))] +#[doc(inline)] +pub use creation_flags::CreationFlags; +#[cfg(all(windows, feature = "job-object"))] +#[doc(inline)] +pub use job_object::JobObject; +#[cfg(feature = "kill-on-drop")] +#[doc(inline)] +pub use kill_on_drop::KillOnDrop; +#[cfg(all(unix, feature = "process-group"))] +#[doc(inline)] +pub use process_group::{ProcessGroup, ProcessGroupChild}; +#[cfg(all(unix, feature = "process-session"))] +#[doc(inline)] +pub use process_session::ProcessSession; + +mod core; +#[cfg(all(windows, feature = "creation-flags"))] +mod creation_flags; +#[cfg(all(windows, feature = "job-object"))] +mod job_object; +#[cfg(feature = "kill-on-drop")] +mod kill_on_drop; +#[cfg(all(unix, feature = "process-group"))] +mod process_group; +#[cfg(all(unix, feature = "process-session"))] +mod process_session; diff --git a/src/std/core.rs b/src/std/core.rs new file mode 100644 index 0000000..70d872b --- /dev/null +++ b/src/std/core.rs @@ -0,0 +1,104 @@ +use std::{ + io::Result, + process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitStatus, Output}, +}; + +#[cfg(unix)] +use nix::{ + sys::signal::{kill, Signal}, + unistd::Pid, +}; + +crate::generic_wrap::Wrap!(StdCommandWrap, Command, StdCommandWrapper, StdChildWrapper); + +pub trait StdCommandWrapper: std::fmt::Debug { + // process-wrap guarantees that `other` will be of the same type as `self` + // note that other crates that may use this trait should guarantee this, but + // that cannot be enforced by the type system, so you should still panic if + // downcasting fails, instead of potentially causing UB + fn extend(&mut self, _other: Box) {} + + fn pre_spawn(&mut self, _command: &mut Command, _core: &StdCommandWrap) -> Result<()> { + Ok(()) + } + + fn post_spawn(&mut self, _child: &mut Child, _core: &StdCommandWrap) -> Result<()> { + Ok(()) + } + + fn wrap_child( + &mut self, + child: Box, + _core: &StdCommandWrap, + ) -> Result> { + Ok(child) + } +} + +pub trait StdChildWrapper: std::fmt::Debug + Send + Sync { + fn inner(&self) -> &Child; + fn inner_mut(&mut self) -> &mut Child; + fn into_inner(self: Box) -> Child; + + fn stdin(&mut self) -> &mut Option { + &mut self.inner_mut().stdin + } + + fn stdout(&mut self) -> &mut Option { + &mut self.inner_mut().stdout + } + + fn stderr(&mut self) -> &mut Option { + &mut self.inner_mut().stderr + } + + fn id(&self) -> u32 { + self.inner().id() + } + + fn kill(&mut self) -> Result<()> { + self.start_kill()?; + self.wait()?; + Ok(()) + } + + fn start_kill(&mut self) -> Result<()> { + self.inner_mut().start_kill() + } + + fn try_wait(&mut self) -> Result> { + self.inner_mut().try_wait() + } + + fn wait(&mut self) -> Result { + self.inner_mut().wait() + } + + fn wait_with_output(self: Box) -> Result + where + Self: 'static, + { + todo!() + } + + #[cfg(unix)] + fn signal(&self, sig: Signal) -> Result<()> { + kill( + Pid::from_raw(i32::try_from(self.id()).map_err(std::io::Error::other)?), + sig, + ) + .map_err(std::io::Error::from) + } +} + +impl StdChildWrapper for Child { + fn inner(&self) -> &Child { + self + } + fn inner_mut(&mut self) -> &mut Child { + self + } + fn into_inner(self: Box) -> Child { + *self + } +} diff --git a/src/std/creation_flags.rs b/src/std/creation_flags.rs new file mode 100644 index 0000000..17ae664 --- /dev/null +++ b/src/std/creation_flags.rs @@ -0,0 +1,15 @@ +use std::{io::Result, os::windows::process::CommandExt, process::Command}; + +use windows::Win32::System::Threading::PROCESS_CREATION_FLAGS; + +use super::{StdCommandWrap, StdCommandWrapper}; + +#[derive(Debug, Clone)] +pub struct CreationFlags(pub PROCESS_CREATION_FLAGS); + +impl StdCommandWrapper for CreationFlags { + fn pre_spawn(&mut self, command: &mut Command, _core: &StdCommandWrap) -> Result<()> { + command.creation_flags((self.0).0); + Ok(()) + } +} diff --git a/src/std/job_object.rs b/src/std/job_object.rs new file mode 100644 index 0000000..799135c --- /dev/null +++ b/src/std/job_object.rs @@ -0,0 +1,142 @@ +use std::{ + io::Result, + os::windows::{io::AsRawHandle, process::CommandExt}, + process::{Child, Command, ExitStatus}, + time::Duration, +}; + +use tracing::{debug, instrument}; +use windows::Win32::{ + Foundation::{CloseHandle, HANDLE}, + System::Threading::CREATE_SUSPENDED, +}; + +use crate::{ + windows::{make_job_object, resume_threads, terminate_job, wait_on_job, JobPort}, + ChildExitStatus, +}; + +#[cfg(feature = "creation-flags")] +use super::CreationFlags; +#[cfg(feature = "kill-on-drop")] +use super::KillOnDrop; +use super::{StdChildWrapper, StdCommandWrap, StdCommandWrapper}; + +#[derive(Debug)] +pub struct JobObject; + +impl StdCommandWrapper for JobObject { + #[instrument(level = "debug", skip(self))] + fn pre_spawn(&mut self, command: &mut Command, core: &StdCommandWrap) -> Result<()> { + let mut flags = CREATE_SUSPENDED; + #[cfg(feature = "creation-flags")] + if let Some(CreationFlags(user_flags)) = core.get_wrap::() { + flags |= *user_flags; + } + + command.creation_flags(flags.0); + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + fn wrap_child( + &mut self, + inner: Box, + core: &StdCommandWrap, + ) -> Result> { + #[cfg(feature = "kill-on-drop")] + let kill_on_drop = core.has_wrap::(); + #[cfg(not(feature = "kill-on-drop"))] + let kill_on_drop = false; + + #[cfg(feature = "creation-flags")] + let create_suspended = core + .get_wrap::() + .map_or(false, |flags| flags.0.contains(CREATE_SUSPENDED)); + #[cfg(not(feature = "creation-flags"))] + let create_suspended = false; + + debug!( + ?kill_on_drop, + ?create_suspended, + "options from other wrappers" + ); + + let handle = HANDLE(inner.inner().as_raw_handle() as _); + + let job_port = make_job_object(handle, kill_on_drop)?; + + // only resume if the user didn't specify CREATE_SUSPENDED + if !create_suspended { + resume_threads(handle)?; + } + + Ok(Box::new(JobObjectChild::new(inner, job_port))) + } +} + +#[derive(Debug)] +pub struct JobObjectChild { + inner: Box, + exit_status: ChildExitStatus, + job_port: JobPort, +} + +impl JobObjectChild { + #[instrument(level = "debug", skip(job_port))] + pub(crate) fn new(inner: Box, job_port: JobPort) -> Self { + Self { + inner, + exit_status: ChildExitStatus::Running, + job_port, + } + } +} + +impl StdChildWrapper for JobObjectChild { + fn inner(&self) -> &Child { + self.inner.inner() + } + fn inner_mut(&mut self) -> &mut Child { + self.inner.inner_mut() + } + fn into_inner(self: Box) -> Child { + // manually drop the completion port + let its = std::mem::ManuallyDrop::new(self.job_port); + unsafe { CloseHandle(its.completion_port) }.ok(); + // we leave the job handle unclosed, otherwise the Child is useless + // (as closing it will terminate the job) + + self.inner.into_inner() + } + + #[instrument(level = "debug", skip(self))] + fn start_kill(&mut self) -> Result<()> { + terminate_job(self.job_port.job, 1) + } + + #[instrument(level = "debug", skip(self))] + fn wait(&mut self) -> Result { + if let ChildExitStatus::Exited(status) = &self.exit_status { + return Ok(*status); + } + + // always wait for parent to exit first, as by the time it does, + // it's likely that all its children have already exited. + let status = self.inner.wait()?; + self.exit_status = ChildExitStatus::Exited(status); + + // nevertheless, now wait and make sure we reap all children. + let JobPort { + completion_port, .. + } = self.job_port; + wait_on_job(completion_port, None)?; + Ok(status) + } + + #[instrument(level = "debug", skip(self))] + fn try_wait(&mut self) -> Result> { + wait_on_job(self.job_port.completion_port, Some(Duration::ZERO))?; + self.inner.try_wait() + } +} diff --git a/src/std/kill_on_drop.rs b/src/std/kill_on_drop.rs new file mode 100644 index 0000000..5ed60ea --- /dev/null +++ b/src/std/kill_on_drop.rs @@ -0,0 +1,13 @@ +use std::{io::Result, process::Command}; + +use super::{StdCommandWrap, StdCommandWrapper}; + +#[derive(Debug)] +pub struct KillOnDrop; + +impl StdCommandWrapper for KillOnDrop { + fn pre_spawn(&mut self, command: &mut Command, _core: &StdCommandWrap) -> Result<()> { + command.kill_on_drop(true); + Ok(()) + } +} diff --git a/src/std/process_group.rs b/src/std/process_group.rs new file mode 100644 index 0000000..00ed934 --- /dev/null +++ b/src/std/process_group.rs @@ -0,0 +1,200 @@ +use std::{ + io::{Error, Result}, + ops::ControlFlow, + os::unix::process::{CommandExt, ExitStatusExt}, + process::{Child, Command, ExitStatus}, +}; + +use nix::{ + errno::Errno, + libc, + sys::{ + signal::{killpg, Signal}, + wait::WaitPidFlag, + }, + unistd::{setpgid, Pid}, +}; +use tracing::instrument; + +use crate::ChildExitStatus; + +use super::{StdChildWrapper, StdCommandWrap, StdCommandWrapper}; + +#[derive(Debug, Clone)] +pub struct ProcessGroup { + leader: Pid, +} + +impl ProcessGroup { + pub fn leader() -> Self { + Self { + leader: Pid::from_raw(0), + } + } + + pub fn attach_to(leader: Pid) -> Self { + Self { leader } + } +} + +#[derive(Debug)] +pub struct ProcessGroupChild { + inner: Box, + exit_status: ChildExitStatus, + pgid: Pid, +} + +impl ProcessGroupChild { + #[instrument(level = "debug")] + pub(crate) fn new(inner: Box, pgid: Pid) -> Self { + Self { + inner, + exit_status: ChildExitStatus::Running, + pgid, + } + } +} + +impl StdCommandWrapper for ProcessGroup { + #[instrument(level = "debug", skip(self))] + fn pre_spawn(&mut self, command: &mut Command, _core: &StdCommandWrap) -> Result<()> { + #[cfg(Std_unstable)] + { + command.process_group(self.leader.as_raw()); + } + + #[cfg(not(Std_unstable))] + let leader = self.leader; + unsafe { + command.pre_exec(move || { + setpgid(Pid::this(), leader) + .map_err(Error::from) + .map(|_| ()) + }); + } + + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + fn wrap_child( + &mut self, + inner: Box, + _core: &StdCommandWrap, + ) -> Result> { + let pgid = Pid::from_raw(i32::try_from(inner.id()).expect("Command PID > i32::MAX")); + + Ok(Box::new(ProcessGroupChild::new(inner, pgid))) + } +} + +impl ProcessGroupChild { + #[instrument(level = "debug", skip(self))] + fn signal_imp(&self, sig: Signal) -> Result<()> { + killpg(self.pgid, sig).map_err(Error::from) + } + + #[instrument(level = "debug")] + fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result>> { + // wait for processes in a loop until every process in this group has + // exited (this ensures that we reap any zombies that may have been + // created if the parent exited after spawning children, but didn't wait + // for those children to exit) + let mut parent_exit_status: Option = None; + loop { + // we can't use the safe wrapper directly because it doesn't return + // the raw status, and we need it to convert to the std's ExitStatus + let mut status: i32 = 0; + match unsafe { + libc::waitpid(-pgid.as_raw(), &mut status as *mut libc::c_int, flag.bits()) + } { + 0 => { + // zero should only happen if WNOHANG was passed in, + // and means that no processes have yet to exit + return Ok(ControlFlow::Continue(())); + } + -1 => { + match Errno::last() { + Errno::ECHILD => { + // no more children to reap; this is a graceful exit + return Ok(ControlFlow::Break(parent_exit_status)); + } + errno => { + return Err(Error::from(errno)); + } + } + } + pid => { + // a process exited. was it the parent process that we + // started? if so, collect the exit signal, otherwise we + // reaped a zombie process and should continue looping + if pgid == Pid::from_raw(pid) { + parent_exit_status = Some(ExitStatus::from_raw(status)); + } else { + // reaped a zombie child; keep looping + } + } + }; + } + } +} + +impl StdChildWrapper for ProcessGroupChild { + fn inner(&self) -> &Child { + self.inner.inner() + } + fn inner_mut(&mut self) -> &mut Child { + self.inner.inner_mut() + } + fn into_inner(self: Box) -> Child { + self.inner.into_inner() + } + + #[instrument(level = "debug", skip(self))] + fn start_kill(&mut self) -> Result<()> { + self.signal_imp(Signal::SIGKILL) + } + + #[instrument(level = "debug", skip(self))] + fn wait(&mut self) -> Result { + if let ChildExitStatus::Exited(status) = &self.exit_status { + return Ok(*status); + } + + // always wait for parent to exit first, as by the time it does, + // it's likely that all its children have already been reaped. + let status = self.inner.wait()?; + self.exit_status = ChildExitStatus::Exited(status); + + // nevertheless, now wait and make sure we reap all children. + Self::wait_imp(self.pgid, WaitPidFlag::empty())?; + Ok(status) + } + + #[instrument(level = "debug", skip(self))] + fn try_wait(&mut self) -> Result> { + if let ChildExitStatus::Exited(status) = &self.exit_status { + return Ok(Some(*status)); + } + + match Self::wait_imp(self.pgid, WaitPidFlag::WNOHANG)? { + ControlFlow::Break(res) => { + if let Some(status) = res { + self.exit_status = ChildExitStatus::Exited(status); + } + Ok(res) + } + ControlFlow::Continue(()) => { + let exited = self.inner.try_wait()?; + if let Some(exited) = exited { + self.exit_status = ChildExitStatus::Exited(exited); + } + Ok(exited) + } + } + } + + fn signal(&self, sig: Signal) -> Result<()> { + self.signal_imp(sig) + } +} diff --git a/src/std/process_session.rs b/src/std/process_session.rs new file mode 100644 index 0000000..27b1144 --- /dev/null +++ b/src/std/process_session.rs @@ -0,0 +1,35 @@ +use std::{ + io::{Error, Result}, + os::unix::process::CommandExt, + process::Command, +}; + +use nix::unistd::{setsid, Pid}; +use tracing::instrument; + +use super::{StdCommandWrap, StdCommandWrapper}; + +#[derive(Debug, Clone)] +pub struct ProcessSession; + +impl StdCommandWrapper for ProcessSession { + #[instrument(level = "debug", skip(self))] + fn pre_spawn(&mut self, command: &mut Command, _core: &StdCommandWrap) -> Result<()> { + unsafe { + command.pre_exec(move || setsid().map_err(Error::from).map(|_| ())); + } + + Ok(()) + } + + #[instrument(level = "debug", skip(self))] + fn wrap_child( + &mut self, + inner: Box, + _core: &StdCommandWrap, + ) -> Result> { + let pgid = Pid::from_raw(i32::try_from(inner.id()).expect("Command PID > i32::MAX")); + + Ok(Box::new(super::ProcessGroupChild::new(inner, pgid))) + } +}