pageserver: separate metadata and data pages in DatadirModification (#8621)

## Problem

Currently, DatadirModification keeps a key-indexed map of all pending
writes, even though we (almost) never need to read back dirty pages for
anything other than metadata pages (e.g. relation sizes).

Related: #6345

## Summary of changes

- commit() pending modifications before ingesting database-creation WAL
records, so that those records are guaranteed to be able to get()
everything they need directly from the underlying Timeline.
- Split the dirty pages in DatadirModification into pending_metadata_pages
and pending_data_pages. The data pages don't need to be in a
key-addressable format, so they go in a Vec instead (see the sketch
below this list).
- Special-case handling of zero-page writes in DatadirModification,
putting them in a set which is flushed at the end of each WAL record.
This handles the case where, during ingest, we first write a zero page
and then ingest a postgres write to the same page within one record. We
used to handle this via the key-indexed map of writes, but in this PR
the data page write path no longer indexes writes by key.
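
To make the new shape concrete, here is a minimal, self-contained sketch of the split pending state and the zero-page handling. It is an illustration only: `CompactKey`, `Lsn` and `Value` are simplified stand-ins for the real pageserver types, and the `is_data` flag stands in for the `is_rel_block_key() || is_slru_block_key()` check.

```rust
use std::collections::{HashMap, HashSet};

type Lsn = u64;

// Stand-in for the pageserver's CompactKey; `is_data` replaces the
// is_rel_block_key() || is_slru_block_key() check on the real Key type.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct CompactKey {
    raw: u128,
    is_data: bool,
}

#[derive(Clone, Debug)]
enum Value {
    Image(Vec<u8>),
}

#[derive(Default)]
struct PendingWrites {
    // Metadata writes stay key-indexed so that later records in the same batch
    // can read them back before commit.
    pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, Value)>>,
    // Data writes are never read back before commit, so an append-only Vec suffices.
    pending_data_pages: Vec<(CompactKey, Lsn, Value)>,
    // Zero pages synthesized while ingesting the current WAL record; an entry is
    // dropped if postgres writes the same page within the same record.
    pending_zero_data_pages: HashSet<CompactKey>,
}

impl PendingWrites {
    fn put(&mut self, key: CompactKey, lsn: Lsn, val: Value) {
        if key.is_data {
            // A real write supersedes a zero page synthesized earlier in this record.
            self.pending_zero_data_pages.remove(&key);
            self.pending_data_pages.push((key, lsn, val));
        } else {
            self.pending_metadata_pages.entry(key).or_default().push((lsn, val));
        }
    }

    fn put_zero_data_page(&mut self, key: CompactKey) {
        self.pending_zero_data_pages.insert(key);
    }

    // Called once per WAL record: materialize any zero pages that were not
    // overwritten by a real write within the same record.
    fn on_record_end(&mut self, lsn: Lsn) {
        for key in std::mem::take(&mut self.pending_zero_data_pages) {
            self.pending_data_pages.push((key, lsn, Value::Image(vec![0u8; 8192])));
        }
    }
}

fn main() {
    let mut pending = PendingWrites::default();
    let key = CompactKey { raw: 1, is_data: true };
    pending.put_zero_data_page(key);             // e.g. a relation extend zero-fills the page...
    pending.put(key, 10, Value::Image(vec![1])); // ...then postgres writes the same page
    pending.on_record_end(10);
    assert_eq!(pending.pending_data_pages.len(), 1); // only the real write is persisted
}
```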

My least favorite thing about this PR is that I needed to change the
DatadirModification interface to add the on_record_end call. This is not
very invasive because there's really only one place we use it, but it
changes the object's behaviour from being clearly an aggregation of many
records to having some per-record state. I could avoid this by
implicitly doing the work when someone calls set_lsn or commit -- I'm
open to opinions on whether that's cleaner or dirtier.
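
For comparison, here is a sketch of that implicit alternative, using stub types and hypothetical names rather than the real API: set_lsn() and commit() flush the per-record zero-page set themselves, keeping the interface record-agnostic at the cost of hiding per-record behaviour inside methods whose names don't suggest it.

```rust
use std::collections::HashSet;

// Stub modification object; keys and LSNs are plain u64s for brevity.
#[derive(Default)]
struct Modification {
    lsn: u64,
    pending_zero_data_pages: HashSet<u64>,
    pending_data_pages: Vec<(u64, u64)>, // (key, lsn)
}

impl Modification {
    // The per-record flush, done implicitly instead of via an explicit on_record_end().
    fn flush_zero_pages(&mut self) {
        let lsn = self.lsn;
        let zeroes = std::mem::take(&mut self.pending_zero_data_pages);
        self.pending_data_pages.extend(zeroes.into_iter().map(|key| (key, lsn)));
    }

    fn set_lsn(&mut self, lsn: u64) {
        // Advancing the LSN means the previous record is finished, so flush first.
        self.flush_zero_pages();
        self.lsn = lsn;
    }

    fn commit(&mut self) -> Vec<(u64, u64)> {
        self.flush_zero_pages();
        std::mem::take(&mut self.pending_data_pages)
    }
}

fn main() {
    let mut m = Modification::default();
    m.pending_zero_data_pages.insert(42);
    m.set_lsn(10); // the zero page for key 42 is materialized at the old lsn (0) here
    assert_eq!(m.commit(), vec![(42, 0)]);
}
```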

## Performance

There may be some efficiency improvement here, but the primary
motivation is to enable an earlier stage of ingest to operate without
access to a Timeline. The `pending_data_pages` part is the "fast path"
bulk write data that can in principle be generated without a Timeline,
in parallel with other ingest batches, and ultimately on the safekeeper.
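
As a rough illustration of why a Vec is enough on that path, here is a minimal sketch, with simplified types rather than the pageserver's actual put_batch signature, of how commit() can assemble a single write batch: the data pages are already an ordered Vec, and the metadata map is drained and appended after them.

```rust
use std::collections::HashMap;

type Lsn = u64;
type Key = u64;
type Value = Vec<u8>;

fn build_commit_batch(
    pending_data_pages: &mut Vec<(Key, Lsn, Value)>,
    pending_metadata_pages: &mut HashMap<Key, Vec<(Lsn, Value)>>,
) -> Vec<(Key, Lsn, Value)> {
    // The data pages need no per-key index: insertion order already keeps each
    // key's values in Lsn order, which is all the layer writer requires.
    let mut batch = std::mem::take(pending_data_pages);
    // Metadata pages are drained from their key-indexed map and appended; values
    // for a given key come out in the order they were put, i.e. Lsn order.
    batch.extend(pending_metadata_pages.drain().flat_map(|(key, values)| {
        values.into_iter().map(move |(lsn, value)| (key, lsn, value))
    }));
    batch
}

fn main() {
    let mut data = vec![(1, 10, vec![0xaa]), (1, 11, vec![0xbb])];
    let mut meta = HashMap::from([(2, vec![(10, vec![0x01]), (11, vec![0x02])])]);
    let batch = build_commit_batch(&mut data, &mut meta);
    assert_eq!(batch.len(), 4);
    assert!(data.is_empty() && meta.is_empty());
}
```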

`test_bulk_insert` on AX102 shows approximately the same results as in
the previous PR #8591:

```
------------------------------ Benchmark results -------------------------------
test_bulk_insert[neon-release-pg16].insert: 23.577 s
test_bulk_insert[neon-release-pg16].pageserver_writes: 5,428 MB
test_bulk_insert[neon-release-pg16].peak_mem: 637 MB
test_bulk_insert[neon-release-pg16].size: 0 MB
test_bulk_insert[neon-release-pg16].data_uploaded: 1,922 MB
test_bulk_insert[neon-release-pg16].num_files_uploaded: 8 
test_bulk_insert[neon-release-pg16].wal_written: 1,382 MB
test_bulk_insert[neon-release-pg16].wal_recovery: 18.264 s
test_bulk_insert[neon-release-pg16].compaction: 0.052 s
```
jcsp authored Sep 3, 2024
1 parent c7187be commit c4fe664
Showing 6 changed files with 281 additions and 98 deletions.
12 changes: 8 additions & 4 deletions pageserver/src/import_datadir.rs
@@ -19,6 +19,7 @@ use crate::metrics::WAL_INGEST;
use crate::pgdatadir_mapping::*;
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::decode_wal_record;
use crate::walrecord::DecodedWALRecord;
use pageserver_api::reltag::{RelTag, SlruKind};
use postgres_ffi::pg_constants;
@@ -310,11 +311,13 @@ async fn import_wal(

let mut nrecords = 0;
let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= endpoint {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;

walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.ingest_record(decoded, lsn, &mut modification, ctx)
.await?;
WAL_INGEST.records_committed.inc();

Expand Down Expand Up @@ -449,11 +452,12 @@ pub async fn import_wal_from_tar(
waldecoder.feed_bytes(&bytes[offset..]);

let mut modification = tline.begin_modification(last_lsn);
let mut decoded = DecodedWALRecord::default();
while last_lsn <= end_lsn {
if let Some((lsn, recdata)) = waldecoder.poll_decode()? {
let mut decoded = DecodedWALRecord::default();
decode_wal_record(recdata, &mut decoded, tline.pg_version)?;
walingest
.ingest_record(recdata, lsn, &mut modification, &mut decoded, ctx)
.ingest_record(decoded, lsn, &mut modification, ctx)
.await?;
modification.commit(ctx).await?;
last_lsn = lsn;
228 changes: 162 additions & 66 deletions pageserver/src/pgdatadir_mapping.rs
@@ -12,7 +12,7 @@ use crate::keyspace::{KeySpace, KeySpaceAccum};
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id;
use crate::walrecord::NeonWalRecord;
use crate::{aux_file, repository::*};
use anyhow::{bail, ensure, Context};
use anyhow::{ensure, Context};
use bytes::{Buf, Bytes, BytesMut};
use enum_map::Enum;
use pageserver_api::key::{
@@ -168,7 +168,9 @@ impl Timeline {
DatadirModification {
tline: self,
pending_lsns: Vec::new(),
pending_updates: HashMap::new(),
pending_metadata_pages: HashMap::new(),
pending_data_pages: Vec::new(),
pending_zero_data_pages: Default::default(),
pending_deletions: Vec::new(),
pending_nblocks: 0,
pending_directory_entries: Vec::new(),
@@ -1031,10 +1033,24 @@ pub struct DatadirModification<'a> {
// The put-functions add the modifications here, and they are flushed to the
// underlying key-value store by the 'finish' function.
pending_lsns: Vec<Lsn>,
pending_updates: HashMap<Key, Vec<(Lsn, usize, Value)>>,
pending_deletions: Vec<(Range<Key>, Lsn)>,
pending_nblocks: i64,

/// Metadata writes, indexed by key so that they can be read from not-yet-committed modifications
/// while ingesting subsequent records. See [`Self::is_data_key`] for the definition of 'metadata'.
pending_metadata_pages: HashMap<CompactKey, Vec<(Lsn, usize, Value)>>,

/// Data writes, ready to be flushed into an ephemeral layer. See [`Self::is_data_key`] for
/// which keys are stored here.
pending_data_pages: Vec<(CompactKey, Lsn, usize, Value)>,

// Sometimes during ingest, for example when extending a relation, we would like to write a zero page. However,
// if we encounter a write from postgres in the same wal record, we will drop this entry.
//
// Unlike other 'pending' fields, this does not last until the next call to commit(): it is flushed
// at the end of each wal record, and all these writes implicitly are at lsn Self::lsn
pending_zero_data_pages: HashSet<CompactKey>,

/// For special "directory" keys that store key-value maps, track the size of the map
/// if it was updated in this modification.
pending_directory_entries: Vec<(DirectoryKind, usize)>,
@@ -1058,6 +1074,10 @@ impl<'a> DatadirModification<'a> {
self.pending_bytes
}

pub(crate) fn has_dirty_data_pages(&self) -> bool {
(!self.pending_data_pages.is_empty()) || (!self.pending_zero_data_pages.is_empty())
}

/// Set the current lsn
pub(crate) fn set_lsn(&mut self, lsn: Lsn) -> anyhow::Result<()> {
ensure!(
@@ -1066,13 +1086,28 @@ impl<'a> DatadirModification<'a> {
lsn,
self.lsn
);

// If we are advancing LSN, then state from previous wal record should have been flushed.
assert!(self.pending_zero_data_pages.is_empty());

if lsn > self.lsn {
self.pending_lsns.push(self.lsn);
self.lsn = lsn;
}
Ok(())
}

/// In this context, 'metadata' means keys that are only read by the pageserver internally, and 'data' means
/// keys that represent literal blocks that postgres can read. So data includes relation blocks and
/// SLRU blocks, which are read directly by postgres, and everything else is considered metadata.
///
/// The distinction is important because data keys are handled on a fast path where dirty writes are
/// not readable until this modification is committed, whereas metadata keys are visible for read
/// via [`Self::get`] as soon as their record has been ingested.
fn is_data_key(key: &Key) -> bool {
key.is_rel_block_key() || key.is_slru_block_key()
}

/// Initialize a completely new repository.
///
/// This inserts the directory metadata entries that are assumed to
@@ -1180,6 +1215,31 @@ impl<'a> DatadirModification<'a> {
Ok(())
}

pub(crate) fn put_rel_page_image_zero(&mut self, rel: RelTag, blknum: BlockNumber) {
self.pending_zero_data_pages
.insert(rel_block_to_key(rel, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
}

pub(crate) fn put_slru_page_image_zero(
&mut self,
kind: SlruKind,
segno: u32,
blknum: BlockNumber,
) {
self.pending_zero_data_pages
.insert(slru_block_to_key(kind, segno, blknum).to_compact());
self.pending_bytes += ZERO_PAGE.len();
}

/// Call this at the end of each WAL record.
pub(crate) fn on_record_end(&mut self) {
let pending_zero_data_pages = std::mem::take(&mut self.pending_zero_data_pages);
for key in pending_zero_data_pages {
self.put_data(key, Value::Image(ZERO_PAGE.clone()));
}
}

/// Store a relmapper file (pg_filenode.map) in the repository
pub async fn put_relmap_file(
&mut self,
@@ -1778,7 +1838,7 @@ impl<'a> DatadirModification<'a> {
/// retains all the metadata, but data pages are flushed. That's again OK
/// for bulk import, where you are just loading data pages and won't try to
/// modify the same pages twice.
pub async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
pub(crate) async fn flush(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Unless we have accumulated a decent amount of changes, it's not worth it
// to scan through the pending_updates list.
let pending_nblocks = self.pending_nblocks;
@@ -1789,31 +1849,11 @@ impl<'a> DatadirModification<'a> {
let mut writer = self.tline.writer().await;

// Flush relation and SLRU data blocks, keep metadata.
let mut retained_pending_updates = HashMap::<_, Vec<_>>::new();
for (key, values) in self.pending_updates.drain() {
if !key.is_valid_key_on_write_path() {
bail!(
"the request contains data not supported by pageserver at TimelineWriter::put: {}", key
);
}
let mut write_batch = Vec::new();
for (lsn, value_ser_size, value) in values {
if key.is_rel_block_key() || key.is_slru_block_key() {
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
write_batch.push((key.to_compact(), lsn, value_ser_size, value));
} else {
retained_pending_updates.entry(key).or_default().push((
lsn,
value_ser_size,
value,
));
}
}
writer.put_batch(write_batch, ctx).await?;
}
let pending_data_pages = std::mem::take(&mut self.pending_data_pages);

self.pending_updates = retained_pending_updates;
// This bails out on first error without modifying pending_updates.
// That's Ok, cf this function's doc comment.
writer.put_batch(pending_data_pages, ctx).await?;
self.pending_bytes = 0;

if pending_nblocks != 0 {
@@ -1834,29 +1874,31 @@ impl<'a> DatadirModification<'a> {
/// All the modifications in this atomic update are stamped by the specified LSN.
///
pub async fn commit(&mut self, ctx: &RequestContext) -> anyhow::Result<()> {
// Commit should never be called mid-wal-record
assert!(self.pending_zero_data_pages.is_empty());

let mut writer = self.tline.writer().await;

let pending_nblocks = self.pending_nblocks;
self.pending_nblocks = 0;

if !self.pending_updates.is_empty() {
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let batch: Vec<(CompactKey, Lsn, usize, Value)> = self
.pending_updates
// Ordering: the items in this batch do not need to be in any global order, but values for
// a particular Key must be in Lsn order relative to one another. InMemoryLayer relies on
// this to do efficient updates to its index.
let mut write_batch = std::mem::take(&mut self.pending_data_pages);

write_batch.extend(
self.pending_metadata_pages
.drain()
.flat_map(|(key, values)| {
values.into_iter().map(move |(lsn, val_ser_size, value)| {
if !key.is_valid_key_on_write_path() {
bail!("the request contains data not supported by pageserver at TimelineWriter::put: {}", key);
}
Ok((key.to_compact(), lsn, val_ser_size, value))
})
})
.collect::<anyhow::Result<Vec<_>>>()?;
values
.into_iter()
.map(move |(lsn, value_size, value)| (key, lsn, value_size, value))
}),
);

writer.put_batch(batch, ctx).await?;
if !write_batch.is_empty() {
writer.put_batch(write_batch, ctx).await?;
}

if !self.pending_deletions.is_empty() {
@@ -1887,33 +1929,58 @@ impl<'a> DatadirModification<'a> {
}

pub(crate) fn len(&self) -> usize {
self.pending_updates.len() + self.pending_deletions.len()
self.pending_metadata_pages.len()
+ self.pending_data_pages.len()
+ self.pending_deletions.len()
}

// Internal helper functions to batch the modifications

/// Read a page from the Timeline we are writing to. For metadata pages, this passes through
/// a cache in Self, which makes writes earlier in this modification visible to WAL records later
/// in the modification.
///
/// For data pages, reads pass directly to the owning Timeline: any ingest code which reads a data
/// page must ensure that the pages they read are already committed in Timeline, for example
/// DB create operations are always preceded by a call to commit(). This is special cased because
/// it's rare: all the 'normal' WAL operations will only read metadata pages such as relation sizes,
/// and not data pages.
async fn get(&self, key: Key, ctx: &RequestContext) -> Result<Bytes, PageReconstructError> {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_updates.get(&key) {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
if !Self::is_data_key(&key) {
// Have we already updated the same key? Read the latest pending updated
// version in that case.
//
// Note: we don't check pending_deletions. It is an error to request a
// value that has been removed, deletion only avoids leaking storage.
if let Some(values) = self.pending_metadata_pages.get(&key.to_compact()) {
if let Some((_, _, value)) = values.last() {
return if let Value::Image(img) = value {
Ok(img.clone())
} else {
// Currently, we never need to read back a WAL record that we
// inserted in the same "transaction". All the metadata updates
// work directly with Images, and we never need to read actual
// data pages. We could handle this if we had to, by calling
// the walredo manager, but let's keep it simple for now.
Err(PageReconstructError::Other(anyhow::anyhow!(
"unexpected pending WAL record"
)))
};
}
}
} else {
// This is an expensive check, so we only do it in debug mode. If reading a data key,
// this key should never be present in pending_data_pages. We ensure this by committing
// modifications before ingesting DB create operations, which are the only kind that reads
// data pages during ingest.
if cfg!(debug_assertions) {
for (dirty_key, _, _, _) in &self.pending_data_pages {
debug_assert!(&key.to_compact() != dirty_key);
}

debug_assert!(!self.pending_zero_data_pages.contains(&key.to_compact()))
}
}

// Metadata page cache miss, or we're reading a data page.
let lsn = Lsn::max(self.tline.get_last_record_lsn(), self.lsn);
self.tline.get(key, lsn, ctx).await
}
@@ -1925,11 +1992,40 @@ impl<'a> DatadirModification<'a> {
}

fn put(&mut self, key: Key, val: Value) {
let values = self.pending_updates.entry(key).or_default();
if Self::is_data_key(&key) {
self.put_data(key.to_compact(), val)
} else {
self.put_metadata(key.to_compact(), val)
}
}

fn put_data(&mut self, key: CompactKey, val: Value) {
let val_serialized_size = val.serialized_size().unwrap() as usize;

// If this page was previously zero'd in the same WalRecord, then drop the previous zero page write. This
// is an optimization that avoids persisting both the zero page generated by us (e.g. during a relation extend),
// and the subsequent postgres-originating write
if self.pending_zero_data_pages.remove(&key) {
self.pending_bytes -= ZERO_PAGE.len();
}

self.pending_bytes += val_serialized_size;
self.pending_data_pages
.push((key, self.lsn, val_serialized_size, val))
}

fn put_metadata(&mut self, key: CompactKey, val: Value) {
let values = self.pending_metadata_pages.entry(key).or_default();
// Replace the previous value if it exists at the same lsn
if let Some((last_lsn, last_value_ser_size, last_value)) = values.last_mut() {
if *last_lsn == self.lsn {
// Update the pending_bytes contribution from this entry, and update the serialized size in place
self.pending_bytes -= *last_value_ser_size;
*last_value_ser_size = val.serialized_size().unwrap() as usize;
self.pending_bytes += *last_value_ser_size;

// Use the latest value; this replaces any earlier write to the same (key,lsn), such as might
// have been generated by synthesized zero page writes prior to the first real write to a page.
*last_value = val;
return;
}
9 changes: 7 additions & 2 deletions pageserver/src/tenant/storage_layer/inmemory_layer.rs
@@ -692,8 +692,13 @@ impl InMemoryLayer {
let vec_map = inner.index.entry(key).or_default();
let old = vec_map.append_or_update_last(lsn, index_entry).unwrap().0;
if old.is_some() {
// We already had an entry for this LSN. That's odd..
warn!("Key {} at {} already exists", key, lsn);
// This should not break anything, but is unexpected: ingestion code aims to filter out
// multiple writes to the same key at the same LSN. This happens in cases where our
// ingestion code generates some write like an empty page, and we see a write from postgres
// to the same key in the same wal record. If one such write makes it through, we
// index the most recent write, implicitly ignoring the earlier write. We log a warning
// because this case is unexpected, and we would like tests to fail if this happens.
warn!("Key {} at {} written twice at same LSN", key, lsn);
}
}

Expand Down

1 comment on commit c4fe664

@github-actions

3815 tests run: 3708 passed, 1 failed, 106 skipped (full report)


Failures on Postgres 14

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_tenant_deletion[release-pg14]"
Flaky tests (2)

Postgres 15

Postgres 14

Test coverage report is not available

The comment gets automatically updated with the latest test results
c4fe664 at 2024-09-03T18:29:23.621Z :recycle:
