Skip to content

Commit

Permalink
fix: fetch is missed in the EnsureSorting (#14192)
Browse files Browse the repository at this point in the history
* fix: fetch is missed in the EnsureSorting

* fix conflict

* resolve comments from alamb

* update
  • Loading branch information
xudong963 authored Jan 20, 2025
1 parent d3f1c9a commit 163314d
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 13 deletions.
16 changes: 9 additions & 7 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,11 @@ impl PhysicalOptimizerRule for EnforceSorting {
)
})
.data()?;

// Execute a top-down traversal to exploit sort push-down opportunities
// missed by the bottom-up traversal:
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = pushdown_sorts(sort_pushdown)?;

adjusted
.plan
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
Expand Down Expand Up @@ -403,7 +401,10 @@ fn analyze_immediate_sort_removal(
{
// Replace the sort with a sort-preserving merge:
let expr = LexOrdering::new(sort_exec.expr().to_vec());
Arc::new(SortPreservingMergeExec::new(expr, Arc::clone(sort_input))) as _
Arc::new(
SortPreservingMergeExec::new(expr, Arc::clone(sort_input))
.with_fetch(sort_exec.fetch()),
) as _
} else {
// Remove the sort:
node.children = node.children.swap_remove(0).children;
Expand Down Expand Up @@ -626,11 +627,12 @@ fn remove_corresponding_sort_from_sub_plan(
// If there is existing ordering, to preserve ordering use
// `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`.
let plan = Arc::clone(&node.plan);
let fetch = plan.fetch();
let plan = if let Some(ordering) = plan.output_ordering() {
Arc::new(SortPreservingMergeExec::new(
LexOrdering::new(ordering.to_vec()),
plan,
)) as _
Arc::new(
SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan)
.with_fetch(fetch),
) as _
} else {
Arc::new(CoalescePartitionsExec::new(plan)) as _
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,20 @@ fn plan_with_order_preserving_variants(
// Flag indicating that it is desirable to replace `CoalescePartitionsExec`s
// with `SortPreservingMergeExec`s:
is_spm_better: bool,
fetch: Option<usize>,
) -> Result<OrderPreservationContext> {
sort_input.children = sort_input
.children
.into_iter()
.map(|node| {
// Update descendants in the given tree if there is a connection:
if node.data {
plan_with_order_preserving_variants(node, is_spr_better, is_spm_better)
plan_with_order_preserving_variants(
node,
is_spr_better,
is_spm_better,
fetch,
)
} else {
Ok(node)
}
Expand All @@ -133,7 +139,8 @@ fn plan_with_order_preserving_variants(
if let Some(ordering) = child.output_ordering() {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child));
let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child))
.with_fetch(fetch);
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
Expand Down Expand Up @@ -252,6 +259,7 @@ pub(crate) fn replace_with_order_preserving_variants(
requirements.children.swap_remove(0),
is_spr_better || use_order_preserving_variant,
is_spm_better || use_order_preserving_variant,
requirements.plan.fetch(),
)?;

// If the alternate plan makes this sort unnecessary, accept the alternate:
Expand Down
10 changes: 7 additions & 3 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) {
for (child, requirement) in node.children.iter_mut().zip(reqs) {
child.data = ParentRequirements {
ordering_requirement: requirement,
fetch: None,
// If the parent has a fetch value, assign it to the children
// Or use the fetch value of the child.
fetch: child.plan.fetch(),
};
}
}
Expand Down Expand Up @@ -95,6 +97,7 @@ fn pushdown_sorts_helper(
.ordering_satisfy_requirement(&parent_reqs);

if is_sort(plan) {
let sort_fetch = plan.fetch();
let required_ordering = plan
.output_ordering()
.cloned()
Expand All @@ -103,7 +106,8 @@ fn pushdown_sorts_helper(
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default();
let fetch = requirements.data.fetch;
// It's possible current plan (`SortExec`) has a fetch value.
let fetch = requirements.data.fetch.or(sort_fetch);
requirements = requirements.children.swap_remove(0);
requirements = add_sort_above(requirements, sort_reqs, fetch);
};
Expand All @@ -113,7 +117,7 @@ fn pushdown_sorts_helper(
if let Some(adjusted) =
pushdown_requirement_to_children(&child.plan, &required_ordering)?
{
let fetch = child.plan.fetch();
let fetch = sort_fetch.or_else(|| child.plan.fetch());
for (grand_child, order) in child.children.iter_mut().zip(adjusted) {
grand_child.data = ParentRequirements {
ordering_requirement: order,
Expand Down
7 changes: 6 additions & 1 deletion datafusion/sqllogictest/test_files/topk.slt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,12 @@ select * from topk order by x desc limit 3;
8
5


query I
select * from (select * from topk limit 8) order by x limit 3;
----
0
1
2


statement ok
Expand Down

0 comments on commit 163314d

Please sign in to comment.