Skip to content

Commit a7aa5e3

Browse files
authored
Fix make resource failure when cc parallel is enabled (#985)
1 parent acde7a1 commit a7aa5e3

File tree

7 files changed

+263
-519
lines changed

7 files changed

+263
-519
lines changed

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@ exclude = ["/.github", "tests", "src/bin"]
1919
edition = "2018"
2020
rust-version = "1.53"
2121

22+
[dependencies]
23+
jobserver = { version = "0.1.20", default-features = false, optional = true }
24+
2225
[target.'cfg(unix)'.dependencies]
2326
# Don't turn on the feature "std" for this, see https://github.com/rust-lang/cargo/issues/4866
2427
# which is still an issue with `resolver = "1"`.
2528
libc = { version = "0.2.62", default-features = false, optional = true }
2629

2730
[features]
28-
parallel = ["libc"]
31+
parallel = ["libc", "jobserver"]
2932

3033
[dev-dependencies]
3134
tempfile = "3"

src/lib.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,9 @@ enum ErrorKind {
312312
ToolNotFound,
313313
/// One of the function arguments failed validation.
314314
InvalidArgument,
315+
#[cfg(feature = "parallel")]
316+
/// jobserver helpthread failure
317+
JobserverHelpThreadError,
315318
}
316319

317320
/// Represents an internal error that occurred, with an explanation.
@@ -1510,13 +1513,7 @@ impl Build {
15101513
let spawn_future = async {
15111514
for obj in objs {
15121515
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
1513-
let token = loop {
1514-
if let Some(token) = tokens.try_acquire()? {
1515-
break token;
1516-
} else {
1517-
YieldOnce::default().await
1518-
}
1519-
};
1516+
let token = tokens.acquire().await?;
15201517
let mut child = spawn(&mut cmd, &program, &self.cargo_output)?;
15211518
let mut stderr_forwarder = StderrForwarder::new(&mut child);
15221519
stderr_forwarder.set_non_blocking()?;

src/parallel/job_token.rs

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
use std::{marker::PhantomData, mem::MaybeUninit, sync::Once};
2+
3+
use crate::Error;
4+
5+
pub(crate) struct JobToken(PhantomData<()>);
6+
7+
impl JobToken {
8+
fn new() -> Self {
9+
Self(PhantomData)
10+
}
11+
}
12+
13+
impl Drop for JobToken {
14+
fn drop(&mut self) {
15+
match JobTokenServer::new() {
16+
JobTokenServer::Inherited(jobserver) => jobserver.release_token_raw(),
17+
JobTokenServer::InProcess(jobserver) => jobserver.release_token_raw(),
18+
}
19+
}
20+
}
21+
22+
enum JobTokenServer {
23+
Inherited(inherited_jobserver::JobServer),
24+
InProcess(inprocess_jobserver::JobServer),
25+
}
26+
27+
impl JobTokenServer {
28+
/// This function returns a static reference to the jobserver because
29+
/// - creating a jobserver from env is a bit fd-unsafe (e.g. the fd might
30+
/// be closed by other jobserver users in the process) and better do it
31+
/// at the start of the program.
32+
/// - in case a jobserver cannot be created from env (e.g. it's not
33+
/// present), we will create a global in-process only jobserver
34+
/// that has to be static so that it will be shared by all cc
35+
/// compilation.
36+
fn new() -> &'static Self {
37+
static INIT: Once = Once::new();
38+
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
39+
40+
unsafe {
41+
INIT.call_once(|| {
42+
let server = inherited_jobserver::JobServer::from_env()
43+
.map(Self::Inherited)
44+
.unwrap_or_else(|| Self::InProcess(inprocess_jobserver::JobServer::new()));
45+
JOBSERVER = MaybeUninit::new(server);
46+
});
47+
// TODO: Poor man's assume_init_ref, as that'd require a MSRV of 1.55.
48+
&*JOBSERVER.as_ptr()
49+
}
50+
}
51+
}
52+
53+
pub(crate) enum ActiveJobTokenServer {
54+
Inherited(inherited_jobserver::ActiveJobServer<'static>),
55+
InProcess(&'static inprocess_jobserver::JobServer),
56+
}
57+
58+
impl ActiveJobTokenServer {
59+
pub(crate) fn new() -> Result<Self, Error> {
60+
match JobTokenServer::new() {
61+
JobTokenServer::Inherited(inherited_jobserver) => {
62+
inherited_jobserver.enter_active().map(Self::Inherited)
63+
}
64+
JobTokenServer::InProcess(inprocess_jobserver) => {
65+
Ok(Self::InProcess(inprocess_jobserver))
66+
}
67+
}
68+
}
69+
70+
pub(crate) async fn acquire(&self) -> Result<JobToken, Error> {
71+
match &self {
72+
Self::Inherited(jobserver) => jobserver.acquire().await,
73+
Self::InProcess(jobserver) => Ok(jobserver.acquire().await),
74+
}
75+
}
76+
}
77+
78+
mod inherited_jobserver {
79+
use super::JobToken;
80+
81+
use crate::{parallel::async_executor::YieldOnce, Error, ErrorKind};
82+
83+
use std::{
84+
io, mem,
85+
sync::{mpsc, Mutex, MutexGuard, PoisonError},
86+
};
87+
88+
pub(super) struct JobServer {
89+
/// Implicit token for this process which is obtained and will be
90+
/// released in parent. Since JobTokens only give back what they got,
91+
/// there should be at most one global implicit token in the wild.
92+
///
93+
/// Since Rust does not execute any `Drop` for global variables,
94+
/// we can't just put it back to jobserver and then re-acquire it at
95+
/// the end of the process.
96+
///
97+
/// Use `Mutex` to avoid race between acquire and release.
98+
/// If an `AtomicBool` is used, then it's possible for:
99+
/// - `release_token_raw`: Tries to set `global_implicit_token` to true, but it is already
100+
/// set to `true`, continue to release it to jobserver
101+
/// - `acquire` takes the global implicit token, set `global_implicit_token` to false
102+
/// - `release_token_raw` now writes the token back into the jobserver, while
103+
/// `global_implicit_token` is `false`
104+
///
105+
/// If the program exits here, then cc effectively increases parallelism by one, which is
106+
/// incorrect, hence we use a `Mutex` here.
107+
global_implicit_token: Mutex<bool>,
108+
inner: jobserver::Client,
109+
}
110+
111+
impl JobServer {
112+
pub(super) unsafe fn from_env() -> Option<Self> {
113+
jobserver::Client::from_env().map(|inner| Self {
114+
inner,
115+
global_implicit_token: Mutex::new(true),
116+
})
117+
}
118+
119+
fn get_global_implicit_token(&self) -> MutexGuard<'_, bool> {
120+
self.global_implicit_token
121+
.lock()
122+
.unwrap_or_else(PoisonError::into_inner)
123+
}
124+
125+
/// All tokens except for the global implicit token will be put back into the jobserver
126+
/// immediately and they cannot be cached, since Rust does not call `Drop::drop` on
127+
/// global variables.
128+
pub(super) fn release_token_raw(&self) {
129+
let mut global_implicit_token = self.get_global_implicit_token();
130+
131+
if *global_implicit_token {
132+
// There's already a global implicit token, so this token must
133+
// be released back into jobserver.
134+
//
135+
// `release_raw` should not block
136+
let _ = self.inner.release_raw();
137+
} else {
138+
*global_implicit_token = true;
139+
}
140+
}
141+
142+
pub(super) fn enter_active(&self) -> Result<ActiveJobServer<'_>, Error> {
143+
ActiveJobServer::new(self)
144+
}
145+
}
146+
147+
pub(crate) struct ActiveJobServer<'a> {
148+
jobserver: &'a JobServer,
149+
helper_thread: jobserver::HelperThread,
150+
/// When rx is dropped, all the token stored within it will be dropped.
151+
rx: mpsc::Receiver<io::Result<jobserver::Acquired>>,
152+
}
153+
154+
impl<'a> ActiveJobServer<'a> {
155+
fn new(jobserver: &'a JobServer) -> Result<Self, Error> {
156+
let (tx, rx) = mpsc::channel();
157+
158+
Ok(Self {
159+
rx,
160+
helper_thread: jobserver.inner.clone().into_helper_thread(move |res| {
161+
let _ = tx.send(res);
162+
})?,
163+
jobserver,
164+
})
165+
}
166+
167+
pub(super) async fn acquire(&self) -> Result<JobToken, Error> {
168+
let mut has_requested_token = false;
169+
170+
loop {
171+
// Fast path
172+
if mem::replace(&mut *self.jobserver.get_global_implicit_token(), false) {
173+
break Ok(JobToken::new());
174+
}
175+
176+
// Cold path, no global implicit token, obtain one
177+
match self.rx.try_recv() {
178+
Ok(res) => {
179+
let acquired = res?;
180+
acquired.drop_without_releasing();
181+
break Ok(JobToken::new());
182+
}
183+
Err(mpsc::TryRecvError::Disconnected) => {
184+
break Err(Error::new(
185+
ErrorKind::JobserverHelpThreadError,
186+
"jobserver help thread has returned before ActiveJobServer is dropped",
187+
))
188+
}
189+
Err(mpsc::TryRecvError::Empty) => {
190+
if !has_requested_token {
191+
self.helper_thread.request_token();
192+
has_requested_token = true;
193+
}
194+
YieldOnce::default().await
195+
}
196+
}
197+
}
198+
}
199+
}
200+
}
201+
202+
mod inprocess_jobserver {
203+
use super::JobToken;
204+
205+
use crate::parallel::async_executor::YieldOnce;
206+
207+
use std::{
208+
env::var,
209+
sync::atomic::{
210+
AtomicU32,
211+
Ordering::{AcqRel, Acquire},
212+
},
213+
};
214+
215+
pub(crate) struct JobServer(AtomicU32);
216+
217+
impl JobServer {
218+
pub(super) fn new() -> Self {
219+
// Use `NUM_JOBS` if set (it's configured by Cargo) and otherwise
220+
// just fall back to a semi-reasonable number.
221+
//
222+
// Note that we could use `num_cpus` here but it's an extra
223+
// dependency that will almost never be used, so
224+
// it's generally not too worth it.
225+
let mut parallelism = 4;
226+
// TODO: Use std::thread::available_parallelism as an upper bound
227+
// when MSRV is bumped.
228+
if let Ok(amt) = var("NUM_JOBS") {
229+
if let Ok(amt) = amt.parse() {
230+
parallelism = amt;
231+
}
232+
}
233+
234+
Self(AtomicU32::new(parallelism))
235+
}
236+
237+
pub(super) async fn acquire(&self) -> JobToken {
238+
loop {
239+
let res = self
240+
.0
241+
.fetch_update(AcqRel, Acquire, |tokens| tokens.checked_sub(1));
242+
243+
if res.is_ok() {
244+
break JobToken::new();
245+
}
246+
247+
YieldOnce::default().await
248+
}
249+
}
250+
251+
pub(super) fn release_token_raw(&self) {
252+
self.0.fetch_add(1, AcqRel);
253+
}
254+
}
255+
}

0 commit comments

Comments
 (0)