diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 167f9d6d45e7..76bb49efb64e 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -186,13 +186,11 @@ impl PhysicalOptimizerRule for EnforceSorting { ) }) .data()?; - // Execute a top-down traversal to exploit sort push-down opportunities // missed by the bottom-up traversal: let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan); assign_initial_requirements(&mut sort_pushdown); let adjusted = pushdown_sorts(sort_pushdown)?; - adjusted .plan .transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?))) @@ -403,7 +401,10 @@ fn analyze_immediate_sort_removal( { // Replace the sort with a sort-preserving merge: let expr = LexOrdering::new(sort_exec.expr().to_vec()); - Arc::new(SortPreservingMergeExec::new(expr, Arc::clone(sort_input))) as _ + Arc::new( + SortPreservingMergeExec::new(expr, Arc::clone(sort_input)) + .with_fetch(sort_exec.fetch()), + ) as _ } else { // Remove the sort: node.children = node.children.swap_remove(0).children; @@ -626,11 +627,12 @@ fn remove_corresponding_sort_from_sub_plan( // If there is existing ordering, to preserve ordering use // `SortPreservingMergeExec` instead of a `CoalescePartitionsExec`. let plan = Arc::clone(&node.plan); + let fetch = plan.fetch(); let plan = if let Some(ordering) = plan.output_ordering() { - Arc::new(SortPreservingMergeExec::new( - LexOrdering::new(ordering.to_vec()), - plan, - )) as _ + Arc::new( + SortPreservingMergeExec::new(LexOrdering::new(ordering.to_vec()), plan) + .with_fetch(fetch), + ) as _ } else { Arc::new(CoalescePartitionsExec::new(plan)) as _ }; diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index f32ffa8a5830..fa6d7f62166a 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -100,6 +100,7 @@ fn plan_with_order_preserving_variants( // Flag indicating that it is desirable to replace `CoalescePartitionsExec`s // with `SortPreservingMergeExec`s: is_spm_better: bool, + fetch: Option, ) -> Result { sort_input.children = sort_input .children @@ -107,7 +108,12 @@ fn plan_with_order_preserving_variants( .map(|node| { // Update descendants in the given tree if there is a connection: if node.data { - plan_with_order_preserving_variants(node, is_spr_better, is_spm_better) + plan_with_order_preserving_variants( + node, + is_spr_better, + is_spm_better, + fetch, + ) } else { Ok(node) } @@ -133,7 +139,8 @@ fn plan_with_order_preserving_variants( if let Some(ordering) = child.output_ordering() { // When the input of a `CoalescePartitionsExec` has an ordering, // replace it with a `SortPreservingMergeExec` if appropriate: - let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child)); + let spm = SortPreservingMergeExec::new(ordering.clone(), Arc::clone(child)) + .with_fetch(fetch); sort_input.plan = Arc::new(spm) as _; sort_input.children[0].data = true; return Ok(sort_input); @@ -252,6 +259,7 @@ pub(crate) fn replace_with_order_preserving_variants( requirements.children.swap_remove(0), is_spr_better || use_order_preserving_variant, is_spm_better || use_order_preserving_variant, + requirements.plan.fetch(), )?; // If the alternate plan makes this sort unnecessary, accept the alternate: diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index 6c761f674b3b..e24e66250704 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -63,7 +63,9 @@ pub fn assign_initial_requirements(node: &mut SortPushDown) { for (child, requirement) in node.children.iter_mut().zip(reqs) { child.data = ParentRequirements { ordering_requirement: requirement, - fetch: None, + // If the parent has a fetch value, assign it to the children + // Or use the fetch value of the child. + fetch: child.plan.fetch(), }; } } @@ -95,6 +97,7 @@ fn pushdown_sorts_helper( .ordering_satisfy_requirement(&parent_reqs); if is_sort(plan) { + let sort_fetch = plan.fetch(); let required_ordering = plan .output_ordering() .cloned() @@ -103,7 +106,8 @@ fn pushdown_sorts_helper( if !satisfy_parent { // Make sure this `SortExec` satisfies parent requirements: let sort_reqs = requirements.data.ordering_requirement.unwrap_or_default(); - let fetch = requirements.data.fetch; + // It's possible current plan (`SortExec`) has a fetch value. + let fetch = requirements.data.fetch.or(sort_fetch); requirements = requirements.children.swap_remove(0); requirements = add_sort_above(requirements, sort_reqs, fetch); }; @@ -113,7 +117,7 @@ fn pushdown_sorts_helper( if let Some(adjusted) = pushdown_requirement_to_children(&child.plan, &required_ordering)? { - let fetch = child.plan.fetch(); + let fetch = sort_fetch.or_else(|| child.plan.fetch()); for (grand_child, order) in child.children.iter_mut().zip(adjusted) { grand_child.data = ParentRequirements { ordering_requirement: order, diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt index 1dbce79e0f1a..57a4dd95f522 100644 --- a/datafusion/sqllogictest/test_files/topk.slt +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -49,7 +49,12 @@ select * from topk order by x desc limit 3; 8 5 - +query I +select * from (select * from topk limit 8) order by x limit 3; +---- +0 +1 +2 statement ok