feat: Use NestedLoopJoin instead of HashJoin/SortMergeJoin for small tables #16450

Draft: wants to merge 3 commits into base: main
4 changes: 4 additions & 0 deletions datafusion/common/src/config.rs
@@ -705,6 +705,10 @@ config_namespace! {
/// process to reorder the join keys
pub top_down_join_key_reordering: bool, default = true

/// If either input of an equijoin has at most this many rows, DataFusion will
/// choose a `NestedLoopJoin` over a `SortMergeJoin` or `HashJoin`.
pub nested_loop_equijoin_threshold: usize, default = 5

/// When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin.
/// HashJoin can work more efficiently than SortMergeJoin but consumes more memory
pub prefer_hash_join: bool, default = true
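Usage note (not part of the diff): once this option exists it can be tuned like any other optimizer setting. A minimal sketch, assuming the option name added above and the current `SessionConfig`/`SessionContext` API; the value 20 is arbitrary:

use datafusion::prelude::{SessionConfig, SessionContext};

fn context_with_larger_nlj_threshold() -> SessionContext {
    // Raise the row-count threshold so that joins where either input reports
    // at most 20 rows are planned as a NestedLoopJoin.
    let mut config = SessionConfig::new();
    config.options_mut().optimizer.nested_loop_equijoin_threshold = 20;
    // The SQL equivalent would be:
    //   SET datafusion.optimizer.nested_loop_equijoin_threshold = 20;
    SessionContext::new_with_config(config)
}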
241 changes: 159 additions & 82 deletions datafusion/core/src/physical_planner.rs
@@ -78,10 +78,11 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
-    Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType,
-    Filter, JoinType, RecursiveQuery, SkipType, StringifiedPlan, WindowFrame,
-    WindowFrameBound, WriteOp,
+    binary, Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat,
+    Extension, FetchType, Filter, JoinType, Operator, RecursiveQuery, SkipType,
+    StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};

use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::{
@@ -90,6 +91,8 @@ use datafusion_physical_expr::{
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::execution_plan::InvariantLevel;
use datafusion_physical_plan::joins::utils::JoinFilter;
use datafusion_physical_plan::joins::JoinOn;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
use datafusion_physical_plan::unnest::ListUnnest;
@@ -1009,95 +1012,99 @@ impl DefaultPhysicalPlanner {
let left_df_schema = left.schema();
let right_df_schema = right.schema();
let execution_props = session_state.execution_props();
- let join_on = keys
-     .iter()
-     .map(|(l, r)| {
-         let l = create_physical_expr(l, left_df_schema, execution_props)?;
-         let r =
-             create_physical_expr(r, right_df_schema, execution_props)?;
-         Ok((l, r))
-     })
-     .collect::<Result<join_utils::JoinOn>>()?;
-
- let join_filter = match filter {
-     Some(expr) => {
-         // Extract columns from filter expression and saved in a HashSet
-         let cols = expr.column_refs();
-
-         // Collect left & right field indices, the field indices are sorted in ascending order
-         let left_field_indices = cols
-             .iter()
-             .filter_map(|c| left_df_schema.index_of_column(c).ok())
-             .sorted()
-             .collect::<Vec<_>>();
-         let right_field_indices = cols
-             .iter()
-             .filter_map(|c| right_df_schema.index_of_column(c).ok())
-             .sorted()
-             .collect::<Vec<_>>();
-
-         // Collect DFFields and Fields required for intermediate schemas
-         let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) =
-             left_field_indices
-                 .clone()
-                 .into_iter()
-                 .map(|i| {
-                     (
-                         left_df_schema.qualified_field(i),
-                         physical_left.schema().field(i).clone(),
-                     )
-                 })
-                 .chain(right_field_indices.clone().into_iter().map(|i| {
-                     (
-                         right_df_schema.qualified_field(i),
-                         physical_right.schema().field(i).clone(),
-                     )
-                 }))
-                 .unzip();
-         let filter_df_fields = filter_df_fields
-             .into_iter()
-             .map(|(qualifier, field)| {
-                 (qualifier.cloned(), Arc::new(field.clone()))
-             })
-             .collect();
-
-         let metadata: HashMap<_, _> = left_df_schema
-             .metadata()
-             .clone()
-             .into_iter()
-             .chain(right_df_schema.metadata().clone())
-             .collect();
-
-         // Construct intermediate schemas used for filtering data and
-         // convert logical expression to physical according to filter schema
-         let filter_df_schema = DFSchema::new_with_metadata(
-             filter_df_fields,
-             metadata.clone(),
-         )?;
-         let filter_schema =
-             Schema::new_with_metadata(filter_fields, metadata);
-         let filter_expr = create_physical_expr(
-             expr,
-             &filter_df_schema,
-             session_state.execution_props(),
-         )?;
-         let column_indices = join_utils::JoinFilter::build_column_indices(
-             left_field_indices,
-             right_field_indices,
-         );
-
-         Some(join_utils::JoinFilter::new(
-             filter_expr,
-             column_indices,
-             Arc::new(filter_schema),
-         ))
-     }
-     _ => None,
- };
+ // Use the configured row-count threshold (default 5 rows), as NestedLoopJoins
+ // tend to perform better when one of the inputs is small.
+ let threshold = session_state
+     .config_options()
+     .optimizer
+     .nested_loop_equijoin_threshold;
+ let left_rows = *physical_left
+     // Passing `None` as the partition returns num_rows for the whole plan
+     .partition_statistics(None)?
+     .num_rows
+     .get_value()
+     .unwrap()
+     <= threshold;
+ let right_rows = *physical_right
+     .partition_statistics(None)?
+     .num_rows
+     .get_value()
+     .unwrap()
+     <= threshold;
+ let use_nested_loop_join_equijoin = left_rows || right_rows;
+
+ // If we can use a nested loop join then `join_on` will be empty because
+ // the expressions are moved into the join filter.
+ let join_on: JoinOn = if use_nested_loop_join_equijoin {
+     Vec::new()
+ } else {
+     keys.iter()
+         .map(|(l, r)| {
+             let l =
+                 create_physical_expr(l, left_df_schema, execution_props)?;
+             let r = create_physical_expr(
+                 r,
+                 right_df_schema,
+                 execution_props,
+             )?;
+             Ok((l, r))
+         })
+         .collect::<Result<_>>()?
+ };
+
+ // If we can use a nested loop join, fold the `join_on` expressions into the
+ // join filter; otherwise build the join filter as usual.
+ let join_filter: Option<JoinFilter> = if use_nested_loop_join_equijoin {
+     let combined_join_on_expression: Expr = filter
+         .as_ref()
+         .map(|expr| {
+             // Iterate through all `join_on` key pairs, construct a `BinaryExpr`
+             // for each, and combine them together.
+             keys.iter()
+                 .fold(expr.clone(), |acc, (l, r)| {
+                     acc.and(l.clone().eq(r.clone()))
+                 })
+                 .clone()
+         })
+         .unwrap();

Review comment (Contributor Author): It seems that when we combine the JoinOn expressions here it causes an error when both sides of the expression have the same unqualified name, leading to duplicate unqualified fields. Is there a function that can qualify them with the schema? I can't seem to find one.

Review comment (Contributor Author): To clarify, this is originally not a problem while the expressions are in JoinOn, since each unqualified column refers to its own table before being combined into one expression.

+     // Combine the original join filter with the combined `on` expressions
+     let combined_filter: Expr = match &filter {
+         Some(expr) => combined_join_on_expression.and(expr.clone()),
+         None => combined_join_on_expression,
+     };
+
+     Some(build_join_filter(
+         &combined_filter,
+         left_df_schema,
+         right_df_schema,
+         &physical_left,
+         &physical_right,
+         &session_state,
+     )?)
+ } else {
+     match filter {
+         Some(ref expr) => {
+             let jf = build_join_filter(
+                 expr,
+                 left_df_schema,
+                 right_df_schema,
+                 &physical_left,
+                 &physical_right,
+                 &session_state,
+             )?;
+             Some(jf)
+         }
+         None => None,
+     }
+ };
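Editorial aside on the block above and the review comments: the sketch below mirrors, outside the planner, how the equi-join keys get folded into one conjunctive filter expression, and why two inputs that both expose the same bare column name can collide once the keys leave `join_on`. All table and column names are hypothetical; `combine_keys` is a stand-in for the `fold` above, not planner code.

use datafusion_expr::{col, Expr};

// Mirror of the planner's fold: seed with the original filter, then AND in
// one equality per equi-join key pair.
fn combine_keys(filter: Expr, keys: &[(Expr, Expr)]) -> Expr {
    keys.iter()
        .fold(filter, |acc, (l, r)| acc.and(l.clone().eq(r.clone())))
}

fn example() -> Expr {
    // Qualified keys stay unambiguous: `t1.x > t2.y AND t1.id = t2.id`.
    let qualified = combine_keys(
        col("t1.x").gt(col("t2.y")),
        &[(col("t1.id"), col("t2.id"))],
    );

    // If both sides only carry the bare name `id`, the combined expression is
    // `... AND id = id`, and the intermediate filter schema would contain two
    // unqualified `id` fields, which is the error the review comments describe.
    let _ambiguous = combine_keys(col("x").gt(col("y")), &[(col("id"), col("id"))]);

    qualified
}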

let prefer_hash_join =
session_state.config_options().optimizer.prefer_hash_join;

// We do not need a separate check for the `NestedLoopJoin` equijoin case here,
// because in that case `join_on` is empty and `join_filter.is_some()` is true.
let join: Arc<dyn ExecutionPlan> = if join_on.is_empty() {
if join_filter.is_none() && matches!(join_type, JoinType::Inner) {
// cross join if there is no join conditions and no join filter set
@@ -1304,6 +1311,76 @@ impl DefaultPhysicalPlanner {
}
}

// Helper function for constructing `JoinFilter`
fn build_join_filter(
    expr: &Expr,
    left_df_schema: &Arc<DFSchema>,
    right_df_schema: &Arc<DFSchema>,
    physical_left: &Arc<dyn ExecutionPlan>,
    physical_right: &Arc<dyn ExecutionPlan>,
    session_state: &SessionState,
) -> Result<JoinFilter> {
    // Extract columns from the filter expression and save them in a HashSet
    let cols = expr.column_refs();

    // Collect left & right field indices; the field indices are sorted in ascending order
    let left_field_indices = cols
        .iter()
        .filter_map(|c| left_df_schema.index_of_column(c).ok())
        .sorted()
        .collect::<Vec<_>>();
    let right_field_indices = cols
        .iter()
        .filter_map(|c| right_df_schema.index_of_column(c).ok())
        .sorted()
        .collect::<Vec<_>>();

    // Collect DFFields and Fields required for intermediate schemas
    let (filter_df_fields, filter_fields): (Vec<_>, Vec<_>) = left_field_indices
        .clone()
        .into_iter()
        .map(|i| {
            (
                left_df_schema.qualified_field(i),
                physical_left.schema().field(i).clone(),
            )
        })
        .chain(right_field_indices.clone().into_iter().map(|i| {
            (
                right_df_schema.qualified_field(i),
                physical_right.schema().field(i).clone(),
            )
        }))
        .unzip();
    let filter_df_fields = filter_df_fields
        .into_iter()
        .map(|(qualifier, field)| (qualifier.cloned(), Arc::new(field.clone())))
        .collect();

    let metadata: HashMap<_, _> = left_df_schema
        .metadata()
        .clone()
        .into_iter()
        .chain(right_df_schema.metadata().clone())
        .collect();

    // Construct intermediate schemas used for filtering data and
    // convert the logical expression to a physical one according to the filter schema
    let filter_df_schema =
        DFSchema::new_with_metadata(filter_df_fields, metadata.clone())?;
    let filter_schema = Schema::new_with_metadata(filter_fields, metadata);
    let filter_expr =
        create_physical_expr(expr, &filter_df_schema, session_state.execution_props())?;
    let column_indices =
        JoinFilter::build_column_indices(left_field_indices, right_field_indices);

    Ok(JoinFilter::new(
        filter_expr,
        column_indices,
        Arc::new(filter_schema),
    ))
}

/// Expand and align a GROUPING SET expression.
/// (see <https://www.postgresql.org/docs/current/queries-table-expressions.html#QUERIES-GROUPING-SETS>)
///