Skip to content

Commit

Permalink
wait for output instead of waiting for task completion
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Feb 5, 2025
1 parent b46ffa9 commit 9753d5b
Showing 1 changed file with 42 additions and 19 deletions.
61 changes: 42 additions & 19 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,31 +412,41 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let mut ctx = self.execute_context(turbo_tasks);
let mut task = ctx.task(task_id, TaskDataCategory::All);

fn listen_to_done_event<B: BackingStorage>(
this: &TurboTasksBackendInner<B>,
reader: Option<TaskId>,
done_event: &Event,
) -> EventListener {
let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
let listener = done_event.listen_with_note(move || {
if let Some(reader_desc) = reader_desc.as_ref() {
format!("try_read_task_output from {}", reader_desc())
} else {
"try_read_task_output (untracked)".to_string()
}
});
listener
}

fn check_in_progress<B: BackingStorage>(
this: &TurboTasksBackendInner<B>,
task: &impl TaskGuard,
reader: Option<TaskId>,
) -> Option<std::result::Result<std::result::Result<RawVc, EventListener>, anyhow::Error>>
{
if let Some(in_progress) = get!(task, InProgress) {
match in_progress {
InProgressState::Scheduled { done_event, .. }
| InProgressState::InProgress(box InProgressStateInner {
done_event, ..
}) => {
let reader_desc = reader.map(|r| this.get_task_desc_fn(r));
let listener = done_event.listen_with_note(move || {
if let Some(reader_desc) = reader_desc.as_ref() {
format!("try_read_task_output from {}", reader_desc())
} else {
"try_read_task_output (untracked)".to_string()
}
});
return Some(Ok(Err(listener)));
}
match get!(task, InProgress) {
Some(InProgressState::Scheduled { done_event, .. }) => {
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
}
Some(InProgressState::InProgress(box InProgressStateInner {
marked_as_completed,
done_event,
..
})) if !*marked_as_completed => {
Some(Ok(Err(listen_to_done_event(this, reader, done_event))))
}
_ => None,
}
None
}

if self.should_track_children() && matches!(consistency, ReadConsistency::Strong) {
Expand Down Expand Up @@ -1162,6 +1172,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
};
let &mut InProgressState::InProgress(box InProgressStateInner {
stale,
ref mut marked_as_completed,
ref done_event,
ref mut new_children,
..
}) = in_progress
Expand Down Expand Up @@ -1194,6 +1206,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
);
return true;
}

// mark the task as completed, so dependent tasks can continue working
if !*marked_as_completed {
*marked_as_completed = true;
done_event.notify(usize::MAX);
}

// take the children from the task to process them
let mut new_children = take(new_children);

// TODO handle stateful
Expand Down Expand Up @@ -1452,8 +1472,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {

drop(task);

done_event.notify(usize::MAX);

if let Some(data_update) = data_update {
AggregationUpdateQueue::run(data_update, &mut ctx);
}
Expand Down Expand Up @@ -1740,11 +1758,16 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
let mut task = ctx.task(task, TaskDataCategory::Data);
if let Some(InProgressState::InProgress(box InProgressStateInner {
marked_as_completed,
done_event,
..
})) = get_mut!(task, InProgress)
{
*marked_as_completed = true;
done_event.notify(usize::MAX);
// TODO this should remove the dirty state (also check session_dependent)
// but this would break some assumptions for strongly consistent reads.
// Client tasks are not connected yet, so we wouldn't wait for them.
// Maybe that's ok in cases where mark_finished() is used? Seems like it?
}
}

Expand Down

0 comments on commit 9753d5b

Please sign in to comment.