Skip to content

Commit

Permalink
feat: instrument internals (with tracing)
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Mar 11, 2024
1 parent 782bf42 commit 9546d12
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 11 deletions.
43 changes: 43 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ windows = { version = "0.53.0", optional = true }

[dev-dependencies]
tokio = { version = "1.33.0", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] }
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[features]
default = ["creation-flags", "job-object", "process-group", "process-session"]
Expand Down
1 change: 1 addition & 0 deletions src/tokio/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tokio::{
};
use tracing::debug;

#[derive(Debug)]
pub struct TokioCommandWrap {
command: Command,
wrappers: IndexMap<TypeId, Box<dyn TokioCommandWrapper>>,
Expand Down
31 changes: 22 additions & 9 deletions src/tokio/job_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use tokio::{
process::{Child, Command},
task::spawn_blocking,
};
use tracing::{debug, instrument};
use windows::Win32::{
Foundation::{CloseHandle, HANDLE},
System::Threading::CREATE_SUSPENDED,
Expand All @@ -20,10 +21,11 @@ use super::CreationFlags;
use super::KillOnDrop;
use super::{TokioChildWrapper, TokioCommandWrap, TokioCommandWrapper};

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct JobObject;

impl TokioCommandWrapper for JobObject {
#[instrument(level = "debug", skip(self))]
fn pre_spawn(&mut self, command: &mut Command, core: &TokioCommandWrap) -> Result<()> {
let mut flags = CREATE_SUSPENDED;
#[cfg(feature = "creation-flags")]
Expand All @@ -35,6 +37,7 @@ impl TokioCommandWrapper for JobObject {
Ok(())
}

#[instrument(level = "debug", skip(self))]
fn wrap_child(
&mut self,
inner: Box<dyn TokioChildWrapper>,
Expand All @@ -45,6 +48,19 @@ impl TokioCommandWrapper for JobObject {
#[cfg(not(feature = "kill-on-drop"))]
let kill_on_drop = false;

#[cfg(feature = "creation-flags")]
let create_suspended = core
.get_wrap::<CreationFlags>()
.map_or(false, |flags| flags.0.contains(CREATE_SUSPENDED));
#[cfg(not(feature = "creation-flags"))]
let create_suspended = false;

debug!(
?kill_on_drop,
?create_suspended,
"options from other wrappers"
);

let handle = HANDLE(
inner
.inner()
Expand All @@ -55,14 +71,7 @@ impl TokioCommandWrapper for JobObject {
let job_port = make_job_object(handle, kill_on_drop)?;

// only resume if the user didn't specify CREATE_SUSPENDED
#[cfg(feature = "creation-flags")]
let resume = core
.get_wrap::<CreationFlags>()
.map_or(false, |flags| !flags.0.contains(CREATE_SUSPENDED));
#[cfg(not(feature = "creation-flags"))]
let resume = true;

if resume {
if !create_suspended {
resume_threads(handle)?;
}

Expand All @@ -78,6 +87,7 @@ pub struct JobObjectChild {
}

impl JobObjectChild {
#[instrument(level = "debug", skip(job_port))]
pub(crate) fn new(inner: Box<dyn TokioChildWrapper>, job_port: JobPort) -> Self {
Self {
inner,
Expand All @@ -104,10 +114,12 @@ impl TokioChildWrapper for JobObjectChild {
self.inner.into_inner()
}

#[instrument(level = "debug", skip(self))]
fn start_kill(&mut self) -> Result<()> {
terminate_job(self.job_port.job, 1)
}

#[instrument(level = "debug", skip(self))]
fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + '_> {
Box::new(async {
if let ChildExitStatus::Exited(status) = &self.exit_status {
Expand Down Expand Up @@ -138,6 +150,7 @@ impl TokioChildWrapper for JobObjectChild {
})
}

#[instrument(level = "debug", skip(self))]
fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
wait_on_job(self.job_port.completion_port, Some(Duration::ZERO))?;
self.inner.try_wait()
Expand Down
2 changes: 1 addition & 1 deletion src/tokio/kill_on_drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tokio::process::Command;

use super::{TokioCommandWrap, TokioCommandWrapper};

#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct KillOnDrop;

impl TokioCommandWrapper for KillOnDrop {
Expand Down
9 changes: 9 additions & 0 deletions src/tokio/process_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use tokio::{
process::{Child, Command},
task::spawn_blocking,
};
use tracing::instrument;

use crate::ChildExitStatus;

Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct ProcessGroupChild {
}

impl ProcessGroupChild {
#[instrument(level = "debug")]
pub(crate) fn new(inner: Box<dyn TokioChildWrapper>, pgid: Pid) -> Self {
Self {
inner,
Expand All @@ -59,6 +61,7 @@ impl ProcessGroupChild {
}

impl TokioCommandWrapper for ProcessGroup {
#[instrument(level = "debug", skip(self))]
fn pre_spawn(&mut self, command: &mut Command, _core: &TokioCommandWrap) -> Result<()> {
#[cfg(tokio_unstable)]
{
Expand All @@ -78,6 +81,7 @@ impl TokioCommandWrapper for ProcessGroup {
Ok(())
}

#[instrument(level = "debug", skip(self))]
fn wrap_child(
&mut self,
inner: Box<dyn TokioChildWrapper>,
Expand All @@ -97,10 +101,12 @@ impl TokioCommandWrapper for ProcessGroup {
}

impl ProcessGroupChild {
#[instrument(level = "debug", skip(self))]
fn signal_imp(&self, sig: Signal) -> Result<()> {
killpg(self.pgid, sig).map_err(Error::from)
}

#[instrument(level = "debug")]
fn wait_imp(pgid: Pid, flag: WaitPidFlag) -> Result<ControlFlow<Option<ExitStatus>>> {
// wait for processes in a loop until every process in this group has
// exited (this ensures that we reap any zombies that may have been
Expand Down Expand Up @@ -156,10 +162,12 @@ impl TokioChildWrapper for ProcessGroupChild {
self.inner.into_inner()
}

#[instrument(level = "debug", skip(self))]
fn start_kill(&mut self) -> Result<()> {
self.signal_imp(Signal::SIGKILL)
}

#[instrument(level = "debug", skip(self))]
fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + '_> {
Box::new(async {
if let ChildExitStatus::Exited(status) = &self.exit_status {
Expand Down Expand Up @@ -188,6 +196,7 @@ impl TokioChildWrapper for ProcessGroupChild {
})
}

#[instrument(level = "debug", skip(self))]
fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
if let ChildExitStatus::Exited(status) = &self.exit_status {
return Ok(Some(*status));
Expand Down
3 changes: 3 additions & 0 deletions src/tokio/process_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use std::io::{Error, Result};

use nix::unistd::{setsid, Pid};
use tokio::process::Command;
use tracing::instrument;

use super::{TokioCommandWrap, TokioCommandWrapper};

#[derive(Debug, Clone)]
pub struct ProcessSession;

impl TokioCommandWrapper for ProcessSession {
#[instrument(level = "debug", skip(self))]
fn pre_spawn(&mut self, command: &mut Command, _core: &TokioCommandWrap) -> Result<()> {
unsafe {
command.pre_exec(move || setsid().map_err(Error::from).map(|_| ()));
Expand All @@ -17,6 +19,7 @@ impl TokioCommandWrapper for ProcessSession {
Ok(())
}

#[instrument(level = "debug", skip(self))]
fn wrap_child(
&mut self,
inner: Box<dyn super::core::TokioChildWrapper>,
Expand Down
12 changes: 12 additions & 0 deletions src/windows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
time::Duration,
};

use tracing::{debug, instrument};
use windows::Win32::{
Foundation::{CloseHandle, HANDLE},
System::{
Expand Down Expand Up @@ -46,10 +47,13 @@ unsafe impl Sync for JobPort {}
///
/// If `kill_on_drop` is true, we opt into the `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE` flag, which
/// essentially implements the "reap children" feature of Unix systems directly in Win32.
#[instrument(level = "debug")]
pub(crate) fn make_job_object(handle: HANDLE, kill_on_drop: bool) -> Result<JobPort> {
let job = unsafe { CreateJobObjectW(None, None) }.map_err(Error::other)?;
debug!(?job, "done CreateJobObjectW");

let completion_port = unsafe { CreateIoCompletionPort(None, None, 0, 1) }?;
debug!(?completion_port, "done CreateIoCompletionPort");

let associate_completion = JOBOBJECT_ASSOCIATE_COMPLETION_PORT {
CompletionKey: job.0 as _,
Expand All @@ -66,6 +70,7 @@ pub(crate) fn make_job_object(handle: HANDLE, kill_on_drop: bool) -> Result<JobP
.expect("cannot safely cast to DWORD"),
)
}?;
debug!("done SetInformationJobObject(completion)");

let mut info = JOBOBJECT_EXTENDED_LIMIT_INFORMATION::default();

Expand All @@ -83,8 +88,11 @@ pub(crate) fn make_job_object(handle: HANDLE, kill_on_drop: bool) -> Result<JobP
.expect("cannot safely cast to DWORD"),
)
}?;
debug!("done SetInformationJobObject(limit)");

unsafe { AssignProcessToJobObject(job, handle) }?;
debug!("done AssignProcessToJobObject");

Ok(JobPort {
job,
completion_port,
Expand All @@ -95,6 +103,7 @@ pub(crate) fn make_job_object(handle: HANDLE, kill_on_drop: bool) -> Result<JobP
///
/// This is a pretty terrible hack, but it's either this or we
/// re-implement all of Rust's std::process just to get access!
#[instrument(level = "debug")]
pub(crate) fn resume_threads(child_process: HANDLE) -> Result<()> {
#[inline]
unsafe fn inner(pid: u32, tool_handle: HANDLE) -> Result<()> {
Expand Down Expand Up @@ -134,10 +143,13 @@ pub(crate) fn resume_threads(child_process: HANDLE) -> Result<()> {
}

/// Terminate a job object without waiting for the processes to exit.
#[instrument(level = "debug")]
pub(crate) fn terminate_job(job: HANDLE, exit_code: u32) -> Result<()> {
unsafe { TerminateJobObject(job, exit_code) }.map_err(Error::other)
}

/// Wait for a job to complete.
#[instrument(level = "debug")]
pub(crate) fn wait_on_job(
completion_port: HANDLE,
timeout: Option<Duration>,
Expand Down
5 changes: 5 additions & 0 deletions tests/tokio_windows/id_same_as_inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ async fn nowrap() -> Result<()> {

#[tokio::test]
async fn job_object() -> Result<()> {
tracing_subscriber::fmt::fmt()
.pretty()
.with_env_filter(tracing_subscriber::filter::EnvFilter::from_default_env())
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::FULL)
.init();
let child = TokioCommandWrap::with_new("powershell.exe", |command| {
command.arg("/C").arg("echo hello").stdout(Stdio::null());
})
Expand Down

0 comments on commit 9546d12

Please sign in to comment.