diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs index 68abc9653a8b..140820ff782a 100644 --- a/datafusion/physical-plan/src/recursive_query.rs +++ b/datafusion/physical-plan/src/recursive_query.rs @@ -309,10 +309,9 @@ impl RecursiveQueryStream { // Downstream plans should not expect any partitioning. let partition = 0; - self.recursive_stream = Some( - self.recursive_term - .execute(partition, self.task_context.clone())?, - ); + let recursive_plan = reset_plan_states(self.recursive_term.clone())?; + self.recursive_stream = + Some(recursive_plan.execute(partition, self.task_context.clone())?); self.poll_next(cx) } } @@ -343,6 +342,25 @@ fn assign_work_table( .data() } +/// Some plans will change their internal states after execution, making them unable to be executed again. +/// This function uses `ExecutionPlan::with_new_children` to fork a new plan with initial states. +/// +/// An example is `CrossJoinExec`, which loads the left table into memory and stores it in the plan. +/// However, if the data of the left table is derived from the work table, it will become outdated +/// as the work table changes. When the next iteration executes this plan again, we must clear the left table. +fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> { + plan.transform_up(&|plan| { + // WorkTableExec's states have already been updated correctly. 
+ if plan.as_any().is::<WorkTableExec>() { + Ok(Transformed::no(plan)) + } else { + let new_plan = plan.clone().with_new_children(plan.children())?; + Ok(Transformed::yes(new_plan)) + } + }) + .data() +} + impl Stream for RecursiveQueryStream { type Item = Result<RecordBatch>; diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index 6b9db5589391..50c88e41959f 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -40,11 +40,6 @@ ProjectionExec: expr=[1 as a, 2 as b, 3 as c] --PlaceholderRowExec - -# enable recursive CTEs -statement ok -set datafusion.execution.enable_recursive_ctes = true; - # trivial recursive CTE works query I rowsort WITH RECURSIVE nodes AS ( @@ -651,3 +646,71 @@ WITH RECURSIVE my_cte AS ( WHERE my_cte.a<5 ) SELECT a FROM my_cte; + + +# Test issue: https://github.com/apache/arrow-datafusion/issues/9680 +query I +WITH RECURSIVE recursive_cte AS ( + SELECT 1 as val + UNION ALL + ( + WITH sub_cte AS ( + SELECT 2 as val + ) + SELECT + 2 as val + FROM recursive_cte + CROSS JOIN sub_cte + WHERE recursive_cte.val < 2 + ) +) +SELECT * FROM recursive_cte; +---- +1 +2 + +# Test issue: https://github.com/apache/arrow-datafusion/issues/9680 +# 'recursive_cte' should be on the left of the cross join, as this is the test purpose of the above query. 
+query TT +explain WITH RECURSIVE recursive_cte AS ( + SELECT 1 as val + UNION ALL + ( + WITH sub_cte AS ( + SELECT 2 as val + ) + SELECT + 2 as val + FROM recursive_cte + CROSS JOIN sub_cte + WHERE recursive_cte.val < 2 + ) +) +SELECT * FROM recursive_cte; +---- +logical_plan +Projection: recursive_cte.val +--SubqueryAlias: recursive_cte +----RecursiveQuery: is_distinct=false +------Projection: Int64(1) AS val +--------EmptyRelation +------Projection: Int64(2) AS val +--------CrossJoin: +----------Filter: recursive_cte.val < Int64(2) +------------TableScan: recursive_cte +----------SubqueryAlias: sub_cte +------------Projection: Int64(2) AS val +--------------EmptyRelation +physical_plan +RecursiveQueryExec: name=recursive_cte, is_distinct=false +--ProjectionExec: expr=[1 as val] +----PlaceholderRowExec +--ProjectionExec: expr=[2 as val] +----CrossJoinExec +------CoalescePartitionsExec +--------CoalesceBatchesExec: target_batch_size=8182 +----------FilterExec: val@0 < 2 +------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 +--------------WorkTableExec: name=recursive_cte +------ProjectionExec: expr=[2 as val] +--------PlaceholderRowExec