diff --git a/README.md b/README.md index f199021d7d78..c2ede4833e9b 100644 --- a/README.md +++ b/README.md @@ -112,7 +112,8 @@ Default features: - `parquet`: support for reading the [Apache Parquet] format - `regex_expressions`: regular expression functions, such as `regexp_match` - `unicode_expressions`: Include unicode aware functions such as `character_length` -- `unparser` : enables support to reverse LogicalPlans back into SQL +- `unparser`: enables support to reverse LogicalPlans back into SQL +- `recursive-protection`: uses [recursive](https://docs.rs/recursive/latest/recursive/) for stack overflow protection. Optional features: diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 9549cfeeb3b8..9af27a90bc2a 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -1544,7 +1544,6 @@ dependencies = [ "indexmap", "itertools", "log", - "recursive", "regex", "regex-syntax", ] diff --git a/datafusion/common/Cargo.toml b/datafusion/common/Cargo.toml index a81ec724dd66..918f0cd583f7 100644 --- a/datafusion/common/Cargo.toml +++ b/datafusion/common/Cargo.toml @@ -36,10 +36,12 @@ name = "datafusion_common" path = "src/lib.rs" [features] +default = ["recursive-protection"] avro = ["apache-avro"] backtrace = [] pyarrow = ["pyo3", "arrow/pyarrow", "parquet"] force_hash_collisions = [] +recursive-protection = ["dep:recursive"] [dependencies] ahash = { workspace = true } @@ -62,7 +64,7 @@ object_store = { workspace = true, optional = true } parquet = { workspace = true, optional = true, default-features = true } paste = "1.0.15" pyo3 = { version = "0.22.0", optional = true } -recursive = { workspace = true } +recursive = { workspace = true, optional = true } sqlparser = { workspace = true } tokio = { workspace = true } diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index 0c153583e34b..d92a2cc34b56 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -18,7 +18,6 @@ //! [`TreeNode`] for visiting and rewriting expression and plan trees use crate::Result; -use recursive::recursive; use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; @@ -125,7 +124,7 @@ pub trait TreeNode: Sized { /// TreeNodeVisitor::f_up(ChildNode2) /// TreeNodeVisitor::f_up(ParentNode) /// ``` - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn visit<'n, V: TreeNodeVisitor<'n, Node = Self>>( &'n self, visitor: &mut V, @@ -175,7 +174,7 @@ pub trait TreeNode: Sized { /// TreeNodeRewriter::f_up(ChildNode2) /// TreeNodeRewriter::f_up(ParentNode) /// ``` - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn rewrite>( self, rewriter: &mut R, @@ -198,7 +197,7 @@ pub trait TreeNode: Sized { &'n self, mut f: F, ) -> Result { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn apply_impl<'n, N: TreeNode, F: FnMut(&'n N) -> Result>( node: &'n N, f: &mut F, @@ -233,7 +232,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn transform_down_impl Result>>( node: N, f: &mut F, @@ -257,7 +256,7 @@ pub trait TreeNode: Sized { self, mut f: F, ) -> Result> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn transform_up_impl Result>>( node: N, f: &mut F, @@ -372,7 +371,7 @@ pub trait TreeNode: Sized { mut f_down: FD, mut f_up: FU, ) -> Result> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn transform_down_up_impl< N: TreeNode, FD: FnMut(N) -> Result>, @@ -2350,6 +2349,7 @@ pub(crate) mod tests { Ok(()) } + #[cfg(feature = "recursive-protection")] #[test] fn test_large_tree() { let mut item = TestTreeNode::new_leaf("initial".to_string()); diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml index 2f41292f680f..403a80972c3b 100644 --- a/datafusion/expr/Cargo.toml +++ b/datafusion/expr/Cargo.toml @@ -36,6 +36,8 @@ name = "datafusion_expr" path = "src/lib.rs" [features] +default = ["recursive-protection"] +recursive-protection = ["dep:recursive"] [dependencies] arrow = { workspace = true } @@ -48,7 +50,7 @@ datafusion-functions-window-common = { workspace = true } datafusion-physical-expr-common = { workspace = true } indexmap = { workspace = true } paste = "^1.0" -recursive = { workspace = true } +recursive = { workspace = true, optional = true } serde_json = { workspace = true } sqlparser = { workspace = true } diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs index 3317deafbd6c..e61904e24918 100644 --- a/datafusion/expr/src/expr_schema.rs +++ b/datafusion/expr/src/expr_schema.rs @@ -32,7 +32,6 @@ use datafusion_common::{ TableReference, }; use datafusion_functions_window_common::field::WindowUDFFieldArgs; -use recursive::recursive; use std::collections::HashMap; use std::sync::Arc; @@ -100,7 +99,7 @@ impl ExprSchemable for Expr { /// expression refers to a column that does not exist in the /// schema, or when the expression is incorrectly typed /// (e.g. `[utf8] + [bool]`). - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn get_type(&self, schema: &dyn ExprSchema) -> Result { match self { Expr::Alias(Alias { expr, name, .. }) => match &**expr { diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 1539b69b4007..cdc95b84d837 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -45,7 +45,6 @@ use crate::{ UserDefinedLogicalNode, Values, Window, }; use datafusion_common::tree_node::TreeNodeRefContainer; -use recursive::recursive; use crate::expr::{Exists, InSubquery}; use datafusion_common::tree_node::{ @@ -669,7 +668,7 @@ impl LogicalPlan { /// Visits a plan similarly to [`Self::visit`], including subqueries that /// may appear in expressions such as `IN (SELECT ...)`. - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] pub fn visit_with_subqueries TreeNodeVisitor<'n, Node = Self>>( &self, visitor: &mut V, @@ -688,7 +687,7 @@ impl LogicalPlan { /// Similarly to [`Self::rewrite`], rewrites this node and its inputs using `f`, /// including subqueries that may appear in expressions such as `IN (SELECT /// ...)`. - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] pub fn rewrite_with_subqueries>( self, rewriter: &mut R, @@ -707,7 +706,7 @@ impl LogicalPlan { &self, mut f: F, ) -> Result { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn apply_with_subqueries_impl< F: FnMut(&LogicalPlan) -> Result, >( @@ -742,7 +741,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn transform_down_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -767,7 +766,7 @@ impl LogicalPlan { self, mut f: F, ) -> Result> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn transform_up_with_subqueries_impl< F: FnMut(LogicalPlan) -> Result>, >( @@ -795,7 +794,7 @@ impl LogicalPlan { mut f_down: FD, mut f_up: FU, ) -> Result> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn transform_down_up_with_subqueries_impl< FD: FnMut(LogicalPlan) -> Result>, FU: FnMut(LogicalPlan) -> Result>, diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 9979df689b0a..3032c67682b1 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -35,6 +35,10 @@ workspace = true name = "datafusion_optimizer" path = "src/lib.rs" +[features] +default = ["recursive-protection"] +recursive-protection = ["dep:recursive"] + [dependencies] arrow = { workspace = true } chrono = { workspace = true } @@ -44,7 +48,7 @@ datafusion-physical-expr = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } log = { workspace = true } -recursive = { workspace = true } +recursive = { workspace = true, optional = true } regex = { workspace = true } regex-syntax = "0.8.0" diff --git a/datafusion/optimizer/src/analyzer/subquery.rs b/datafusion/optimizer/src/analyzer/subquery.rs index fee06eeb9f75..0d04efbcf36a 100644 --- a/datafusion/optimizer/src/analyzer/subquery.rs +++ b/datafusion/optimizer/src/analyzer/subquery.rs @@ -17,7 +17,6 @@ use crate::analyzer::check_plan; use crate::utils::collect_subquery_cols; -use recursive::recursive; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{plan_err, Result}; @@ -79,7 +78,7 @@ pub fn check_subquery_expr( match outer_plan { LogicalPlan::Projection(_) | LogicalPlan::Filter(_) => Ok(()), - LogicalPlan::Aggregate(Aggregate {group_expr, aggr_expr,..}) => { + LogicalPlan::Aggregate(Aggregate { group_expr, aggr_expr, .. }) => { if group_expr.contains(expr) && !aggr_expr.contains(expr) { // TODO revisit this validation logic plan_err!( @@ -88,7 +87,7 @@ pub fn check_subquery_expr( } else { Ok(()) } - }, + } _ => plan_err!( "Correlated scalar subquery can only be used in Projection, Filter, Aggregate plan nodes" ) @@ -129,7 +128,7 @@ fn check_correlations_in_subquery(inner_plan: &LogicalPlan) -> Result<()> { } // Recursively check the unsupported outer references in the sub query plan. -#[recursive] +#[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Result<()> { if !can_contain_outer_ref && inner_plan.contains_outer_reference() { return plan_err!("Accessing outer reference columns is not allowed in the plan"); diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index e7c9a198f3ad..ff75a6a60f4b 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -22,7 +22,6 @@ use std::fmt::Debug; use std::sync::Arc; use crate::{OptimizerConfig, OptimizerRule}; -use recursive::recursive; use crate::optimizer::ApplyOrder; use crate::utils::NamePreserver; @@ -532,7 +531,7 @@ impl OptimizerRule for CommonSubexprEliminate { None } - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn rewrite( &self, plan: LogicalPlan, @@ -952,7 +951,7 @@ mod test { )? .build()?; - let expected ="Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\ + let expected = "Aggregate: groupBy=[[]], aggr=[[avg(__common_expr_1) AS col1, my_agg(__common_expr_1) AS col2]]\ \n Projection: UInt32(1) + test.a AS __common_expr_1, test.a, test.b, test.c\ \n TableScan: test"; diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 32b7ce44a63a..9a47f437e444 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -17,7 +17,6 @@ //! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available. use crate::{OptimizerConfig, OptimizerRule}; -use recursive::recursive; use std::sync::Arc; use crate::join_key_set::JoinKeySet; @@ -80,7 +79,7 @@ impl OptimizerRule for EliminateCrossJoin { true } - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn rewrite( &self, plan: LogicalPlan, @@ -651,7 +650,7 @@ mod tests { " Inner Join: t1.a = t2.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", " Inner Join: t1.a = t3.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", " TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]", - " TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]", + " TableScan: t3 [a:UInt32, b:UInt32, c:UInt32]", " TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]" ]; @@ -1237,10 +1236,10 @@ mod tests { .build()?; let expected = vec![ - "Filter: t1.a + UInt32(100) = t2.a * UInt32(2) OR t2.b = t1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", - " Cross Join: [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", - " TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]", - " TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]", + "Filter: t1.a + UInt32(100) = t2.a * UInt32(2) OR t2.b = t1.a [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", + " Cross Join: [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", + " TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]", + " TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]", ]; assert_optimized_plan_eq(plan, expected); @@ -1293,10 +1292,10 @@ mod tests { .build()?; let expected = vec![ - "Filter: t2.c < UInt32(15) OR t2.c = UInt32(688) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", - " Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", - " TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]", - " TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]", + "Filter: t2.c < UInt32(15) OR t2.c = UInt32(688) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", + " Inner Join: t1.a + UInt32(100) = t2.a * UInt32(2) [a:UInt32, b:UInt32, c:UInt32, a:UInt32, b:UInt32, c:UInt32]", + " TableScan: t1 [a:UInt32, b:UInt32, c:UInt32]", + " TableScan: t2 [a:UInt32, b:UInt32, c:UInt32]", ]; assert_optimized_plan_eq(plan, expected); diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 1519c54dbf68..7c8e4120ea20 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -21,7 +21,6 @@ mod required_indices; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use recursive::recursive; use std::collections::HashSet; use std::sync::Arc; @@ -110,7 +109,7 @@ impl OptimizerRule for OptimizeProjections { /// columns. /// - `Ok(None)`: Signal that the given logical plan did not require any change. /// - `Err(error)`: An error occurred during the optimization process. -#[recursive] +#[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn optimize_projections( plan: LogicalPlan, config: &dyn OptimizerConfig, diff --git a/datafusion/physical-optimizer/Cargo.toml b/datafusion/physical-optimizer/Cargo.toml index 838617ae9889..c964ca47e6a0 100644 --- a/datafusion/physical-optimizer/Cargo.toml +++ b/datafusion/physical-optimizer/Cargo.toml @@ -31,6 +31,10 @@ rust-version = { workspace = true } [lints] workspace = true +[features] +default = ["recursive-protection"] +recursive-protection = ["dep:recursive"] + [dependencies] arrow = { workspace = true } datafusion-common = { workspace = true, default-features = true } @@ -40,7 +44,7 @@ datafusion-physical-expr = { workspace = true } datafusion-physical-plan = { workspace = true } itertools = { workspace = true } log = { workspace = true } -recursive = { workspace = true } +recursive = { workspace = true, optional = true } [dev-dependencies] datafusion-expr = { workspace = true } diff --git a/datafusion/physical-optimizer/src/aggregate_statistics.rs b/datafusion/physical-optimizer/src/aggregate_statistics.rs index 87077183110d..dffdc49adf09 100644 --- a/datafusion/physical-optimizer/src/aggregate_statistics.rs +++ b/datafusion/physical-optimizer/src/aggregate_statistics.rs @@ -25,7 +25,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::udaf::{AggregateFunctionExpr, StatisticsArgs}; use datafusion_physical_plan::{expressions, ExecutionPlan}; -use recursive::recursive; use std::sync::Arc; use crate::PhysicalOptimizerRule; @@ -42,7 +41,7 @@ impl AggregateStatistics { } impl PhysicalOptimizerRule for AggregateStatistics { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn optimize( &self, plan: Arc, diff --git a/datafusion/sql/Cargo.toml b/datafusion/sql/Cargo.toml index e1e4d8df3d22..c6500e974206 100644 --- a/datafusion/sql/Cargo.toml +++ b/datafusion/sql/Cargo.toml @@ -36,9 +36,10 @@ name = "datafusion_sql" path = "src/lib.rs" [features] -default = ["unicode_expressions", "unparser"] +default = ["unicode_expressions", "unparser", "recursive-protection"] unicode_expressions = [] unparser = [] +recursive-protection = ["dep:recursive"] [dependencies] arrow = { workspace = true } @@ -49,7 +50,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } indexmap = { workspace = true } log = { workspace = true } -recursive = { workspace = true } +recursive = { workspace = true, optional = true } regex = { workspace = true } sqlparser = { workspace = true } diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index a651d8fa5d35..7c4d8dd21d66 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -20,7 +20,6 @@ use arrow_schema::TimeUnit; use datafusion_expr::planner::{ PlannerResult, RawBinaryExpr, RawDictionaryExpr, RawFieldAccessExpr, }; -use recursive::recursive; use sqlparser::ast::{ BinaryOperator, CastFormat, CastKind, DataType as SQLDataType, DictionaryField, Expr as SQLExpr, ExprWithAlias as SQLExprWithAlias, MapEntry, StructField, Subscript, @@ -197,7 +196,7 @@ impl SqlToRel<'_, S> { /// Internal implementation. Use /// [`Self::sql_expr_to_logical_expr`] to plan exprs. - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] fn sql_expr_to_logical_expr_internal( &self, sql: SQLExpr, diff --git a/datafusion/sql/src/set_expr.rs b/datafusion/sql/src/set_expr.rs index 3b1201d3dd59..d1569c81d350 100644 --- a/datafusion/sql/src/set_expr.rs +++ b/datafusion/sql/src/set_expr.rs @@ -18,11 +18,10 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use datafusion_common::{not_impl_err, Result}; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder}; -use recursive::recursive; use sqlparser::ast::{SetExpr, SetOperator, SetQuantifier}; impl SqlToRel<'_, S> { - #[recursive] + #[cfg_attr(feature = "recursive-protection", recursive::recursive)] pub(super) fn set_expr_to_plan( &self, set_expr: SetExpr,