L0 flush: opt-in mechanism to bypass PageCache reads and writes (#8190)
part of #7418

# Motivation

(reproducing #7418)

When we do an `InMemoryLayer::write_to_disk`, there is a tremendous
amount of random read I/O, as deltas from the ephemeral file (written in
LSN order) are written out to the delta layer in key order.

In benchmarks (#7409) we can
see that this delta layer writing phase is substantially more expensive
than the initial ingest of data, and that, within the delta layer write, a
significant amount of the CPU time is spent traversing the page cache.

# High-Level Changes

Add a new mode for L0 flush that works as follows:

* Read the full ephemeral file into memory -- layers are much smaller
than total memory, so this is affordable
* Do all the random reads directly from this in-memory buffer instead of
using blob IO / page cache / disk reads.
* Add a semaphore to limit how many timelines may concurrently do this
(limit peak memory).
* Make the semaphore configurable via PS config (see the config sketch below).
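
A sketch of what the opt-in might look like in `pageserver.toml`, based on the
serde attributes on `L0FlushConfig` in the diff below (`tag = "mode"`,
kebab-case variants, snake_case fields); the `max_concurrency` value is just an
illustration:

```toml
# Opt in to the new mode; the default remains "page-cached".
[l0_flush]
mode = "direct"
# At most this many timelines may hold their ephemeral file in memory at once.
max_concurrency = 4
```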

# Implementation Details

The new `BlockReaderRef::Slice` is a temporary hack until we can ditch
`blob_io` for `InMemoryLayer`; the plan for that is laid out in
#8183.
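
For orientation, here is a minimal, hypothetical sketch (crate-internal
pseudo-usage, not the actual `InMemoryLayer::write_to_disk` wiring, which is
in parts of the diff not rendered below) of how the pieces added by this PR
fit together in direct mode: take a permit from the global semaphore, load
the ephemeral file into memory via `load_to_vec`, and serve the key-order
random reads from that buffer via `BlockReaderRef::Slice`. The
`flush_one_layer` helper and its signature are invented for illustration.

```rust
use crate::context::RequestContext;
use crate::l0_flush::{Inner, L0FlushGlobalState};
use crate::tenant::block_io::BlockReaderRef;
use crate::tenant::ephemeral_file::EphemeralFile;

// Hypothetical helper; the real integration lives in the layer-flush code path.
async fn flush_one_layer(
    global: &L0FlushGlobalState,
    file: &EphemeralFile,
    ctx: &RequestContext,
) -> anyhow::Result<()> {
    match &**global.inner() {
        Inner::PageCached => {
            // Old behavior: blob_io reads keep going through the PageCache.
        }
        Inner::Direct { semaphore } => {
            // Bound peak memory: at most `max_concurrency` timelines hold their
            // ephemeral file in memory at the same time.
            let _permit = semaphore.acquire().await?;
            // Read the whole ephemeral file into one contiguous buffer ...
            let buf: Vec<u8> = file.load_to_vec(ctx).await?;
            // ... and do the key-order random reads from that buffer instead of
            // going through blob_io / PageCache / disk reads.
            let _reader = BlockReaderRef::Slice(&buf);
            // (delta layer writing elided)
        }
    }
    Ok(())
}
```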

# Correctness

The correctness of this change is quite obvious to me: we do what we did
before (`blob_io`) but read from memory instead of going to disk.

The highest bug potential is in the owned-buffers IO. I refactored the
API a bit in the preliminary PR
#8186 to make it less
error-prone, but careful review is still requested.

# Performance

I manually measured single-client ingest performance from `pgbench -i
...`.

Full report:
https://neondatabase.notion.site/2024-06-28-benchmarking-l0-flush-performance-e98cff3807f94cb38f2054d8c818fe84?pvs=4

tl;dr:

* no speed improvement during ingest, but
* significantly lower pressure on the PS PageCache (eviction rate drops to
1/3)
  * (that's why I'm working on this)
* a noticeable but modest reduction in CPU time

This is good enough to merge this PR, because the changes are opt-in.

We'll do more testing in staging & pre-prod.

# Stability / Monitoring

**memory consumption**: there's no _hard_ limit on max `InMemoryLayer`
size (aka "checkpoint distance"), hence there's no hard limit on the
memory allocation we do for flushing. In practice, we a) [log a
warning](https://github.com/neondatabase/neon/blob/23827c6b0d400cbb9a972d4d05d49834816c40d1/pageserver/src/tenant/timeline.rs#L5741-L5743)
when we flush oversized layers, so we know which tenant is to blame,
and b) would have to decide, if we did put a hard limit in place, what
to do with an `InMemoryLayer` that exceeds the limit.
It seems like a better option to guarantee a max size for frozen layers,
dependent on `checkpoint_distance`, and then limit concurrency based on
that.
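
For a rough, illustrative bound (numbers assumed here, not taken from this
PR): if frozen layers were capped at roughly `checkpoint_distance` = 256 MiB
and `max_concurrency` = 4, the direct path would hold at most about 1 GiB of
ephemeral-file copies in memory at any one time; oversized layers are exactly
the case that escapes such a bound today.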

**metrics**: we do have the
[flush_time_histo](https://github.com/neondatabase/neon/blob/23827c6b0d400cbb9a972d4d05d49834816c40d1/pageserver/src/tenant/timeline.rs#L3725-L3726),
but that includes the wait time for the semaphore. We could add a
separate metric for the time spent after acquiring the semaphore, so one
can infer the wait time. Seems unnecessary at this point, though.
problame authored and VladLazar committed Jul 8, 2024
1 parent 13a2558 commit 74bfb93
Showing 12 changed files with 323 additions and 58 deletions.
5 changes: 5 additions & 0 deletions pageserver/src/bin/pageserver.rs
@@ -421,6 +421,10 @@ fn start_pageserver(
background_jobs_can_start: background_jobs_barrier.clone(),
};

info!(config=?conf.l0_flush, "using l0_flush config");
let l0_flush_global_state =
pageserver::l0_flush::L0FlushGlobalState::new(conf.l0_flush.clone());

// Scan the local 'tenants/' directory and start loading the tenants
let deletion_queue_client = deletion_queue.new_client();
let tenant_manager = BACKGROUND_RUNTIME.block_on(mgr::init_tenant_mgr(
@@ -429,6 +433,7 @@
broker_client: broker_client.clone(),
remote_storage: remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
},
order,
shutdown_pageserver.clone(),
18 changes: 17 additions & 1 deletion pageserver/src/config.rs
@@ -30,11 +30,11 @@ use utils::{
logging::LogFormat,
};

use crate::tenant::timeline::GetVectoredImpl;
use crate::tenant::vectored_blob_io::MaxVectoredReadBytes;
use crate::tenant::{config::TenantConfOpt, timeline::GetImpl};
use crate::tenant::{TENANTS_SEGMENT_NAME, TIMELINES_SEGMENT_NAME};
use crate::{disk_usage_eviction_task::DiskUsageEvictionTaskConfig, virtual_file::io_engine};
use crate::{l0_flush::L0FlushConfig, tenant::timeline::GetVectoredImpl};
use crate::{tenant::config::TenantConf, virtual_file};
use crate::{TENANT_HEATMAP_BASENAME, TENANT_LOCATION_CONFIG_NAME, TIMELINE_DELETE_MARK_SUFFIX};

Expand Down Expand Up @@ -296,6 +296,8 @@ pub struct PageServerConf {
///
/// Setting this to zero disables limits on total ephemeral layer size.
pub ephemeral_bytes_per_memory_kb: usize,

pub l0_flush: L0FlushConfig,
}

/// We do not want to store this in a PageServerConf because the latter may be logged
Expand Down Expand Up @@ -403,6 +405,8 @@ struct PageServerConfigBuilder {
image_compression: BuilderValue<Option<ImageCompressionAlgorithm>>,

ephemeral_bytes_per_memory_kb: BuilderValue<usize>,

l0_flush: BuilderValue<L0FlushConfig>,
}

impl PageServerConfigBuilder {
Expand Down Expand Up @@ -492,6 +496,7 @@ impl PageServerConfigBuilder {
image_compression: Set(DEFAULT_IMAGE_COMPRESSION),
validate_vectored_get: Set(DEFAULT_VALIDATE_VECTORED_GET),
ephemeral_bytes_per_memory_kb: Set(DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB),
l0_flush: Set(L0FlushConfig::default()),
}
}
}
Expand Down Expand Up @@ -683,6 +688,10 @@ impl PageServerConfigBuilder {
self.ephemeral_bytes_per_memory_kb = BuilderValue::Set(value);
}

pub fn l0_flush(&mut self, value: L0FlushConfig) {
self.l0_flush = BuilderValue::Set(value);
}

pub fn build(self) -> anyhow::Result<PageServerConf> {
let default = Self::default_values();

Expand Down Expand Up @@ -741,6 +750,7 @@ impl PageServerConfigBuilder {
validate_vectored_get,
image_compression,
ephemeral_bytes_per_memory_kb,
l0_flush,
}
CUSTOM LOGIC
{
Expand Down Expand Up @@ -1023,6 +1033,9 @@ impl PageServerConf {
"ephemeral_bytes_per_memory_kb" => {
builder.get_ephemeral_bytes_per_memory_kb(parse_toml_u64("ephemeral_bytes_per_memory_kb", item)? as usize)
}
"l0_flush" => {
builder.l0_flush(utils::toml_edit_ext::deserialize_item(item).context("l0_flush")?)
}
_ => bail!("unrecognized pageserver option '{key}'"),
}
}
Expand Down Expand Up @@ -1107,6 +1120,7 @@ impl PageServerConf {
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
}
}
}
Expand Down Expand Up @@ -1347,6 +1361,7 @@ background_task_maximum_delay = '334 s'
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
},
"Correct defaults should be used when no config values are provided"
);
Expand Down Expand Up @@ -1421,6 +1436,7 @@ background_task_maximum_delay = '334 s'
validate_vectored_get: defaults::DEFAULT_VALIDATE_VECTORED_GET,
image_compression: defaults::DEFAULT_IMAGE_COMPRESSION,
ephemeral_bytes_per_memory_kb: defaults::DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB,
l0_flush: L0FlushConfig::default(),
},
"Should be able to parse all basic config values correctly"
);
46 changes: 46 additions & 0 deletions pageserver/src/l0_flush.rs
@@ -0,0 +1,46 @@
use std::{num::NonZeroUsize, sync::Arc};

use crate::tenant::ephemeral_file;

#[derive(Default, Debug, PartialEq, Eq, Clone, serde::Deserialize)]
#[serde(tag = "mode", rename_all = "kebab-case", deny_unknown_fields)]
pub enum L0FlushConfig {
#[default]
PageCached,
#[serde(rename_all = "snake_case")]
Direct { max_concurrency: NonZeroUsize },
}

#[derive(Clone)]
pub struct L0FlushGlobalState(Arc<Inner>);

pub(crate) enum Inner {
PageCached,
Direct { semaphore: tokio::sync::Semaphore },
}

impl L0FlushGlobalState {
pub fn new(config: L0FlushConfig) -> Self {
match config {
L0FlushConfig::PageCached => Self(Arc::new(Inner::PageCached)),
L0FlushConfig::Direct { max_concurrency } => {
let semaphore = tokio::sync::Semaphore::new(max_concurrency.get());
Self(Arc::new(Inner::Direct { semaphore }))
}
}
}

pub(crate) fn inner(&self) -> &Arc<Inner> {
&self.0
}
}

impl L0FlushConfig {
pub(crate) fn prewarm_on_write(&self) -> ephemeral_file::PrewarmPageCacheOnWrite {
use L0FlushConfig::*;
match self {
PageCached => ephemeral_file::PrewarmPageCacheOnWrite::Yes,
Direct { .. } => ephemeral_file::PrewarmPageCacheOnWrite::No,
}
}
}
1 change: 1 addition & 0 deletions pageserver/src/lib.rs
@@ -11,6 +11,7 @@ pub mod deletion_queue;
pub mod disk_usage_eviction_task;
pub mod http;
pub mod import_datadir;
pub mod l0_flush;
pub use pageserver_api::keyspace;
pub mod aux_file;
pub mod metrics;
13 changes: 13 additions & 0 deletions pageserver/src/tenant.rs
@@ -73,6 +73,7 @@ use crate::deletion_queue::DeletionQueueClient;
use crate::deletion_queue::DeletionQueueError;
use crate::import_datadir;
use crate::is_uninit_mark;
use crate::l0_flush::L0FlushGlobalState;
use crate::metrics::TENANT;
use crate::metrics::{
remove_tenant_metrics, BROKEN_TENANTS_SET, TENANT_STATE_METRIC, TENANT_SYNTHETIC_SIZE_METRIC,
@@ -166,6 +167,7 @@ pub struct TenantSharedResources {
pub broker_client: storage_broker::BrokerClientChannel,
pub remote_storage: GenericRemoteStorage,
pub deletion_queue_client: DeletionQueueClient,
pub l0_flush_global_state: L0FlushGlobalState,
}

/// A [`Tenant`] is really an _attached_ tenant. The configuration
@@ -294,6 +296,8 @@ pub struct Tenant {

/// An ongoing timeline detach must be checked during attempts to GC or compact a timeline.
ongoing_timeline_detach: std::sync::Mutex<Option<(TimelineId, utils::completion::Barrier)>>,

l0_flush_global_state: L0FlushGlobalState,
}

impl std::fmt::Debug for Tenant {
@@ -676,6 +680,7 @@ impl Tenant {
broker_client,
remote_storage,
deletion_queue_client,
l0_flush_global_state,
} = resources;

let attach_mode = attached_conf.location.attach_mode;
@@ -690,6 +695,7 @@
tenant_shard_id,
remote_storage.clone(),
deletion_queue_client,
l0_flush_global_state,
));

// The attach task will carry a GateGuard, so that shutdown() reliably waits for it to drop out if
@@ -989,6 +995,7 @@ impl Tenant {
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
},
ctx,
)
@@ -2478,6 +2485,7 @@ impl Tenant {
tenant_shard_id: TenantShardId,
remote_storage: GenericRemoteStorage,
deletion_queue_client: DeletionQueueClient,
l0_flush_global_state: L0FlushGlobalState,
) -> Tenant {
debug_assert!(
!attached_conf.location.generation.is_none() || conf.control_plane_api.is_none()
@@ -2565,6 +2573,7 @@ impl Tenant {
)),
tenant_conf: Arc::new(ArcSwap::from_pointee(attached_conf)),
ongoing_timeline_detach: std::sync::Mutex::default(),
l0_flush_global_state,
}
}

@@ -3302,6 +3311,7 @@ impl Tenant {
TimelineResources {
remote_client,
timeline_get_throttle: self.timeline_get_throttle.clone(),
l0_flush_global_state: self.l0_flush_global_state.clone(),
}
}

@@ -3638,6 +3648,7 @@ pub(crate) mod harness {
use utils::logging;

use crate::deletion_queue::mock::MockDeletionQueue;
use crate::l0_flush::L0FlushConfig;
use crate::walredo::apply_neon;
use crate::{repository::Key, walrecord::NeonWalRecord};

@@ -3827,6 +3838,8 @@
self.tenant_shard_id,
self.remote_storage.clone(),
self.deletion_queue.new_client(),
// TODO: ideally we should run all unit tests with both configs
L0FlushGlobalState::new(L0FlushConfig::default()),
));

let preload = tenant
22 changes: 22 additions & 0 deletions pageserver/src/tenant/block_io.rs
@@ -37,6 +37,7 @@ where
pub enum BlockLease<'a> {
PageReadGuard(PageReadGuard<'static>),
EphemeralFileMutableTail(&'a [u8; PAGE_SZ]),
Slice(&'a [u8; PAGE_SZ]),
#[cfg(test)]
Arc(std::sync::Arc<[u8; PAGE_SZ]>),
#[cfg(test)]
@@ -63,6 +64,7 @@ impl<'a> Deref for BlockLease<'a> {
match self {
BlockLease::PageReadGuard(v) => v.deref(),
BlockLease::EphemeralFileMutableTail(v) => v,
BlockLease::Slice(v) => v,
#[cfg(test)]
BlockLease::Arc(v) => v.deref(),
#[cfg(test)]
@@ -81,6 +83,7 @@ pub(crate) enum BlockReaderRef<'a> {
FileBlockReader(&'a FileBlockReader<'a>),
EphemeralFile(&'a EphemeralFile),
Adapter(Adapter<&'a DeltaLayerInner>),
Slice(&'a [u8]),
#[cfg(test)]
TestDisk(&'a super::disk_btree::tests::TestDisk),
#[cfg(test)]
@@ -99,6 +102,7 @@ impl<'a> BlockReaderRef<'a> {
FileBlockReader(r) => r.read_blk(blknum, ctx).await,
EphemeralFile(r) => r.read_blk(blknum, ctx).await,
Adapter(r) => r.read_blk(blknum, ctx).await,
Slice(s) => Self::read_blk_slice(s, blknum),
#[cfg(test)]
TestDisk(r) => r.read_blk(blknum),
#[cfg(test)]
@@ -107,6 +111,24 @@
}
}

impl<'a> BlockReaderRef<'a> {
fn read_blk_slice(slice: &[u8], blknum: u32) -> std::io::Result<BlockLease> {
let start = (blknum as usize).checked_mul(PAGE_SZ).unwrap();
let end = start.checked_add(PAGE_SZ).unwrap();
if end > slice.len() {
return Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("slice too short, len={} end={}", slice.len(), end),
));
}
let slice = &slice[start..end];
let page_sized: &[u8; PAGE_SZ] = slice
.try_into()
.expect("we add PAGE_SZ to start, so the slice must have PAGE_SZ");
Ok(BlockLease::Slice(page_sized))
}
}

///
/// A "cursor" for efficiently reading multiple pages from a BlockReader
///
8 changes: 7 additions & 1 deletion pageserver/src/tenant/ephemeral_file.rs
@@ -21,6 +21,7 @@ pub struct EphemeralFile {
}

mod page_caching;
pub(crate) use page_caching::PrewarmOnWrite as PrewarmPageCacheOnWrite;
mod zero_padded_read_write;

impl EphemeralFile {
@@ -53,7 +54,7 @@ impl EphemeralFile {
Ok(EphemeralFile {
_tenant_shard_id: tenant_shard_id,
_timeline_id: timeline_id,
rw: page_caching::RW::new(file),
rw: page_caching::RW::new(file, conf.l0_flush.prewarm_on_write()),
})
}

@@ -65,6 +66,11 @@
self.rw.page_cache_file_id()
}

/// See [`self::page_caching::RW::load_to_vec`].
pub(crate) async fn load_to_vec(&self, ctx: &RequestContext) -> Result<Vec<u8>, io::Error> {
self.rw.load_to_vec(ctx).await
}

pub(crate) async fn read_blk(
&self,
blknum: u32,
(diffs for the remaining 5 changed files were not loaded)
