Skip to content

Commit

Permalink
add hilbert_clustering_information
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jan 6, 2025
1 parent a2fd938 commit 933d544
Show file tree
Hide file tree
Showing 7 changed files with 272 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use databend_common_storages_fuse::table_functions::FuseEncodingFunc;
use databend_common_storages_fuse::table_functions::FuseStatisticsFunc;
use databend_common_storages_fuse::table_functions::FuseTimeTravelSizeFunc;
use databend_common_storages_fuse::table_functions::FuseVacuumTemporaryTable;
use databend_common_storages_fuse::table_functions::HilbertClusteringInfoFunc;
use databend_common_storages_fuse::table_functions::TableFunctionTemplate;
use databend_common_storages_stream::stream_status_table_func::StreamStatusTable;
use databend_storages_common_table_meta::table_id_ranges::SYS_TBL_FUC_ID_END;
Expand Down Expand Up @@ -186,6 +187,14 @@ impl TableFunctionFactory {
),
);

creators.insert(
"hilbert_clustering_information".to_string(),
(
next_id(),
Arc::new(TableFunctionTemplate::<HilbertClusteringInfoFunc>::create),
),
);

creators.insert(
"fuse_vacuum_temporary_table".to_string(),
(
Expand Down
8 changes: 4 additions & 4 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,10 @@ impl FuseTable {
}

pub fn linear_cluster_keys(&self, ctx: Arc<dyn TableContext>) -> Vec<RemoteExpr<String>> {
let Some(cluster_type) = self.cluster_type() else {
return vec![];
};
if matches!(cluster_type, ClusterType::Hilbert) {
if self
.cluster_type()
.is_none_or(|v| matches!(v, ClusterType::Hilbert))
{
return vec![];
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ impl<'a> ClusteringInformation<'a> {
.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear);
if matches!(typ, ClusterType::Hilbert) {
return Err(ErrorCode::UnsupportedClusterType(
"Unsupported 'hilbert' type",
"Unsupported 'hilbert' type, please use `hilbert_clustering_information` instead",
));
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/table_functions/fuse_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRefExt;
use databend_storages_common_table_meta::meta::SegmentInfo;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::meta::TableSnapshot;

use crate::io::SegmentsIO;
Expand Down Expand Up @@ -85,7 +85,7 @@ impl TableMetaFunc for FuseSegment {
std::cmp::min(ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);
for chunk in segment_locations.chunks(chunk_size) {
let segments = segments_io
.read_segments::<SegmentInfo>(chunk, true)
.read_segments::<Arc<CompactSegmentInfo>>(chunk, true)
.await?;

for (idx, segment) in segments.into_iter().enumerate() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use chrono::Utc;
use databend_common_catalog::catalog::CATALOG_DEFAULT;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_args::TableArgs;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::types::DataType;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::NumberScalar;
use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableField;
use databend_common_expression::TableSchema;
use databend_common_expression::TableSchemaRef;
use databend_common_expression::TableSchemaRefExt;
use databend_common_expression::Value;
use databend_storages_common_table_meta::meta::CompactSegmentInfo;
use databend_storages_common_table_meta::table::ClusterType;
use databend_storages_common_table_meta::table::OPT_KEY_CLUSTER_TYPE;

use crate::io::SegmentsIO;
use crate::table_functions::parse_db_tb_args;
use crate::table_functions::string_literal;
use crate::table_functions::SimpleArgFunc;
use crate::table_functions::SimpleArgFuncTemplate;
use crate::FuseTable;

pub struct HilbertClusteringInfoArgs {
database_name: String,
table_name: String,
}

impl From<&HilbertClusteringInfoArgs> for TableArgs {
fn from(args: &HilbertClusteringInfoArgs) -> Self {
let tbl_args = vec![
string_literal(args.database_name.as_str()),
string_literal(args.table_name.as_str()),
];
TableArgs::new_positioned(tbl_args)
}
}

impl TryFrom<(&str, TableArgs)> for HilbertClusteringInfoArgs {
type Error = ErrorCode;
fn try_from(
(func_name, table_args): (&str, TableArgs),
) -> std::result::Result<Self, Self::Error> {
let (database_name, table_name) = parse_db_tb_args(&table_args, func_name)?;

Ok(Self {
database_name,
table_name,
})
}
}

pub type HilbertClusteringInfoFunc = SimpleArgFuncTemplate<HilbertClusteringInfo>;
pub struct HilbertClusteringInfo;

#[async_trait::async_trait]
impl SimpleArgFunc for HilbertClusteringInfo {
type Args = HilbertClusteringInfoArgs;

fn schema() -> TableSchemaRef {
HilbertClusteringInfoImpl::schema()
}

async fn apply(
ctx: &Arc<dyn TableContext>,
args: &Self::Args,
_plan: &DataSourcePlan,
) -> Result<DataBlock> {
let tenant_id = ctx.get_tenant();
let tbl = ctx
.get_catalog(CATALOG_DEFAULT)
.await?
.get_table(
&tenant_id,
args.database_name.as_str(),
args.table_name.as_str(),
)
.await?;

let tbl = FuseTable::try_from_table(tbl.as_ref())?;

HilbertClusteringInfoImpl::new(ctx.clone(), tbl)
.get_clustering_info()
.await
}
}

struct HilbertClusteringInfoImpl<'a> {
pub ctx: Arc<dyn TableContext>,
pub table: &'a FuseTable,
}

impl<'a> HilbertClusteringInfoImpl<'a> {
fn new(ctx: Arc<dyn TableContext>, table: &'a FuseTable) -> Self {
Self { ctx, table }
}

#[async_backtrace::framed]
async fn get_clustering_info(&self) -> Result<DataBlock> {
let Some(cluster_key_str) = self.table.cluster_key_str() else {
return Err(ErrorCode::UnclusteredTable(format!(
"Unclustered table {}",
self.table.table_info.desc
)));
};
let cluster_type = self
.table
.get_option(OPT_KEY_CLUSTER_TYPE, ClusterType::Linear);
if matches!(cluster_type, ClusterType::Linear) {
return Err(ErrorCode::UnsupportedClusterType(
"Unsupported `linear` type, please use `clustering_information` instead",
));
}

let snapshot = self.table.read_table_snapshot().await?;
let now = Utc::now();
let timestamp = snapshot
.as_ref()
.map_or(now, |s| s.timestamp.unwrap_or(now))
.timestamp_micros();
let mut total_segment_count = 0;
let mut stable_segment_count = 0;
let mut partial_segment_count = 0;
let mut unclustered_segment_count = 0;
if let Some(snapshot) = snapshot {
let total_count = snapshot.segments.len();
total_segment_count = total_count as u64;
let chunk_size = std::cmp::min(
self.ctx.get_settings().get_max_threads()? as usize * 4,
total_count,
)
.max(1);
let segments_io = SegmentsIO::create(
self.ctx.clone(),
self.table.operator.clone(),
self.table.schema(),
);
for chunk in snapshot.segments.chunks(chunk_size) {
let segments = segments_io
.read_segments::<Arc<CompactSegmentInfo>>(chunk, true)
.await?;
for segment in segments {
let segment = segment?;
let Some(level) = segment.summary.cluster_stats.as_ref().map(|v| v.level)
else {
unclustered_segment_count += 1;
continue;
};
if level == -1 {
stable_segment_count += 1;
} else {
partial_segment_count += 1;
}
}
}
}
Ok(DataBlock::new(
vec![
BlockEntry::new(
DataType::String,
Value::Scalar(Scalar::String(cluster_key_str.to_string())),
),
BlockEntry::new(
DataType::String,
Value::Scalar(Scalar::String("hilbert".to_string())),
),
BlockEntry::new(
DataType::Timestamp,
Value::Scalar(Scalar::Timestamp(timestamp)),
),
BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Scalar(Scalar::Number(NumberScalar::UInt64(total_segment_count))),
),
BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Scalar(Scalar::Number(NumberScalar::UInt64(stable_segment_count))),
),
BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Scalar(Scalar::Number(NumberScalar::UInt64(partial_segment_count))),
),
BlockEntry::new(
DataType::Number(NumberDataType::UInt64),
Value::Scalar(Scalar::Number(NumberScalar::UInt64(
unclustered_segment_count,
))),
),
],
1,
))
}

fn schema() -> Arc<TableSchema> {
TableSchemaRefExt::create(vec![
TableField::new("cluster_key", TableDataType::String),
TableField::new("type", TableDataType::String),
TableField::new("timestamp", TableDataType::Timestamp),
TableField::new(
"total_segment_count",
TableDataType::Number(NumberDataType::UInt64),
),
TableField::new(
"stable_segment_count",
TableDataType::Number(NumberDataType::UInt64),
),
TableField::new(
"partial_segment_count",
TableDataType::Number(NumberDataType::UInt64),
),
TableField::new(
"unclustered_segment_count",
TableDataType::Number(NumberDataType::UInt64),
),
])
}
}
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/table_functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod fuse_snapshot;
mod fuse_statistic;
mod fuse_time_travel_size;
mod fuse_vacuum_temporary_table;
mod hilbert_clustering_information;
mod table_args;

pub use clustering_information::ClusteringInformationFunc;
Expand All @@ -43,4 +44,5 @@ pub use fuse_statistic::FuseStatisticsFunc;
pub use fuse_time_travel_size::FuseTimeTravelSize;
pub use fuse_time_travel_size::FuseTimeTravelSizeFunc;
pub use fuse_vacuum_temporary_table::FuseVacuumTemporaryTable;
pub use hilbert_clustering_information::HilbertClusteringInfoFunc;
pub use table_args::*;
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ select count() from fuse_snapshot('test_hilbert','t');
----
4

statement error 4013
select * EXCLUDE(timestamp) from clustering_information('test_hilbert','t');

query TTIIII
select * EXCLUDE(timestamp) from hilbert_clustering_information('test_hilbert','t');
----
(a, b) hilbert 4 0 0 4

statement ok
alter table t recluster final;

Expand All @@ -63,6 +71,11 @@ insert into t values(9, 9);
statement ok
alter table t recluster final;

query TTIIII
select * EXCLUDE(timestamp) from hilbert_clustering_information('test_hilbert','t');
----
(a, b) hilbert 3 2 1 0

query I
select count() from fuse_snapshot('test_hilbert','t');
----
Expand Down

0 comments on commit 933d544

Please sign in to comment.