Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist pg_stat information in pageserver #6560

Merged
merged 1 commit into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/postgres_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ pub fn generate_pg_control(
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64)> {
) -> anyhow::Result<(Bytes, u64, bool)> {
dispatch_pgversion!(
pg_version,
pgv::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
Expand Down
42 changes: 39 additions & 3 deletions libs/postgres_ffi/src/xlog_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,59 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
}
}

/// Generate a pg_control file, for a basebackup for starting up Postgres at the given LSN
///
/// 'pg_control_bytes' and 'checkpoint_bytes' are the contents of those keys persisted in
/// the pageserver. They use the same format as the PostgreSQL control file and the
/// checkpoint record, but see walingest.rs for how exactly they are kept up to date.
/// 'lsn' is the LSN at which we're starting up.
///
/// Returns:
/// - pg_control file contents
/// - system_identifier, extracted from the persisted information
/// - true, if we're starting up from a "clean shutdown", i.e. if there was a shutdown
/// checkpoint at the given LSN
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64)> {
) -> anyhow::Result<(Bytes, u64, bool)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;

// Generate new pg_control needed for bootstrap
//
// NB: In the checkpoint struct that we persist in the pageserver, we have a different
// convention for the 'redo' field than in PostgreSQL: On a shutdown checkpoint,
// 'redo' points the *end* of the checkpoint WAL record. On PostgreSQL, it points to
// the beginning. Furthermore, on an online checkpoint, 'redo' is set to 0.
//
// We didn't always have this convention however, and old persisted records will have
// old REDO values that point to some old LSN.
//
// The upshot is that if 'redo' is equal to the "current" LSN, there was a shutdown
// checkpoint record at that point in WAL, with no new WAL records after it. That case
// can be treated as starting from a clean shutdown. All other cases are treated as
// non-clean shutdown. In Neon, we don't do WAL replay at startup in either case, so
// that distinction doesn't matter very much. As of this writing, it only affects
// whether the persisted pg_stats information can be used or not.
//
// In the Checkpoint struct in the returned pg_control file, the redo pointer is
// always set to the LSN we're starting at, to hint that no WAL replay is required.
// (There's some neon-specific code in Postgres startup to make that work, though.
// Just setting the redo pointer is not sufficient.)
let was_shutdown = Lsn(checkpoint.redo) == lsn;
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;

//save new values in pg_control
// We use DBState_DB_SHUTDOWNED even if it was not a clean shutdown. The
// neon-specific code at postgres startup ignores the state stored in the control
// file, similar to archive recovery in standalone PostgreSQL. Similarly, the
// checkPoint pointer is ignored, so just set it to 0.
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;

Ok((pg_control.encode(), pg_control.system_identifier))
Ok((pg_control.encode(), pg_control.system_identifier, was_shutdown))
}

pub fn get_current_timestamp() -> TimestampTz {
Expand Down
1 change: 1 addition & 0 deletions pageserver/ctl/src/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl AuxFileV2 {
AuxFileV2::Recognized("pg_logical/replorigin_checkpoint", hash)
}
(2, 1) => AuxFileV2::Recognized("pg_replslot/", hash),
(3, 1) => AuxFileV2::Recognized("pg_stat/pgstat.stat", hash),
(1, 0xff) => AuxFileV2::OtherWithPrefix("pg_logical/", hash),
(0xff, 0xff) => AuxFileV2::Other(hash),
_ => return None,
Expand Down
4 changes: 4 additions & 0 deletions pageserver/src/aux_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fn aux_hash_to_metadata_key(dir_level1: u8, dir_level2: u8, data: &[u8]) -> Key

const AUX_DIR_PG_LOGICAL: u8 = 0x01;
const AUX_DIR_PG_REPLSLOT: u8 = 0x02;
const AUX_DIR_PG_STAT: u8 = 0x03;
const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;

/// Encode the aux file into a fixed-size key.
Expand All @@ -53,6 +54,7 @@ const AUX_DIR_PG_UNKNOWN: u8 = 0xFF;
/// * pg_logical/replorigin_checkpoint -> 0x0103
/// * pg_logical/others -> 0x01FF
/// * pg_replslot/ -> 0x0201
/// * pg_stat/pgstat.stat -> 0x0301
/// * others -> 0xFFFF
///
/// If you add new AUX files to this function, please also add a test case to `test_encoding_portable`.
Expand All @@ -75,6 +77,8 @@ pub fn encode_aux_file_key(path: &str) -> Key {
aux_hash_to_metadata_key(AUX_DIR_PG_LOGICAL, 0xFF, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_replslot/") {
aux_hash_to_metadata_key(AUX_DIR_PG_REPLSLOT, 0x01, fname.as_bytes())
} else if let Some(fname) = path.strip_prefix("pg_stat/") {
aux_hash_to_metadata_key(AUX_DIR_PG_STAT, 0x01, fname.as_bytes())
} else {
if cfg!(debug_assertions) {
warn!(
Expand Down
58 changes: 37 additions & 21 deletions pageserver/src/basebackup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,31 @@ where
async fn send_tarball(mut self) -> Result<(), BasebackupError> {
// TODO include checksum

// Construct the pg_control file from the persisted checkpoint and pg_control
// information. But we only add this to the tarball at the end, so that if the
// writing is interrupted half-way through, the resulting incomplete tarball will
// be missing the pg_control file, which prevents PostgreSQL from starting up on
// it. With proper error handling, you should never try to start up from an
// incomplete basebackup in the first place, of course, but this is a nice little
// extra safety measure.
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed to get control bytes")?;
let (pg_control_bytes, system_identifier, was_shutdown) =
postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;

let lazy_slru_download = self.timeline.get_lazy_slru_download() && !self.full_backup;

let pgversion = self.timeline.pg_version;
Expand Down Expand Up @@ -401,6 +426,10 @@ where
// In future we will not generate AUX record for "pg_logical/replorigin_checkpoint" at all,
// but now we should handle (skip) it for backward compatibility.
continue;
} else if path == "pg_stat/pgstat.stat" && !was_shutdown {
// Drop statistic in case of abnormal termination, i.e. if we're not starting from the exact LSN
// of a shutdown checkpoint.
continue;
}
let header = new_tar_header(&path, content.len() as u64)?;
self.ar
Expand Down Expand Up @@ -462,8 +491,9 @@ where
)))
});

// Generate pg_control and bootstrap WAL segment.
self.add_pgcontrol_file().await?;
// Last, add the pg_control file and bootstrap WAL segment.
self.add_pgcontrol_file(pg_control_bytes, system_identifier)
.await?;
self.ar
.finish()
.await
Expand Down Expand Up @@ -671,7 +701,11 @@ where
// Add generated pg_control file and bootstrap WAL segment.
// Also send zenith.signal file with extra bootstrap data.
//
async fn add_pgcontrol_file(&mut self) -> Result<(), BasebackupError> {
async fn add_pgcontrol_file(
&mut self,
pg_control_bytes: Bytes,
system_identifier: u64,
) -> Result<(), BasebackupError> {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
Expand All @@ -694,24 +728,6 @@ where
.await
.map_err(|e| BasebackupError::Client(e, "add_pgcontrol_file,zenith.signal"))?;

let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn, self.ctx)
.await
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn, self.ctx)
.await
.context("failed get control bytes")?;

let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;

//send pg_control
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar
Expand Down
9 changes: 8 additions & 1 deletion pageserver/src/pgdatadir_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::ops::ControlFlow;
use std::ops::Range;
use strum::IntoEnumIterator;
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};
use utils::bin_ser::DeserializeError;
use utils::pausable_failpoint;
use utils::{bin_ser::BeSer, lsn::Lsn};
Expand Down Expand Up @@ -2264,6 +2264,13 @@ impl DatadirModification<'_> {
self.tline.aux_file_size_estimator.on_add(content.len());
new_files.push((path, content));
}
// Compute may request delete of old version of pgstat AUX file if new one exceeds size limit.
// Compute doesn't know if previous version of this file exists or not, so
// attempt to delete non-existing file can cause this message.
// To avoid false alarms, log it as info rather than warning.
(None, true) if path.starts_with("pg_stat/") => {
info!("removing non-existing pg_stat file: {}", path)
}
(None, true) => warn!("removing non-existing aux file: {}", path),
}
let new_val = aux_file::encode_file_value(&new_files)?;
Expand Down
8 changes: 5 additions & 3 deletions pageserver/src/tenant/storage_layer/delta_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use futures::StreamExt;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::key::{Key, KEY_SIZE};
use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::models::ImageCompressionAlgorithm;
use pageserver_api::shard::TenantShardId;
Expand Down Expand Up @@ -967,7 +966,10 @@ impl DeltaLayerInner {
.as_slice()
.iter()
.filter_map(|(_, blob_meta)| {
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
if blob_meta.key.is_rel_dir_key()
|| blob_meta.key == DBDIR_KEY
|| blob_meta.key.is_aux_file_key()
{
// The size of values for these keys is unbounded and can
// grow very large in pathological cases.
None
Expand Down
8 changes: 5 additions & 3 deletions pageserver/src/tenant/storage_layer/image_layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use hex;
use itertools::Itertools;
use pageserver_api::config::MaxVectoredReadBytes;
use pageserver_api::key::DBDIR_KEY;
use pageserver_api::key::{Key, KEY_SIZE};
use pageserver_api::key::{Key, DBDIR_KEY, KEY_SIZE};
use pageserver_api::keyspace::KeySpace;
use pageserver_api::shard::{ShardIdentity, TenantShardId};
use pageserver_api::value::Value;
Expand Down Expand Up @@ -603,7 +602,10 @@ impl ImageLayerInner {
.as_slice()
.iter()
.filter_map(|(_, blob_meta)| {
if blob_meta.key.is_rel_dir_key() || blob_meta.key == DBDIR_KEY {
if blob_meta.key.is_rel_dir_key()
|| blob_meta.key == DBDIR_KEY
|| blob_meta.key.is_aux_file_key()
{
// The size of values for these keys is unbounded and can
// grow very large in pathological cases.
None
Expand Down
44 changes: 44 additions & 0 deletions pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,50 @@ impl WalIngest {
} else {
cp.oldestActiveXid = xlog_checkpoint.oldestActiveXid;
}
// NB: We abuse the Checkpoint.redo field:
//
// - In PostgreSQL, the Checkpoint struct doesn't store the information
// of whether this is an online checkpoint or a shutdown checkpoint. It's
// stored in the XLOG info field of the WAL record, shutdown checkpoints
// use record type XLOG_CHECKPOINT_SHUTDOWN and online checkpoints use
// XLOG_CHECKPOINT_ONLINE. We don't store the original WAL record headers
// in the pageserver, however.
//
// - In PostgreSQL, the Checkpoint.redo field stores the *start* of the
// checkpoint record, if it's a shutdown checkpoint. But when we are
// starting from a shutdown checkpoint, the basebackup LSN is the *end*
// of the shutdown checkpoint WAL record. That makes it difficult to
// correctly detect whether we're starting from a shutdown record or
// not.
//
// To address both of those issues, we store 0 in the redo field if it's
// an online checkpoint record, and the record's *end* LSN if it's a
// shutdown checkpoint. We don't need the original redo pointer in neon,
// because we don't perform WAL replay at startup anyway, so we can get
// away with abusing the redo field like this.
//
// XXX: Ideally, we would persist the extra information in a more
// explicit format, rather than repurpose the fields of the Postgres
// struct like this. However, we already have persisted data like this,
// so we need to maintain backwards compatibility.
//
// NB: We didn't originally have this convention, so there are still old
// persisted records that didn't do this. Before, we didn't update the
// persisted redo field at all. That means that old records have a bogus
// redo pointer that points to some old value, from the checkpoint record
// that was originally imported from the data directory. If it was a
// project created in Neon, that means it points to the first checkpoint
// after initdb. That's OK for our purposes: all such old checkpoints are
// treated as old online checkpoints when the basebackup is created.
cp.redo = if info == pg_constants::XLOG_CHECKPOINT_SHUTDOWN {
// Store the *end* LSN of the checkpoint record. Or to be precise,
// the start LSN of the *next* record, i.e. if the record ends
// exactly at page boundary, the redo LSN points to just after the
// page header on the next page.
lsn.into()
} else {
Lsn::INVALID.into()
};

// Write a new checkpoint key-value pair on every checkpoint record, even
// if nothing really changed. Not strictly required, but it seems nice to
Expand Down
4 changes: 3 additions & 1 deletion test_runner/regress/test_broken_timeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
".*failed to load metadata.*",
".*load failed.*load local timeline.*",
".*: layer load failed, assuming permanent failure:.*",
".*failed to get checkpoint bytes.*",
".*failed to get control bytes.*",
]
)

Expand Down Expand Up @@ -75,7 +77,7 @@ def test_local_corruption(neon_env_builder: NeonEnvBuilder):
# (We don't check layer file contents on startup, when loading the timeline)
#
# This will change when we implement checksums for layers
with pytest.raises(Exception, match="get_values_reconstruct_data for layer ") as err:
with pytest.raises(Exception, match="failed to get checkpoint bytes") as err:
pg1.start()
log.info(
f"As expected, compute startup failed for timeline {tenant1}/{timeline1} with corrupt layers: {err}"
Expand Down
Loading
Loading