diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index ba5b2608bdf1..39d4e46c9663 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -421,6 +421,10 @@ fn start_pageserver( background_jobs_can_start: background_jobs_barrier.clone(), }; + info!(config=?conf.l0_flush, "using l0_flush config"); + let l0_flush_global_state = + pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone()); + // Scan the local 'tenants/' directory and start loading the tenants let deletion_queue_client = deletion_queue.new_client(); let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr( @@ -429,6 +433,7 @@ fn start_pageserver( broker_client: broker_client.clone(), remote_storage: remote_storage.clone(), deletion_queue_client, + l0_flush_global_state, }, order, shutdown_pageserver.clone(), diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 470e941c33f9..fa7f7d8d97c0 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -30,11 +30,11 @@ use utils::{ logging::LogFormat, }; -use crate::tenant::timeline::GetVectoredImpl; use crate::tenant::vectored_blob_io::MaxVectoredReadBytes; use crate::tenant::{config::TenantConfOpt, timeline::GetImpl}; use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME}; use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine}; +use crate::{l0_flush::L0FlushConfig, tenant::timeline::GetVectoredImpl}; use crate::{tenant::config::TenantConf, virtual_file}; use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX}; @@ -296,6 +296,8 @@ pub struct PageServerConf { /// /// Setting this to zero disables limits on total ephemeral layer size. pub ephemeral_bytes_per_memory_kb: usize, + + pub l0_flush: L0FlushConfig, } /// We do not want to store this in a PageServerConf because the latter may be logged @@ -403,6 +405,8 @@ struct PageServerConfigBuilder { image_compression: BuilderValue>, ephemeral_bytes_per_memory_kb: BuilderValue, + + l0_flush: BuilderValue, } impl PageServerConfigBuilder { @@ -492,6 +496,7 @@ impl PageServerConfigBuilder { image_compression: Set(DEFAULT_IMAGE_COMPRESSION), validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET), ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB), + l0_flush: Set(L0FlushConfig::default()), } } } @@ -683,6 +688,10 @@ impl PageServerConfigBuilder { self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value); } + pub fn l0_flush(&mut self, value: L0FlushConfig) { + self.l0_flush = BuilderValue::Set(value); + } + pub fn build(self) -> anyhow::Result { let default = Self::default_values(); @@ -741,6 +750,7 @@ impl PageServerConfigBuilder { validate_vectored_get, image_compression, ephemeral_bytes_per_memory_kb, + l0_flush, } CUSTOM LOGIC { @@ -1023,6 +1033,9 @@ impl PageServerConf { "ephemeral_bytes_per_memory_kb" => { builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize) } + "l0_flush" => { + builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?) + } _ => bail!("unrecognized pageserver option '{key}'"), } } @@ -1107,6 +1120,7 @@ impl PageServerConf { image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, + l0_flush: L0FlushConfig::default(), } } } @@ -1347,6 +1361,7 @@ background_task_maximum_delay = '334 s' validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, + l0_flush: L0FlushConfig::default(), }, "Correct defaults should be used when no config values are provided" ); @@ -1421,6 +1436,7 @@ background_task_maximum_delay = '334 s' validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET, image_compression: defaults::DEFAULT_IMAGE_COMPRESSION, ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB, + l0_flush: L0FlushConfig::default(), }, "Should be able to parse all basic config values correctly" ); diff --git a/pageserver/src/l0_flush.rs b/pageserver/src/l0_flush.rs new file mode 100644 index 000000000000..7fe8fedc6394 --- /dev/null +++ b/pageserver/src/l0_flush.rs @@ -0,0 +1,46 @@ +use std::{num::NonZeroUsize, sync::Arc}; + +use crate::tenant::ephemeral_file; + +#[derive(Default, Debug, PartialEq, Eq, Clone, serde::Deserialize)] +#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)] +pub enum L0FlushConfig { + #[default] + PageCached, + #[serde(rename_all = "snake_case")] + Direct { max_concurrency: NonZeroUsize }, +} + +#[derive(Clone)] +pub struct L0FlushGlobalState(Arc); + +pub(crate) enum Inner { + PageCached, + Direct { semaphore: tokio::sync::Semaphore }, +} + +impl L0FlushGlobalState { + pub fn new(config: L0FlushConfig) -> Self { + match config { + L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)), + L0FlushConfig::Direct { max_concurrency } => { + let semaphore = tokio::sync::Semaphore::new(max_concurrency.get()); + Self(Arc::new(Inner::Direct { semaphore })) + } + } + } + + pub(crate) fn inner(&self) -> &Arc { + &self.0 + } +} + +impl L0FlushConfig { + pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite { + use L0FlushConfig::*; + match self { + PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes, + Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No, + } + } +} diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index 353f97264c5f..ac6b9b4f2a60 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -11,6 +11,7 @@ pub mod deletion_queue; pub mod disk_usage_eviction_task; pub mod http; pub mod import_datadir; +pub mod l0_flush; pub use pageserver_api::keyspace; pub mod aux_file; pub mod metrics; diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 116481a1ebbb..89bf89471cef 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -73,6 +73,7 @@ use crate::deletion_queue::DeletionQueueClient; use crate::deletion_queue::DeletionQueueError; use crate::import_datadir; use crate::is_uninit_mark; +use crate::l0_flush::L0FlushGlobalState; use crate::metrics::TENANT; use crate::metrics::{ remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC, @@ -166,6 +167,7 @@ pub struct TenantSharedResources { pub broker_client: storage_broker::BrokerClientChannel, pub remote_storage: GenericRemoteStorage, pub deletion_queue_client: DeletionQueueClient, + pub l0_flush_global_state: L0FlushGlobalState, } /// A [`Tenant`] is really an _attached_ tenant. The configuration @@ -294,6 +296,8 @@ pub struct Tenant { /// An ongoing timeline detach must be checked during attempts to GC or compact a timeline. ongoing_timeline_detach: std::sync::Mutex>, + + l0_flush_global_state: L0FlushGlobalState, } impl std::fmt::Debug for Tenant { @@ -676,6 +680,7 @@ impl Tenant { broker_client, remote_storage, deletion_queue_client, + l0_flush_global_state, } = resources; let attach_mode = attached_conf.location.attach_mode; @@ -690,6 +695,7 @@ impl Tenant { tenant_shard_id, remote_storage.clone(), deletion_queue_client, + l0_flush_global_state, )); // The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if @@ -989,6 +995,7 @@ impl Tenant { TimelineResources { remote_client, timeline_get_throttle: self.timeline_get_throttle.clone(), + l0_flush_global_state: self.l0_flush_global_state.clone(), }, ctx, ) @@ -2478,6 +2485,7 @@ impl Tenant { tenant_shard_id: TenantShardId, remote_storage: GenericRemoteStorage, deletion_queue_client: DeletionQueueClient, + l0_flush_global_state: L0FlushGlobalState, ) -> Tenant { debug_assert!( !attached_conf.location.generation.is_none() || conf.control_plane_api.is_none() @@ -2565,6 +2573,7 @@ impl Tenant { )), tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)), ongoing_timeline_detach: std::sync::Mutex::default(), + l0_flush_global_state, } } @@ -3302,6 +3311,7 @@ impl Tenant { TimelineResources { remote_client, timeline_get_throttle: self.timeline_get_throttle.clone(), + l0_flush_global_state: self.l0_flush_global_state.clone(), } } @@ -3638,6 +3648,7 @@ pub(crate) mod harness { use utils::logging; use crate::deletion_queue::mock::MockDeletionQueue; + use crate::l0_flush::L0FlushConfig; use crate::walredo::apply_neon; use crate::{repository::Key, walrecord::NeonWalRecord}; @@ -3827,6 +3838,8 @@ pub(crate) mod harness { self.tenant_shard_id, self.remote_storage.clone(), self.deletion_queue.new_client(), + // TODO: ideally we should run all unit tests with both configs + L0FlushGlobalState::new(L0FlushConfig::default()), )); let preload = tenant diff --git a/pageserver/src/tenant/block_io.rs b/pageserver/src/tenant/block_io.rs index b406d5033243..85f3b1c79942 100644 --- a/pageserver/src/tenant/block_io.rs +++ b/pageserver/src/tenant/block_io.rs @@ -37,6 +37,7 @@ where pub enum BlockLease<'a> { PageReadGuard(PageReadGuard<'static>), EphemeralFileMutableTail(&'a [u8; PAGE_SZ]), + Slice(&'a [u8; PAGE_SZ]), #[cfg(test)] Arc(std::sync::Arc<[u8; PAGE_SZ]>), #[cfg(test)] @@ -63,6 +64,7 @@ impl<'a> Deref for BlockLease<'a> { match self { BlockLease::PageReadGuard(v) => v.deref(), BlockLease::EphemeralFileMutableTail(v) => v, + BlockLease::Slice(v) => v, #[cfg(test)] BlockLease::Arc(v) => v.deref(), #[cfg(test)] @@ -81,6 +83,7 @@ pub(crate) enum BlockReaderRef<'a> { FileBlockReader(&'a FileBlockReader<'a>), EphemeralFile(&'a EphemeralFile), Adapter(Adapter<&'a DeltaLayerInner>), + Slice(&'a [u8]), #[cfg(test)] TestDisk(&'a super::disk_btree::tests::TestDisk), #[cfg(test)] @@ -99,6 +102,7 @@ impl<'a> BlockReaderRef<'a> { FileBlockReader(r) => r.read_blk(blknum, ctx).await, EphemeralFile(r) => r.read_blk(blknum, ctx).await, Adapter(r) => r.read_blk(blknum, ctx).await, + Slice(s) => Self::read_blk_slice(s, blknum), #[cfg(test)] TestDisk(r) => r.read_blk(blknum), #[cfg(test)] @@ -107,6 +111,24 @@ impl<'a> BlockReaderRef<'a> { } } +impl<'a> BlockReaderRef<'a> { + fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result { + let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap(); + let end = start.checked_add(PAGE_SZ).unwrap(); + if end > slice.len() { + return Err(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + format!("slice too short, len={} end={}", slice.len(), end), + )); + } + let slice = &slice[start..end]; + let page_sized: &[u8; PAGE_SZ] = slice + .try_into() + .expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ"); + Ok(BlockLease::Slice(page_sized)) + } +} + /// /// A "cursor" for efficiently reading multiple pages from a BlockReader /// diff --git a/pageserver/src/tenant/ephemeral_file.rs b/pageserver/src/tenant/ephemeral_file.rs index 79cc7bf15373..bb65ae24fc5e 100644 --- a/pageserver/src/tenant/ephemeral_file.rs +++ b/pageserver/src/tenant/ephemeral_file.rs @@ -21,6 +21,7 @@ pub struct EphemeralFile { } mod page_caching; +pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite; mod zero_padded_read_write; impl EphemeralFile { @@ -53,7 +54,7 @@ impl EphemeralFile { Ok(EphemeralFile { _tenant_shard_id: tenant_shard_id, _timeline_id: timeline_id, - rw: page_caching::RW::new(file), + rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()), }) } @@ -65,6 +66,11 @@ impl EphemeralFile { self.rw.page_cache_file_id() } + /// See [`self::page_caching::RW::load_to_vec`]. + pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { + self.rw.load_to_vec(ctx).await + } + pub(crate) async fn read_blk( &self, blknum: u32, diff --git a/pageserver/src/tenant/ephemeral_file/page_caching.rs b/pageserver/src/tenant/ephemeral_file/page_caching.rs index 276ac8706493..43b9fff28d98 100644 --- a/pageserver/src/tenant/ephemeral_file/page_caching.rs +++ b/pageserver/src/tenant/ephemeral_file/page_caching.rs @@ -8,6 +8,7 @@ use crate::virtual_file::VirtualFile; use once_cell::sync::Lazy; use std::io::{self, ErrorKind}; +use std::ops::{Deref, Range}; use tokio_epoll_uring::BoundedBuf; use tracing::*; @@ -19,14 +20,23 @@ pub struct RW { rw: super::zero_padded_read_write::RW, } +/// When we flush a block to the underlying [`crate::virtual_file::VirtualFile`], +/// should we pre-warm the [`crate::page_cache`] with the contents? +#[derive(Clone, Copy)] +pub enum PrewarmOnWrite { + Yes, + No, +} + impl RW { - pub fn new(file: VirtualFile) -> Self { + pub fn new(file: VirtualFile, prewarm_on_write: PrewarmOnWrite) -> Self { let page_cache_file_id = page_cache::next_file_id(); Self { page_cache_file_id, rw: super::zero_padded_read_write::RW::new(PreWarmingWriter::new( page_cache_file_id, file, + prewarm_on_write, )), } } @@ -49,6 +59,43 @@ impl RW { self.rw.bytes_written() } + /// Load all blocks that can be read via [`Self::read_blk`] into a contiguous memory buffer. + /// + /// This includes the blocks that aren't yet flushed to disk by the internal buffered writer. + /// The last block is zero-padded to [`PAGE_SZ`], so, the returned buffer is always a multiple of [`PAGE_SZ`]. + pub(super) async fn load_to_vec(&self, ctx: &RequestContext) -> Result, io::Error> { + // round up to the next PAGE_SZ multiple, required by blob_io + let size = { + let s = usize::try_from(self.bytes_written()).unwrap(); + if s % PAGE_SZ == 0 { + s + } else { + s.checked_add(PAGE_SZ - (s % PAGE_SZ)).unwrap() + } + }; + let vec = Vec::with_capacity(size); + + // read from disk what we've already flushed + let writer = self.rw.as_writer(); + let flushed_range = writer.written_range(); + let mut vec = writer + .file + .read_exact_at( + vec.slice(0..(flushed_range.end - flushed_range.start)), + u64::try_from(flushed_range.start).unwrap(), + ctx, + ) + .await? + .into_inner(); + + // copy from in-memory buffer what we haven't flushed yet but would return when accessed via read_blk + let buffered = self.rw.get_tail_zero_padded(); + vec.extend_from_slice(buffered); + assert_eq!(vec.len(), size); + assert_eq!(vec.len() % PAGE_SZ, 0); + Ok(vec) + } + pub(crate) async fn read_blk( &self, blknum: u32, @@ -116,19 +163,40 @@ impl Drop for RW { } struct PreWarmingWriter { + prewarm_on_write: PrewarmOnWrite, nwritten_blocks: u32, page_cache_file_id: page_cache::FileId, file: VirtualFile, } impl PreWarmingWriter { - fn new(page_cache_file_id: page_cache::FileId, file: VirtualFile) -> Self { + fn new( + page_cache_file_id: page_cache::FileId, + file: VirtualFile, + prewarm_on_write: PrewarmOnWrite, + ) -> Self { Self { + prewarm_on_write, nwritten_blocks: 0, page_cache_file_id, file, } } + + /// Return the byte range within `file` that has been written though `write_all`. + /// + /// The returned range would be invalidated by another `write_all`. To prevent that, we capture `&_`. + fn written_range(&self) -> (impl Deref> + '_) { + let nwritten_blocks = usize::try_from(self.nwritten_blocks).unwrap(); + struct Wrapper(Range); + impl Deref for Wrapper { + type Target = Range; + fn deref(&self) -> &Range { + &self.0 + } + } + Wrapper(0..nwritten_blocks * PAGE_SZ) + } } impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmingWriter { @@ -178,45 +246,51 @@ impl crate::virtual_file::owned_buffers_io::write::OwnedAsyncWriter for PreWarmi assert_eq!(&check_bounds_stuff_works, &*buf); } - // Pre-warm page cache with the contents. - // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming - // benefits the code that writes InMemoryLayer=>L0 layers. let nblocks = buflen / PAGE_SZ; let nblocks32 = u32::try_from(nblocks).unwrap(); - let cache = page_cache::get(); - static CTX: Lazy = Lazy::new(|| { - RequestContext::new( - crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache, - crate::context::DownloadBehavior::Error, - ) - }); - for blknum_in_buffer in 0..nblocks { - let blk_in_buffer = &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ]; - let blknum = self - .nwritten_blocks - .checked_add(blknum_in_buffer as u32) - .unwrap(); - match cache - .read_immutable_buf(self.page_cache_file_id, blknum, &CTX) - .await - { - Err(e) => { - error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); - // fail gracefully, it's not the end of the world if we can't pre-warm the cache here - } - Ok(v) => match v { - page_cache::ReadBufResult::Found(_guard) => { - // This function takes &mut self, so, it shouldn't be possible to reach this point. - unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \ - and this function takes &mut self, so, no concurrent read_blk is possible"); - } - page_cache::ReadBufResult::NotFound(mut write_guard) => { - write_guard.copy_from_slice(blk_in_buffer); - let _ = write_guard.mark_valid(); + + if matches!(self.prewarm_on_write, PrewarmOnWrite::Yes) { + // Pre-warm page cache with the contents. + // At least in isolated bulk ingest benchmarks (test_bulk_insert.py), the pre-warming + // benefits the code that writes InMemoryLayer=>L0 layers. + + let cache = page_cache::get(); + static CTX: Lazy = Lazy::new(|| { + RequestContext::new( + crate::task_mgr::TaskKind::EphemeralFilePreWarmPageCache, + crate::context::DownloadBehavior::Error, + ) + }); + for blknum_in_buffer in 0..nblocks { + let blk_in_buffer = + &buf[blknum_in_buffer * PAGE_SZ..(blknum_in_buffer + 1) * PAGE_SZ]; + let blknum = self + .nwritten_blocks + .checked_add(blknum_in_buffer as u32) + .unwrap(); + match cache + .read_immutable_buf(self.page_cache_file_id, blknum, &CTX) + .await + { + Err(e) => { + error!("ephemeral_file write_blob failed to get immutable buf to pre-warm page cache: {e:?}"); + // fail gracefully, it's not the end of the world if we can't pre-warm the cache here } - }, + Ok(v) => match v { + page_cache::ReadBufResult::Found(_guard) => { + // This function takes &mut self, so, it shouldn't be possible to reach this point. + unreachable!("we just wrote block {blknum} to the VirtualFile, which is owned by Self, \ + and this function takes &mut self, so, no concurrent read_blk is possible"); + } + page_cache::ReadBufResult::NotFound(mut write_guard) => { + write_guard.copy_from_slice(blk_in_buffer); + let _ = write_guard.mark_valid(); + } + }, + } } } + self.nwritten_blocks = self.nwritten_blocks.checked_add(nblocks32).unwrap(); Ok((buflen, buf.into_inner())) } diff --git a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs index b37eafb52c5b..fe310acab888 100644 --- a/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs +++ b/pageserver/src/tenant/ephemeral_file/zero_padded_read_write.rs @@ -75,6 +75,21 @@ where flushed_offset + u64::try_from(buffer.pending()).unwrap() } + /// Get a slice of all blocks that [`Self::read_blk`] would return as [`ReadResult::ServedFromZeroPaddedMutableTail`]. + pub fn get_tail_zero_padded(&self) -> &[u8] { + let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); + let buffer_written_up_to = buffer.pending(); + // pad to next page boundary + let read_up_to = if buffer_written_up_to % PAGE_SZ == 0 { + buffer_written_up_to + } else { + buffer_written_up_to + .checked_add(PAGE_SZ - (buffer_written_up_to % PAGE_SZ)) + .unwrap() + }; + &buffer.as_zero_padded_slice()[0..read_up_to] + } + pub(crate) async fn read_blk(&self, blknum: u32) -> Result, std::io::Error> { let flushed_offset = self.buffered_writer.as_inner().bytes_written(); let buffer: &zero_padded::Buffer = self.buffered_writer.inspect_buffer(); diff --git a/pageserver/src/tenant/storage_layer/inmemory_layer.rs b/pageserver/src/tenant/storage_layer/inmemory_layer.rs index 6624fb7e6ba5..e1eaea90af57 100644 --- a/pageserver/src/tenant/storage_layer/inmemory_layer.rs +++ b/pageserver/src/tenant/storage_layer/inmemory_layer.rs @@ -6,13 +6,14 @@ //! use crate::config::PageServerConf; use crate::context::{PageContentKind, RequestContext, RequestContextBuilder}; +use crate::page_cache::PAGE_SZ; use crate::repository::{Key, Value}; -use crate::tenant::block_io::BlockReader; +use crate::tenant::block_io::{BlockCursor, BlockReader, BlockReaderRef}; use crate::tenant::ephemeral_file::EphemeralFile; use crate::tenant::storage_layer::ValueReconstructResult; use crate::tenant::timeline::GetVectoredError; use crate::tenant::{PageReconstructError, Timeline}; -use crate::{page_cache, walrecord}; +use crate::{l0_flush, page_cache, walrecord}; use anyhow::{anyhow, ensure, Result}; use pageserver_api::keyspace::KeySpace; use pageserver_api::models::InMemoryLayerInfo; @@ -410,6 +411,7 @@ impl InMemoryLayer { continue; } + // TODO: this uses the page cache => https://github.com/neondatabase/neon/issues/8183 let buf = reader.read_blob(block_read.block_offset, &ctx).await; if let Err(e) = buf { reconstruct_state @@ -620,6 +622,13 @@ impl InMemoryLayer { // rare though, so we just accept the potential latency hit for now. let inner = self.inner.read().await; + let l0_flush_global_state = timeline.l0_flush_global_state.inner().clone(); + use l0_flush::Inner; + let _concurrency_permit = match &*l0_flush_global_state { + Inner::PageCached => None, + Inner::Direct { semaphore, .. } => Some(semaphore.acquire().await), + }; + let end_lsn = *self.end_lsn.get().unwrap(); let key_count = if let Some(key_range) = key_range { @@ -645,28 +654,77 @@ impl InMemoryLayer { ) .await?; - let mut buf = Vec::new(); - - let cursor = inner.file.block_cursor(); + match &*l0_flush_global_state { + l0_flush::Inner::PageCached => { + let ctx = RequestContextBuilder::extend(ctx) + .page_content_kind(PageContentKind::InMemoryLayer) + .build(); + + let mut buf = Vec::new(); + + let cursor = inner.file.block_cursor(); + + for (key, vec_map) in inner.index.iter() { + // Write all page versions + for (lsn, pos) in vec_map.as_slice() { + cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; + let will_init = Value::des(&buf)?.will_init(); + let res; + (buf, res) = delta_layer_writer + .put_value_bytes(*key, *lsn, buf, will_init, &ctx) + .await; + res?; + } + } + } + l0_flush::Inner::Direct { .. } => { + let file_contents: Vec = inner.file.load_to_vec(ctx).await?; + assert_eq!( + file_contents.len() % PAGE_SZ, + 0, + "needed by BlockReaderRef::Slice" + ); + assert_eq!(file_contents.len(), { + let written = usize::try_from(inner.file.len()).unwrap(); + if written % PAGE_SZ == 0 { + written + } else { + written.checked_add(PAGE_SZ - (written % PAGE_SZ)).unwrap() + } + }); + + let cursor = BlockCursor::new(BlockReaderRef::Slice(&file_contents)); + + let mut buf = Vec::new(); + + for (key, vec_map) in inner.index.iter() { + // Write all page versions + for (lsn, pos) in vec_map.as_slice() { + // TODO: once we have blob lengths in the in-memory index, we can + // 1. get rid of the blob_io / BlockReaderRef::Slice business and + // 2. load the file contents into a Bytes and + // 3. the use `Bytes::slice` to get the `buf` that is our blob + // 4. pass that `buf` into `put_value_bytes` + // => https://github.com/neondatabase/neon/issues/8183 + cursor.read_blob_into_buf(*pos, &mut buf, ctx).await?; + let will_init = Value::des(&buf)?.will_init(); + let res; + (buf, res) = delta_layer_writer + .put_value_bytes(*key, *lsn, buf, will_init, ctx) + .await; + res?; + } + } - let ctx = RequestContextBuilder::extend(ctx) - .page_content_kind(PageContentKind::InMemoryLayer) - .build(); - for (key, vec_map) in inner.index.iter() { - // Write all page versions - for (lsn, pos) in vec_map.as_slice() { - cursor.read_blob_into_buf(*pos, &mut buf, &ctx).await?; - let will_init = Value::des(&buf)?.will_init(); - let res; - (buf, res) = delta_layer_writer - .put_value_bytes(*key, *lsn, buf, will_init, &ctx) - .await; - res?; + // Hold the permit until the IO is done; if we didn't, one could drop this future, + // thereby releasing the permit, but the Vec remains allocated until the IO completes. + // => we'd have more concurrenct Vec than allowed as per the semaphore. + drop(_concurrency_permit); } } // MAX is used here because we identify L0 layers by full key range - let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, &ctx).await?; + let delta_layer = delta_layer_writer.finish(Key::MAX, timeline, ctx).await?; Ok(Some(delta_layer)) } } diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index ec94ed3a56db..de9361d72103 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -65,7 +65,6 @@ use std::{ ops::{Deref, Range}, }; -use crate::metrics::GetKind; use crate::pgdatadir_mapping::MAX_AUX_FILE_V2_DELTAS; use crate::{ aux_file::AuxFileSizeEstimator, @@ -90,6 +89,10 @@ use crate::{ use crate::{ disk_usage_eviction_task::EvictionCandidate, tenant::storage_layer::delta_layer::DeltaEntry, }; +use crate::{ + l0_flush::{self, L0FlushGlobalState}, + metrics::GetKind, +}; use crate::{ metrics::ScanLatencyOngoingRecording, tenant::timeline::logical_size::CurrentLogicalSize, }; @@ -208,6 +211,7 @@ pub struct TimelineResources { pub timeline_get_throttle: Arc< crate::tenant::throttle::Throttle<&'static crate::metrics::tenant_throttling::TimelineGet>, >, + pub l0_flush_global_state: l0_flush::L0FlushGlobalState, } pub(crate) struct AuxFilesState { @@ -433,6 +437,8 @@ pub struct Timeline { /// in the future, add `extra_test_sparse_keyspace` if necessary. #[cfg(test)] pub(crate) extra_test_dense_keyspace: ArcSwap, + + pub(crate) l0_flush_global_state: L0FlushGlobalState, } pub struct WalReceiverInfo { @@ -2392,6 +2398,8 @@ impl Timeline { #[cfg(test)] extra_test_dense_keyspace: ArcSwap::new(Arc::new(KeySpace::default())), + + l0_flush_global_state: resources.l0_flush_global_state, }; result.repartition_threshold = result.get_checkpoint_distance() / REPARTITION_FREQ_IN_CHECKPOINT_DISTANCE; diff --git a/pageserver/src/tenant/timeline/delete.rs b/pageserver/src/tenant/timeline/delete.rs index 6d747d424dde..b0088f4ea228 100644 --- a/pageserver/src/tenant/timeline/delete.rs +++ b/pageserver/src/tenant/timeline/delete.rs @@ -272,6 +272,7 @@ impl DeleteTimelineFlow { TimelineResources { remote_client, timeline_get_throttle: tenant.timeline_get_throttle.clone(), + l0_flush_global_state: tenant.l0_flush_global_state.clone(), }, // Important. We dont pass ancestor above because it can be missing. // Thus we need to skip the validation here.