Skip to content

Commit

Permalink
Simplify CompletedJobs (#1493)
Browse files Browse the repository at this point in the history
We never actually use the "jobs before `last_flush` count as completed"
property of `CompletedJobs`.

From discussions in chat, this may be a holdover from when we sent
literally every previous job in the dependency list.
Now that flushes act as a full dependency barrier, it shouldn't be
possible for a job to depend on anything that precedes a flush.

This PR removes the special-casing of `last_flush`; instead, the flush
becomes first among equals in `completed: Vec<JobId>` (it is simply the
oldest entry in the list). Most of the changes are edits to test cases
that violated the "flushes are a full barrier" property.
  • Loading branch information
mkeeter authored Oct 11, 2024
1 parent 95e6424 commit 158e3fd
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 67 deletions.
29 changes: 11 additions & 18 deletions downstairs/src/complete_jobs.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
use crate::JobId;

/// Stores a flush operation and a set of complete jobs
///
/// The flush operation is not included in the list of complete jobs, but acts
/// as a lower bound for dependencies; any job older than the flush is assumed
/// to have completed.
/// Stores a set of complete jobs
#[derive(Debug)]
pub struct CompletedJobs {
last_flush: Option<JobId>,
completed: Vec<JobId>,
}

impl CompletedJobs {
pub fn new(last_flush: Option<JobId>) -> Self {
Self {
last_flush,
completed: vec![],
completed: last_flush.into_iter().collect(),
}
}

/// Reports whether no job IDs are currently recorded as complete.
///
/// Only compiled in test builds.
#[cfg(test)]
pub fn is_empty(&self) -> bool {
    // Go through the public slice accessor rather than the field directly.
    self.completed().is_empty()
}
Expand All @@ -28,28 +23,26 @@ impl CompletedJobs {
self.completed.push(id);
}

/// Resets the data structure given a new barrier operation
/// Resets the data structure, given a new barrier operation
///
/// All older jobs are forgotten, and the provided operation becomes the
/// oldest complete job.
pub fn reset(&mut self, id: JobId) {
self.last_flush = Some(id);
self.completed.clear();
self.completed.push(id);
}

/// Checks whether the given job is complete
///
/// A job is complete if it precedes our last barrier operation or is listed
/// in the set of complete jobs.
/// A job is complete if it is listed in the set of complete jobs.
pub fn is_complete(&self, id: JobId) -> bool {
// We deliberately reverse the `completed` list because new jobs are at
// the back and are more likely to be what we care about
self.last_flush.map(|b| id <= b).unwrap_or(false)
|| self.completed.iter().rev().any(|j| *j == id)
self.completed.iter().rev().any(|j| *j == id)
}

/// Borrows the recorded complete job IDs as a slice.
pub fn completed(&self) -> &[JobId] {
    self.completed.as_slice()
}

pub fn last_flush(&self) -> Option<JobId> {
self.last_flush
}
}
70 changes: 21 additions & 49 deletions downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,6 @@ pub fn show_work(ds: &mut Downstairs) {
}

info!(ds.log, "Completed work {:?}", work.completed());
info!(ds.log, "Last flush: {:?}", work.last_flush());
}
}

Expand Down Expand Up @@ -3263,10 +3262,6 @@ impl Work {
self.completed.completed()
}

fn last_flush(&self) -> Option<JobId> {
self.completed.last_flush()
}

/// Returns the number of entries currently held in `dep_wait`.
fn jobs(&self) -> usize {
self.dep_wait.len()
}
Expand Down Expand Up @@ -3673,17 +3668,13 @@ mod test {
}

fn complete(work: &mut Work, ds_id: JobId, job: IOop) {
let is_flush = {
// validate that deps are done
let dep_list = job.deps();
for &dep in dep_list {
assert!(work.completed.is_complete(dep));
}

matches!(job, IOop::Flush { .. })
};
// validate that deps are done
assert!(job
.deps()
.iter()
.all(|dep| work.completed.is_complete(*dep)));

if is_flush {
if matches!(job, IOop::Flush { .. }) {
work.completed.reset(ds_id);
} else {
work.completed.push(ds_id);
Expand Down Expand Up @@ -4138,14 +4129,14 @@ mod test {
let writes = create_generic_test_write(eid);

let rio = IOop::Write {
dependencies: vec![JobId(1000), JobId(1001)],
dependencies: vec![JobId(1001)],
writes,
};
ds.active_mut(conn_id).add_work(JobId(1002), rio);

// Now close the extent
let rio = IOop::ExtentClose {
dependencies: vec![JobId(1000), JobId(1001), JobId(1002)],
dependencies: vec![JobId(1001), JobId(1002)],
extent: eid,
};
ds.active_mut(conn_id).add_work(JobId(1003), rio);
Expand Down Expand Up @@ -4770,24 +4761,21 @@ mod test {

test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1000)));
assert!(work.completed.is_empty());
assert_eq!(work.completed(), vec![JobId(1000)]);
let next_jobs = test_push_next_jobs(&mut work);
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1001)]);
assert_eq!(work.new_work(), vec![JobId(1002)]);

test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1000)));
assert_eq!(work.completed(), [JobId(1001)]);
assert_eq!(work.completed(), vec![JobId(1000), JobId(1001)]);
let next_jobs = test_push_next_jobs(&mut work);
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1002)]);
assert!(work.new_work().is_empty());

test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1000)));
assert_eq!(work.completed(), [JobId(1001), JobId(1002)]);
assert_eq!(work.completed(), [JobId(1000), JobId(1001), JobId(1002)]);
}

#[test]
Expand All @@ -4797,12 +4785,7 @@ mod test {
// Add three jobs all blocked on each other in a chain, second is flush
add_work(&mut work, JobId(1000), vec![], false);
add_work(&mut work, JobId(1001), vec![JobId(1000)], true);
add_work(
&mut work,
JobId(1002),
vec![JobId(1000), JobId(1001)],
false,
);
add_work(&mut work, JobId(1002), vec![JobId(1001)], false);

// new_work returns all new or dep wait jobs
assert_eq!(
Expand All @@ -4826,16 +4809,14 @@ mod test {

test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1001)));
assert!(work.completed.is_empty());
assert_eq!(work.completed(), vec![JobId(1001)]);
let next_jobs = test_push_next_jobs(&mut work);
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1002)]);
assert!(work.new_work().is_empty());

test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1001)));
assert_eq!(work.completed(), [JobId(1002)]);
assert_eq!(work.completed(), [JobId(1001), JobId(1002)]);
}

#[test]
Expand All @@ -4860,21 +4841,15 @@ mod test {
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1002)]);
test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1002)));
assert!(work.completed.is_empty());
assert_eq!(work.completed(), vec![JobId(1002)]);

// Upstairs sends a job with these three in deps, not knowing Downstairs
// Upstairs sends a job with the flush in deps, not knowing Downstairs
// has done the jobs already
add_work(
&mut work,
JobId(1003),
vec![JobId(1000), JobId(1001), JobId(1002)],
false,
);
add_work(&mut work, JobId(1003), vec![JobId(1002)], false);
add_work(
&mut work,
JobId(1004),
vec![JobId(1000), JobId(1001), JobId(1002), JobId(1003)],
vec![JobId(1002), JobId(1003)],
false,
);

Expand All @@ -4886,8 +4861,7 @@ mod test {
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1004)]);
test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1002)));
assert_eq!(work.completed(), [JobId(1003), JobId(1004)]);
assert_eq!(work.completed(), [JobId(1002), JobId(1003), JobId(1004)]);
}

#[test]
Expand All @@ -4914,8 +4888,7 @@ mod test {
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1002)]);
test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(1002)));
assert!(work.completed.is_empty());
assert_eq!(work.completed(), vec![JobId(1002)]);

assert_eq!(work.new_work(), vec![JobId(1003)]);
}
Expand Down Expand Up @@ -4957,8 +4930,7 @@ mod test {
assert_eq!(to_job_ids(&next_jobs), vec![JobId(1002), JobId(2002)]);
test_do_work(&mut work, next_jobs);

assert_eq!(work.last_flush(), Some(JobId(2002)));
assert!(work.completed.is_empty());
assert_eq!(work.completed(), vec![JobId(2002)]);
}

#[test]
Expand Down

0 comments on commit 158e3fd

Please sign in to comment.