From 19e2c9a03f8198873168872e2eec338facc00529 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Wed, 24 Jan 2024 20:32:05 +0800 Subject: [PATCH] chore: set peer_start to current row when do ranking functions (#14450) set peer_start to current row when do ranking functions --- .../transforms/window/transform_window.rs | 202 +++++++++++++++++- 1 file changed, 198 insertions(+), 4 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs index 5e1c9df1a34d..909588fa61d2 100644 --- a/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs +++ b/src/query/service/src/pipelines/processors/transforms/window/transform_window.rs @@ -334,8 +334,8 @@ impl TransformWindow { self.frame_ended = self.partition_ended; } - // Advance the current row to the next row - // if the current row is the last row of the current block, advance the current block and row = 0 + /// Advance the current row to the next row + /// if the current row is the last row of the current block, advance the current block and row = 0 fn advance_row(&self, mut row: RowPtr) -> RowPtr { debug_assert!(row.block >= self.first_block); @@ -362,6 +362,7 @@ impl TransformWindow { row } + /// calculate peer_group_end fn advance_peer_group_end(&mut self, mut row: RowPtr) { if !self.need_peer { return; @@ -436,6 +437,7 @@ impl TransformWindow { break; } } + // Release memory that is no longer needed. let first_used_block = if self.is_ranking { self.next_output_block.min(self.peer_group_start.block) @@ -968,6 +970,8 @@ where T: Number + ResultTypeOfUnary } else if self.is_null_frame { // Only one null frame can exist in one partition, so we don't need to check it again. self.need_check_null_frame = false; + } else if self.is_ranking { + self.peer_group_start = self.current_row; } // execute only once for each partition. @@ -993,7 +997,7 @@ where T: Number + ResultTypeOfUnary if self.is_empty_frame && !self.is_null_frame { // Non-NULL empty frame, no need to advance bounds. self.func.reset(); - } else { + } else if !self.is_ranking { self.advance_frame_start(); if !self.frame_started { debug_assert!(!self.input_is_finished); @@ -1302,6 +1306,28 @@ mod tests { ) } + fn get_transform_window_without_partition( + _unit: WindowFuncFrameUnits, + bounds: (FrameBound, FrameBound), + arg_type: DataType, + ) -> Result> { + let agg = AggregateFunctionFactory::instance().get("sum", vec![], vec![arg_type])?; + let func = WindowFunctionInfo::Aggregate(agg, vec![0]); + TransformWindow::try_create_rows( + InputPort::create(), + OutputPort::create(), + func, + vec![], + vec![SortColumnDescription { + offset: 0, + asc: false, + nulls_first: false, + is_nullable: false, + }], + bounds, + ) + } + fn get_transform_window( _unit: WindowFuncFrameUnits, bounds: (FrameBound, FrameBound), @@ -1465,21 +1491,32 @@ mod tests { #[test] fn test_block_release() -> Result<()> { + // ranking function release early { let mut transform = get_ranking_transform_window(( FrameBound::Preceding(None), FrameBound::CurrentRow, ))?; + // peer for 3 cross three blocks transform.add_block(Some(DataBlock::new_from_columns(vec![ Int32Type::from_data(vec![1, 1, 1, 2, 2, 3, 3, 3]), ])))?; + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 1); + + transform.add_block(Some(DataBlock::new_from_columns(vec![ + Int32Type::from_data(vec![3, 3, 3]), + ])))?; + + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 1); + transform.add_block(Some(DataBlock::new_from_columns(vec![ Int32Type::from_data(vec![3, 4, 4]), ])))?; - assert_eq!(transform.blocks.len(), 2); transform.check_outputs(); assert_eq!(transform.blocks.len(), 1); @@ -1503,6 +1540,21 @@ mod tests { &[output], ); + let output = transform.outputs.pop_front().unwrap(); + + assert_blocks_eq( + vec![ + "+----------+----------+", + "| Column 0 | Column 1 |", + "+----------+----------+", + "| 3 | 3 |", + "| 3 | 3 |", + "| 3 | 3 |", + "+----------+----------+", + ], + &[output], + ); + transform.input_is_finished = true; transform.add_block(None)?; transform.check_outputs(); @@ -1522,6 +1574,148 @@ mod tests { ); } + // ranking function release early + { + let mut transform = get_ranking_transform_window(( + FrameBound::Preceding(None), + FrameBound::CurrentRow, + ))?; + + // peer not cross any blocks + transform.add_block(Some(DataBlock::new_from_columns(vec![ + Int32Type::from_data(vec![1, 1, 1]), + ])))?; + + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 1); + + transform.add_block(Some(DataBlock::new_from_columns(vec![ + Int32Type::from_data(vec![2, 2, 2]), + ])))?; + + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 1); + + transform.add_block(Some(DataBlock::new_from_columns(vec![ + Int32Type::from_data(vec![3, 3, 3]), + ])))?; + + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 1); + + let output = transform.outputs.pop_front().unwrap(); + + assert_blocks_eq( + vec![ + "+----------+----------+", + "| Column 0 | Column 1 |", + "+----------+----------+", + "| 1 | 1 |", + "| 1 | 1 |", + "| 1 | 1 |", + "+----------+----------+", + ], + &[output], + ); + + let output = transform.outputs.pop_front().unwrap(); + + assert_blocks_eq( + vec![ + "+----------+----------+", + "| Column 0 | Column 1 |", + "+----------+----------+", + "| 2 | 2 |", + "| 2 | 2 |", + "| 2 | 2 |", + "+----------+----------+", + ], + &[output], + ); + + transform.input_is_finished = true; + transform.add_block(None)?; + transform.check_outputs(); + let output = transform.outputs.pop_front().unwrap(); + + assert_blocks_eq( + vec![ + "+----------+----------+", + "| Column 0 | Column 1 |", + "+----------+----------+", + "| 3 | 3 |", + "| 3 | 3 |", + "| 3 | 3 |", + "+----------+----------+", + ], + &[output], + ); + } + + // normal agg function + { + let mut transform = get_transform_window_without_partition( + WindowFuncFrameUnits::Rows, + (FrameBound::Preceding(None), FrameBound::CurrentRow), + DataType::Number(NumberDataType::Int32), + )?; + + transform.add_block(Some(DataBlock::new_from_columns(vec![ + Int32Type::from_data(vec![1, 1, 1, 2, 2, 3, 3, 3]), + ])))?; + + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 1); + + transform.add_block(Some(DataBlock::new_from_columns(vec![ + Int32Type::from_data(vec![3, 4, 4]), + ])))?; + + transform.check_outputs(); + assert_eq!(transform.blocks.len(), 2); + + let output = transform.outputs.pop_front().unwrap(); + + assert_blocks_eq( + vec![ + "+----------+----------+", + "| Column 0 | Column 1 |", + "+----------+----------+", + "| 1 | 1 |", + "| 1 | 2 |", + "| 1 | 3 |", + "| 2 | 5 |", + "| 2 | 7 |", + "| 3 | 10 |", + "| 3 | 13 |", + "| 3 | 16 |", + "+----------+----------+", + ], + &[output], + ); + + transform.input_is_finished = true; + + transform.add_block(None)?; + + transform.check_outputs(); + + let output = transform.outputs.pop_front().unwrap(); + + assert_blocks_eq( + vec![ + "+----------+----------+", + "| Column 0 | Column 1 |", + "+----------+----------+", + "| 3 | 19 |", + "| 4 | 23 |", + "| 4 | 27 |", + "+----------+----------+", + ], + &[output], + ); + } + Ok(()) }