diff --git a/velox/core/PlanNode.cpp b/velox/core/PlanNode.cpp index 7bc126068166..5d48bb3b9217 100644 --- a/velox/core/PlanNode.cpp +++ b/velox/core/PlanNode.cpp @@ -1093,10 +1093,50 @@ PlanNodePtr HashJoinNode::create(const folly::dynamic& obj, void* context) { outputType); } +MergeJoinNode::MergeJoinNode( + const PlanNodeId& id, + JoinType joinType, + const std::vector& leftKeys, + const std::vector& rightKeys, + TypedExprPtr filter, + PlanNodePtr left, + PlanNodePtr right, + RowTypePtr outputType) + : AbstractJoinNode( + id, + joinType, + leftKeys, + rightKeys, + std::move(filter), + std::move(left), + std::move(right), + std::move(outputType)) { + VELOX_USER_CHECK( + isSupported(joinType_), + "The join type is not supported by merge join: ", + joinTypeName(joinType_)); +} + folly::dynamic MergeJoinNode::serialize() const { return serializeBase(); } +// static +bool MergeJoinNode::isSupported(core::JoinType joinType) { + switch (joinType) { + case core::JoinType::kInner: + case core::JoinType::kLeft: + case core::JoinType::kRight: + case core::JoinType::kLeftSemiFilter: + case core::JoinType::kRightSemiFilter: + case core::JoinType::kAnti: + return true; + + default: + return false; + } +} + // static PlanNodePtr MergeJoinNode::create(const folly::dynamic& obj, void* context) { auto sources = deserializeSources(obj, context); @@ -1136,9 +1176,8 @@ NestedLoopJoinNode::NestedLoopJoinNode( sources_({std::move(left), std::move(right)}), outputType_(std::move(outputType)) { VELOX_USER_CHECK( - core::isInnerJoin(joinType_) || core::isLeftJoin(joinType_) || - core::isRightJoin(joinType_) || core::isFullJoin(joinType_), - "{} unsupported, NestedLoopJoin only supports inner and outer join", + isSupported(joinType_), + "The join type is not supported by nested loop join: ", joinTypeName(joinType_)); auto leftType = sources_[0]->outputType(); @@ -1170,6 +1209,20 @@ NestedLoopJoinNode::NestedLoopJoinNode( right, outputType) {} +// static +bool NestedLoopJoinNode::isSupported(core::JoinType joinType) { + switch (joinType) { + case core::JoinType::kInner: + case core::JoinType::kLeft: + case core::JoinType::kRight: + case core::JoinType::kFull: + return true; + + default: + return false; + } +} + void NestedLoopJoinNode::addDetails(std::stringstream& stream) const { stream << joinTypeName(joinType_); if (joinCondition_) { diff --git a/velox/core/PlanNode.h b/velox/core/PlanNode.h index b0aea5aa9330..ffe37eb8dbe9 100644 --- a/velox/core/PlanNode.h +++ b/velox/core/PlanNode.h @@ -27,9 +27,7 @@ namespace facebook::velox::core { typedef std::string PlanNodeId; -/** - * Generic representation of InsertTable - */ +/// Generic representation of InsertTable struct InsertTableHandle { public: InsertTableHandle( @@ -1643,16 +1641,7 @@ class MergeJoinNode : public AbstractJoinNode { TypedExprPtr filter, PlanNodePtr left, PlanNodePtr right, - RowTypePtr outputType) - : AbstractJoinNode( - id, - joinType, - leftKeys, - rightKeys, - std::move(filter), - std::move(left), - std::move(right), - std::move(outputType)) {} + RowTypePtr outputType); std::string_view name() const override { return "MergeJoin"; @@ -1660,6 +1649,9 @@ class MergeJoinNode : public AbstractJoinNode { folly::dynamic serialize() const override; + /// If merge join supports this join type. + static bool isSupported(core::JoinType joinType); + static PlanNodePtr create(const folly::dynamic& obj, void* context); }; @@ -1667,7 +1659,7 @@ class MergeJoinNode : public AbstractJoinNode { /// exec::NestedLoopJoinProbe and exec::NestedLoopJoinBuild. A separate pipeline /// is produced for the build side when generating exec::Operators. /// -/// Nested loop join supports both equal and non-equal joins. Expressions +/// Nested loop join (NLJ) supports both equal and non-equal joins. Expressions /// specified in joinCondition are evaluated on every combination of left/right /// tuple, to emit result. Results are emitted following the same input order of /// probe rows for inner and left joins, for each thread of execution. @@ -1712,6 +1704,9 @@ class NestedLoopJoinNode : public PlanNode { folly::dynamic serialize() const override; + /// If nested loop join supports this join type. + static bool isSupported(core::JoinType joinType); + static PlanNodePtr create(const folly::dynamic& obj, void* context); private: diff --git a/velox/exec/MergeJoin.cpp b/velox/exec/MergeJoin.cpp index ddd6acdb41ba..194be412df52 100644 --- a/velox/exec/MergeJoin.cpp +++ b/velox/exec/MergeJoin.cpp @@ -35,27 +35,11 @@ MergeJoin::MergeJoin( numKeys_{joinNode->leftKeys().size()}, joinNode_(joinNode) { VELOX_USER_CHECK( - isSupported(joinNode_->joinType()), + core::MergeJoinNode::isSupported(joinNode_->joinType()), "The join type is not supported by merge join: ", joinTypeName(joinNode_->joinType())); } -// static -bool MergeJoin::isSupported(core::JoinType joinType) { - switch (joinType) { - case core::JoinType::kInner: - case core::JoinType::kLeft: - case core::JoinType::kRight: - case core::JoinType::kLeftSemiFilter: - case core::JoinType::kRightSemiFilter: - case core::JoinType::kAnti: - return true; - - default: - return false; - } -} - void MergeJoin::initialize() { Operator::initialize(); VELOX_CHECK_NOT_NULL(joinNode_); diff --git a/velox/exec/MergeJoin.h b/velox/exec/MergeJoin.h index 08021c70f8a0..42222f83ae2e 100644 --- a/velox/exec/MergeJoin.h +++ b/velox/exec/MergeJoin.h @@ -68,9 +68,6 @@ class MergeJoin : public Operator { Operator::close(); } - /// If merge join supports this join type. - static bool isSupported(core::JoinType joinType); - private: // Sets up 'filter_' and related member variables. void initializeFilter( diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 13c9829a0b8d..cb98ce6adf70 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -19,7 +19,6 @@ #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" #include "velox/connectors/hive/PartitionIdGenerator.h" -#include "velox/exec/MergeJoin.h" #include "velox/exec/OperatorUtils.h" #include "velox/exec/fuzzer/FuzzerUtil.h" #include "velox/exec/fuzzer/ReferenceQueryRunner.h" @@ -148,7 +147,7 @@ class JoinFuzzer { const std::vector& outputColumns); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. - // If withFilter is true, uses the equiality filter between probeKeys and + // If withFilter is true, uses the equality filter between probeKeys and // buildKeys as the join filter. Uses empty join filter otherwise. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( core::JoinType joinType, @@ -860,7 +859,7 @@ void JoinFuzzer::makeAlternativePlans( .planNode()}); // Use OrderBy + MergeJoin - if (exec::MergeJoin::isSupported(joinNode->joinType())) { + if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlan( joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); plans.push_back(planWithSplits); @@ -869,8 +868,7 @@ void JoinFuzzer::makeAlternativePlans( } // Use NestedLoopJoin. - if (joinNode->isInnerJoin() || joinNode->isLeftJoin() || - joinNode->isFullJoin()) { + if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeNestedLoopJoinPlan( joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); plans.push_back(planWithSplits); @@ -1285,7 +1283,7 @@ void JoinFuzzer::addPlansWithTableScan( } // Add ungrouped MergeJoin with TableScan. - if (joinNode->isInnerJoin() || joinNode->isLeftJoin()) { + if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlanWithTableScan( joinType, probeType, @@ -1307,8 +1305,7 @@ void JoinFuzzer::addPlansWithTableScan( } // Add ungrouped NestedLoopJoin with TableScan. - if (joinNode->isInnerJoin() || joinNode->isLeftJoin() || - joinNode->isFullJoin()) { + if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( joinType, probeType,