Skip to content

Commit

Permalink
Eagerly schedule local tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
bgw committed Jan 14, 2025
1 parent 2889669 commit 6e3b95b
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 126 deletions.
89 changes: 30 additions & 59 deletions turbopack/crates/turbo-tasks/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use crate::{
trait_helpers::get_trait_method,
util::StaticOrArc,
vc::ReadVcFuture,
Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, OutputContent, ResolvedVc,
Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, ResolvedVc,
SharedReference, TaskId, TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType,
};

Expand Down Expand Up @@ -441,21 +441,9 @@ impl CurrentGlobalTaskState {
}
}

/// Create a [`LocalTask::Unscheduled`].
#[cfg(feature = "local_resolution")]
fn create_local_task(
&mut self,
ty: CachedTaskType,
// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence
persistence: TaskPersistence,
) -> LocalTaskId {
use crate::task::local_task;

self.local_tasks
.push(LocalTask::Unscheduled(Arc::new(local_task::Unscheduled {
ty,
persistence,
})));
fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId {
self.local_tasks.push(local_task);
// generate a one-indexed id
if cfg!(debug_assertions) {
LocalTaskId::from(u32::try_from(self.local_tasks.len()).unwrap())
Expand All @@ -469,6 +457,7 @@ impl CurrentGlobalTaskState {
&self.local_tasks[(*local_task_id as usize) - 1]
}

#[cfg(feature = "local_resolution")]
fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask {
&mut self.local_tasks[(*local_task_id as usize) - 1]
}
Expand Down Expand Up @@ -692,11 +681,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
arg,
};
#[cfg(feature = "local_resolution")]
return CURRENT_GLOBAL_TASK_STATE.with(move |gts| {
let mut gts_write = gts.write().unwrap();
let local_task_id = gts_write.create_local_task(task_type, persistence);
RawVc::LocalOutput(gts_write.task_id, local_task_id)
});
return self.schedule_local_task(task_type, persistence);
#[cfg(not(feature = "local_resolution"))]
match persistence {
TaskPersistence::LocalCells => {
Expand Down Expand Up @@ -735,11 +720,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
arg,
};
#[cfg(feature = "local_resolution")]
return CURRENT_GLOBAL_TASK_STATE.with(move |gts| {
let mut gts_write = gts.write().unwrap();
let local_task_id = gts_write.create_local_task(task_type, persistence);
RawVc::LocalOutput(gts_write.task_id, local_task_id)
});
return self.schedule_local_task(task_type, persistence);
#[cfg(not(feature = "local_resolution"))]
return match persistence {
TaskPersistence::LocalCells => {
Expand Down Expand Up @@ -793,11 +774,7 @@ impl<B: Backend + 'static> TurboTasks<B> {
};

#[cfg(feature = "local_resolution")]
return CURRENT_GLOBAL_TASK_STATE.with(move |gts| {
let mut gts_write = gts.write().unwrap();
let local_task_id = gts_write.create_local_task(task_type, persistence);
RawVc::LocalOutput(gts_write.task_id, local_task_id)
});
return self.schedule_local_task(task_type, persistence);
#[cfg(not(feature = "local_resolution"))]
return match persistence {
TaskPersistence::LocalCells => {
Expand Down Expand Up @@ -912,48 +889,45 @@ impl<B: Backend + 'static> TurboTasks<B> {
tokio::task::spawn(future);
}

fn schedule_local_task(&self, parent_task_id: TaskId, local_task_id: LocalTaskId) {
let Some((global_task_state, unscheduled_local_task)) =
#[cfg(feature = "local_resolution")]
fn schedule_local_task(
&self,
ty: CachedTaskType,
// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence
persistence: TaskPersistence,
) -> RawVc {
use crate::OutputContent;

let ty = Arc::new(ty);
let (global_task_state, local_task_id, parent_task_id) =
CURRENT_GLOBAL_TASK_STATE.with(|gts| {
let mut gts_write = gts.write().unwrap();
gts_write.assert_task_id(parent_task_id);
let local_task = gts_write.get_mut_local_task(local_task_id);
let LocalTask::Unscheduled(unscheduled_local_task) = local_task else {
return None;
};
let unscheduled_local_task = Arc::clone(unscheduled_local_task);
*local_task = LocalTask::Scheduled {
let local_task_id = gts_write.create_local_task(LocalTask::Scheduled {
done_event: Event::new({
let ult = Arc::clone(&unscheduled_local_task);
move || format!("LocalTask({})::done_event", ult.ty)
let ty = Arc::clone(&ty);
move || format!("LocalTask({})::done_event", ty)
}),
};

Some((Arc::clone(gts), unscheduled_local_task))
})
else {
// it's either already scheduled or already done
return;
};
});
(Arc::clone(gts), local_task_id, gts_write.task_id)
});

let local_task_state = CurrentLocalTaskState::new(
self.execution_id_factory.get(),
unscheduled_local_task
.ty
.try_get_function_id()
ty.try_get_function_id()
.map(|func_id| &get_function(func_id).function_meta),
);

#[cfg(feature = "tokio_tracing")]
let description = format!(
"[local] (parent: {}) {}",
self.backend.get_task_description(parent_task_id),
unscheduled_local_task.ty,
ty,
);

let this = self.pin();
let future = async move {
let TaskExecutionSpec { future, span } = unscheduled_local_task.start_execution(&*this);
let TaskExecutionSpec { future, span } =
crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence);
async move {
let (result, _duration, _memory_usage) =
CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await;
Expand Down Expand Up @@ -1003,6 +977,8 @@ impl<B: Backend + 'static> TurboTasks<B> {
.unwrap();
#[cfg(not(feature = "tokio_tracing"))]
tokio::task::spawn(future);

RawVc::LocalOutput(parent_task_id, local_task_id)
}

fn begin_primary_job(&self) {
Expand Down Expand Up @@ -1515,11 +1491,6 @@ impl<B: Backend + 'static> TurboTasksApi for TurboTasks<B> {
let gts_read = gts.read().unwrap();
gts_read.assert_task_id(parent_task_id);
match gts_read.get_local_task(local_task_id) {
LocalTask::Unscheduled(..) => {
drop(gts_read);
self.schedule_local_task(parent_task_id, local_task_id);
continue;
}
LocalTask::Scheduled { done_event } => {
return Ok(Err(done_event.listen()));
}
Expand Down
124 changes: 57 additions & 67 deletions turbopack/crates/turbo-tasks/src/task/local_task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::sync::Arc;

use crate::{
backend::{Backend, CachedTaskType, TaskExecutionSpec},
event::Event,
Expand All @@ -8,75 +6,67 @@ use crate::{

/// A potentially in-flight local task stored in `CurrentGlobalTaskState::local_tasks`.
pub enum LocalTask {
Unscheduled(Arc<Unscheduled>),
Scheduled { done_event: Event },
Done { output: OutputContent },
}

pub struct Unscheduled {
pub ty: CachedTaskType,
/// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence
pub persistence: TaskPersistence,
}

impl Unscheduled {
pub fn start_execution<'a>(
&'a self,
turbo_tasks: &dyn TurboTasksBackendApi<impl Backend + 'static>,
) -> TaskExecutionSpec<'a> {
let Self { ty, persistence } = self;
match ty {
CachedTaskType::Native {
fn_type: native_fn_id,
this,
arg,
} => {
debug_assert_eq!(persistence, &TaskPersistence::LocalCells);
let func = registry::get_function(*native_fn_id);
let span = func.span(TaskPersistence::LocalCells);
let entered = span.enter();
let future = func.execute(*this, &**arg);
drop(entered);
TaskExecutionSpec { future, span }
}
CachedTaskType::ResolveNative {
fn_type: native_fn_id,
this,
arg,
} => {
let func = registry::get_function(*native_fn_id);
let span = func.resolve_span(TaskPersistence::LocalCells);
let entered = span.enter();
let future = Box::pin(CachedTaskType::run_resolve_native(
*native_fn_id,
*this,
&**arg,
*persistence,
turbo_tasks.pin(),
));
drop(entered);
TaskExecutionSpec { future, span }
}
CachedTaskType::ResolveTrait {
trait_type: trait_type_id,
method_name: name,
this,
arg,
} => {
let trait_type = registry::get_trait(*trait_type_id);
let span = trait_type.resolve_span(name);
let entered = span.enter();
let future = Box::pin(CachedTaskType::run_resolve_trait(
*trait_type_id,
name.clone(),
*this,
&**arg,
*persistence,
turbo_tasks.pin(),
));
drop(entered);
TaskExecutionSpec { future, span }
}
pub fn get_local_task_execution_spec<'a>(
turbo_tasks: &'_ dyn TurboTasksBackendApi<impl Backend + 'static>,
ty: &'a CachedTaskType,
// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence
persistence: TaskPersistence,
) -> TaskExecutionSpec<'a> {
match ty {
CachedTaskType::Native {
fn_type: native_fn_id,
this,
arg,
} => {
debug_assert_eq!(persistence, TaskPersistence::LocalCells);
let func = registry::get_function(*native_fn_id);
let span = func.span(TaskPersistence::LocalCells);
let entered = span.enter();
let future = func.execute(*this, &**arg);
drop(entered);
TaskExecutionSpec { future, span }
}
CachedTaskType::ResolveNative {
fn_type: native_fn_id,
this,
arg,
} => {
let func = registry::get_function(*native_fn_id);
let span = func.resolve_span(TaskPersistence::LocalCells);
let entered = span.enter();
let future = Box::pin(CachedTaskType::run_resolve_native(
*native_fn_id,
*this,
&**arg,
persistence,
turbo_tasks.pin(),
));
drop(entered);
TaskExecutionSpec { future, span }
}
CachedTaskType::ResolveTrait {
trait_type: trait_type_id,
method_name: name,
this,
arg,
} => {
let trait_type = registry::get_trait(*trait_type_id);
let span = trait_type.resolve_span(name);
let entered = span.enter();
let future = Box::pin(CachedTaskType::run_resolve_trait(
*trait_type_id,
name.clone(),
*this,
&**arg,
persistence,
turbo_tasks.pin(),
));
drop(entered);
TaskExecutionSpec { future, span }
}
}
}

0 comments on commit 6e3b95b

Please sign in to comment.