Skip to content

Commit

Permalink
make compact_target_max_size_mb configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi Z <[email protected]>
  • Loading branch information
skyzh committed Dec 9, 2024
1 parent 3966318 commit 6fa8592
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 9 deletions.
5 changes: 5 additions & 0 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,10 @@ async fn timeline_compact_handler(
.as_ref()
.map(|r| r.sub_compaction)
.unwrap_or(false);
let sub_compaction_max_job_size_mb = compact_request
.as_ref()
.and_then(|r| r.sub_compaction_max_job_size_mb);

let options = CompactOptions {
compact_key_range: compact_request
.as_ref()
Expand All @@ -2049,6 +2053,7 @@ async fn timeline_compact_handler(
.and_then(|r| r.compact_lsn_range.clone()),
flags,
sub_compaction,
sub_compaction_max_job_size_mb,
};

let scheduled = compact_request
Expand Down
14 changes: 10 additions & 4 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3017,10 +3017,15 @@ impl Tenant {
warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options);
} else if next_scheduled_compaction_task.options.sub_compaction {
info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = timeline
.gc_compaction_split_jobs(GcCompactJob::from_compact_options(
next_scheduled_compaction_task.options,
))
let jobs: Vec<GcCompactJob> = timeline
.gc_compaction_split_jobs(
GcCompactJob::from_compact_options(
next_scheduled_compaction_task.options.clone(),
),
next_scheduled_compaction_task
.options
.sub_compaction_max_job_size_mb,
)
.await
.map_err(CompactionError::Other)?;
if jobs.is_empty() {
Expand All @@ -3043,6 +3048,7 @@ impl Tenant {
sub_compaction: false,
compact_key_range: Some(job.compact_key_range.into()),
compact_lsn_range: Some(job.compact_lsn_range.into()),
sub_compaction_max_job_size_mb: None,
};
tline_pending_tasks.push_back(if idx == jobs_len - 1 {
ScheduledCompactionTask {
Expand Down
6 changes: 6 additions & 0 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,8 @@ pub(crate) struct CompactRequest {
/// Whether the compaction job should be split across key ranges.
#[serde(default)]
pub sub_compaction: bool,
/// Max job size for each subcompaction job.
pub sub_compaction_max_job_size_mb: Option<u64>,
}

#[serde_with::serde_as]
Expand Down Expand Up @@ -859,6 +861,9 @@ pub(crate) struct CompactOptions {
/// Enable sub-compaction (split compaction job across key ranges).
/// This option is only used by GC compaction.
pub sub_compaction: bool,
/// Set job size for the GC compaction.
/// This option is only used by GC compaction.
pub sub_compaction_max_job_size_mb: Option<u64>,
}

impl std::fmt::Debug for Timeline {
Expand Down Expand Up @@ -1683,6 +1688,7 @@ impl Timeline {
compact_key_range: None,
compact_lsn_range: None,
sub_compaction: false,
sub_compaction_max_job_size_mb: None,
},
ctx,
)
Expand Down
17 changes: 12 additions & 5 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1805,12 +1805,19 @@ impl Timeline {
pub(crate) async fn gc_compaction_split_jobs(
self: &Arc<Self>,
job: GcCompactJob,
sub_compaction_max_job_size_mb: Option<u64>,
) -> anyhow::Result<Vec<GcCompactJob>> {
let compact_below_lsn = if job.compact_lsn_range.end != Lsn::MAX {
job.compact_lsn_range.end
} else {
*self.get_latest_gc_cutoff_lsn() // use the real gc cutoff
};

// Split compaction job to about 4GB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024;
let sub_compaction_max_job_size_mb =
sub_compaction_max_job_size_mb.unwrap_or(GC_COMPACT_MAX_SIZE_MB);

let mut compact_jobs = Vec::new();
// For now, we simply use the key partitioning information; we should do a more fine-grained partitioning
// by estimating the amount of files read for a compaction job. We should also partition on LSN.
Expand Down Expand Up @@ -1857,8 +1864,6 @@ impl Timeline {
let guard = self.layers.read().await;
let layer_map = guard.layer_map()?;
let mut current_start = None;
// Split compaction job to about 2GB each
const GC_COMPACT_MAX_SIZE_MB: u64 = 4 * 1024; // 4GB, TODO: should be configuration in the future
let ranges_num = split_key_ranges.len();
for (idx, (start, end)) in split_key_ranges.into_iter().enumerate() {
if current_start.is_none() {
Expand All @@ -1871,7 +1876,7 @@ impl Timeline {
}
let res = layer_map.range_search(start..end, compact_below_lsn);
let total_size = res.found.keys().map(|x| x.layer.file_size()).sum::<u64>();
if total_size > GC_COMPACT_MAX_SIZE_MB * 1024 * 1024 || ranges_num == idx + 1 {
if total_size > sub_compaction_max_job_size_mb * 1024 * 1024 || ranges_num == idx + 1 {
// Try to extend the compaction range so that we include at least one full layer file.
let extended_end = res
.found
Expand Down Expand Up @@ -1924,10 +1929,12 @@ impl Timeline {
ctx: &RequestContext,
) -> anyhow::Result<()> {
let sub_compaction = options.sub_compaction;
let job = GcCompactJob::from_compact_options(options);
let job = GcCompactJob::from_compact_options(options.clone());
if sub_compaction {
info!("running enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs");
let jobs = self.gc_compaction_split_jobs(job).await?;
let jobs = self
.gc_compaction_split_jobs(job, options.sub_compaction_max_job_size_mb)
.await?;
let jobs_len = jobs.len();
for (idx, job) in jobs.into_iter().enumerate() {
info!(
Expand Down
2 changes: 2 additions & 0 deletions test_runner/regress/test_compaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
if i % 10 == 0:
log.info(f"Running churn round {i}/{churn_rounds} ...")

if (i - 1) % 10 == 0:
# Run gc-compaction every 10 rounds to ensure the test doesn't take too long time.
ps_http.timeline_compact(
tenant_id,
Expand All @@ -165,6 +166,7 @@ def test_pageserver_gc_compaction_smoke(neon_env_builder: NeonEnvBuilder):
"start": "000000000000000000000000000000000000",
"end": "030000000000000000000000000000000000",
},
"sub_compaction_max_job_size_mb": 16,
},
)

Expand Down

0 comments on commit 6fa8592

Please sign in to comment.