Skip to content

Commit

Permalink
Only send barriers when needed
Browse files Browse the repository at this point in the history
  • Loading branch information
mkeeter committed Oct 15, 2024
1 parent 3a7e2f8 commit d3e9973
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 42 deletions.
59 changes: 58 additions & 1 deletion upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ pub(crate) struct Downstairs {
/// buffered (i.e. none have been retired by a `Barrier` operation).
can_replay: bool,

/// How many `Flush` or `Barrier` operations are pending?
///
/// We only want to send a `Barrier` if there isn't already one pending, so
/// we track it here (incrementing in `submit_flush` / `submit_barrier` and
/// decrementing in `retire_check`).
pending_barrier: usize,

/// Ringbuf of completed downstairs job IDs.
completed: AllocRingBuffer<JobId>,

Expand Down Expand Up @@ -286,6 +293,7 @@ impl Downstairs {
cfg,
next_flush: 0,
can_replay: true,
pending_barrier: 0,
ds_active: ActiveJobs::new(),
completed: AllocRingBuffer::new(2048),
completed_jobs: AllocRingBuffer::new(8),
Expand Down Expand Up @@ -1935,6 +1943,7 @@ impl Downstairs {
extent_limit: extent_under_repair,
};

self.pending_barrier += 1;
self.enqueue(
next_id,
flush,
Expand All @@ -1944,6 +1953,47 @@ impl Downstairs {
next_id
}

/// Checks to see whether a `Barrier` operation is needed
///
/// A `Barrier` is needed if we have buffered more than
/// `IO_CACHED_MAX_BYTES/JOBS` worth of complete jobs, and there are no
/// other barrier (or flush) operations in flight
pub(crate) fn needs_barrier(&self) -> bool {
    // A flush or barrier that is already in flight will retire the cached
    // jobs once it completes, so there's no reason to queue another one.
    if self.pending_barrier > 0 {
        return false;
    }

    // n.b. This may not be 100% reliable: if different Downstairs have
    // finished a different subset of jobs, then it's theoretically possible
    // for each DownstairsClient to be under our limits, but for the true
    // number of cached bytes/jobs to be over the limits.
    //
    // It's hard to imagine how we could encounter such a situation, given
    // job dependencies and no out-of-order execution, so this is more of a
    // "fun fact" and less an actual concern.
    //
    // Take the worst case over all clients for both jobs and bytes,
    // counting everything that has reached a terminal per-client state
    // (skipped, done, or errored) but has not yet been retired.
    let mut cached_jobs = 0;
    let mut cached_bytes = 0;
    for c in self.clients.iter() {
        let jobs = c.io_state_job_count();
        let bytes = c.io_state_byte_count();
        cached_jobs = cached_jobs.max(jobs.skipped + jobs.done + jobs.error);
        cached_bytes =
            cached_bytes.max(bytes.skipped + bytes.done + bytes.error);
    }

    cached_jobs as u64 > crate::IO_CACHED_MAX_JOBS
        || cached_bytes > crate::IO_CACHED_MAX_BYTES
}

pub(crate) fn submit_barrier(&mut self) -> JobId {
let next_id = self.next_id();
cdt::gw__barrier__start!(|| (next_id.0));
Expand All @@ -1954,6 +2004,7 @@ impl Downstairs {
let dependencies = self.ds_active.deps_for_flush(next_id);
debug!(self.log, "IO Barrier {next_id} has deps {dependencies:?}");

self.pending_barrier += 1;
self.enqueue(
next_id,
IOop::Barrier { dependencies },
Expand Down Expand Up @@ -2694,13 +2745,19 @@ impl Downstairs {
let summary = job.io_summarize(id);
self.completed_jobs.push(summary);
for cid in ClientId::iter() {
self.clients[cid].retire_job(&job);
self.clients[cid].retire_job(job);
}
}
// Now that we've collected jobs to retire, remove them from the map
for &id in &retired {
let job = self.ds_active.remove(&id);

// Update our barrier count for the removed job
if matches!(job.work, IOop::Flush { .. } | IOop::Barrier { .. })
{
self.pending_barrier.checked_sub(1).unwrap();
}

// Jobs should have their backpressure contribution removed when
// they are completed (in `process_io_completion_inner`),
// **not** when they are retired. We'll do a sanity check here
Expand Down
15 changes: 13 additions & 2 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,19 @@ const IO_OUTSTANDING_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB
///
/// If we exceed this value, the upstairs will give up and mark that offline
/// downstairs as faulted.
pub const IO_OUTSTANDING_MAX_JOBS: usize = 10000;
const IO_OUTSTANDING_MAX_JOBS: usize = 10000;

/// Maximum number of bytes to cache from complete (but un-flushed) IO
///
/// Caching complete jobs allows us to replay them if a Downstairs goes
/// offline and then comes back.
const IO_CACHED_MAX_BYTES: u64 = 1024 * 1024 * 1024; // 1 GiB

/// Maximum number of jobs to cache from complete (but un-flushed) IO
///
/// Caching complete jobs allows us to replay them if a Downstairs goes
/// offline and then comes back.
const IO_CACHED_MAX_JOBS: u64 = 10000;

/// The BlockIO trait behaves like a physical NVMe disk (or a virtio virtual
/// disk): there is no contract about what order operations that are submitted
Expand Down Expand Up @@ -306,7 +318,6 @@ mod cdt {
fn up__action_deferred_message(_: u64) {}
fn up__action_leak_check(_: u64) {}
fn up__action_flush_check(_: u64) {}
fn up__action_barrier_check(_: u64) {}
fn up__action_stat_check(_: u64) {}
fn up__action_repair_check(_: u64) {}
fn up__action_control_check(_: u64) {}
Expand Down
45 changes: 6 additions & 39 deletions upstairs/src/upstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ pub struct UpCounters {
action_deferred_message: u64,
action_leak_check: u64,
action_flush_check: u64,
action_barrier_check: u64,
action_stat_check: u64,
action_repair_check: u64,
action_control_check: u64,
Expand All @@ -103,7 +102,6 @@ impl UpCounters {
action_deferred_message: 0,
action_leak_check: 0,
action_flush_check: 0,
action_barrier_check: 0,
action_stat_check: 0,
action_repair_check: 0,
action_control_check: 0,
Expand Down Expand Up @@ -212,14 +210,6 @@ pub(crate) struct Upstairs {
/// command we put on the work queue was not a flush.
need_flush: bool,

/// Marks whether a barrier is needed
///
/// The Upstairs keeps all IOs in memory until a flush or barrier is ACK'd
/// back from all three downstairs. This flag indicates that we have sent
/// IOs and have not sent a flush or barrier; we should send a flush or
/// barrier periodically to keep the dependency list down.
need_barrier: bool,

/// Statistics for this upstairs
///
/// Shared with the metrics producer, so this `struct` wraps a
Expand All @@ -240,9 +230,6 @@ pub(crate) struct Upstairs {
/// Next time to leak IOP / bandwidth tokens from the Guest
leak_deadline: Instant,

/// Next time to trigger a dependency barrier
barrier_deadline: Instant,

/// Next time to trigger an automatic flush
flush_deadline: Instant,

Expand Down Expand Up @@ -284,7 +271,6 @@ pub(crate) enum UpstairsAction {

LeakCheck,
FlushCheck,
BarrierCheck,
StatUpdate,
RepairCheck,
Control(ControlRequest),
Expand Down Expand Up @@ -417,15 +403,13 @@ impl Upstairs {
cfg,
repair_check_interval: None,
leak_deadline: deadline_secs(1.0),
barrier_deadline: deadline_secs(flush_timeout_secs),
flush_deadline: deadline_secs(flush_timeout_secs),
stat_deadline: deadline_secs(STAT_INTERVAL_SECS),
flush_timeout_secs,
guest,
guest_dropped: false,
ddef: rd_status,
need_flush: false,
need_barrier: false,
stats,
counters,
log,
Expand Down Expand Up @@ -532,9 +516,6 @@ impl Upstairs {
_ = sleep_until(self.leak_deadline) => {
UpstairsAction::LeakCheck
}
_ = sleep_until(self.barrier_deadline) => {
UpstairsAction::BarrierCheck
}
_ = sleep_until(self.flush_deadline) => {
UpstairsAction::FlushCheck
}
Expand Down Expand Up @@ -602,22 +583,6 @@ impl Upstairs {
self.submit_flush(None, None);
}
self.flush_deadline = deadline_secs(self.flush_timeout_secs);
self.barrier_deadline = deadline_secs(self.flush_timeout_secs);
}
UpstairsAction::BarrierCheck => {
self.counters.action_barrier_check += 1;
cdt::up__action_barrier_check!(|| (self
.counters
.action_barrier_check));
// Upgrade from a Barrier to a Flush if eligible
if self.need_flush && !self.downstairs.has_live_jobs() {
self.submit_flush(None, None);
self.flush_deadline =
deadline_secs(self.flush_timeout_secs);
} else if self.need_barrier {
self.submit_barrier();
}
self.barrier_deadline = deadline_secs(self.flush_timeout_secs);
}
UpstairsAction::StatUpdate => {
self.counters.action_stat_check += 1;
Expand Down Expand Up @@ -657,6 +622,12 @@ impl Upstairs {
self.flush_deadline = deadline_secs(self.flush_timeout_secs);
}

// Check whether we need to send a Barrier operation to clean out
// complete-but-unflushed jobs.
if self.downstairs.needs_barrier() {
self.submit_barrier()
}

// Check to see whether live-repair can continue
//
// This must be called before acking jobs, because it looks in
Expand Down Expand Up @@ -1309,7 +1280,6 @@ impl Upstairs {
// BlockOp::Flush level above.

self.need_flush = false;
self.need_barrier = false; // flushes also serve as a barrier

/*
* Get the next ID for our new guest work job. Note that the flush
Expand All @@ -1331,7 +1301,6 @@ impl Upstairs {
// guest_io_ready here. The upstairs itself calls submit_barrier
// without the guest being involved; indeed the guest is not allowed to
// call it!
self.need_barrier = false;
let ds_id = self.downstairs.submit_barrier();
self.guest.guest_work.submit_job(ds_id, false);

Expand Down Expand Up @@ -1378,7 +1347,6 @@ impl Upstairs {
}

self.need_flush = true;
self.need_barrier = true;

/*
* Given the offset and buffer size, figure out what extent and
Expand Down Expand Up @@ -1510,7 +1478,6 @@ impl Upstairs {
* handles the operation(s) on the storage side.
*/
self.need_flush = true;
self.need_barrier = true;

/*
* Grab this ID after extent_from_offset: in case of Err we don't
Expand Down

0 comments on commit d3e9973

Please sign in to comment.