diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index bc898c475d8158..3bb824aa1da938 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -45,7 +45,7 @@ use crate::{ trait_helpers::get_trait_method, util::StaticOrArc, vc::ReadVcFuture, - Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, OutputContent, ResolvedVc, + Completion, FunctionMeta, InvalidationReason, InvalidationReasonSet, ResolvedVc, SharedReference, TaskId, TaskIdSet, ValueTypeId, Vc, VcRead, VcValueTrait, VcValueType, }; @@ -441,21 +441,9 @@ impl CurrentGlobalTaskState { } } - /// Create a [`LocalTask::Unscheduled`]. #[cfg(feature = "local_resolution")] - fn create_local_task( - &mut self, - ty: CachedTaskType, - // if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence - persistence: TaskPersistence, - ) -> LocalTaskId { - use crate::task::local_task; - - self.local_tasks - .push(LocalTask::Unscheduled(Arc::new(local_task::Unscheduled { - ty, - persistence, - }))); + fn create_local_task(&mut self, local_task: LocalTask) -> LocalTaskId { + self.local_tasks.push(local_task); // generate a one-indexed id if cfg!(debug_assertions) { LocalTaskId::from(u32::try_from(self.local_tasks.len()).unwrap()) @@ -469,6 +457,7 @@ impl CurrentGlobalTaskState { &self.local_tasks[(*local_task_id as usize) - 1] } + #[cfg(feature = "local_resolution")] fn get_mut_local_task(&mut self, local_task_id: LocalTaskId) -> &mut LocalTask { &mut self.local_tasks[(*local_task_id as usize) - 1] } @@ -692,11 +681,7 @@ impl TurboTasks { arg, }; #[cfg(feature = "local_resolution")] - return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { - let mut gts_write = gts.write().unwrap(); - let local_task_id = gts_write.create_local_task(task_type, persistence); - RawVc::LocalOutput(gts_write.task_id, local_task_id) - }); + return self.schedule_local_task(task_type, persistence); #[cfg(not(feature = "local_resolution"))] match persistence { TaskPersistence::LocalCells => { @@ -735,11 +720,7 @@ impl TurboTasks { arg, }; #[cfg(feature = "local_resolution")] - return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { - let mut gts_write = gts.write().unwrap(); - let local_task_id = gts_write.create_local_task(task_type, persistence); - RawVc::LocalOutput(gts_write.task_id, local_task_id) - }); + return self.schedule_local_task(task_type, persistence); #[cfg(not(feature = "local_resolution"))] return match persistence { TaskPersistence::LocalCells => { @@ -793,11 +774,7 @@ impl TurboTasks { }; #[cfg(feature = "local_resolution")] - return CURRENT_GLOBAL_TASK_STATE.with(move |gts| { - let mut gts_write = gts.write().unwrap(); - let local_task_id = gts_write.create_local_task(task_type, persistence); - RawVc::LocalOutput(gts_write.task_id, local_task_id) - }); + return self.schedule_local_task(task_type, persistence); #[cfg(not(feature = "local_resolution"))] return match persistence { TaskPersistence::LocalCells => { @@ -912,35 +889,31 @@ impl TurboTasks { tokio::task::spawn(future); } - fn schedule_local_task(&self, parent_task_id: TaskId, local_task_id: LocalTaskId) { - let Some((global_task_state, unscheduled_local_task)) = + #[cfg(feature = "local_resolution")] + fn schedule_local_task( + &self, + ty: CachedTaskType, + // if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence + persistence: TaskPersistence, + ) -> RawVc { + use crate::OutputContent; + + let ty = Arc::new(ty); + let (global_task_state, local_task_id, parent_task_id) = CURRENT_GLOBAL_TASK_STATE.with(|gts| { let mut gts_write = gts.write().unwrap(); - gts_write.assert_task_id(parent_task_id); - let local_task = gts_write.get_mut_local_task(local_task_id); - let LocalTask::Unscheduled(unscheduled_local_task) = local_task else { - return None; - }; - let unscheduled_local_task = Arc::clone(unscheduled_local_task); - *local_task = LocalTask::Scheduled { + let local_task_id = gts_write.create_local_task(LocalTask::Scheduled { done_event: Event::new({ - let ult = Arc::clone(&unscheduled_local_task); - move || format!("LocalTask({})::done_event", ult.ty) + let ty = Arc::clone(&ty); + move || format!("LocalTask({})::done_event", ty) }), - }; - - Some((Arc::clone(gts), unscheduled_local_task)) - }) - else { - // it's either already scheduled or already done - return; - }; + }); + (Arc::clone(gts), local_task_id, gts_write.task_id) + }); let local_task_state = CurrentLocalTaskState::new( self.execution_id_factory.get(), - unscheduled_local_task - .ty - .try_get_function_id() + ty.try_get_function_id() .map(|func_id| &get_function(func_id).function_meta), ); @@ -948,12 +921,13 @@ impl TurboTasks { let description = format!( "[local] (parent: {}) {}", self.backend.get_task_description(parent_task_id), - unscheduled_local_task.ty, + ty, ); let this = self.pin(); let future = async move { - let TaskExecutionSpec { future, span } = unscheduled_local_task.start_execution(&*this); + let TaskExecutionSpec { future, span } = + crate::task::local_task::get_local_task_execution_spec(&*this, &ty, persistence); async move { let (result, _duration, _memory_usage) = CaptureFuture::new(AssertUnwindSafe(future).catch_unwind()).await; @@ -1003,6 +977,8 @@ impl TurboTasks { .unwrap(); #[cfg(not(feature = "tokio_tracing"))] tokio::task::spawn(future); + + RawVc::LocalOutput(parent_task_id, local_task_id) } fn begin_primary_job(&self) { @@ -1515,11 +1491,6 @@ impl TurboTasksApi for TurboTasks { let gts_read = gts.read().unwrap(); gts_read.assert_task_id(parent_task_id); match gts_read.get_local_task(local_task_id) { - LocalTask::Unscheduled(..) => { - drop(gts_read); - self.schedule_local_task(parent_task_id, local_task_id); - continue; - } LocalTask::Scheduled { done_event } => { return Ok(Err(done_event.listen())); } diff --git a/turbopack/crates/turbo-tasks/src/task/local_task.rs b/turbopack/crates/turbo-tasks/src/task/local_task.rs index d878d1ff2117b1..768c76f458cc4d 100644 --- a/turbopack/crates/turbo-tasks/src/task/local_task.rs +++ b/turbopack/crates/turbo-tasks/src/task/local_task.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::{ backend::{Backend, CachedTaskType, TaskExecutionSpec}, event::Event, @@ -8,75 +6,67 @@ use crate::{ /// A potentially in-flight local task stored in `CurrentGlobalTaskState::local_tasks`. pub enum LocalTask { - Unscheduled(Arc), Scheduled { done_event: Event }, Done { output: OutputContent }, } -pub struct Unscheduled { - pub ty: CachedTaskType, - /// if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence - pub persistence: TaskPersistence, -} - -impl Unscheduled { - pub fn start_execution<'a>( - &'a self, - turbo_tasks: &dyn TurboTasksBackendApi, - ) -> TaskExecutionSpec<'a> { - let Self { ty, persistence } = self; - match ty { - CachedTaskType::Native { - fn_type: native_fn_id, - this, - arg, - } => { - debug_assert_eq!(persistence, &TaskPersistence::LocalCells); - let func = registry::get_function(*native_fn_id); - let span = func.span(TaskPersistence::LocalCells); - let entered = span.enter(); - let future = func.execute(*this, &**arg); - drop(entered); - TaskExecutionSpec { future, span } - } - CachedTaskType::ResolveNative { - fn_type: native_fn_id, - this, - arg, - } => { - let func = registry::get_function(*native_fn_id); - let span = func.resolve_span(TaskPersistence::LocalCells); - let entered = span.enter(); - let future = Box::pin(CachedTaskType::run_resolve_native( - *native_fn_id, - *this, - &**arg, - *persistence, - turbo_tasks.pin(), - )); - drop(entered); - TaskExecutionSpec { future, span } - } - CachedTaskType::ResolveTrait { - trait_type: trait_type_id, - method_name: name, - this, - arg, - } => { - let trait_type = registry::get_trait(*trait_type_id); - let span = trait_type.resolve_span(name); - let entered = span.enter(); - let future = Box::pin(CachedTaskType::run_resolve_trait( - *trait_type_id, - name.clone(), - *this, - &**arg, - *persistence, - turbo_tasks.pin(), - )); - drop(entered); - TaskExecutionSpec { future, span } - } +pub fn get_local_task_execution_spec<'a>( + turbo_tasks: &'_ dyn TurboTasksBackendApi, + ty: &'a CachedTaskType, + // if this is a `CachedTaskType::Resolve*`, we'll spawn another task with this persistence + persistence: TaskPersistence, +) -> TaskExecutionSpec<'a> { + match ty { + CachedTaskType::Native { + fn_type: native_fn_id, + this, + arg, + } => { + debug_assert_eq!(persistence, TaskPersistence::LocalCells); + let func = registry::get_function(*native_fn_id); + let span = func.span(TaskPersistence::LocalCells); + let entered = span.enter(); + let future = func.execute(*this, &**arg); + drop(entered); + TaskExecutionSpec { future, span } + } + CachedTaskType::ResolveNative { + fn_type: native_fn_id, + this, + arg, + } => { + let func = registry::get_function(*native_fn_id); + let span = func.resolve_span(TaskPersistence::LocalCells); + let entered = span.enter(); + let future = Box::pin(CachedTaskType::run_resolve_native( + *native_fn_id, + *this, + &**arg, + persistence, + turbo_tasks.pin(), + )); + drop(entered); + TaskExecutionSpec { future, span } + } + CachedTaskType::ResolveTrait { + trait_type: trait_type_id, + method_name: name, + this, + arg, + } => { + let trait_type = registry::get_trait(*trait_type_id); + let span = trait_type.resolve_span(name); + let entered = span.enter(); + let future = Box::pin(CachedTaskType::run_resolve_trait( + *trait_type_id, + name.clone(), + *this, + &**arg, + persistence, + turbo_tasks.pin(), + )); + drop(entered); + TaskExecutionSpec { future, span } } } }