Skip to content

Commit

Permalink
replay scheduler with causal dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Aurel300 committed Aug 14, 2024
1 parent 4281c33 commit 83cdcef
Show file tree
Hide file tree
Showing 15 changed files with 273 additions and 87 deletions.
7 changes: 6 additions & 1 deletion src/runtime/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,12 +679,17 @@ impl ExecutionState {

let is_yielding = std::mem::replace(&mut self.has_yielded, false);

let runnable_tasks = runnable
.iter()
.map(|id| self.tasks.get(id.0).unwrap())
.collect::<SmallVec<[&Task; DEFAULT_INLINE_TASKS]>>();
self.next_task = self
.scheduler
.borrow_mut()
.next_task(&runnable, self.current_task.id(), is_yielding)
.next_task(&runnable_tasks, self.current_task.id(), is_yielding)
.map(ScheduledTask::Some)
.unwrap_or(ScheduledTask::Stopped);
drop(runnable_tasks);

// If the task chosen by the scheduler is blocked, then it should be one that can be
// spuriously woken up, and we need to unblock it here so that it can execute.
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::execution::Execution;
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
use crate::scheduler::metrics::MetricsScheduler;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -237,7 +237,7 @@ impl<S: Scheduler> Scheduler for PortfolioStoppableScheduler<S> {

fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
Expand Down
59 changes: 34 additions & 25 deletions src/runtime/task/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ impl VectorClock {
}
}

#[cfg(test)]
pub(crate) fn new_from(v: &[u32]) -> Self {
Self {
time: SmallVec::from(v),
}
}

// Zero extend clock to accommodate `task_id` tasks.
pub(crate) fn extend(&mut self, task_id: TaskId) {
let num_new_tasks = 1 + task_id.0 - self.time.len();
Expand Down Expand Up @@ -50,6 +43,22 @@ impl VectorClock {
}
}

impl<const N: usize> From<&[u32; N]> for VectorClock {
fn from(v: &[u32; N]) -> Self {
Self {
time: SmallVec::from(&v[..]),
}
}
}

impl From<&[u32]> for VectorClock {
fn from(v: &[u32]) -> Self {
Self {
time: SmallVec::from(v),
}
}
}

impl std::ops::Deref for VectorClock {
type Target = [u32];
fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -88,44 +97,44 @@ mod test {

#[test]
fn vector_clock() {
let v1 = VectorClock::new_from(&[1, 2, 3, 4]);
let v2 = VectorClock::new_from(&[1, 2, 4, 5]);
let v3 = VectorClock::new_from(&[1, 2, 3, 1]);
let v4 = VectorClock::new_from(&[1, 2, 4, 1]);
let v5 = VectorClock::new_from(&[1, 2, 3, 4]);
let v1 = VectorClock::from(&[1, 2, 3, 4]);
let v2 = VectorClock::from(&[1, 2, 4, 5]);
let v3 = VectorClock::from(&[1, 2, 3, 1]);
let v4 = VectorClock::from(&[1, 2, 4, 1]);
let v5 = VectorClock::from(&[1, 2, 3, 4]);
assert!(v1 < v2 && v1 > v3 && v1 == v5);
assert!(v2 > v3 && v2 > v4);
assert!(v3 < v4);
assert_eq!(v1.partial_cmp(&v4), None);

let v1 = VectorClock::new_from(&[1, 2, 3, 4]);
let v2 = VectorClock::new_from(&[1, 2, 2]);
let v3 = VectorClock::new_from(&[1, 2, 3]);
let v4 = VectorClock::new_from(&[1, 2, 4]);
let v1 = VectorClock::from(&[1, 2, 3, 4]);
let v2 = VectorClock::from(&[1, 2, 2]);
let v3 = VectorClock::from(&[1, 2, 3]);
let v4 = VectorClock::from(&[1, 2, 4]);
assert!(v1 > v2);
assert!(v1 > v3);
assert_eq!(v1.partial_cmp(&v4), None);

let v1 = VectorClock::new_from(&[]);
let v2 = VectorClock::new_from(&[1]);
let v1 = VectorClock::from(&[]);
let v2 = VectorClock::from(&[1]);
assert!(v1 < v2);

let v1 = VectorClock::new_from(&[1, 2, 1]);
let v2 = VectorClock::new_from(&[1, 3]);
let v3 = VectorClock::new_from(&[1, 1, 1, 2]);
let v4 = VectorClock::new_from(&[1, 1, 2]);
let v1 = VectorClock::from(&[1, 2, 1]);
let v2 = VectorClock::from(&[1, 3]);
let v3 = VectorClock::from(&[1, 1, 1, 2]);
let v4 = VectorClock::from(&[1, 1, 2]);

let mut v = v1.clone();
v.update(&v2);
assert_eq!(v, VectorClock::new_from(&[1, 3, 1]));
assert_eq!(v, VectorClock::from(&[1, 3, 1]));

let mut v = v1.clone();
v.update(&v3);
assert_eq!(v, VectorClock::new_from(&[1, 2, 1, 2]));
assert_eq!(v, VectorClock::from(&[1, 2, 1, 2]));

let mut v = v1.clone();
v.update(&v4);
assert_eq!(v, VectorClock::new_from(&[1, 2, 2]));
assert_eq!(v, VectorClock::from(&[1, 2, 2]));

let mut v = v1.clone();
v.update(&VectorClock::new());
Expand Down
5 changes: 3 additions & 2 deletions src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ where
/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
#[derive(Debug)]
pub(crate) struct Task {
pub struct Task {
pub(super) id: TaskId,
pub(super) state: TaskState,
pub(super) detached: bool,
Expand Down Expand Up @@ -312,7 +312,8 @@ impl Task {
)
}

pub(crate) fn id(&self) -> TaskId {
/// Returns the identifier of this task.
pub fn id(&self) -> TaskId {
self.id
}

Expand Down
14 changes: 7 additions & 7 deletions src/scheduler/dfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::scheduler::data::fixed::FixedDataSource;
use crate::scheduler::data::DataSource;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -66,13 +66,13 @@ impl Scheduler for DfsScheduler {

// TODO should we respect `is_yielding` by not allowing `current` to be scheduled next? That
// TODO would be unsound but perhaps useful for validating some code
fn next_task(&mut self, runnable: &[TaskId], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
fn next_task(&mut self, runnable: &[&Task], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
let next = if self.steps >= self.levels.len() {
// First time we've reached this level
assert_eq!(self.steps, self.levels.len());
let to_run = runnable.first().unwrap();
self.levels.push((*to_run, runnable.len() == 1));
*to_run
let to_run = runnable.first().unwrap().id();
self.levels.push((to_run, runnable.len() == 1));
to_run
} else {
let (last_choice, was_last) = self.levels[self.steps];
if self.has_more_choices(self.steps + 1) {
Expand All @@ -84,8 +84,8 @@ impl Scheduler for DfsScheduler {
!was_last,
"if we are making a change, there should be another available option"
);
let next_idx = runnable.iter().position(|tid| *tid == last_choice).unwrap() + 1;
let next = runnable[next_idx];
let next_idx = runnable.iter().position(|t| t.id() == last_choice).unwrap() + 1;
let next = runnable[next_idx].id();
self.levels.drain(self.steps..);
self.levels.push((next, next_idx == runnable.len() - 1));
next
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::scheduler::{Schedule, Scheduler};
use tracing::info;

Expand Down Expand Up @@ -85,7 +85,7 @@ impl<S: Scheduler> Scheduler for MetricsScheduler<S> {

fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
Expand All @@ -94,7 +94,7 @@ impl<S: Scheduler> Scheduler for MetricsScheduler<S> {
self.steps += 1;
if choice != self.last_task {
self.context_switches += 1;
if runnable_tasks.contains(&self.last_task) {
if runnable_tasks.iter().any(|t| t.id() == self.last_task) {
self.preemptions += 1;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod uncontrolled_nondeterminism;
pub(crate) mod metrics;
pub(crate) mod serialization;

pub use crate::runtime::task::TaskId;
pub use crate::runtime::task::{Task, TaskId};

pub use data::{DataSource, RandomDataSource};
pub use dfs::DfsScheduler;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub trait Scheduler {
/// execution has not yet begun.
fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId>;
Expand All @@ -117,7 +117,7 @@ impl Scheduler for Box<dyn Scheduler + Send> {

fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
Expand Down
13 changes: 7 additions & 6 deletions src/scheduler/pct.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::{TaskId, DEFAULT_INLINE_TASKS};
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
use crate::scheduler::data::random::RandomDataSource;
use crate::scheduler::data::DataSource;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -105,11 +105,11 @@ impl Scheduler for PctScheduler {
Some(Schedule::new(self.data_source.reinitialize()))
}

fn next_task(&mut self, runnable: &[TaskId], current: Option<TaskId>, is_yielding: bool) -> Option<TaskId> {
fn next_task(&mut self, runnable: &[&Task], current: Option<TaskId>, is_yielding: bool) -> Option<TaskId> {
// If any new tasks were created, assign them priorities by randomly swapping them with an
// existing task's priority, so we maintain the invariant that every priority is distinct
let max_known_task = self.priorities.len();
let max_new_task = usize::from(*runnable.iter().max().unwrap());
let max_new_task = usize::from(runnable.iter().map(|t| t.id()).max().unwrap());
for new_task_id in max_known_task..1 + max_new_task {
let new_task_id = TaskId::from(new_task_id);
// Make sure there's a chance to give the new task the lowest priority
Expand Down Expand Up @@ -150,10 +150,11 @@ impl Scheduler for PctScheduler {

// Choose the highest-priority (== lowest priority value) runnable task
Some(
*runnable
runnable
.iter()
.min_by_key(|tid| self.priorities.get(tid))
.expect("priority queue invariant"),
.min_by_key(|t| self.priorities.get(&t.id()))
.expect("priority queue invariant")
.id(),
)
}

Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/random.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::scheduler::data::random::RandomDataSource;
use crate::scheduler::data::DataSource;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -51,8 +51,8 @@ impl Scheduler for RandomScheduler {
}
}

fn next_task(&mut self, runnable: &[TaskId], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
Some(*runnable.choose(&mut self.rng).unwrap())
fn next_task(&mut self, runnable: &[&Task], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
Some(runnable.choose(&mut self.rng).unwrap().id())
}

fn next_u64(&mut self) -> u64 {
Expand Down
Loading

0 comments on commit 83cdcef

Please sign in to comment.