From 5ca9daa18c00dcc0244b521c5304376466d61943 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Tue, 12 Mar 2024 22:51:09 -0700 Subject: [PATCH] Extend join fuzzer to cover group execution mode Extend join fuzzer to cover group execution mode. To do this, we split the probe and build inputs by partitioning on the join keys, with one partition per group. Then we write the split inputs into separate files and create table scan splits from the generated files. For each existing query plan with table scan input, we create a corresponding grouped execution plan. Extend AssertQueryBuilder to support group execution configuration. --- velox/core/PlanFragment.cpp | 10 + velox/core/PlanFragment.h | 2 + velox/core/tests/PlanFragmentTest.cpp | 11 + velox/exec/tests/JoinFuzzer.cpp | 594 +++++++++++++++----- velox/exec/tests/JoinFuzzerRunner.h | 20 +- velox/exec/tests/utils/AssertQueryBuilder.h | 28 +- velox/exec/tests/utils/Cursor.h | 6 +- 7 files changed, 511 insertions(+), 160 deletions(-) diff --git a/velox/core/PlanFragment.cpp b/velox/core/PlanFragment.cpp index 338860ab2c9d..4fa862cbcc7c 100644 --- a/velox/core/PlanFragment.cpp +++ b/velox/core/PlanFragment.cpp @@ -28,4 +28,14 @@ bool PlanFragment::canSpill(const QueryConfig& queryConfig) const { }) != nullptr; } +std::string executionStrategyToString(ExecutionStrategy strategy) { + switch (strategy) { + case ExecutionStrategy::kGrouped: + return "GROUPED"; + case ExecutionStrategy::kUngrouped: + return "UNGROUPED"; + default: + return fmt::format("UNKNOWN: {}", static_cast(strategy)); + } +} } // namespace facebook::velox::core diff --git a/velox/core/PlanFragment.h b/velox/core/PlanFragment.h index 2ccd933fc78b..fd0f033b7093 100644 --- a/velox/core/PlanFragment.h +++ b/velox/core/PlanFragment.h @@ -35,6 +35,8 @@ enum class ExecutionStrategy { kGrouped, }; +std::string executionStrategyToString(ExecutionStrategy strategy); + /// Contains some information on how to execute the fragment of a plan. 
/// Used to construct Task. struct PlanFragment { diff --git a/velox/core/tests/PlanFragmentTest.cpp b/velox/core/tests/PlanFragmentTest.cpp index 19f564f57090..347136abcf3e 100644 --- a/velox/core/tests/PlanFragmentTest.cpp +++ b/velox/core/tests/PlanFragmentTest.cpp @@ -322,3 +322,14 @@ TEST_F(PlanFragmentTest, hashJoin) { testData.expectedCanSpill); } } + +TEST_F(PlanFragmentTest, executionStrategyToString) { + ASSERT_EQ( + executionStrategyToString(core::ExecutionStrategy::kUngrouped), + "UNGROUPED"); + ASSERT_EQ( + executionStrategyToString(core::ExecutionStrategy::kGrouped), "GROUPED"); + ASSERT_EQ( + executionStrategyToString(static_cast(999)), + "UNKNOWN: 999"); +} diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 4d1b04255af6..bd44d550f96a 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -18,8 +18,10 @@ #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorUtils.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -38,7 +40,7 @@ DEFINE_int32( 100, "The number of elements on each generated vector."); -DEFINE_int32(num_batches, 5, "The number of generated vectors."); +DEFINE_int32(num_batches, 10, "The number of generated vectors."); DEFINE_double( null_ratio, @@ -60,10 +62,30 @@ class JoinFuzzer { struct PlanWithSplits { core::PlanNodePtr plan; - std::unordered_map< - core::PlanNodeId, - std::vector>> + core::PlanNodeId probeScanId; + core::PlanNodeId buildScanId; + std::unordered_map> splits; + core::ExecutionStrategy executionStrategy{ + core::ExecutionStrategy::kUngrouped}; + int32_t numGroups; + + explicit 
PlanWithSplits( + const core::PlanNodePtr& _plan, + const core::PlanNodeId& _probeScanId = "", + const core::PlanNodeId& _buildScanId = "", + const std::unordered_map< + core::PlanNodeId, + std::vector>& _splits = {}, + core::ExecutionStrategy _executionStrategy = + core::ExecutionStrategy::kUngrouped, + int32_t _numGroups = 0) + : plan(_plan), + probeScanId(_probeScanId), + buildScanId(_buildScanId), + splits(_splits), + executionStrategy(_executionStrategy), + numGroups(_numGroups) {} }; private: @@ -76,6 +98,12 @@ class JoinFuzzer { return opts; } + static inline const std::string kHiveConnectorId = "test-hive"; + + // Makes a connector split from a file path on storage. + static std::shared_ptr makeSplit( + const std::string& filePath); + void seed(size_t seed) { currentSeed_ = seed; vectorFuzzer_.reSeed(seed); @@ -86,23 +114,63 @@ class JoinFuzzer { seed(rng_()); } - /// Randomly pick a join type to test. + // Randomly pick a join type to test. core::JoinType pickJoinType(); + // Makes the query plan with default settings in JoinFuzzer and value inputs + // for both probe and build sides. + // + // NOTE: 'probeInput' and 'buildInput' could either input rows with lazy + // vectors or flatten ones. + JoinFuzzer::PlanWithSplits makeDefaultPlan( + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns); + + // Makes the default query plan with table scan as inputs for both probe and + // build sides. 
+ JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const RowTypePtr& probeType, + const RowTypePtr& buildType, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector>& + probeSplits, + const std::vector>& + buildSplits, + const std::vector& outputColumns); + + // Makes the query plan from 'planWithTableScan' with grouped execution mode. + // Correspondingly, it replaces the table scan input splits with grouped ones. + JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan, + int32_t numGroups, + const std::vector& groupedProbeScanSplits, + const std::vector& groupedBuildScanSplits); + + // Runs one test iteration from query plans generations, executions and result + // verifications. void verify(core::JoinType joinType); - /// Returns a list of randomly generated join key types. + // Returns a list of randomly generated join key types. std::vector generateJoinKeyTypes(int32_t numKeys); - /// Returns randomly generated probe input with upto 3 additional payload - /// columns. + // Returns randomly generated probe input with upto 3 additional payload + // columns. std::vector generateProbeInput( const std::vector& keyNames, const std::vector& keyTypes); - /// Same as generateProbeInput() but copies over 10% of the input in the probe - /// columns to ensure some matches during joining. Also generates an empty - /// input with a 10% chance. + // Same as generateProbeInput() but copies over 10% of the input in the probe + // columns to ensure some matches during joining. Also generates an empty + // input with a 10% chance. 
std::vector generateBuildInput( const std::vector& probeInput, const std::vector& probeKeys, @@ -112,6 +180,31 @@ class JoinFuzzer { std::vector& probeKeys, std::vector& buildKeys); + void addPlansWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns, + std::vector& altPlans); + + // Splits the input into groups by partitioning on the join keys. + std::vector> splitInputByGroup( + int32_t numGroups, + size_t numKeys, + const std::vector& inputs); + + // Generates the grouped splits. + std::vector generateSplitsWithGroup( + const std::string& tableDir, + int32_t numGroups, + bool isProbe, + size_t numKeys, + const std::vector& input); + RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); std::optional computeDuckDbResult( @@ -123,17 +216,22 @@ class JoinFuzzer { return boost::random::uniform_int_distribution(min, max)(rng_); } - static inline const std::string kHiveConnectorId = "test-hive"; - - static std::shared_ptr makeSplit( - const std::string& filePath); - FuzzerGenerator rng_; size_t currentSeed_{0}; std::shared_ptr rootPool_{ - memory::memoryManager()->addRootPool()}; - std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + memory::memoryManager()->addRootPool( + "joinFuzzer", + memory::kMaxMemory, + memory::MemoryReclaimer::create())}; + std::shared_ptr pool_{rootPool_->addLeafChild( + "joinFuzzerLeaf", + true, + exec::MemoryReclaimer::create())}; + std::shared_ptr writerPool_{rootPool_->addAggregateChild( + "joinFuzzerWriter", + exec::MemoryReclaimer::create())}; + VectorFuzzer vectorFuzzer_; }; @@ -172,10 +270,9 @@ core::JoinType JoinFuzzer::pickJoinType() { core::JoinType::kFull, core::JoinType::kLeftSemiFilter, core::JoinType::kLeftSemiProject, - core::JoinType::kAnti, - }; + core::JoinType::kAnti}; - size_t idx = randInt(0, 
kJoinTypes.size() - 1); + const size_t idx = randInt(0, kJoinTypes.size() - 1); return kJoinTypes[idx]; } @@ -196,13 +293,13 @@ std::vector JoinFuzzer::generateProbeInput( std::vector types = keyTypes; // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); + const auto numPayload = randInt(0, 3); for (auto i = 0; i < numPayload; ++i) { names.push_back(fmt::format("tp{}", i + keyNames.size())); types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); } - auto inputType = ROW(std::move(names), std::move(types)); + const auto inputType = ROW(std::move(names), std::move(types)); std::vector input; for (auto i = 0; i < FLAGS_num_batches; ++i) { input.push_back(vectorFuzzer_.fuzzInputRow(inputType)); @@ -221,13 +318,13 @@ std::vector JoinFuzzer::generateBuildInput( } // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); + const auto numPayload = randInt(0, 3); for (auto i = 0; i < numPayload; ++i) { names.push_back(fmt::format("bp{}", i + buildKeys.size())); types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); } - auto rowType = ROW(std::move(names), std::move(types)); + const auto rowType = ROW(std::move(names), std::move(types)); // 1 in 10 times use empty build. // TODO Use non-empty build with no matches sometimes. @@ -283,12 +380,26 @@ std::vector flatten(const std::vector& vectors) { } RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { - LOG(INFO) << "Executing query plan: " << std::endl - << plan.plan->toString(true, true); + LOG(ERROR) << "Executing query plan with " + << executionStrategyToString(plan.executionStrategy) + << " strategy[" + << (plan.executionStrategy == core::ExecutionStrategy::kGrouped + ? plan.numGroups + : 0) + << " groups]" << (injectSpill ? 
" and spilling injection" : "") + << ": " << std::endl + << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [nodeId, nodeSplits] : plan.splits) { - builder.splits(nodeId, nodeSplits); + for (const auto& [planNodeId, nodeSplits] : plan.splits) { + builder.splits(planNodeId, nodeSplits); + } + + if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { + builder.executionStrategy(core::ExecutionStrategy::kGrouped); + builder.groupedExecutionLeafNodeIds({plan.probeScanId, plan.buildScanId}); + builder.numSplitGroups(plan.numGroups); + builder.numConcurrentSplitGroups(randInt(1, plan.numGroups)); } std::shared_ptr spillDirectory; @@ -302,11 +413,15 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { } TestScopedSpillInjection scopedSpillInjection(spillPct); - auto result = builder.maxDrivers(2).copyResults(pool_.get()); + const auto result = builder.maxDrivers(2).copyResults(pool_.get()); LOG(INFO) << "Results: " << result->toString(); if (VLOG_IS_ON(1)) { VLOG(1) << std::endl << result->toString(0, result->size()); } + // Wait for the task to be destroyed before start next query execution to + // avoid the potential interference of the background activities across query + // executions. 
+ waitForAllTasksToBeDeleted(); return result; } @@ -404,10 +519,10 @@ std::optional JoinFuzzer::computeDuckDbResult( queryRunner.createTable("t", probeInput); queryRunner.createTable("u", buildInput); - auto joinNode = dynamic_cast(plan.get()); + auto* joinNode = dynamic_cast(plan.get()); VELOX_CHECK_NOT_NULL(joinNode); - auto joinKeysToSql = [](auto keys) { + const auto joinKeysToSql = [](auto keys) { std::stringstream out; for (auto i = 0; i < keys.size(); ++i) { if (i > 0) { @@ -418,7 +533,7 @@ std::optional JoinFuzzer::computeDuckDbResult( return out.str(); }; - auto equiClausesToSql = [](auto joinNode) { + const auto equiClausesToSql = [](auto joinNode) { std::stringstream out; for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { if (i > 0) { @@ -478,7 +593,8 @@ std::optional JoinFuzzer::computeDuckDbResult( } break; default: - VELOX_UNREACHABLE(); + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); } return queryRunner.execute(sql.str(), plan->outputType()); @@ -494,14 +610,14 @@ std::vector fieldNames( return names; } -JoinFuzzer::PlanWithSplits makeDefaultPlan( +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( core::JoinType joinType, bool nullAware, const std::vector& probeKeys, const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& output) { + const std::vector& outputColumns) { auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) @@ -510,45 +626,75 @@ JoinFuzzer::PlanWithSplits makeDefaultPlan( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - "" /*filter*/, - output, + /*filter=*/"", + outputColumns, joinType, nullAware) .planNode(); - return {plan, {}}; + return PlanWithSplits{plan}; } -JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( +std::vector fromConnectorSplits( + std::vector> connectorSplits, + int32_t groupId = -1) { + std::vector splits; + 
splits.reserve(connectorSplits.size()); + for (auto& connectorSplit : connectorSplits) { + splits.emplace_back(exec::Split{std::move(connectorSplit), groupId}); + } + return splits; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( + const std::string& tableDir, core::JoinType joinType, bool nullAware, + const RowTypePtr& probeType, + const RowTypePtr& buildType, const std::vector& probeKeys, const std::vector& buildKeys, - const RowTypePtr& probeInputType, - const std::vector>& - probeSplits, - const RowTypePtr& buildInputType, - const std::vector>& - buildSplits, - const std::vector& output) { + const std::vector>& probeSplits, + const std::vector>& buildSplits, + const std::vector& outputColumns) { auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeId; - core::PlanNodeId buildId; + core::PlanNodeId probeScanId; + core::PlanNodeId buildScanId; auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeInputType) - .capturePlanNodeId(probeId) + .tableScan(probeType) + .capturePlanNodeId(probeScanId) .hashJoin( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator) - .tableScan(buildInputType) - .capturePlanNodeId(buildId) + .tableScan(buildType) + .capturePlanNodeId(buildScanId) .planNode(), - "" /*filter*/, - output, + /*filter=*/"", + outputColumns, joinType, nullAware) .planNode(); - return {plan, {{probeId, probeSplits}, {buildId, buildSplits}}}; + return PlanWithSplits{ + plan, + probeScanId, + buildScanId, + {{probeScanId, fromConnectorSplits(probeSplits)}, + {buildScanId, fromConnectorSplits(buildSplits)}}}; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan, + int32_t numGroups, + const std::vector& groupedProbeScanSplits, + const std::vector& groupedBuildScanSplits) { + return PlanWithSplits{ + planWithTableScan.plan, + planWithTableScan.probeScanId, + planWithTableScan.buildScanId, + {{planWithTableScan.probeScanId, 
groupedProbeScanSplits}, + {planWithTableScan.buildScanId, groupedBuildScanSplits}}, + core::ExecutionStrategy::kGrouped, + numGroups}; } std::vector makeSources( @@ -579,51 +725,49 @@ void makeAlternativePlans( // Flip join sides. if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { - plans.push_back({flippedPlan, {}}); + plans.push_back(JoinFuzzer::PlanWithSplits{flippedPlan}); } // Parallelize probe and build sides. - auto probeKeys = fieldNames(joinNode->leftKeys()); - auto buildKeys = fieldNames(joinNode->rightKeys()); + const auto probeKeys = fieldNames(joinNode->leftKeys()); + const auto buildKeys = fieldNames(joinNode->rightKeys()); auto planNodeIdGenerator = std::make_shared(); - plans.push_back( - {PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(probeInput, planNodeIdGenerator)) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(buildInput, planNodeIdGenerator)) - .planNode(), - "" /*filter*/, - joinNode->outputType()->names(), - joinNode->joinType(), - joinNode->isNullAware()) - .planNode(), - {}}); + plans.push_back(JoinFuzzer::PlanWithSplits{ + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(probeInput, planNodeIdGenerator)) + .hashJoin( + probeKeys, + buildKeys, + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(buildInput, planNodeIdGenerator)) + .planNode(), + /*filter=*/"", + joinNode->outputType()->names(), + joinNode->joinType(), + joinNode->isNullAware()) + .planNode()}); // Use OrderBy + MergeJoin (if join type is inner or left). 
if (joinNode->isInnerJoin() || joinNode->isLeftJoin()) { planNodeIdGenerator->reset(); - plans.push_back( - {PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .values(buildInput) - .orderBy(buildKeys, false) - .planNode(), - "" /*filter*/, - asRowType(joinNode->outputType())->names(), - joinNode->joinType()) - .planNode(), - {}}); + plans.push_back({JoinFuzzer::PlanWithSplits{ + PlanBuilder(planNodeIdGenerator) + .values(probeInput) + .orderBy(probeKeys, false) + .mergeJoin( + probeKeys, + buildKeys, + PlanBuilder(planNodeIdGenerator) + .values(buildInput) + .orderBy(buildKeys, false) + .planNode(), + /*filter=*/"", + asRowType(joinNode->outputType())->names(), + joinNode->joinType()) + .planNode()}}); } } @@ -715,7 +859,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { const auto numKeys = nullAware ? 1 : randInt(1, 5); // Pick number and types of join keys. - std::vector keyTypes = generateJoinKeyTypes(numKeys); + const std::vector keyTypes = generateJoinKeyTypes(numKeys); std::vector probeKeys = makeNames("t", keyTypes.size()); std::vector buildKeys = makeNames("u", keyTypes.size()); @@ -738,7 +882,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } - auto output = + auto outputColumns = (core::isLeftSemiProjectJoin(joinType) || core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) ? asRowType(probeInput[0]->type())->names() @@ -747,36 +891,36 @@ void JoinFuzzer::verify(core::JoinType joinType) { ->names(); // Shuffle output columns. - std::shuffle(output.begin(), output.end(), rng_); + std::shuffle(outputColumns.begin(), outputColumns.end(), rng_); // Remove some output columns. 
- auto numOutput = randInt(1, output.size()); - output.resize(numOutput); + const auto numOutput = randInt(1, outputColumns.size()); + outputColumns.resize(numOutput); if (core::isLeftSemiProjectJoin(joinType) || core::isRightSemiProjectJoin(joinType)) { - output.push_back("match"); + outputColumns.push_back("match"); } shuffleJoinKeys(probeKeys, buildKeys); - auto plan = makeDefaultPlan( + const auto defaultPlan = makeDefaultPlan( joinType, nullAware, probeKeys, buildKeys, probeInput, buildInput, - output); + outputColumns); - auto expected = execute(plan, false /*injectSpill*/); + const auto expected = execute(defaultPlan, /*injectSpill=*/false); // Verify results against DuckDB. if (auto duckDbResult = - computeDuckDbResult(probeInput, buildInput, plan.plan)) { + computeDuckDbResult(probeInput, buildInput, defaultPlan.plan)) { VELOX_CHECK( assertEqualResults( - duckDbResult.value(), plan.plan->outputType(), {expected}), + duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), "Velox and DuckDB results don't match"); } @@ -788,58 +932,23 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, flatProbeInput, flatBuildInput, - output)); - makeAlternativePlans(plan.plan, probeInput, buildInput, altPlans); - makeAlternativePlans(plan.plan, flatProbeInput, flatBuildInput, altPlans); - - auto directory = exec::test::TempDirectoryPath::create(); - - if (isTableScanSupported(probeInput[0]->type()) && - isTableScanSupported(buildInput[0]->type())) { - auto writerPool = rootPool_->addAggregateChild("writer"); - - std::vector> probeSplits; - for (auto i = 0; i < probeInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/probe{}", directory->path, i); - writeToFile(filePath, probeInput[i], writerPool.get()); - probeSplits.push_back(makeSplit(filePath)); - } - - std::vector> buildSplits; - for (auto i = 0; i < buildInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/build{}", directory->path, i); - writeToFile(filePath, 
buildInput[i], writerPool.get()); - buildSplits.push_back(makeSplit(filePath)); - } + outputColumns)); - auto tableScanPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeKeys, - buildKeys, - asRowType(probeInput[0]->type()), - probeSplits, - asRowType(buildInput[0]->type()), - buildSplits, - output); - - altPlans.push_back(tableScanPlan); - - auto joinNode = - std::dynamic_pointer_cast(tableScanPlan.plan); - VELOX_CHECK_NOT_NULL(joinNode); - - // Flip join sides. - if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { - altPlans.push_back({flippedPlan, tableScanPlan.splits}); - } - } + const auto tableScanDir = exec::test::TempDirectoryPath::create(); + addPlansWithTableScan( + tableScanDir->path, + joinType, + nullAware, + probeKeys, + buildKeys, + flatProbeInput, + flatBuildInput, + outputColumns, + altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; - auto actual = execute(altPlans[i], false /*injectSpill*/); + auto actual = execute(altPlans[i], /*injectSpill=*/false); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -854,7 +963,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } LOG(INFO) << "Testing plan #" << i << " with spilling"; - actual = execute(altPlans[i], true /*injectSpill*/); + actual = execute(altPlans[i], /*injectSpill=*/true); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -862,12 +971,189 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } +void JoinFuzzer::addPlansWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns, + std::vector& altPlans) { + VELOX_CHECK(!tableDir.empty()); + + if (!isTableScanSupported(probeInput[0]->type()) || + 
!isTableScanSupported(buildInput[0]->type())) { + return; + } + + std::vector> probeScanSplits; + for (auto i = 0; i < probeInput.size(); ++i) { + const std::string filePath = fmt::format("{}/probe{}", tableDir, i); + writeToFile(filePath, probeInput[i], writerPool_.get()); + probeScanSplits.push_back(makeSplit(filePath)); + } + + std::vector> buildScanSplits; + for (auto i = 0; i < buildInput.size(); ++i) { + const std::string filePath = fmt::format("{}/build{}", tableDir, i); + writeToFile(filePath, buildInput[i], writerPool_.get()); + buildScanSplits.push_back(makeSplit(filePath)); + } + + std::vector plansWithTableScan; + auto defaultPlan = makeDefaultPlanWithTableScan( + tableDir, + joinType, + nullAware, + asRowType(probeInput[0]->type()), + asRowType(buildInput[0]->type()), + probeKeys, + buildKeys, + probeScanSplits, + buildScanSplits, + outputColumns); + plansWithTableScan.push_back(defaultPlan); + + auto joinNode = + std::dynamic_pointer_cast(defaultPlan.plan); + VELOX_CHECK_NOT_NULL(joinNode); + + // Flip join sides. 
+ if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { + plansWithTableScan.push_back(PlanWithSplits{ + flippedPlan, + defaultPlan.probeScanId, + defaultPlan.buildScanId, + defaultPlan.splits, + core::ExecutionStrategy::kUngrouped, + 0}); + } + + const int32_t numGroups = randInt(1, probeScanSplits.size()); + const std::vector groupedProbeScanSplits = + generateSplitsWithGroup( + tableDir, numGroups, /*isProbe=*/true, probeKeys.size(), probeInput); + const std::vector groupedBuildScanSplits = + generateSplitsWithGroup( + tableDir, numGroups, /*isProbe=*/false, buildKeys.size(), buildInput); + + for (const auto& planWithTableScan : plansWithTableScan) { + altPlans.push_back(planWithTableScan); + altPlans.push_back(makeGroupedExecutionPlanWithTableScan( + planWithTableScan, + numGroups, + groupedProbeScanSplits, + groupedBuildScanSplits)); + } +} + +std::vector JoinFuzzer::generateSplitsWithGroup( + const std::string& tableDir, + int32_t numGroups, + bool isProbe, + size_t numKeys, + const std::vector& input) { + const std::vector> inputVectorsByGroup = + splitInputByGroup(numGroups, numKeys, input); + + std::vector splitsWithGroup; + for (int32_t groupId = 0; groupId < numGroups; ++groupId) { + for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { + const std::string filePath = fmt::format( + "{}/grouped[{}].{}.{}", + tableDir, + groupId, + isProbe ? "probe" : "build", + i); + writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); + splitsWithGroup.push_back(exec::Split{makeSplit(filePath), groupId}); + } + splitsWithGroup.push_back(exec::Split{nullptr, groupId}); + } + return splitsWithGroup; +} + +std::vector> JoinFuzzer::splitInputByGroup( + int32_t numGroups, + size_t numKeys, + const std::vector& inputs) { + if (numGroups == 1) { + return {inputs}; + } + + // Partition 'input' based on the join keys for group execution with one + // partition per each group. 
+ const RowTypePtr& inputType = asRowType(inputs[0]->type()); + std::vector partitionChannels(numKeys); + std::iota(partitionChannels.begin(), partitionChannels.end(), 0); + std::vector> hashers; + hashers.reserve(numKeys); + for (auto channel : partitionChannels) { + hashers.emplace_back( + exec::VectorHasher::create(inputType->childAt(channel), channel)); + } + + std::vector> inputsByGroup{ + static_cast(numGroups)}; + raw_vector groupHashes; + std::vector groupRows(numGroups); + std::vector rawGroupRows(numGroups); + std::vector groupSizes(numGroups, 0); + SelectivityVector inputRows; + + for (const auto& input : inputs) { + const int numRows = input->size(); + inputRows.resize(numRows); + inputRows.setAll(); + groupHashes.resize(numRows); + std::fill(groupSizes.begin(), groupSizes.end(), 0); + std::fill(groupHashes.begin(), groupHashes.end(), 0); + + for (auto i = 0; i < hashers.size(); ++i) { + auto& hasher = hashers[i]; + auto* keyVector = input->childAt(hashers[i]->channel())->loadedVector(); + hashers[i]->decode(*keyVector, inputRows); + if (hasher->channel() != kConstantChannel) { + hashers[i]->hash(inputRows, i > 0, groupHashes); + } else { + hashers[i]->hashPrecomputed(inputRows, i > 0, groupHashes); + } + } + + for (int row = 0; row < numRows; ++row) { + const int32_t groupId = groupHashes[row] % numGroups; + if (groupRows[groupId] == nullptr || + (groupRows[groupId]->capacity() < numRows * sizeof(vector_size_t))) { + groupRows[groupId] = allocateIndices(numRows, pool_.get()); + rawGroupRows[groupId] = groupRows[groupId]->asMutable(); + } + rawGroupRows[groupId][groupSizes[groupId]++] = row; + } + + for (int32_t groupId = 0; groupId < numGroups; ++groupId) { + const size_t groupSize = groupSizes[groupId]; + if (groupSize != 0) { + VELOX_CHECK_NOT_NULL(groupRows[groupId]); + groupRows[groupId]->setSize( + groupSizes[groupId] * sizeof(vector_size_t)); + inputsByGroup[groupId].push_back( + (groupSize == numRows) + ? 
input + : exec::wrap(groupSize, std::move(groupRows[groupId]), input)); + } + } + } + return inputsByGroup; +} + void JoinFuzzer::go() { VELOX_CHECK( FLAGS_steps > 0 || FLAGS_duration_sec > 0, "Either --steps or --duration_sec needs to be greater than zero.") - auto startTime = std::chrono::system_clock::now(); + const auto startTime = std::chrono::system_clock::now(); size_t iteration = 0; while (!isDone(iteration, startTime)) { @@ -875,7 +1161,7 @@ void JoinFuzzer::go() { << iteration << " (seed: " << currentSeed_ << ")"; // Pick join type. - auto joinType = pickJoinType(); + const auto joinType = pickJoinType(); verify(joinType); diff --git a/velox/exec/tests/JoinFuzzerRunner.h b/velox/exec/tests/JoinFuzzerRunner.h index 15d388915a8f..524240fe58ee 100644 --- a/velox/exec/tests/JoinFuzzerRunner.h +++ b/velox/exec/tests/JoinFuzzerRunner.h @@ -18,6 +18,8 @@ #include #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/MemoryReclaimer.h" #include "velox/exec/tests/JoinFuzzer.h" #include "velox/serializers/PrestoSerializer.h" @@ -56,7 +58,7 @@ class JoinFuzzerRunner { public: static int run(size_t seed) { - facebook::velox::memory::MemoryManager::initialize({}); + setupMemory(); facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); facebook::velox::filesystems::registerLocalFileSystem(); @@ -64,4 +66,20 @@ class JoinFuzzerRunner { facebook::velox::exec::test::joinFuzzer(seed); return RUN_ALL_TESTS(); } + + private: + // Invoked to set up memory system with arbitration. 
+ static void setupMemory() { + FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; + FLAGS_velox_memory_leak_check_enabled = true; + facebook::velox::memory::SharedArbitrator::registerFactory(); + facebook::velox::memory::MemoryManagerOptions options; + options.allocatorCapacity = 8L << 30; + options.arbitratorCapacity = 6L << 30; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = + facebook::velox::exec::memoryArbitrationStateCheck; + facebook::velox::memory::MemoryManager::initialize(options); + } }; diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 863dc6867bb9..be7bbd387168 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -118,7 +118,31 @@ class AssertQueryBuilder { return *this; } - // Methods to run the query and verify the results. + /// Methods to configure the group execution mode. + AssertQueryBuilder& executionStrategy( + core::ExecutionStrategy executionStrategy) { + params_.executionStrategy = executionStrategy; + return *this; + } + + AssertQueryBuilder& numSplitGroups(int numSplitGroups) { + params_.numSplitGroups = numSplitGroups; + return *this; + } + + AssertQueryBuilder& numConcurrentSplitGroups( + int32_t numConcurrentSplitGroups) { + params_.numConcurrentSplitGroups = numConcurrentSplitGroups; + return *this; + } + + AssertQueryBuilder& groupedExecutionLeafNodeIds( + const std::unordered_set& groupedExecutionLeafNodeIds) { + params_.groupedExecutionLeafNodeIds = groupedExecutionLeafNodeIds; + return *this; + } + + /// Methods to run the query and verify the results. /// Run the query and verify results against DuckDB. Requires /// duckDbQueryRunner to be provided in the constructor. @@ -158,7 +182,7 @@ class AssertQueryBuilder { // Used by the created task as the default driver executor. 
std::unique_ptr executor_{ new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())}; - DuckDbQueryRunner* FOLLY_NULLABLE const duckDbQueryRunner_; + DuckDbQueryRunner* const duckDbQueryRunner_; CursorParameters params_; std::unordered_map configs_; std::unordered_map> diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index fe1c3e17c04a..6f4a0de7d2a9 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -46,15 +46,15 @@ struct CursorParameters { uint64_t bufferedBytes = 512 * 1024; - // Ungrouped (by default) or grouped (bucketed) execution. + /// Ungrouped (by default) or grouped (bucketed) execution. core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; /// Contains leaf plan nodes that need to be executed in the grouped mode. std::unordered_set groupedExecutionLeafNodeIds; - // Number of splits groups the task will be processing. Must be 1 for - // ungrouped execution. + /// Number of split groups the task will be processing. Must be 1 for + /// ungrouped execution. int numSplitGroups{1}; /// Spilling directory, if not empty, then the task's spilling directory would