diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index b7aca9e2707e..abed143eb545 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -705,6 +705,10 @@ config_namespace! {
         /// process to reorder the join keys
         pub top_down_join_key_reordering: bool, default = true
 
+        /// If either side of an equijoin has at most this many rows, the planner chooses a
+        /// `NestedLoopJoin` over a `SortMergeJoin` or `HashJoin`.
+        pub nested_loop_equijoin_threshold: usize, default = 5
+
         /// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
         /// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
         pub prefer_hash_join: bool, default = true
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 14188f6bf0c9..38d7b9f9f47f 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -78,10 +78,11 @@ use datafusion_expr::expr::{
 use datafusion_expr::expr_rewriter::unnormalize_cols;
 use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
 use datafusion_expr::{
-    Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType,
-    Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame,
-    WindowFrameBound, WriteOp,
+    binary, Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat,
+    Extension, FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType,
+    StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
 };
+
 use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
 use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::{
@@ -90,6 +91,8 @@ use datafusion_physical_expr::{
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::empty::EmptyExec;
 use datafusion_physical_plan::execution_plan::InvariantLevel;
+use datafusion_physical_plan::joins::utils::JoinFilter;
+use datafusion_physical_plan::joins::JoinOn;
 use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
 use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
 use datafusion_physical_plan::unnest::ListUnnest;
@@ -1009,95 +1012,99 @@ impl DefaultPhysicalPlanner {
                 let left_df_schema = left.schema();
                 let right_df_schema = right.schema();
                 let execution_props = session_state.execution_props();
-                let join_on = keys
-                    .iter()
-                    .map(|(l, r)| {
-                        let l = create_physical_expr(l, left_df_schema, execution_props)?;
-                        let r =
-                            create_physical_expr(r, right_df_schema, execution_props)?;
-                        Ok((l, r))
-                    })
-                    .collect::<Result<join_utils::JoinOn>>()?;
-
-                let join_filter = match filter {
-                    Some(expr) => {
-                        // Extract columns from filter expression and saved in a HashSet
-                        let cols = expr.column_refs();
-                        // Collect left & right field indices, the field indices are sorted in ascending order
-                        let left_field_indices = cols
-                            .iter()
-                            .filter_map(|c| left_df_schema.index_of_column(c).ok())
-                            .sorted()
-                            .collect::<Vec<_>>();
-                        let right_field_indices = cols
-                            .iter()
-                            .filter_map(|c| right_df_schema.index_of_column(c).ok())
-                            .sorted()
-                            .collect::<Vec<_>>();
+                // `NestedLoopJoin` tends to perform better when one of the inputs is
+                // small, so compare each side's row count against the configured threshold.
+                let threshold = session_state
+                    .config_options()
+                    .optimizer
+                    .nested_loop_equijoin_threshold;
+                let left_is_small = physical_left
+                    // Passing `None` returns statistics aggregated over all partitions
+                    .partition_statistics(None)?
+                    .num_rows
+                    .get_value()
+                    .is_some_and(|rows| *rows <= threshold);
+                let right_is_small = physical_right
+                    .partition_statistics(None)?
+                    .num_rows
+                    .get_value()
+                    .is_some_and(|rows| *rows <= threshold);
+                let use_nested_loop_join_equijoin =
+                    !keys.is_empty() && (left_is_small || right_is_small);
+
+                // If we use a nested loop join, `join_on` stays empty because the
+                // equijoin expressions are moved into the join filter.
+                let join_on: JoinOn = if use_nested_loop_join_equijoin {
+                    Vec::new()
+                } else {
+                    keys.iter()
+                        .map(|(l, r)| {
+                            let l =
+                                create_physical_expr(l, left_df_schema, execution_props)?;
+                            let r = create_physical_expr(
+                                r,
+                                right_df_schema,
+                                execution_props,
+                            )?;
+                            Ok((l, r))
+                        })
+                        .collect::<Result<JoinOn>>()?
+                };
 
-                        // Collect DFFields and Fields required for intermediate schemas
-                        let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) =
-                            left_field_indices
-                                .clone()
-                                .into_iter()
-                                .map(|i| {
-                                    (
-                                        left_df_schema.qualified_field(i),
-                                        physical_left.schema().field(i).clone(),
-                                    )
-                                })
-                                .chain(right_field_indices.clone().into_iter().map(|i| {
-                                    (
-                                        right_df_schema.qualified_field(i),
-                                        physical_right.schema().field(i).clone(),
-                                    )
-                                }))
-                                .unzip();
-                        let filter_df_fields = filter_df_fields
-                            .into_iter()
-                            .map(|(qualifier, field)| {
-                                (qualifier.cloned(), Arc::new(field.clone()))
-                            })
-                            .collect();
-
-                        let metadata: HashMap<_, _> = left_df_schema
-                            .metadata()
-                            .clone()
-                            .into_iter()
-                            .chain(right_df_schema.metadata().clone())
-                            .collect();
-
-                        // Construct intermediate schemas used for filtering data and
-                        // convert logical expression to physical according to filter schema
-                        let filter_df_schema = DFSchema::new_with_metadata(
-                            filter_df_fields,
-                            metadata.clone(),
-                        )?;
-                        let filter_schema =
-                            Schema::new_with_metadata(filter_fields, metadata);
-                        let filter_expr = create_physical_expr(
-                            expr,
-                            &filter_df_schema,
-                            session_state.execution_props(),
-                        )?;
-                        let column_indices = join_utils::JoinFilter::build_column_indices(
-                            left_field_indices,
-                            right_field_indices,
-                        );
-
-                        Some(join_utils::JoinFilter::new(
-                            filter_expr,
-                            column_indices,
-                            Arc::new(filter_schema),
-                        ))
+                // If we use a nested loop join, fold the `join_on` expressions into the
+                // join filter; otherwise, build the join filter as usual.
+                let join_filter: Option<JoinFilter> = if use_nested_loop_join_equijoin {
+                    // Turn each `(l, r)` key pair into `l = r` and AND the pairs together.
+                    let combined_join_on_expression: Expr = keys
+                        .iter()
+                        .map(|(l, r)| l.clone().eq(r.clone()))
+                        .reduce(|acc, eq| acc.and(eq))
+                        .expect("checked above that `keys` is non-empty");
+
+                    // Combine the original join filter, if any, with the `on` conjunction
+                    let combined_filter: Expr = match &filter {
+                        Some(expr) => combined_join_on_expression.and(expr.clone()),
+                        None => combined_join_on_expression,
+                    };
+
+                    Some(build_join_filter(
+                        &combined_filter,
+                        left_df_schema,
+                        right_df_schema,
+                        &physical_left,
+                        &physical_right,
+                        session_state,
+                    )?)
+                } else {
+                    match filter {
+                        Some(ref expr) => {
+                            let jf = build_join_filter(
+                                expr,
+                                left_df_schema,
+                                right_df_schema,
+                                &physical_left,
+                                &physical_right,
+                                session_state,
+                            )?;
+                            Some(jf)
+                        }
+                        None => None,
                     }
-                    _ => None,
                 };
 
                 let prefer_hash_join =
                     session_state.config_options().optimizer.prefer_hash_join;
+                // No separate check for the `NestedLoopJoin` equijoin case is needed
+                // here: when it applies, `join_on` is empty and `join_filter` is `Some`.
                 let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
                     if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
                         // cross join if there is no join conditions and no join filter set
@@ -1304,6 +1311,76 @@ impl DefaultPhysicalPlanner {
     }
 }
 
+/// Helper function for constructing a [`JoinFilter`]
+fn build_join_filter(
+    expr: &Expr,
+    left_df_schema: &Arc<DFSchema>,
+    right_df_schema: &Arc<DFSchema>,
+    physical_left: &Arc<dyn ExecutionPlan>,
+    physical_right: &Arc<dyn ExecutionPlan>,
+    session_state: &SessionState,
+) -> Result<JoinFilter> {
+    // Extract columns from the filter expression and save them in a HashSet
+    let cols = expr.column_refs();
+
+    // Collect left & right field indices; the field indices are sorted in ascending order
+    let left_field_indices = cols
+        .iter()
+        .filter_map(|c| left_df_schema.index_of_column(c).ok())
+        .sorted()
+        .collect::<Vec<_>>();
+    let right_field_indices = cols
+        .iter()
+        .filter_map(|c| right_df_schema.index_of_column(c).ok())
+        .sorted()
+        .collect::<Vec<_>>();
+
+    // Collect DFFields and Fields required for intermediate schemas
+    let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = left_field_indices
+        .clone()
+        .into_iter()
+        .map(|i| {
+            (
+                left_df_schema.qualified_field(i),
+                physical_left.schema().field(i).clone(),
+            )
+        })
+        .chain(right_field_indices.clone().into_iter().map(|i| {
+            (
+                right_df_schema.qualified_field(i),
+                physical_right.schema().field(i).clone(),
+            )
+        }))
+        .unzip();
+    let filter_df_fields = filter_df_fields
+        .into_iter()
+        .map(|(qualifier, field)| (qualifier.cloned(), Arc::new(field.clone())))
+        .collect();
+
+    let metadata: HashMap<_, _> = left_df_schema
+        .metadata()
+        .clone()
+        .into_iter()
+        .chain(right_df_schema.metadata().clone())
+        .collect();
+
+    // Construct intermediate schemas used for filtering data and
+    // convert the logical expression to a physical one according to the filter schema
+    let filter_df_schema =
+        DFSchema::new_with_metadata(filter_df_fields, metadata.clone())?;
+    let filter_schema = Schema::new_with_metadata(filter_fields, metadata);
+    let filter_expr =
+        create_physical_expr(expr, &filter_df_schema, session_state.execution_props())?;
+    let column_indices =
+        JoinFilter::build_column_indices(left_field_indices, right_field_indices);
+
+    Ok(JoinFilter::new(
+        filter_expr,
+        column_indices,
+        Arc::new(filter_schema),
+    ))
+}
+
 /// Expand and align a GROUPING SET expression.
/// (see ) /// diff --git a/datafusion/core/tests/dataframe/mod.rs b/datafusion/core/tests/dataframe/mod.rs index 8d60dbea3d01..1304e94bb93b 100644 --- a/datafusion/core/tests/dataframe/mod.rs +++ b/datafusion/core/tests/dataframe/mod.rs @@ -2633,23 +2633,22 @@ async fn test_count_wildcard_on_where_in() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), @r" - +---------------+------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | LeftSemi Join: CAST(t1.a AS Int64) = __correlated_sq_1.count(*) | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __correlated_sq_1 | - | | Projection: count(Int64(1)) AS count(*) | - | | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] | - | | TableScan: t2 projection=[] | - | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | - | | HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, b@1] | - | | ProjectionExec: expr=[4 as count(*)] | - | | PlaceholderRowExec | - | | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+------------------------------------------------------------------------------------------------------------------------+ + +---------------+-----------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------------------------------------------------------------+ + | logical_plan | LeftSemi Join: CAST(t1.a AS Int64) = __correlated_sq_1.count(*) | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __correlated_sq_1 | + | | Projection: count(Int64(1)) AS count(*) | + | | Aggregate: groupBy=[[]], aggr=[[count(Int64(1))]] | + | | TableScan: t2 projection=[] | + | physical_plan | NestedLoopJoinExec: join_type=RightSemi, filter=CAST(t1.a AS Int64)@0 = count(*)@1, projection=[a@0, b@1] | + | | ProjectionExec: expr=[4 as count(*)] | + | | PlaceholderRowExec | + | | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+-----------------------------------------------------------------------------------------------------------+ " ); @@ -2679,22 +2678,21 @@ async fn test_count_wildcard_on_where_in() -> Result<()> { assert_snapshot!( pretty_format_batches(&df_results).unwrap(), @r" - +---------------+------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | LeftSemi Join: CAST(t1.a AS Int64) = __correlated_sq_1.count(*) | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __correlated_sq_1 | - | | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] | - | | TableScan: t2 projection=[] | - | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | - | | HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(count(*)@0, CAST(t1.a AS Int64)@2)], projection=[a@0, 
b@1] | - | | ProjectionExec: expr=[4 as count(*)] | - | | PlaceholderRowExec | - | | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+------------------------------------------------------------------------------------------------------------------------+ + +---------------+-----------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+-----------------------------------------------------------------------------------------------------------+ + | logical_plan | LeftSemi Join: CAST(t1.a AS Int64) = __correlated_sq_1.count(*) | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __correlated_sq_1 | + | | Aggregate: groupBy=[[]], aggr=[[count(Int64(1)) AS count(*)]] | + | | TableScan: t2 projection=[] | + | physical_plan | NestedLoopJoinExec: join_type=RightSemi, filter=CAST(t1.a AS Int64)@0 = count(*)@1, projection=[a@0, b@1] | + | | ProjectionExec: expr=[4 as count(*)] | + | | PlaceholderRowExec | + | | ProjectionExec: expr=[a@0 as a, b@1 as b, CAST(a@0 AS Int64) as CAST(t1.a AS Int64)] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+-----------------------------------------------------------------------------------------------------------+ " ); @@ -2910,32 +2908,31 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { assert_snapshot!( pretty_format_batches(&sql_results).unwrap(), @r" - +---------------+---------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+---------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Projection: t1.a, t1.b | - | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | - | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | - | | Left Join: t1.a = __scalar_sq_1.a | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __scalar_sq_1 | - | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true | - | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] | - | | TableScan: t2 projection=[a] | - | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | - | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] | - | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | - | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | - | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+---------------------------------------------------------------------------------------------------------------------------+ + 
+---------------+------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Projection: t1.a, t1.b | + | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | + | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | + | | Left Join: t1.a = __scalar_sq_1.a | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __scalar_sq_1 | + | | Projection: count(Int64(1)) AS count(*), t2.a, Boolean(true) AS __always_true | + | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1))]] | + | | TableScan: t2 projection=[a] | + | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | + | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | + | | NestedLoopJoinExec: join_type=Left, filter=a@0 = a@1, projection=[a@0, b@1, count(*)@2, __always_true@4] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | ProjectionExec: expr=[count(Int64(1))@1 as count(*), a@0 as a, true as __always_true] | + | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(Int64(1))] | + | | CoalesceBatchesExec: target_batch_size=8192 | + | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | + | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | + | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(Int64(1))] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+------------------------------------------------------------------------------------------------------------------+ " ); @@ -2967,32 +2964,31 @@ async fn test_count_wildcard_on_where_scalar_subquery() -> Result<()> { assert_snapshot!( pretty_format_batches(&df_results).unwrap(), @r" - +---------------+---------------------------------------------------------------------------------------------------------------------------+ - | plan_type | plan | - +---------------+---------------------------------------------------------------------------------------------------------------------------+ - | logical_plan | Projection: t1.a, t1.b | - | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | - | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | - | | Left Join: t1.a = __scalar_sq_1.a | - | | TableScan: t1 projection=[a, b] | - | | SubqueryAlias: __scalar_sq_1 | - | | Projection: count(*), t2.a, Boolean(true) AS __always_true | - | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]] | - | | TableScan: t2 projection=[a] | - | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | - | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@1)], projection=[a@0, b@1, count(*)@2, __always_true@4] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] | - | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] | - | | CoalesceBatchesExec: target_batch_size=8192 | - | | RepartitionExec: partitioning=Hash([a@0], 4), 
input_partitions=4 | - | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | - | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] | - | | DataSourceExec: partitions=1, partition_sizes=[1] | - | | | - +---------------+---------------------------------------------------------------------------------------------------------------------------+ + +---------------+------------------------------------------------------------------------------------------------------------------+ + | plan_type | plan | + +---------------+------------------------------------------------------------------------------------------------------------------+ + | logical_plan | Projection: t1.a, t1.b | + | | Filter: CASE WHEN __scalar_sq_1.__always_true IS NULL THEN Int64(0) ELSE __scalar_sq_1.count(*) END > Int64(0) | + | | Projection: t1.a, t1.b, __scalar_sq_1.count(*), __scalar_sq_1.__always_true | + | | Left Join: t1.a = __scalar_sq_1.a | + | | TableScan: t1 projection=[a, b] | + | | SubqueryAlias: __scalar_sq_1 | + | | Projection: count(*), t2.a, Boolean(true) AS __always_true | + | | Aggregate: groupBy=[[t2.a]], aggr=[[count(Int64(1)) AS count(*)]] | + | | TableScan: t2 projection=[a] | + | physical_plan | CoalesceBatchesExec: target_batch_size=8192 | + | | FilterExec: CASE WHEN __always_true@3 IS NULL THEN 0 ELSE count(*)@2 END > 0, projection=[a@0, b@1] | + | | NestedLoopJoinExec: join_type=Left, filter=a@0 = a@1, projection=[a@0, b@1, count(*)@2, __always_true@4] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | ProjectionExec: expr=[count(*)@1 as count(*), a@0 as a, true as __always_true] | + | | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[count(*)] | + | | CoalesceBatchesExec: target_batch_size=8192 | + | | RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4 | + | | RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 | + | | AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)] | + | | DataSourceExec: partitions=1, partition_sizes=[1] | + | | | + +---------------+------------------------------------------------------------------------------------------------------------------+ " );
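
A minimal usage sketch (not part of the diff): assuming the option is exposed under the usual `datafusion.optimizer.` key prefix, it can be tuned per session like any other optimizer setting. The table names and CSV path below are placeholders.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Raise the threshold so that joins where either input has at most 10 rows
    // are planned as NestedLoopJoinExec instead of HashJoinExec/SortMergeJoinExec.
    let config = SessionConfig::new()
        .set_usize("datafusion.optimizer.nested_loop_equijoin_threshold", 10);
    let ctx = SessionContext::new_with_config(config);

    // Placeholder tables; any small registered tables would do.
    ctx.register_csv("t1", "example.csv", CsvReadOptions::new())
        .await?;
    ctx.register_csv("t2", "example.csv", CsvReadOptions::new())
        .await?;

    // For small inputs the plan should show a NestedLoopJoinExec whose filter
    // carries the equijoin key (t1.a = t2.a), matching the updated snapshots.
    ctx.sql("EXPLAIN SELECT * FROM t1 JOIN t2 ON t1.a = t2.a")
        .await?
        .show()
        .await?;
    Ok(())
}
```

Raising the threshold makes the planner more eager to pick `NestedLoopJoinExec` for small inputs; the default of 5 keeps the behavior change narrow.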