Rename AggregateRoot to Activeness and split into both sources
sokra committed Jan 14, 2025
1 parent 3256ff8 commit d641329
Showing 7 changed files with 208 additions and 65 deletions.
46 changes: 24 additions & 22 deletions turbopack/crates/turbo-tasks-backend/src/backend/mod.rs
@@ -44,13 +44,13 @@ use crate::{
             ExecuteContext, ExecuteContextImpl, Operation, OutdatedEdge, TaskDirtyCause, TaskGuard,
         },
         persisted_storage_log::PersistedStorageLog,
-        storage::{get, get_many, get_mut, iter_many, remove, Storage},
+        storage::{get, get_many, get_mut, get_mut_or_insert_with, iter_many, remove, Storage},
     },
     backing_storage::BackingStorage,
     data::{
-        ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
+        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CachedDataItemType,
         CachedDataItemValue, CachedDataItemValueRef, CachedDataUpdate, CellRef, CollectibleRef,
-        CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootState,
+        CollectiblesRef, DirtyState, InProgressCellState, InProgressState, OutputValue, RootType,
     },
     utils::{bi_map::BiMap, chunked_vec::ChunkedVec, ptr_eq_arc::PtrEqArc, sharded::Sharded},
 };
@@ -447,7 +447,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 .unwrap_or_default()
                 .get(self.session_id);
             if dirty_tasks > 0 || is_dirty {
-                let root = get!(task, AggregateRoot);
+                let root = get!(task, Activeness);
                 let mut task_ids_to_schedule: Vec<_> = Vec::new();
                 // When there are dirty task, subscribe to the all_clean_event
                 let root = if let Some(root) = root {
@@ -456,10 +456,9 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                     // If we don't have a root state, add one. This also makes sure all tasks stay
                     // active and this task won't stale. CachedActiveUntilClean
                     // is automatically removed when this task is clean.
-                    task.add_new(CachedDataItem::AggregateRoot {
-                        value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
-                    });
-                    // A newly added AggregateRoot need to make sure to schedule the tasks
+                    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
+                        .set_active_until_clean();
+                    // A newly added Activeness need to make sure to schedule the tasks
                     task_ids_to_schedule = get_many!(
                         task,
                         AggregatedDirtyContainer {
@@ -471,7 +470,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                     if is_dirty {
                         task_ids_to_schedule.push(task_id);
                     }
-                    get!(task, AggregateRoot).unwrap()
+                    get!(task, Activeness).unwrap()
                 };
                 let listener = root.all_clean_event.listen_with_note(move || {
                     format!(
@@ -1373,10 +1372,11 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 };
                 if !aggregated_update.is_zero() {
                     if aggregated_update.get(self.session_id) < 0 {
-                        if let Some(root_state) = get!(task, AggregateRoot) {
+                        if let Some(root_state) = get_mut!(task, Activeness) {
                             root_state.all_clean_event.notify(usize::MAX);
-                            if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) {
-                                task.remove(&CachedDataItemKey::AggregateRoot {});
+                            root_state.unset_active_until_clean();
+                            if root_state.is_empty() {
+                                task.remove(&CachedDataItemKey::Activeness {});
                             }
                         }
                     }
@@ -1700,8 +1700,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
     fn create_transient_task(&self, task_type: TransientTaskType) -> TaskId {
         let task_id = self.transient_task_id_factory.get();
         let root_type = match task_type {
-            TransientTaskType::Root(_) => ActiveType::RootTask,
-            TransientTaskType::Once(_) => ActiveType::OnceTask,
+            TransientTaskType::Root(_) => RootType::RootTask,
+            TransientTaskType::Once(_) => RootType::OnceTask,
         };
         self.transient_tasks.insert(
             task_id,
@@ -1719,13 +1719,14 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                     effective: u32::MAX,
                 },
             });
-            task.add(CachedDataItem::AggregateRoot {
-                value: RootState::new(root_type, task_id),
+            let mut activeness_state = ActivenessState::new(task_id);
+            activeness_state.set_root(root_type);
+            task.add(CachedDataItem::Activeness {
+                value: activeness_state,
             });
             task.add(CachedDataItem::new_scheduled(move || match root_type {
-                ActiveType::RootTask => "Root Task".to_string(),
-                ActiveType::OnceTask => "Once Task".to_string(),
-                _ => unreachable!(),
+                RootType::RootTask => "Root Task".to_string(),
+                RootType::OnceTask => "Once Task".to_string(),
             }));
         }
         task_id
@@ -1744,11 +1745,12 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 dirty_containers.get(self.session_id) > 0
             });
             if is_dirty || has_dirty_containers {
-                if let Some(root_state) = get_mut!(task, AggregateRoot) {
+                if let Some(root_state) = get_mut!(task, Activeness) {
                     // We will finish the task, but it would be removed after the task is done
-                    root_state.ty = ActiveType::CachedActiveUntilClean;
+                    root_state.unset_root_type();
+                    root_state.set_active_until_clean();
                 };
-            } else if let Some(root_state) = remove!(task, AggregateRoot) {
+            } else if let Some(root_state) = remove!(task, Activeness) {
                 // Technically nobody should be listening to this event, but just in case
                 // we notify it anyway
                 root_state.all_clean_event.notify(usize::MAX);
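For orientation: a minimal sketch of the ActivenessState API implied by the call sites above. The real definition is not among the hunks shown on this page, so the field names and stand-in types below are assumptions inferred from usage, not the actual code.

    // Stand-ins so the sketch is self-contained; turbo-tasks has its own
    // TaskId and Event types whose details differ (assumption).
    struct TaskId(u32);
    struct Event;
    impl Event {
        fn notify(&self, _waiters: usize) {}
    }

    #[derive(Clone, Copy)]
    enum RootType {
        RootTask,
        OnceTask,
    }

    struct ActivenessState {
        // The two "sources" of activeness from the commit title: being a
        // transient root/once task, and being kept active until clean.
        root_ty: Option<RootType>,
        active_until_clean: bool,
        all_clean_event: Event,
    }

    impl ActivenessState {
        fn new(_task_id: TaskId) -> Self {
            ActivenessState {
                root_ty: None,
                active_until_clean: false,
                all_clean_event: Event,
            }
        }
        fn set_root(&mut self, ty: RootType) {
            self.root_ty = Some(ty);
        }
        fn unset_root_type(&mut self) {
            self.root_ty = None;
        }
        fn set_active_until_clean(&mut self) {
            self.active_until_clean = true;
        }
        fn unset_active_until_clean(&mut self) {
            self.active_until_clean = false;
        }
        // When neither source is set, the Activeness item can be removed,
        // matching the `is_empty()` checks in the hunks above.
        fn is_empty(&self) -> bool {
            self.root_ty.is_none() && !self.active_until_clean
        }
    }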
turbopack/crates/turbo-tasks-backend/src/backend/operation/aggregation_update.rs
@@ -18,6 +18,7 @@ use turbo_tasks::{FxIndexMap, SessionId, TaskId, TraitTypeId};
 
 use crate::{
     backend::{
+        get_mut,
         operation::{
             invalidate::{make_task_dirty, TaskDirtyCause},
             ExecuteContext, Operation, TaskGuard,
@@ -26,8 +27,8 @@ use crate::{
         TaskDataCategory,
     },
     data::{
-        ActiveType, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef,
-        DirtyContainerCount, RootState,
+        ActivenessState, AggregationNumber, CachedDataItem, CachedDataItemKey, CollectibleRef,
+        DirtyContainerCount,
     },
     utils::deque_set::DequeSet,
 };
@@ -244,9 +245,9 @@ impl AggregatedDataUpdate {
         let mut result = Self::default();
         if let Some((dirty_container_id, count)) = dirty_container_update {
             // When a dirty container count is increased and the task is considered as active
-            // `AggregateRoot` we need to schedule the dirty tasks in the new dirty container
+            // we need to schedule the dirty tasks in the new dirty container
             let current_session_update = count.get(session_id);
-            if current_session_update > 0 && task.has_key(&CachedDataItemKey::AggregateRoot {}) {
+            if current_session_update > 0 && task.has_key(&CachedDataItemKey::Activeness {}) {
                 queue.push_find_and_schedule_dirty(*dirty_container_id)
             }
 
@@ -286,10 +287,11 @@
             if count.get(session_id) < 0 {
                 // When the current task is no longer dirty, we need to fire the aggregate
                 // root events and do some cleanup
-                if let Some(root_state) = get!(task, AggregateRoot) {
+                if let Some(root_state) = get_mut!(task, Activeness) {
                     root_state.all_clean_event.notify(usize::MAX);
-                    if matches!(root_state.ty, ActiveType::CachedActiveUntilClean) {
-                        task.remove(&CachedDataItemKey::AggregateRoot {});
+                    root_state.unset_active_until_clean();
+                    if root_state.is_empty() {
+                        task.remove(&CachedDataItemKey::Activeness {});
                     }
                 }
             }
@@ -921,8 +923,8 @@ impl AggregationUpdateQueue {
                 });
             }
 
-            if upper.has_key(&CachedDataItemKey::AggregateRoot {}) {
-                // If the upper node is an `AggregateRoot` we need to schedule the
+            if upper.has_key(&CachedDataItemKey::Activeness {}) {
+                // If the upper node is has `Activeness` we need to schedule the
                 // dirty tasks in the new dirty container
                 self.push_find_and_schedule_dirty(task_id);
             }
@@ -1025,13 +1027,15 @@
                 }
             }
             if is_aggregating_node(get_aggregation_number(&task)) {
-                // if it has an `AggregateRoot` we can skip visiting the nested nodes since
-                // this would already be scheduled by the `AggregateRoot`
-                if !task.has_key(&CachedDataItemKey::AggregateRoot {}) {
+                // if it has `Activeness` we can skip visiting the nested nodes since
+                // this would already be scheduled by the `Activeness`
+                if !task.has_key(&CachedDataItemKey::Activeness {}) {
                     let dirty_containers: Vec<_> = get_many!(task, AggregatedDirtyContainer { task } count if count.get(session_id) > 0 => task);
                     if !dirty_containers.is_empty() || dirty {
-                        task.insert(CachedDataItem::AggregateRoot {
-                            value: RootState::new(ActiveType::CachedActiveUntilClean, task_id),
+                        let mut activeness_state = ActivenessState::new(task_id);
+                        activeness_state.set_active_until_clean();
+                        task.insert(CachedDataItem::Activeness {
+                            value: activeness_state,
                         });
 
                         self.extend_find_and_schedule_dirty(dirty_containers);
@@ -1245,7 +1249,7 @@ impl AggregationUpdateQueue {
             get_aggregation_number(&follower)
         };
         let mut upper_upper_ids_with_new_follower = Vec::new();
-        let mut is_aggregate_root = false;
+        let mut is_active = false;
         swap_retain(&mut upper_ids, |&mut upper_id| {
             let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
             // decide if it should be an inner or follower
@@ -1277,8 +1281,8 @@
                 false
             } else {
                 // It's an inner node, continue with the list
-                if upper.has_key(&CachedDataItemKey::AggregateRoot {}) {
-                    is_aggregate_root = true;
+                if upper.has_key(&CachedDataItemKey::Activeness {}) {
+                    is_active = true;
                 }
                 true
             }
@@ -1346,7 +1350,7 @@
                 drop(follower);
             }
         }
-        if is_aggregate_root {
+        if is_active {
             self.push_find_and_schedule_dirty(new_follower_id);
         }
         if !upper_upper_ids_with_new_follower.is_empty() {
@@ -1381,12 +1385,12 @@
             .collect::<Vec<_>>();
 
         let mut new_followers_of_upper_uppers = Vec::new();
-        let is_aggregate_root;
+        let is_active;
         let mut upper_upper_ids_for_new_followers = Vec::new();
         let upper_aggregation_number;
         {
             let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
-            is_aggregate_root = upper.has_key(&CachedDataItemKey::AggregateRoot {});
+            is_active = upper.has_key(&CachedDataItemKey::Activeness {});
             // decide if it should be an inner or follower
             upper_aggregation_number = get_aggregation_number(&upper);
 
@@ -1493,7 +1497,7 @@
                 });
             }
         }
-        if is_aggregate_root {
+        if is_active {
             self.extend_find_and_schedule_dirty(
                 followers_with_aggregation_number
                     .into_iter()
@@ -1527,7 +1531,7 @@
         };
 
         let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
-        if upper.has_key(&CachedDataItemKey::AggregateRoot {}) {
+        if upper.has_key(&CachedDataItemKey::Activeness {}) {
             self.push_find_and_schedule_dirty(new_follower_id);
         }
         // decide if it should be an inner or follower
turbopack/crates/turbo-tasks-backend/src/backend/operation/invalidate.rs
@@ -232,7 +232,7 @@ pub fn make_task_dirty_internal(
                 AggregatedDataUpdate::new().dirty_container_update(task_id, aggregated_update),
             ));
         }
-        task.has_key(&CachedDataItemKey::AggregateRoot {})
+        task.has_key(&CachedDataItemKey::Activeness {})
     } else {
         true
     };
14 changes: 14 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs
@@ -370,6 +370,11 @@ pub trait TaskGuard: Debug {
     fn remove(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValue>;
     fn get(&self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRef<'_>>;
     fn get_mut(&mut self, key: &CachedDataItemKey) -> Option<CachedDataItemValueRefMut<'_>>;
+    fn get_mut_or_insert_with(
+        &mut self,
+        key: &CachedDataItemKey,
+        insert: impl FnOnce() -> CachedDataItemValue,
+    ) -> CachedDataItemValueRefMut<'_>;
     fn has_key(&self, key: &CachedDataItemKey) -> bool;
     fn count(&self, ty: CachedDataItemType) -> usize;
     fn iter(
@@ -596,6 +601,15 @@ impl<B: BackingStorage> TaskGuard for TaskGuardImpl<'_, B> {
         self.task.get_mut(key)
     }
 
+    fn get_mut_or_insert_with(
+        &mut self,
+        key: &CachedDataItemKey,
+        insert: impl FnOnce() -> CachedDataItemValue,
+    ) -> CachedDataItemValueRefMut<'_> {
+        self.check_access(key.category());
+        self.task.get_mut_or_insert_with(key, insert)
+    }
+
     fn has_key(&self, key: &CachedDataItemKey) -> bool {
         self.check_access(key.category());
         self.task.has_key(key)
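The new TaskGuard::get_mut_or_insert_with follows the familiar get-or-insert (entry API) pattern: return a mutable reference to the value under a key, creating it first if missing. A rough, self-contained analog on a plain HashMap, with hypothetical names rather than the backend's actual types:

    use std::collections::HashMap;
    use std::hash::Hash;

    // Minimal analog of `get_mut_or_insert_with`: hand back a mutable
    // reference to the value under `key`, inserting `insert()` if absent.
    fn get_mut_or_insert_with<'a, K: Hash + Eq, V>(
        map: &'a mut HashMap<K, V>,
        key: K,
        insert: impl FnOnce() -> V,
    ) -> &'a mut V {
        map.entry(key).or_insert_with(insert)
    }

    fn main() {
        let mut map: HashMap<&str, Vec<u32>> = HashMap::new();
        // First call inserts the default, second call reuses the entry.
        get_mut_or_insert_with(&mut map, "dirty", Vec::new).push(1);
        get_mut_or_insert_with(&mut map, "dirty", Vec::new).push(2);
        assert_eq!(map["dirty"], vec![1, 2]);
    }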
28 changes: 28 additions & 0 deletions turbopack/crates/turbo-tasks-backend/src/backend/storage.rs
@@ -193,6 +193,15 @@ impl InnerStorage {
         self.get_map_mut(key.ty()).and_then(|m| m.get_mut(key))
     }
 
+    pub fn get_mut_or_insert_with(
+        &mut self,
+        key: &CachedDataItemKey,
+        f: impl FnOnce() -> CachedDataItemValue,
+    ) -> CachedDataItemValueRefMut<'_> {
+        self.get_or_create_map_mut(key.ty())
+            .get_mut_or_insert_with(&key, f)
+    }
+
     pub fn has_key(&self, key: &CachedDataItemKey) -> bool {
         self.get_map(key.ty())
             .map(|m| m.contains_key(key))
@@ -366,6 +375,24 @@ macro_rules! get_mut {
     };
 }
 
+macro_rules! get_mut_or_insert_with {
+    ($task:ident, $key:ident $input:tt, $f:expr) => {{
+        #[allow(unused_imports)]
+        use $crate::backend::operation::TaskGuard;
+        let () = $crate::data::allow_mut_access::$key;
+        let functor = $f;
+        let $crate::data::CachedDataItemValueRefMut::$key {
+            value,
+        } = $task.get_mut_or_insert_with(&$crate::data::CachedDataItemKey::$key $input, move || $crate::data::CachedDataItemValue::$key { value: functor() }) else {
+            unreachable!()
+        };
+        value
+    }};
+    ($task:ident, $key:ident, $f:expr) => {
+        $crate::backend::storage::get_mut_or_insert_with!($task, $key {}, $f)
+    };
+}
+
 /// Creates an iterator over all [`CachedDataItemKey::$key`][crate::data::CachedDataItemKey]s in
 /// `$task` matching the given `$key_pattern`, optional `$value_pattern`, and optional `if $cond`.
 ///
@@ -530,6 +557,7 @@ pub(crate) use count;
 pub(crate) use get;
 pub(crate) use get_many;
 pub(crate) use get_mut;
+pub(crate) use get_mut_or_insert_with;
 pub(crate) use iter_many;
 pub(crate) use remove;
 pub(crate) use update;
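At call sites the macro reads like the first hunk of backend/mod.rs above: get or create the Activeness value for a task, then flag one of its activeness sources.

    // From this commit: fetch the Activeness entry for `task`, creating it on
    // first access, then mark the task as active until it becomes clean.
    get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id))
        .set_active_until_clean();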