Skip to content

Commit

Permalink
chore: add some tests
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed May 21, 2024
1 parent 747903f commit 955b3e3
Show file tree
Hide file tree
Showing 4 changed files with 337 additions and 64 deletions.
121 changes: 119 additions & 2 deletions src/common/function/src/table/flush_compact_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;

/// Compact type: strict window.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";

#[admin_fn(
name = "FlushTableFunction",
display_name = "flush_table",
Expand Down Expand Up @@ -119,6 +122,10 @@ fn compact_signature() -> Signature {
)
}

/// Parses `compact_table` UDF parameters. This function accepts following combinations:
/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
fn parse_compact_params(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
Expand Down Expand Up @@ -167,8 +174,22 @@ fn parse_compact_params(
}

fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
if type_str.eq_ignore_ascii_case("strict_window") {
let window_seconds = option.and_then(|v| i64::from_str(v).ok()).unwrap_or(0);
if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW) {
let window_seconds = option
.map(|v| {
i64::from_str(v).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!(
"Compact window is expected to be a valid number, provided: {}",
v
),
}
.build()
})
})
.transpose()?
.unwrap_or(0);

Ok(compact_request::Options::StrictWindow(StrictWindow {
window_seconds,
}))
Expand All @@ -181,8 +202,10 @@ fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_re
mod tests {
use std::sync::Arc;

use api::v1::region::compact_request::Options;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use session::context::QueryContext;

use super::*;

Expand Down Expand Up @@ -244,4 +267,98 @@ mod tests {
}

define_table_function_test!(flush_table, FlushTableFunction);

fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
for (params, expected) in cases {
let params = params
.iter()
.map(|s| ValueRef::String(s))
.collect::<Vec<_>>();

assert_eq!(
expected,
&parse_compact_params(&params, &QueryContext::arc()).unwrap()
);
}
}

#[test]
fn test_parse_compact_params() {
check_parse_compact_params(&[
(
&["table"],
CompactTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["a.table"],
CompactTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "a".to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["a.b.c"],
CompactTableRequest {
catalog_name: "a".to_string(),
schema_name: "b".to_string(),
table_name: "c".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["table", "regular"],
CompactTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["table", "strict_window"],
CompactTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
},
),
(
&["table", "strict_window", "3600"],
CompactTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 3600,
}),
},
),
(
&["table", "regular", "abcd"],
CompactTableRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
]);

assert!(parse_compact_params(
&["table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err());
}
}
35 changes: 18 additions & 17 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ pub struct CompactionRequest {
pub(crate) engine_config: Arc<MitoConfig>,
pub(crate) current_version: VersionRef,
pub(crate) access_layer: AccessLayerRef,
pub(crate) compact_options: compact_request::Options,
/// Sender to send notification to the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequest>,
/// Waiters of the compaction request.
Expand Down Expand Up @@ -161,7 +160,6 @@ impl CompactionScheduler {
file_purger.clone(),
);
let request = status.new_compaction_request(
compact_options,
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
Expand All @@ -170,7 +168,7 @@ impl CompactionScheduler {
self.listener.clone(),
);
self.region_status.insert(region_id, status);
self.schedule_compaction_request(request)
self.schedule_compaction_request(request, compact_options)
}

/// Notifies the scheduler that the compaction job is finished successfully.
Expand All @@ -185,7 +183,6 @@ impl CompactionScheduler {

// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
compact_request::Options::Regular(Default::default()), // todo(hl): we should not trigger a regular compaction after major compaction.
self.request_sender.clone(),
OptionOutputTx::none(),
self.engine_config.clone(),
Expand All @@ -194,7 +191,10 @@ impl CompactionScheduler {
self.listener.clone(),
);
// Try to schedule next compaction task for this region.
if let Err(e) = self.schedule_compaction_request(request) {
if let Err(e) = self.schedule_compaction_request(
request,
compact_request::Options::Regular(Default::default()),
) {
error!(e; "Failed to schedule next compaction for region {}", region_id);
}
}
Expand Down Expand Up @@ -235,18 +235,21 @@ impl CompactionScheduler {
/// Schedules a compaction request.
///
/// If the region has nothing to compact, it removes the region from the status map.
fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> {
let picker =
if let compact_request::Options::StrictWindow(window) = &request.compact_options {
let window = if window.window_seconds == 0 {
None
} else {
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
fn schedule_compaction_request(
&mut self,
request: CompactionRequest,
options: compact_request::Options,
) -> Result<()> {
let picker = if let compact_request::Options::StrictWindow(window) = &options {
let window = if window.window_seconds == 0 {
None
} else {
compaction_options_to_picker(&request.current_version.options.compaction)
Some(window.window_seconds)
};
Arc::new(WindowedCompactionPicker::new(window)) as Arc<_>
} else {
compaction_options_to_picker(&request.current_version.options.compaction)
};

let region_id = request.region_id();
debug!(
Expand Down Expand Up @@ -374,7 +377,6 @@ impl CompactionStatus {
#[allow(clippy::too_many_arguments)]
fn new_compaction_request(
&mut self,
compact_options: compact_request::Options,
request_sender: Sender<WorkerRequest>,
waiter: OptionOutputTx,
engine_config: Arc<MitoConfig>,
Expand All @@ -388,7 +390,6 @@ impl CompactionStatus {
engine_config,
current_version,
access_layer: self.access_layer.clone(),
compact_options,
request_sender: request_sender.clone(),
waiters: Vec::new(),
file_purger: self.file_purger.clone(),
Expand Down
Loading

0 comments on commit 955b3e3

Please sign in to comment.