diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs
index f3e3a5c7e8d5..2349e643660a 100644
--- a/src/query/service/src/interpreters/interpreter_table_optimize.rs
+++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -231,15 +231,14 @@ impl OptimizeTableInterpreter {
             .await?
         {
             if !mutator.tasks.is_empty() {
+                let is_distributed = mutator.is_distributed();
                 let reclustered_block_count = mutator.recluster_blocks_count;
                 let physical_plan = build_recluster_physical_plan(
                     mutator.tasks,
                     table.get_table_info().clone(),
                     catalog.info(),
                     mutator.snapshot,
-                    mutator.remained_blocks,
-                    mutator.removed_segment_indexes,
-                    mutator.removed_segment_summary,
+                    is_distributed,
                 )?;
 
                 build_res =
diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs
index 168b4516a1f2..2165ace6d64a 100644
--- a/src/query/service/src/interpreters/interpreter_table_recluster.rs
+++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs
@@ -28,18 +28,17 @@
 use databend_common_sql::executor::physical_plans::Exchange;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::ReclusterSink;
 use databend_common_sql::executor::physical_plans::ReclusterSource;
-use databend_common_sql::executor::physical_plans::ReclusterTask;
 use databend_common_sql::executor::PhysicalPlan;
 use databend_common_sql::plans::LockTableOption;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::FuseTable;
-use databend_storages_common_table_meta::meta::BlockMeta;
-use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use log::error;
 use log::warn;
 
 use crate::interpreters::Interpreter;
 use crate::interpreters::InterpreterClusteringHistory;
+use crate::interpreters::OptimizeTableInterpreter;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::executor::PipelineCompleteExecutor;
 use crate::pipelines::PipelineBuildResult;
@@ -148,16 +147,15 @@ impl Interpreter for ReclusterTableInterpreter {
             let mutator = mutator.unwrap();
             if mutator.tasks.is_empty() {
                 break;
-            };
+            }
+            let is_distributed = mutator.is_distributed();
             block_count += mutator.recluster_blocks_count;
             let physical_plan = build_recluster_physical_plan(
                 mutator.tasks,
                 table.get_table_info().clone(),
                 catalog.info(),
                 mutator.snapshot,
-                mutator.remained_blocks,
-                mutator.removed_segment_indexes,
-                mutator.removed_segment_summary,
+                is_distributed,
             )?;
 
             let mut build_res =
@@ -217,44 +215,56 @@ impl Interpreter for ReclusterTableInterpreter {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 pub fn build_recluster_physical_plan(
-    tasks: Vec<ReclusterTask>,
+    tasks: ReclusterTasks,
     table_info: TableInfo,
     catalog_info: CatalogInfo,
     snapshot: Arc<TableSnapshot>,
-    remained_blocks: Vec<Arc<BlockMeta>>,
-    removed_segment_indexes: Vec<usize>,
-    removed_segment_summary: Statistics,
+    is_distributed: bool,
 ) -> Result<PhysicalPlan> {
-    let is_distributed = tasks.len() > 1;
-    let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
-        tasks,
-        table_info: table_info.clone(),
-        catalog_info: catalog_info.clone(),
-        plan_id: u32::MAX,
-    }));
+    match tasks {
+        ReclusterTasks::Recluster {
+            tasks,
+            remained_blocks,
+            removed_segment_indexes,
+            removed_segment_summary,
+        } => {
+            let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
+                tasks,
+                table_info: table_info.clone(),
+                catalog_info: catalog_info.clone(),
+                plan_id: u32::MAX,
+            }));
 
-    if is_distributed {
-        root = PhysicalPlan::Exchange(Exchange {
-            plan_id: 0,
-            input: Box::new(root),
-            kind: FragmentKind::Merge,
-            keys: vec![],
-            allow_adjust_parallelism: true,
-            ignore_exchange: false,
-        });
+            if is_distributed {
+                root = PhysicalPlan::Exchange(Exchange {
+                    plan_id: 0,
+                    input: Box::new(root),
+                    kind: FragmentKind::Merge,
+                    keys: vec![],
+                    allow_adjust_parallelism: true,
+                    ignore_exchange: false,
+                });
+            }
+            let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
+                input: Box::new(root),
+                table_info,
+                catalog_info,
+                snapshot,
+                remained_blocks,
+                removed_segment_indexes,
+                removed_segment_summary,
+                plan_id: u32::MAX,
+            }));
+            plan.adjust_plan_id(&mut 0);
+            Ok(plan)
+        }
+        ReclusterTasks::Compact(parts) => OptimizeTableInterpreter::build_physical_plan(
+            parts,
+            table_info,
+            snapshot,
+            catalog_info,
+            is_distributed,
+        ),
     }
-    let mut plan = PhysicalPlan::ReclusterSink(Box::new(ReclusterSink {
-        input: Box::new(root),
-        table_info,
-        catalog_info,
-        snapshot,
-        remained_blocks,
-        removed_segment_indexes,
-        removed_segment_summary,
-        plan_id: u32::MAX,
-    }));
-    plan.adjust_plan_id(&mut 0);
-    Ok(plan)
 }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
index f92aa63d1560..29ea42dbd7ba 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
 
 use databend_common_base::base::tokio;
 use databend_common_catalog::plan::PartInfoType;
+use databend_common_catalog::plan::Partitions;
 use databend_common_catalog::table::CompactionLimits;
 use databend_common_catalog::table::Table;
 use databend_common_exception::Result;
@@ -33,9 +34,11 @@ use databend_query::schedulers::build_query_pipeline_without_render_result_set;
 use databend_query::sessions::QueryContext;
 use databend_query::sessions::TableContext;
 use databend_query::test_kits::*;
+use databend_storages_common_table_meta::meta::Location;
 use databend_storages_common_table_meta::meta::SegmentInfo;
 use databend_storages_common_table_meta::meta::Statistics;
 use databend_storages_common_table_meta::meta::TableSnapshot;
+use opendal::Operator;
 use rand::thread_rng;
 use rand::Rng;
 use uuid::Uuid;
@@ -195,6 +198,7 @@ async fn test_safety() -> Result<()> {
             threshold,
             cluster_key_id,
             5,
+            false,
         )
         .await?;
@@ -239,53 +243,72 @@
             eprintln!("no target select");
             continue;
         }
-        assert!(selections.partitions_type() != PartInfoType::LazyLevel);
-
-        let mut actual_blocks_number = 0;
-        let mut compact_segment_indices = HashSet::new();
-        let mut actual_block_ids = HashSet::new();
-        for part in selections.partitions.into_iter() {
-            let part = CompactBlockPartInfo::from_part(&part)?;
-            match part {
-                CompactBlockPartInfo::CompactExtraInfo(extra) => {
-                    compact_segment_indices.insert(extra.segment_index);
-                    compact_segment_indices.extend(extra.removed_segment_indexes.iter());
-                    actual_blocks_number += extra.unchanged_blocks.len();
-                    for b in &extra.unchanged_blocks {
-                        actual_block_ids.insert(b.1.location.clone());
-                    }
+        verify_compact_tasks(
+            ctx.get_application_level_data_operator()?.operator(),
+            selections,
+            locations,
+            HashSet::new(),
+        )
+        .await?;
+    }
+
+    Ok(())
+}
+
+pub async fn verify_compact_tasks(
+    dal: Operator,
+    parts: Partitions,
+    locations: Vec<Location>,
+    expected_segment_indices: HashSet<usize>,
+) -> Result<()> {
+    assert!(parts.partitions_type() != PartInfoType::LazyLevel);
+
+    let mut actual_blocks_number = 0;
+    let mut compact_segment_indices = HashSet::new();
+    let mut actual_block_ids = HashSet::new();
+    for part in parts.partitions.into_iter() {
+        let part = CompactBlockPartInfo::from_part(&part)?;
+        match part {
+            CompactBlockPartInfo::CompactExtraInfo(extra) => {
+                compact_segment_indices.insert(extra.segment_index);
+                compact_segment_indices.extend(extra.removed_segment_indexes.iter());
+                actual_blocks_number += extra.unchanged_blocks.len();
+                for b in &extra.unchanged_blocks {
+                    actual_block_ids.insert(b.1.location.clone());
                 }
-                CompactBlockPartInfo::CompactTaskInfo(task) => {
-                    compact_segment_indices.insert(task.index.segment_idx);
-                    actual_blocks_number += task.blocks.len();
-                    for b in &task.blocks {
-                        actual_block_ids.insert(b.location.clone());
-                    }
+            }
+            CompactBlockPartInfo::CompactTaskInfo(task) => {
+                compact_segment_indices.insert(task.index.segment_idx);
+                actual_blocks_number += task.blocks.len();
+                for b in &task.blocks {
+                    actual_block_ids.insert(b.location.clone());
                 }
             }
         }
+    }
 
-        eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
-        let mut except_blocks_number = 0;
-        let mut except_block_ids = HashSet::new();
-        for idx in compact_segment_indices.into_iter() {
-            let loc = locations.get(idx).unwrap();
-            let compact_segment = SegmentsIO::read_compact_segment(
-                ctx.get_application_level_data_operator()?.operator(),
-                loc.clone(),
-                TestFixture::default_table_schema(),
-                false,
-            )
-            .await?;
-            let segment = SegmentInfo::try_from(compact_segment)?;
-            except_blocks_number += segment.blocks.len();
-            for b in &segment.blocks {
-                except_block_ids.insert(b.location.clone());
-            }
+    eprintln!("compact_segment_indices: {:?}", compact_segment_indices);
+    if !expected_segment_indices.is_empty() {
+        assert_eq!(expected_segment_indices, compact_segment_indices);
+    }
+    let mut except_blocks_number = 0;
+    let mut except_block_ids = HashSet::new();
+    for idx in compact_segment_indices.into_iter() {
+        let loc = locations.get(idx).unwrap();
+        let compact_segment = SegmentsIO::read_compact_segment(
+            dal.clone(),
+            loc.clone(),
+            TestFixture::default_table_schema(),
+            false,
+        )
+        .await?;
+        let segment = SegmentInfo::try_from(compact_segment)?;
+        except_blocks_number += segment.blocks.len();
+        for b in &segment.blocks {
+            except_block_ids.insert(b.location.clone());
         }
-        assert_eq!(except_blocks_number, actual_blocks_number);
-        assert_eq!(except_block_ids, actual_block_ids);
     }
-
+    assert_eq!(except_blocks_number, actual_blocks_number);
+    assert_eq!(except_block_ids, actual_block_ids);
     Ok(())
 }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
index d9d9a3411b99..154198361f62 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/mod.rs
@@ -17,5 +17,6 @@ mod deletion;
 mod recluster_mutator;
 mod segments_compact_mutator;
 
+pub use block_compact_mutator::verify_compact_tasks;
 pub use segments_compact_mutator::compact_segment;
 pub use segments_compact_mutator::CompactSegmentTestFixture;
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
index 746f5f17ccff..a2e6c9dd659e 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs
@@ -30,6 +30,7 @@ use databend_common_expression::TableSchemaRef;
 use databend_common_storages_fuse::io::SegmentWriter;
 use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::operations::ReclusterMutator;
+use databend_common_storages_fuse::operations::ReclusterTasks;
 use databend_common_storages_fuse::pruning::create_segment_location_vector;
 use databend_common_storages_fuse::statistics::reducers::merge_statistics_mut;
 use databend_common_storages_fuse::statistics::reducers::reduce_block_metas;
@@ -48,6 +49,7 @@ use rand::thread_rng;
 use rand::Rng;
 use uuid::Uuid;
 
+use crate::storages::fuse::operations::mutation::verify_compact_tasks;
 use crate::storages::fuse::operations::mutation::CompactSegmentTestFixture;
 
 #[tokio::test(flavor = "multi_thread")]
@@ -149,8 +151,11 @@ async fn test_recluster_mutator_block_select() -> Result<()> {
     );
     let need_recluster = mutator.target_select(compact_segments).await?;
     assert!(need_recluster);
-    assert_eq!(mutator.tasks.len(), 1);
-    let total_block_nums = mutator.tasks.iter().map(|t| t.parts.len()).sum::<usize>();
+    let ReclusterTasks::Recluster { tasks, .. } = mutator.tasks else {
+        return Err(ErrorCode::Internal("Logical error, it's a bug"));
+    };
+    assert_eq!(tasks.len(), 1);
+    let total_block_nums = tasks.iter().map(|t| t.parts.len()).sum::<usize>();
     assert_eq!(total_block_nums, 3);
 
     Ok(())
@@ -202,6 +207,17 @@ async fn test_safety_for_recluster() -> Result<()> {
             number_of_segments, number_of_blocks,
         );
+        let unclustered: bool = rand.gen();
+        let mut unclustered_segment_indices = HashSet::new();
+        if unclustered {
+            unclustered_segment_indices = block_number_of_segments
+                .iter()
+                .rev()
+                .enumerate()
+                .filter(|(_, &num)| num % 4 == 0)
+                .map(|(index, _)| index)
+                .collect();
+        }
         let (locations, _, segment_infos) = CompactSegmentTestFixture::gen_segments(
             ctx.clone(),
             block_number_of_segments,
@@ -209,6 +225,7 @@
             threshold,
             Some(cluster_key_id),
             block_per_seg,
+            unclustered,
         )
         .await?;
@@ -240,7 +257,7 @@
         }
 
         let ctx: Arc<dyn TableContext> = ctx.clone();
-        let segment_locations = create_segment_location_vector(locations, None);
+        let segment_locations = create_segment_location_vector(locations.clone(), None);
         let compact_segments = FuseTable::segment_pruning(
             &ctx,
             schema.clone(),
@@ -284,39 +301,58 @@
         eprintln!("need_recluster: {}", need_recluster);
         if need_recluster {
-            let tasks = mutator.tasks;
-            assert!(tasks.len() <= max_tasks && !tasks.is_empty());
-            eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
-            let mut blocks = Vec::new();
-            for task in tasks.into_iter() {
-                let parts = task.parts.partitions;
-                assert!(task.total_bytes <= recluster_block_size);
-                for part in parts.into_iter() {
-                    let fuse_part = FuseBlockPartInfo::from_part(&part)?;
-                    blocks.push(fuse_part.location.clone());
+            match mutator.tasks {
+                ReclusterTasks::Recluster {
+                    tasks,
+                    remained_blocks,
+                    removed_segment_indexes,
+                    ..
+                } => {
+                    assert!(unclustered_segment_indices.is_empty());
+                    assert!(tasks.len() <= max_tasks && !tasks.is_empty());
+                    eprintln!("tasks_num: {}, max_tasks: {}", tasks.len(), max_tasks);
+                    let mut blocks = Vec::new();
+                    for task in tasks.into_iter() {
+                        let parts = task.parts.partitions;
+                        assert!(task.total_bytes <= recluster_block_size);
+                        for part in parts.into_iter() {
+                            let fuse_part = FuseBlockPartInfo::from_part(&part)?;
+                            blocks.push(fuse_part.location.clone());
+                        }
+                    }
+
+                    eprintln!(
+                        "selected segments number {}, selected blocks number {}, remained blocks number {}",
+                        removed_segment_indexes.len(),
+                        blocks.len(),
+                        remained_blocks.len()
+                    );
+                    for remain in remained_blocks {
+                        blocks.push(remain.location.0.clone());
+                    }
+
+                    let block_ids_after_target = HashSet::from_iter(blocks.into_iter());
+
+                    let mut origin_blocks_ids = HashSet::new();
+                    for idx in &removed_segment_indexes {
+                        for b in &segment_infos[*idx].blocks {
+                            origin_blocks_ids.insert(b.location.0.clone());
+                        }
+                    }
+                    assert_eq!(block_ids_after_target, origin_blocks_ids);
                 }
-            }
-
-            let remained_blocks = std::mem::take(&mut mutator.remained_blocks);
-            eprintln!(
-                "selected segments number {}, selected blocks number {}, remained blocks number {}",
-                mutator.removed_segment_indexes.len(),
-                blocks.len(),
-                remained_blocks.len()
-            );
-            for remain in remained_blocks {
-                blocks.push(remain.location.0.clone());
-            }
-
-            let block_ids_after_target = HashSet::from_iter(blocks.into_iter());
-
-            let mut origin_blocks_ids = HashSet::new();
-            for idx in &mutator.removed_segment_indexes {
-                for b in &segment_infos[*idx].blocks {
-                    origin_blocks_ids.insert(b.location.0.clone());
+                ReclusterTasks::Compact(parts) => {
+                    assert!(unclustered);
+                    assert!(!unclustered_segment_indices.is_empty());
+                    verify_compact_tasks(
+                        ctx.get_application_level_data_operator()?.operator(),
+                        parts,
+                        locations,
+                        unclustered_segment_indices,
+                    )
+                    .await?;
                 }
-            }
-            assert_eq!(block_ids_after_target, origin_blocks_ids);
+            };
         }
     }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
index 2dfeb574a64b..2f8ee2261e12 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs
@@ -681,6 +681,7 @@ impl CompactSegmentTestFixture {
             BlockThresholds::default(),
             cluster_key_id,
             block_per_seg as usize,
+            false,
         )
         .await?;
         let mut summary = Statistics::default();
@@ -704,6 +705,7 @@ impl CompactSegmentTestFixture {
         thresholds: BlockThresholds,
         cluster_key_id: Option<u32>,
         block_per_seg: usize,
+        unclustered: bool,
     ) -> Result<(Vec<Location>, Vec<BlockMeta>, Vec<SegmentInfo>)> {
         let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
         let data_accessor = ctx.get_application_level_data_operator()?.operator();
@@ -728,7 +730,7 @@
             let col_stats = gen_columns_statistics(&block, None, &schema)?;
 
-            let cluster_stats = if num_blocks % 5 == 0 {
+            let cluster_stats = if unclustered && num_blocks % 4 == 0 {
                 None
             } else {
                 cluster_key_id.map(|v| {
@@ -1010,6 +1012,7 @@ async fn test_compact_segment_with_cluster() -> Result<()> {
             BlockThresholds::default(),
             Some(cluster_key_id),
             block_per_seg as usize,
+            false,
         )
         .await?;
         let mut summary = Statistics::default();
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
index dd78404d195f..eaf0ace0e487 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs
@@ -148,7 +148,7 @@ impl BlockCompactMutator {
         for (segment_idx, compact_segment) in segment_infos.into_iter() {
             let segments_vec = checker.add(segment_idx, compact_segment);
             for segments in segments_vec {
-                self.generate_part(segments, &mut parts, &mut checker);
+                checker.generate_part(segments, &mut parts);
             }
 
             let residual_segment_cnt = checker.segments.len();
@@ -181,11 +181,7 @@
         }
 
         // finalize the compaction.
-        self.generate_part(
-            std::mem::take(&mut checker.segments),
-            &mut parts,
-            &mut checker,
-        );
+        checker.finalize(&mut parts);
 
         // Status.
         let elapsed_time = start.elapsed();
@@ -298,28 +294,9 @@
             }
         }
     }
-
-    fn generate_part(
-        &mut self,
-        segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
-        parts: &mut Vec<PartInfoPtr>,
-        checker: &mut SegmentCompactChecker,
-    ) {
-        if !segments.is_empty() && checker.check_for_compact(&segments) {
-            let mut segment_indices = Vec::with_capacity(segments.len());
-            let mut compact_segments = Vec::with_capacity(segments.len());
-            for (idx, segment) in segments.into_iter() {
-                segment_indices.push(idx);
-                compact_segments.push(segment);
-            }
-
-            let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments);
-            parts.push(lazy_part);
-        }
-    }
 }
 
-struct SegmentCompactChecker {
+pub struct SegmentCompactChecker {
     segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
     total_block_count: u64,
     block_threshold: u64,
@@ -330,7 +307,7 @@ struct SegmentCompactChecker {
 }
 
 impl SegmentCompactChecker {
-    fn new(block_threshold: u64, cluster_key_id: Option<u32>) -> Self {
+    pub fn new(block_threshold: u64, cluster_key_id: Option<u32>) -> Self {
         Self {
             segments: vec![],
             total_block_count: 0,
@@ -364,7 +341,7 @@ impl SegmentCompactChecker {
         true
     }
 
-    fn add(
+    pub fn add(
         &mut self,
         idx: SegmentIndex,
         segment: Arc<CompactSegmentInfo>,
@@ -389,6 +366,29 @@ impl SegmentCompactChecker {
         self.segments.push((idx, segment));
         vec![std::mem::take(&mut self.segments)]
     }
+
+    pub fn generate_part(
+        &mut self,
+        segments: Vec<(SegmentIndex, Arc<CompactSegmentInfo>)>,
+        parts: &mut Vec<PartInfoPtr>,
+    ) {
+        if !segments.is_empty() && self.check_for_compact(&segments) {
+            let mut segment_indices = Vec::with_capacity(segments.len());
+            let mut compact_segments = Vec::with_capacity(segments.len());
+            for (idx, segment) in segments.into_iter() {
+                segment_indices.push(idx);
+                compact_segments.push(segment);
+            }
+
+            let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments);
+            parts.push(lazy_part);
+        }
+    }
+
+    pub fn finalize(&mut self, parts: &mut Vec<PartInfoPtr>) {
+        let final_segments = std::mem::take(&mut self.segments);
+        self.generate_part(final_segments, parts);
+    }
 }
 
 struct CompactTaskBuilder {
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs
index 7fd293d87803..0f63594424c0 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/mod.rs
@@ -17,7 +17,9 @@ mod recluster_mutator;
 mod segment_compact_mutator;
 
 pub use block_compact_mutator::BlockCompactMutator;
+pub use block_compact_mutator::SegmentCompactChecker;
 pub use recluster_mutator::ReclusterMutator;
+pub use recluster_mutator::ReclusterTasks;
 pub use segment_compact_mutator::SegmentCompactMutator;
 pub use segment_compact_mutator::SegmentCompactionState;
 pub use segment_compact_mutator::SegmentCompactor;
diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
index 91d89acf831c..248c3a413524 100644
--- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
+++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs
@@ -19,6 +19,8 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use databend_common_base::runtime::execute_futures_in_parallel;
+use databend_common_catalog::plan::Partitions;
+use databend_common_catalog::plan::PartitionsShuffleKind;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::ErrorCode;
@@ -41,6 +43,9 @@ use minitrace::full_name;
 use minitrace::future::FutureExt;
 use minitrace::Span;
 
+use crate::operations::mutation::SegmentCompactChecker;
+use crate::operations::BlockCompactMutator;
+use crate::operations::CompactLazyPartInfo;
 use crate::statistics::reducers::merge_statistics_mut;
 use crate::FuseTable;
 use crate::SegmentLocation;
@@ -49,6 +54,39 @@ use crate::DEFAULT_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_BLOCK_PER_SEGMENT;
 use crate::FUSE_OPT_KEY_ROW_AVG_DEPTH_THRESHOLD;
 
+#[derive(Clone)]
+pub enum ReclusterTasks {
+    Recluster {
+        tasks: Vec<ReclusterTask>,
+        remained_blocks: Vec<Arc<BlockMeta>>,
+        removed_segment_indexes: Vec<usize>,
+        removed_segment_summary: Statistics,
+    },
+    Compact(Partitions),
+}
+
+impl ReclusterTasks {
+    pub fn is_empty(&self) -> bool {
+        match self {
+            ReclusterTasks::Recluster { tasks, .. } => tasks.is_empty(),
+            ReclusterTasks::Compact(parts) => parts.is_empty(),
+        }
+    }
+
+    pub fn new_recluster_tasks() -> Self {
+        Self::Recluster {
+            tasks: vec![],
+            remained_blocks: vec![],
+            removed_segment_indexes: vec![],
+            removed_segment_summary: Statistics::default(),
+        }
+    }
+
+    pub fn new_compact_tasks() -> Self {
+        Self::Compact(Partitions::default())
+    }
+}
+
 #[derive(Clone)]
 pub struct ReclusterMutator {
     pub(crate) ctx: Arc<dyn TableContext>,
@@ -61,11 +99,8 @@ pub struct ReclusterMutator {
     pub(crate) cluster_key_types: Vec<DataType>,
     pub snapshot: Arc<TableSnapshot>,
 
-    pub tasks: Vec<ReclusterTask>,
     pub recluster_blocks_count: u64,
-    pub remained_blocks: Vec<Arc<BlockMeta>>,
-    pub removed_segment_indexes: Vec<usize>,
-    pub removed_segment_summary: Statistics,
+    pub tasks: ReclusterTasks,
 }
 
 impl ReclusterMutator {
@@ -106,14 +141,12 @@ impl ReclusterMutator {
             block_per_seg,
             cluster_key_types,
             snapshot,
-            tasks: Vec::new(),
-            remained_blocks: Vec::new(),
             recluster_blocks_count: 0,
-            removed_segment_indexes: Vec::new(),
-            removed_segment_summary: Statistics::default(),
+            tasks: ReclusterTasks::new_recluster_tasks(),
         })
     }
 
+    /// Used for tests.
     #[allow(clippy::too_many_arguments)]
     pub fn new(
         ctx: Arc<dyn TableContext>,
@@ -136,11 +169,8 @@ impl ReclusterMutator {
             block_per_seg,
             cluster_key_types,
             snapshot,
-            tasks: Vec::new(),
-            remained_blocks: Vec::new(),
             recluster_blocks_count: 0,
-            removed_segment_indexes: Vec::new(),
-            removed_segment_summary: Statistics::default(),
+            tasks: ReclusterTasks::new_recluster_tasks(),
         }
     }
 
@@ -148,6 +178,19 @@ impl ReclusterMutator {
     pub async fn target_select(
         &mut self,
         compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
+    ) -> Result<bool> {
+        match self.tasks {
+            ReclusterTasks::Compact(_) => self.generate_compact_tasks(compact_segments).await,
+            ReclusterTasks::Recluster { .. } => {
+                self.generate_recluster_tasks(compact_segments).await
+            }
+        }
+    }
+
+    #[async_backtrace::framed]
+    pub async fn generate_recluster_tasks(
+        &mut self,
+        compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
     ) -> Result<bool> {
         let mut selected_segments = Vec::with_capacity(compact_segments.len());
         let mut selected_indices = Vec::with_capacity(compact_segments.len());
@@ -178,6 +221,7 @@
         let column_nodes = ColumnNodes::new_from_schema(&arrow_schema, Some(&self.schema));
 
         let mut remained_blocks = Vec::new();
+        let mut tasks = Vec::new();
         let mut selected = false;
         for (level, block_metas) in blocks_map.into_iter() {
             let len = block_metas.len();
@@ -213,13 +257,13 @@
                     .into_iter()
                     .map(|meta| (None, meta))
                     .collect::<Vec<_>>();
-                self.generate_task(
+                tasks.push(self.generate_task(
                     &block_metas,
                     &column_nodes,
                     total_rows as usize,
                     total_bytes as usize,
                     level,
-                );
+                ));
                 selected = true;
                 continue;
             }
@@ -250,19 +294,19 @@
                 let block_size = block_meta.block_size as usize;
                 let row_count = block_meta.row_count as usize;
                 if task_bytes + block_size > memory_threshold && selected_blocks.len() > 1 {
-                    self.generate_task(
+                    tasks.push(self.generate_task(
                         &selected_blocks,
                         &column_nodes,
                         task_rows,
                         task_bytes,
                         level,
-                    );
+                    ));
 
                     task_rows = 0;
                     task_bytes = 0;
                     selected_blocks.clear();
 
-                    if self.tasks.len() >= self.max_tasks {
+                    if tasks.len() >= self.max_tasks {
                         remained_blocks.push(block_meta);
                         over_memory = true;
                         continue;
@@ -278,32 +322,88 @@
             match selected_blocks.len() {
                 0 => (),
                 1 => remained_blocks.push(selected_blocks[0].1.clone()),
-                _ => self.generate_task(
+                _ => tasks.push(self.generate_task(
                     &selected_blocks,
                     &column_nodes,
                     task_rows,
                     task_bytes,
                     level,
-                ),
+                )),
             }
 
             selected = true;
         }
 
         if selected {
-            self.remained_blocks = remained_blocks;
-
             selected_indices.sort_by(|a, b| b.cmp(a));
-            self.removed_segment_indexes = selected_indices;
-
             let default_cluster_key_id = Some(self.cluster_key_id);
+            let mut removed_segment_summary = Statistics::default();
             selected_statistics.iter().for_each(|v| {
-                merge_statistics_mut(&mut self.removed_segment_summary, v, default_cluster_key_id)
+                merge_statistics_mut(&mut removed_segment_summary, v, default_cluster_key_id)
             });
+
+            self.tasks = ReclusterTasks::Recluster {
+                tasks,
+                remained_blocks,
+                removed_segment_indexes: selected_indices,
+                removed_segment_summary,
+            };
         }
 
         Ok(selected)
     }
 
+    async fn generate_compact_tasks(
+        &mut self,
+        compact_segments: Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>,
+    ) -> Result<bool> {
+        let mut parts = Vec::new();
+        let mut checker =
+            SegmentCompactChecker::new(self.block_per_seg as u64, Some(self.cluster_key_id));
+
+        for (loc, compact_segment) in compact_segments.into_iter() {
+            self.recluster_blocks_count += compact_segment.summary.block_count;
+            let segments_vec = checker.add(loc.segment_idx, compact_segment);
+            for segments in segments_vec {
+                checker.generate_part(segments, &mut parts);
+            }
+        }
+        // finalize the compaction.
+        checker.finalize(&mut parts);
+
+        let cluster = self.ctx.get_cluster();
+        let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
+        let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads {
+            // NOTE: The snapshot schema does not contain the stream column.
+            let column_ids = self.snapshot.schema.to_leaf_column_id_set();
+            let lazy_parts = parts
+                .into_iter()
+                .map(|v| {
+                    v.as_any()
+                        .downcast_ref::<CompactLazyPartInfo>()
+                        .unwrap()
+                        .clone()
+                })
+                .collect::<Vec<_>>();
+            Partitions::create(
+                PartitionsShuffleKind::Mod,
+                BlockCompactMutator::build_compact_tasks(
+                    self.ctx.clone(),
+                    column_ids,
+                    Some(self.cluster_key_id),
+                    self.block_thresholds,
+                    lazy_parts,
+                )
+                .await?,
+            )
+        } else {
+            Partitions::create(PartitionsShuffleKind::Mod, parts)
+        };
+
+        let selected = !partitions.is_empty();
+        self.tasks = ReclusterTasks::Compact(partitions);
+        Ok(selected)
+    }
+
     fn generate_task(
         &mut self,
         block_metas: &[(Option<BlockMetaIndex>, Arc<BlockMeta>)],
@@ -311,18 +411,17 @@
         column_nodes: &ColumnNodes,
         total_rows: usize,
         total_bytes: usize,
         level: i32,
-    ) {
+    ) -> ReclusterTask {
         let (stats, parts) =
             FuseTable::to_partitions(Some(&self.schema), block_metas, column_nodes, None, None);
-        let task = ReclusterTask {
+        self.recluster_blocks_count += block_metas.len() as u64;
+        ReclusterTask {
             parts,
             stats,
             total_rows,
             total_bytes,
             level,
-        };
-        self.tasks.push(task);
-        self.recluster_blocks_count += block_metas.len() as u64;
+        }
     }
 
     pub fn select_segments(
@@ -333,8 +432,23 @@
         let mut blocks_num = 0;
         let mut indices = IndexSet::new();
         let mut points_map: HashMap<Vec<Scalar>, (Vec<usize>, Vec<usize>)> = HashMap::new();
+        let mut unclustered_sg = IndexSet::new();
         for (i, (_, compact_segment)) in compact_segments.iter().enumerate() {
-            if !self.segment_can_recluster(&compact_segment.summary) {
+            let mut level = -1;
+            let clustered = compact_segment
+                .summary
+                .cluster_stats
+                .as_ref()
+                .is_some_and(|v| {
+                    level = v.level;
+                    v.cluster_key_id == self.cluster_key_id
+                });
+            if !clustered {
+                unclustered_sg.insert(i);
+                continue;
+            }
+
+            if level < 0 && (compact_segment.summary.block_count as usize) >= self.block_per_seg {
                 continue;
             }
@@ -352,6 +466,11 @@
             }
         }
 
+        if !unclustered_sg.is_empty() {
+            self.tasks = ReclusterTasks::Compact(Partitions::default());
+            return Ok(unclustered_sg);
+        }
+
         if indices.len() < 2 || blocks_num < self.block_per_seg {
             return Ok(indices);
         }
@@ -522,4 +641,18 @@
         let set: HashSet<Scalar> = HashSet::from_iter(start.iter().chain(end.iter()).cloned());
         set.len() == 2
     }
+
+    pub fn is_distributed(&self) -> bool {
+        match &self.tasks {
+            ReclusterTasks::Recluster { tasks, .. } => tasks.len() > 1,
+            ReclusterTasks::Compact(_) => {
+                (!self.ctx.get_cluster().is_empty())
+                    && self
+                        .ctx
+                        .get_settings()
+                        .get_enable_distributed_compact()
+                        .unwrap_or(false)
+            }
+        }
+    }
 }
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
index ead5c3f2e817..83b81287bb62 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table.test
@@ -781,6 +781,53 @@ select * from clustering_information('db_09_0008','t14')
 (a, b) 1 0 0 0.0 1.0 {"00001":1}
 
+statement ok
+create table t15(a int not null) row_per_block=3
+
+statement ok
+insert into t15 values(0),(1),(4)
+
+statement ok
+insert into t15 values(3)
+
+statement ok
+insert into t15 values(-6),(-8)
+
+statement ok
+ALTER TABLE t15 cluster by(abs(a))
+
+statement ok
+insert into t15 values(2),(5),(-7)
+
+query TIIIFFT
+select * from clustering_information('db_09_0008','t15')
+----
+(abs(a)) 4 0 3 0.0 1.0 {"00001":1}
+
+statement ok
+alter table t15 recluster
+
+query TIIIFFT
+select * from clustering_information('db_09_0008','t15')
+----
+(abs(a)) 3 0 0 2.0 3.0 {"00003":3}
+
+statement ok
+alter table t15 recluster
+
+query TIIIFFT
+select * from clustering_information('db_09_0008','t15')
+----
+(abs(a)) 3 0 0 0.0 1.0 {"00001":3}
+
+query III
+select segment_count, block_count, row_count from fuse_snapshot('db_09_0008','t15') limit 3
+----
+1 3 9
+2 3 9
+4 4 9
+
+
 statement ok
 DROP DATABASE db_09_0008
diff --git a/tests/udf/udf_server.py b/tests/udf/udf_server.py
index 9bd9b9d91950..0472cb9d8974 100644
--- a/tests/udf/udf_server.py
+++ b/tests/udf/udf_server.py
@@ -56,7 +56,10 @@ def gcd(x: int, y: int) -> int:
         (x, y) = (y, x % y)
     return x
 
+gcd_error_cnt = 0
+
+
 @udf(
     name="gcd_error",
     input_types=["INT", "INT"],
@@ -71,7 +74,7 @@ def gcd_error(x: int, y: int) -> int:
     while y != 0:
         (x, y) = (y, x % y)
     return x
-
+
 
 @udf(input_types=["VARCHAR", "VARCHAR", "VARCHAR"], result_type="VARCHAR")
 def split_and_join(s: str, split_s: str, join_s: str) -> str: