diff --git a/turbopack/crates/turbo-tasks-memory/src/output.rs b/turbopack/crates/turbo-tasks-memory/src/output.rs index ab5564b3b017e..a53b7397b155b 100644 --- a/turbopack/crates/turbo-tasks-memory/src/output.rs +++ b/turbopack/crates/turbo-tasks-memory/src/output.rs @@ -21,13 +21,10 @@ impl Output { /// INVALIDATION: Be careful with this, it will not track dependencies, so /// using it could break cache invalidation. - pub fn read_untracked(&mut self) -> Result { + pub fn read_untracked(&self) -> Result { match &self.content { None => Err(anyhow!("Output is empty")), - Some(OutputContent::Error(err)) => Err(anyhow::Error::new(err.clone())), - Some(OutputContent::Link(raw_vc)) => Ok(*raw_vc), - Some(OutputContent::Panic(Some(message))) => Err(anyhow!("A task panicked: {message}")), - Some(OutputContent::Panic(None)) => Err(anyhow!("A task panicked")), + Some(content) => content.read_untracked(), } } diff --git a/turbopack/crates/turbo-tasks-testing/src/lib.rs b/turbopack/crates/turbo-tasks-testing/src/lib.rs index 842e33e95f2c2..43f6a2d714ff1 100644 --- a/turbopack/crates/turbo-tasks-testing/src/lib.rs +++ b/turbopack/crates/turbo-tasks-testing/src/lib.rs @@ -20,7 +20,7 @@ use turbo_tasks::{ registry, test_helpers::with_turbo_tasks_for_testing, util::{SharedError, StaticOrArc}, - CellId, ExecutionId, InvalidationReason, MagicAny, RawVc, ReadConsistency, TaskId, + CellId, ExecutionId, InvalidationReason, LocalTaskId, MagicAny, RawVc, ReadConsistency, TaskId, TaskPersistence, TraitTypeId, TurboTasksApi, TurboTasksCallApi, }; @@ -242,6 +242,24 @@ impl TurboTasksApi for VcStorage { self.read_own_task_cell(current_task, index) } + fn try_read_local_output( + &self, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, + ) -> Result> { + self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency) + } + + fn try_read_local_output_untracked( + &self, + _parent_task_id: TaskId, + _local_task_id: LocalTaskId, + _consistency: ReadConsistency, + ) -> Result> { + unimplemented!() + } + fn emit_collectible(&self, _trait_type: turbo_tasks::TraitTypeId, _collectible: RawVc) { unimplemented!() } diff --git a/turbopack/crates/turbo-tasks/src/backend.rs b/turbopack/crates/turbo-tasks/src/backend.rs index 0242e460d456e..0da142efca47d 100644 --- a/turbopack/crates/turbo-tasks/src/backend.rs +++ b/turbopack/crates/turbo-tasks/src/backend.rs @@ -123,10 +123,10 @@ impl Display for CachedTaskType { match self { CachedTaskType::Native { .. } => {} CachedTaskType::ResolveNative { .. } => { - f.write_str("[resolve] "); + f.write_str("[resolve] ")?; } CachedTaskType::ResolveTrait { .. } => { - f.write_str("[resolve trait] "); + f.write_str("[resolve trait] ")?; } } f.write_str(&self.get_name()) diff --git a/turbopack/crates/turbo-tasks/src/manager.rs b/turbopack/crates/turbo-tasks/src/manager.rs index 6dd9845b29af2..f20e50eff06f0 100644 --- a/turbopack/crates/turbo-tasks/src/manager.rs +++ b/turbopack/crates/turbo-tasks/src/manager.rs @@ -37,7 +37,7 @@ use crate::{ }, id_factory::{IdFactory, IdFactoryWithReuse}, magic_any::MagicAny, - raw_vc::{CellId, RawVc}, + raw_vc::{self, CellId, RawVc}, registry::{self, get_function}, task::{ local_task::{LocalTask, UnscheduledLocalTask}, @@ -152,23 +152,19 @@ pub trait TurboTasksApi: TurboTasksCallApi + Sync + Send { fn try_read_local_output( &self, - _task_id: TaskId, - _local_output_id: LocalTaskId, - _consistency: ReadConsistency, - ) -> Result> { - todo!("bgw: local outputs"); - } + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, + ) -> Result>; /// INVALIDATION: Be careful with this, it will not track dependencies, so /// using it could break cache invalidation. fn try_read_local_output_untracked( &self, - _task: TaskId, - _local_output_id: LocalTaskId, - _consistency: ReadConsistency, - ) -> Result> { - todo!("bgw: local outputs"); - } + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, + ) -> Result>; fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap; @@ -431,6 +427,14 @@ impl CurrentGlobalTaskState { } } + fn assert_task_id(&self, expected_task_id: TaskId) { + if self.task_id != expected_task_id { + unimplemented!( + "Local tasks can currently only be scheduled/awaited within their parent task" + ); + } + } + /// Create a [`LocalTask::Unscheduled`]. fn create_local_task( &mut self, @@ -886,12 +890,7 @@ impl TurboTasks { let Some((global_task_state, unscheduled_local_task)) = CURRENT_GLOBAL_TASK_STATE.with(|gts| { let mut gts_write = gts.write().unwrap(); - if parent_task_id != gts_write.task_id { - unimplemented!( - "Local tasks can currently only be scheduled/awaited within their parent \ - task" - ); - } + gts_write.assert_task_id(parent_task_id); let local_task = gts_write.get_mut_local_task(local_task_id); let LocalTask::Unscheduled(unscheduled_local_task) = local_task else { return None; @@ -920,6 +919,13 @@ impl TurboTasks { .map(|func_id| &get_function(func_id).function_meta), ); + #[cfg(feature = "tokio_tracing")] + let description = format!( + "[local] (parent: {}) {}", + self.backend.get_task_description(parent_task_id), + unscheduled_local_task.ty, + ); + let this = self.pin(); let future = async move { let TaskExecutionSpec { future, span } = unscheduled_local_task.start_execution(&*this); @@ -942,10 +948,16 @@ impl TurboTasks { }, }; - CURRENT_GLOBAL_TASK_STATE.with(move |gts| { + let done_event = CURRENT_GLOBAL_TASK_STATE.with(move |gts| { let mut gts_write = gts.write().unwrap(); - *gts_write.get_mut_local_task(local_task_id) = local_task; - }) + let scheduled_task = + std::mem::replace(gts_write.get_mut_local_task(local_task_id), local_task); + let LocalTask::Scheduled { done_event } = scheduled_task else { + panic!("local task finished, but was not in the scheduled state?"); + }; + done_event + }); + done_event.notify(usize::MAX) } .instrument(span) .await @@ -960,17 +972,10 @@ impl TurboTasks { let future = TURBO_TASKS.scope(self.pin(), future).in_current_span(); #[cfg(feature = "tokio_tracing")] - { - let description = format!( - "[local] (parent: {}) {}", - self.backend.get_task_description(parent_task_id), - local_task_ty, - ); - tokio::task::Builder::new() - .name(&description) - .spawn(future) - .unwrap(); - } + tokio::task::Builder::new() + .name(&description) + .spawn(future) + .unwrap(); #[cfg(not(feature = "tokio_tracing"))] tokio::task::spawn(future); } @@ -1454,6 +1459,46 @@ impl TurboTasksApi for TurboTasks { .try_read_own_task_cell_untracked(current_task, index, self) } + fn try_read_local_output( + &self, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + consistency: ReadConsistency, + ) -> Result> { + // we don't currently support reading a local output outside of it's own task, so + // tracked/untracked is currently irrelevant + self.try_read_local_output_untracked(parent_task_id, local_task_id, consistency) + } + + /// INVALIDATION: Be careful with this, it will not track dependencies, so + /// using it could break cache invalidation. + fn try_read_local_output_untracked( + &self, + parent_task_id: TaskId, + local_task_id: LocalTaskId, + // we don't currently support reading a local output outside of it's own task, so + // consistency is currently irrelevant + _consistency: ReadConsistency, + ) -> Result> { + CURRENT_GLOBAL_TASK_STATE.with(|gts| loop { + let gts_read = gts.read().unwrap(); + gts_read.assert_task_id(parent_task_id); + match gts_read.get_local_task(local_task_id) { + LocalTask::Unscheduled(..) => { + drop(gts_read); + self.schedule_local_task(parent_task_id, local_task_id); + continue; + } + LocalTask::Scheduled { done_event } => { + return Ok(Err(done_event.listen())); + } + LocalTask::Done { output } => { + return Ok(Ok(output.read_untracked()?)); + } + } + }) + } + fn read_task_collectibles(&self, task: TaskId, trait_id: TraitTypeId) -> TaskCollectiblesMap { self.backend.read_task_collectibles( task, diff --git a/turbopack/crates/turbo-tasks/src/output.rs b/turbopack/crates/turbo-tasks/src/output.rs index 2fe3057eb7409..5a0ad3a7882b6 100644 --- a/turbopack/crates/turbo-tasks/src/output.rs +++ b/turbopack/crates/turbo-tasks/src/output.rs @@ -3,6 +3,8 @@ use std::{ fmt::{self, Display}, }; +use anyhow::anyhow; + use crate::{util::SharedError, RawVc}; /// A helper type representing the output of a resolved task. @@ -13,13 +15,26 @@ pub enum OutputContent { Panic(Option>>), } +impl OutputContent { + /// INVALIDATION: Be careful with this, it will not track dependencies, so + /// using it could break cache invalidation. + pub fn read_untracked(&self) -> anyhow::Result { + match &self { + Self::Error(err) => Err(anyhow::Error::new(err.clone())), + Self::Link(raw_vc) => Ok(*raw_vc), + Self::Panic(Some(message)) => Err(anyhow!("A task panicked: {message}")), + Self::Panic(None) => Err(anyhow!("A task panicked")), + } + } +} + impl Display for OutputContent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - OutputContent::Link(raw_vc) => write!(f, "link {:?}", raw_vc), - OutputContent::Error(err) => write!(f, "error {}", err), - OutputContent::Panic(Some(message)) => write!(f, "panic {}", message), - OutputContent::Panic(None) => write!(f, "panic"), + Self::Link(raw_vc) => write!(f, "link {:?}", raw_vc), + Self::Error(err) => write!(f, "error {}", err), + Self::Panic(Some(message)) => write!(f, "panic {}", message), + Self::Panic(None) => write!(f, "panic"), } } }