From 0e17f7dc0d97c9c7a05a901d5ad0f5ed02c090a9 Mon Sep 17 00:00:00 2001 From: Daniel Hunte Date: Fri, 13 Dec 2024 17:15:55 -0800 Subject: [PATCH] feat(fuzzer): Support Join Filters in Join Fuzzer (#11473) Summary: This changes adds the support for join filter 10% of the time. Currently it supports boolean and integer columns. Reviewed By: kagamiori Differential Revision: D65629460 --- velox/exec/fuzzer/JoinFuzzer.cpp | 145 ++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 41 deletions(-) diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 5218a9ff4db1e..333a79c24b746 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -61,6 +61,11 @@ DEFINE_bool( "up after failures. Therefore, results are not compared when this is " "enabled. Note that this option only works in debug builds."); +DEFINE_bool( + enable_filter, + false, + "Whether to test plans with filters enabled."); + namespace facebook::velox::exec::test { namespace { @@ -142,7 +147,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); JoinFuzzer::PlanWithSplits makeMergeJoinPlan( core::JoinType joinType, @@ -150,7 +156,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. // If withFilter is true, uses the equality filter between probeKeys and @@ -162,7 +169,7 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - bool withFilter = true); + const std::string& filter); // Makes the default query plan with table scan as inputs for both probe and // build sides. @@ -175,7 +182,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( core::JoinType joinType, @@ -185,7 +193,8 @@ class JoinFuzzer { const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns); + const std::vector& outputColumns, + const std::string& filter); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan // nodes. If withFilter is true, uses the equiality filter between probeKeys @@ -199,13 +208,14 @@ class JoinFuzzer { const std::vector& probeSplits, const std::vector& buildSplits, const std::vector& outputColumns, - bool withFilter = true); + const std::string& filter); void makeAlternativePlans( const core::PlanNodePtr& plan, const std::vector& probeInput, const std::vector& buildInput, - std::vector& plans); + std::vector& plans, + const std::string& filter); // Makes the query plan from 'planWithTableScan' with grouped execution mode. // Correspondingly, it replaces the table scan input splits with grouped ones. @@ -249,7 +259,8 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - std::vector& altPlans); + std::vector& altPlans, + const std::string& filter); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -597,6 +608,12 @@ std::optional tryFlipJoinType(core::JoinType joinType) { // Returns a plan with flipped join sides of the input hash join node. If the // join type doesn't allow flipping, returns a nullptr. core::PlanNodePtr tryFlipJoinSides(const core::HashJoinNode& joinNode) { + // Null-aware right semi project join doesn't support filter. + if (joinNode.filter() && + joinNode.joinType() == core::JoinType::kLeftSemiProject && + joinNode.isNullAware()) { + return nullptr; + } auto flippedJoinType = tryFlipJoinType(joinNode.joinType()); if (!flippedJoinType.has_value()) { return nullptr; @@ -688,7 +705,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) @@ -697,7 +715,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - /*filter=*/"", + filter, outputColumns, joinType, nullAware) @@ -714,7 +732,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; @@ -728,7 +747,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( .tableScan(buildType) .capturePlanNodeId(buildScanId) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType, nullAware) @@ -819,7 +838,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -831,7 +851,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( .values(buildInput) .orderBy(buildKeys, false) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType) .planNode()}; @@ -844,10 +864,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - bool withFilter) { + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); - const std::string filter = - withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .values(probeInput) @@ -863,7 +881,8 @@ void JoinFuzzer::makeAlternativePlans( const core::PlanNodePtr& plan, const std::vector& probeInput, const std::vector& buildInput, - std::vector& plans) { + std::vector& plans, + const std::string& filter) { auto joinNode = std::dynamic_pointer_cast(plan); VELOX_CHECK_NOT_NULL(joinNode); @@ -888,7 +907,7 @@ void JoinFuzzer::makeAlternativePlans( .localPartitionRoundRobin( makeSources(buildInput, planNodeIdGenerator)) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType, joinNode->isNullAware()) @@ -897,7 +916,13 @@ void JoinFuzzer::makeAlternativePlans( // Use OrderBy + MergeJoin if (core::MergeJoinNode::isSupported(joinNode->joinType())) { auto planWithSplits = makeMergeJoinPlan( - joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + filter); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits.plan, plans); @@ -905,8 +930,18 @@ void JoinFuzzer::makeAlternativePlans( // Use NestedLoopJoin. if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { + std::string joinCondition = filter.empty() + ? makeJoinFilter(probeKeys, buildKeys) + : fmt::format( + "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlan( - joinType, probeKeys, buildKeys, probeInput, buildInput, outputColumns); + joinType, + probeKeys, + buildKeys, + probeInput, + buildInput, + outputColumns, + joinCondition); plans.push_back(planWithSplits); addFlippedJoinPlan(planWithSplits.plan, plans); @@ -957,7 +992,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct( probeInput, buildInput, outputColumns, - /*withFilter*/ false); + /*filter=*/""); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference query @@ -992,7 +1027,7 @@ RowVectorPtr JoinFuzzer::testCrossProduct( probeScanSplits, buildScanSplits, outputColumns, - /*withFilter*/ false)); + /*filter=*/"")); } addFlippedJoinPlan(plan.plan, altPlans); @@ -1011,10 +1046,27 @@ void JoinFuzzer::verify(core::JoinType joinType) { const bool nullAware = isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5); - const auto numKeys = nullAware ? 1 : randInt(1, 5); + // Add boolean/integer join filter 10% of the time. + const bool withFilter = vectorFuzzer_.coinToss(0.1) && FLAGS_enable_filter; + // Null-aware joins allow only one join key. + const int numKeys = nullAware ? (withFilter ? 0 : 1) : randInt(1, 5); + std::vector keyTypes = generateJoinKeyTypes(numKeys); + std::string filter; + + if (withFilter) { + if (vectorFuzzer_.coinToss(0.5)) { + keyTypes.push_back(BOOLEAN()); + filter = vectorFuzzer_.coinToss(0.5) + ? fmt::format("t{} = true", keyTypes.size() - 1) + : fmt::format("u{} = true", keyTypes.size() - 1); + } else { + keyTypes.push_back(INTEGER()); + filter = vectorFuzzer_.coinToss(0.5) + ? fmt::format("t{} % {} = 0", keyTypes.size() - 1, randInt(1, 9)) + : fmt::format("u{} % {} = 0", keyTypes.size() - 1, randInt(1, 9)); + } + } - // Pick number and types of join keys. - const std::vector keyTypes = generateJoinKeyTypes(numKeys); std::vector probeKeys = makeNames("t", keyTypes.size()); std::vector buildKeys = makeNames("u", keyTypes.size()); @@ -1094,7 +1146,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, probeInput, buildInput, - outputColumns); + outputColumns, + filter); const auto expected = execute(defaultPlan, /*injectSpill=*/false); @@ -1110,7 +1163,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { {expected}), "Velox and Reference results don't match"); - LOG(INFO) << "Result matches with referenc DB."; + LOG(INFO) << "Result matches with reference DB."; stats_.numVerified++; } } @@ -1123,11 +1176,13 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, flatProbeInput, flatBuildInput, - outputColumns)); + outputColumns, + filter)); - makeAlternativePlans(defaultPlan.plan, probeInput, buildInput, altPlans); makeAlternativePlans( - defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans); + defaultPlan.plan, probeInput, buildInput, altPlans, filter); + makeAlternativePlans( + defaultPlan.plan, flatProbeInput, flatBuildInput, altPlans, filter); addPlansWithTableScan( tableScanDir->getPath(), @@ -1138,7 +1193,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { flatProbeInput, flatBuildInput, outputColumns, - altPlans); + altPlans, + filter); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1190,7 +1246,8 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( const std::vector& buildKeys, const std::vector& probeSplits, const std::vector& buildSplits, - const std::vector& outputColumns) { + const std::vector& outputColumns, + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; @@ -1208,7 +1265,7 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( .capturePlanNodeId(buildScanId) .orderBy(buildKeys, false) .planNode(), - /*filter=*/"", + filter, outputColumns, joinType) .planNode(), @@ -1226,13 +1283,11 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( const std::vector& probeSplits, const std::vector& buildSplits, const std::vector& outputColumns, - bool withFilter) { + const std::string& filter) { auto planNodeIdGenerator = std::make_shared(); core::PlanNodeId probeScanId; core::PlanNodeId buildScanId; - const std::string filter = - withFilter ? makeJoinFilter(probeKeys, buildKeys) : ""; return JoinFuzzer::PlanWithSplits{ PlanBuilder(planNodeIdGenerator) .tableScan(probeType) @@ -1260,7 +1315,8 @@ void JoinFuzzer::addPlansWithTableScan( const std::vector& probeInput, const std::vector& buildInput, const std::vector& outputColumns, - std::vector& altPlans) { + std::vector& altPlans, + const std::string& filter) { VELOX_CHECK(!tableDir.empty()); if (!isTableScanSupported(probeInput[0]->type()) || @@ -1286,7 +1342,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); plansWithTableScan.push_back(defaultPlan); auto joinNode = @@ -1336,7 +1393,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + filter); altPlans.push_back(planWithSplits); addFlippedJoinPlan( @@ -1350,6 +1408,10 @@ void JoinFuzzer::addPlansWithTableScan( // Add ungrouped NestedLoopJoin with TableScan. if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { + std::string joinCondition = filter.empty() + ? makeJoinFilter(probeKeys, buildKeys) + : fmt::format( + "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( joinType, probeType, @@ -1358,7 +1420,8 @@ void JoinFuzzer::addPlansWithTableScan( buildKeys, probeScanSplits, buildScanSplits, - outputColumns); + outputColumns, + joinCondition); altPlans.push_back(planWithSplits); addFlippedJoinPlan(