Skip to content

Commit

Permalink
add active_counter as source of activeness
Browse files Browse the repository at this point in the history
  • Loading branch information
sokra committed Jan 14, 2025
1 parent 2ee138d commit d761c84
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use turbo_tasks::{FxIndexMap, SessionId, TaskId, TraitTypeId};

use crate::{
backend::{
get_mut,
get_mut, get_mut_or_insert_with,
operation::{
invalidate::{make_task_dirty, TaskDirtyCause},
ExecuteContext, Operation, TaskGuard,
Expand Down Expand Up @@ -138,6 +138,14 @@ pub enum AggregationUpdateJob {
task_ids: SmallVec<[TaskId; 4]>,
collectible_type: TraitTypeId,
},
/// Increases the active counter of the task
IncreaseActiveCount { task: TaskId },
/// Increases the active counters of the tasks
IncreaseActiveCounts { task_ids: Vec<TaskId> },
/// Decreases the active counter of the task
DecreaseActiveCount { task: TaskId },
/// Decreases the active counters of the tasks
DecreaseActiveCounts { task_ids: Vec<TaskId> },
/// Balances the edges of the graph. This checks if the graph invariant is still met for this
/// edge and coverts a upper edge to a follower edge or vice versa. Balancing might triggers
/// more changes to the structure.
Expand Down Expand Up @@ -776,6 +784,32 @@ impl AggregationUpdateQueue {
);
}
}
AggregationUpdateJob::DecreaseActiveCount { task } => {
self.decrease_active_count(ctx, task);
}
AggregationUpdateJob::DecreaseActiveCounts { mut task_ids } => {
if let Some(task_id) = task_ids.pop() {
self.decrease_active_count(ctx, task_id);
if !task_ids.is_empty() {
self.jobs.push_front(AggregationUpdateJobItem::new(
AggregationUpdateJob::DecreaseActiveCounts { task_ids },
));
}
}
}
AggregationUpdateJob::IncreaseActiveCount { task } => {
self.increase_active_count(ctx, task);
}
AggregationUpdateJob::IncreaseActiveCounts { mut task_ids } => {
if let Some(task_id) = task_ids.pop() {
self.increase_active_count(ctx, task_id);
if !task_ids.is_empty() {
self.jobs.push_front(AggregationUpdateJobItem::new(
AggregationUpdateJob::IncreaseActiveCounts { task_ids },
));
}
}
}
}
false
} else if !self.number_updates.is_empty() {
Expand Down Expand Up @@ -907,6 +941,8 @@ impl AggregationUpdateQueue {
// followers
let data = AggregatedDataUpdate::from_task(&mut task);
let followers = get_followers(&task);
let has_active_count =
get!(task, Activeness).is_some_and(|a| a.active_counter > 0);
let diff = data.apply(&mut upper, ctx.session_id(), self);

if !upper_ids.is_empty() && !diff.is_empty() {
Expand All @@ -917,6 +953,12 @@ impl AggregationUpdateQueue {
});
}
if !followers.is_empty() {
if has_active_count {
// TODO combine both operations to avoid the clone
self.push(AggregationUpdateJob::IncreaseActiveCounts {
task_ids: followers.clone(),
})
}
self.push(AggregationUpdateJob::InnerOfUpperHasNewFollowers {
upper_id,
new_follower_ids: followers,
Expand Down Expand Up @@ -960,6 +1002,10 @@ impl AggregationUpdateQueue {

// Add the same amount of follower edges
if update_count!(upper, Follower { task: task_id }, count) {
// update active count
if get!(task, Activeness).is_some_and(|a| a.active_counter > 0) {
self.push(AggregationUpdateJob::IncreaseActiveCount { task: task_id });
}
// notify uppers about new follower
if !upper_ids.is_empty() {
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
Expand Down Expand Up @@ -1037,6 +1083,7 @@ impl AggregationUpdateQueue {
task.insert(CachedDataItem::Activeness {
value: activeness_state,
});
drop(task);

self.extend_find_and_schedule_dirty(dirty_containers);
}
Expand Down Expand Up @@ -1139,7 +1186,17 @@ impl AggregationUpdateQueue {
},
-1
) {
let has_active_count =
get!(upper, Activeness).is_some_and(|a| a.active_counter > 0);
let upper_ids = get_uppers(&upper);
drop(upper);
// update active count
if has_active_count {
self.push(AggregationUpdateJob::DecreaseActiveCount {
task: lost_follower_id,
});
}
// notify uppers about new follower
if !upper_ids.is_empty() {
self.push(AggregationUpdateJob::InnerOfUppersLostFollower {
upper_ids,
Expand Down Expand Up @@ -1224,6 +1281,16 @@ impl AggregationUpdateQueue {
-1
) {
let upper_ids = get_uppers(&upper);
let has_active_count =
get!(upper, Activeness).is_some_and(|a| a.active_counter > 0);
drop(upper);
// update active count
if has_active_count {
self.push(AggregationUpdateJob::DecreaseActiveCount {
task: lost_follower_id,
});
}
// notify uppers about new follower
if !upper_ids.is_empty() {
self.push(AggregationUpdateJob::InnerOfUppersLostFollower {
upper_ids,
Expand All @@ -1249,6 +1316,7 @@ impl AggregationUpdateQueue {
get_aggregation_number(&follower)
};
let mut upper_upper_ids_with_new_follower = Vec::new();
let mut tasks_for_which_increment_active_count = Vec::new();
let mut is_active = false;
swap_retain(&mut upper_ids, |&mut upper_id| {
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
Expand All @@ -1266,6 +1334,11 @@ impl AggregationUpdateQueue {
},
1
) {
// update active count
if get!(upper, Activeness).is_some_and(|a| a.active_counter > 0) {
tasks_for_which_increment_active_count.push(new_follower_id);
}
// notify uppers about new follower
upper_upper_ids_with_new_follower.extend(iter_uppers(&upper));
}

Expand Down Expand Up @@ -1353,6 +1426,11 @@ impl AggregationUpdateQueue {
if is_active {
self.push_find_and_schedule_dirty(new_follower_id);
}
if !tasks_for_which_increment_active_count.is_empty() {
self.push(AggregationUpdateJob::IncreaseActiveCounts {
task_ids: tasks_for_which_increment_active_count,
});
}
if !upper_upper_ids_with_new_follower.is_empty() {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("new follower").entered();
Expand Down Expand Up @@ -1386,11 +1464,14 @@ impl AggregationUpdateQueue {

let mut new_followers_of_upper_uppers = Vec::new();
let is_active;
let has_active_count;
let mut upper_upper_ids_for_new_followers = Vec::new();
let upper_aggregation_number;
{
let mut upper = ctx.task(upper_id, TaskDataCategory::Meta);
is_active = upper.has_key(&CachedDataItemKey::Activeness {});
let activeness_state = get!(upper, Activeness);
is_active = activeness_state.is_some();
has_active_count = activeness_state.is_some_and(|a| a.active_counter > 0);
// decide if it should be an inner or follower
upper_aggregation_number = get_aggregation_number(&upper);

Expand Down Expand Up @@ -1505,14 +1586,23 @@ impl AggregationUpdateQueue {
);
}
}
// notify uppers about new follower
if !upper_upper_ids_for_new_followers.is_empty() {
if !new_followers_of_upper_uppers.is_empty() {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("new follower").entered();
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers {
upper_ids: upper_upper_ids_for_new_followers,
new_follower_ids: new_followers_of_upper_uppers,
});
// update active count
if has_active_count {
// TODO combine both operations to avoid the clone
self.push(AggregationUpdateJob::IncreaseActiveCounts {
task_ids: new_followers_of_upper_uppers.clone(),
});
}
// notify uppers about new follower
if !upper_upper_ids_for_new_followers.is_empty() {
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollowers {
upper_ids: upper_upper_ids_for_new_followers,
new_follower_ids: new_followers_of_upper_uppers,
});
}
}
}

Expand Down Expand Up @@ -1551,8 +1641,17 @@ impl AggregationUpdateQueue {
},
1
) {
let has_active_count =
get!(upper, Activeness).is_some_and(|a| a.active_counter > 0);
let upper_ids = get_uppers(&upper);
drop(upper);
// update active count
if has_active_count {
self.push(AggregationUpdateJob::IncreaseActiveCount {
task: new_follower_id,
});
}
// notify uppers about new follower
if !upper_ids.is_empty() {
self.push(AggregationUpdateJob::InnerOfUppersHasNewFollower {
upper_ids,
Expand Down Expand Up @@ -1610,6 +1709,52 @@ impl AggregationUpdateQueue {
}
}

fn decrease_active_count(&mut self, ctx: &mut impl ExecuteContext, task_id: TaskId) {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("decrease active count").entered();

let mut task = ctx.task(task_id, TaskDataCategory::Meta);
let state = get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id));
let is_zero = state.decrement_active_counter();
let is_empty = state.is_empty();
if is_empty {
task.remove(&CachedDataItemKey::Activeness {});
}
if is_zero {
let followers = get_followers(&task);
drop(task);
if !followers.is_empty() {
self.push(AggregationUpdateJob::DecreaseActiveCounts {
task_ids: followers,
});
}
}
}

fn increase_active_count(&mut self, ctx: &mut impl ExecuteContext, task_id: TaskId) {
#[cfg(feature = "trace_aggregation_update")]
let _span = trace_span!("increase active count").entered();

let mut task = ctx.task(task_id, TaskDataCategory::Meta);
let state = get_mut_or_insert_with!(task, Activeness, || ActivenessState::new(task_id));
let is_positive = state.increment_active_counter();
let is_empty = state.is_empty();
// This can happen if active count is negative before
if is_empty {
task.remove(&CachedDataItemKey::Activeness {});
}
if is_positive {
let followers = get_followers(&task);
drop(task);
if !followers.is_empty() {
self.push(AggregationUpdateJob::IncreaseActiveCounts {
task_ids: followers,
});
}
self.push_find_and_schedule_dirty(task_id);
}
}

fn update_aggregation_number(
&mut self,
ctx: &mut impl ExecuteContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use turbo_tasks::{TaskId, ValueTypeId};

use crate::{
backend::{
get,
operation::{
aggregation_update::{
get_aggregation_number, get_uppers, is_aggregating_node, AggregationUpdateJob,
Expand Down Expand Up @@ -89,6 +90,13 @@ impl Operation for CleanupOldEdgesOperation {
});
} else {
let upper_ids = get_uppers(&task);
if get!(task, Activeness).is_some_and(|a| a.active_counter > 0)
{
// TODO combine both operations to avoid the clone
queue.push(AggregationUpdateJob::DecreaseActiveCounts {
task_ids: children.clone(),
});
}
queue.push(AggregationUpdateJob::InnerOfUppersLostFollowers {
upper_ids,
lost_follower_ids: children,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ impl ConnectChildOperation {
}) {
let mut queue = AggregationUpdateQueue::new();

if get!(parent_task, Activeness).is_some_and(|a| a.active_counter > 0) {
queue.push(AggregationUpdateJob::IncreaseActiveCount {
task: child_task_id,
})
}

// Get the children count
let children_count = count!(parent_task, Child);

Expand Down
17 changes: 16 additions & 1 deletion turbopack/crates/turbo-tasks-backend/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ impl OutputValue {

#[derive(Debug)]
pub struct ActivenessState {
/// When this counter is > 0, the task is active.
pub active_counter: i32,
/// The task is a root or once task and is active due to that.
pub root_ty: Option<RootType>,
/// The subgraph is active as long it's dirty. Once it become clean, it will unset this flag.
Expand All @@ -86,6 +88,7 @@ pub struct ActivenessState {
impl ActivenessState {
pub fn new(id: TaskId) -> Self {
Self {
active_counter: 0,
root_ty: None,
active_until_clean: false,
all_clean_event: Event::new(move || {
Expand All @@ -108,6 +111,18 @@ impl ActivenessState {
self.active_until_clean = true;
}

/// Increment the active counter and return true if the counter was 0 before.
pub fn increment_active_counter(&mut self) -> bool {
self.active_counter += 1;
self.active_counter == 1
}

/// Decrement the active counter and return true if the counter is 0 after.
pub fn decrement_active_counter(&mut self) -> bool {
self.active_counter -= 1;
self.active_counter == 0
}

pub fn unset_root_type(&mut self) {
self.root_ty = None;
}
Expand All @@ -117,7 +132,7 @@ impl ActivenessState {
}

pub fn is_empty(&self) -> bool {
self.root_ty.is_none() && !self.active_until_clean
self.root_ty.is_none() && !self.active_until_clean && self.active_counter == 0
}
}

Expand Down

0 comments on commit d761c84

Please sign in to comment.