Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pageserver): support key range for manual compaction trigger #9723

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions libs/utils/src/http/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ pub async fn json_request<T: for<'de> Deserialize<'de>>(
.map_err(ApiError::BadRequest)
}

pub async fn json_request_maybe<T: for<'de> Deserialize<'de> + Default>(
request: &mut Request<Body>,
) -> Result<T, ApiError> {
let body = hyper::body::aggregate(request.body_mut())
.await
.context("Failed to read request body")
.map_err(ApiError::BadRequest)?;

if body.remaining() == 0 {
return Ok(T::default());
}

let mut deser = serde_json::de::Deserializer::from_reader(body.reader());

serde_path_to_error::deserialize(&mut deser)
// intentionally stringify because the debug version is not helpful in python logs
.map_err(|e| anyhow::anyhow!("Failed to parse json request: {e}"))
.map_err(ApiError::BadRequest)
}

pub fn json_response<T: Serialize>(
status: StatusCode,
data: T,
Expand Down
15 changes: 12 additions & 3 deletions pageserver/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ use crate::tenant::storage_layer::LayerName;
use crate::tenant::timeline::offload::offload_timeline;
use crate::tenant::timeline::offload::OffloadError;
use crate::tenant::timeline::CompactFlags;
use crate::tenant::timeline::CompactOptions;
use crate::tenant::timeline::CompactRange;
use crate::tenant::timeline::CompactionError;
use crate::tenant::timeline::Timeline;
use crate::tenant::GetTimelineError;
Expand All @@ -100,7 +102,7 @@ use utils::{
http::{
endpoint::{self, attach_openapi_ui, auth_middleware, check_permission_with},
error::{ApiError, HttpErrorBody},
json::{json_request, json_response},
json::{json_request, json_request_maybe, json_response},
request::parse_request_param,
RequestExt, RouterBuilder,
},
Expand Down Expand Up @@ -1927,13 +1929,15 @@ async fn timeline_gc_handler(

// Run compaction immediately on given timeline.
async fn timeline_compact_handler(
request: Request<Body>,
mut request: Request<Body>,
cancel: CancellationToken,
) -> Result<Response<Body>, ApiError> {
let tenant_shard_id: TenantShardId = parse_request_param(&request, "tenant_shard_id")?;
let timeline_id: TimelineId = parse_request_param(&request, "timeline_id")?;
check_permission(&request, Some(tenant_shard_id.tenant_id))?;

let compact_range = json_request_maybe::<Option<CompactRange>>(&mut request).await?;

let state = get_state(&request);

let mut flags = EnumSet::empty();
Expand All @@ -1957,11 +1961,16 @@ async fn timeline_compact_handler(
let wait_until_uploaded =
parse_query_param::<_, bool>(&request, "wait_until_uploaded")?.unwrap_or(false);

let options = CompactOptions {
compact_range,
flags,
};

async {
let ctx = RequestContext::new(TaskKind::MgmtRequest, DownloadBehavior::Download);
let timeline = active_timeline_of_active_tenant(&state.tenant_manager, tenant_shard_id, timeline_id).await?;
timeline
.compact(&cancel, flags, &ctx)
.compact_with_options(&cancel, options, &ctx)
.await
.map_err(|e| ApiError::InternalServerError(e.into()))?;
if wait_until_uploaded {
Expand Down
42 changes: 28 additions & 14 deletions pageserver/src/tenant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5170,7 +5170,7 @@ mod tests {
use storage_layer::PersistentLayerKey;
use tests::storage_layer::ValuesReconstructState;
use tests::timeline::{GetVectoredError, ShutdownMode};
use timeline::DeltaLayerTestDesc;
use timeline::{CompactOptions, DeltaLayerTestDesc};
use utils::id::TenantId;

#[cfg(feature = "testing")]
Expand Down Expand Up @@ -7644,7 +7644,7 @@ mod tests {

let cancel = CancellationToken::new();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -7721,7 +7721,7 @@ mod tests {
guard.cutoffs.space = Lsn(0x40);
}
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -8153,7 +8153,7 @@ mod tests {

let cancel = CancellationToken::new();
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -8182,7 +8182,7 @@ mod tests {
guard.cutoffs.space = Lsn(0x40);
}
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();

Expand Down Expand Up @@ -8735,22 +8735,29 @@ mod tests {
dryrun_flags.insert(CompactFlags::DryRun);

tline
.compact_with_gc(&cancel, dryrun_flags, &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: dryrun_flags,
compact_range: None,
},
&ctx,
)
.await
.unwrap();
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
// cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
verify_result().await;

tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;

// compact again
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
Expand All @@ -8763,14 +8770,14 @@ mod tests {
guard.cutoffs.space = Lsn(0x38);
}
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await; // no wals between 0x30 and 0x38, so we should obtain the same result

// not increasing the GC horizon and compact again
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
Expand Down Expand Up @@ -8964,22 +8971,29 @@ mod tests {
dryrun_flags.insert(CompactFlags::DryRun);

tline
.compact_with_gc(&cancel, dryrun_flags, &ctx)
.compact_with_gc(
&cancel,
CompactOptions {
flags: dryrun_flags,
compact_range: None,
},
&ctx,
)
.await
.unwrap();
// We expect layer map to be the same b/c the dry run flag, but we don't know whether there will be other background jobs
// cleaning things up, and therefore, we don't do sanity checks on the layer map during unit tests.
verify_result().await;

tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;

// compact again
tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();
verify_result().await;
Expand Down Expand Up @@ -9164,7 +9178,7 @@ mod tests {

let cancel = CancellationToken::new();
branch_tline
.compact_with_gc(&cancel, EnumSet::new(), &ctx)
.compact_with_gc(&cancel, CompactOptions::default(), &ctx)
.await
.unwrap();

Expand Down
36 changes: 35 additions & 1 deletion pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,21 @@ pub(crate) enum CompactFlags {
DryRun,
}

#[serde_with::serde_as]
#[derive(Debug, Clone, serde::Deserialize)]
pub(crate) struct CompactRange {
#[serde_as(as = "serde_with::DisplayFromStr")]
pub start: Key,
#[serde_as(as = "serde_with::DisplayFromStr")]
pub end: Key,
}

#[derive(Clone, Default)]
pub(crate) struct CompactOptions {
pub flags: EnumSet<CompactFlags>,
pub compact_range: Option<CompactRange>,
}

impl std::fmt::Debug for Timeline {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Timeline<{}>", self.timeline_id)
Expand Down Expand Up @@ -1589,6 +1604,25 @@ impl Timeline {
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
self.compact_with_options(
cancel,
CompactOptions {
flags,
compact_range: None,
},
ctx,
)
.await
}

/// Outermost timeline compaction operation; downloads needed layers. Returns whether we have pending
/// compaction tasks.
pub(crate) async fn compact_with_options(
self: &Arc<Self>,
cancel: &CancellationToken,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
// most likely the cancellation token is from background task, but in tests it could be the
// request task as well.
Expand Down Expand Up @@ -1626,7 +1660,7 @@ impl Timeline {
self.compact_tiered(cancel, ctx).await?;
Ok(false)
}
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, flags, ctx).await,
CompactionAlgorithm::Legacy => self.compact_legacy(cancel, options, ctx).await,
}
}

Expand Down
43 changes: 32 additions & 11 deletions pageserver/src/tenant/timeline/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;

use super::layer_manager::LayerManager;
use super::{
CompactFlags, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
CompactFlags, CompactOptions, CreateImageLayersError, DurationRecorder, ImageLayerCreationMode,
RecordedDuration, Timeline,
};

Expand Down Expand Up @@ -273,22 +273,32 @@ impl Timeline {
pub(crate) async fn compact_legacy(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
options: CompactOptions,
ctx: &RequestContext,
) -> Result<bool, CompactionError> {
if flags.contains(CompactFlags::EnhancedGcBottomMostCompaction) {
self.compact_with_gc(cancel, flags, ctx)
if options
.flags
.contains(CompactFlags::EnhancedGcBottomMostCompaction)
{
self.compact_with_gc(cancel, options, ctx)
.await
.map_err(CompactionError::Other)?;
return Ok(false);
}

if flags.contains(CompactFlags::DryRun) {
if options.flags.contains(CompactFlags::DryRun) {
return Err(CompactionError::Other(anyhow!(
"dry-run mode is not supported for legacy compaction for now"
)));
}

if options.compact_range.is_some() {
// maybe useful in the future? could implement this at some point
return Err(CompactionError::Other(anyhow!(
"compaction range is not supported for legacy compaction for now"
)));
}

// High level strategy for compaction / image creation:
//
// 1. First, calculate the desired "partitioning" of the
Expand Down Expand Up @@ -338,7 +348,7 @@ impl Timeline {
.repartition(
self.get_last_record_lsn(),
self.get_compaction_target_size(),
flags,
options.flags,
ctx,
)
.await
Expand All @@ -354,7 +364,7 @@ impl Timeline {
let fully_compacted = self
.compact_level0(
target_file_size,
flags.contains(CompactFlags::ForceL0Compaction),
options.flags.contains(CompactFlags::ForceL0Compaction),
ctx,
)
.await?;
Expand All @@ -372,7 +382,10 @@ impl Timeline {
.create_image_layers(
&partitioning,
lsn,
if flags.contains(CompactFlags::ForceImageLayerCreation) {
if options
.flags
.contains(CompactFlags::ForceImageLayerCreation)
{
ImageLayerCreationMode::Force
} else {
ImageLayerCreationMode::Try
Expand Down Expand Up @@ -1736,11 +1749,19 @@ impl Timeline {
pub(crate) async fn compact_with_gc(
self: &Arc<Self>,
cancel: &CancellationToken,
flags: EnumSet<CompactFlags>,
options: CompactOptions,
ctx: &RequestContext,
) -> anyhow::Result<()> {
self.partial_compact_with_gc(Key::MIN..Key::MAX, cancel, flags, ctx)
.await
self.partial_compact_with_gc(
options
.compact_range
.map(|range| range.start..range.end)
.unwrap_or_else(|| Key::MIN..Key::MAX),
cancel,
options.flags,
ctx,
)
.await
}

/// An experimental compaction building block that combines compaction with garbage collection.
Expand Down
2 changes: 2 additions & 0 deletions test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ def timeline_compact(
force_l0_compaction=False,
wait_until_uploaded=False,
enhanced_gc_bottom_most_compaction=False,
body: Optional[dict[str, Any]] = None,
):
self.is_testing_enabled_or_skip()
query = {}
Expand All @@ -683,6 +684,7 @@ def timeline_compact(
res = self.put(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/compact",
params=query,
json=body,
)
log.info(f"Got compact request response code: {res.status_code}")
self.verbose_error(res)
Expand Down
Loading
Loading