pageserver: reorder upload queue when possible
erikgrinaker committed Jan 4, 2025
1 parent 4b2f568 commit 419019e
Showing 8 changed files with 1,028 additions and 126 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions pageserver/Cargo.toml
@@ -44,6 +44,7 @@ postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
postgres_initdb.workspace = true
pprof.workspace = true
rand.workspace = true
range-set-blaze = { version = "0.1.16", features = ["alloc"] }
regex.workspace = true
@@ -108,3 +109,7 @@ harness = false
[[bench]]
name = "bench_ingest"
harness = false

[[bench]]
name = "upload_queue"
harness = false
85 changes: 85 additions & 0 deletions pageserver/benches/upload_queue.rs
@@ -0,0 +1,85 @@
//! Upload queue benchmarks.

use std::str::FromStr as _;
use std::sync::atomic::AtomicU32;
use std::sync::Arc;

use criterion::{criterion_group, criterion_main, Bencher, Criterion};
use pageserver::tenant::metadata::TimelineMetadata;
use pageserver::tenant::remote_timeline_client::index::{IndexPart, LayerFileMetadata};
use pageserver::tenant::storage_layer::LayerName;
use pageserver::tenant::upload_queue::{Delete, UploadOp, UploadQueue, UploadTask};
use pprof::criterion::{Output, PProfProfiler};
use utils::generation::Generation;
use utils::shard::{ShardCount, ShardIndex, ShardNumber};

// Register benchmarks with Criterion.
criterion_group!(
name = benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = bench_upload_queue_next_ready,
);
criterion_main!(benches);

/// Benchmarks the cost of UploadQueue::next_ready() with the given number of in-progress tasks
/// (which is equivalent to tasks ahead of it in the queue). This has linear cost, and the upload
/// queue as a whole is thus quadratic.
///
/// UploadOp::UploadLayer requires an entire tenant and timeline to construct, so we just test
/// Delete and UploadMetadata instead. This is incidentally the most expensive case.
fn bench_upload_queue_next_ready(c: &mut Criterion) {
let mut g = c.benchmark_group("upload_queue_next_ready");
for inprogress in [0, 1, 10, 100, 1_000, 10_000, 100_000, 1_000_000] {
g.bench_function(format!("inprogress={inprogress}"), |b| {
run_bench(b, inprogress).unwrap()
});
}

fn run_bench(b: &mut Bencher, inprogress: usize) -> anyhow::Result<()> {
// Construct two layers. layer0 is in the indexes, layer1 will be deleted.
let layer0 = LayerName::from_str("000000000000000000000000000000000000-100000000000000000000000000000000000__00000000016B59D8-00000000016B5A51").expect("invalid name");
let layer1 = LayerName::from_str("100000000000000000000000000000000001-200000000000000000000000000000000000__00000000016B59D8-00000000016B5A51").expect("invalid name");

let metadata = LayerFileMetadata {
shard: ShardIndex::new(ShardNumber(1), ShardCount(2)),
generation: Generation::Valid(1),
file_size: 0,
};

// Construct the (initial and uploaded) index with layer0.
let mut index = IndexPart::empty(TimelineMetadata::example());
index.layer_metadata.insert(layer0, metadata.clone());

// Construct the queue.
let mut queue = UploadQueue::Uninitialized;
let queue = queue.initialize_with_current_remote_index_part(&index)?;

// Populate inprogress_tasks with a bunch of layer1 deletions.
let delete = UploadOp::Delete(Delete {
layers: vec![(layer1, metadata)],
});

for task_id in 0..(inprogress as u64) {
queue.inprogress_tasks.insert(
task_id,
Arc::new(UploadTask {
task_id,
retries: AtomicU32::new(0),
op: delete.clone(),
}),
);
}

// Benchmark index upload scheduling.
let index_upload = UploadOp::UploadMetadata {
uploaded: Box::new(index),
};

b.iter(|| {
queue.queued_operations.push_front(index_upload.clone());
assert!(queue.next_ready().is_some());
});

Ok(())
}
}
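
The benchmark's doc comment pins down the cost model: with N in-progress tasks, each next_ready() call scans all of them, so scheduling M queued operations costs on the order of M × N. A standalone sketch of that arithmetic (illustrative numbers only, not measurements from this bench):

// Back-of-envelope model of what the benchmark measures: each next_ready()
// call compares the candidate op against every in-progress task, so a
// single call is linear and draining a full queue is quadratic.
fn drain_cost(inprogress: usize, queued: usize) -> usize {
    (0..queued).map(|_| inprogress).sum()
}

fn main() {
    assert_eq!(drain_cost(10_000, 1), 10_000); // one scheduling pass
    assert_eq!(drain_cost(10_000, 10_000), 100_000_000); // full queue drain
}

To try the benchmark locally, cargo bench --bench upload_queue should work given the [[bench]] registration in Cargo.toml above; passing -- --profile-time <seconds> should additionally produce flamegraphs via the PProfProfiler configured in criterion_group!.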
1 change: 0 additions & 1 deletion pageserver/src/tenant/metadata.rs
@@ -320,7 +320,6 @@ impl TimelineMetadata

// Checksums make it awkward to build a valid instance by hand. This helper
// provides a TimelineMetadata with a valid checksum in its header.
#[cfg(test)]
pub fn example() -> Self {
let instance = Self::new(
"0/16960E8".parse::<Lsn>().unwrap(),
101 changes: 18 additions & 83 deletions pageserver/src/tenant/remote_timeline_client.rs
@@ -63,22 +63,18 @@
//! The contract between the client and its user is that the user is responsible for
//! scheduling operations in an order that keeps the remote consistent as
//! described above.
//!
//! From the user's perspective, the operations are executed sequentially.
//! Internally, the client knows which operations can be performed in parallel,
//! and which operations act like a "barrier" that require preceding operations
//! to finish. The calling code just needs to call the schedule-functions in the
//! correct order, and the client will parallelize the operations in a way that
//! is safe.
//!
//! The caller should be careful with deletion, though. They should not delete
//! local files that have been scheduled for upload but not yet finished uploading.
//! Otherwise the upload will fail. To wait for an upload to finish, use
//! the 'wait_completion' function (more on that later.)
//! is safe. For more details, see `UploadOp::can_bypass`.
//!
//! All of this relies on the following invariants:
//!
//! - We rely on read-after-write consistency in remote storage.
//! - Layer files are immutable
//! - Layer files are immutable.
//!
//! NB: Pageserver assumes that it has exclusive write access to the tenant in remote
//! storage. Different tenants can be attached to different pageservers, but if the
@@ -1797,57 +1793,17 @@ impl RemoteTimelineClient {
Ok(())
}

///
/// Pick next tasks from the queue, and start as many of them as possible without violating
/// the ordering constraints.
///
/// The caller needs to already hold the `upload_queue` lock.
/// TODO: consider limiting the number of in-progress tasks, beyond what remote_storage does.
/// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
/// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
while let Some(next_op) = upload_queue.queued_operations.front() {
// Can we run this task now?
let can_run_now = match next_op {
UploadOp::UploadLayer(..) => {
// Can always be scheduled.
true
}
UploadOp::UploadMetadata { .. } => {
// These can only be performed after all the preceding operations
// have finished.
upload_queue.inprogress_tasks.is_empty()
}
UploadOp::Delete(..) => {
// Wait for preceding uploads to finish. Concurrent deletions are OK, though.
upload_queue.num_inprogress_deletions == upload_queue.inprogress_tasks.len()
}

UploadOp::Barrier(_) | UploadOp::Shutdown => {
upload_queue.inprogress_tasks.is_empty()
}
};

// If we cannot launch this task, don't look any further.
//
// In some cases, we could let some non-frontmost tasks "jump the queue" and launch
// them now, but we don't try to do that currently. For example, if the frontmost task
// is an index-file upload that cannot proceed until preceding uploads have finished, we
// could still start layer uploads that were scheduled later.
if !can_run_now {
break;
}
while let Some(mut next_op) = upload_queue.next_ready() {
debug!("starting op: {next_op}");

if let UploadOp::Shutdown = next_op {
// leave the op in the queue but do not start more tasks; it will be dropped when
// stop() is called.
upload_queue.shutdown_ready.close();
break;
}

// We can launch this task. Remove it from the queue first.
let mut next_op = upload_queue.queued_operations.pop_front().unwrap();

debug!("starting op: {}", next_op);

// Update the counters and prepare
// Prepare upload.
match &mut next_op {
UploadOp::UploadLayer(layer, meta, mode) => {
if upload_queue
@@ -1858,18 +1814,14 @@ impl RemoteTimelineClient {
} else {
*mode = Some(OpType::MayReorder)
}
upload_queue.num_inprogress_layer_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {
upload_queue.num_inprogress_metadata_uploads += 1;
}
UploadOp::UploadMetadata { .. } => {}
UploadOp::Delete(Delete { layers }) => {
for (name, meta) in layers {
upload_queue
.recently_deleted
.insert((name.clone(), meta.generation));
}
upload_queue.num_inprogress_deletions += 1;
}
UploadOp::Barrier(sender) => {
sender.send_replace(());
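
The hunk above replaces the per-variant front-of-queue checks with UploadQueue::next_ready(). That method lives in pageserver/src/tenant/upload_queue.rs, whose diff did not render on this page; per the updated module docs, the scheduling rules it encodes are in UploadOp::can_bypass. The shape of the idea: scan the queue front to back and launch the first operation that can safely bypass everything ahead of it, queued or in-progress. Below is a self-contained sketch of that shape; the types and bypass rules are simplified assumptions for illustration, not the pageserver's actual ones.

use std::collections::VecDeque;

// Illustrative stand-in for UploadOp; the real rules in UploadOp::can_bypass
// also account for generations, shards, barriers, and shutdown.
#[derive(Clone, Debug, PartialEq)]
enum SketchOp {
    UploadLayer(&'static str),
    Delete(&'static str),
    // An index upload, carrying the layer names it references.
    UploadIndex(Vec<&'static str>),
}

impl SketchOp {
    // Assumed bypass rules: ops on different layers may reorder, and an
    // index upload may bypass deletes of layers it does not reference
    // (the case the new benchmark exercises).
    fn can_bypass(&self, ahead: &SketchOp) -> bool {
        use SketchOp::*;
        match (self, ahead) {
            (UploadLayer(a), UploadLayer(b))
            | (UploadLayer(a), Delete(b))
            | (Delete(a), UploadLayer(b))
            | (Delete(a), Delete(b)) => a != b,
            (UploadIndex(refs), Delete(b)) => !refs.contains(b),
            _ => false,
        }
    }
}

// Scan front to back and pop the first op that can bypass every op ahead
// of it, both queued and in-progress. Each call is linear in the queue
// depth, which is where the worst-case quadratic behavior measured by the
// new benchmark comes from.
fn next_ready(queued: &mut VecDeque<SketchOp>, inprogress: &[SketchOp]) -> Option<SketchOp> {
    let pos = (0..queued.len()).find(|&i| {
        let op = &queued[i];
        queued.iter().take(i).all(|ahead| op.can_bypass(ahead))
            && inprogress.iter().all(|ahead| op.can_bypass(ahead))
    })?;
    queued.remove(pos)
}

fn main() {
    // Mirrors the benchmark: deletes of layer1 are in progress, and an index
    // referencing only layer0 arrives. It needn't wait for the deletes;
    // under the old logic it would have blocked until they all finished.
    let inprogress = [SketchOp::Delete("layer1"), SketchOp::Delete("layer1")];
    let mut queued = VecDeque::from([SketchOp::UploadIndex(vec!["layer0"])]);
    assert_eq!(
        next_ready(&mut queued, &inprogress),
        Some(SketchOp::UploadIndex(vec!["layer0"]))
    );

    // An upload of one layer can likewise run alongside deletes of another.
    let mut queued = VecDeque::from([SketchOp::UploadLayer("layer0")]);
    assert_eq!(
        next_ready(&mut queued, &inprogress),
        Some(SketchOp::UploadLayer("layer0"))
    );
}

The real next_ready() must also respect barriers and shutdown ops, which (per the old code above) nothing may bypass.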
@@ -1969,6 +1921,8 @@ impl RemoteTimelineClient {

let upload_result: anyhow::Result<()> = match &task.op {
UploadOp::UploadLayer(ref layer, ref layer_metadata, mode) => {
// TODO: check if this mechanism can be removed now that can_bypass() performs
// conflict checks during scheduling.
if let Some(OpType::FlushDeletion) = mode {
if self.config.read().unwrap().block_deletions {
// Of course, this is not efficient... but usually the queue should be empty.
@@ -2191,13 +2145,8 @@ impl RemoteTimelineClient {
upload_queue.inprogress_tasks.remove(&task.task_id);

let lsn_update = match task.op {
UploadOp::UploadLayer(_, _, _) => {
upload_queue.num_inprogress_layer_uploads -= 1;
None
}
UploadOp::UploadLayer(_, _, _) => None,
UploadOp::UploadMetadata { ref uploaded } => {
upload_queue.num_inprogress_metadata_uploads -= 1;

// the task id is reused as a monotonicity check for storing the "clean"
// IndexPart.
let last_updater = upload_queue.clean.1;
@@ -2231,10 +2180,7 @@ impl RemoteTimelineClient {
None
}
}
UploadOp::Delete(_) => {
upload_queue.num_inprogress_deletions -= 1;
None
}
UploadOp::Delete(_) => None,
UploadOp::Barrier(..) | UploadOp::Shutdown => unreachable!(),
};

@@ -2358,9 +2304,6 @@ impl RemoteTimelineClient {
visible_remote_consistent_lsn: initialized
.visible_remote_consistent_lsn
.clone(),
num_inprogress_layer_uploads: 0,
num_inprogress_metadata_uploads: 0,
num_inprogress_deletions: 0,
inprogress_tasks: HashMap::default(),
queued_operations: VecDeque::default(),
#[cfg(feature = "testing")]
@@ -2387,14 +2330,6 @@
}
};

// consistency check
assert_eq!(
qi.num_inprogress_layer_uploads
+ qi.num_inprogress_metadata_uploads
+ qi.num_inprogress_deletions,
qi.inprogress_tasks.len()
);

// We don't need to do anything here for in-progress tasks. They will finish
// on their own, decrement the unfinished-task counter themselves, and observe
// that the queue is Stopped.
@@ -2841,8 +2776,8 @@ mod tests {
let mut guard = client.upload_queue.lock().unwrap();
let upload_queue = guard.initialized_mut().unwrap();
assert!(upload_queue.queued_operations.is_empty());
assert!(upload_queue.inprogress_tasks.len() == 2);
assert!(upload_queue.num_inprogress_layer_uploads == 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 2);
assert_eq!(upload_queue.num_inprogress_layer_uploads(), 2);

// also check that `latest_file_changes` was updated
assert!(upload_queue.latest_files_changes_since_metadata_upload_scheduled == 2);
@@ -2912,8 +2847,8 @@ mod tests {
// Deletion schedules upload of the index file, and the file deletion itself
assert_eq!(upload_queue.queued_operations.len(), 2);
assert_eq!(upload_queue.inprogress_tasks.len(), 1);
assert_eq!(upload_queue.num_inprogress_layer_uploads, 1);
assert_eq!(upload_queue.num_inprogress_deletions, 0);
assert_eq!(upload_queue.num_inprogress_layer_uploads(), 1);
assert_eq!(upload_queue.num_inprogress_deletions(), 0);
assert_eq!(
upload_queue.latest_files_changes_since_metadata_upload_scheduled,
0
2 changes: 1 addition & 1 deletion pageserver/src/tenant/remote_timeline_client/index.rs
@@ -104,7 +104,7 @@ impl IndexPart {

pub const FILE_NAME: &'static str = "index_part.json";

pub(crate) fn empty(metadata: TimelineMetadata) -> Self {
pub fn empty(metadata: TimelineMetadata) -> Self {
IndexPart {
version: Self::LATEST_VERSION,
layer_metadata: Default::default(),
2 changes: 1 addition & 1 deletion pageserver/src/tenant/storage_layer/layer.rs
@@ -1812,7 +1812,7 @@ enum LayerKind {

/// Guard for forcing a layer be resident while it exists.
#[derive(Clone)]
pub(crate) struct ResidentLayer {
pub struct ResidentLayer {
owner: Layer,
downloaded: Arc<DownloadedLayer>,
}