
Commit

fix
zhyass committed Jan 6, 2025
1 parent c35d6b6 commit d4a91b7
Showing 2 changed files with 135 additions and 46 deletions.
157 changes: 111 additions & 46 deletions src/query/ee/src/hilbert_clustering/handler.rs
@@ -29,6 +29,7 @@ use databend_common_storages_fuse::DEFAULT_BLOCK_PER_SEGMENT;
 use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use databend_enterprise_hilbert_clustering::HilbertClusteringHandler;
 use databend_enterprise_hilbert_clustering::HilbertClusteringHandlerWrapper;
+use databend_storages_common_table_meta::meta::ClusterStatistics;
 use databend_storages_common_table_meta::meta::CompactSegmentInfo;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
@@ -67,10 +68,11 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {

         let max_threads = ctx.get_settings().get_max_threads()? as usize;
         let chunk_size = max_threads * 4;
-        let mut target_segments = vec![];
-        let mut total_rows = 0;
-        let mut total_size = 0;
-        let mut stable = false;
+        let mut checker = ReclusterChecker::new(
+            cluster_key_id,
+            thresholds,
+            push_downs.as_ref().is_none_or(|v| v.filters.is_none()),
+        );
         'FOR: for chunk in segment_locations.chunks(chunk_size) {
             // read segments.
             let compact_segments = FuseTable::segment_pruning(
@@ -88,42 +90,19 @@ impl HilbertClusteringHandler for RealHilbertClusteringHandler {
             }

             for (location, segment) in compact_segments.into_iter() {
-                total_rows += segment.summary.row_count as usize;
-                total_size += segment.summary.uncompressed_byte_size as usize;
-                if !thresholds.check_large_enough(total_rows, total_size) {
-                    // totals < N
-                    target_segments.push((location, segment.clone()));
-                    continue;
-                }
-
-                if thresholds.check_for_compact(total_rows, total_size) {
-                    // N <= totals < 2N
-                    target_segments.push((location, segment.clone()));
-                } else {
-                    // totals >= 2N
-                    let new_target_segments = vec![(location, segment)];
-                    if Self::check_for_recluster(&new_target_segments, cluster_key_id, stable) {
-                        target_segments = new_target_segments;
-                        stable = true;
-                        break 'FOR;
-                    }
-                };
-
-                if Self::check_for_recluster(&target_segments, cluster_key_id, stable) {
-                    stable = true;
+                if checker.add(location, segment) {
                     break 'FOR;
                 }
-                target_segments.clear();
-                total_rows = 0;
-                total_size = 0;
             }
         }

-        if !stable && !Self::check_for_recluster(&target_segments, cluster_key_id, stable) {
+        let target_segments = checker.finalize();
+        if target_segments.is_empty() {
             return Ok(None);
         }

-        let rows_per_block = block_thresholds.calc_rows_per_block(total_size, total_rows) as u64;
+        let rows_per_block =
+            block_thresholds.calc_rows_per_block(checker.total_size, checker.total_rows) as u64;
         let block_size = ctx.get_settings().get_max_block_size()?;
         ctx.get_settings()
             .set_max_block_size(rows_per_block.min(block_size))?;
@@ -156,23 +135,109 @@ impl RealHilbertClusteringHandler {
         GlobalInstance::set(Arc::new(wrapper));
         Ok(())
     }
+}
+
+struct ReclusterChecker {
+    segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
+    default_cluster_id: u32,
+    thresholds: BlockThresholds,
+
+    total_rows: usize,
+    total_size: usize,
+
+    finished: bool,
+    // Whether the target segments is at the head of snapshot.
+    head_of_snapshot: bool,
+}
+
+impl ReclusterChecker {
+    fn new(default_cluster_id: u32, thresholds: BlockThresholds, head_of_snapshot: bool) -> Self {
+        Self {
+            segments: vec![],
+            default_cluster_id,
+            thresholds,
+            total_rows: 0,
+            total_size: 0,
+            finished: false,
+            head_of_snapshot,
+        }
+    }

-    fn check_for_recluster(
-        segments: &[(SegmentLocation, Arc<CompactSegmentInfo>)],
-        default_cluster_id: u32,
-        stable: bool,
-    ) -> bool {
-        match segments.len() {
+    fn add(&mut self, location: SegmentLocation, segment: Arc<CompactSegmentInfo>) -> bool {
+        let row_count = segment.summary.row_count as usize;
+        let byte_size = segment.summary.uncompressed_byte_size as usize;
+        self.total_rows += row_count;
+        self.total_size += byte_size;
+        if !self
+            .thresholds
+            .check_large_enough(self.total_rows, self.total_size)
+        {
+            // totals < N
+            self.segments.push((location, segment));
+            return false;
+        }
+
+        let segment_should_recluster = self.should_recluster(&segment, |v| {
+            v.cluster_key_id != self.default_cluster_id || v.level != -1
+        });
+        let mut retained = false;
+        if !self.head_of_snapshot || segment_should_recluster {
+            if self
+                .thresholds
+                .check_for_compact(self.total_rows, self.total_size)
+            {
+                // N <= totals < 2N
+                self.segments.push((location, segment));
+                retained = true;
+            } else if segment_should_recluster {
+                // totals >= 2N
+                self.segments = vec![(location, segment)];
+                self.total_rows = row_count;
+                self.total_size = byte_size;
+                self.finished = true;
+                return true;
+            }
+        }
+
+        if self.check_for_recluster() {
+            if !retained {
+                self.total_rows -= row_count;
+                self.total_size -= byte_size;
+            }
+            self.finished = true;
+            return true;
+        }
+
+        self.reset();
+        false
+    }
+
+    fn finalize(&mut self) -> Vec<(SegmentLocation, Arc<CompactSegmentInfo>)> {
+        if !self.finished && !self.check_for_recluster() {
+            return vec![];
+        }
+        std::mem::take(&mut self.segments)
+    }
+
+    fn check_for_recluster(&self) -> bool {
+        match self.segments.len() {
             0 => false,
-            1 => segments[0]
-                .1
-                .summary
-                .cluster_stats
-                .as_ref()
-                .map_or(true, |stats| {
-                    stats.cluster_key_id != default_cluster_id || (stats.level != -1 && stable)
-                }),
+            1 => self.should_recluster(&self.segments[0].1, |v| {
+                v.cluster_key_id != self.default_cluster_id
+            }),
             _ => true,
         }
     }
+
+    fn should_recluster<F>(&self, segment: &CompactSegmentInfo, pred: F) -> bool
+    where F: Fn(&ClusterStatistics) -> bool {
+        segment.summary.cluster_stats.as_ref().is_none_or(pred)
+    }
+
+    fn reset(&mut self) {
+        self.total_rows = 0;
+        self.total_size = 0;
+        self.head_of_snapshot = false;
+        self.segments.clear();
+    }
 }
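
The `// totals < N`, `// N <= totals < 2N`, and `// totals >= 2N` comments above describe a three-way bucketing of the accumulated segment totals against a target size N. The sketch below is a minimal standalone illustration of that bucketing only; the `classify` helper, its row/byte targets, and the way rows and bytes are combined are assumptions for illustration, not the real `BlockThresholds::check_large_enough` / `check_for_compact` semantics.

// Hypothetical buckets for accumulated segment totals (illustration only).
#[derive(Debug, PartialEq)]
enum Bucket {
    // totals < N: keep accumulating segments.
    TooSmall,
    // N <= totals < 2N: the accumulated segments form one compact-sized target.
    CompactSized,
    // totals >= 2N: already larger than the compaction window.
    Oversized,
}

// Classify totals against a hypothetical target of `n_rows` rows or `n_bytes` bytes.
// Here "large enough" means either target is met; the real thresholds may combine
// rows and bytes differently.
fn classify(total_rows: usize, total_size: usize, n_rows: usize, n_bytes: usize) -> Bucket {
    let large_enough = total_rows >= n_rows || total_size >= n_bytes;
    if !large_enough {
        Bucket::TooSmall
    } else if total_rows < 2 * n_rows && total_size < 2 * n_bytes {
        Bucket::CompactSized
    } else {
        Bucket::Oversized
    }
}

fn main() {
    // With a target of 1_000 rows / 1 MiB per group of segments:
    assert_eq!(classify(300, 200_000, 1_000, 1 << 20), Bucket::TooSmall);
    assert_eq!(classify(1_200, 800_000, 1_000, 1 << 20), Bucket::CompactSized);
    assert_eq!(classify(2_500, 3 << 20, 1_000, 1 << 20), Bucket::Oversized);
}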
24 changes: 24 additions & 0 deletions
@@ -18,6 +18,9 @@ CREATE OR REPLACE DATABASE test_hilbert
 statement ok
 USE test_hilbert

+statement ok
+set enable_parallel_multi_merge_sort = 0;
+
 statement ok
 create or replace table t(a int, b int) cluster by hilbert(a, b) row_per_block=2 block_per_segment=2;

@@ -51,6 +54,27 @@ select count(a), sum(a) from t;
 ----
 8 28

+statement ok
+insert into t values(8, 8);
+
+statement ok
+insert into t values(9, 9);
+
+statement ok
+alter table t recluster final;
+
+query I
+select count() from fuse_snapshot('test_hilbert','t');
+----
+9
+
+query II
+select block_count,row_count from fuse_segment('test_hilbert','t');
+----
+1 2
+2 4
+2 4
+
 statement ok
 drop table t all;

