diff --git a/Cargo.lock b/Cargo.lock index 2fe59305bd03..3f55db18cc91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4233,7 +4233,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=a11db14b8502f55ca5348917fd18e6fcf140f55e#a11db14b8502f55ca5348917fd18e6fcf140f55e" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3cd71167ee067c5679a7fb17cf58bdfbb5487a0d#3cd71167ee067c5679a7fb17cf58bdfbb5487a0d" dependencies = [ "prost 0.12.4", "serde", @@ -10538,6 +10538,7 @@ dependencies = [ "datatypes", "derive_builder 0.12.0", "futures", + "greptime-proto", "humantime", "humantime-serde", "parquet", diff --git a/Cargo.toml b/Cargo.toml index 52cb6b1306db..cd75659d21b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -120,7 +120,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "a11db14b8502f55ca5348917fd18e6fcf140f55e" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3cd71167ee067c5679a7fb17cf58bdfbb5487a0d" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/common/function/src/table/flush_compact_table.rs b/src/common/function/src/table/flush_compact_table.rs index 83c0cbd93c80..968f5a99e8c1 100644 --- a/src/common/function/src/table/flush_compact_table.rs +++ b/src/common/function/src/table/flush_compact_table.rs @@ -13,7 +13,9 @@ // limitations under the License. 
use std::fmt; +use std::str::FromStr; +use api::v1::region::{compact_request, StrictWindow}; use common_error::ext::BoxedError; use common_macro::admin_fn; use common_query::error::Error::ThreadJoin; @@ -22,7 +24,7 @@ use common_query::error::{ UnsupportedInputDataTypeSnafu, }; use common_query::prelude::{Signature, Volatility}; -use common_telemetry::error; +use common_telemetry::{error, info}; use datatypes::prelude::*; use datatypes::vectors::VectorRef; use session::context::QueryContextRef; @@ -34,71 +36,78 @@ use crate::ensure_greptime; use crate::function::{Function, FunctionContext}; use crate::handlers::TableMutationHandlerRef; -macro_rules! define_table_function { - ($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => { - /// A function to $func table, such as `$display_name(table_name)`. - #[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")] - pub(crate) async fn $display_name( - table_mutation_handler: &TableMutationHandlerRef, - query_ctx: &QueryContextRef, - params: &[ValueRef<'_>], - ) -> Result { - ensure!( - params.len() == 1, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect 1, have: {}", - params.len() - ), - } - ); +/// Compact type: strict window. 
+const COMPACT_TYPE_STRICT_WINDOW: &str = "strict_window"; - let ValueRef::String(table_name) = params[0] else { - return UnsupportedInputDataTypeSnafu { - function: $display_name_str, - datatypes: params.iter().map(|v| v.data_type()).collect::>(), - } - .fail(); - }; - - let (catalog_name, schema_name, table_name) = - table_name_to_full_name(table_name, &query_ctx) - .map_err(BoxedError::new) - .context(TableMutationSnafu)?; - - let affected_rows = table_mutation_handler - .$func( - $request { - catalog_name, - schema_name, - table_name, - }, - query_ctx.clone(), - ) - .await?; - - Ok(Value::from(affected_rows as u64)) +#[admin_fn( + name = "FlushTableFunction", + display_name = "flush_table", + sig_fn = "flush_signature", + ret = "uint64" +)] +pub(crate) async fn flush_table( + table_mutation_handler: &TableMutationHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + ensure!( + params.len() == 1, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 1, have: {}", + params.len() + ), } + ); + + let ValueRef::String(table_name) = params[0] else { + return UnsupportedInputDataTypeSnafu { + function: "flush_table", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); }; + + let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx) + .map_err(BoxedError::new) + .context(TableMutationSnafu)?; + + let affected_rows = table_mutation_handler + .flush( + FlushTableRequest { + catalog_name, + schema_name, + table_name, + }, + query_ctx.clone(), + ) + .await?; + + Ok(Value::from(affected_rows as u64)) } -define_table_function!( - "FlushTableFunction", - "flush_table", - flush_table, - flush, - FlushTableRequest -); - -define_table_function!( - "CompactTableFunction", - "compact_table", - compact_table, - compact, - CompactTableRequest -); - -fn signature() -> Signature { +#[admin_fn( + name = "CompactTableFunction", + display_name = 
"compact_table", + sig_fn = "compact_signature", + ret = "uint64" +)] +pub(crate) async fn compact_table( + table_mutation_handler: &TableMutationHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + let request = parse_compact_params(params, query_ctx)?; + info!("Compact table request: {:?}", request); + + let affected_rows = table_mutation_handler + .compact(request, query_ctx.clone()) + .await?; + + Ok(Value::from(affected_rows as u64)) +} + +fn flush_signature() -> Signature { Signature::uniform( 1, vec![ConcreteDataType::string_datatype()], @@ -106,12 +115,98 @@ fn signature() -> Signature { ) } +fn compact_signature() -> Signature { + Signature::variadic( + vec![ConcreteDataType::string_datatype()], + Volatility::Immutable, + ) +} + +/// Parses `compact_table` UDF parameters. This function accepts the following combinations: +/// - `[<table_name>]`: only the table name is provided, using the default compaction type: regular. +/// - `[<table_name>, <type>]`: specifies the table name and compaction type. The compaction options will be default. +/// - `[<table_name>, <type>, <options>]`: provides both the type and type-specific options. 
+fn parse_compact_params( + params: &[ValueRef<'_>], + query_ctx: &QueryContextRef, +) -> Result { + ensure!( + !params.is_empty(), + InvalidFuncArgsSnafu { + err_msg: "Args cannot be empty", + } + ); + + let (table_name, compact_type) = match params { + [ValueRef::String(table_name)] => ( + table_name, + compact_request::Options::Regular(Default::default()), + ), + [ValueRef::String(table_name), ValueRef::String(compact_ty_str)] => { + let compact_type = parse_compact_type(compact_ty_str, None)?; + (table_name, compact_type) + } + + [ValueRef::String(table_name), ValueRef::String(compact_ty_str), ValueRef::String(options_str)] => + { + let compact_type = parse_compact_type(compact_ty_str, Some(options_str))?; + (table_name, compact_type) + } + _ => { + return UnsupportedInputDataTypeSnafu { + function: "compact_table", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail() + } + }; + + let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx) + .map_err(BoxedError::new) + .context(TableMutationSnafu)?; + + Ok(CompactTableRequest { + catalog_name, + schema_name, + table_name, + compact_options: compact_type, + }) +} + +fn parse_compact_type(type_str: &str, option: Option<&str>) -> Result { + if type_str.eq_ignore_ascii_case(COMPACT_TYPE_STRICT_WINDOW) { + let window_seconds = option + .map(|v| { + i64::from_str(v).map_err(|_| { + InvalidFuncArgsSnafu { + err_msg: format!( + "Compact window is expected to be a valid number, provided: {}", + v + ), + } + .build() + }) + }) + .transpose()? 
+ .unwrap_or(0); + + Ok(compact_request::Options::StrictWindow(StrictWindow { + window_seconds, + })) + } else { + Ok(compact_request::Options::Regular(Default::default())) + } +} + #[cfg(test)] mod tests { use std::sync::Arc; + use api::v1::region::compact_request::Options; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::prelude::TypeSignature; use datatypes::vectors::{StringVector, UInt64Vector}; + use session::context::QueryContext; use super::*; @@ -174,5 +269,109 @@ mod tests { define_table_function_test!(flush_table, FlushTableFunction); - define_table_function_test!(compact_table, CompactTableFunction); + fn check_parse_compact_params(cases: &[(&[&str], CompactTableRequest)]) { + for (params, expected) in cases { + let params = params + .iter() + .map(|s| ValueRef::String(s)) + .collect::>(); + + assert_eq!( + expected, + &parse_compact_params(¶ms, &QueryContext::arc()).unwrap() + ); + } + } + + #[test] + fn test_parse_compact_params() { + check_parse_compact_params(&[ + ( + &["table"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + }, + ), + ( + &[&format!("{}.table", DEFAULT_SCHEMA_NAME)], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + }, + ), + ( + &[&format!( + "{}.{}.table", + DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME + )], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + }, + ), + ( + &["table", "regular"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: 
DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + }, + ), + ( + &["table", "strict_window"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { window_seconds: 0 }), + }, + ), + ( + &["table", "strict_window", "3600"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::StrictWindow(StrictWindow { + window_seconds: 3600, + }), + }, + ), + ( + &["table", "regular", "abcd"], + CompactTableRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: "table".to_string(), + compact_options: Options::Regular(Default::default()), + }, + ), + ]); + + assert!(parse_compact_params( + &["table", "strict_window", "abc"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err()); + + assert!(parse_compact_params( + &["a.b.table", "strict_window", "abc"] + .into_iter() + .map(ValueRef::String) + .collect::>(), + &QueryContext::arc(), + ) + .is_err()); + } } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index b5dbbac0b9ee..b8f236c7ded2 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -219,6 +219,7 @@ impl RegionServerHandler for RegionServer { .context(BuildRegionRequestsSnafu) .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu)?; + let tracing_context = TracingContext::from_current_span(); let results = if is_parallel { diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs index 9cde6a0aad8f..424198439a88 100644 --- a/src/mito2/src/compaction.rs +++ b/src/mito2/src/compaction.rs @@ -12,35 +12,54 @@ // See the 
License for the specific language governing permissions and // limitations under the License. +mod buckets; mod picker; +mod task; #[cfg(test)] mod test_util; mod twcs; +mod window; use std::collections::HashMap; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; +use api::v1::region::compact_request; use common_telemetry::{debug, error}; +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; +use datafusion_common::ScalarValue; +use datafusion_expr::Expr; pub use picker::CompactionPickerRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; +use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; +use table::predicate::Predicate; use tokio::sync::mpsc::{self, Sender}; use crate::access_layer::AccessLayerRef; use crate::cache::CacheManagerRef; use crate::compaction::twcs::TwcsPicker; +use crate::compaction::window::WindowedCompactionPicker; use crate::config::MitoConfig; use crate::error::{ CompactRegionSnafu, Error, RegionClosedSnafu, RegionDroppedSnafu, RegionTruncatedSnafu, Result, + TimeRangePredicateOverflowSnafu, }; use crate::metrics::COMPACTION_STAGE_ELAPSED; +use crate::read::projection::ProjectionMapper; +use crate::read::scan_region::ScanInput; +use crate::read::seq_scan::SeqScan; +use crate::read::BoxedBatchReader; use crate::region::options::CompactionOptions; use crate::region::version::{VersionControlRef, VersionRef}; use crate::region::ManifestContextRef; use crate::request::{OptionOutputTx, OutputTx, WorkerRequest}; use crate::schedule::scheduler::SchedulerRef; +use crate::sst::file::{FileHandle, FileId, Level}; use crate::sst::file_purger::FilePurgerRef; +use crate::sst::version::LevelMeta; use crate::worker::WorkerListener; /// Region compaction request. @@ -116,9 +135,11 @@ impl CompactionScheduler { } /// Schedules a compaction for the region. 
+ #[allow(clippy::too_many_arguments)] pub(crate) fn schedule_compaction( &mut self, region_id: RegionId, + compact_options: compact_request::Options, version_control: &VersionControlRef, access_layer: &AccessLayerRef, file_purger: &FilePurgerRef, @@ -147,7 +168,7 @@ impl CompactionScheduler { self.listener.clone(), ); self.region_status.insert(region_id, status); - self.schedule_compaction_request(request) + self.schedule_compaction_request(request, compact_options) } /// Notifies the scheduler that the compaction job is finished successfully. @@ -159,6 +180,7 @@ impl CompactionScheduler { let Some(status) = self.region_status.get_mut(®ion_id) else { return; }; + // We should always try to compact the region until picker returns None. let request = status.new_compaction_request( self.request_sender.clone(), @@ -169,7 +191,10 @@ impl CompactionScheduler { self.listener.clone(), ); // Try to schedule next compaction task for this region. - if let Err(e) = self.schedule_compaction_request(request) { + if let Err(e) = self.schedule_compaction_request( + request, + compact_request::Options::Regular(Default::default()), + ) { error!(e; "Failed to schedule next compaction for region {}", region_id); } } @@ -210,8 +235,22 @@ impl CompactionScheduler { /// Schedules a compaction request. /// /// If the region has nothing to compact, it removes the region from the status map. 
- fn schedule_compaction_request(&mut self, request: CompactionRequest) -> Result<()> { - let picker = compaction_options_to_picker(&request.current_version.options.compaction); + fn schedule_compaction_request( + &mut self, + request: CompactionRequest, + options: compact_request::Options, + ) -> Result<()> { + let picker = if let compact_request::Options::StrictWindow(window) = &options { + let window = if window.window_seconds == 0 { + None + } else { + Some(window.window_seconds) + }; + Arc::new(WindowedCompactionPicker::new(window)) as Arc<_> + } else { + compaction_options_to_picker(&request.current_version.options.compaction) + }; + let region_id = request.region_id(); debug!( "Pick compaction strategy {:?} for region: {}", @@ -226,6 +265,7 @@ impl CompactionScheduler { self.region_status.remove(®ion_id); return Ok(()); }; + drop(pick_timer); // Submit the compaction task. @@ -334,6 +374,7 @@ impl CompactionStatus { /// Creates a new compaction request for compaction picker. /// /// It consumes all pending compaction waiters. + #[allow(clippy::too_many_arguments)] fn new_compaction_request( &mut self, request_sender: Sender, @@ -368,6 +409,127 @@ impl CompactionStatus { } } +#[derive(Debug)] +pub(crate) struct CompactionOutput { + pub output_file_id: FileId, + /// Compaction output file level. + pub output_level: Level, + /// Compaction input files. + pub inputs: Vec, + /// Whether to remove deletion markers. + pub filter_deleted: bool, + /// Compaction output time range. + pub output_time_range: Option, +} + +/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. +async fn build_sst_reader( + metadata: RegionMetadataRef, + sst_layer: AccessLayerRef, + cache: Option, + inputs: &[FileHandle], + append_mode: bool, + filter_deleted: bool, + time_range: Option, +) -> Result { + let mut scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?) 
+ .with_files(inputs.to_vec()) + .with_append_mode(append_mode) + .with_cache(cache) + .with_filter_deleted(filter_deleted) + // We ignore file not found error during compaction. + .with_ignore_file_not_found(true); + + // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944 + // by converting time ranges into predicate. + if let Some(time_range) = time_range { + scan_input = scan_input.with_predicate(time_range_to_predicate(time_range, &metadata)?); + } + + SeqScan::new(scan_input).build_reader().await +} + +/// Converts time range to predicates so that rows outside the range will be filtered. +fn time_range_to_predicate( + range: TimestampRange, + metadata: &RegionMetadataRef, +) -> Result> { + let ts_col = metadata.time_index_column(); + + // safety: time index column's type must be a valid timestamp type. + let ts_col_unit = ts_col + .column_schema + .data_type + .as_timestamp() + .unwrap() + .unit(); + + let exprs = match (range.start(), range.end()) { + (Some(start), Some(end)) => { + vec![ + datafusion_expr::col(ts_col.column_schema.name.clone()) + .gt_eq(ts_to_lit(*start, ts_col_unit)?), + datafusion_expr::col(ts_col.column_schema.name.clone()) + .lt(ts_to_lit(*end, ts_col_unit)?), + ] + } + (Some(start), None) => { + vec![datafusion_expr::col(ts_col.column_schema.name.clone()) + .gt_eq(ts_to_lit(*start, ts_col_unit)?)] + } + + (None, Some(end)) => { + vec![datafusion_expr::col(ts_col.column_schema.name.clone()) + .lt(ts_to_lit(*end, ts_col_unit)?)] + } + (None, None) => { + return Ok(None); + } + }; + Ok(Some(Predicate::new(exprs))) +} + +fn ts_to_lit(ts: Timestamp, ts_col_unit: TimeUnit) -> Result { + let ts = ts + .convert_to(ts_col_unit) + .context(TimeRangePredicateOverflowSnafu { + timestamp: ts, + unit: ts_col_unit, + })?; + let val = ts.value(); + let scalar_value = match ts_col_unit { + TimeUnit::Second => ScalarValue::TimestampSecond(Some(val), None), + TimeUnit::Millisecond => 
ScalarValue::TimestampMillisecond(Some(val), None), + TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(Some(val), None), + TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(Some(val), None), + }; + Ok(datafusion_expr::lit(scalar_value)) +} + +/// Finds all expired SSTs across levels. +fn get_expired_ssts( + levels: &[LevelMeta], + ttl: Option, + now: Timestamp, +) -> Vec { + let Some(ttl) = ttl else { + return vec![]; + }; + + let expire_time = match now.sub_duration(ttl) { + Ok(expire_time) => expire_time, + Err(e) => { + error!(e; "Failed to calculate region TTL expire time"); + return vec![]; + } + }; + + levels + .iter() + .flat_map(|l| l.get_expired_files(&expire_time).into_iter()) + .collect() +} + #[cfg(test)] mod tests { use std::sync::Mutex; @@ -397,6 +559,7 @@ mod tests { scheduler .schedule_compaction( builder.region_id(), + compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, &purger, @@ -415,6 +578,7 @@ mod tests { scheduler .schedule_compaction( builder.region_id(), + compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, &purger, @@ -477,6 +641,7 @@ mod tests { scheduler .schedule_compaction( region_id, + compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, &purger, @@ -505,6 +670,7 @@ mod tests { scheduler .schedule_compaction( region_id, + compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, &purger, @@ -536,6 +702,7 @@ mod tests { scheduler .schedule_compaction( region_id, + compact_request::Options::Regular(Default::default()), &version_control, &env.access_layer, &purger, diff --git a/src/mito2/src/compaction/buckets.rs b/src/mito2/src/compaction/buckets.rs new file mode 100644 index 000000000000..e094ceb8473e --- /dev/null +++ b/src/mito2/src/compaction/buckets.rs @@ -0,0 +1,126 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::timestamp::TimeUnit; +use common_time::Timestamp; + +use crate::sst::file::FileHandle; + +/// Infers the suitable time bucket duration. +/// Currently it simply finds the max and min timestamps across all SSTs in the level and fits the time span +/// into a time bucket. +pub(crate) fn infer_time_bucket<'a>(files: impl Iterator) -> i64 { + let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second); + let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second); + + for f in files { + let (start, end) = f.time_range(); + min_ts = min_ts.min(start); + max_ts = max_ts.max(end); + } + + // safety: converting any timestamp into seconds will not cause overflow. + let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value(); + let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value(); + + max_sec + .checked_sub(min_sec) + .map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow. + .unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty. +} + +pub(crate) struct TimeBuckets([i64; 7]); + +impl TimeBuckets { + /// Fits a given time span into a time bucket by finding the minimum bucket that can cover the span. + /// Returns the max bucket if no such bucket can be found. 
+ fn fit_time_bucket(&self, span_sec: i64) -> i64 { + assert!(span_sec >= 0); + match self.0.binary_search(&span_sec) { + Ok(idx) => self.0[idx], + Err(idx) => { + if idx < self.0.len() { + self.0[idx] + } else { + self.0.last().copied().unwrap() + } + } + } + } + + #[cfg(test)] + fn get(&self, idx: usize) -> i64 { + self.0[idx] + } + + fn max(&self) -> i64 { + self.0.last().copied().unwrap() + } +} + +/// A set of predefined time buckets. +pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([ + 60 * 60, // one hour + 2 * 60 * 60, // two hours + 12 * 60 * 60, // twelve hours + 24 * 60 * 60, // one day + 7 * 24 * 60 * 60, // one week + 365 * 24 * 60 * 60, // one year + 10 * 365 * 24 * 60 * 60, // ten years +]); + +#[cfg(test)] +mod tests { + use super::*; + use crate::compaction::test_util::new_file_handle; + use crate::sst::file::FileId; + + #[test] + fn test_time_bucket() { + assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1)); + assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60)); + assert_eq!( + TIME_BUCKETS.get(1), + TIME_BUCKETS.fit_time_bucket(60 * 60 + 1) + ); + + assert_eq!( + TIME_BUCKETS.get(2), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1) + ); + assert_eq!( + TIME_BUCKETS.get(2), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2)) + ); + assert_eq!( + TIME_BUCKETS.get(3), + TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1) + ); + assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX)); + } + + #[test] + fn test_infer_time_buckets() { + assert_eq!( + TIME_BUCKETS.get(0), + infer_time_bucket( + [ + new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0), + new_file_handle(FileId::random(), 1, 10_000, 0) + ] + .iter() + ) + ); + } +} diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs new file mode 100644 index 000000000000..1869012f8d94 --- /dev/null +++ b/src/mito2/src/compaction/task.rs @@ -0,0 +1,318 @@ +// Copyright 2023 Greptime Team +// 
+// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use common_telemetry::{error, info}; +use smallvec::SmallVec; +use snafu::ResultExt; +use store_api::metadata::RegionMetadataRef; +use store_api::storage::RegionId; +use tokio::sync::mpsc; + +use crate::access_layer::{AccessLayerRef, SstWriteRequest}; +use crate::cache::CacheManagerRef; +use crate::compaction::picker::CompactionTask; +use crate::compaction::{build_sst_reader, CompactionOutput}; +use crate::config::MitoConfig; +use crate::error; +use crate::error::CompactRegionSnafu; +use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; +use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; +use crate::read::Source; +use crate::region::options::IndexOptions; +use crate::region::version::VersionControlRef; +use crate::region::{ManifestContextRef, RegionState}; +use crate::request::{ + BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, +}; +use crate::sst::file::{FileHandle, FileMeta, IndexType}; +use crate::sst::file_purger::FilePurgerRef; +use crate::sst::parquet::WriteOptions; +use crate::worker::WorkerListener; + +const MAX_PARALLEL_COMPACTION: usize = 8; + +pub(crate) struct CompactionTaskImpl { + pub engine_config: Arc, + pub region_id: RegionId, + pub metadata: RegionMetadataRef, + pub sst_layer: AccessLayerRef, + pub outputs: Vec, + 
pub expired_ssts: Vec, + pub compaction_time_window: Option, + pub file_purger: FilePurgerRef, + /// Request sender to notify the worker. + pub(crate) request_sender: mpsc::Sender, + /// Senders that are used to notify waiters waiting for pending compaction tasks. + pub waiters: Vec, + /// Start time of compaction task + pub start_time: Instant, + pub(crate) cache_manager: CacheManagerRef, + /// Target storage of the region. + pub(crate) storage: Option, + /// Index options of the region. + pub(crate) index_options: IndexOptions, + /// The region is using append mode. + pub(crate) append_mode: bool, + /// Manifest context. + pub(crate) manifest_ctx: ManifestContextRef, + /// Version control to update. + pub(crate) version_control: VersionControlRef, + /// Event listener. + pub(crate) listener: WorkerListener, +} + +impl Debug for CompactionTaskImpl { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TwcsCompactionTask") + .field("region_id", &self.region_id) + .field("outputs", &self.outputs) + .field("expired_ssts", &self.expired_ssts) + .field("compaction_time_window", &self.compaction_time_window) + .field("append_mode", &self.append_mode) + .finish() + } +} + +impl Drop for CompactionTaskImpl { + fn drop(&mut self) { + self.mark_files_compacting(false) + } +} + +impl CompactionTaskImpl { + fn mark_files_compacting(&self, compacting: bool) { + self.outputs + .iter() + .flat_map(|o| o.inputs.iter()) + .for_each(|f| f.set_compacting(compacting)) + } + + /// Merges all SST files. + /// Returns `(output files, input files)`. + async fn merge_ssts(&mut self) -> error::Result<(Vec, Vec)> { + let mut futs = Vec::with_capacity(self.outputs.len()); + let mut compacted_inputs = + Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); + + for output in self.outputs.drain(..) 
{ + compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); + + info!( + "Compaction region {} output [{}]-> {}", + self.region_id, + output + .inputs + .iter() + .map(|f| f.file_id().to_string()) + .collect::>() + .join(","), + output.output_file_id + ); + + let write_opts = WriteOptions { + write_buffer_size: self.engine_config.sst_write_buffer_size, + ..Default::default() + }; + let create_inverted_index = self + .engine_config + .inverted_index + .create_on_compaction + .auto(); + let mem_threshold_index_create = self + .engine_config + .inverted_index + .mem_threshold_on_create + .map(|m| m.as_bytes() as _); + let index_write_buffer_size = Some( + self.engine_config + .inverted_index + .write_buffer_size + .as_bytes() as usize, + ); + + let metadata = self.metadata.clone(); + let sst_layer = self.sst_layer.clone(); + let region_id = self.region_id; + let file_id = output.output_file_id; + let cache_manager = self.cache_manager.clone(); + let storage = self.storage.clone(); + let index_options = self.index_options.clone(); + let append_mode = self.append_mode; + futs.push(async move { + let reader = build_sst_reader( + metadata.clone(), + sst_layer.clone(), + Some(cache_manager.clone()), + &output.inputs, + append_mode, + output.filter_deleted, + output.output_time_range, + ) + .await?; + let file_meta_opt = sst_layer + .write_sst( + SstWriteRequest { + file_id, + metadata, + source: Source::Reader(reader), + cache_manager, + storage, + create_inverted_index, + mem_threshold_index_create, + index_write_buffer_size, + index_options, + }, + &write_opts, + ) + .await? 
+ .map(|sst_info| FileMeta { + region_id, + file_id, + time_range: sst_info.time_range, + level: output.output_level, + file_size: sst_info.file_size, + available_indexes: sst_info + .inverted_index_available + .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) + .unwrap_or_default(), + index_file_size: sst_info.index_file_size, + }); + Ok(file_meta_opt) + }); + } + + let mut output_files = Vec::with_capacity(futs.len()); + while !futs.is_empty() { + let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); + for _ in 0..MAX_PARALLEL_COMPACTION { + if let Some(task) = futs.pop() { + task_chunk.push(common_runtime::spawn_bg(task)); + } + } + let metas = futures::future::try_join_all(task_chunk) + .await + .context(error::JoinSnafu)? + .into_iter() + .collect::>>()?; + output_files.extend(metas.into_iter().flatten()); + } + + let inputs = compacted_inputs.into_iter().collect(); + Ok((output_files, inputs)) + } + + async fn handle_compaction(&mut self) -> error::Result<()> { + self.mark_files_compacting(true); + let merge_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["merge"]) + .start_timer(); + let (added, mut deleted) = match self.merge_ssts().await { + Ok(v) => v, + Err(e) => { + error!(e; "Failed to compact region: {}", self.region_id); + merge_timer.stop_and_discard(); + return Err(e); + } + }; + deleted.extend(self.expired_ssts.iter().map(FileHandle::meta)); + let merge_time = merge_timer.stop_and_record(); + info!( + "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", + self.region_id, + deleted, + added, + self.compaction_time_window, + self.waiters.len(), + merge_time, + ); + + self.listener.on_merge_ssts_finished(self.region_id).await; + + let _manifest_timer = COMPACTION_STAGE_ELAPSED + .with_label_values(&["write_manifest"]) + .start_timer(); + // Write region edit to manifest. 
+ let edit = RegionEdit { + files_to_add: added, + files_to_remove: deleted, + compaction_time_window: self + .compaction_time_window + .map(|seconds| Duration::from_secs(seconds as u64)), + flushed_entry_id: None, + flushed_sequence: None, + }; + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); + // We might leak files if we fail to update manifest. We can add a cleanup task to + // remove them later. + self.manifest_ctx + .update_manifest(RegionState::Writable, action_list, || { + self.version_control + .apply_edit(edit, &[], self.file_purger.clone()); + }) + .await + } + + /// Handles compaction failure, notifies all waiters. + fn on_failure(&mut self, err: Arc) { + COMPACTION_FAILURE_COUNT.inc(); + for waiter in self.waiters.drain(..) { + waiter.send(Err(err.clone()).context(CompactRegionSnafu { + region_id: self.region_id, + })); + } + } + + /// Notifies region worker to handle post-compaction tasks. + async fn send_to_worker(&self, request: WorkerRequest) { + if let Err(e) = self.request_sender.send(request).await { + error!( + "Failed to notify compaction job status for region {}, request: {:?}", + self.region_id, e.0 + ); + } + } +} + +#[async_trait::async_trait] +impl CompactionTask for CompactionTaskImpl { + async fn run(&mut self) { + let notify = match self.handle_compaction().await { + Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished { + region_id: self.region_id, + senders: std::mem::take(&mut self.waiters), + start_time: self.start_time, + }), + Err(e) => { + error!(e; "Failed to compact region, region id: {}", self.region_id); + let err = Arc::new(e); + // notify compaction waiters + self.on_failure(err.clone()); + BackgroundNotify::CompactionFailed(CompactionFailed { + region_id: self.region_id, + err, + }) + } + }; + + self.send_to_worker(WorkerRequest::Background { + region_id: self.region_id, + notify, + }) + .await; + } +} diff --git a/src/mito2/src/compaction/twcs.rs 
b/src/mito2/src/compaction/twcs.rs index 11aef62295ac..7c6bf0827574 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -15,44 +15,18 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap}; use std::fmt::{Debug, Formatter}; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use common_telemetry::{debug, error, info}; +use common_telemetry::{debug, info}; use common_time::timestamp::TimeUnit; use common_time::timestamp_millis::BucketAligned; use common_time::Timestamp; -use smallvec::SmallVec; -use snafu::ResultExt; -use store_api::metadata::RegionMetadataRef; -use store_api::storage::RegionId; -use tokio::sync::mpsc; -use crate::access_layer::{AccessLayerRef, SstWriteRequest}; -use crate::cache::CacheManagerRef; +use crate::compaction::buckets::infer_time_bucket; use crate::compaction::picker::{CompactionTask, Picker}; -use crate::compaction::CompactionRequest; -use crate::config::MitoConfig; -use crate::error::{self, CompactRegionSnafu}; -use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList}; -use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; -use crate::read::projection::ProjectionMapper; -use crate::read::scan_region::ScanInput; -use crate::read::seq_scan::SeqScan; -use crate::read::{BoxedBatchReader, Source}; -use crate::region::options::IndexOptions; -use crate::region::version::VersionControlRef; -use crate::region::{ManifestContextRef, RegionState}; -use crate::request::{ - BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, -}; -use crate::sst::file::{FileHandle, FileId, FileMeta, IndexType, Level}; -use crate::sst::file_purger::FilePurgerRef; -use crate::sst::parquet::WriteOptions; +use crate::compaction::task::CompactionTaskImpl; +use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; +use crate::sst::file::{FileHandle, FileId}; use crate::sst::version::LevelMeta; -use 
crate::worker::WorkerListener; - -const MAX_PARALLEL_COMPACTION: usize = 8; /// `TwcsPicker` picks files of which the max timestamp are in the same time window as compaction /// candidates. @@ -107,6 +81,7 @@ impl TwcsPicker { output_level: 1, // we only have two levels and always compact to l1 inputs: files_in_window.clone(), filter_deleted, + output_time_range: None, // we do not enforce output time range in twcs compactions. }); } else { debug!("Active window not present or no enough files in active window {:?}, window: {}", active_window, *window); @@ -119,6 +94,7 @@ impl TwcsPicker { output_level: 1, inputs: files_in_window.clone(), filter_deleted, + output_time_range: None, }); } else { debug!( @@ -147,6 +123,7 @@ impl Picker for TwcsPicker { manifest_ctx, version_control, listener, + .. } = req; let region_metadata = current_version.metadata.clone(); @@ -188,7 +165,7 @@ impl Picker for TwcsPicker { } return None; } - let task = TwcsCompactionTask { + let task = CompactionTaskImpl { engine_config, region_id, metadata: region_metadata, @@ -329,393 +306,6 @@ fn find_latest_window_in_seconds<'a>( .and_then(|ts| ts.value().align_to_ceil_by_bucket(time_window_size)) } -pub(crate) struct TwcsCompactionTask { - pub engine_config: Arc, - pub region_id: RegionId, - pub metadata: RegionMetadataRef, - pub sst_layer: AccessLayerRef, - pub outputs: Vec, - pub expired_ssts: Vec, - pub compaction_time_window: Option, - pub file_purger: FilePurgerRef, - /// Request sender to notify the worker. - pub(crate) request_sender: mpsc::Sender, - /// Senders that are used to notify waiters waiting for pending compaction tasks. - pub waiters: Vec, - /// Start time of compaction task - pub start_time: Instant, - pub(crate) cache_manager: CacheManagerRef, - /// Target storage of the region. - pub(crate) storage: Option, - /// Index options of the region. - pub(crate) index_options: IndexOptions, - /// The region is using append mode. 
- pub(crate) append_mode: bool, - /// Manifest context. - pub(crate) manifest_ctx: ManifestContextRef, - /// Version control to update. - pub(crate) version_control: VersionControlRef, - /// Event listener. - pub(crate) listener: WorkerListener, -} - -impl Debug for TwcsCompactionTask { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TwcsCompactionTask") - .field("region_id", &self.region_id) - .field("outputs", &self.outputs) - .field("expired_ssts", &self.expired_ssts) - .field("compaction_time_window", &self.compaction_time_window) - .field("append_mode", &self.append_mode) - .finish() - } -} - -impl Drop for TwcsCompactionTask { - fn drop(&mut self) { - self.mark_files_compacting(false) - } -} - -impl TwcsCompactionTask { - fn mark_files_compacting(&self, compacting: bool) { - self.outputs - .iter() - .flat_map(|o| o.inputs.iter()) - .for_each(|f| f.set_compacting(compacting)) - } - - /// Merges all SST files. - /// Returns `(output files, input files)`. - async fn merge_ssts(&mut self) -> error::Result<(Vec, Vec)> { - let mut futs = Vec::with_capacity(self.outputs.len()); - let mut compacted_inputs = - Vec::with_capacity(self.outputs.iter().map(|o| o.inputs.len()).sum()); - - for output in self.outputs.drain(..) { - compacted_inputs.extend(output.inputs.iter().map(FileHandle::meta)); - - info!( - "Compaction region {}. 
Input [{}] -> output {}", - self.region_id, - output - .inputs - .iter() - .map(|f| f.file_id().to_string()) - .collect::>() - .join(","), - output.output_file_id - ); - - let write_opts = WriteOptions { - write_buffer_size: self.engine_config.sst_write_buffer_size, - ..Default::default() - }; - let create_inverted_index = self - .engine_config - .inverted_index - .create_on_compaction - .auto(); - let mem_threshold_index_create = self - .engine_config - .inverted_index - .mem_threshold_on_create - .map(|m| m.as_bytes() as _); - let index_write_buffer_size = Some( - self.engine_config - .inverted_index - .write_buffer_size - .as_bytes() as usize, - ); - - let metadata = self.metadata.clone(); - let sst_layer = self.sst_layer.clone(); - let region_id = self.region_id; - let file_id = output.output_file_id; - let cache_manager = self.cache_manager.clone(); - let storage = self.storage.clone(); - let index_options = self.index_options.clone(); - let append_mode = self.append_mode; - futs.push(async move { - let reader = build_sst_reader( - metadata.clone(), - sst_layer.clone(), - Some(cache_manager.clone()), - &output.inputs, - append_mode, - output.filter_deleted, - ) - .await?; - let file_meta_opt = sst_layer - .write_sst( - SstWriteRequest { - file_id, - metadata, - source: Source::Reader(reader), - cache_manager, - storage, - create_inverted_index, - mem_threshold_index_create, - index_write_buffer_size, - index_options, - }, - &write_opts, - ) - .await? 
- .map(|sst_info| FileMeta { - region_id, - file_id, - time_range: sst_info.time_range, - level: output.output_level, - file_size: sst_info.file_size, - available_indexes: sst_info - .inverted_index_available - .then(|| SmallVec::from_iter([IndexType::InvertedIndex])) - .unwrap_or_default(), - index_file_size: sst_info.index_file_size, - }); - Ok(file_meta_opt) - }); - } - - let mut output_files = Vec::with_capacity(futs.len()); - while !futs.is_empty() { - let mut task_chunk = Vec::with_capacity(MAX_PARALLEL_COMPACTION); - for _ in 0..MAX_PARALLEL_COMPACTION { - if let Some(task) = futs.pop() { - task_chunk.push(common_runtime::spawn_bg(task)); - } - } - let metas = futures::future::try_join_all(task_chunk) - .await - .context(error::JoinSnafu)? - .into_iter() - .collect::>>()?; - output_files.extend(metas.into_iter().flatten()); - } - - let inputs = compacted_inputs.into_iter().collect(); - Ok((output_files, inputs)) - } - - async fn handle_compaction(&mut self) -> error::Result<()> { - self.mark_files_compacting(true); - let merge_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["merge"]) - .start_timer(); - let (added, mut deleted) = match self.merge_ssts().await { - Ok(v) => v, - Err(e) => { - error!(e; "Failed to compact region: {}", self.region_id); - merge_timer.stop_and_discard(); - return Err(e); - } - }; - deleted.extend(self.expired_ssts.iter().map(FileHandle::meta)); - let merge_time = merge_timer.stop_and_record(); - info!( - "Compacted SST files, region_id: {}, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}, merge_time: {}s", - self.region_id, - deleted, - added, - self.compaction_time_window, - self.waiters.len(), - merge_time, - ); - - self.listener.on_merge_ssts_finished(self.region_id).await; - - let _manifest_timer = COMPACTION_STAGE_ELAPSED - .with_label_values(&["write_manifest"]) - .start_timer(); - // Write region edit to manifest. 
- let edit = RegionEdit { - files_to_add: added, - files_to_remove: deleted, - compaction_time_window: self - .compaction_time_window - .map(|seconds| Duration::from_secs(seconds as u64)), - flushed_entry_id: None, - flushed_sequence: None, - }; - let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())); - // We might leak files if we fail to update manifest. We can add a cleanup task to - // remove them later. - self.manifest_ctx - .update_manifest(RegionState::Writable, action_list, || { - self.version_control - .apply_edit(edit, &[], self.file_purger.clone()); - }) - .await - } - - /// Handles compaction failure, notifies all waiters. - fn on_failure(&mut self, err: Arc) { - COMPACTION_FAILURE_COUNT.inc(); - for waiter in self.waiters.drain(..) { - waiter.send(Err(err.clone()).context(CompactRegionSnafu { - region_id: self.region_id, - })); - } - } - - /// Notifies region worker to handle post-compaction tasks. - async fn send_to_worker(&self, request: WorkerRequest) { - if let Err(e) = self.request_sender.send(request).await { - error!( - "Failed to notify compaction job status for region {}, request: {:?}", - self.region_id, e.0 - ); - } - } -} - -#[async_trait::async_trait] -impl CompactionTask for TwcsCompactionTask { - async fn run(&mut self) { - let notify = match self.handle_compaction().await { - Ok(()) => BackgroundNotify::CompactionFinished(CompactionFinished { - region_id: self.region_id, - senders: std::mem::take(&mut self.waiters), - start_time: self.start_time, - }), - Err(e) => { - error!(e; "Failed to compact region, region id: {}", self.region_id); - let err = Arc::new(e); - // notify compaction waiters - self.on_failure(err.clone()); - BackgroundNotify::CompactionFailed(CompactionFailed { - region_id: self.region_id, - err, - }) - } - }; - - self.send_to_worker(WorkerRequest::Background { - region_id: self.region_id, - notify, - }) - .await; - } -} - -/// Infers the suitable time bucket duration. 
-/// Now it simply find the max and min timestamp across all SSTs in level and fit the time span -/// into time bucket. -pub(crate) fn infer_time_bucket<'a>(files: impl Iterator) -> i64 { - let mut max_ts = Timestamp::new(i64::MIN, TimeUnit::Second); - let mut min_ts = Timestamp::new(i64::MAX, TimeUnit::Second); - - for f in files { - let (start, end) = f.time_range(); - min_ts = min_ts.min(start); - max_ts = max_ts.max(end); - } - - // safety: Convert whatever timestamp into seconds will not cause overflow. - let min_sec = min_ts.convert_to(TimeUnit::Second).unwrap().value(); - let max_sec = max_ts.convert_to(TimeUnit::Second).unwrap().value(); - - max_sec - .checked_sub(min_sec) - .map(|span| TIME_BUCKETS.fit_time_bucket(span)) // return the max bucket on subtraction overflow. - .unwrap_or_else(|| TIME_BUCKETS.max()) // safety: TIME_BUCKETS cannot be empty. -} - -pub(crate) struct TimeBuckets([i64; 7]); - -impl TimeBuckets { - /// Fits a given time span into time bucket by find the minimum bucket that can cover the span. - /// Returns the max bucket if no such bucket can be found. - fn fit_time_bucket(&self, span_sec: i64) -> i64 { - assert!(span_sec >= 0); - match self.0.binary_search(&span_sec) { - Ok(idx) => self.0[idx], - Err(idx) => { - if idx < self.0.len() { - self.0[idx] - } else { - self.0.last().copied().unwrap() - } - } - } - } - - #[cfg(test)] - fn get(&self, idx: usize) -> i64 { - self.0[idx] - } - - fn max(&self) -> i64 { - self.0.last().copied().unwrap() - } -} - -/// A set of predefined time buckets. -pub(crate) const TIME_BUCKETS: TimeBuckets = TimeBuckets([ - 60 * 60, // one hour - 2 * 60 * 60, // two hours - 12 * 60 * 60, // twelve hours - 24 * 60 * 60, // one day - 7 * 24 * 60 * 60, // one week - 365 * 24 * 60 * 60, // one year - 10 * 365 * 24 * 60 * 60, // ten years -]); - -/// Finds all expired SSTs across levels. 
-fn get_expired_ssts( - levels: &[LevelMeta], - ttl: Option, - now: Timestamp, -) -> Vec { - let Some(ttl) = ttl else { - return vec![]; - }; - - let expire_time = match now.sub_duration(ttl) { - Ok(expire_time) => expire_time, - Err(e) => { - error!(e; "Failed to calculate region TTL expire time"); - return vec![]; - } - }; - - levels - .iter() - .flat_map(|l| l.get_expired_files(&expire_time).into_iter()) - .collect() -} - -#[derive(Debug)] -pub(crate) struct CompactionOutput { - pub output_file_id: FileId, - /// Compaction output file level. - pub output_level: Level, - /// Compaction input files. - pub inputs: Vec, - /// Whether to remove deletion markers. - pub filter_deleted: bool, -} - -/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order. -async fn build_sst_reader( - metadata: RegionMetadataRef, - sst_layer: AccessLayerRef, - cache: Option, - inputs: &[FileHandle], - append_mode: bool, - filter_deleted: bool, -) -> error::Result { - let scan_input = ScanInput::new(sst_layer, ProjectionMapper::all(&metadata)?) - .with_files(inputs.to_vec()) - .with_cache(cache) - .with_append_mode(append_mode) - .with_filter_deleted(filter_deleted) - // We ignore file not found error during compaction. 
- .with_ignore_file_not_found(true); - SeqScan::new(scan_input).build_reader().await -} - #[cfg(test)] mod tests { use std::collections::HashSet; @@ -1017,43 +607,5 @@ mod tests { .check(); } - #[test] - fn test_time_bucket() { - assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(1)); - assert_eq!(TIME_BUCKETS.get(0), TIME_BUCKETS.fit_time_bucket(60 * 60)); - assert_eq!( - TIME_BUCKETS.get(1), - TIME_BUCKETS.fit_time_bucket(60 * 60 + 1) - ); - - assert_eq!( - TIME_BUCKETS.get(2), - TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2) - 1) - ); - assert_eq!( - TIME_BUCKETS.get(2), - TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(2)) - ); - assert_eq!( - TIME_BUCKETS.get(3), - TIME_BUCKETS.fit_time_bucket(TIME_BUCKETS.get(3) - 1) - ); - assert_eq!(TIME_BUCKETS.get(6), TIME_BUCKETS.fit_time_bucket(i64::MAX)); - } - - #[test] - fn test_infer_time_buckets() { - assert_eq!( - TIME_BUCKETS.get(0), - infer_time_bucket( - [ - new_file_handle(FileId::random(), 0, TIME_BUCKETS.get(0) * 1000 - 1, 0), - new_file_handle(FileId::random(), 1, 10_000, 0) - ] - .iter() - ) - ); - } - // TODO(hl): TTL tester that checks if get_expired_ssts function works as expected. } diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs new file mode 100644 index 000000000000..2f0ff49c7f16 --- /dev/null +++ b/src/mito2/src/compaction/window.rs @@ -0,0 +1,420 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::BTreeMap; +use std::fmt::Debug; + +use common_telemetry::info; +use common_time::range::TimestampRange; +use common_time::timestamp::TimeUnit; +use common_time::timestamp_millis::BucketAligned; +use common_time::Timestamp; +use store_api::storage::RegionId; + +use crate::compaction::buckets::infer_time_bucket; +use crate::compaction::picker::{CompactionTask, Picker}; +use crate::compaction::task::CompactionTaskImpl; +use crate::compaction::{get_expired_ssts, CompactionOutput, CompactionRequest}; +use crate::region::version::VersionRef; +use crate::sst::file::{FileHandle, FileId}; + +/// Compaction picker that splits the time range of all involved files to windows, and merges +/// the data segments intersects with those windows of files together so that the output files +/// never overlaps. +#[derive(Debug)] +pub struct WindowedCompactionPicker { + compaction_time_window_seconds: Option, +} + +impl WindowedCompactionPicker { + pub fn new(window_seconds: Option) -> Self { + Self { + compaction_time_window_seconds: window_seconds, + } + } + + // Computes compaction time window. First we respect user specified parameter, then + // use persisted window. If persist window is not present, we check the time window + // provided while creating table. If all of those are absent, we infer the window + // from files in level0. 
+ fn calculate_time_window(&self, region_id: RegionId, current_version: &VersionRef) -> i64 { + self.compaction_time_window_seconds + .or(current_version + .compaction_time_window + .map(|t| t.as_secs() as i64)) + .unwrap_or_else(|| { + let levels = current_version.ssts.levels(); + let inferred = infer_time_bucket(levels[0].files()); + info!( + "Compaction window for region {} is not present, inferring from files: {:?}", + region_id, inferred + ); + inferred + }) + } + + fn pick_inner( + &self, + region_id: RegionId, + current_version: &VersionRef, + current_time: Timestamp, + ) -> (Vec, Vec, i64) { + let time_window = self.calculate_time_window(region_id, current_version); + info!( + "Compaction window for region: {} is {} seconds", + region_id, time_window + ); + + let expired_ssts = get_expired_ssts( + current_version.ssts.levels(), + current_version.options.ttl, + current_time, + ); + if !expired_ssts.is_empty() { + info!("Expired SSTs in region {}: {:?}", region_id, expired_ssts); + // here we mark expired SSTs as compacting to avoid them being picked. 
+ expired_ssts.iter().for_each(|f| f.set_compacting(true)); + } + + let windows = assign_files_to_time_windows( + time_window, + current_version + .ssts + .levels() + .iter() + .flat_map(|level| level.files.values()), + ); + + (build_output(windows), expired_ssts, time_window) + } +} + +impl Picker for WindowedCompactionPicker { + fn pick(&self, req: CompactionRequest) -> Option> { + let region_id = req.region_id(); + let CompactionRequest { + engine_config, + current_version, + access_layer, + request_sender, + waiters, + file_purger, + start_time, + cache_manager, + manifest_ctx, + version_control, + listener, + } = req; + + let (outputs, expired_ssts, time_window) = + self.pick_inner(region_id, ¤t_version, Timestamp::current_millis()); + + let task = CompactionTaskImpl { + engine_config: engine_config.clone(), + region_id, + metadata: current_version.metadata.clone().clone(), + sst_layer: access_layer.clone(), + outputs, + expired_ssts, + compaction_time_window: Some(time_window), + request_sender, + waiters, + file_purger, + start_time, + cache_manager, + storage: current_version.options.storage.clone(), + index_options: current_version.options.index_options.clone(), + append_mode: current_version.options.append_mode, + manifest_ctx, + version_control, + listener, + }; + Some(Box::new(task)) + } +} + +fn build_output(windows: BTreeMap)>) -> Vec { + let mut outputs = Vec::with_capacity(windows.len()); + for (lower_bound, (upper_bound, files)) in windows { + // safety: the upper bound must > lower bound. + let output_time_range = Some( + TimestampRange::new( + Timestamp::new_second(lower_bound), + Timestamp::new_second(upper_bound), + ) + .unwrap(), + ); + + let output = CompactionOutput { + output_file_id: FileId::random(), + output_level: 1, + inputs: files, + filter_deleted: false, + output_time_range, + }; + outputs.push(output); + } + + outputs +} + +/// Assigns files to time windows. 
If file does not contain a time range in metadata, it will be +/// assigned to a special bucket `i64::MAX` (normally no timestamp can be aligned to this bucket) +/// so that all files without timestamp can be compacted together. +fn assign_files_to_time_windows<'a>( + bucket_sec: i64, + files: impl Iterator, +) -> BTreeMap)> { + let mut buckets = BTreeMap::new(); + + for file in files { + if file.compacting() { + continue; + } + let (start, end) = file.time_range(); + let bounds = file_time_bucket_span( + // safety: converting whatever timestamp to seconds will not overflow. + start.convert_to(TimeUnit::Second).unwrap().value(), + end.convert_to(TimeUnit::Second).unwrap().value(), + bucket_sec, + ); + for (lower_bound, upper_bound) in bounds { + let (_, files) = buckets + .entry(lower_bound) + .or_insert_with(|| (upper_bound, Vec::new())); + files.push(file.clone()); + } + } + buckets +} + +/// Calculates timestamp span between start and end timestamp. +fn file_time_bucket_span(start_sec: i64, end_sec: i64, bucket_sec: i64) -> Vec<(i64, i64)> { + assert!(start_sec <= end_sec); + + // if timestamp is between `[i64::MIN, i64::MIN.align_by_bucket(bucket)]`, which cannot + // be aligned to a valid i64 bound, simply return `i64::MIN` rather than just underflow. 
+ let mut start_aligned = start_sec.align_by_bucket(bucket_sec).unwrap_or(i64::MIN); + let end_aligned = end_sec + .align_by_bucket(bucket_sec) + .unwrap_or(start_aligned + (end_sec - start_sec)); + + let mut res = Vec::with_capacity(((end_aligned - start_aligned) / bucket_sec + 1) as usize); + while start_aligned <= end_aligned { + let window_size = if start_aligned % bucket_sec == 0 { + bucket_sec + } else { + (start_aligned % bucket_sec).abs() + }; + let upper_bound = start_aligned.checked_add(window_size).unwrap_or(i64::MAX); + res.push((start_aligned, upper_bound)); + start_aligned = upper_bound; + } + res +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use common_time::range::TimestampRange; + use common_time::Timestamp; + use store_api::storage::RegionId; + + use crate::compaction::window::{file_time_bucket_span, WindowedCompactionPicker}; + use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder}; + use crate::memtable::time_partition::TimePartitions; + use crate::memtable::version::MemtableVersion; + use crate::region::options::RegionOptions; + use crate::region::version::{Version, VersionRef}; + use crate::sst::file::{FileId, FileMeta, Level}; + use crate::sst::version::SstVersion; + use crate::test_util::memtable_util::metadata_for_test; + use crate::test_util::NoopFilePurger; + + fn build_version(files: &[(FileId, i64, i64, Level)], ttl: Option) -> VersionRef { + let metadata = metadata_for_test(); + let memtables = Arc::new(MemtableVersion::new(Arc::new(TimePartitions::new( + metadata.clone(), + Arc::new(PartitionTreeMemtableBuilder::new( + PartitionTreeConfig::default(), + None, + )), + 0, + None, + )))); + let file_purger_ref = Arc::new(NoopFilePurger); + + let mut ssts = SstVersion::new(); + + ssts.add_files( + file_purger_ref, + files.iter().map(|(file_id, start, end, level)| FileMeta { + file_id: *file_id, + time_range: ( + Timestamp::new_millisecond(*start), + 
Timestamp::new_millisecond(*end), + ), + level: *level, + ..Default::default() + }), + ); + + Arc::new(Version { + metadata, + memtables, + ssts: Arc::new(ssts), + flushed_entry_id: 0, + flushed_sequence: 0, + truncated_entry_id: None, + compaction_time_window: None, + options: RegionOptions { + ttl, + compaction: Default::default(), + storage: None, + append_mode: false, + wal_options: Default::default(), + index_options: Default::default(), + memtable: None, + }, + }) + } + + #[test] + fn test_pick_expired() { + let picker = WindowedCompactionPicker::new(None); + let files = vec![(FileId::random(), 0, 10, 0)]; + + let version = build_version(&files, Some(Duration::from_millis(1))); + let (outputs, expired_ssts, _window) = picker.pick_inner( + RegionId::new(0, 0), + &version, + Timestamp::new_millisecond(12), + ); + assert!(outputs.is_empty()); + assert_eq!(1, expired_ssts.len()); + } + + const HOUR: i64 = 60 * 60 * 1000; + + #[test] + fn test_infer_window() { + let picker = WindowedCompactionPicker::new(None); + + let files = vec![ + (FileId::random(), 0, HOUR, 0), + (FileId::random(), HOUR, HOUR * 2 - 1, 0), + ]; + + let version = build_version(&files, Some(Duration::from_millis(3 * HOUR as u64))); + + let (outputs, expired_ssts, window_seconds) = picker.pick_inner( + RegionId::new(0, 0), + &version, + Timestamp::new_millisecond(HOUR * 2), + ); + assert!(expired_ssts.is_empty()); + assert_eq!(2 * HOUR / 1000, window_seconds); + assert_eq!(1, outputs.len()); + assert_eq!(2, outputs[0].inputs.len()); + } + + #[test] + fn test_assign_files_to_windows() { + let picker = WindowedCompactionPicker::new(Some(HOUR / 1000)); + let files = vec![ + (FileId::random(), 0, 2 * HOUR - 1, 0), + (FileId::random(), HOUR, HOUR * 3 - 1, 0), + ]; + let version = build_version(&files, Some(Duration::from_millis(3 * HOUR as u64))); + let (outputs, expired_ssts, window_seconds) = picker.pick_inner( + RegionId::new(0, 0), + &version, + Timestamp::new_millisecond(HOUR * 3), + ); + + 
assert!(expired_ssts.is_empty()); + assert_eq!(HOUR / 1000, window_seconds); + assert_eq!(3, outputs.len()); + + assert_eq!(1, outputs[0].inputs.len()); + assert_eq!(files[0].0, outputs[0].inputs[0].file_id()); + assert_eq!( + TimestampRange::new( + Timestamp::new_millisecond(0), + Timestamp::new_millisecond(HOUR) + ), + outputs[0].output_time_range + ); + + assert_eq!(2, outputs[1].inputs.len()); + assert_eq!( + TimestampRange::new( + Timestamp::new_millisecond(HOUR), + Timestamp::new_millisecond(2 * HOUR) + ), + outputs[1].output_time_range + ); + + assert_eq!(1, outputs[2].inputs.len()); + assert_eq!(files[1].0, outputs[2].inputs[0].file_id()); + assert_eq!( + TimestampRange::new( + Timestamp::new_millisecond(2 * HOUR), + Timestamp::new_millisecond(3 * HOUR) + ), + outputs[2].output_time_range + ); + } + + #[test] + fn test_file_time_bucket_span() { + assert_eq!( + vec![(i64::MIN, i64::MIN + 8),], + file_time_bucket_span(i64::MIN, i64::MIN + 1, 10) + ); + + assert_eq!( + vec![(i64::MIN, i64::MIN + 8), (i64::MIN + 8, i64::MIN + 18)], + file_time_bucket_span(i64::MIN, i64::MIN + 8, 10) + ); + + assert_eq!( + vec![ + (i64::MIN, i64::MIN + 8), + (i64::MIN + 8, i64::MIN + 18), + (i64::MIN + 18, i64::MIN + 28) + ], + file_time_bucket_span(i64::MIN, i64::MIN + 20, 10) + ); + + assert_eq!( + vec![(-10, 0), (0, 10), (10, 20)], + file_time_bucket_span(-1, 11, 10) + ); + + assert_eq!( + vec![(-3, 0), (0, 3), (3, 6)], + file_time_bucket_span(-1, 3, 3) + ); + + assert_eq!(vec![(0, 10)], file_time_bucket_span(0, 9, 10)); + + assert_eq!( + vec![(i64::MAX - (i64::MAX % 10), i64::MAX)], + file_time_bucket_span(i64::MAX - 1, i64::MAX, 10) + ); + } +} diff --git a/src/mito2/src/engine/append_mode_test.rs b/src/mito2/src/engine/append_mode_test.rs index 85509094fed7..77f2e4e67dd8 100644 --- a/src/mito2/src/engine/append_mode_test.rs +++ b/src/mito2/src/engine/append_mode_test.rs @@ -137,7 +137,10 @@ async fn test_append_mode_compaction() { flush_region(&engine, region_id, 
None).await; let output = engine - .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) .await .unwrap(); assert_eq!(output.affected_rows, 0); diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs index 8e6ac03b83d1..861d5e0b9065 100644 --- a/src/mito2/src/engine/compaction_test.rs +++ b/src/mito2/src/engine/compaction_test.rs @@ -131,7 +131,10 @@ async fn test_compaction_region() { put_and_flush(&engine, region_id, &column_schemas, 15..25).await; let result = engine - .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) .await .unwrap(); assert_eq!(result.affected_rows, 0); @@ -179,7 +182,10 @@ async fn test_compaction_region_with_overlapping() { delete_and_flush(&engine, region_id, &column_schemas, 0..3600).await; // window 3600 let result = engine - .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) .await .unwrap(); assert_eq!(result.affected_rows, 0); @@ -227,7 +233,10 @@ async fn test_compaction_region_with_overlapping_delete_all() { delete_and_flush(&engine, region_id, &column_schemas, 0..10800).await; // window 10800 let result = engine - .handle_request(region_id, RegionRequest::Compact(RegionCompactRequest {})) + .handle_request( + region_id, + RegionRequest::Compact(RegionCompactRequest::default()), + ) .await .unwrap(); assert_eq!(result.affected_rows, 0); diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index cebb9a97a2b3..7f3348eb7e08 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -20,6 +20,7 @@ use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; use 
common_runtime::JoinError; +use common_time::timestamp::TimeUnit; use common_time::Timestamp; use datatypes::arrow::error::ArrowError; use datatypes::prelude::ConcreteDataType; @@ -695,6 +696,18 @@ pub enum Error { location: Location, }, + #[snafu(display( + "Time range predicate overflows, timestamp: {:?}, target unit: {}", + timestamp, + unit + ))] + TimeRangePredicateOverflow { + timestamp: Timestamp, + unit: TimeUnit, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to build time range filters for value: {:?}", timestamp))] BuildTimeRangeFilter { timestamp: Timestamp, @@ -810,6 +823,7 @@ impl ErrorExt for Error { EncodeMemtable { .. } | ReadDataPart { .. } => StatusCode::Internal, ChecksumMismatch { .. } => StatusCode::Unexpected, RegionStopped { .. } => StatusCode::RegionNotReady, + TimeRangePredicateOverflow { .. } => StatusCode::InvalidArguments, BuildTimeRangeFilter { .. } => StatusCode::Unexpected, } } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 7483c73bad31..db28d397cc47 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -665,8 +665,8 @@ impl RegionWorkerLoop { .await; continue; } - DdlRequest::Compact(_) => { - self.handle_compaction_request(ddl.region_id, ddl.sender); + DdlRequest::Compact(req) => { + self.handle_compaction_request(ddl.region_id, req, ddl.sender); continue; } DdlRequest::Truncate(_) => { diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs index dd624c95e5e9..57dd53c8c8e6 100644 --- a/src/mito2/src/worker/handle_compaction.rs +++ b/src/mito2/src/worker/handle_compaction.rs @@ -14,6 +14,7 @@ use common_telemetry::{error, info, warn}; use store_api::logstore::LogStore; +use store_api::region_request::RegionCompactRequest; use store_api::storage::RegionId; use crate::metrics::COMPACTION_REQUEST_COUNT; @@ -25,6 +26,7 @@ impl RegionWorkerLoop { pub(crate) fn handle_compaction_request( &mut self, region_id: RegionId, + req: 
RegionCompactRequest, mut sender: OptionOutputTx, ) { let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else { @@ -33,6 +35,7 @@ impl RegionWorkerLoop { COMPACTION_REQUEST_COUNT.inc(); if let Err(e) = self.compaction_scheduler.schedule_compaction( region.region_id, + req.options, ®ion.version_control, ®ion.access_layer, ®ion.file_purger, diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index 9b1f86df751d..6659fbc74a6d 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -16,6 +16,7 @@ use std::sync::Arc; +use api::v1::region::compact_request; use common_telemetry::{error, info, warn}; use store_api::logstore::LogStore; use store_api::region_request::RegionFlushRequest; @@ -236,6 +237,7 @@ impl RegionWorkerLoop { // Schedules compaction. if let Err(e) = self.compaction_scheduler.schedule_compaction( region.region_id, + compact_request::Options::Regular(Default::default()), ®ion.version_control, ®ion.access_layer, ®ion.file_purger, diff --git a/src/operator/src/request.rs b/src/operator/src/request.rs index 7aac8204b03f..9611bbfab1ff 100644 --- a/src/operator/src/request.rs +++ b/src/operator/src/request.rs @@ -109,6 +109,7 @@ impl Requester { .map(|partition| { RegionRequestBody::Compact(CompactRequest { region_id: partition.id.into(), + options: Some(request.compact_options.clone()), }) }) .collect(); @@ -145,6 +146,7 @@ impl Requester { ) -> Result { let request = RegionRequestBody::Compact(CompactRequest { region_id: region_id.into(), + options: None, // todo(hl): maybe also support parameters in region compaction. 
}); info!("Handle region manual compaction request: {region_id}"); diff --git a/src/session/src/table_name.rs b/src/session/src/table_name.rs index 7d56362bedfb..c0806ecaf555 100644 --- a/src/session/src/table_name.rs +++ b/src/session/src/table_name.rs @@ -12,20 +12,31 @@ // See the License for the specific language governing permissions and // limitations under the License. +use snafu::ensure; use sql::ast::ObjectName; -use sql::error::{InvalidSqlSnafu, Result}; +use sql::error::{InvalidSqlSnafu, PermissionDeniedSnafu, Result}; use sql::parser::ParserContext; use crate::QueryContextRef; -/// Parse table name into `(catalog, schema, table)` with query context. +/// Parse table name into `(catalog, schema, table)` with query context and validates +/// if catalog matches current catalog in query context. pub fn table_name_to_full_name( name: &str, query_ctx: &QueryContextRef, ) -> Result<(String, String, String)> { let obj_name = ParserContext::parse_table_name(name, query_ctx.sql_dialect())?; - table_idents_to_full_name(&obj_name, query_ctx) + let (catalog, schema, table) = table_idents_to_full_name(&obj_name, query_ctx)?; + // todo(hl): also check if schema matches when rbac is ready. https://github.com/GreptimeTeam/greptimedb/pull/3988/files#r1608687652 + ensure!( + catalog == query_ctx.current_catalog(), + PermissionDeniedSnafu { + target: catalog, + current: query_ctx.current_catalog(), + } + ); + Ok((catalog, schema, table)) } /// Converts maybe fully-qualified table name (`<catalog>.<schema>.<table>`) to tuple.
diff --git a/src/sql/src/error.rs b/src/sql/src/error.rs index 81a760f31787..8f815c79d722 100644 --- a/src/sql/src/error.rs +++ b/src/sql/src/error.rs @@ -213,6 +213,18 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display( + "Permission denied while operating catalog {} from current catalog {}", + target, + current + ))] + PermissionDenied { + target: String, + current: String, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -241,7 +253,8 @@ impl ErrorExt for Error { | InvalidSqlValue { .. } | TimestampOverflow { .. } | InvalidTableOption { .. } - | InvalidCast { .. } => StatusCode::InvalidArguments, + | InvalidCast { .. } + | PermissionDenied { .. } => StatusCode::InvalidArguments, SerializeColumnDefaultConstraint { source, .. } => source.status_code(), ConvertToGrpcDataType { source, .. } => source.status_code(), diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 6c4aaa28f47c..70271f017b35 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -18,9 +18,9 @@ use std::fmt; use api::helper::ColumnDataTypeWrapper; use api::v1::add_column_location::LocationType; use api::v1::region::{ - alter_request, region_request, AlterRequest, AlterRequests, CloseRequest, CompactRequest, - CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests, FlushRequest, - InsertRequests, OpenRequest, TruncateRequest, + alter_request, compact_request, region_request, AlterRequest, AlterRequests, CloseRequest, + CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests, + FlushRequest, InsertRequests, OpenRequest, TruncateRequest, }; use api::v1::{self, Rows, SemanticType}; pub use common_base::AffectedRows; @@ -199,9 +199,12 @@ fn make_region_flush(flush: FlushRequest) -> Result Result> { let region_id = compact.region_id.into(); + let options = compact + .options + 
.unwrap_or(compact_request::Options::Regular(Default::default())); Ok(vec![( region_id, - RegionRequest::Compact(RegionCompactRequest {}), + RegionRequest::Compact(RegionCompactRequest { options }), )]) } @@ -642,7 +645,18 @@ pub struct RegionFlushRequest { } #[derive(Debug)] -pub struct RegionCompactRequest {} +pub struct RegionCompactRequest { + pub options: compact_request::Options, +} + +impl Default for RegionCompactRequest { + fn default() -> Self { + Self { + // Default to regular compaction. + options: compact_request::Options::Regular(Default::default()), + } + } +} /// Truncate region request. #[derive(Debug)] diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index b285be08b907..9463b1809fe7 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -29,6 +29,7 @@ datafusion-physical-expr.workspace = true datatypes.workspace = true derive_builder.workspace = true futures.workspace = true +greptime-proto.workspace = true humantime.workspace = true humantime-serde.workspace = true paste = "1.0" diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 10182baeb463..e86eb4e8e423 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -25,6 +25,7 @@ use common_time::range::TimestampRange; use datatypes::data_type::ConcreteDataType; use datatypes::prelude::VectorRef; use datatypes::schema::ColumnSchema; +use greptime_proto::v1::region::compact_request; use serde::{Deserialize, Serialize}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, PHYSICAL_TABLE_METADATA_KEY}; use store_api::mito_engine_options::is_mito_engine_option_key; @@ -238,11 +239,23 @@ pub struct FlushTableRequest { pub table_name: String, } -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone, PartialEq)] pub struct CompactTableRequest { pub catalog_name: String, pub schema_name: String, pub table_name: String, + pub compact_options: compact_request::Options, +} + +impl Default for CompactTableRequest { + fn default() -> Self { + 
Self { + catalog_name: Default::default(), + schema_name: Default::default(), + table_name: Default::default(), + compact_options: compact_request::Options::Regular(Default::default()), + } + } } /// Truncate table request