Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add IOop::Barrier #1494

Merged
merged 2 commits into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion downstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ enum IOop {
snapshot_details: Option<SnapshotDetails>,
extent_limit: Option<ExtentId>,
},
Barrier {
dependencies: Vec<JobId>, // Jobs that must finish before this
},
/*
* These operations are for repairing a bad downstairs
*/
Expand Down Expand Up @@ -114,6 +117,7 @@ impl IOop {
match &self {
IOop::Write { dependencies, .. }
| IOop::Flush { dependencies, .. }
| IOop::Barrier { dependencies, .. }
| IOop::Read { dependencies, .. }
| IOop::WriteUnwritten { dependencies, .. }
| IOop::ExtentClose { dependencies, .. }
Expand Down Expand Up @@ -687,6 +691,9 @@ pub fn show_work(ds: &mut Downstairs) {
IOop::Read { dependencies, .. } => ("Read", dependencies),
IOop::Write { dependencies, .. } => ("Write", dependencies),
IOop::Flush { dependencies, .. } => ("Flush", dependencies),
IOop::Barrier { dependencies, .. } => {
("Barrier", dependencies)
}
IOop::WriteUnwritten { dependencies, .. } => {
("WriteU", dependencies)
}
Expand Down Expand Up @@ -721,6 +728,7 @@ pub mod cdt {
fn submit__writeunwritten__start(_: u64) {}
fn submit__write__start(_: u64) {}
fn submit__flush__start(_: u64) {}
fn submit__barrier__start(_: u64) {}
fn submit__el__close__start(_: u64) {}
fn submit__el__flush__close__start(_: u64) {}
fn submit__el__repair__start(_: u64) {}
Expand Down Expand Up @@ -977,6 +985,11 @@ impl ActiveConnection {
session_id,
..
}
| Message::Barrier {
upstairs_id,
session_id,
..
}
| Message::ReadRequest {
upstairs_id,
session_id,
Expand Down Expand Up @@ -1146,6 +1159,25 @@ impl ActiveConnection {
)
.await?
}
Message::Barrier {
job_id,
dependencies,
..
} => {
cdt::submit__barrier__start!(|| job_id.0);

let new_barrier = IOop::Barrier { dependencies };

self.do_work_if_ready(
job_id,
new_barrier,
flags,
reqwest_client,
dss,
region,
)
.await?
}
Message::WriteUnwritten { header, data } => {
cdt::submit__writeunwritten__start!(|| header.job_id.0);
let writes = RegionWrite::new(
Expand Down Expand Up @@ -1719,6 +1751,18 @@ impl ActiveConnection {
result,
}
}
IOop::Barrier { dependencies } => {
debug!(self.log, "Barrier :{job_id} deps:{dependencies:?}",);

// No work is actually done here; the barrier just lets us drop
// older dependencies (because it waits for all of them).
Message::BarrierAck {
upstairs_id: upstairs_connection.upstairs_id,
session_id: upstairs_connection.session_id,
job_id,
result: Ok(()),
}
}
IOop::ExtentClose {
dependencies,
extent,
Expand Down Expand Up @@ -3328,6 +3372,7 @@ impl Work {
IOop::Write { .. } => "Write",
IOop::WriteUnwritten { .. } => "WriteUnwritten",
IOop::Flush { .. } => "Flush",
IOop::Barrier { .. } => "Barrier",
IOop::Read { .. } => "Read",
IOop::ExtentClose { .. } => "ECLose",
IOop::ExtentFlushClose { .. } => "EFlushCLose",
Expand Down Expand Up @@ -3674,7 +3719,10 @@ mod test {
.iter()
.all(|dep| work.completed.is_complete(*dep)));

if matches!(job, IOop::Flush { .. }) {
// Flushes and barriers both guarantee that no future jobs will depend
// on jobs that preceded them, so we reset the completed jobs list to
// only include their job id.
if matches!(job, IOop::Flush { .. } | IOop::Barrier { .. }) {
leftwo marked this conversation as resolved.
Show resolved Hide resolved
work.completed.reset(ds_id);
} else {
work.completed.push(ds_id);
Expand Down
21 changes: 19 additions & 2 deletions protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,9 @@ pub struct SnapshotDetails {
#[repr(u32)]
#[derive(IntoPrimitive)]
pub enum MessageVersion {
/// Add `Barrier` and `BarrierAck`
V12 = 12,

/// Use `ReadBlockContext` instead of `Option<BlockContext>`
V11 = 11,

Expand Down Expand Up @@ -207,7 +210,7 @@ pub enum MessageVersion {
}
impl MessageVersion {
pub const fn current() -> Self {
Self::V11
Self::V12
}
}

Expand All @@ -216,7 +219,7 @@ impl MessageVersion {
* This, along with the MessageVersion enum above should be updated whenever
* changes are made to the Message enum below.
*/
pub const CRUCIBLE_MESSAGE_VERSION: u32 = 11;
pub const CRUCIBLE_MESSAGE_VERSION: u32 = MessageVersion::current() as u32;

/*
* If you add or change the Message enum, you must also increment the
Expand Down Expand Up @@ -506,6 +509,18 @@ pub enum Message {
job_id: JobId,
result: Result<(), CrucibleError>,
},
Barrier {
upstairs_id: Uuid,
session_id: Uuid,
job_id: JobId,
dependencies: Vec<JobId>,
},
BarrierAck {
upstairs_id: Uuid,
session_id: Uuid,
job_id: JobId,
result: Result<(), CrucibleError>,
},

ReadRequest {
upstairs_id: Uuid,
Expand Down Expand Up @@ -621,6 +636,7 @@ impl Message {
| Message::ExtentLiveReopen { .. }
| Message::ExtentLiveNoOp { .. }
| Message::Flush { .. }
| Message::Barrier { .. }
| Message::ReadRequest { .. }
| Message::WriteUnwritten { .. }
| Message::Unknown(..) => None,
Expand All @@ -634,6 +650,7 @@ impl Message {
| Message::ExtentLiveAckId { result, .. }
| Message::WriteAck { result, .. }
| Message::FlushAck { result, .. }
| Message::BarrierAck { result, .. }
| Message::WriteUnwrittenAck { result, .. } => {
result.as_ref().err()
}
Expand Down
1 change: 1 addition & 0 deletions upstairs/src/active_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ impl ActiveJobs {
// tracker have been recorded.
match &io.work {
IOop::Flush { .. }
| IOop::Barrier { .. }
| IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Read { .. }
Expand Down
18 changes: 16 additions & 2 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ impl DownstairsClient {
IOop::Write { dependencies, .. }
| IOop::WriteUnwritten { dependencies, .. }
| IOop::Flush { dependencies, .. }
| IOop::Barrier { dependencies, .. }
| IOop::Read { dependencies, .. }
| IOop::ExtentFlushClose { dependencies, .. }
| IOop::ExtentLiveRepair { dependencies, .. }
Expand Down Expand Up @@ -1300,7 +1301,8 @@ impl DownstairsClient {
// XXX: Errors should be reported to nexus
IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Flush { .. } => {
| IOop::Flush { .. }
| IOop::Barrier { .. } => {
self.stats.downstairs_errors += 1;
}

Expand Down Expand Up @@ -1401,7 +1403,9 @@ impl DownstairsClient {
* as those jobs should never be acked before all three
* are done.
*/
IOop::Write { .. } | IOop::WriteUnwritten { .. } => {}
IOop::Write { .. }
| IOop::WriteUnwritten { .. }
| IOop::Barrier { .. } => {}
IOop::ExtentFlushClose { .. }
| IOop::ExtentLiveRepair { .. }
| IOop::ExtentLiveReopen { .. }
Expand Down Expand Up @@ -1508,6 +1512,16 @@ impl DownstairsClient {
}
self.last_flush = ds_id;
}
IOop::Barrier { .. } => {
assert!(read_data.blocks.is_empty());
assert!(read_data.data.is_empty());
assert!(extent_info.is_none());

if jobs_completed_ok == 3 {
ackable = true;
cdt::up__to__ds__barrier__done!(|| ds_id.0);
}
}
IOop::ExtentFlushClose { .. } => {
assert!(read_data.blocks.is_empty());
assert!(read_data.data.is_empty());
Expand Down
47 changes: 47 additions & 0 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,10 @@ impl Downstairs {
cdt::gw__flush__done!(|| (ds_id.0));
stats.add_flush();
}
IOop::Barrier { .. } => {
cdt::gw__barrier__done!(|| (ds_id.0));
stats.add_barrier();
}
IOop::ExtentFlushClose { extent, .. } => {
cdt::gw__close__done!(|| (ds_id.0, extent.0));
stats.add_flush_close();
Expand Down Expand Up @@ -1916,6 +1920,20 @@ impl Downstairs {
next_id
}

pub(crate) fn submit_barrier(&mut self) -> JobId {
let next_id = self.next_id();
cdt::gw__barrier__start!(|| (next_id.0));

// A barrier has the same deps as a flush (namely, it depends on all
// previous jobs and acts as the only dependency for all subsequent
// jobs)
let dependencies = self.ds_active.deps_for_flush(next_id);
debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");

self.enqueue(next_id, IOop::Barrier { dependencies }, ClientMap::new());
next_id
}

/// Reserves repair IDs if impacted blocks overlap our extent under repair
fn check_repair_ids_for_range(&mut self, impacted_blocks: ImpactedBlocks) {
let Some(eur) = self.get_extent_under_repair() else {
Expand Down Expand Up @@ -2210,6 +2228,15 @@ impl Downstairs {
extent_limit,
}
}
IOop::Barrier { dependencies } => {
cdt::ds__barrier__client__start!(|| (ds_id.0, client_id.get()));
Message::Barrier {
upstairs_id: self.cfg.upstairs_id,
session_id: self.cfg.session_id,
job_id: ds_id,
dependencies,
}
}
IOop::Read {
dependencies,
start_eid,
Expand Down Expand Up @@ -2688,6 +2715,10 @@ impl Downstairs {
let job_type = "Flush".to_string();
(job_type, 0)
}
IOop::Barrier { .. } => {
let job_type = "Barrier".to_string();
(job_type, 0)
}
IOop::ExtentFlushClose { extent, .. } => {
let job_type = "FClose".to_string();
(job_type, extent.0 as usize)
Expand Down Expand Up @@ -2817,6 +2848,21 @@ impl Downstairs {
None,
)
}
Message::BarrierAck {
upstairs_id,
session_id,
job_id,
result,
} => {
cdt::ds__barrier__client__done!(|| (job_id.0, client_id.get()));
(
upstairs_id,
session_id,
job_id,
result.map(|_| Default::default()),
None,
)
}
Message::ReadResponse { header, data } => {
cdt::ds__read__client__done!(|| (
header.job_id.0,
Expand Down Expand Up @@ -3180,6 +3226,7 @@ impl Downstairs {
job.work,
IOop::Write { .. }
| IOop::Flush { .. }
| IOop::Barrier { .. }
| IOop::WriteUnwritten { .. }
| IOop::ExtentFlushClose { .. }
| IOop::ExtentLiveRepair { .. }
Expand Down
Loading