diff --git a/pageserver/benches/upload_queue.rs b/pageserver/benches/upload_queue.rs
index 528b3d54907a..4f463ce4d9e7 100644
--- a/pageserver/benches/upload_queue.rs
+++ b/pageserver/benches/upload_queue.rs
@@ -67,6 +67,7 @@ fn bench_upload_queue_next_ready(c: &mut Criterion) {
                 task_id,
                 retries: AtomicU32::new(0),
                 op: delete.clone(),
+                coalesced_ops: Vec::new(),
             }),
         );
     }
diff --git a/pageserver/src/tenant/remote_timeline_client.rs b/pageserver/src/tenant/remote_timeline_client.rs
index 405da14c076f..fdc9c563ac4d 100644
--- a/pageserver/src/tenant/remote_timeline_client.rs
+++ b/pageserver/src/tenant/remote_timeline_client.rs
@@ -1800,7 +1800,7 @@ impl RemoteTimelineClient {
     /// This can launch an unbounded number of queued tasks. `UploadQueue::next_ready()` also has
     /// worst-case quadratic cost in the number of tasks, and may struggle beyond 10,000 tasks.
     fn launch_queued_tasks(self: &Arc<Self>, upload_queue: &mut UploadQueueInitialized) {
-        while let Some(mut next_op) = upload_queue.next_ready() {
+        while let Some((mut next_op, coalesced_ops)) = upload_queue.next_ready() {
             debug!("starting op: {next_op}");
 
             // Prepare upload.
@@ -1838,6 +1838,7 @@ impl RemoteTimelineClient {
             let task = Arc::new(UploadTask {
                 task_id: upload_task_id,
                 op: next_op,
+                coalesced_ops,
                 retries: AtomicU32::new(0),
             });
             upload_queue
@@ -2205,6 +2206,9 @@ impl RemoteTimelineClient {
         }
 
         self.metric_end(&task.op);
+        for coalesced_op in &task.coalesced_ops {
+            self.metric_end(coalesced_op);
+        }
     }
 
     fn metric_impl(
diff --git a/pageserver/src/tenant/upload_queue.rs b/pageserver/src/tenant/upload_queue.rs
index bd524e815344..1581a2a2924a 100644
--- a/pageserver/src/tenant/upload_queue.rs
+++ b/pageserver/src/tenant/upload_queue.rs
@@ -22,6 +22,11 @@ use tracing::info;
 static DISABLE_UPLOAD_QUEUE_REORDERING: Lazy<bool> =
     Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_REORDERING").as_deref() == Ok("true"));
 
+/// Kill switch for index upload coalescing in case it causes problems.
+/// TODO: remove this once we have confidence in it.
+static DISABLE_UPLOAD_QUEUE_INDEX_COALESCING: Lazy<bool> =
+    Lazy::new(|| std::env::var("DISABLE_UPLOAD_QUEUE_INDEX_COALESCING").as_deref() == Ok("true"));
+
 // clippy warns that Uninitialized is much smaller than Initialized, which wastes
 // memory for Uninitialized variants. Doesn't matter in practice, there are not
 // that many upload queues in a running pageserver, and most of them are initialized
@@ -127,8 +132,10 @@ impl UploadQueueInitialized {
     /// the first operation in the queue, to avoid head-of-line blocking -- an operation can jump
     /// the queue if it doesn't conflict with operations ahead of it.
     ///
+    /// Also returns any operations that were coalesced into this one, e.g. multiple index uploads.
+    ///
     /// None may be returned even if the queue isn't empty, if no operations are ready yet.
-    pub fn next_ready(&mut self) -> Option<UploadOp> {
+    pub fn next_ready(&mut self) -> Option<(UploadOp, Vec<UploadOp>)> {
         // NB: this is quadratic, but queues are expected to be small.
         for (i, candidate) in self.queued_operations.iter().enumerate() {
             // If this candidate is ready, go for it. Otherwise, try the next one.
@@ -142,7 +149,31 @@ impl UploadQueueInitialized {
                     return None;
                 }
 
-                return self.queued_operations.remove(i);
+                let mut op = self.queued_operations.remove(i).expect("i can't disappear");
+
+                // Coalesce any back-to-back index uploads by only uploading the newest one that's
+                // ready. This typically happens with layer/index/layer/index/... sequences, where
+                // the layers bypass the indexes, leaving the indexes queued.
+                //
+                // If other operations are interleaved between index uploads we don't try to
+                // coalesce them, since we may as well update the index concurrently with them.
+                // This keeps the index fresh and avoids starvation.
+                let mut replaced_ops = Vec::new();
+                if matches!(op, UploadOp::UploadMetadata { .. }) {
+                    while let Some(UploadOp::UploadMetadata { .. }) = self.queued_operations.get(i)
+                    {
+                        if *DISABLE_UPLOAD_QUEUE_INDEX_COALESCING {
+                            break;
+                        }
+                        if !self.is_ready(i) {
+                            break;
+                        }
+                        replaced_ops.push(op);
+                        op = self.queued_operations.remove(i).expect("i can't disappear");
+                    }
+                }
+
+                return Some((op, replaced_ops));
             }
 
             // Nothing can bypass a barrier or shutdown. If it wasn't scheduled above, give up.
@@ -216,11 +247,12 @@ impl UploadQueueInitialized {
     fn schedule_ready(&mut self) -> Vec<Arc<UploadTask>> {
         let mut tasks = Vec::new();
         // NB: schedule operations one by one, to handle conflicts with inprogress_tasks.
-        while let Some(op) = self.next_ready() {
+        while let Some((op, coalesced_ops)) = self.next_ready() {
             self.task_counter += 1;
             let task = Arc::new(UploadTask {
                 task_id: self.task_counter,
                 op,
+                coalesced_ops,
                 retries: 0.into(),
             });
             self.inprogress_tasks.insert(task.task_id, task.clone());
@@ -394,9 +426,13 @@ impl UploadQueue {
 pub struct UploadTask {
     /// Unique ID of this task. Used as the key in `inprogress_tasks` above.
     pub task_id: u64,
+    /// Number of task retries.
     pub retries: AtomicU32,
-
+    /// The upload operation.
     pub op: UploadOp,
+    /// Any upload operations that were coalesced into this operation. This typically happens with
+    /// back-to-back index uploads, see `UploadQueueInitialized::next_ready()`.
+    pub coalesced_ops: Vec<UploadOp>,
 }
 
 /// A deletion of some layers within the lifetime of a timeline. This is not used
@@ -499,9 +535,8 @@ impl UploadOp {
                 })
             }
 
-            // Indexes can never bypass each other.
-            // TODO: we could coalesce them though, by only uploading the newest ready index. This
-            // is left for later, out of caution.
+            // Indexes can never bypass each other. They can coalesce though, and
+            // `UploadQueue::next_ready()` currently does this when possible.
             (UploadOp::UploadMetadata { .. }, UploadOp::UploadMetadata { .. }) => false,
         }
     }
@@ -884,9 +919,9 @@ mod tests {
         Ok(())
     }
 
-    /// Index uploads are serialized.
+    /// Index uploads are coalesced.
     #[test]
-    fn schedule_index_serial() -> anyhow::Result<()> {
+    fn schedule_index_coalesce() -> anyhow::Result<()> {
         let mut queue = UploadQueue::Uninitialized;
         let queue = queue.initialize_with_current_remote_index_part(&IndexPart::example())?;
 
@@ -907,13 +942,11 @@
 
         queue.queued_operations.extend(ops.clone());
 
-        // The uploads should run serially.
-        for op in ops {
-            let tasks = queue.schedule_ready();
-            assert_eq!(tasks.len(), 1);
-            assert_same_op(&tasks[0].op, &op);
-            queue.complete(tasks[0].task_id);
-        }
+        // The index uploads are coalesced into a single operation.
+        let tasks = queue.schedule_ready();
+        assert_eq!(tasks.len(), 1);
+        assert_same_op(&tasks[0].op, &ops[2]);
+        assert_same_ops(&tasks[0].coalesced_ops, &ops[0..2]);
 
         assert!(queue.queued_operations.is_empty());
 
@@ -972,18 +1005,14 @@
         assert_same_op(&index_tasks[0].op, &ops[1]);
         queue.complete(index_tasks[0].task_id);
 
-        // layer 1 completes. This unblocks index 1 then index 2.
+        // layer 1 completes. This unblocks index 1 and 2, which coalesce into
+        // a single upload for index 2.
         queue.complete(upload_tasks[1].task_id);
 
-        let index_tasks = queue.schedule_ready();
-        assert_eq!(index_tasks.len(), 1);
-        assert_same_op(&index_tasks[0].op, &ops[3]);
-        queue.complete(index_tasks[0].task_id);
-
         let index_tasks = queue.schedule_ready();
         assert_eq!(index_tasks.len(), 1);
         assert_same_op(&index_tasks[0].op, &ops[5]);
-        queue.complete(index_tasks[0].task_id);
+        assert_same_ops(&index_tasks[0].coalesced_ops, &ops[3..4]);
 
         assert!(queue.queued_operations.is_empty());
 
@@ -1005,11 +1034,12 @@
         let index_deref = index_without(&index_upload, &layer);
 
         let ops = [
-            // Initial upload.
+            // Initial upload, with a barrier to prevent index coalescing.
             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
             UploadOp::UploadMetadata {
                 uploaded: index_upload.clone(),
            },
+            UploadOp::Barrier(tokio::sync::watch::channel(()).0),
             // Dereference the layer and delete it.
             UploadOp::UploadMetadata {
                 uploaded: index_deref.clone(),
@@ -1050,11 +1080,12 @@
         let index_ref = index_with(&index_deref, &layer);
 
         let ops = [
-            // Initial upload.
+            // Initial upload, with a barrier to prevent index coalescing.
             UploadOp::UploadLayer(layer.clone(), layer.metadata(), None),
             UploadOp::UploadMetadata {
                 uploaded: index_upload.clone(),
             },
+            UploadOp::Barrier(tokio::sync::watch::channel(()).0),
             // Dereference the layer.
             UploadOp::UploadMetadata {
                 uploaded: index_deref.clone(),
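
For intuition, here is a standalone sketch of the coalescing rule that `next_ready()` applies above. It is not part of the patch and does not use the pageserver's real types: `Op` is a stand-in for `UploadOp`, and readiness checks, queue-jumping, and the `DISABLE_UPLOAD_QUEUE_INDEX_COALESCING` kill switch are omitted. When ready index uploads sit back-to-back, only the newest one is executed and the older ones are returned alongside it so the caller can still finish their bookkeeping (metrics, completion).

use std::collections::VecDeque;

/// Simplified stand-in for UploadOp: an index upload or some other operation.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Index(u32),
    Layer(u32),
}

/// Pop the next operation. If it is an index upload, also drain any index uploads queued
/// immediately behind it: the newest index becomes the op to run, and the older ones are
/// returned as `coalesced` so the caller can account for them without uploading them.
fn next_ready(queue: &mut VecDeque<Op>) -> Option<(Op, Vec<Op>)> {
    let mut op = queue.pop_front()?;
    let mut coalesced = Vec::new();
    if matches!(op, Op::Index(_)) {
        // Stop at the first non-index op: interleaved operations are never coalesced over.
        while matches!(queue.front(), Some(Op::Index(_))) {
            coalesced.push(op);
            op = queue.pop_front().expect("front was Some");
        }
    }
    Some((op, coalesced))
}

fn main() {
    // index/index/index/layer: the three indexes collapse into a single upload of Index(3).
    let mut queue = VecDeque::from([Op::Index(1), Op::Index(2), Op::Index(3), Op::Layer(1)]);
    let (op, coalesced) = next_ready(&mut queue).expect("queue is not empty");
    assert_eq!(op, Op::Index(3));
    assert_eq!(coalesced, vec![Op::Index(1), Op::Index(2)]);
    // The layer stays queued and is returned by the next call, with nothing coalesced.
    assert_eq!(next_ready(&mut queue), Some((Op::Layer(1), Vec::<Op>::new())));
}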