From dd2aa4123a5dcd988a857256b8e794783270e608 Mon Sep 17 00:00:00 2001 From: Andriy Berestovskyy Date: Wed, 27 Nov 2024 22:11:20 +0100 Subject: [PATCH] feat: EXC-1787: Simplify scheduler implementation - fixes scheduler divergence - simplifies the scheduler down to accumulated priority and long execution mode - uses round-robin partitioning instead of complicated algorithm (RUN-320) --- rs/execution_environment/benches/scheduler.rs | 18 +- .../tests/canister_snapshots.rs | 1 - rs/execution_environment/src/scheduler.rs | 2 +- .../src/scheduler/round_schedule.rs | 219 ++++-------------- .../src/scheduler/tests.rs | 214 +++++++++++++++-- .../v1/canister_state_bits.proto | 2 +- .../gen/state/state.canister_state_bits.v1.rs | 2 - rs/replicated_state/src/canister_state.rs | 9 - rs/state_layout/src/state_layout.rs | 3 - rs/state_layout/src/state_layout/tests.rs | 1 - rs/state_manager/src/checkpoint.rs | 1 - rs/state_manager/src/tip.rs | 1 - rs/types/types/src/lib.rs | 47 ++-- 13 files changed, 257 insertions(+), 263 deletions(-) diff --git a/rs/execution_environment/benches/scheduler.rs b/rs/execution_environment/benches/scheduler.rs index aa537716fb2..13cf3a44d0d 100644 --- a/rs/execution_environment/benches/scheduler.rs +++ b/rs/execution_environment/benches/scheduler.rs @@ -9,8 +9,7 @@ use std::collections::BTreeMap; fn main() { let mut canisters = BTreeMap::new(); - let mut ordered_new_execution_canister_ids = Vec::new(); - let mut ordered_long_execution_canister_ids = Vec::new(); + let mut ordered_canister_ids = Vec::new(); for i in 0..50_000 { let canister_id = canister_test_id(i); let scheduler_state = SchedulerState::default(); @@ -25,22 +24,11 @@ fn main() { CanisterState::new(system_state, None, scheduler_state), ); - if i % 10 == 0 { - ordered_long_execution_canister_ids.push(canister_id); - } else { - ordered_new_execution_canister_ids.push(canister_id); - } + ordered_canister_ids.push(canister_id); } let scheduler_cores = 4; - let long_execution_cores = 1; - let round_schedule = RoundSchedule::new( - scheduler_cores, - long_execution_cores, - 0, - ordered_new_execution_canister_ids, - ordered_long_execution_canister_ids, - ); + let round_schedule = RoundSchedule::new(scheduler_cores, 0, ordered_canister_ids); let mut criterion = Criterion::default(); let mut group = criterion.benchmark_group("RoundSchedule"); diff --git a/rs/execution_environment/src/execution_environment/tests/canister_snapshots.rs b/rs/execution_environment/src/execution_environment/tests/canister_snapshots.rs index 35c53bfa8cf..57ca4083532 100644 --- a/rs/execution_environment/src/execution_environment/tests/canister_snapshots.rs +++ b/rs/execution_environment/src/execution_environment/tests/canister_snapshots.rs @@ -1894,7 +1894,6 @@ fn canister_snapshot_change_guard_do_not_modify_without_reading_doc_comment() { last_full_execution_round: _, compute_allocation: _, accumulated_priority: _, - priority_credit: _, long_execution_mode: _, heap_delta_debit: _, install_code_debit: _, diff --git a/rs/execution_environment/src/scheduler.rs b/rs/execution_environment/src/scheduler.rs index 93bdee66015..0a9b1638a0f 100644 --- a/rs/execution_environment/src/scheduler.rs +++ b/rs/execution_environment/src/scheduler.rs @@ -1681,7 +1681,7 @@ impl FilteredCanisters { rate_limited_ids: &[CanisterId], ) { self.active_canister_ids - .extend(active_round_schedule.iter()); + .extend(active_round_schedule.ordered_canister_ids.iter()); self.rate_limited_canister_ids .extend(rate_limited_ids.iter()); } diff --git 
a/rs/execution_environment/src/scheduler/round_schedule.rs b/rs/execution_environment/src/scheduler/round_schedule.rs index e7f196ae8c9..f76bd956db2 100644 --- a/rs/execution_environment/src/scheduler/round_schedule.rs +++ b/rs/execution_environment/src/scheduler/round_schedule.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, BTreeSet, HashMap}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use ic_base_types::{CanisterId, NumBytes}; use ic_config::flag_status::FlagStatus; @@ -29,18 +29,6 @@ pub(super) struct CanisterRoundState { pub(super) has_aborted_or_paused_execution: bool, } -/// Represents three ordered active Canister ID groups to schedule. -/// TODO(RUN-320): remove, as it's not required for regular partitioning -#[derive(Debug, Default)] -pub(super) struct SchedulingOrder<P, N, R> { - /// Prioritized long executions. - pub prioritized_long_canister_ids: P, - /// New executions. - pub new_canister_ids: N, - /// To be executed when the Canisters from previous two groups are idle. - pub opportunistic_long_canister_ids: R, -} - /// Represents the order in which the Canister IDs are to be scheduled /// during the whole current round. /// TODO(RUN-320): remove, as it's not required for regular partitioning @@ -48,67 +36,22 @@ pub(super) struct SchedulingOrder<P, N, R> { pub struct RoundSchedule { /// Total number of scheduler cores. pub scheduler_cores: usize, - /// Number of cores dedicated for long executions. - pub long_execution_cores: usize, // Sum of all canisters' compute allocation in percent. pub total_compute_allocation_percent: i64, - /// Ordered Canister IDs with new executions. - pub ordered_new_execution_canister_ids: Vec<CanisterId>, - /// Ordered Canister IDs with long executions. - pub ordered_long_execution_canister_ids: Vec<CanisterId>, + /// Ordered Canister IDs to schedule. + pub ordered_canister_ids: Vec<CanisterId>, } impl RoundSchedule { pub fn new( scheduler_cores: usize, - long_execution_cores: usize, total_compute_allocation_percent: i64, - ordered_new_execution_canister_ids: Vec<CanisterId>, - ordered_long_execution_canister_ids: Vec<CanisterId>, + ordered_canister_ids: Vec<CanisterId>, ) -> Self { RoundSchedule { scheduler_cores, - long_execution_cores: long_execution_cores - .min(ordered_long_execution_canister_ids.len()), total_compute_allocation_percent, - ordered_new_execution_canister_ids, - ordered_long_execution_canister_ids, - } - } - - pub(super) fn iter(&self) -> impl Iterator<Item = &CanisterId> { - self.ordered_long_execution_canister_ids - .iter() - .chain(self.ordered_new_execution_canister_ids.iter()) - } - - pub(super) fn scheduling_order( - &self, - ) -> SchedulingOrder< - impl Iterator<Item = &CanisterId>, - impl Iterator<Item = &CanisterId>, - impl Iterator<Item = &CanisterId>, - > { - SchedulingOrder { - // To guarantee progress and minimize the potential waste of an abort, top - // `long_execution_cores` canisters with prioritized long execution mode and highest - // priority get scheduled on long execution cores. - prioritized_long_canister_ids: self - .ordered_long_execution_canister_ids - .iter() - .take(self.long_execution_cores), - // Canisters with no pending long executions get scheduled across new execution - // cores according to their round priority as the regular scheduler does. This will - // guarantee their reservations; and ensure low latency except immediately after a long - // message execution. - new_canister_ids: self.ordered_new_execution_canister_ids.iter(), - // Remaining canisters with long pending executions get scheduled across - // all cores according to their priority order, starting from the next available core onto which a new - // execution canister would have been scheduled.
- opportunistic_long_canister_ids: self - .ordered_long_execution_canister_ids - .iter() - .skip(self.long_execution_cores), + ordered_canister_ids, } } @@ -120,7 +63,7 @@ impl RoundSchedule { round_id: ExecutionRound, is_first_iteration: bool, ) { - for canister_id in self.ordered_new_execution_canister_ids.iter() { + for canister_id in self.ordered_canister_ids.iter() { let canister = canisters.get_mut(canister_id); if let Some(canister) = canister { let next_execution = canister.next_execution(); @@ -158,8 +101,8 @@ impl RoundSchedule { // Collect all active canisters and their next executions. // - // It is safe to use a `HashMap`, as we'll only be doing lookups. - let canister_next_executions: HashMap<_, _> = canisters + // It is safe to use a `HashSet`, as we'll only be doing lookups. + let canister_next_executions: HashSet<_> = canisters .iter() .filter_map(|(canister_id, canister)| { if rate_limiting_of_heap_delta == FlagStatus::Enabled @@ -174,104 +117,50 @@ impl RoundSchedule { match next_execution { // Filter out canisters with no messages or with paused installations. NextExecution::None | NextExecution::ContinueInstallCode => None, - - NextExecution::StartNew | NextExecution::ContinueLong => { - Some((canister_id, next_execution)) + NextExecution::StartNew => { + match canister.scheduler_state.long_execution_mode { + LongExecutionMode::Opportunistic => Some(*canister_id), + // Break prioritized canister execution, as the canister just finished + // its long execution. + LongExecutionMode::Prioritized => None, + } } + NextExecution::ContinueLong => Some(*canister_id), } }) .collect(); - let ordered_new_execution_canister_ids = self - .ordered_new_execution_canister_ids + let active_canister_ids = self + .ordered_canister_ids .iter() - .filter(|canister_id| canister_next_executions.contains_key(canister_id)) - .cloned() - .collect(); - - let ordered_long_execution_canister_ids = self - .ordered_long_execution_canister_ids - .iter() - .filter( - |canister_id| match canister_next_executions.get(canister_id) { - Some(NextExecution::ContinueLong) => true, - - // We expect long execution, but there is none, - // so the long execution was finished in the - // previous inner round. - // - // We should avoid scheduling this canister to: - // 1. Avoid the canister to bypass the logic in - // `apply_scheduling_strategy()`. - // 2. Charge canister for resources at the end - // of the round. - Some(NextExecution::StartNew) => false, - - None // No such canister. Should not happen. - | Some(NextExecution::None) // Idle canister. - | Some(NextExecution::ContinueInstallCode) // Subnet message. - => false, - }, - ) + .filter(|canister_id| canister_next_executions.contains(canister_id)) .cloned() .collect(); ( RoundSchedule::new( self.scheduler_cores, - self.long_execution_cores, self.total_compute_allocation_percent, - ordered_new_execution_canister_ids, - ordered_long_execution_canister_ids, + active_canister_ids, ), rate_limited_canister_ids, ) } - /// Partitions the executable Canisters to the available cores for execution. + /// Partitions the executable Canisters round-robin across the available cores. /// /// Returns the executable Canisters partitioned by cores and a map of /// the non-executable Canisters.
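+ /// + /// ## Example + /// + /// An illustrative sketch of the round-robin placement (the core count and + /// Canister IDs here are assumed for the example). Given a round schedule with + /// 2 scheduler cores and 5 Canisters (ids 1-5) in schedule order, the function + /// will produce the following result: + /// + /// * Core 1 takes: `CanisterId 1`, `CanisterId 3`, `CanisterId 5` + /// * Core 2 takes: `CanisterId 2`, `CanisterId 4`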
- /// - /// ## Example - /// - /// Given a round schedule with: - /// - /// * 1 long execution core - /// * 3 Canisters (ids 1-3) with pending long executions - /// * 5 Canisters (ids 4-8) with new executions - /// - /// The function will produce the following result: - /// - /// * Core 1 (long execution core) takes: `CanisterId 1`, `CanisterId 3` - /// * Core 2 takes: `CanisterId 4`, `CanisterId 6`, `CanisterId 8` - /// * Core 3 takes: `CanisterId 5`, `CanisterId 7`, `CanisterId 2` pub(super) fn partition_canisters_to_cores( &self, mut canisters: BTreeMap<CanisterId, CanisterState>, ) -> (Vec<Vec<CanisterState>>, BTreeMap<CanisterId, CanisterState>) { let mut canisters_partitioned_by_cores = vec![vec![]; self.scheduler_cores]; - let mut idx = 0; - let scheduling_order = self.scheduling_order(); - for canister_id in scheduling_order.prioritized_long_canister_ids { - let canister_state = canisters.remove(canister_id).unwrap(); - canisters_partitioned_by_cores[idx].push(canister_state); - idx += 1; - } - let last_prioritized_long = idx; - let new_execution_cores = self.scheduler_cores - last_prioritized_long; - debug_assert!(new_execution_cores > 0); - for canister_id in scheduling_order.new_canister_ids { - let canister_state = canisters.remove(canister_id).unwrap(); - canisters_partitioned_by_cores[idx].push(canister_state); - idx = last_prioritized_long + (idx - last_prioritized_long + 1) % new_execution_cores.max(1); - } - for canister_id in scheduling_order.opportunistic_long_canister_ids { + let scheduler_cores = self.scheduler_cores.max(1); + for (idx, canister_id) in self.ordered_canister_ids.iter().enumerate() { let canister_state = canisters.remove(canister_id).unwrap(); - canisters_partitioned_by_cores[idx].push(canister_state); - idx = (idx + 1) % self.scheduler_cores; + canisters_partitioned_by_cores[idx % scheduler_cores].push(canister_state); } (canisters_partitioned_by_cores, canisters) @@ -318,7 +207,7 @@ impl RoundSchedule { for canister_id in fully_executed_canister_ids { if let Some(canister) = canister_states.get_mut(&canister_id) { total_charged_priority += 100 * multiplier; - canister.scheduler_state.priority_credit += (100 * multiplier).into(); + canister.scheduler_state.accumulated_priority -= (100 * multiplier).into(); } } @@ -338,7 +227,7 @@ impl RoundSchedule { let has_aborted_or_paused_execution = canister.has_aborted_execution() || canister.has_paused_execution(); if !has_aborted_or_paused_execution { - RoundSchedule::apply_priority_credit(canister); + canister.scheduler_state.long_execution_mode = LongExecutionMode::default(); } } } @@ -361,7 +250,6 @@ impl RoundSchedule { round_states.sort_by_key(|rs| { ( std::cmp::Reverse(rs.long_execution_mode), - std::cmp::Reverse(rs.has_aborted_or_paused_execution), std::cmp::Reverse(rs.accumulated_priority), rs.canister_id, ) }); @@ -416,19 +304,17 @@ impl RoundSchedule { let mut accumulated_priority_invariant = AccumulatedPriority::default(); let mut accumulated_priority_deviation = 0; for (&canister_id, canister) in canister_states.iter_mut() { - if is_reset_round { + let accumulated_priority = if is_reset_round { // By default, each canister's accumulated priority is set to its compute allocation.
- canister.scheduler_state.accumulated_priority = - (canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier) - .into(); - canister.scheduler_state.priority_credit = Default::default(); - } - + (canister.scheduler_state.compute_allocation.as_percent() as i64 * multiplier) + .into() + } else { + canister.scheduler_state.accumulated_priority + }; let has_aborted_or_paused_execution = canister.has_aborted_execution() || canister.has_paused_execution(); let compute_allocation = canister.scheduler_state.compute_allocation; - let accumulated_priority = canister.scheduler_state.accumulated_priority; round_states.push(CanisterRoundState { canister_id, accumulated_priority, @@ -477,10 +363,12 @@ impl RoundSchedule { .saturating_sub(total_compute_allocation_percent) * scheduler_cores as i64; + Self::order_canister_round_states(&mut round_states); + // Compute `long_execution_compute_allocation`. let mut long_executions_compute_allocation = 0; let mut number_of_long_executions = 0; - for rs in round_states.iter_mut() { + for rs in round_states.iter().take(scheduler_cores) { // De-facto compute allocation includes bonus allocation let factual = rs.compute_allocation.as_percent() as i64 * multiplier + free_capacity_per_canister; @@ -492,10 +380,11 @@ impl RoundSchedule { } } - // Compute the number of long execution cores by dividing - // `long_execution_compute_allocation` by `100%` and rounding up - // (as one scheduler core is reserved to guarantee long executions progress). - // The `long_execution_compute_allocation` is in multiplied percent. + // Count long execution cores by dividing `long_execution_compute_allocation` + // by `100%` and rounding up (as one scheduler core is reserved to guarantee + // the progress of long executions). + // Note that the `long_execution_compute_allocation` is in percent multiplied + // by the `multiplier`. let long_execution_cores = ((long_executions_compute_allocation + 100 * multiplier - 1) / (100 * multiplier)) as usize; // If there are long executions, the `long_execution_cores` must be non-zero. @@ -519,43 +408,21 @@ impl RoundSchedule { scheduler_cores ); - Self::order_canister_round_states(&mut round_states); - let round_schedule = RoundSchedule::new( scheduler_cores, - long_execution_cores, total_compute_allocation_percent, - round_states - .iter() - .skip(number_of_long_executions) - .map(|rs| rs.canister_id) - .collect(), - round_states - .iter() - .take(number_of_long_executions) - .map(|rs| rs.canister_id) - .collect(), + round_states.iter().map(|rs| rs.canister_id).collect(), ); - for canister_id in round_schedule - .ordered_long_execution_canister_ids + for rs in round_states .iter() + .filter(|rs| rs.has_aborted_or_paused_execution) .take(long_execution_cores) { - let canister = canister_states.get_mut(canister_id).unwrap(); + let canister = canister_states.get_mut(&rs.canister_id).unwrap(); canister.scheduler_state.long_execution_mode = LongExecutionMode::Prioritized; } round_schedule } - - /// Applies priority credit and resets long execution mode. - pub fn apply_priority_credit(canister: &mut CanisterState) { - canister.scheduler_state.accumulated_priority -= - std::mem::take(&mut canister.scheduler_state.priority_credit); - // Aborting a long-running execution moves the canister to the - // default execution mode because the canister does not have a - // pending execution anymore.
- canister.scheduler_state.long_execution_mode = LongExecutionMode::default(); - } } diff --git a/rs/execution_environment/src/scheduler/tests.rs b/rs/execution_environment/src/scheduler/tests.rs index a6bf12432a2..09e64f847db 100644 --- a/rs/execution_environment/src/scheduler/tests.rs +++ b/rs/execution_environment/src/scheduler/tests.rs @@ -1852,6 +1852,156 @@ fn max_canisters_per_round() { assert_eq!(executed_canisters, 200 + 2 * 5); } +fn run_scheduler_test( + rounds: usize, + scheduler_cores: usize, + canisters_with_no_cycles: usize, + active_canisters_with_no_cycles: usize, + canisters_with_cycles: usize, + active_canisters_with_cycles: usize, + canisters_with_long_executions: usize, + active_canisters_with_long_executions: usize, + short_execution_instructions: u64, + long_execution_instructions: u64, +) -> SchedulerTest { + let multiplier = scheduler_cores + * (canisters_with_no_cycles + canisters_with_cycles + canisters_with_long_executions); + + let mut test = SchedulerTestBuilder::new() + .with_scheduler_config(SchedulerConfig { + scheduler_cores, + ..SchedulerConfig::application_subnet() + }) + .build(); + + // Bump up the round number to 1. + test.execute_round(ExecutionRoundType::OrdinaryRound); + + let mut no_cycles_ids = vec![]; + for _ in 0..canisters_with_no_cycles { + let canister_id = test.create_canister_with( + Cycles::new(0), + ComputeAllocation::zero(), + MemoryAllocation::BestEffort, + None, + None, + None, + ); + no_cycles_ids.push(canister_id); + } + let mut cycles_ids = vec![]; + for _ in 0..canisters_with_cycles { + let canister_id = test.create_canister(); + cycles_ids.push(canister_id); + } + let mut long_ids = vec![]; + for _ in 0..canisters_with_long_executions { + let canister_id = test.create_canister(); + long_ids.push(canister_id); + } + + let mut max_ap = i64::MIN; + let mut min_ap = i64::MAX; + let mut inv = 0; + for _r in 1..=rounds { + test.execute_round(ExecutionRoundType::OrdinaryRound); + + for id in no_cycles_ids.iter().take(active_canisters_with_no_cycles) { + test.send_ingress(*id, ingress(short_execution_instructions)); + } + for id in cycles_ids.iter().take(active_canisters_with_cycles) { + test.send_ingress(*id, ingress(short_execution_instructions)); + } + + for id in long_ids.iter().take(active_canisters_with_long_executions) { + test.send_ingress(*id, ingress(long_execution_instructions)); + } + + for canister in test.state().canisters_iter() { + min_ap = min_ap.min(canister.scheduler_state.accumulated_priority.get()); + max_ap = max_ap.max(canister.scheduler_state.accumulated_priority.get()); + inv += canister.scheduler_state.accumulated_priority.get(); + } + assert_eq!(inv, 0); + // Allow up to 20x divergence for long executions (40B/2B = 20). 
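+ // (Assumed reasoning for the bound, following the 40B/2B arithmetic above: a + // 40B-instruction long execution completes in roughly 20 rounds of 2B-instruction + // slices, so a long-execution canister can fall behind by up to 20 rounds' worth, + // i.e. 20 * 100%, of accumulated priority before it finishes and catches up.)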
+ assert!( + min_ap / (multiplier as i64) > -100 * 20 * active_canisters_with_long_executions as i64, + "Error checking min accumulated priority {} > {} (-100% * 20x * long executions)", + min_ap / multiplier as i64, + -100 * 20 * active_canisters_with_long_executions as i64 + ); + assert!( + max_ap / (multiplier as i64) < 100 * 20 * active_canisters_with_long_executions as i64, + "Error checking max accumulated priority {} < {} (100% * 20x * long executions)", + max_ap / multiplier as i64, + 100 * 20 * active_canisters_with_long_executions as i64 + ); + } + test +} + +#[test] +fn scheduler_accumulated_priority_divergence_many_short_executions() { + let scheduler_cores = 4; + + let canisters_with_no_cycles = 0; + let active_canisters_with_no_cycles = 0; + let canisters_with_cycles = scheduler_cores; + let active_canisters_with_cycles = scheduler_cores; + let short_execution_instructions = 13_000_000; + let canisters_with_long_executions = 1; + let active_canisters_with_long_executions = 1; + let long_execution_instructions = 40_000_000_000; + + let multiplier = scheduler_cores + * (canisters_with_no_cycles + canisters_with_cycles + canisters_with_long_executions); + let rounds = multiplier * 100; + + let _test = run_scheduler_test( + rounds, + scheduler_cores, + canisters_with_no_cycles, + active_canisters_with_no_cycles, + canisters_with_cycles, + active_canisters_with_cycles, + canisters_with_long_executions, + active_canisters_with_long_executions, + short_execution_instructions, + long_execution_instructions, + ); +} + +#[test] +fn scheduler_accumulated_priority_divergence_many_long_executions() { + let scheduler_cores = 4; + + let canisters_with_no_cycles = 0; + let active_canisters_with_no_cycles = 0; + let canisters_with_cycles = 1; + let active_canisters_with_cycles = 1; + let short_execution_instructions = 13_000_000; + let canisters_with_long_executions = scheduler_cores + 1; + let active_canisters_with_long_executions = scheduler_cores + 1; + let long_execution_instructions = 40_000_000_000; + + let multiplier = scheduler_cores + * (canisters_with_no_cycles + canisters_with_cycles + canisters_with_long_executions); + let rounds = multiplier * 100; + + let _test = run_scheduler_test( + rounds, + scheduler_cores, + canisters_with_no_cycles, + active_canisters_with_no_cycles, + canisters_with_cycles, + active_canisters_with_cycles, + canisters_with_long_executions, + active_canisters_with_long_executions, + short_execution_instructions, + long_execution_instructions, + ); +} + #[test] fn scheduler_long_execution_progress_across_checkpoints() { let scheduler_cores = 2; @@ -1910,18 +2060,18 @@ fn scheduler_long_execution_progress_across_checkpoints() { test.send_ingress(penalized_long_id, ingress(message_instructions)); test.execute_round(ExecutionRoundType::OrdinaryRound); test.execute_round(ExecutionRoundType::OrdinaryRound); - // Assert the LEM is prioritized. + // Assert the LEM is opportunistic, as the canister was penalized. let penalized = test.state().canister_state(&penalized_long_id).unwrap(); assert_eq!( penalized.scheduler_state.long_execution_mode, - LongExecutionMode::Prioritized + LongExecutionMode::Opportunistic ); // Start a long execution on another non-penalized canister. test.send_ingress(other_long_id, ingress(message_instructions)); test.execute_round(ExecutionRoundType::OrdinaryRound); test.execute_round(ExecutionRoundType::OrdinaryRound); - // Assert the LEM is opportunistic. + // Assert the LEM is opportunistic, as there are not enough long execution cores.
let other = test.state().canister_state(&other_long_id).unwrap(); assert_eq!( other.scheduler_state.long_execution_mode, @@ -1938,19 +2088,53 @@ fn scheduler_long_execution_progress_across_checkpoints() { penalized.scheduler_state.accumulated_priority < other.scheduler_state.accumulated_priority ); let penalized_executed_before = penalized.system_state.canister_metrics.executed; + let other_executed_before = other.system_state.canister_metrics.executed; // Send a bunch of messages. for canister_id in &canister_ids { test.send_ingress(*canister_id, ingress(slice_instructions)); } - // Assert that after the checkpoint the penalized canister continues its long execution. + // Assert that after the checkpoint the other canister continues its long execution. + test.execute_round(ExecutionRoundType::OrdinaryRound); + let penalized = test.state().canister_state(&penalized_long_id).unwrap(); + let other = test.state().canister_state(&other_long_id).unwrap(); + assert_eq!( + penalized_executed_before, + penalized.system_state.canister_metrics.executed + ); + assert_eq!( + penalized.scheduler_state.long_execution_mode, + LongExecutionMode::Opportunistic + ); + assert_eq!( + other_executed_before, + other.system_state.canister_metrics.executed + ); + assert_eq!( + other.scheduler_state.long_execution_mode, + LongExecutionMode::Opportunistic + ); + test.execute_round(ExecutionRoundType::OrdinaryRound); let penalized = test.state().canister_state(&penalized_long_id).unwrap(); + let other = test.state().canister_state(&other_long_id).unwrap(); assert_eq!( penalized_executed_before + 1, penalized.system_state.canister_metrics.executed ); + assert_eq!( + penalized.scheduler_state.long_execution_mode, + LongExecutionMode::Opportunistic + ); + assert_eq!( + other_executed_before + 1, + other.system_state.canister_metrics.executed + ); + assert_eq!( + other.scheduler_state.long_execution_mode, + LongExecutionMode::Prioritized + ); } #[test] @@ -6085,7 +6269,6 @@ fn inner_round_first_execution_is_not_a_full_execution() { test.execute_round(ExecutionRoundType::OrdinaryRound); let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; for canister in test.state().canisters_iter() { let system_state = &canister.system_state; let scheduler_state = &canister.scheduler_state; @@ -6101,10 +6284,9 @@ fn inner_round_first_execution_is_not_a_full_execution() { assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); } total_accumulated_priority += scheduler_state.accumulated_priority.get(); - total_priority_credit += scheduler_state.priority_credit.get(); } // The accumulated priority invariant should be respected. - assert_eq!(total_accumulated_priority - total_priority_credit, 0); + assert_eq!(total_accumulated_priority, 0); } #[test] @@ -6143,7 +6325,6 @@ fn inner_round_long_execution_is_a_full_execution() { test.execute_round(ExecutionRoundType::OrdinaryRound); let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; for canister in test.state().canisters_iter() { let system_state = &canister.system_state; let scheduler_state = &canister.scheduler_state; @@ -6159,10 +6340,9 @@ fn inner_round_long_execution_is_a_full_execution() { // despite still having messages, executed a full slice of instructions. 
assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); total_accumulated_priority += scheduler_state.accumulated_priority.get(); - total_priority_credit += scheduler_state.priority_credit.get(); } // The accumulated priority invariant should be respected. - assert_eq!(total_accumulated_priority - total_priority_credit, 0); + assert_eq!(total_accumulated_priority, 0); } #[test_strategy::proptest(ProptestConfig { cases: 8, ..ProptestConfig::default() })] @@ -6200,7 +6380,6 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: test.execute_round(ExecutionRoundType::OrdinaryRound); let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; for (i, canister) in test.state().canisters_iter().enumerate() { if i < num_canisters as usize / 2 { // The first half of the canisters should finish their messages. @@ -6217,9 +6396,8 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: prop_assert_eq!(canister.scheduler_state.last_full_execution_round, 0.into()); } total_accumulated_priority += canister.scheduler_state.accumulated_priority.get(); - total_priority_credit += canister.scheduler_state.priority_credit.get(); } - prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0); + prop_assert_eq!(total_accumulated_priority, 0); // Send one more message for first half of the canisters. for (i, canister) in canister_ids.iter().enumerate() { @@ -6231,7 +6409,6 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: test.execute_round(ExecutionRoundType::OrdinaryRound); let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; for (i, canister) in test.state().canisters_iter().enumerate() { // Now all the canisters should be executed once. prop_assert_eq!(canister.system_state.canister_metrics.executed, 1); @@ -6253,9 +6430,8 @@ fn charge_canisters_for_full_execution(#[strategy(2..10_usize)] scheduler_cores: ); } total_accumulated_priority += canister.scheduler_state.accumulated_priority.get(); - total_priority_credit += canister.scheduler_state.priority_credit.get(); } - prop_assert_eq!(total_accumulated_priority - total_priority_credit, 0); + prop_assert_eq!(total_accumulated_priority, 0); } #[test] @@ -6298,7 +6474,6 @@ fn charge_idle_canisters_for_full_execution_round() { test.execute_round(ExecutionRoundType::OrdinaryRound); let mut total_accumulated_priority = 0; - let mut total_priority_credit = 0; for canister in test.state().canisters_iter() { let scheduler_state = &canister.scheduler_state; // Assert that we punished all idle canisters, not just top `scheduler_cores`. @@ -6307,14 +6482,13 @@ fn charge_idle_canisters_for_full_execution_round() { assert_eq!(scheduler_state.last_full_execution_round, test.last_round()); } // Assert there is no divergency in accumulated priorities. - let priority = scheduler_state.accumulated_priority - scheduler_state.priority_credit; + let priority = scheduler_state.accumulated_priority; assert!(priority.get() <= 100 * multiplier as i64); assert!(priority.get() >= -100 * multiplier as i64); total_accumulated_priority += scheduler_state.accumulated_priority.get(); - total_priority_credit += scheduler_state.priority_credit.get(); } // The accumulated priority invariant should be respected. 
- assert_eq!(total_accumulated_priority - total_priority_credit, 0); + assert_eq!(total_accumulated_priority, 0); } } diff --git a/rs/protobuf/def/state/canister_state_bits/v1/canister_state_bits.proto b/rs/protobuf/def/state/canister_state_bits/v1/canister_state_bits.proto index ebe064a15f9..75b7be0b941 100644 --- a/rs/protobuf/def/state/canister_state_bits/v1/canister_state_bits.proto +++ b/rs/protobuf/def/state/canister_state_bits/v1/canister_state_bits.proto @@ -436,7 +436,7 @@ message CanisterStateBits { // Captures the memory usage of all snapshots associated with a canister. uint64 snapshots_memory_usage = 52; reserved 47; - int64 priority_credit = 48; + reserved 48; LongExecutionMode long_execution_mode = 49; optional uint64 wasm_memory_threshold = 50; optional OnLowWasmMemoryHookStatus on_low_wasm_memory_hook_status = 53; diff --git a/rs/protobuf/src/gen/state/state.canister_state_bits.v1.rs b/rs/protobuf/src/gen/state/state.canister_state_bits.v1.rs index 270392b4ffd..b532ab4fa65 100644 --- a/rs/protobuf/src/gen/state/state.canister_state_bits.v1.rs +++ b/rs/protobuf/src/gen/state/state.canister_state_bits.v1.rs @@ -642,8 +642,6 @@ pub struct CanisterStateBits { /// Captures the memory usage of all snapshots associated with a canister. #[prost(uint64, tag = "52")] pub snapshots_memory_usage: u64, - #[prost(int64, tag = "48")] - pub priority_credit: i64, #[prost(enumeration = "LongExecutionMode", tag = "49")] pub long_execution_mode: i32, #[prost(uint64, optional, tag = "50")] diff --git a/rs/replicated_state/src/canister_state.rs b/rs/replicated_state/src/canister_state.rs index 6004697535f..f549f9da9fa 100644 --- a/rs/replicated_state/src/canister_state.rs +++ b/rs/replicated_state/src/canister_state.rs @@ -48,14 +48,6 @@ pub struct SchedulerState { /// in the vector d that corresponds to this canister. pub accumulated_priority: AccumulatedPriority, - /// Keeps the current priority credit of this Canister, accumulated during the - /// long execution. - /// - /// During the long execution, the Canister is temporarily credited with priority - /// to slightly boost the long execution priority. Only when the long execution - /// is done, then the `accumulated_priority` is decreased by the `priority_credit`. 
- pub priority_credit: AccumulatedPriority, - /// Long execution mode: Opportunistic (default) or Prioritized pub long_execution_mode: LongExecutionMode, @@ -91,7 +83,6 @@ impl Default for SchedulerState { last_full_execution_round: 0.into(), compute_allocation: ComputeAllocation::default(), accumulated_priority: AccumulatedPriority::default(), - priority_credit: AccumulatedPriority::default(), long_execution_mode: LongExecutionMode::default(), heap_delta_debit: 0.into(), install_code_debit: 0.into(), diff --git a/rs/state_layout/src/state_layout.rs b/rs/state_layout/src/state_layout.rs index 029082110bc..a3bfc899c28 100644 --- a/rs/state_layout/src/state_layout.rs +++ b/rs/state_layout/src/state_layout.rs @@ -146,7 +146,6 @@ pub struct CanisterStateBits { pub call_context_manager: Option<CallContextManager>, pub compute_allocation: ComputeAllocation, pub accumulated_priority: AccumulatedPriority, - pub priority_credit: AccumulatedPriority, pub long_execution_mode: LongExecutionMode, pub execution_state_bits: Option<ExecutionStateBits>, pub memory_allocation: MemoryAllocation, @@ -2310,7 +2309,6 @@ impl From<CanisterStateBits> for pb_canister_state_bits::CanisterStateBits { call_context_manager: item.call_context_manager.as_ref().map(|v| v.into()), compute_allocation: item.compute_allocation.as_percent(), accumulated_priority: item.accumulated_priority.get(), - priority_credit: item.priority_credit.get(), long_execution_mode: pb_canister_state_bits::LongExecutionMode::from( item.long_execution_mode, ) @@ -2451,7 +2449,6 @@ impl TryFrom<pb_canister_state_bits::CanisterStateBits> for CanisterStateBits { }, )?, accumulated_priority: value.accumulated_priority.into(), - priority_credit: value.priority_credit.into(), long_execution_mode: pb_canister_state_bits::LongExecutionMode::try_from( value.long_execution_mode, ) diff --git a/rs/state_layout/src/state_layout/tests.rs b/rs/state_layout/src/state_layout/tests.rs index e3f117fd597..f68f4888b2b 100644 --- a/rs/state_layout/src/state_layout/tests.rs +++ b/rs/state_layout/src/state_layout/tests.rs @@ -27,7 +27,6 @@ fn default_canister_state_bits() -> CanisterStateBits { call_context_manager: None, compute_allocation: ComputeAllocation::try_from(0).unwrap(), accumulated_priority: AccumulatedPriority::default(), - priority_credit: AccumulatedPriority::default(), long_execution_mode: LongExecutionMode::default(), execution_state_bits: None, memory_allocation: MemoryAllocation::default(), diff --git a/rs/state_manager/src/checkpoint.rs b/rs/state_manager/src/checkpoint.rs index ba76d6f4130..e766690efd1 100644 --- a/rs/state_manager/src/checkpoint.rs +++ b/rs/state_manager/src/checkpoint.rs @@ -511,7 +511,6 @@ pub fn load_canister_state( last_full_execution_round: canister_state_bits.last_full_execution_round, compute_allocation: canister_state_bits.compute_allocation, accumulated_priority: canister_state_bits.accumulated_priority, - priority_credit: canister_state_bits.priority_credit, long_execution_mode: canister_state_bits.long_execution_mode, heap_delta_debit: canister_state_bits.heap_delta_debit, install_code_debit: canister_state_bits.install_code_debit, diff --git a/rs/state_manager/src/tip.rs b/rs/state_manager/src/tip.rs index 0ae8a522a1b..df687790a58 100644 --- a/rs/state_manager/src/tip.rs +++ b/rs/state_manager/src/tip.rs @@ -1010,7 +1010,6 @@ fn serialize_canister_to_tip( last_full_execution_round: canister_state.scheduler_state.last_full_execution_round, call_context_manager: canister_state.system_state.call_context_manager().cloned(), compute_allocation: canister_state.scheduler_state.compute_allocation, - priority_credit:
canister_state.scheduler_state.priority_credit, long_execution_mode: canister_state.scheduler_state.long_execution_mode, accumulated_priority: canister_state.scheduler_state.accumulated_priority, memory_allocation: canister_state.system_state.memory_allocation, diff --git a/rs/types/types/src/lib.rs b/rs/types/types/src/lib.rs index 2f5f8951ac0..459ae91e3a8 100644 --- a/rs/types/types/src/lib.rs +++ b/rs/types/types/src/lib.rs @@ -28,40 +28,23 @@ // Note [Scheduler and AccumulatedPriority] // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // -// Public Specification of IC describes `compute_allocation`. Each canister is -// initiated with an `accumulated_priority` of 0. The scheduler uses these values -// while calculating the priority of a canister at each round. The canisters -// are scheduled at each round in the following way: +// The public ICP specification describes `compute_allocation`. Each canister is +// initialized with an `accumulated_priority` of `compute_allocation`. +// The scheduler uses these values to calculate the priority of a canister +// per round. Canisters are scheduled per round in the following way: // -// * For each canister, we compute the `round_priority` of that canister as the -// sum of its `accumulated_priority` and the multiplication of its -// `compute_allocation` with the `multiplier` (see the scheduler). -// * We distribute the free capacity equally to all the canisters. -// * For new executions: -// - We sort the canisters according to their round priorities in -// descending order. -// * For pending long executions: -// - Sort the canisters first according to their execution mode, -// and then round priorities. -// - Calculate how many scheduler cores we dedicate for long executions -// in this round using compute allocations of these long executions. -// - The first `long_execution_cores` many canisters are given the top -// priority in this round and get into the prioritized long execution mode. -// - The rest of the long executions are given an opportunity to be executed -// by scheduling them at the very end. -// * The first `scheduler_cores` many canisters are given the top priority in -// this round. Therefore, they are expected to be executed as the first of -// their threads. -// * As the last step, we credit the first `scheduler_cores` canisters -// with the sum of compute allocations of all canisters times `multiplier` -// divided by the number of canisters that are given top priority in -// this round. This `priority_credit` will be subtracted from the -// `accumulated_priority` at the end of the execution or at the checkpoint. +// * Canisters are sorted by execution mode and then by accumulated priority in +// descending order. +// * The number of scheduler cores dedicated to long executions per round +// is calculated based on the compute allocations of all long executions. +// * The first `long_execution_cores` canisters are assigned the highest priority +// per round and enter prioritized long execution mode. +// * The first `scheduler_cores` canisters are assigned the highest priority per round. +// Therefore, they are expected to be executed first among their threads. +// * At the end of each round, every fully executed canister is charged. +// * The total priority charged is evenly distributed across all canisters. // -// As a result, at each round, the sum of accumulated priorities minus -// the sum of priority credits remains 0.
-// Similarly, the sum of all round priorities equals to the multiplication of -// the sum of all compute allocations with the multiplier. +// As a result, the sum of accumulated priorities remains 0 at the end of each round. pub mod artifact; pub mod batch;
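Below is a minimal, self-contained sketch of the invariant described in the note above: each round, every fully executed canister is charged 100% times the multiplier, the charged total is redistributed evenly across all canisters, and the sum of accumulated priorities stays at zero. This is an editor's illustration, not part of the patch: the round loop, the zero compute allocations, and the constants are simplifying assumptions, and the replica's actual scheduling logic (long execution modes, rate limiting, etc.) is omitted.

fn main() {
    let scheduler_cores = 2usize;
    let num_canisters = 5usize;
    // As in the scheduler tests above, the multiplier is the number of cores
    // times the number of canisters; it makes the even redistribution below
    // exact in integer arithmetic.
    let multiplier = (scheduler_cores * num_canisters) as i64;
    // Accumulated priority per canister; every canister starts at zero
    // (all compute allocations are assumed to be zero here).
    let mut accumulated_priority = vec![0i64; num_canisters];

    for round in 0..1_000 {
        // Sort canisters by accumulated priority, highest first, breaking
        // ties by canister index (long execution mode is omitted here).
        let mut order: Vec<usize> = (0..num_canisters).collect();
        order.sort_by_key(|&i| (std::cmp::Reverse(accumulated_priority[i]), i));

        // The top `scheduler_cores` canisters are fully executed this round;
        // each fully executed canister is charged 100% times the multiplier.
        let mut total_charged = 0i64;
        for &i in order.iter().take(scheduler_cores) {
            accumulated_priority[i] -= 100 * multiplier;
            total_charged += 100 * multiplier;
        }

        // The total charged priority is distributed evenly across all
        // canisters (with zero compute allocations it is all free capacity).
        let free_capacity_per_canister = total_charged / num_canisters as i64;
        for priority in accumulated_priority.iter_mut() {
            *priority += free_capacity_per_canister;
        }

        // The invariant: the sum of accumulated priorities remains zero.
        let sum: i64 = accumulated_priority.iter().sum();
        assert_eq!(sum, 0, "invariant broken in round {round}");
    }
}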