Optimize Build::compile_objects: Only spawns one thread #780

Merged
merged 1 commit into from
Jul 10, 2023
1 change: 1 addition & 0 deletions Cargo.toml
@@ -19,6 +19,7 @@ edition = "2018"

[dependencies]
jobserver = { version = "0.1.16", optional = true }
os_pipe = "1"
Collaborator Author

The msrv CI for windows failed because it automatically pulls in the latest version of os_pipe v1.1.2, which depends on windows-sys v0.42.

windows-sys v0.42 seems to use if, match, || and && in const contexts, so it fails to compile on Rust v1.34.

However, we only need os_pipe v1.0.0 here. I've tested this with cargo update -Z minimal-versions and all the tests ran just fine.

So I think this does not break the MSRV, as long as users manage their Cargo.lock so that it does not pick up the latest version of os_pipe, but I'm not entirely sure about that.

Member

I agree this doesn't practically update the MSRV. I think a reasonable solution would be to run cargo update -p os_pipe --precise 1.0.0 (possibly after cargo generate-lockfile if needed) in the CI script.

That said, do you know what version these features require? I don't mind bumping it a bit.

Collaborator Author

> That said, do you know what version these features require? I don't mind bumping it a bit.

I looked into the tracking issue rust-lang/rust#57563, and this seems to be the first item in the list, rust-lang/rust#49146, which was stabilized in 1.46.

P.S. If we are bumping the MSRV to 1.46, then I can replace the Option used in fn jobserver() with mem::MaybeUninit.
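
For context, here is a minimal sketch of the kind of code that needs Rust 1.46: control flow such as `if` inside a `const fn`. The names `max_u32` and `LARGEST` are made up for illustration and are not taken from windows-sys.

```rust
/// Hypothetical helper (not from windows-sys): uses `if` inside a `const fn`,
/// which compilers older than Rust 1.46 reject.
const fn max_u32(a: u32, b: u32) -> u32 {
    if a > b {
        a
    } else {
        b
    }
}

/// Evaluated at compile time, so the branch above runs in a const context.
const LARGEST: u32 = max_u32(3, 7);
```

On a 1.34 toolchain a crate containing code like this fails to build, which matches the MSRV CI failure described above.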

Member

@thomcc thomcc Jan 31, 2023

1.46 should be fine. I'll ask if anybody objects on Zulip (https://rust-lang.zulipchat.com/#narrow/stream/351149-t-libs.2Fcrates/topic/Any.20objections.20to.20bumping.20.60cc.60's.20MSRV), but my main goal was to avoid going past Debian stable (which is on 1.48), since it tends to cause a lot of spurious bug reports. Even then, those would probably be directed at the crate that causes the compilation error (in this case windows-sys), rather than us.

That said, I'm slightly unsure about adding the dependency on os_pipe, since it's not nearly as widely used as cc and is pretty lightweight (so it might be better to implement it manually). Still, it might be fine.

Collaborator Author

@NobodyXu NobodyXu Jan 31, 2023

> (so it might be better to implement manually).

That's definitely doable. I checked their source code and it is really simple, though I personally don't want more unsafe code in cc.

Member

Sure, although I think that unsafe code might be easier to audit than the current code.

Member

It's also notable that we don't currently have any non-optional dependencies, and even among optional dependencies we don't have any that aren't maintained by rust-lang team members (this would change that). So I'm not sure, it might be worth being a bit conservative here.

I'm largely inclined to think windows-sys and libc are fine, though.

Collaborator Author

> So I'm not sure, it might be worth being a bit conservative here.

Hmmm, maybe a good idea to move os_pipe into rust-lang considering that it is also a fundamental lib used by other crates?

CC @oconnor663 owner of os_pipe.

Member

In any case I still need to review this code and use of the dependency (sadly, it's looking like I might not get to this until the weekend, not sure).

Contributor

Happy to help if I can. Luckily the whole crate isn't very much code :)


[features]
parallel = ["jobserver"]
252 changes: 134 additions & 118 deletions src/lib.rs
@@ -1218,8 +1218,7 @@ impl Build {
}

#[cfg(feature = "parallel")]
fn compile_objects<'me>(&'me self, objs: &[Object]) -> Result<(), Error> {
use std::sync::atomic::{AtomicBool, Ordering::SeqCst};
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
use std::sync::Once;

// Limit our parallelism globally with a jobserver. Start off by
@@ -1242,56 +1241,28 @@ impl Build {
// Note that this jobserver is cached globally so we only used one per
// process and only worry about creating it once.
//
// * Next we use a raw `thread::spawn` per thread to actually compile
// objects in parallel. We only actually spawn a thread after we've
// acquired a token to perform some work
//
// * Finally though we want to keep the dependencies of this crate
// pretty light, so we avoid using a safe abstraction like `rayon` and
// instead rely on some bits of `unsafe` code. We know that this stack
// frame persists while everything is compiling so we use all the
// stack-allocated objects without cloning/reallocating. We use a
// transmute to `State` with a `'static` lifetime to persist
// everything we need across the boundary, and the join-on-drop
// semantics of `JoinOnDrop` should ensure that our stack frame is
// alive while threads are alive.
// * Next we spawn the process to actually compile objects in
// parallel after we've acquired a token to perform some work
//
// With all that in mind we compile all objects in a loop here, after we
// acquire the appropriate tokens. Once all objects have been compiled
// we join on all the threads and propagate the results of compilation.
//
// Note that as a slight optimization we try to break out as soon as
// possible as soon as any compilation fails to ensure that errors get
// out to the user as fast as possible.
let error = AtomicBool::new(false);
let mut threads = Vec::new();
for obj in objs {
if error.load(SeqCst) {
break;
}
let token = server.acquire()?;
let state = State {
build: self,
obj,
error: &error,
};
let state = unsafe { std::mem::transmute::<State, State<'static>>(state) };
let thread = thread::spawn(|| {
let state: State<'me> = state; // erase the `'static` lifetime
let result = state.build.compile_object(state.obj);
if result.is_err() {
state.error.store(true, SeqCst);
}
drop(token); // make sure our jobserver token is released after the compile
return result;
});
threads.push(JoinOnDrop(Some(thread)));
}
// we wait on all the processes and propagate the results of compilation.
let print = PrintThread::new()?;

for mut thread in threads {
if let Some(thread) = thread.0.take() {
thread.join().expect("thread should not panic")?;
}
let children = objs
.iter()
.map(|obj| {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = server.acquire()?;

let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;

Ok((cmd, program, KillOnDrop(child), token))
})
.collect::<Result<Vec<_>, Error>>()?;

for (cmd, program, mut child, _token) in children {
wait_on_child(&cmd, &program, &mut child.0)?;
}

// Reacquire our process's token before we proceed, which we released
@@ -1302,16 +1273,6 @@ impl Build {

return Ok(());

/// Shared state from the parent thread to the child thread. This
/// package of pointers is temporarily transmuted to a `'static`
/// lifetime to cross the thread boundary and then once the thread is
/// running we erase the `'static` to go back to an anonymous lifetime.
struct State<'a> {
build: &'a Build,
obj: &'a Object,
error: &'a AtomicBool,
}

/// Returns a suitable `jobserver::Client` used to coordinate
/// parallelism between build scripts.
fn jobserver() -> &'static jobserver::Client {
@@ -1357,26 +1318,30 @@ impl Build {
return client;
}

struct JoinOnDrop(Option<thread::JoinHandle<Result<(), Error>>>);
struct KillOnDrop(Child);

impl Drop for JoinOnDrop {
impl Drop for KillOnDrop {
fn drop(&mut self) {
if let Some(thread) = self.0.take() {
drop(thread.join());
}
let child = &mut self.0;

child.kill().ok();
Member

Nit, but I'm not a fan of using .ok() as an idiom to ignore the return value here. I'd suggest using let _ = and a comment.

(Or, possibly, calling .expect() since it's probably worth killing the process if this fails. But that'd be a semantic change from the previous code, so if done at all that should be a separate PR.)
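
A minimal sketch of the suggested `let _ =` form, assuming the same `KillOnDrop(Child)` wrapper as in the diff. The wording of the comment is illustrative, not the text that was eventually committed.

```rust
use std::process::Child;

/// Kills the wrapped child process when the guard goes out of scope.
struct KillOnDrop(Child);

impl Drop for KillOnDrop {
    fn drop(&mut self) {
        // Deliberately ignore the result: `drop` cannot return an error, and
        // `kill` may fail, for example if the child can no longer be signalled.
        let _ = self.0.kill();
    }
}
```

Compared with `.ok()`, `let _ =` makes it explicit that the `Result` is being discarded on purpose rather than converted into an `Option` that is then silently dropped.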

}
}
}

#[cfg(not(feature = "parallel"))]
fn compile_objects(&self, objs: &[Object]) -> Result<(), Error> {
let print = PrintThread::new()?;

for obj in objs {
self.compile_object(obj)?;
let (mut cmd, name) = self.create_compile_object_cmd(obj)?;
run_inner(&mut cmd, &name, print.pipe_writer_cloned()?.unwrap())?;
}

Ok(())
}

fn compile_object(&self, obj: &Object) -> Result<(), Error> {
fn create_compile_object_cmd(&self, obj: &Object) -> Result<(Command, String), Error> {
let asm_ext = AsmFileExt::from_path(&obj.src);
let is_asm = asm_ext.is_some();
let target = self.get_target()?;
@@ -1425,8 +1390,7 @@ impl Build {
self.fix_env_for_apple_os(&mut cmd)?;
}

run(&mut cmd, &name)?;
Ok(())
Ok((cmd, name))
}

/// This will return a result instead of panicking; see expand() for the complete description.
@@ -3463,21 +3427,19 @@ impl Tool {
}
}

fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
let (mut child, print) = spawn(cmd, program)?;
fn wait_on_child(cmd: &Command, program: &str, child: &mut Child) -> Result<(), Error> {
let status = match child.wait() {
Ok(s) => s,
Err(_) => {
Err(e) => {
return Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Failed to wait on spawned child process, command {:?} with args {:?}.",
cmd, program
"Failed to wait on spawned child process, command {:?} with args {:?}: {}.",
cmd, program, e
),
));
}
};
print.join().unwrap();
println!("{}", status);

if status.success() {
@@ -3493,63 +3455,62 @@ fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
}
}

fn run_inner(
cmd: &mut Command,
program: &str,
pipe_writer: os_pipe::PipeWriter,
) -> Result<(), Error> {
let mut child = spawn(cmd, program, pipe_writer)?;
wait_on_child(cmd, program, &mut child)
}

fn run(cmd: &mut Command, program: &str) -> Result<(), Error> {
let mut print = PrintThread::new()?;
run_inner(cmd, program, print.pipe_writer().take().unwrap())?;

Ok(())
}

fn run_output(cmd: &mut Command, program: &str) -> Result<Vec<u8>, Error> {
cmd.stdout(Stdio::piped());
let (mut child, print) = spawn(cmd, program)?;

let mut print = PrintThread::new()?;
let mut child = spawn(cmd, program, print.pipe_writer().take().unwrap())?;

let mut stdout = vec![];
child
.stdout
.take()
.unwrap()
.read_to_end(&mut stdout)
.unwrap();
let status = match child.wait() {
Ok(s) => s,
Err(_) => {
return Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Failed to wait on spawned child process, command {:?} with args {:?}.",
cmd, program
),
));
}
};
print.join().unwrap();
println!("{}", status);

if status.success() {
Ok(stdout)
} else {
Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Command {:?} with args {:?} did not execute successfully (status code {}).",
cmd, program, status
),
))
}
wait_on_child(cmd, program, &mut child)?;

Ok(stdout)
}

fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Error> {
println!("running: {:?}", cmd);
fn spawn(
cmd: &mut Command,
program: &str,
pipe_writer: os_pipe::PipeWriter,
) -> Result<Child, Error> {
struct ResetStderr<'cmd>(&'cmd mut Command);

// Capture the standard error coming from these programs, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
match cmd.stderr(Stdio::piped()).spawn() {
Ok(mut child) => {
let stderr = BufReader::new(child.stderr.take().unwrap());
let print = thread::spawn(move || {
for line in stderr.split(b'\n').filter_map(|l| l.ok()) {
print!("cargo:warning=");
std::io::stdout().write_all(&line).unwrap();
println!("");
}
});
Ok((child, print))
impl Drop for ResetStderr<'_> {
fn drop(&mut self) {
// Reset stderr to default to release pipe_writer so that print thread will
// not block forever.
self.0.stderr(Stdio::inherit());
}
}

println!("running: {:?}", cmd);

let cmd = ResetStderr(cmd);

match cmd.0.stderr(pipe_writer).spawn() {
Ok(child) => Ok(child),
Err(ref e) if e.kind() == io::ErrorKind::NotFound => {
let extra = if cfg!(windows) {
" (see https://github.com/rust-lang/cc-rs#compile-time-requirements \
@@ -3562,11 +3523,11 @@ fn spawn(cmd: &mut Command, program: &str) -> Result<(Child, JoinHandle<()>), Er
&format!("Failed to find tool. Is `{}` installed?{}", program, extra),
))
}
Err(ref e) => Err(Error::new(
Err(e) => Err(Error::new(
ErrorKind::ToolExecError,
&format!(
"Command {:?} with args {:?} failed to start: {:?}",
cmd, program, e
cmd.0, program, e
),
)),
}
@@ -3767,3 +3728,58 @@ impl AsmFileExt {
None
}
}

struct PrintThread {
handle: Option<JoinHandle<()>>,
pipe_writer: Option<os_pipe::PipeWriter>,
}

impl PrintThread {
fn new() -> Result<Self, Error> {
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;

// Capture the standard error coming from compilation, and write it out
// with cargo:warning= prefixes. Note that this is a bit wonky to avoid
// requiring the output to be UTF-8, we instead just ship bytes from one
// location to another.
let print = thread::spawn(move || {
let mut stderr = BufReader::with_capacity(4096, pipe_reader);
let mut line = String::with_capacity(20);
let mut stdout = io::stdout();

// read_line returns 0 on Eof
while stderr.read_line(&mut line).unwrap() != 0 {
writeln!(&mut stdout, "cargo:warning={}", line).ok();

// read_line does not clear the buffer
line.clear();
}
});

Ok(Self {
handle: Some(print),
pipe_writer: Some(pipe_writer),
})
}

fn pipe_writer(&mut self) -> &mut Option<os_pipe::PipeWriter> {
&mut self.pipe_writer
}

fn pipe_writer_cloned(&self) -> Result<Option<os_pipe::PipeWriter>, Error> {
self.pipe_writer
.as_ref()
.map(os_pipe::PipeWriter::try_clone)
.transpose()
.map_err(From::from)
}
}

impl Drop for PrintThread {
fn drop(&mut self) {
// Drop pipe_writer first to avoid deadlock
self.pipe_writer.take();

self.handle.take().unwrap().join().unwrap();
}
}
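
The drop order in `PrintThread` matters because the reader thread only sees EOF once every write end of the pipe has been closed. Here is a minimal sketch of the same rule, swapping the OS pipe for a `std::sync::mpsc` channel so that it needs only the standard library. `Printer`, `sender_cloned` and `finish` are hypothetical names and are not part of cc.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `PrintThread`: a channel replaces the OS pipe.
struct Printer {
    handle: Option<JoinHandle<Vec<String>>>,
    sender: Option<Sender<String>>,
}

impl Printer {
    fn new() -> Self {
        let (sender, receiver) = channel::<String>();
        // The loop ends only when every `Sender` (including clones) has been
        // dropped, just as a pipe reader sees EOF only when all writers close.
        let handle = thread::spawn(move || {
            receiver
                .iter()
                .map(|line| format!("cargo:warning={}", line))
                .collect::<Vec<String>>()
        });
        Printer {
            handle: Some(handle),
            sender: Some(sender),
        }
    }

    /// Hands out an extra write end, playing the role of `pipe_writer_cloned`.
    fn sender_cloned(&self) -> Option<Sender<String>> {
        self.sender.clone()
    }

    /// Closes our own write end first, then joins the reader thread.
    /// Joining before dropping the sender would block forever.
    fn finish(mut self) -> Vec<String> {
        drop(self.sender.take());
        self.handle.take().unwrap().join().unwrap()
    }
}
```

Any clone handed out by `sender_cloned` must also be dropped before `finish` can return. That mirrors why `spawn` resets the command's stderr (via `ResetStderr`) to release its copy of the pipe writer.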