pageserver: reorder upload queue when possible
erikgrinaker committed Dec 20, 2024
1 parent 9c53b41 commit 11ee986
Showing 1 changed file with 73 additions and 41 deletions.
114 changes: 73 additions & 41 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -188,6 +188,7 @@ use camino::Utf8Path;
 use chrono::{NaiveDateTime, Utc};
 
 pub(crate) use download::download_initdb_tar_zst;
+use itertools::Itertools as _;
 use pageserver_api::models::TimelineArchivalState;
 use pageserver_api::shard::{ShardIndex, TenantShardId};
 use regex::Regex;
@@ -1797,55 +1798,86 @@ impl RemoteTimelineClient {
         Ok(())
     }
 
-    /// Pick next tasks from the queue, and start as many of them as possible without violating
-    /// the ordering constraints.
-    ///
-    /// The caller needs to already hold the `upload_queue` lock.
-    fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
-        while let Some(next_op) = upload_queue.queued_operations.front() {
-            // Can we run this task now?
-            let can_run_now = match next_op {
-                UploadOp::UploadLayer(..) => {
-                    // Can always be scheduled.
-                    true
-                }
-                UploadOp::UploadMetadata { .. } => {
-                    // These can only be performed after all the preceding operations
-                    // have finished.
-                    upload_queue.inprogress_tasks.is_empty()
-                }
-                UploadOp::Delete(..) => {
-                    // Wait for preceding uploads to finish. Concurrent deletions are OK, though.
-                    upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
-                }
-
-                UploadOp::Barrier(_) | UploadOp::Shutdown => {
-                    upload_queue.inprogress_tasks.is_empty()
-                }
-            };
-
-            // If we cannot launch this task, don't look any further.
-            //
-            // In some cases, we could let some non-frontmost tasks to "jump the queue" and launch
-            // them now, but we don't try to do that currently. For example, if the frontmost task
-            // is an index-file upload that cannot proceed until preceding uploads have finished, we
-            // could still start layer uploads that were scheduled later.
-            if !can_run_now {
-                break;
-            }
-
-            if let UploadOp::Shutdown = next_op {
-                // leave the op in the queue but do not start more tasks; it will be dropped when
-                // the stop is called.
-                upload_queue.shutdown_ready.close();
-                break;
-            }
-
-            // We can launch this task. Remove it from the queue first.
-            let mut next_op = upload_queue.queued_operations.pop_front().unwrap();
-
-            debug!("starting op: {}", next_op);
+    /// Returns true if a can bypass b, i.e. if the operations don't conflict.
+    ///
+    /// TODO: consider moving this and other associated logic into UploadOp and UploadQueue.
+    fn can_bypass(a: &UploadOp, b: &UploadOp) -> bool {
+        match (a, b) {
+            // Nothing can bypass a barrier or shutdown, and it can't bypass anything.
+            (UploadOp::Barrier(_), _) | (_, UploadOp::Barrier(_)) => false,
+            (UploadOp::Shutdown, _) | (_, UploadOp::Shutdown) => false,
+
+            // Uploads and deletes can bypass each other unless they're for the same file.
+            // TODO: index and memoize filenames.
+            (UploadOp::UploadLayer(a, _, _), UploadOp::UploadLayer(b, _, _)) => {
+                a.layer_desc().layer_name() != b.layer_desc().layer_name()
+            }
+            (UploadOp::UploadLayer(u, _, _), UploadOp::Delete(d))
+            | (UploadOp::Delete(d), UploadOp::UploadLayer(u, _, _)) => !d
+                .layers
+                .iter()
+                .map(|(name, _)| name)
+                .contains(&u.layer_desc().layer_name()),
+
+            // Deletes are idempotent and can always bypass each other.
+            // TODO: verify this.
+            (UploadOp::Delete(_), UploadOp::Delete(_)) => true,
+
+            // Uploads and indexes can bypass each other unless they overlap.
+            (UploadOp::UploadLayer(u, _, _), UploadOp::UploadMetadata { uploaded: i })
+            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::UploadLayer(u, _, _)) => {
+                !i.layer_metadata.contains_key(&u.layer_desc().layer_name())
+            }
+
+            // Deletes and indexes can bypass each other unless they overlap.
+            (UploadOp::Delete(d), UploadOp::UploadMetadata { uploaded: i })
+            | (UploadOp::UploadMetadata { uploaded: i }, UploadOp::Delete(d)) => !d
+                .layers
+                .iter()
+                .any(|(name, _)| i.layer_metadata.contains_key(name)),
+
+            // Indexes can never bypass each other.
+            // TODO: they can coalesce though, consider this.
+            (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
+        }
+    }
+
+    /// Returns and removes the next operation from the queue, if any. This isn't necessarily the
+    /// first task in the queue, to avoid head-of-line blocking.
+    ///
+    /// TODO: consider limiting the number of in-progress tasks.
+    fn next_queued_task(self: &Arc<Self>, queue: &mut UploadQueueInitialized) -> Option<UploadOp> {
+        // For each queued operation, check if it can bypass the in-progress and queued operations
+        // ahead of it (if any).
+        for (i, op) in queue.queued_operations.iter().enumerate() {
+            let in_progress = queue.inprogress_tasks.values().map(|task| &task.op);
+            let ahead = queue.queued_operations.iter().take(i);
+            if ahead.chain(in_progress).all(|q| Self::can_bypass(op, q)) {
+                return queue.queued_operations.remove(i);
+            }
+
+            // Nothing can cross a barrier, so give up if we didn't already return it above.
+            if matches!(op, UploadOp::Barrier(_) | UploadOp::Shutdown) {
+                return None;
+            }
+        }
+        None
+    }
+
+    /// Pick next tasks from the queue, and start as many of them as possible without violating
+    /// the ordering constraints.
+    ///
+    /// The caller needs to already hold the `upload_queue` lock.
+    fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
+        // Check for a shutdown. Leave it in the queue, but don't start more tasks. It will be
+        // dropped on stop.
+        if let Some(UploadOp::Shutdown) = upload_queue.queued_operations.front() {
+            upload_queue.shutdown_ready.close();
+            return;
+        }
+
+        while let Some(mut next_op) = self.next_queued_task(upload_queue) {
+            debug!("starting op: {next_op}");
 
             // Update the counters and prepare
             match &mut next_op {
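For readers skimming the diff, the reordering rule can be illustrated with a small standalone model. The sketch below is not the pageserver's real code: `Op`, `can_bypass`, and `next_task` are hypothetical stand-ins (plain `String` layer names instead of `ResidentLayer` and index metadata, and in-progress work modeled as a slice), but the conflict rules and the head-of-line-blocking avoidance mirror the logic added in this commit.

// Minimal sketch of the reordering rule (hypothetical stand-in types, not the pageserver API).
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum Op {
    UploadLayer(String),      // upload of a single layer file
    UploadIndex(Vec<String>), // index upload referencing a set of layer files
    Delete(Vec<String>),      // deletion of a set of layer files
    Barrier,                  // nothing may be reordered across this
}

/// Returns true if `a` may start before `b` has finished, i.e. the two operations don't conflict.
fn can_bypass(a: &Op, b: &Op) -> bool {
    use Op::*;
    match (a, b) {
        // Nothing can bypass a barrier, and a barrier can't bypass anything.
        (Barrier, _) | (_, Barrier) => false,
        // Two layer uploads conflict only if they target the same file.
        (UploadLayer(a), UploadLayer(b)) => a != b,
        // A layer upload and a deletion conflict if the deletion covers that layer.
        (UploadLayer(l), Delete(d)) | (Delete(d), UploadLayer(l)) => !d.contains(l),
        // Deletions are idempotent and never conflict with each other.
        (Delete(_), Delete(_)) => true,
        // A layer upload and an index upload conflict if the index references that layer.
        (UploadLayer(l), UploadIndex(i)) | (UploadIndex(i), UploadLayer(l)) => !i.contains(l),
        // A deletion and an index upload conflict if they share any layer.
        (Delete(d), UploadIndex(i)) | (UploadIndex(i), Delete(d)) => {
            !d.iter().any(|l| i.contains(l))
        }
        // Index uploads must stay ordered relative to each other.
        (UploadIndex(_), UploadIndex(_)) => false,
    }
}

/// Pops the first queued op that can bypass everything queued ahead of it and everything
/// currently in progress, avoiding head-of-line blocking.
fn next_task(queue: &mut VecDeque<Op>, in_progress: &[Op]) -> Option<Op> {
    for i in 0..queue.len() {
        let op = &queue[i];
        let ahead = queue.iter().take(i);
        if ahead.chain(in_progress.iter()).all(|other| can_bypass(op, other)) {
            return queue.remove(i);
        }
        // Nothing can cross a barrier, so stop scanning once we fail to start one.
        if matches!(op, Op::Barrier) {
            return None;
        }
    }
    None
}

fn main() {
    // "foo" is still uploading and the queued index references it, so the index must wait;
    // the unrelated "bar" upload further back in the queue is allowed to overtake it.
    let in_progress = vec![Op::UploadLayer("foo".into())];
    let mut queue = VecDeque::from(vec![
        Op::UploadIndex(vec!["foo".into()]),
        Op::UploadLayer("bar".into()),
    ]);
    assert_eq!(next_task(&mut queue, &in_progress), Some(Op::UploadLayer("bar".into())));
    // The index upload stays blocked until the in-progress upload of "foo" completes.
    assert_eq!(next_task(&mut queue, &in_progress), None);
}

As in the commit, the scan always prefers the earliest runnable operation, so ordering is preserved whenever operations conflict, and nothing is ever reordered across a barrier or shutdown.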