Skip to content

Commit

Permalink
pageserver: coalesce index uploads when possible
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 2, 2025
1 parent dad6139 commit 76698d5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 26 deletions.
6 changes: 5 additions & 1 deletion pageserver/src/tenant/remote_timeline_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1807,7 +1807,7 @@ impl RemoteTimelineClient {
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does,
/// to avoid tenants starving other tenants.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some(mut next_op) = upload_queue.next_ready() {
while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
debug!("starting op: {next_op}");

// Prepare upload.
Expand Down Expand Up @@ -1845,6 +1845,7 @@ impl RemoteTimelineClient {
let task = Arc::new(UploadTask {
task_id: upload_task_id,
op: next_op,
coalesced_ops,
retries: AtomicU32::new(0),
});
upload_queue
Expand Down Expand Up @@ -2210,6 +2211,9 @@ impl RemoteTimelineClient {
}

self.metric_end(&task.op);
for coalesced_op in &task.coalesced_ops {
self.metric_end(coalesced_op);
}
}

fn metric_impl(
Expand Down
81 changes: 56 additions & 25 deletions pageserver/src/tenant/upload_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ use utils::generation::Generation;
static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));

/// Kill switch for index upload coalescing in case it causes problems.
/// TODO: remove this once we have confidence in it.
static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));

// clippy warns that Uninitialized is much smaller than Initialized, which wastes
// memory for Uninitialized variants. Doesn't matter in practice, there are not
// that many upload queues in a running pageserver, and most of them are initialized
Expand Down Expand Up @@ -130,8 +135,10 @@ impl UploadQueueInitialized {
/// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
/// the queue if it doesn't conflict with operations ahead of it.
///
/// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
///
/// None may be returned even if the queue isn't empty, if no operations are ready yet.
pub(crate) fn next_ready(&mut self) -> Option<UploadOp> {
pub(crate) fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
// NB: this is quadratic, but queues are expected to be small.
for (i, candidate) in self.queued_operations.iter().enumerate() {
// If this candidate is ready, go for it. Otherwise, try the next one.
Expand All @@ -145,7 +152,31 @@ impl UploadQueueInitialized {
return None;
}

return self.queued_operations.remove(i);
let mut op = self.queued_operations.remove(i).expect("i can't disappear");

// Coalesce any back-to-back index uploads by only uploading the newest one that's
// ready. This typically happens with layer/index/layer/index/... sequences, where
// the layers bypass the indexes, leaving the indexes queued.
//
// If other operations are interleaved between index uploads we don't try to
// coalesce them, since we may as well update the index concurrently with them.
// This keeps the index fresh and avoids starvation.
let mut replaced_ops = Vec::new();
if matches!(op, UploadOp::UploadMetadata { .. }) {
while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
{
if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
break;
}
if !self.is_ready(i) {
break;
}
replaced_ops.push(op);
op = self.queued_operations.remove(i).expect("i can't disappear");
}
}

return Some((op, replaced_ops));
}

// Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
Expand Down Expand Up @@ -219,11 +250,12 @@ impl UploadQueueInitialized {
fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
let mut tasks = Vec::new();
// NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
while let Some(op) = self.next_ready() {
while let Some((op, coalesced_ops)) = self.next_ready() {
self.task_counter += 1;
let task = Arc::new(UploadTask {
task_id: self.task_counter,
op,
coalesced_ops,
retries: 0.into(),
});
self.inprogress_tasks.insert(task.task_id, task.clone());
Expand Down Expand Up @@ -399,9 +431,13 @@ impl UploadQueue {
/// An upload operation that has been started (in-flight or retrying), together
/// with bookkeeping for retries and any operations it subsumed.
pub(crate) struct UploadTask {
    /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
    pub(crate) task_id: u64,
    /// Number of task retries. Atomic because it is bumped from the async
    /// upload task while other code holds only a shared `Arc<UploadTask>`.
    pub(crate) retries: AtomicU32,

    /// The upload operation to perform.
    pub(crate) op: UploadOp,
    /// Any upload operations that were coalesced into this operation, i.e. dropped
    /// from the queue because `op` supersedes them. This typically happens with
    /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
    /// Kept so their metrics can be finalized when this task completes.
    pub(crate) coalesced_ops: Vec<UploadOp>,
}

/// A deletion of some layers within the lifetime of a timeline. This is not used
Expand Down Expand Up @@ -504,9 +540,8 @@ impl UploadOp {
})
}

// Indexes can never bypass each other.
// TODO: we could coalesce them though, by only uploading the newest ready index. This
// is left for later, out of caution.
// Indexes can never bypass each other. They can coalesce though, by dropping the
// earlier index upload -- next_ready() currently does this when possible.
(UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
}
}
Expand Down Expand Up @@ -889,9 +924,9 @@ mod tests {
Ok(())
}

/// Index uploads are serialized.
/// Index uploads are coalesced.
#[test]
fn schedule_index_serial() -> anyhow::Result<()> {
fn schedule_index_coalesce() -> anyhow::Result<()> {
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;

Expand All @@ -912,13 +947,11 @@ mod tests {

queue.queued_operations.extend(ops.clone());

// The uploads should run serially.
for op in ops {
let tasks = queue.schedule_ready();
assert_eq!(tasks.len(), 1);
assert_same_op(&tasks[0].op, &op);
queue.complete(tasks[0].task_id);
}
// The index uploads are coalesced into a single operation.
let tasks = queue.schedule_ready();
assert_eq!(tasks.len(), 1);
assert_same_op(&tasks[0].op, &ops[2]);
assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);

assert!(queue.queued_operations.is_empty());

Expand Down Expand Up @@ -977,18 +1010,14 @@ mod tests {
assert_same_op(&index_tasks[0].op, &ops[1]);
queue.complete(index_tasks[0].task_id);

// layer 1 completes. This unblocks index 1 then index 2.
// layer 1 completes. This unblocks index 1 and 2, which coalesce into
// a single upload for index 2.
queue.complete(upload_tasks[1].task_id);

let index_tasks = queue.schedule_ready();
assert_eq!(index_tasks.len(), 1);
assert_same_op(&index_tasks[0].op, &ops[3]);
queue.complete(index_tasks[0].task_id);

let index_tasks = queue.schedule_ready();
assert_eq!(index_tasks.len(), 1);
assert_same_op(&index_tasks[0].op, &ops[5]);
queue.complete(index_tasks[0].task_id);
assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);

assert!(queue.queued_operations.is_empty());

Expand All @@ -1010,11 +1039,12 @@ mod tests {
let index_deref = index_without(&index_upload, &layer);

let ops = [
// Initial upload.
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
UploadOp::Barrier(tokio::sync::watch::channel(()).0),
// Dereference the layer and delete it.
UploadOp::UploadMetadata {
uploaded: index_deref.clone(),
Expand Down Expand Up @@ -1055,11 +1085,12 @@ mod tests {
let index_ref = index_with(&index_deref, &layer);

let ops = [
// Initial upload.
// Initial upload, with a barrier to prevent index coalescing.
UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
UploadOp::UploadMetadata {
uploaded: index_upload.clone(),
},
UploadOp::Barrier(tokio::sync::watch::channel(()).0),
// Dereference the layer.
UploadOp::UploadMetadata {
uploaded: index_deref.clone(),
Expand Down

0 comments on commit 76698d5

Please sign in to comment.