Skip to content

Commit

Permalink
chore: set peer_start to current row when do ranking functions (datab…
Browse files Browse the repository at this point in the history
…endlabs#14450)

Set peer_start to the current row when evaluating ranking functions
  • Loading branch information
ariesdevil authored Jan 24, 2024
1 parent 53a41aa commit 19e2c9a
Showing 1 changed file with 198 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ impl<T: Number> TransformWindow<T> {
self.frame_ended = self.partition_ended;
}

// Advance the current row to the next row
// if the current row is the last row of the current block, advance the current block and row = 0
/// Advance the current row to the next row.
/// If the current row is the last row of the current block, advance to the next block and set row = 0.
fn advance_row(&self, mut row: RowPtr) -> RowPtr {
debug_assert!(row.block >= self.first_block);

Expand All @@ -362,6 +362,7 @@ impl<T: Number> TransformWindow<T> {
row
}

/// calculate peer_group_end
fn advance_peer_group_end(&mut self, mut row: RowPtr) {
if !self.need_peer {
return;
Expand Down Expand Up @@ -436,6 +437,7 @@ impl<T: Number> TransformWindow<T> {
break;
}
}

// Release memory that is no longer needed.
let first_used_block = if self.is_ranking {
self.next_output_block.min(self.peer_group_start.block)
Expand Down Expand Up @@ -968,6 +970,8 @@ where T: Number + ResultTypeOfUnary
} else if self.is_null_frame {
// Only one null frame can exist in one partition, so we don't need to check it again.
self.need_check_null_frame = false;
} else if self.is_ranking {
self.peer_group_start = self.current_row;
}

// execute only once for each partition.
Expand All @@ -993,7 +997,7 @@ where T: Number + ResultTypeOfUnary
if self.is_empty_frame && !self.is_null_frame {
// Non-NULL empty frame, no need to advance bounds.
self.func.reset();
} else {
} else if !self.is_ranking {
self.advance_frame_start();
if !self.frame_started {
debug_assert!(!self.input_is_finished);
Expand Down Expand Up @@ -1302,6 +1306,28 @@ mod tests {
)
}

/// Test helper: builds a `TransformWindow<u64>` through
/// `TransformWindow::try_create_rows` for the `sum` aggregate applied to
/// argument column 0.
///
/// The list passed ahead of the sort description is empty (presumably the
/// partition-by columns, hence the helper's name), and ordering uses a single
/// non-nullable sort column at offset 0 with `asc: false` and
/// `nulls_first: false`.
///
/// * `_unit` - accepted for signature parity with sibling helpers; unused.
/// * `bounds` - frame bounds forwarded unchanged to `try_create_rows`.
/// * `arg_type` - argument type used to look up the `sum` aggregate.
///
/// Returns an error if the aggregate lookup or the transform construction fails.
fn get_transform_window_without_partition(
    _unit: WindowFuncFrameUnits,
    bounds: (FrameBound<u64>, FrameBound<u64>),
    arg_type: DataType,
) -> Result<TransformWindow<u64>> {
    // Resolve the aggregate first so a lookup failure returns before anything else is built.
    let sum_over_first_column = WindowFunctionInfo::Aggregate(
        AggregateFunctionFactory::instance().get("sum", vec![], vec![arg_type])?,
        vec![0],
    );

    let no_partition_columns = vec![];
    let order_by = vec![SortColumnDescription {
        offset: 0,
        asc: false,
        nulls_first: false,
        is_nullable: false,
    }];

    TransformWindow::try_create_rows(
        InputPort::create(),
        OutputPort::create(),
        sum_over_first_column,
        no_partition_columns,
        order_by,
        bounds,
    )
}

fn get_transform_window(
_unit: WindowFuncFrameUnits,
bounds: (FrameBound<u64>, FrameBound<u64>),
Expand Down Expand Up @@ -1465,21 +1491,32 @@ mod tests {

#[test]
fn test_block_release() -> Result<()> {
// ranking function release early
{
let mut transform = get_ranking_transform_window((
FrameBound::Preceding(None),
FrameBound::CurrentRow,
))?;

// peer for 3 cross three blocks
transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![1, 1, 1, 2, 2, 3, 3, 3]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![3, 3, 3]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![3, 4, 4]),
])))?;

assert_eq!(transform.blocks.len(), 2);
transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

Expand All @@ -1503,6 +1540,21 @@ mod tests {
&[output],
);

let output = transform.outputs.pop_front().unwrap();

assert_blocks_eq(
vec![
"+----------+----------+",
"| Column 0 | Column 1 |",
"+----------+----------+",
"| 3 | 3 |",
"| 3 | 3 |",
"| 3 | 3 |",
"+----------+----------+",
],
&[output],
);

transform.input_is_finished = true;
transform.add_block(None)?;
transform.check_outputs();
Expand All @@ -1522,6 +1574,148 @@ mod tests {
);
}

// ranking function release early
{
let mut transform = get_ranking_transform_window((
FrameBound::Preceding(None),
FrameBound::CurrentRow,
))?;

// peer not cross any blocks
transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![1, 1, 1]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![2, 2, 2]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![3, 3, 3]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

let output = transform.outputs.pop_front().unwrap();

assert_blocks_eq(
vec![
"+----------+----------+",
"| Column 0 | Column 1 |",
"+----------+----------+",
"| 1 | 1 |",
"| 1 | 1 |",
"| 1 | 1 |",
"+----------+----------+",
],
&[output],
);

let output = transform.outputs.pop_front().unwrap();

assert_blocks_eq(
vec![
"+----------+----------+",
"| Column 0 | Column 1 |",
"+----------+----------+",
"| 2 | 2 |",
"| 2 | 2 |",
"| 2 | 2 |",
"+----------+----------+",
],
&[output],
);

transform.input_is_finished = true;
transform.add_block(None)?;
transform.check_outputs();
let output = transform.outputs.pop_front().unwrap();

assert_blocks_eq(
vec![
"+----------+----------+",
"| Column 0 | Column 1 |",
"+----------+----------+",
"| 3 | 3 |",
"| 3 | 3 |",
"| 3 | 3 |",
"+----------+----------+",
],
&[output],
);
}

// normal agg function
{
let mut transform = get_transform_window_without_partition(
WindowFuncFrameUnits::Rows,
(FrameBound::Preceding(None), FrameBound::CurrentRow),
DataType::Number(NumberDataType::Int32),
)?;

transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![1, 1, 1, 2, 2, 3, 3, 3]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 1);

transform.add_block(Some(DataBlock::new_from_columns(vec![
Int32Type::from_data(vec![3, 4, 4]),
])))?;

transform.check_outputs();
assert_eq!(transform.blocks.len(), 2);

let output = transform.outputs.pop_front().unwrap();

assert_blocks_eq(
vec![
"+----------+----------+",
"| Column 0 | Column 1 |",
"+----------+----------+",
"| 1 | 1 |",
"| 1 | 2 |",
"| 1 | 3 |",
"| 2 | 5 |",
"| 2 | 7 |",
"| 3 | 10 |",
"| 3 | 13 |",
"| 3 | 16 |",
"+----------+----------+",
],
&[output],
);

transform.input_is_finished = true;

transform.add_block(None)?;

transform.check_outputs();

let output = transform.outputs.pop_front().unwrap();

assert_blocks_eq(
vec![
"+----------+----------+",
"| Column 0 | Column 1 |",
"+----------+----------+",
"| 3 | 19 |",
"| 4 | 23 |",
"| 4 | 27 |",
"+----------+----------+",
],
&[output],
);
}

Ok(())
}

Expand Down

0 comments on commit 19e2c9a

Please sign in to comment.