Skip to content

Commit

Permalink
wal_decoder: rename end_lsn to next_record_lsn
Browse files Browse the repository at this point in the history
It turns out that `WalStreamDecoder::poll_decode` returns the start
LSN of the next record and not the end LSN of the current record.
They are not always equal.

Rename things to reflect that.
  • Loading branch information
VladLazar committed Nov 15, 2024
1 parent e12628f commit 965808d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 23 deletions.
18 changes: 11 additions & 7 deletions libs/wal_decoder/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl InterpretedWalRecord {
pub fn from_bytes_filtered(
buf: Bytes,
shard: &ShardIdentity,
record_end_lsn: Lsn,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<InterpretedWalRecord> {
let mut decoded = DecodedWALRecord::default();
Expand All @@ -32,18 +32,18 @@ impl InterpretedWalRecord {
FlushUncommittedRecords::No
};

let metadata_record = MetadataRecord::from_decoded(&decoded, record_end_lsn, pg_version)?;
let metadata_record = MetadataRecord::from_decoded(&decoded, next_record_lsn, pg_version)?;
let batch = SerializedValueBatch::from_decoded_filtered(
decoded,
shard,
record_end_lsn,
next_record_lsn,
pg_version,
)?;

Ok(InterpretedWalRecord {
metadata_record,
batch,
end_lsn: record_end_lsn,
next_record_lsn,
flush_uncommitted,
xid,
})
Expand All @@ -53,7 +53,7 @@ impl InterpretedWalRecord {
impl MetadataRecord {
fn from_decoded(
decoded: &DecodedWALRecord,
record_end_lsn: Lsn,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<Option<MetadataRecord>> {
// Note: this doesn't actually copy the bytes since
Expand All @@ -74,7 +74,9 @@ impl MetadataRecord {
Ok(None)
}
pg_constants::RM_CLOG_ID => Self::decode_clog_record(&mut buf, decoded, pg_version),
pg_constants::RM_XACT_ID => Self::decode_xact_record(&mut buf, decoded, record_end_lsn),
pg_constants::RM_XACT_ID => {
Self::decode_xact_record(&mut buf, decoded, next_record_lsn)
}
pg_constants::RM_MULTIXACT_ID => {
Self::decode_multixact_record(&mut buf, decoded, pg_version)
}
Expand All @@ -86,7 +88,9 @@ impl MetadataRecord {
//
// Alternatively, one can make the checkpoint part of the subscription protocol
// to the pageserver. This should work fine, but can be done at a later point.
pg_constants::RM_XLOG_ID => Self::decode_xlog_record(&mut buf, decoded, record_end_lsn),
pg_constants::RM_XLOG_ID => {
Self::decode_xlog_record(&mut buf, decoded, next_record_lsn)
}
pg_constants::RM_LOGICALMSG_ID => {
Self::decode_logical_message_record(&mut buf, decoded)
}
Expand Down
6 changes: 4 additions & 2 deletions libs/wal_decoder/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ pub struct InterpretedWalRecord {
/// A pre-serialized batch along with the required metadata for ingestion
/// by the pageserver
pub batch: SerializedValueBatch,
/// Byte offset within WAL for the end of the original PG WAL record
pub end_lsn: Lsn,
/// Byte offset within WAL for the start of the next PG WAL record.
/// Usually this is the end LSN of the current record, but in case of
/// XLOG SWITCH records it will be within the next segment.
pub next_record_lsn: Lsn,
/// Whether to flush all uncommitted modifications to the storage engine
/// before ingesting this record. This is currently only used for legacy PG
/// database creations which read pages from a template database. Such WAL
Expand Down
18 changes: 11 additions & 7 deletions libs/wal_decoder/src/serialized_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl SerializedValueBatch {
pub(crate) fn from_decoded_filtered(
decoded: DecodedWALRecord,
shard: &ShardIdentity,
record_end_lsn: Lsn,
next_record_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<SerializedValueBatch> {
// First determine how big the buffer needs to be and allocate it up-front.
Expand All @@ -156,13 +156,17 @@ impl SerializedValueBatch {
let key = rel_block_to_key(rel, blk.blkno);

if !key.is_valid_key_on_write_path() {
anyhow::bail!("Unsupported key decoded at LSN {}: {}", record_end_lsn, key);
anyhow::bail!(
"Unsupported key decoded at LSN {}: {}",
next_record_lsn,
key
);
}

let key_is_local = shard.is_key_local(&key);

tracing::debug!(
lsn=%record_end_lsn,
lsn=%next_record_lsn,
key=%key,
"ingest: shard decision {}",
if !key_is_local { "drop" } else { "keep" },
Expand All @@ -174,7 +178,7 @@ impl SerializedValueBatch {
// its blkno in case it implicitly extends a relation.
metadata.push(ValueMeta::Observed(ObservedValueMeta {
key: key.to_compact(),
lsn: record_end_lsn,
lsn: next_record_lsn,
}))
}

Expand Down Expand Up @@ -205,7 +209,7 @@ impl SerializedValueBatch {
// that would corrupt the page.
//
if !page_is_new(&image) {
page_set_lsn(&mut image, record_end_lsn)
page_set_lsn(&mut image, next_record_lsn)
}
assert_eq!(image.len(), BLCKSZ as usize);

Expand All @@ -224,12 +228,12 @@ impl SerializedValueBatch {

metadata.push(ValueMeta::Serialized(SerializedValueMeta {
key: key.to_compact(),
lsn: record_end_lsn,
lsn: next_record_lsn,
batch_offset: relative_off,
len: val_ser_size,
will_init: val.will_init(),
}));
max_lsn = std::cmp::max(max_lsn, record_end_lsn);
max_lsn = std::cmp::max(max_lsn, next_record_lsn);
len += 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,19 @@ pub(super) async fn handle_walreceiver_connection(
Ok(())
}

while let Some((record_end_lsn, recdata)) = waldecoder.poll_decode()? {
while let Some((next_record_lsn, recdata)) = waldecoder.poll_decode()? {
// It is important to deal with the aligned records as lsn in getPage@LSN is
// aligned and can be several bytes bigger. Without this alignment we are
// at risk of hitting a deadlock.
if !record_end_lsn.is_aligned() {
if !next_record_lsn.is_aligned() {
return Err(WalReceiverError::Other(anyhow!("LSN not aligned")));
}

// Deserialize and interpret WAL record
let interpreted = InterpretedWalRecord::from_bytes_filtered(
recdata,
modification.tline.get_shard_identity(),
record_end_lsn,
next_record_lsn,
modification.tline.pg_version,
)?;

Expand All @@ -367,10 +367,10 @@ pub(super) async fn handle_walreceiver_connection(
.ingest_record(interpreted, &mut modification, &ctx)
.await
.with_context(|| {
format!("could not ingest record at {record_end_lsn}")
format!("could not ingest record at {next_record_lsn}")
})?;
if !ingested {
tracing::debug!("ingest: filtered out record @ LSN {record_end_lsn}");
tracing::debug!("ingest: filtered out record @ LSN {next_record_lsn}");
WAL_INGEST.records_filtered.inc();
filtered_records += 1;
}
Expand All @@ -380,7 +380,7 @@ pub(super) async fn handle_walreceiver_connection(
// to timeout the tests.
fail_point!("walreceiver-after-ingest");

last_rec_lsn = record_end_lsn;
last_rec_lsn = next_record_lsn;

// Commit every ingest_batch_size records. Even if we filtered out
// all records, we still need to call commit to advance the LSN.
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/walingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl WalIngest {
WAL_INGEST.records_received.inc();
let prev_len = modification.len();

modification.set_lsn(interpreted.end_lsn)?;
modification.set_lsn(interpreted.next_record_lsn)?;

if matches!(interpreted.flush_uncommitted, FlushUncommittedRecords::Yes) {
// Records of this type should always be preceded by a commit(), as they
Expand Down

0 comments on commit 965808d

Please sign in to comment.