Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track both jobs and bytes in each IO state #1507

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 39 additions & 12 deletions upstairs/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,11 @@ pub(crate) struct DownstairsClient {
/// this handle should never be dropped before that point.
client_task: ClientTaskHandle,

/// IO state counters
pub(crate) io_state_count: ClientIOStateCount,
/// Number of jobs in each IO state
io_state_job_count: ClientIOStateCount,

/// Number of bytes associated with each IO state
io_state_byte_count: ClientIOStateCount<u64>,

/// Jobs, write bytes, and total IO bytes in this client's queue
///
Expand Down Expand Up @@ -229,7 +232,8 @@ impl DownstairsClient {
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::new(),
io_state_job_count: ClientIOStateCount::default(),
io_state_byte_count: ClientIOStateCount::default(),
backpressure_counters: BackpressureCounters::new(),
connection_id: ConnectionId(0),
client_delay_us,
Expand Down Expand Up @@ -268,7 +272,8 @@ impl DownstairsClient {
skipped_jobs: BTreeSet::new(),
region_metadata: None,
repair_info: None,
io_state_count: ClientIOStateCount::new(),
io_state_job_count: ClientIOStateCount::default(),
io_state_byte_count: ClientIOStateCount::default(),
backpressure_counters: BackpressureCounters::new(),
connection_id: ConnectionId(0),
client_delay_us,
Expand Down Expand Up @@ -339,17 +344,19 @@ impl DownstairsClient {
}
}

/// Sets a job state, handling `io_state_count` counters
/// Sets a job state, handling `io_state/byte_count` counters
fn set_job_state(
&mut self,
job: &mut DownstairsIO,
new_state: IOState,
) -> IOState {
let is_running = matches!(new_state, IOState::InProgress);
self.io_state_count.incr(&new_state);
self.io_state_job_count[&new_state] += 1;
self.io_state_byte_count[&new_state] += job.work.job_bytes();
let old_state = job.state.insert(self.client_id, new_state);
let was_running = matches!(old_state, IOState::InProgress);
self.io_state_count.decr(&old_state);
self.io_state_job_count[&old_state] -= 1;
self.io_state_byte_count[&old_state] -= job.work.job_bytes();

// Update our bytes-in-flight counter
if was_running && !is_running {
Expand All @@ -372,6 +379,24 @@ impl DownstairsClient {
old_state
}

/// Removes a retired job from this client's per-state counters
///
/// The job's final state on this client decides which job/byte counter
/// pair is decremented; the caller is responsible for actually removing
/// the job from the active map afterwards.
pub(crate) fn retire_job(&mut self, job: &DownstairsIO) {
    let final_state = &job.state[self.client_id];
    let byte_count = job.work.job_bytes();
    self.io_state_job_count[final_state] -= 1;
    self.io_state_byte_count[final_state] -= byte_count;
}

/// Returns the number of jobs in each IO state
///
/// `ClientIOStateCount` is `Copy`, so this hands back a snapshot of the
/// counters; mutating the returned value does not affect the client.
pub(crate) fn io_state_job_count(&self) -> ClientIOStateCount {
    self.io_state_job_count
}

/// Returns the number of bytes associated with each IO state
///
/// Returns a copied snapshot of the byte counters (keyed by IO state),
/// measured via `IOop::job_bytes` as jobs move between states.
#[allow(unused)] // XXX this will be used in the future!
pub(crate) fn io_state_byte_count(&self) -> ClientIOStateCount<u64> {
    self.io_state_byte_count
}

/// Returns a client-specialized copy of the job's `IOop`
///
/// Dependencies are pruned if we're in live-repair, and the `extent_limit`
Expand Down Expand Up @@ -879,7 +904,7 @@ impl DownstairsClient {
/// Returns `true` if it should be sent and `false` otherwise
///
/// If the job should be skipped, then it is added to `self.skipped_jobs`.
/// `self.io_state_count` is updated with the incoming job state.
/// `self.io_state_job_count` is updated with the incoming job state.
#[must_use]
pub(crate) fn enqueue(
&mut self,
Expand All @@ -901,12 +926,14 @@ impl DownstairsClient {
self.skipped_jobs.insert(ds_id);
}

// Update our backpressure guard if we're going to send this job
self.io_state_count.incr(if should_send {
// Update our state counters based on the job state
let state = if should_send {
&IOState::InProgress
} else {
&IOState::Skipped
});
};
self.io_state_job_count[&state] += 1;
self.io_state_byte_count[&state] += io.job_bytes();
should_send
}

Expand Down Expand Up @@ -2252,7 +2279,7 @@ impl DownstairsClient {
}

pub(crate) fn total_live_work(&self) -> usize {
(self.io_state_count.new + self.io_state_count.in_progress) as usize
self.io_state_job_count.in_progress as usize
}

pub(crate) fn total_bytes_outstanding(&self) -> usize {
Expand Down
5 changes: 2 additions & 3 deletions upstairs/src/downstairs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2671,8 +2671,7 @@ impl Downstairs {
let summary = job.io_summarize(id);
self.completed_jobs.push(summary);
for cid in ClientId::iter() {
let old_state = &job.state[cid];
self.clients[cid].io_state_count.decr(old_state);
self.clients[cid].retire_job(&job);
}
}
// Now that we've collected jobs to retire, remove them from the map
Expand Down Expand Up @@ -2803,7 +2802,7 @@ impl Downstairs {
}

pub fn io_state_count(&self) -> IOStateCount {
let d = self.collect_stats(|c| c.io_state_count);
let d = self.collect_stats(|c| c.io_state_job_count());
let f = |g: fn(ClientIOStateCount) -> u32| {
ClientData([g(d[0]), g(d[1]), g(d[2])])
};
Expand Down
58 changes: 19 additions & 39 deletions upstairs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -991,20 +991,7 @@ impl DownstairsIO {
* We don't consider repair IOs in the size calculation.
*/
pub fn io_size(&self) -> usize {
match &self.work {
IOop::Write { data, .. } | IOop::WriteUnwritten { data, .. } => {
data.len()
}
IOop::Read {
count, block_size, ..
} => (*count * *block_size) as usize,
IOop::Flush { .. }
| IOop::Barrier { .. }
| IOop::ExtentFlushClose { .. }
| IOop::ExtentLiveRepair { .. }
| IOop::ExtentLiveReopen { .. }
| IOop::ExtentLiveNoOp { .. } => 0,
}
self.work.job_bytes() as usize
}

/*
Expand Down Expand Up @@ -1423,36 +1410,29 @@ impl fmt::Display for IOState {
}
}

#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
pub struct ClientIOStateCount {
pub new: u32,
pub in_progress: u32,
pub done: u32,
pub skipped: u32,
pub error: u32,
#[derive(Debug, Default, Copy, Clone, Serialize, Deserialize)]
pub struct ClientIOStateCount<T = u32> {
pub in_progress: T,
pub done: T,
pub skipped: T,
pub error: T,
}

impl ClientIOStateCount {
fn new() -> ClientIOStateCount {
ClientIOStateCount {
new: 0,
in_progress: 0,
done: 0,
skipped: 0,
error: 0,
impl<T> std::ops::Index<&IOState> for ClientIOStateCount<T> {
type Output = T;
fn index(&self, index: &IOState) -> &Self::Output {
match index {
IOState::InProgress => &self.in_progress,
IOState::Done => &self.done,
IOState::Skipped => &self.skipped,
IOState::Error(_) => &self.error,
}
}
}

pub fn incr(&mut self, state: &IOState) {
*self.get_mut(state) += 1;
}

pub fn decr(&mut self, state: &IOState) {
*self.get_mut(state) -= 1;
}

fn get_mut(&mut self, state: &IOState) -> &mut u32 {
match state {
impl<T> std::ops::IndexMut<&IOState> for ClientIOStateCount<T> {
fn index_mut(&mut self, index: &IOState) -> &mut Self::Output {
match index {
IOState::InProgress => &mut self.in_progress,
IOState::Done => &mut self.done,
IOState::Skipped => &mut self.skipped,
Expand Down