Skip to content

Commit

Permalink
feat: manual compaction (#3988)
Browse files Browse the repository at this point in the history
* add compaction udf params

* wip: pass compaction options through grpc

* wip: pass compaction options all the way down to region server

* wip: window compaction task

* feat: trigger major compaction

* refactor: optimize compaction parameter parsing

* chore: rebase main

* chore: update proto

* chore: add some tests

* feat: validate catalog

* chore: fix typo and rebase main

* fix: some cr comments

* fix: file_time_bucket_span

* fix: avoid upper bound overflow

* chore: update proto
  • Loading branch information
v0y4g3r authored May 22, 2024
1 parent 9e1af79 commit 090b59e
Show file tree
Hide file tree
Showing 21 changed files with 1,412 additions and 543 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11db14b8502f55ca5348917fd18e6fcf140f55e" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3cd71167ee067c5679a7fb17cf58bdfbb5487a0d" }
humantime = "2.1"
humantime-serde = "1.1"
itertools = "0.10"
Expand Down
323 changes: 261 additions & 62 deletions src/common/function/src/table/flush_compact_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use std::fmt;
use std::str::FromStr;

use api::v1::region::{compact_request, StrictWindow};
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
Expand All @@ -22,7 +24,7 @@ use common_query::error::{
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use common_telemetry::{error, info};
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
Expand All @@ -34,84 +36,177 @@ use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;

macro_rules! define_table_function {
($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => {
/// A function to $func table, such as `$display_name(table_name)`.
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
pub(crate) async fn $display_name(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
/// Compact type: strict window.
const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window";

let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: $display_name_str,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};

let (catalog_name, schema_name, table_name) =
table_name_to_full_name(table_name, &query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;

let affected_rows = table_mutation_handler
.$func(
$request {
catalog_name,
schema_name,
table_name,
},
query_ctx.clone(),
)
.await?;

Ok(Value::from(affected_rows as u64))
#[admin_fn(
name = "FlushTableFunction",
display_name = "flush_table",
sig_fn = "flush_signature",
ret = "uint64"
)]
pub(crate) async fn flush_table(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);

let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "flush_table",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};

let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;

let affected_rows = table_mutation_handler
.flush(
FlushTableRequest {
catalog_name,
schema_name,
table_name,
},
query_ctx.clone(),
)
.await?;

Ok(Value::from(affected_rows as u64))
}

define_table_function!(
"FlushTableFunction",
"flush_table",
flush_table,
flush,
FlushTableRequest
);

define_table_function!(
"CompactTableFunction",
"compact_table",
compact_table,
compact,
CompactTableRequest
);

fn signature() -> Signature {
#[admin_fn(
name = "CompactTableFunction",
display_name = "compact_table",
sig_fn = "compact_signature",
ret = "uint64"
)]
pub(crate) async fn compact_table(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let request = parse_compact_params(params, query_ctx)?;
info!("Compact table request: {:?}", request);

let affected_rows = table_mutation_handler
.compact(request, query_ctx.clone())
.await?;

Ok(Value::from(affected_rows as u64))
}

fn flush_signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}

fn compact_signature() -> Signature {
Signature::variadic(
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}

/// Parses `compact_table` UDF parameters. This function accepts following combinations:
/// - `[<table_name>]`: only tables name provided, using default compaction type: regular
/// - `[<table_name>, <type>]`: specify table name and compaction type. The compaction options will be default.
/// - `[<table_name>, <type>, <options>]`: provides both type and type-specific options.
fn parse_compact_params(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<CompactTableRequest> {
ensure!(
!params.is_empty(),
InvalidFuncArgsSnafu {
err_msg: "Args cannot be empty",
}
);

let (table_name, compact_type) = match params {
[ValueRef::String(table_name)] => (
table_name,
compact_request::Options::Regular(Default::default()),
),
[ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => {
let compact_type = parse_compact_type(compact_ty_str, None)?;
(table_name, compact_type)
}

[ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] =>
{
let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?;
(table_name, compact_type)
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: "compact_table",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail()
}
};

let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;

Ok(CompactTableRequest {
catalog_name,
schema_name,
table_name,
compact_options: compact_type,
})
}

fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result<compact_request::Options> {
if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW) {
let window_seconds = option
.map(|v| {
i64::from_str(v).map_err(|_| {
InvalidFuncArgsSnafu {
err_msg: format!(
"Compact window is expected to be a valid number, provided: {}",
v
),
}
.build()
})
})
.transpose()?
.unwrap_or(0);

Ok(compact_request::Options::StrictWindow(StrictWindow {
window_seconds,
}))
} else {
Ok(compact_request::Options::Regular(Default::default()))
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use api::v1::region::compact_request::Options;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use session::context::QueryContext;

use super::*;

Expand Down Expand Up @@ -174,5 +269,109 @@ mod tests {

define_table_function_test!(flush_table, FlushTableFunction);

define_table_function_test!(compact_table, CompactTableFunction);
fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) {
for (params, expected) in cases {
let params = params
.iter()
.map(|s| ValueRef::String(s))
.collect::<Vec<_>>();

assert_eq!(
expected,
&parse_compact_params(&params, &QueryContext::arc()).unwrap()
);
}
}

#[test]
fn test_parse_compact_params() {
check_parse_compact_params(&[
(
&["table"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&[&format!("{}.table", DEFAULT_SCHEMA_NAME)],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&[&format!(
"{}.{}.table",
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME
)],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["table", "regular"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
(
&["table", "strict_window"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }),
},
),
(
&["table", "strict_window", "3600"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::StrictWindow(StrictWindow {
window_seconds: 3600,
}),
},
),
(
&["table", "regular", "abcd"],
CompactTableRequest {
catalog_name: DEFAULT_CATALOG_NAME.to_string(),
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "table".to_string(),
compact_options: Options::Regular(Default::default()),
},
),
]);

assert!(parse_compact_params(
&["table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err());

assert!(parse_compact_params(
&["a.b.table", "strict_window", "abc"]
.into_iter()
.map(ValueRef::String)
.collect::<Vec<_>>(),
&QueryContext::arc(),
)
.is_err());
}
}
Loading

0 comments on commit 090b59e

Please sign in to comment.