Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: recursive cte hangs on joins #9687

Merged
merged 3 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions datafusion/physical-plan/src/recursive_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,10 +309,9 @@ impl RecursiveQueryStream {
// Downstream plans should not expect any partitioning.
let partition = 0;

self.recursive_stream = Some(
self.recursive_term
.execute(partition, self.task_context.clone())?,
);
let recursive_plan = reset_plan_states(self.recursive_term.clone())?;
self.recursive_stream =
Some(recursive_plan.execute(partition, self.task_context.clone())?);
self.poll_next(cx)
}
}
Expand Down Expand Up @@ -343,6 +342,25 @@ fn assign_work_table(
.data()
}

/// Returns a copy of `plan` whose nodes are back in their initial state, so it can be executed again.
///
/// Executing a plan may change state stored inside the plan itself, which makes the same
/// instance unsuitable for a second execution. Every node is therefore forked through
/// `ExecutionPlan::with_new_children`, walking the tree bottom-up.
///
/// `CrossJoinExec` is one such plan: it loads the left table into memory and keeps it in the
/// plan. When that left-side data is derived from the work table it goes stale as soon as the
/// work table changes, so the stored left table must be cleared before the next iteration
/// executes the plan again.
fn reset_plan_states(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
    plan.transform_up(&|node| {
        // A `WorkTableExec` is left untouched: its state has already been updated correctly.
        if node.as_any().is::<WorkTableExec>() {
            return Ok(Transformed::no(node));
        }

        // Re-attaching the node's own children forks a new node with initial state.
        let children = node.children();
        let rebuilt = Arc::clone(&node).with_new_children(children)?;
        Ok(Transformed::yes(rebuilt))
    })
    .data()
}

impl Stream for RecursiveQueryStream {
type Item = Result<RecordBatch>;

Expand Down
73 changes: 68 additions & 5 deletions datafusion/sqllogictest/test_files/cte.slt
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,6 @@ ProjectionExec: expr=[1 as a, 2 as b, 3 as c]
--PlaceholderRowExec



# enable recursive CTEs
statement ok
set datafusion.execution.enable_recursive_ctes = true;

# trivial recursive CTE works
query I rowsort
WITH RECURSIVE nodes AS (
Expand Down Expand Up @@ -651,3 +646,71 @@ WITH RECURSIVE my_cte AS (
WHERE my_cte.a<5
)
SELECT a FROM my_cte;


# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
query I
WITH RECURSIVE recursive_cte AS (
SELECT 1 as val
UNION ALL
(
WITH sub_cte AS (
SELECT 2 as val
)
SELECT
2 as val
FROM recursive_cte
CROSS JOIN sub_cte
WHERE recursive_cte.val < 2
)
)
SELECT * FROM recursive_cte;
----
1
2

# Test issue: https://github.com/apache/arrow-datafusion/issues/9680
# `recursive_cte` must appear on the left side of the cross join, because that is exactly the case the query above is meant to exercise.
query TT
explain WITH RECURSIVE recursive_cte AS (
SELECT 1 as val
UNION ALL
(
WITH sub_cte AS (
SELECT 2 as val
)
SELECT
2 as val
FROM recursive_cte
CROSS JOIN sub_cte
WHERE recursive_cte.val < 2
)
)
SELECT * FROM recursive_cte;
----
logical_plan
Projection: recursive_cte.val
--SubqueryAlias: recursive_cte
----RecursiveQuery: is_distinct=false
------Projection: Int64(1) AS val
--------EmptyRelation
------Projection: Int64(2) AS val
--------CrossJoin:
----------Filter: recursive_cte.val < Int64(2)
------------TableScan: recursive_cte
----------SubqueryAlias: sub_cte
------------Projection: Int64(2) AS val
--------------EmptyRelation
physical_plan
RecursiveQueryExec: name=recursive_cte, is_distinct=false
--ProjectionExec: expr=[1 as val]
----PlaceholderRowExec
--ProjectionExec: expr=[2 as val]
----CrossJoinExec
------CoalescePartitionsExec
--------CoalesceBatchesExec: target_batch_size=8182
----------FilterExec: val@0 < 2
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------WorkTableExec: name=recursive_cte
------ProjectionExec: expr=[2 as val]
--------PlaceholderRowExec
Loading