Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Causal replay #156

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/runtime/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,12 +679,17 @@ impl ExecutionState {

let is_yielding = std::mem::replace(&mut self.has_yielded, false);

let runnable_tasks = runnable
.iter()
.map(|id| self.tasks.get(id.0).unwrap())
.collect::<SmallVec<[&Task; DEFAULT_INLINE_TASKS]>>();
self.next_task = self
.scheduler
.borrow_mut()
.next_task(&runnable, self.current_task.id(), is_yielding)
.next_task(&runnable_tasks, self.current_task.id(), is_yielding)
.map(ScheduledTask::Some)
.unwrap_or(ScheduledTask::Stopped);
drop(runnable_tasks);

// If the task chosen by the scheduler is blocked, then it should be one that can be
// spuriously woken up, and we need to unblock it here so that it can execute.
Expand Down
4 changes: 2 additions & 2 deletions src/runtime/runner.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::runtime::execution::Execution;
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::runtime::thread::continuation::{ContinuationPool, CONTINUATION_POOL};
use crate::scheduler::metrics::MetricsScheduler;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -237,7 +237,7 @@ impl<S: Scheduler> Scheduler for PortfolioStoppableScheduler<S> {

fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
Expand Down
59 changes: 34 additions & 25 deletions src/runtime/task/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ impl VectorClock {
}
}

#[cfg(test)]
pub(crate) fn new_from(v: &[u32]) -> Self {
Self {
time: SmallVec::from(v),
}
}

// Zero extend clock to accommodate `task_id` tasks.
pub(crate) fn extend(&mut self, task_id: TaskId) {
let num_new_tasks = 1 + task_id.0 - self.time.len();
Expand Down Expand Up @@ -50,6 +43,22 @@ impl VectorClock {
}
}

impl<const N: usize> From<&[u32; N]> for VectorClock {
fn from(v: &[u32; N]) -> Self {
Self {
time: SmallVec::from(&v[..]),
}
}
}

impl From<&[u32]> for VectorClock {
fn from(v: &[u32]) -> Self {
Self {
time: SmallVec::from(v),
}
}
}

impl std::ops::Deref for VectorClock {
type Target = [u32];
fn deref(&self) -> &Self::Target {
Expand Down Expand Up @@ -88,44 +97,44 @@ mod test {

#[test]
fn vector_clock() {
let v1 = VectorClock::new_from(&[1, 2, 3, 4]);
let v2 = VectorClock::new_from(&[1, 2, 4, 5]);
let v3 = VectorClock::new_from(&[1, 2, 3, 1]);
let v4 = VectorClock::new_from(&[1, 2, 4, 1]);
let v5 = VectorClock::new_from(&[1, 2, 3, 4]);
let v1 = VectorClock::from(&[1, 2, 3, 4]);
let v2 = VectorClock::from(&[1, 2, 4, 5]);
let v3 = VectorClock::from(&[1, 2, 3, 1]);
let v4 = VectorClock::from(&[1, 2, 4, 1]);
let v5 = VectorClock::from(&[1, 2, 3, 4]);
assert!(v1 < v2 && v1 > v3 && v1 == v5);
assert!(v2 > v3 && v2 > v4);
assert!(v3 < v4);
assert_eq!(v1.partial_cmp(&v4), None);

let v1 = VectorClock::new_from(&[1, 2, 3, 4]);
let v2 = VectorClock::new_from(&[1, 2, 2]);
let v3 = VectorClock::new_from(&[1, 2, 3]);
let v4 = VectorClock::new_from(&[1, 2, 4]);
let v1 = VectorClock::from(&[1, 2, 3, 4]);
let v2 = VectorClock::from(&[1, 2, 2]);
let v3 = VectorClock::from(&[1, 2, 3]);
let v4 = VectorClock::from(&[1, 2, 4]);
assert!(v1 > v2);
assert!(v1 > v3);
assert_eq!(v1.partial_cmp(&v4), None);

let v1 = VectorClock::new_from(&[]);
let v2 = VectorClock::new_from(&[1]);
let v1 = VectorClock::from(&[]);
let v2 = VectorClock::from(&[1]);
assert!(v1 < v2);

let v1 = VectorClock::new_from(&[1, 2, 1]);
let v2 = VectorClock::new_from(&[1, 3]);
let v3 = VectorClock::new_from(&[1, 1, 1, 2]);
let v4 = VectorClock::new_from(&[1, 1, 2]);
let v1 = VectorClock::from(&[1, 2, 1]);
let v2 = VectorClock::from(&[1, 3]);
let v3 = VectorClock::from(&[1, 1, 1, 2]);
let v4 = VectorClock::from(&[1, 1, 2]);

let mut v = v1.clone();
v.update(&v2);
assert_eq!(v, VectorClock::new_from(&[1, 3, 1]));
assert_eq!(v, VectorClock::from(&[1, 3, 1]));

let mut v = v1.clone();
v.update(&v3);
assert_eq!(v, VectorClock::new_from(&[1, 2, 1, 2]));
assert_eq!(v, VectorClock::from(&[1, 2, 1, 2]));

let mut v = v1.clone();
v.update(&v4);
assert_eq!(v, VectorClock::new_from(&[1, 2, 2]));
assert_eq!(v, VectorClock::from(&[1, 2, 2]));

let mut v = v1.clone();
v.update(&VectorClock::new());
Expand Down
5 changes: 3 additions & 2 deletions src/runtime/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ where
/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
#[derive(Debug)]
pub(crate) struct Task {
pub struct Task {
pub(super) id: TaskId,
pub(super) state: TaskState,
pub(super) detached: bool,
Expand Down Expand Up @@ -312,7 +312,8 @@ impl Task {
)
}

pub(crate) fn id(&self) -> TaskId {
/// Returns the identifier of this task.
pub fn id(&self) -> TaskId {
self.id
}

Expand Down
14 changes: 7 additions & 7 deletions src/scheduler/dfs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::scheduler::data::fixed::FixedDataSource;
use crate::scheduler::data::DataSource;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -66,13 +66,13 @@ impl Scheduler for DfsScheduler {

// TODO should we respect `is_yielding` by not allowing `current` to be scheduled next? That
// TODO would be unsound but perhaps useful for validating some code
fn next_task(&mut self, runnable: &[TaskId], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
fn next_task(&mut self, runnable: &[&Task], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
let next = if self.steps >= self.levels.len() {
// First time we've reached this level
assert_eq!(self.steps, self.levels.len());
let to_run = runnable.first().unwrap();
self.levels.push((*to_run, runnable.len() == 1));
*to_run
let to_run = runnable.first().unwrap().id();
self.levels.push((to_run, runnable.len() == 1));
to_run
} else {
let (last_choice, was_last) = self.levels[self.steps];
if self.has_more_choices(self.steps + 1) {
Expand All @@ -84,8 +84,8 @@ impl Scheduler for DfsScheduler {
!was_last,
"if we are making a change, there should be another available option"
);
let next_idx = runnable.iter().position(|tid| *tid == last_choice).unwrap() + 1;
let next = runnable[next_idx];
let next_idx = runnable.iter().position(|t| t.id() == last_choice).unwrap() + 1;
let next = runnable[next_idx].id();
self.levels.drain(self.steps..);
self.levels.push((next, next_idx == runnable.len() - 1));
next
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::scheduler::{Schedule, Scheduler};
use tracing::info;

Expand Down Expand Up @@ -85,7 +85,7 @@ impl<S: Scheduler> Scheduler for MetricsScheduler<S> {

fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
Expand All @@ -94,7 +94,7 @@ impl<S: Scheduler> Scheduler for MetricsScheduler<S> {
self.steps += 1;
if choice != self.last_task {
self.context_switches += 1;
if runnable_tasks.contains(&self.last_task) {
if runnable_tasks.iter().any(|t| t.id() == self.last_task) {
self.preemptions += 1;
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ mod uncontrolled_nondeterminism;
pub(crate) mod metrics;
pub(crate) mod serialization;

pub use crate::runtime::task::TaskId;
pub use crate::runtime::task::{Task, TaskId};

pub use data::{DataSource, RandomDataSource};
pub use dfs::DfsScheduler;
Expand Down Expand Up @@ -101,7 +101,7 @@ pub trait Scheduler {
/// execution has not yet begun.
fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId>;
Expand All @@ -117,7 +117,7 @@ impl Scheduler for Box<dyn Scheduler + Send> {

fn next_task(
&mut self,
runnable_tasks: &[TaskId],
runnable_tasks: &[&Task],
current_task: Option<TaskId>,
is_yielding: bool,
) -> Option<TaskId> {
Expand Down
13 changes: 7 additions & 6 deletions src/scheduler/pct.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::{TaskId, DEFAULT_INLINE_TASKS};
use crate::runtime::task::{Task, TaskId, DEFAULT_INLINE_TASKS};
use crate::scheduler::data::random::RandomDataSource;
use crate::scheduler::data::DataSource;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -105,11 +105,11 @@ impl Scheduler for PctScheduler {
Some(Schedule::new(self.data_source.reinitialize()))
}

fn next_task(&mut self, runnable: &[TaskId], current: Option<TaskId>, is_yielding: bool) -> Option<TaskId> {
fn next_task(&mut self, runnable: &[&Task], current: Option<TaskId>, is_yielding: bool) -> Option<TaskId> {
// If any new tasks were created, assign them priorities by randomly swapping them with an
// existing task's priority, so we maintain the invariant that every priority is distinct
let max_known_task = self.priorities.len();
let max_new_task = usize::from(*runnable.iter().max().unwrap());
let max_new_task = usize::from(runnable.iter().map(|t| t.id()).max().unwrap());
for new_task_id in max_known_task..1 + max_new_task {
let new_task_id = TaskId::from(new_task_id);
// Make sure there's a chance to give the new task the lowest priority
Expand Down Expand Up @@ -150,10 +150,11 @@ impl Scheduler for PctScheduler {

// Choose the highest-priority (== lowest priority value) runnable task
Some(
*runnable
runnable
.iter()
.min_by_key(|tid| self.priorities.get(tid))
.expect("priority queue invariant"),
.min_by_key(|t| self.priorities.get(&t.id()))
.expect("priority queue invariant")
.id(),
)
}

Expand Down
6 changes: 3 additions & 3 deletions src/scheduler/random.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::runtime::task::TaskId;
use crate::runtime::task::{Task, TaskId};
use crate::scheduler::data::random::RandomDataSource;
use crate::scheduler::data::DataSource;
use crate::scheduler::{Schedule, Scheduler};
Expand Down Expand Up @@ -51,8 +51,8 @@ impl Scheduler for RandomScheduler {
}
}

fn next_task(&mut self, runnable: &[TaskId], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
Some(*runnable.choose(&mut self.rng).unwrap())
fn next_task(&mut self, runnable: &[&Task], _current: Option<TaskId>, _is_yielding: bool) -> Option<TaskId> {
Some(runnable.choose(&mut self.rng).unwrap().id())
}

fn next_u64(&mut self) -> u64 {
Expand Down
Loading
Loading