From 933d54406d3f155c9a333cbfa90755cd65acd16c Mon Sep 17 00:00:00 2001 From: zhyass Date: Mon, 6 Jan 2025 09:22:57 +0800 Subject: [PATCH] add hilbert_clustering_information --- .../table_functions/table_function_factory.rs | 9 + src/query/storages/fuse/src/fuse_table.rs | 8 +- .../table_functions/clustering_information.rs | 2 +- .../fuse/src/table_functions/fuse_segment.rs | 4 +- .../hilbert_clustering_information.rs | 241 ++++++++++++++++++ .../storages/fuse/src/table_functions/mod.rs | 2 + .../07_0000_recluster_final.test | 13 + 7 files changed, 272 insertions(+), 7 deletions(-) create mode 100644 src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs diff --git a/src/query/service/src/table_functions/table_function_factory.rs b/src/query/service/src/table_functions/table_function_factory.rs index 019d01f4e7e3f..18daa861d03b2 100644 --- a/src/query/service/src/table_functions/table_function_factory.rs +++ b/src/query/service/src/table_functions/table_function_factory.rs @@ -27,6 +27,7 @@ use databend_common_storages_fuse::table_functions::FuseEncodingFunc; use databend_common_storages_fuse::table_functions::FuseStatisticsFunc; use databend_common_storages_fuse::table_functions::FuseTimeTravelSizeFunc; use databend_common_storages_fuse::table_functions::FuseVacuumTemporaryTable; +use databend_common_storages_fuse::table_functions::HilbertClusteringInfoFunc; use databend_common_storages_fuse::table_functions::TableFunctionTemplate; use databend_common_storages_stream::stream_status_table_func::StreamStatusTable; use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUC_ID_END; @@ -186,6 +187,14 @@ impl TableFunctionFactory { ), ); + creators.insert( + "hilbert_clustering_information".to_string(), + ( + next_id(), + Arc::new(TableFunctionTemplate::<HilbertClusteringInfoFunc>::create), + ), + ); + creators.insert( "fuse_vacuum_temporary_table".to_string(), ( diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs 
index 3c665b06c21a2..ba50983379d69 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -403,10 +403,10 @@ impl FuseTable { } pub fn linear_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Vec<RemoteExpr<String>> { - let Some(cluster_type) = self.cluster_type() else { - return vec![]; - }; - if matches!(cluster_type, ClusterType::Hilbert) { + if self + .cluster_type() + .is_none_or(|v| matches!(v, ClusterType::Hilbert)) + { return vec![]; } diff --git a/src/query/storages/fuse/src/table_functions/clustering_information.rs b/src/query/storages/fuse/src/table_functions/clustering_information.rs index 4d8920d66131e..4dd15b734ad22 100644 --- a/src/query/storages/fuse/src/table_functions/clustering_information.rs +++ b/src/query/storages/fuse/src/table_functions/clustering_information.rs @@ -211,7 +211,7 @@ impl<'a> ClusteringInformation<'a> { .get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); if matches!(typ, ClusterType::Hilbert) { return Err(ErrorCode::UnsupportedClusterType( - "Unsupported 'hilbert' type", + "Unsupported 'hilbert' type, please use `hilbert_clustering_information` instead", )); } } diff --git a/src/query/storages/fuse/src/table_functions/fuse_segment.rs b/src/query/storages/fuse/src/table_functions/fuse_segment.rs index 8a84757f8eacd..d5a6e91e9205c 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_segment.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_segment.rs @@ -25,7 +25,7 @@ use databend_common_expression::TableDataType; use databend_common_expression::TableField; use databend_common_expression::TableSchema; use databend_common_expression::TableSchemaRefExt; -use databend_storages_common_table_meta::meta::SegmentInfo; +use databend_storages_common_table_meta::meta::CompactSegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; use crate::io::SegmentsIO; @@ -85,7 +85,7 @@ impl TableMetaFunc for FuseSegment { std::cmp::min(ctx.get_settings().get_max_threads()? 
as usize * 4, len).max(1); for chunk in segment_locations.chunks(chunk_size) { let segments = segments_io - .read_segments::<SegmentInfo>(chunk, true) + .read_segments::<Arc<CompactSegmentInfo>>(chunk, true) .await?; for (idx, segment) in segments.into_iter().enumerate() { diff --git a/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs b/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs new file mode 100644 index 0000000000000..f57ef9bf9f711 --- /dev/null +++ b/src/query/storages/fuse/src/table_functions/hilbert_clustering_information.rs @@ -0,0 +1,241 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use chrono::Utc; +use databend_common_catalog::catalog::CATALOG_DEFAULT; +use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_args::TableArgs; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberScalar; +use databend_common_expression::BlockEntry; +use databend_common_expression::DataBlock; +use databend_common_expression::Scalar; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchema; +use databend_common_expression::TableSchemaRef; +use databend_common_expression::TableSchemaRefExt; +use databend_common_expression::Value; +use databend_storages_common_table_meta::meta::CompactSegmentInfo; +use databend_storages_common_table_meta::table::ClusterType; +use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE; + +use crate::io::SegmentsIO; +use crate::table_functions::parse_db_tb_args; +use crate::table_functions::string_literal; +use crate::table_functions::SimpleArgFunc; +use crate::table_functions::SimpleArgFuncTemplate; +use crate::FuseTable; + +pub struct HilbertClusteringInfoArgs { + database_name: String, + table_name: String, +} + +impl From<&HilbertClusteringInfoArgs> for TableArgs { + fn from(args: &HilbertClusteringInfoArgs) -> Self { + let tbl_args = vec![ + string_literal(args.database_name.as_str()), + string_literal(args.table_name.as_str()), + ]; + TableArgs::new_positioned(tbl_args) + } +} + +impl TryFrom<(&str, TableArgs)> for HilbertClusteringInfoArgs { + type Error = ErrorCode; + fn try_from( + (func_name, table_args): (&str, TableArgs), + ) -> std::result::Result<Self, Self::Error> { + let (database_name, table_name) = 
parse_db_tb_args(&table_args, func_name)?; + + Ok(Self { + database_name, + table_name, + }) + } +} + +pub type HilbertClusteringInfoFunc = SimpleArgFuncTemplate<HilbertClusteringInfo>; +pub struct HilbertClusteringInfo; + +#[async_trait::async_trait] +impl SimpleArgFunc for HilbertClusteringInfo { + type Args = HilbertClusteringInfoArgs; + + fn schema() -> TableSchemaRef { + HilbertClusteringInfoImpl::schema() + } + + async fn apply( + ctx: &Arc<dyn TableContext>, + args: &Self::Args, + _plan: &DataSourcePlan, + ) -> Result<DataBlock> { + let tenant_id = ctx.get_tenant(); + let tbl = ctx + .get_catalog(CATALOG_DEFAULT) + .await? + .get_table( + &tenant_id, + args.database_name.as_str(), + args.table_name.as_str(), + ) + .await?; + + let tbl = FuseTable::try_from_table(tbl.as_ref())?; + + HilbertClusteringInfoImpl::new(ctx.clone(), tbl) + .get_clustering_info() + .await + } +} + +struct HilbertClusteringInfoImpl<'a> { + pub ctx: Arc<dyn TableContext>, + pub table: &'a FuseTable, +} + +impl<'a> HilbertClusteringInfoImpl<'a> { + fn new(ctx: Arc<dyn TableContext>, table: &'a FuseTable) -> Self { + Self { ctx, table } + } + + #[async_backtrace::framed] + async fn get_clustering_info(&self) -> Result<DataBlock> { + let Some(cluster_key_str) = self.table.cluster_key_str() else { + return Err(ErrorCode::UnclusteredTable(format!( + "Unclustered table {}", + self.table.table_info.desc + ))); + }; + let cluster_type = self + .table + .get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear); + if matches!(cluster_type, ClusterType::Linear) { + return Err(ErrorCode::UnsupportedClusterType( + "Unsupported `linear` type, please use `clustering_information` instead", + )); + } + + let snapshot = self.table.read_table_snapshot().await?; + let now = Utc::now(); + let timestamp = snapshot + .as_ref() + .map_or(now, |s| s.timestamp.unwrap_or(now)) + .timestamp_micros(); + let mut total_segment_count = 0; + let mut stable_segment_count = 0; + let mut partial_segment_count = 0; + let mut unclustered_segment_count = 0; + if let Some(snapshot) = snapshot { + let total_count = 
snapshot.segments.len(); + total_segment_count = total_count as u64; + let chunk_size = std::cmp::min( + self.ctx.get_settings().get_max_threads()? as usize * 4, + total_count, + ) + .max(1); + let segments_io = SegmentsIO::create( + self.ctx.clone(), + self.table.operator.clone(), + self.table.schema(), + ); + for chunk in snapshot.segments.chunks(chunk_size) { + let segments = segments_io + .read_segments::<Arc<CompactSegmentInfo>>(chunk, true) + .await?; + for segment in segments { + let segment = segment?; + let Some(level) = segment.summary.cluster_stats.as_ref().map(|v| v.level) + else { + unclustered_segment_count += 1; + continue; + }; + if level == -1 { + stable_segment_count += 1; + } else { + partial_segment_count += 1; + } + } + } + } + Ok(DataBlock::new( + vec![ + BlockEntry::new( + DataType::String, + Value::Scalar(Scalar::String(cluster_key_str.to_string())), + ), + BlockEntry::new( + DataType::String, + Value::Scalar(Scalar::String("hilbert".to_string())), + ), + BlockEntry::new( + DataType::Timestamp, + Value::Scalar(Scalar::Timestamp(timestamp)), + ), + BlockEntry::new( + DataType::Number(NumberDataType::UInt64), + Value::Scalar(Scalar::Number(NumberScalar::UInt64(total_segment_count))), + ), + BlockEntry::new( + DataType::Number(NumberDataType::UInt64), + Value::Scalar(Scalar::Number(NumberScalar::UInt64(stable_segment_count))), + ), + BlockEntry::new( + DataType::Number(NumberDataType::UInt64), + Value::Scalar(Scalar::Number(NumberScalar::UInt64(partial_segment_count))), + ), + BlockEntry::new( + DataType::Number(NumberDataType::UInt64), + Value::Scalar(Scalar::Number(NumberScalar::UInt64( + unclustered_segment_count, + ))), + ), + ], + 1, + )) + } + + fn schema() -> Arc<TableSchema> { + TableSchemaRefExt::create(vec![ + TableField::new("cluster_key", TableDataType::String), + TableField::new("type", TableDataType::String), + TableField::new("timestamp", TableDataType::Timestamp), + TableField::new( + "total_segment_count", + TableDataType::Number(NumberDataType::UInt64), + ), + 
TableField::new( + "stable_segment_count", + TableDataType::Number(NumberDataType::UInt64), + ), + TableField::new( + "partial_segment_count", + TableDataType::Number(NumberDataType::UInt64), + ), + TableField::new( + "unclustered_segment_count", + TableDataType::Number(NumberDataType::UInt64), + ), + ]) + } +} diff --git a/src/query/storages/fuse/src/table_functions/mod.rs b/src/query/storages/fuse/src/table_functions/mod.rs index 2f68941a86630..67e2897faa102 100644 --- a/src/query/storages/fuse/src/table_functions/mod.rs +++ b/src/query/storages/fuse/src/table_functions/mod.rs @@ -24,6 +24,7 @@ mod fuse_snapshot; mod fuse_statistic; mod fuse_time_travel_size; mod fuse_vacuum_temporary_table; +mod hilbert_clustering_information; mod table_args; pub use clustering_information::ClusteringInformationFunc; @@ -43,4 +44,5 @@ pub use fuse_statistic::FuseStatisticsFunc; pub use fuse_time_travel_size::FuseTimeTravelSize; pub use fuse_time_travel_size::FuseTimeTravelSizeFunc; pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable; +pub use hilbert_clustering_information::HilbertClusteringInfoFunc; pub use table_args::*; diff --git a/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test b/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test index 0a2cf356ca3a5..01b2579043fb5 100644 --- a/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test +++ b/tests/sqllogictests/suites/ee/07_hilbert_clustering/07_0000_recluster_final.test @@ -41,6 +41,14 @@ select count() from fuse_snapshot('test_hilbert','t'); ---- 4 +statement error 4013 +select * EXCLUDE(timestamp) from clustering_information('test_hilbert','t'); + +query TTIIII +select * EXCLUDE(timestamp) from hilbert_clustering_information('test_hilbert','t'); +---- +(a, b) hilbert 4 0 0 4 + statement ok alter table t recluster final; @@ -63,6 +71,11 @@ insert into t values(9, 9); statement ok alter table t recluster final; +query TTIIII 
+select * EXCLUDE(timestamp) from hilbert_clustering_information('test_hilbert','t'); +---- +(a, b) hilbert 3 2 1 0 + query I select count() from fuse_snapshot('test_hilbert','t'); ----