feat(pageserver): validate data integrity during gc-compaction (#10131)
## Problem

Part of #9114.
Part of the investigation of #10049.

## Summary of changes

* If `cfg!(debug_assertions)` or `cfg!(feature = "testing")` is enabled, we always try to
generate an image to ensure the history is replayable, but we do not put the
image layer into the final layer results. This lets us discover a corrupted
key history before it causes a read error (see the sketch after this list).
* I suspect it is easier to trigger some races if gc-compaction runs
continuously on a timeline, so I increased the frequency to twice
per 10 churn rounds.
* Also, create branches in the gc-compaction smoke test to get more test
coverage.
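
For illustration, here is a minimal sketch of the gating logic this change adds around image generation in `compaction.rs`. This is a hedged simplification: the names `produce_image`, `generate_image`, `debug_mode`, and `delta_threshold_cnt` mirror the diff, but the standalone function and its plain parameters are assumptions for illustration, not the actual pageserver code.

```rust
/// Simplified sketch of the per-batch image decision (illustrative only).
/// Returns (produce_image, generate_image):
/// - produce_image: the reconstructed image goes into the final layer results.
/// - generate_image: the image is reconstructed at all; debug/testing builds
///   force this so a broken key history is detected even when the image is
///   then discarded instead of being written out.
fn image_decision(
    i: usize,
    batch_cnt: usize,
    has_ancestor: bool,
    records_since_last_image: usize,
    delta_threshold_cnt: usize,
) -> (bool, bool) {
    let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");
    let produce_image = if i == 0 && !has_ancestor {
        // Always produce an image for the first batch (below the horizon / lowest retain_lsn).
        true
    } else if i == batch_cnt - 1 {
        // Never produce an image for the last batch (above the horizon).
        false
    } else if records_since_last_image == 0 {
        // Nothing accumulated since the last image.
        false
    } else {
        // Produce an image once enough delta records have piled up.
        records_since_last_image >= delta_threshold_cnt
    };
    let generate_image = produce_image || debug_mode;
    (produce_image, generate_image)
}

fn main() {
    // Middle batch with many accumulated deltas: image is produced and kept.
    println!("{:?}", image_decision(2, 5, true, 12, 8));
    // Middle batch with few deltas: in debug/testing builds the image is still
    // reconstructed for validation, but not kept in the output layers.
    println!("{:?}", image_decision(2, 5, true, 3, 8));
}
```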

---------

Signed-off-by: Alex Chi Z <[email protected]>
Co-authored-by: Arpad Müller <[email protected]>
skyzh and arpad-m authored Jan 15, 2025
1 parent 55a68b2 commit a753349
Showing 3 changed files with 89 additions and 24 deletions.
77 changes: 57 additions & 20 deletions pageserver/src/tenant/timeline/compaction.rs
@@ -1776,7 +1776,10 @@ impl Timeline {
base_img_from_ancestor: Option<(Key, Lsn, Bytes)>,
) -> anyhow::Result<KeyHistoryRetention> {
// Pre-checks for the invariants
if cfg!(debug_assertions) {

let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");

if debug_mode {
for (log_key, _, _) in full_history {
assert_eq!(log_key, &key, "mismatched key");
}
@@ -1922,15 +1925,19 @@ impl Timeline {
output
}

let mut key_exists = false;
for (i, split_for_lsn) in split_history.into_iter().enumerate() {
// TODO: there could be image keys inside the splits, and we can compute records_since_last_image accordingly.
records_since_last_image += split_for_lsn.len();
let generate_image = if i == 0 && !has_ancestor {
// Whether to produce an image into the final layer files
let produce_image = if i == 0 && !has_ancestor {
// We always generate images for the first batch (below horizon / lowest retain_lsn)
true
} else if i == batch_cnt - 1 {
// Do not generate images for the last batch (above horizon)
false
} else if records_since_last_image == 0 {
false
} else if records_since_last_image >= delta_threshold_cnt {
// Generate images when there are too many records
true
@@ -1945,29 +1952,45 @@ impl Timeline {
break;
}
}
if let Some((_, _, val)) = replay_history.first() {
if !val.will_init() {
return Err(anyhow::anyhow!("invalid history, no base image")).with_context(
|| {
generate_debug_trace(
Some(&replay_history),
full_history,
retain_lsn_below_horizon,
horizon,
)
},
);
}
if replay_history.is_empty() && !key_exists {
// The key does not exist at earlier LSN, we can skip this iteration.
retention.push(Vec::new());
continue;
} else {
key_exists = true;
}
if generate_image && records_since_last_image > 0 {
let Some((_, _, val)) = replay_history.first() else {
unreachable!("replay history should not be empty once it exists")
};
if !val.will_init() {
return Err(anyhow::anyhow!("invalid history, no base image")).with_context(|| {
generate_debug_trace(
Some(&replay_history),
full_history,
retain_lsn_below_horizon,
horizon,
)
});
}
// Whether to reconstruct the image. In debug mode, we will generate an image
// at every retain_lsn to ensure data is not corrupted, but we won't put the
// image into the final layer.
let generate_image = produce_image || debug_mode;
if produce_image {
records_since_last_image = 0;
let replay_history_for_debug = if cfg!(debug_assertions) {
}
let img_and_lsn = if generate_image {
let replay_history_for_debug = if debug_mode {
Some(replay_history.clone())
} else {
None
};
let replay_history_for_debug_ref = replay_history_for_debug.as_deref();
let history = std::mem::take(&mut replay_history);
let history = if produce_image {
std::mem::take(&mut replay_history)
} else {
replay_history.clone()
};
let mut img = None;
let mut records = Vec::with_capacity(history.len());
if let (_, lsn, Value::Image(val)) = history.first().as_ref().unwrap() {
@@ -2004,8 +2027,20 @@ impl Timeline {
}
records.reverse();
let state = ValueReconstructState { img, records };
let request_lsn = lsn_split_points[i]; // last batch does not generate image so i is always in range
// last batch does not generate image so i is always in range, unless we force generate
// an image during testing
let request_lsn = if i >= lsn_split_points.len() {
Lsn::MAX
} else {
lsn_split_points[i]
};
let img = self.reconstruct_value(key, request_lsn, state).await?;
Some((request_lsn, img))
} else {
None
};
if produce_image {
let (request_lsn, img) = img_and_lsn.unwrap();
replay_history.push((key, request_lsn, Value::Image(img.clone())));
retention.push(vec![(request_lsn, Value::Image(img))]);
} else {
@@ -2273,6 +2308,8 @@ impl Timeline {
let compact_key_range = job.compact_key_range;
let compact_lsn_range = job.compact_lsn_range;

let debug_mode = cfg!(debug_assertions) || cfg!(feature = "testing");

info!("running enhanced gc bottom-most compaction, dry_run={dry_run}, compact_key_range={}..{}, compact_lsn_range={}..{}", compact_key_range.start, compact_key_range.end, compact_lsn_range.start, compact_lsn_range.end);

scopeguard::defer! {
@@ -2398,7 +2435,7 @@ impl Timeline {
.first()
.copied()
.unwrap_or(job_desc.gc_cutoff);
if cfg!(debug_assertions) {
if debug_mode {
assert_eq!(
res,
job_desc
16 changes: 16 additions & 0 deletions test_runner/fixtures/workload.py
@@ -53,6 +53,22 @@ def __init__(
self._endpoint: Endpoint | None = None
self._endpoint_opts = endpoint_opts or {}

def branch(
self,
timeline_id: TimelineId,
branch_name: str | None = None,
endpoint_opts: dict[str, Any] | None = None,
) -> Workload:
"""
Checkpoint the current status of the workload in case of branching
"""
branch_workload = Workload(
self.env, self.tenant_id, timeline_id, branch_name, endpoint_opts
)
branch_workload.expect_rows = self.expect_rows
branch_workload.churn_cursor = self.churn_cursor
return branch_workload

def reconfigure(self) -> None:
"""
Request the endpoint to reconfigure based on location reported by storage controller
20 changes: 16 additions & 4 deletions test_runner/regress/test_compaction.py
@@ -112,7 +112,11 @@ def test_pageserver_compaction_smoke(


@skip_in_debug_build("only run with release build")
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
@pytest.mark.parametrize(
"with_branches",
["with_branches", "no_branches"],
)
def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder, with_branches: str):
SMOKE_CONF = {
# Run both gc and gc-compaction.
"gc_period": "5s",
@@ -143,12 +147,17 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
log.info("Writing initial data ...")
workload.write_rows(row_count, env.pageserver.id)

child_workloads: list[Workload] = []

for i in range(1, churn_rounds + 1):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")

if (i - 1) % 10 == 0:
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
if i % 10 == 5 and with_branches == "with_branches":
branch_name = f"child-{i}"
branch_timeline_id = env.create_branch(branch_name)
child_workloads.append(workload.branch(branch_timeline_id, branch_name))
if (i - 1) % 10 == 0 or (i - 1) % 10 == 1:
# Run gc-compaction twice every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
timeline_id,
@@ -179,6 +188,9 @@ def compaction_finished():

log.info("Validating at workload end ...")
workload.validate(env.pageserver.id)
for child_workload in child_workloads:
log.info(f"Validating at branch {child_workload.branch_name}")
child_workload.validate(env.pageserver.id)

# Run a legacy compaction+gc to ensure gc-compaction can coexist with legacy compaction.
ps_http.timeline_checkpoint(tenant_id, timeline_id, wait_until_uploaded=True)

1 comment on commit a753349

@github-actions

7477 tests run: 7090 passed, 0 failed, 387 skipped (full report)


Flaky tests (4)

Postgres 17

Postgres 16

  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-arm64

Postgres 15

  • test_physical_replication_config_mismatch_max_locks_per_transaction: release-x86-64

Code coverage* (full report)

  • functions: 33.7% (8420 of 25015 functions)
  • lines: 49.2% (70400 of 143232 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
a753349 at 2025-01-16T00:15:08.108Z :recycle:
