Skip to content

Commit

Permalink
Remove partition_mode parameter from HashJoinExec::swap_inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Dec 29, 2024
1 parent fce3fb3 commit 0dbeda0
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 14 deletions.
18 changes: 9 additions & 9 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,9 @@ pub(crate) fn swap_join_type(join_type: JoinType) -> JoinType {
#[deprecated(since = "45.0.0", note = "use HashJoinExec::swap_inputs instead")]
pub fn swap_hash_join(
hash_join: &HashJoinExec,
partition_mode: PartitionMode,
_partition_mode: PartitionMode,
) -> Result<Arc<dyn ExecutionPlan>> {
hash_join.swap_inputs(partition_mode)
hash_join.swap_inputs()
}

/// Swaps inputs of `NestedLoopJoinExec` and wraps it into `ProjectionExec` is required
Expand Down Expand Up @@ -235,7 +235,7 @@ pub(crate) fn try_collect_left(
if hash_join.join_type().supports_swap()
&& should_swap_join_order(&**left, &**right)?
{
Ok(Some(hash_join.swap_inputs(PartitionMode::CollectLeft)?))
Ok(Some(hash_join.swap_inputs()?))
} else {
Ok(Some(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
Expand All @@ -261,7 +261,7 @@ pub(crate) fn try_collect_left(
)?))),
(false, true) => {
if hash_join.join_type().supports_swap() {
hash_join.swap_inputs(PartitionMode::CollectLeft).map(Some)
hash_join.swap_inputs().map(Some)
} else {
Ok(None)
}
Expand All @@ -282,7 +282,7 @@ pub(crate) fn partitioned_hash_join(
let right = hash_join.right();
if hash_join.join_type().supports_swap() && should_swap_join_order(&**left, &**right)?
{
hash_join.swap_inputs(PartitionMode::Partitioned)
hash_join.swap_inputs()
} else {
Ok(Arc::new(HashJoinExec::try_new(
Arc::clone(left),
Expand Down Expand Up @@ -329,7 +329,7 @@ fn statistical_join_selection_subrule(
&& should_swap_join_order(&**left, &**right)?
{
hash_join
.swap_inputs(PartitionMode::Partitioned)
.swap_inputs()
.map(Some)?
} else {
None
Expand Down Expand Up @@ -563,10 +563,10 @@ fn swap_join_according_to_unboundedness(
JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
) => internal_err!("{join_type} join cannot be swapped for unbounded input."),
(PartitionMode::Partitioned, _) => {
hash_join.swap_inputs(PartitionMode::Partitioned)
hash_join.swap_inputs()
}
(PartitionMode::CollectLeft, _) => {
hash_join.swap_inputs(PartitionMode::CollectLeft)
hash_join.swap_inputs()
}
(PartitionMode::Auto, _) => {
internal_err!("Auto is not acceptable for unbounded input here.")
Expand Down Expand Up @@ -1221,7 +1221,7 @@ mod tests_statistical {
)?);

let swapped = join
.swap_inputs(PartitionMode::Partitioned)
.swap_inputs()
.expect("swap_hash_join must support joins with projections");
let swapped_join = swapped.as_any().downcast_ref::<HashJoinExec>().expect(
"ProjectionExec won't be added above if HashJoinExec contains embedded projection",
Expand Down
7 changes: 2 additions & 5 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,10 +576,7 @@ impl HashJoinExec {
///
/// This function is public so other downstream projects can use it to
/// construct `HashJoinExec` with right side as the build side.
pub fn swap_inputs(
&self,
partition_mode: PartitionMode,
) -> Result<Arc<dyn ExecutionPlan>> {
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>> {
let left = self.left();
let right = self.right();
let new_join = HashJoinExec::try_new(
Expand All @@ -597,7 +594,7 @@ impl HashJoinExec {
self.projection.as_ref(),
self.join_type(),
),
partition_mode,
*self.partition_mode(),
self.null_equals_null(),
)?;
// In case of anti / semi joins or if there is embedded projection in HashJoinExec, output column order is preserved, no need to add projection again
Expand Down

0 comments on commit 0dbeda0

Please sign in to comment.