Skip to content

Commit

Permalink
We are, mildly amusingly, delaying glyph order for BE glyphs to the detriment of overall execution. Gradually gaining control at least.
Browse files Browse the repository at this point in the history
  • Loading branch information
rsheeter committed Nov 13, 2023
1 parent 314ebb3 commit f45a46e
Showing 1 changed file with 83 additions and 60 deletions.
143 changes: 83 additions & 60 deletions fontc/src/workload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
panic::AssertUnwindSafe,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Arc, Mutex,
},
};

Expand All @@ -19,7 +19,7 @@ use fontir::{
use log::{debug, trace, warn};

use crate::{
timing::{create_timer, JobTime, JobTimer},
timing::{create_timer, JobTime, JobTimeQueued, JobTimer},
work::{AnyAccess, AnyContext, AnyWork, AnyWorkError},
ChangeDetector, Error,
};
Expand Down Expand Up @@ -243,45 +243,23 @@ impl<'a> Workload<'a> {
}
}

pub fn launchable(&mut self) -> Vec<AnyWorkId> {
/// Populate launchable with jobs ready to run from highest to lowest priority
pub fn update_launchable(&mut self, launchable: &mut Vec<AnyWorkId>) {
let timing = create_timer(AnyWorkId::InternalTiming("Launchable"))
.queued()
.run();

let mut move_to_front: Option<Vec<usize>> = None;
let mut launchable: Vec<_> = self
launchable.clear();
for id in self
.jobs_pending
.iter()
.filter_map(|(id, job)| (!job.running && self.can_run(job)).then_some(id))
.enumerate()
.map(|(idx, id)| {
if matches!(
id,
AnyWorkId::Fe(FeWorkIdentifier::Kerning)
| AnyWorkId::Fe(FeWorkIdentifier::GlyphOrder)
) {
if let Some(move_to_front) = move_to_front.as_mut() {
move_to_front.push(idx);
} else {
move_to_front = Some(vec![idx]);
}
}
id
})
.cloned()
.collect();
trace!("Launchable: {launchable:?}");

// Try to prioritize the critical path
// <https://github.com/googlefonts/fontc/issues/456>, <https://github.com/googlefonts/fontc/pull/565>
if let Some(move_to_front) = move_to_front.take() {
for (idx, move_idx) in move_to_front.into_iter().enumerate() {
launchable.swap(idx, move_idx);
}
{
launchable.push(id.clone());
}

trace!("Launchable: {:?}", launchable);
self.timer.add(timing.complete());
launchable
}

pub fn exec(mut self, fe_root: &FeContext, be_root: &BeContext) -> Result<JobTimer, Error> {
Expand All @@ -292,14 +270,22 @@ impl<'a> Workload<'a> {
// a flag we set if we panic
let abort_queued_jobs = Arc::new(AtomicBool::new(false));

let run_queue = Arc::new(Mutex::new(
Vec::<(AnyWork, JobTimeQueued, AnyContext)>::with_capacity(512),
));

// Do NOT assign custom thread names because it makes flamegraph root each thread individually
rayon::in_place_scope(|scope| {
// Whenever a task completes see if it was the last incomplete dependency of other task(s)
// and spawn them if it was
// TODO timeout and die it if takes too long to make forward progress or we're spinning w/o progress

// To avoid allocation every poll for work
let mut launchable = Vec::with_capacity(512.min(self.job_count));

while self.success.len() < self.job_count {
// Spawn anything that is currently executable (has no unfulfilled dependencies)
let launchable = self.launchable();
self.update_launchable(&mut launchable);
if launchable.is_empty() && !self.jobs_pending.values().any(|j| j.running) {
if log::log_enabled!(log::Level::Warn) {
warn!("{}/{} jobs have succeeded, nothing is running, and nothing is launchable", self.success.len(), self.job_count);
Expand All @@ -310,41 +296,77 @@ impl<'a> Workload<'a> {
return Err(Error::UnableToProceed(self.jobs_pending.len()));
}

// Launch anything that needs launching
for id in launchable {
let timing = create_timer(id.clone());

let job = self.jobs_pending.get_mut(&id).unwrap();
log::trace!("Start {:?}", id);
let send = send.clone();
job.running = true;
let work = job
.work
.take()
.expect("{id:?} ready to run but has no work?!");
if !job.run {
if let Err(e) = send.send((id.clone(), Ok(()), JobTime::nop(id.clone()))) {
log::error!("Unable to write nop {id:?} to completion channel: {e}");
//FIXME: if we can't send messages it means the receiver has dropped,
//which means we should... return? abort?
// Get launchables ready to run
{
let mut run_queue = run_queue.lock().unwrap();
for id in launchable.iter() {
let timing = create_timer(id.clone());

let job = self.jobs_pending.get_mut(&id).unwrap();
log::trace!("Start {:?}", id);
job.running = true;
let work = job
.work
.take()
.expect("{id:?} ready to run but has no work?!");
if !job.run {
if let Err(e) =
send.send((id.clone(), Ok(()), JobTime::nop(id.clone())))
{
log::error!(
"Unable to write nop {id:?} to completion channel: {e}"
);
//FIXME: if we can't send messages it means the receiver has dropped,
//which means we should... return? abort?
}
continue;
}
continue;
let work_context = AnyContext::for_work(
fe_root,
be_root,
&id,
job.read_access.clone(),
job.write_access.clone(),
);

let timing = timing.queued();
run_queue.push((work, timing, work_context));
}
let work_context = AnyContext::for_work(
fe_root,
be_root,
&id,
job.read_access.clone(),
job.write_access.clone(),
);

// Try to prioritize the critical path based on --emit-timing observation
// <https://github.com/googlefonts/fontc/issues/456>, <https://github.com/googlefonts/fontc/pull/565>
run_queue.sort_by_cached_key(|(work, ..)| {
// Higher priority sorts last, which means run first due to pop
// We basically want things that block the glyph order => kern => fea sequence to go asap
match work.id() {
AnyWorkId::Be(BeWorkIdentifier::Features) => 99,
AnyWorkId::Fe(FeWorkIdentifier::Features) => 99,
AnyWorkId::Fe(FeWorkIdentifier::Kerning) => 99,
AnyWorkId::Fe(FeWorkIdentifier::GlyphOrder) => 99,
AnyWorkId::Fe(FeWorkIdentifier::PreliminaryGlyphOrder) => 99,
AnyWorkId::Fe(FeWorkIdentifier::StaticMetadata) => 99,
AnyWorkId::Fe(FeWorkIdentifier::GlobalMetrics) => 99,
AnyWorkId::Fe(FeWorkIdentifier::Glyph(..)) => 1,
_ => 0,
}
});
}

// Spawn a processor for every job that's executable
for _ in 0..launchable.len() {
let send = send.clone();
let run_queue = run_queue.clone();
let abort = abort_queued_jobs.clone();

let timing = timing.queued();
scope.spawn(move |_| {
let runnable = { run_queue.lock().unwrap().pop() };
let Some((work, timing, work_context)) = runnable else {
panic!("Spawned more jobs than items available to run");
};
let id = work.id();
let timing = timing.run();
if abort.load(Ordering::Relaxed) {
log::trace!("Aborting {:?}", work.id());
log::trace!("Aborting {:?}", id);
return;
}
// # Unwind Safety
Expand Down Expand Up @@ -485,8 +507,9 @@ impl<'a> Workload<'a> {
#[cfg(test)]
pub fn run_for_test(&mut self, fe_root: &FeContext, be_root: &BeContext) -> HashSet<AnyWorkId> {
let pre_success = self.success.clone();
let mut launchable = Vec::new();
while !self.jobs_pending.is_empty() {
let launchable = self.launchable();
self.update_launchable(&mut launchable);
if launchable.is_empty() {
log::error!("Completed:");
let mut success: Vec<_> = self.success.iter().collect();
Expand Down

0 comments on commit f45a46e

Please sign in to comment.