Skip to content

Commit

Permalink
wip: port all tokio_unix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
passcod committed Mar 9, 2024
1 parent ecb7577 commit 1dde66e
Show file tree
Hide file tree
Showing 17 changed files with 744 additions and 417 deletions.
111 changes: 111 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ rust-version = "1.75.0"
autoexamples = false

[dependencies]
futures = "0.3.30"
indexmap = "2.2.5"
tokio = { version = "1.33.0", features = ["io-util", "macros", "process", "rt"], optional = true }
tracing = "0.1.40"
Expand Down
10 changes: 7 additions & 3 deletions src/tokio/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{
ffi::OsStr,
future::Future,
io::{Error, Result},
process::ExitStatus,
process::{ExitStatus, Output},
};

use indexmap::IndexMap;
Expand Down Expand Up @@ -127,12 +127,16 @@ pub trait TokioChildWrapper: std::fmt::Debug + Send + Sync {
self.inner_mut().start_kill()
}

fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
self.inner_mut().try_wait()
}

fn wait(&mut self) -> Box<dyn Future<Output = Result<ExitStatus>> + '_> {
Box::new(self.inner_mut().wait())
}

fn try_wait(&mut self) -> Result<Option<ExitStatus>> {
self.inner_mut().try_wait()
fn wait_with_output(self: Box<Self>) -> Box<dyn Future<Output = Result<Output>>> {
Box::new(self.into_inner().wait_with_output())
}

#[cfg(unix)]
Expand Down
35 changes: 34 additions & 1 deletion src/tokio/process_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ use std::{
io::{Error, Result},
ops::ControlFlow,
os::unix::process::ExitStatusExt,
process::ExitStatus,
process::{ExitStatus, Output},
};

use futures::future::try_join3;
use nix::{
errno::Errno,
libc,
Expand All @@ -16,6 +17,7 @@ use nix::{
unistd::{setpgid, Pid},
};
use tokio::{
io::{AsyncRead, AsyncReadExt},
process::{Child, Command},
task::spawn_blocking,
};
Expand Down Expand Up @@ -221,6 +223,37 @@ impl TokioChildWrapper for ProcessGroupChild {
}
}

fn wait_with_output(mut self: Box<Self>) -> Box<dyn Future<Output = Result<Output>>> {
Box::new(async move {
async fn read_to_end<A: AsyncRead + Unpin>(io: &mut Option<A>) -> Result<Vec<u8>> {
let mut vec = Vec::new();
if let Some(io) = io.as_mut() {
io.read_to_end(&mut vec).await?;
}
Ok(vec)
}

let mut stdout_pipe = self.stdout().take();
let mut stderr_pipe = self.stderr().take();

let stdout_fut = read_to_end(&mut stdout_pipe);
let stderr_fut = read_to_end(&mut stderr_pipe);

let (status, stdout, stderr) =
try_join3(Box::into_pin(self.wait()), stdout_fut, stderr_fut).await?;

// Drop happens after `try_join` due to <https://github.com/tokio-rs/tokio/issues/4309>
drop(stdout_pipe);
drop(stderr_pipe);

Ok(Output {
status,
stdout,
stderr,
})
})
}

fn signal(&self, sig: Signal) -> Result<()> {
self.signal_imp(sig)
}
Expand Down
Loading

0 comments on commit 1dde66e

Please sign in to comment.