pageserver: coalesce index uploads when possible #10248

Open · wants to merge 1 commit into base: erik/upload-reorder

pageserver/benches/upload_queue.rs (1 addition, 0 deletions)

@@ -67,6 +67,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
task_id,
retries: AtomicU32::new(0),
op: delete.clone(),
+ coalesced_ops: Vec::new(),
}),
);
}

pageserver/src/tenant/remote_timeline_client.rs (5 additions, 1 deletion)

@@ -1800,7 +1800,7 @@ impl RemoteTimelineClient {
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
- while let Some(mut next_op) = upload_queue.next_ready() {
+ while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");

// Prepare upload.
@@ -1838,6 +1838,7 @@
let task = Arc::new(UploadTask {
task_id: upload_task_id,
op: next_op,
+ coalesced_ops,
retries: AtomicU32::new(0),
});
upload_queue
@@ -2205,6 +2206,9 @@
}

self.metric_end(&task.op);
+ for coalesced_op in &task.coalesced_ops {
+     self.metric_end(coalesced_op);
+ }
}

fn metric_impl(
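Note on the metric_end loop above: operations that get coalesced into another task were still counted individually when they entered the queue, so each one must be ended exactly once when the carrying task completes. A minimal sketch of that invariant, using a hypothetical atomic gauge (the real metrics are richer, and metric_begin here is an assumed counterpart to the metric_end shown in the diff):

use std::sync::atomic::{AtomicI64, Ordering};

// Hypothetical gauge standing in for the upload queue metrics.
static OPS_IN_FLIGHT: AtomicI64 = AtomicI64::new(0);

fn metric_begin() {
    OPS_IN_FLIGHT.fetch_add(1, Ordering::Relaxed);
}

fn metric_end() {
    OPS_IN_FLIGHT.fetch_sub(1, Ordering::Relaxed);
}

fn main() {
    // Three index uploads enter the queue and are counted individually...
    for _ in 0..3 {
        metric_begin();
    }
    // ...but coalesce into a single task: one op plus two coalesced_ops.
    // On completion, metric_end must run once for each of the three, or the
    // gauge would be left at 2 forever.
    metric_end(); // task.op
    for _ in 0..2 {
        metric_end(); // task.coalesced_ops
    }
    assert_eq!(OPS_IN_FLIGHT.load(Ordering::Relaxed), 0);
}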

pageserver/src/tenant/upload_queue.rs (56 additions, 25 deletions)

@@ -22,6 +22,11 @@ use tracing::info;
static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));

+ /// Kill switch for index upload coalescing in case it causes problems.
+ /// TODO: remove this once we have confidence in it.
+ static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
+     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));

// clippy warns that Uninitialized is much smaller than Initialized, which wastes
// memory for Uninitialized variants. Doesn't matter in practice, there are not
// that many upload queues in a running pageserver, and most of them are initialized
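Aside on the kill switch: like DISABLE_UPLOAD_QUEUE_REORDERING above it, the flag is a Lazy static, so the environment variable is read once and cached. A standalone sketch of the pattern, assuming once_cell::sync::Lazy (consistent with the Lazy::new call, though the import isn't shown in this hunk):

use once_cell::sync::Lazy;

// Read once, on first dereference, then cached for the process lifetime.
static DISABLE_COALESCING: Lazy<bool> =
    Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));

fn main() {
    // The variable must be set before startup to take effect, e.g.:
    //   DISABLE_UPLOAD_QUEUE_INDEX_COALESCING=true pageserver ...
    println!("index coalescing disabled: {}", *DISABLE_COALESCING);
}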
@@ -127,8 +132,10 @@ impl UploadQueueInitialized {
/// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
/// the queue if it doesn't conflict with operations ahead of it.
///
+ /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
+ ///
/// None may be returned even if the queue isn't empty, if no operations are ready yet.
- pub fn next_ready(&mut self) -> Option<UploadOp> {
+ pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
// NB: this is quadratic, but queues are expected to be small.
for (i, candidate) in self.queued_operations.iter().enumerate() {
// If this candidate is ready, go for it. Otherwise, try the next one.
@@ -142,7 +149,31 @@
return None;
}

- return self.queued_operations.remove(i);
+ let mut op = self.queued_operations.remove(i).expect("i can't disappear");
+
+ // Coalesce any back-to-back index uploads by only uploading the newest one that's
+ // ready. This typically happens with layer/index/layer/index/... sequences, where
+ // the layers bypass the indexes, leaving the indexes queued.
+ //
+ // If other operations are interleaved between index uploads we don't try to
+ // coalesce them, since we may as well update the index concurrently with them.
+ // This keeps the index fresh and avoids starvation.
+ let mut replaced_ops = Vec::new();
+ if matches!(op, UploadOp::UploadMetadata { .. }) {
+     while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i) {
+         if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
+             break;
+         }
+         if !self.is_ready(i) {
+             break;
+         }
+         replaced_ops.push(op);
+         op = self.queued_operations.remove(i).expect("i can't disappear");
+     }
+ }
+
+ return Some((op, replaced_ops));
}

// Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
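To see the coalescing rule in isolation, here is a minimal self-contained sketch. It is not the pageserver's code: Op is a simplified stand-in for UploadOp, a Vec stands in for the real queue, and every queued op is assumed ready (the real next_ready also consults is_ready and the kill switch):

// Simplified stand-ins for layer uploads and index (metadata) uploads.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Layer(u32),
    Index(u32),
}

// Pops the next op. If it is an index upload, immediately following index
// uploads coalesce into it: only the newest is kept for upload, and the older
// ones are returned separately for completion bookkeeping.
fn next_ready(queue: &mut Vec<Op>) -> Option<(Op, Vec<Op>)> {
    if queue.is_empty() {
        return None;
    }
    let mut op = queue.remove(0);
    let mut coalesced = Vec::new();
    if matches!(op, Op::Index(_)) {
        while matches!(queue.first(), Some(Op::Index(_))) {
            coalesced.push(op);
            op = queue.remove(0);
        }
    }
    Some((op, coalesced))
}

fn main() {
    // Typical shape after layers bypass their indexes: indexes pile up in front.
    let mut queue = vec![Op::Index(1), Op::Index(2), Op::Index(3), Op::Layer(4)];

    let (op, coalesced) = next_ready(&mut queue).unwrap();
    assert_eq!(op, Op::Index(3)); // only the newest index is actually uploaded
    assert_eq!(coalesced, vec![Op::Index(1), Op::Index(2)]);
    assert_eq!(queue, vec![Op::Layer(4)]);
}

Skipping the older indexes is safe because a later index upload supersedes the earlier ones entirely, which is exactly why the diff only uploads the newest ready one.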
@@ -216,11 +247,12 @@
fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
let mut tasks = Vec::new();
// NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
- while let Some(op) = self.next_ready() {
+ while let Some((op, coalesced_ops)) = self.next_ready() {
self.task_counter += 1;
let task = Arc::new(UploadTask {
task_id: self.task_counter,
op,
+ coalesced_ops,
retries: 0.into(),
});
self.inprogress_tasks.insert(task.task_id, task.clone());
@@ -394,9 +426,13 @@
pub struct UploadTask {
/// Unique ID of this task. Used as the key in `inprogress_tasks` above.
pub task_id: u64,
/// Number of task retries.
pub retries: AtomicU32,

/// The upload operation.
pub op: UploadOp,
+ /// Any upload operations that were coalesced into this operation. This typically happens with
+ /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
+ pub coalesced_ops: Vec<UploadOp>,
}

/// A deletion of some layers within the lifetime of a timeline. This is not used
@@ -499,9 +535,8 @@ impl UploadOp {
})
}

- // Indexes can never bypass each other.
- // TODO: we could coalesce them though, by only uploading the newest ready index. This
- // is left for later, out of caution.
+ // Indexes can never bypass each other. They can coalesce though, and
+ // `UploadQueue::next_ready()` currently does this when possible.
(UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
}
}
@@ -884,9 +919,9 @@ mod tests {
Ok(())
}

- /// Index uploads are serialized.
+ /// Index uploads are coalesced.
#[test]
- fn schedule_index_serial() -> anyhow::Result<()> {
+ fn schedule_index_coalesce() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;

@@ -907,13 +942,11 @@

queue.queued_operations.extend(ops.clone());

- // The uploads should run serially.
- for op in ops {
-     let tasks = queue.schedule_ready();
-     assert_eq!(tasks.len(), 1);
-     assert_same_op(&tasks[0].op, &op);
-     queue.complete(tasks[0].task_id);
- }
+ // The index uploads are coalesced into a single operation.
+ let tasks = queue.schedule_ready();
+ assert_eq!(tasks.len(), 1);
+ assert_same_op(&tasks[0].op, &ops[2]);
+ assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);

assert!(queue.queued_operations.is_empty());

@@ -972,18 +1005,14 @@
assert_same_op(&index_tasks[0].op, &ops[1]);
queue.complete(index_tasks[0].task_id);

- // layer 1 completes. This unblocks index 1 then index 2.
+ // layer 1 completes. This unblocks index 1 and 2, which coalesce into
+ // a single upload for index 2.
queue.complete(upload_tasks[1].task_id);

let index_tasks = queue.schedule_ready();
assert_eq!(index_tasks.len(), 1);
- assert_same_op(&index_tasks[0].op, &ops[3]);
- queue.complete(index_tasks[0].task_id);
-
- let index_tasks = queue.schedule_ready();
- assert_eq!(index_tasks.len(), 1);
assert_same_op(&index_tasks[0].op, &ops[5]);
queue.complete(index_tasks[0].task_id);
+ assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);

assert!(queue.queued_operations.is_empty());

Expand All @@ -1005,11 +1034,12 @@ mod tests {
let index_deref = index_without(&index_upload, &layer);

let ops = [
- // Initial upload.
+ // Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
+ UploadOp::Barrier(tokio::sync::watch::channel(()).0),
// Dereference the layer and delete it.
UploadOp::UploadMetadata {
uploaded: index_deref.clone(),
@@ -1050,11 +1080,12 @@
let index_ref = index_with(&index_deref, &layer);

let ops = [
- // Initial upload.
+ // Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
+ UploadOp::Barrier(tokio::sync::watch::channel(()).0),
// Dereference the layer.
UploadOp::UploadMetadata {
uploaded: index_deref.clone(),
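The Barrier added to the two tests above pins ordering: nothing bypasses a barrier, so index uploads on opposite sides of it cannot coalesce. Extending the earlier sketch with a barrier variant shows the effect (again simplified stand-ins, not the real UploadOp::Barrier):

#[derive(Debug, Clone, PartialEq)]
enum Op {
    Index(u32),
    Barrier,
}

// As in the earlier sketch, but coalescing stops at anything that is not an
// index upload -- in particular at barriers.
fn next_ready(queue: &mut Vec<Op>) -> Option<(Op, Vec<Op>)> {
    if queue.is_empty() {
        return None;
    }
    let mut op = queue.remove(0);
    let mut coalesced = Vec::new();
    if matches!(op, Op::Index(_)) {
        while matches!(queue.first(), Some(Op::Index(_))) {
            coalesced.push(op);
            op = queue.remove(0);
        }
    }
    Some((op, coalesced))
}

fn main() {
    let mut queue = vec![Op::Index(1), Op::Barrier, Op::Index(2)];

    // The barrier keeps index 1 and index 2 as separate uploads.
    assert_eq!(next_ready(&mut queue), Some((Op::Index(1), vec![])));
    assert_eq!(next_ready(&mut queue), Some((Op::Barrier, vec![])));
    assert_eq!(next_ready(&mut queue), Some((Op::Index(2), vec![])));
}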