diff --git a/velox/core/PlanFragment.cpp b/velox/core/PlanFragment.cpp index 338860ab2c9d..4fa862cbcc7c 100644 --- a/velox/core/PlanFragment.cpp +++ b/velox/core/PlanFragment.cpp @@ -28,4 +28,14 @@ bool PlanFragment::canSpill(const QueryConfig& queryConfig) const { }) != nullptr; } +std::string executionStrategyToString(ExecutionStrategy strategy) { + switch (strategy) { + case ExecutionStrategy::kGrouped: + return "GROUPED"; + case ExecutionStrategy::kUngrouped: + return "UNGROUPED"; + default: + return fmt::format("UNKNOWN: {}", static_cast(strategy)); + } +} } // namespace facebook::velox::core diff --git a/velox/core/PlanFragment.h b/velox/core/PlanFragment.h index 2ccd933fc78b..fd0f033b7093 100644 --- a/velox/core/PlanFragment.h +++ b/velox/core/PlanFragment.h @@ -35,6 +35,8 @@ enum class ExecutionStrategy { kGrouped, }; +std::string executionStrategyToString(ExecutionStrategy strategy); + /// Contains some information on how to execute the fragment of a plan. /// Used to construct Task. 
struct PlanFragment { diff --git a/velox/core/tests/PlanFragmentTest.cpp b/velox/core/tests/PlanFragmentTest.cpp index 19f564f57090..347136abcf3e 100644 --- a/velox/core/tests/PlanFragmentTest.cpp +++ b/velox/core/tests/PlanFragmentTest.cpp @@ -322,3 +322,14 @@ TEST_F(PlanFragmentTest, hashJoin) { testData.expectedCanSpill); } } + +TEST_F(PlanFragmentTest, executionStrategyToString) { + ASSERT_EQ( + executionStrategyToString(core::ExecutionStrategy::kUngrouped), + "UNGROUPED"); + ASSERT_EQ( + executionStrategyToString(core::ExecutionStrategy::kGrouped), "GROUPED"); + ASSERT_EQ( + executionStrategyToString(static_cast(999)), + "UNKNOWN: 999"); +} diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 4d1b04255af6..bd44d550f96a 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -18,8 +18,10 @@ #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorUtils.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -38,7 +40,7 @@ DEFINE_int32( 100, "The number of elements on each generated vector."); -DEFINE_int32(num_batches, 5, "The number of generated vectors."); +DEFINE_int32(num_batches, 10, "The number of generated vectors."); DEFINE_double( null_ratio, @@ -60,10 +62,30 @@ class JoinFuzzer { struct PlanWithSplits { core::PlanNodePtr plan; - std::unordered_map< - core::PlanNodeId, - std::vector>> + core::PlanNodeId probeScanId; + core::PlanNodeId buildScanId; + std::unordered_map> splits; + core::ExecutionStrategy executionStrategy{ + core::ExecutionStrategy::kUngrouped}; + int32_t numGroups; + + explicit PlanWithSplits( + const 
core::PlanNodePtr& _plan, + const core::PlanNodeId& _probeScanId = "", + const core::PlanNodeId& _buildScanId = "", + const std::unordered_map< + core::PlanNodeId, + std::vector>& _splits = {}, + core::ExecutionStrategy _executionStrategy = + core::ExecutionStrategy::kUngrouped, + int32_t _numGroups = 0) + : plan(_plan), + probeScanId(_probeScanId), + buildScanId(_buildScanId), + splits(_splits), + executionStrategy(_executionStrategy), + numGroups(_numGroups) {} }; private: @@ -76,6 +98,12 @@ class JoinFuzzer { return opts; } + static inline const std::string kHiveConnectorId = "test-hive"; + + // Makes a connector split from a file path on storage. + static std::shared_ptr makeSplit( + const std::string& filePath); + void seed(size_t seed) { currentSeed_ = seed; vectorFuzzer_.reSeed(seed); @@ -86,23 +114,63 @@ class JoinFuzzer { seed(rng_()); } - /// Randomly pick a join type to test. + // Randomly pick a join type to test. core::JoinType pickJoinType(); + // Makes the query plan with default settings in JoinFuzzer and value inputs + // for both probe and build sides. + // + // NOTE: 'probeInput' and 'buildInput' could be either input rows with lazy + // vectors or flattened ones. + JoinFuzzer::PlanWithSplits makeDefaultPlan( + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns); + + // Makes the default query plan with table scan as inputs for both probe and + // build sides. 
+ JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const RowTypePtr& probeType, + const RowTypePtr& buildType, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector>& + probeSplits, + const std::vector>& + buildSplits, + const std::vector& outputColumns); + + // Makes the query plan from 'planWithTableScan' with grouped execution mode. + // Correspondingly, it replaces the table scan input splits with grouped ones. + JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan, + int32_t numGroups, + const std::vector& groupedProbeScanSplits, + const std::vector& groupedBuildScanSplits); + + // Runs one test iteration: query plan generation, execution and result + // verification. void verify(core::JoinType joinType); - /// Returns a list of randomly generated join key types. + // Returns a list of randomly generated join key types. std::vector generateJoinKeyTypes(int32_t numKeys); - /// Returns randomly generated probe input with upto 3 additional payload - /// columns. + // Returns randomly generated probe input with up to 3 additional payload + // columns. std::vector generateProbeInput( const std::vector& keyNames, const std::vector& keyTypes); - /// Same as generateProbeInput() but copies over 10% of the input in the probe - /// columns to ensure some matches during joining. Also generates an empty - /// input with a 10% chance. + // Same as generateProbeInput() but copies over 10% of the input in the probe + // columns to ensure some matches during joining. Also generates an empty + // input with a 10% chance. 
std::vector generateBuildInput( const std::vector& probeInput, const std::vector& probeKeys, @@ -112,6 +180,31 @@ class JoinFuzzer { std::vector& probeKeys, std::vector& buildKeys); + void addPlansWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns, + std::vector& altPlans); + + // Splits the input into groups by partitioning on the join keys. + std::vector> splitInputByGroup( + int32_t numGroups, + size_t numKeys, + const std::vector& inputs); + + // Generates the grouped splits. + std::vector generateSplitsWithGroup( + const std::string& tableDir, + int32_t numGroups, + bool isProbe, + size_t numKeys, + const std::vector& input); + RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); std::optional computeDuckDbResult( @@ -123,17 +216,22 @@ class JoinFuzzer { return boost::random::uniform_int_distribution(min, max)(rng_); } - static inline const std::string kHiveConnectorId = "test-hive"; - - static std::shared_ptr makeSplit( - const std::string& filePath); - FuzzerGenerator rng_; size_t currentSeed_{0}; std::shared_ptr rootPool_{ - memory::memoryManager()->addRootPool()}; - std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + memory::memoryManager()->addRootPool( + "joinFuzzer", + memory::kMaxMemory, + memory::MemoryReclaimer::create())}; + std::shared_ptr pool_{rootPool_->addLeafChild( + "joinFuzzerLeaf", + true, + exec::MemoryReclaimer::create())}; + std::shared_ptr writerPool_{rootPool_->addAggregateChild( + "joinFuzzerWriter", + exec::MemoryReclaimer::create())}; + VectorFuzzer vectorFuzzer_; }; @@ -172,10 +270,9 @@ core::JoinType JoinFuzzer::pickJoinType() { core::JoinType::kFull, core::JoinType::kLeftSemiFilter, core::JoinType::kLeftSemiProject, - core::JoinType::kAnti, - }; + core::JoinType::kAnti}; - size_t idx = randInt(0, 
kJoinTypes.size() - 1); + const size_t idx = randInt(0, kJoinTypes.size() - 1); return kJoinTypes[idx]; } @@ -196,13 +293,13 @@ std::vector JoinFuzzer::generateProbeInput( std::vector types = keyTypes; // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); + const auto numPayload = randInt(0, 3); for (auto i = 0; i < numPayload; ++i) { names.push_back(fmt::format("tp{}", i + keyNames.size())); types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); } - auto inputType = ROW(std::move(names), std::move(types)); + const auto inputType = ROW(std::move(names), std::move(types)); std::vector input; for (auto i = 0; i < FLAGS_num_batches; ++i) { input.push_back(vectorFuzzer_.fuzzInputRow(inputType)); @@ -221,13 +318,13 @@ std::vector JoinFuzzer::generateBuildInput( } // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); + const auto numPayload = randInt(0, 3); for (auto i = 0; i < numPayload; ++i) { names.push_back(fmt::format("bp{}", i + buildKeys.size())); types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); } - auto rowType = ROW(std::move(names), std::move(types)); + const auto rowType = ROW(std::move(names), std::move(types)); // 1 in 10 times use empty build. // TODO Use non-empty build with no matches sometimes. @@ -283,12 +380,26 @@ std::vector flatten(const std::vector& vectors) { } RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { - LOG(INFO) << "Executing query plan: " << std::endl - << plan.plan->toString(true, true); + LOG(ERROR) << "Executing query plan with " + << executionStrategyToString(plan.executionStrategy) + << " strategy[" + << (plan.executionStrategy == core::ExecutionStrategy::kGrouped + ? plan.numGroups + : 0) + << " groups]" << (injectSpill ? 
" and spilling injection" : "") + << ": " << std::endl + << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [nodeId, nodeSplits] : plan.splits) { - builder.splits(nodeId, nodeSplits); + for (const auto& [planNodeId, nodeSplits] : plan.splits) { + builder.splits(planNodeId, nodeSplits); + } + + if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { + builder.executionStrategy(core::ExecutionStrategy::kGrouped); + builder.groupedExecutionLeafNodeIds({plan.probeScanId, plan.buildScanId}); + builder.numSplitGroups(plan.numGroups); + builder.numConcurrentSplitGroups(randInt(1, plan.numGroups)); } std::shared_ptr spillDirectory; @@ -302,11 +413,15 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { } TestScopedSpillInjection scopedSpillInjection(spillPct); - auto result = builder.maxDrivers(2).copyResults(pool_.get()); + const auto result = builder.maxDrivers(2).copyResults(pool_.get()); LOG(INFO) << "Results: " << result->toString(); if (VLOG_IS_ON(1)) { VLOG(1) << std::endl << result->toString(0, result->size()); } + // Wait for the task to be destroyed before start next query execution to + // avoid the potential interference of the background activities across query + // executions. 
+ waitForAllTasksToBeDeleted(); return result; } @@ -404,10 +519,10 @@ std::optional JoinFuzzer::computeDuckDbResult( queryRunner.createTable("t", probeInput); queryRunner.createTable("u", buildInput); - auto joinNode = dynamic_cast(plan.get()); + auto* joinNode = dynamic_cast(plan.get()); VELOX_CHECK_NOT_NULL(joinNode); - auto joinKeysToSql = [](auto keys) { + const auto joinKeysToSql = [](auto keys) { std::stringstream out; for (auto i = 0; i < keys.size(); ++i) { if (i > 0) { @@ -418,7 +533,7 @@ std::optional JoinFuzzer::computeDuckDbResult( return out.str(); }; - auto equiClausesToSql = [](auto joinNode) { + const auto equiClausesToSql = [](auto joinNode) { std::stringstream out; for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { if (i > 0) { @@ -478,7 +593,8 @@ std::optional JoinFuzzer::computeDuckDbResult( } break; default: - VELOX_UNREACHABLE(); + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); } return queryRunner.execute(sql.str(), plan->outputType()); @@ -494,14 +610,14 @@ std::vector fieldNames( return names; } -JoinFuzzer::PlanWithSplits makeDefaultPlan( +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( core::JoinType joinType, bool nullAware, const std::vector& probeKeys, const std::vector& buildKeys, const std::vector& probeInput, const std::vector& buildInput, - const std::vector& output) { + const std::vector& outputColumns) { auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) @@ -510,45 +626,75 @@ JoinFuzzer::PlanWithSplits makeDefaultPlan( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - "" /*filter*/, - output, + /*filter=*/"", + outputColumns, joinType, nullAware) .planNode(); - return {plan, {}}; + return PlanWithSplits{plan}; } -JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( +std::vector fromConnectorSplits( + std::vector> connectorSplits, + int32_t groupId = -1) { + std::vector splits; + 
splits.reserve(connectorSplits.size()); + for (auto& connectorSplit : connectorSplits) { + splits.emplace_back(exec::Split{std::move(connectorSplit), groupId}); + } + return splits; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( + const std::string& tableDir, core::JoinType joinType, bool nullAware, + const RowTypePtr& probeType, + const RowTypePtr& buildType, const std::vector& probeKeys, const std::vector& buildKeys, - const RowTypePtr& probeInputType, - const std::vector>& - probeSplits, - const RowTypePtr& buildInputType, - const std::vector>& - buildSplits, - const std::vector& output) { + const std::vector>& probeSplits, + const std::vector>& buildSplits, + const std::vector& outputColumns) { auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeId; - core::PlanNodeId buildId; + core::PlanNodeId probeScanId; + core::PlanNodeId buildScanId; auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeInputType) - .capturePlanNodeId(probeId) + .tableScan(probeType) + .capturePlanNodeId(probeScanId) .hashJoin( probeKeys, buildKeys, PlanBuilder(planNodeIdGenerator) - .tableScan(buildInputType) - .capturePlanNodeId(buildId) + .tableScan(buildType) + .capturePlanNodeId(buildScanId) .planNode(), - "" /*filter*/, - output, + /*filter=*/"", + outputColumns, joinType, nullAware) .planNode(); - return {plan, {{probeId, probeSplits}, {buildId, buildSplits}}}; + return PlanWithSplits{ + plan, + probeScanId, + buildScanId, + {{probeScanId, fromConnectorSplits(probeSplits)}, + {buildScanId, fromConnectorSplits(buildSplits)}}}; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan, + int32_t numGroups, + const std::vector& groupedProbeScanSplits, + const std::vector& groupedBuildScanSplits) { + return PlanWithSplits{ + planWithTableScan.plan, + planWithTableScan.probeScanId, + planWithTableScan.buildScanId, + {{planWithTableScan.probeScanId, 
groupedProbeScanSplits}, + {planWithTableScan.buildScanId, groupedBuildScanSplits}}, + core::ExecutionStrategy::kGrouped, + numGroups}; } std::vector makeSources( @@ -579,51 +725,49 @@ void makeAlternativePlans( // Flip join sides. if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { - plans.push_back({flippedPlan, {}}); + plans.push_back(JoinFuzzer::PlanWithSplits{flippedPlan}); } // Parallelize probe and build sides. - auto probeKeys = fieldNames(joinNode->leftKeys()); - auto buildKeys = fieldNames(joinNode->rightKeys()); + const auto probeKeys = fieldNames(joinNode->leftKeys()); + const auto buildKeys = fieldNames(joinNode->rightKeys()); auto planNodeIdGenerator = std::make_shared(); - plans.push_back( - {PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(probeInput, planNodeIdGenerator)) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .localPartitionRoundRobin( - makeSources(buildInput, planNodeIdGenerator)) - .planNode(), - "" /*filter*/, - joinNode->outputType()->names(), - joinNode->joinType(), - joinNode->isNullAware()) - .planNode(), - {}}); + plans.push_back(JoinFuzzer::PlanWithSplits{ + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(probeInput, planNodeIdGenerator)) + .hashJoin( + probeKeys, + buildKeys, + PlanBuilder(planNodeIdGenerator) + .localPartitionRoundRobin( + makeSources(buildInput, planNodeIdGenerator)) + .planNode(), + /*filter=*/"", + joinNode->outputType()->names(), + joinNode->joinType(), + joinNode->isNullAware()) + .planNode()}); // Use OrderBy + MergeJoin (if join type is inner or left). 
if (joinNode->isInnerJoin() || joinNode->isLeftJoin()) { planNodeIdGenerator->reset(); - plans.push_back( - {PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .values(buildInput) - .orderBy(buildKeys, false) - .planNode(), - "" /*filter*/, - asRowType(joinNode->outputType())->names(), - joinNode->joinType()) - .planNode(), - {}}); + plans.push_back({JoinFuzzer::PlanWithSplits{ + PlanBuilder(planNodeIdGenerator) + .values(probeInput) + .orderBy(probeKeys, false) + .mergeJoin( + probeKeys, + buildKeys, + PlanBuilder(planNodeIdGenerator) + .values(buildInput) + .orderBy(buildKeys, false) + .planNode(), + /*filter=*/"", + asRowType(joinNode->outputType())->names(), + joinNode->joinType()) + .planNode()}}); } } @@ -715,7 +859,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { const auto numKeys = nullAware ? 1 : randInt(1, 5); // Pick number and types of join keys. - std::vector keyTypes = generateJoinKeyTypes(numKeys); + const std::vector keyTypes = generateJoinKeyTypes(numKeys); std::vector probeKeys = makeNames("t", keyTypes.size()); std::vector buildKeys = makeNames("u", keyTypes.size()); @@ -738,7 +882,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } - auto output = + auto outputColumns = (core::isLeftSemiProjectJoin(joinType) || core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) ? asRowType(probeInput[0]->type())->names() @@ -747,36 +891,36 @@ void JoinFuzzer::verify(core::JoinType joinType) { ->names(); // Shuffle output columns. - std::shuffle(output.begin(), output.end(), rng_); + std::shuffle(outputColumns.begin(), outputColumns.end(), rng_); // Remove some output columns. 
- auto numOutput = randInt(1, output.size()); - output.resize(numOutput); + const auto numOutput = randInt(1, outputColumns.size()); + outputColumns.resize(numOutput); if (core::isLeftSemiProjectJoin(joinType) || core::isRightSemiProjectJoin(joinType)) { - output.push_back("match"); + outputColumns.push_back("match"); } shuffleJoinKeys(probeKeys, buildKeys); - auto plan = makeDefaultPlan( + const auto defaultPlan = makeDefaultPlan( joinType, nullAware, probeKeys, buildKeys, probeInput, buildInput, - output); + outputColumns); - auto expected = execute(plan, false /*injectSpill*/); + const auto expected = execute(defaultPlan, /*injectSpill=*/false); // Verify results against DuckDB. if (auto duckDbResult = - computeDuckDbResult(probeInput, buildInput, plan.plan)) { + computeDuckDbResult(probeInput, buildInput, defaultPlan.plan)) { VELOX_CHECK( assertEqualResults( - duckDbResult.value(), plan.plan->outputType(), {expected}), + duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), "Velox and DuckDB results don't match"); } @@ -788,58 +932,23 @@ void JoinFuzzer::verify(core::JoinType joinType) { buildKeys, flatProbeInput, flatBuildInput, - output)); - makeAlternativePlans(plan.plan, probeInput, buildInput, altPlans); - makeAlternativePlans(plan.plan, flatProbeInput, flatBuildInput, altPlans); - - auto directory = exec::test::TempDirectoryPath::create(); - - if (isTableScanSupported(probeInput[0]->type()) && - isTableScanSupported(buildInput[0]->type())) { - auto writerPool = rootPool_->addAggregateChild("writer"); - - std::vector> probeSplits; - for (auto i = 0; i < probeInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/probe{}", directory->path, i); - writeToFile(filePath, probeInput[i], writerPool.get()); - probeSplits.push_back(makeSplit(filePath)); - } - - std::vector> buildSplits; - for (auto i = 0; i < buildInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/build{}", directory->path, i); - writeToFile(filePath, 
buildInput[i], writerPool.get()); - buildSplits.push_back(makeSplit(filePath)); - } + outputColumns)); - auto tableScanPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeKeys, - buildKeys, - asRowType(probeInput[0]->type()), - probeSplits, - asRowType(buildInput[0]->type()), - buildSplits, - output); - - altPlans.push_back(tableScanPlan); - - auto joinNode = - std::dynamic_pointer_cast(tableScanPlan.plan); - VELOX_CHECK_NOT_NULL(joinNode); - - // Flip join sides. - if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { - altPlans.push_back({flippedPlan, tableScanPlan.splits}); - } - } + const auto tableScanDir = exec::test::TempDirectoryPath::create(); + addPlansWithTableScan( + tableScanDir->path, + joinType, + nullAware, + probeKeys, + buildKeys, + flatProbeInput, + flatBuildInput, + outputColumns, + altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; - auto actual = execute(altPlans[i], false /*injectSpill*/); + auto actual = execute(altPlans[i], /*injectSpill=*/false); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -854,7 +963,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } LOG(INFO) << "Testing plan #" << i << " with spilling"; - actual = execute(altPlans[i], true /*injectSpill*/); + actual = execute(altPlans[i], /*injectSpill=*/true); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -862,12 +971,189 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } +void JoinFuzzer::addPlansWithTableScan( + const std::string& tableDir, + core::JoinType joinType, + bool nullAware, + const std::vector& probeKeys, + const std::vector& buildKeys, + const std::vector& probeInput, + const std::vector& buildInput, + const std::vector& outputColumns, + std::vector& altPlans) { + VELOX_CHECK(!tableDir.empty()); + + if (!isTableScanSupported(probeInput[0]->type()) || + 
!isTableScanSupported(buildInput[0]->type())) { + return; + } + + std::vector> probeScanSplits; + for (auto i = 0; i < probeInput.size(); ++i) { + const std::string filePath = fmt::format("{}/probe{}", tableDir, i); + writeToFile(filePath, probeInput[i], writerPool_.get()); + probeScanSplits.push_back(makeSplit(filePath)); + } + + std::vector> buildScanSplits; + for (auto i = 0; i < buildInput.size(); ++i) { + const std::string filePath = fmt::format("{}/build{}", tableDir, i); + writeToFile(filePath, buildInput[i], writerPool_.get()); + buildScanSplits.push_back(makeSplit(filePath)); + } + + std::vector plansWithTableScan; + auto defaultPlan = makeDefaultPlanWithTableScan( + tableDir, + joinType, + nullAware, + asRowType(probeInput[0]->type()), + asRowType(buildInput[0]->type()), + probeKeys, + buildKeys, + probeScanSplits, + buildScanSplits, + outputColumns); + plansWithTableScan.push_back(defaultPlan); + + auto joinNode = + std::dynamic_pointer_cast(defaultPlan.plan); + VELOX_CHECK_NOT_NULL(joinNode); + + // Flip join sides. 
+ if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { + plansWithTableScan.push_back(PlanWithSplits{ + flippedPlan, + defaultPlan.probeScanId, + defaultPlan.buildScanId, + defaultPlan.splits, + core::ExecutionStrategy::kUngrouped, + 0}); + } + + const int32_t numGroups = randInt(1, probeScanSplits.size()); + const std::vector groupedProbeScanSplits = + generateSplitsWithGroup( + tableDir, numGroups, /*isProbe=*/true, probeKeys.size(), probeInput); + const std::vector groupedBuildScanSplits = + generateSplitsWithGroup( + tableDir, numGroups, /*isProbe=*/false, buildKeys.size(), buildInput); + + for (const auto& planWithTableScan : plansWithTableScan) { + altPlans.push_back(planWithTableScan); + altPlans.push_back(makeGroupedExecutionPlanWithTableScan( + planWithTableScan, + numGroups, + groupedProbeScanSplits, + groupedBuildScanSplits)); + } +} + +std::vector JoinFuzzer::generateSplitsWithGroup( + const std::string& tableDir, + int32_t numGroups, + bool isProbe, + size_t numKeys, + const std::vector& input) { + const std::vector> inputVectorsByGroup = + splitInputByGroup(numGroups, numKeys, input); + + std::vector splitsWithGroup; + for (int32_t groupId = 0; groupId < numGroups; ++groupId) { + for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { + const std::string filePath = fmt::format( + "{}/grouped[{}].{}.{}", + tableDir, + groupId, + isProbe ? "probe" : "build", + i); + writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); + splitsWithGroup.push_back(exec::Split{makeSplit(filePath), groupId}); + } + splitsWithGroup.push_back(exec::Split{nullptr, groupId}); + } + return splitsWithGroup; +} + +std::vector> JoinFuzzer::splitInputByGroup( + int32_t numGroups, + size_t numKeys, + const std::vector& inputs) { + if (numGroups == 1) { + return {inputs}; + } + + // Partition 'input' based on the join keys for group execution with one + // partition per each group. 
+ const RowTypePtr& inputType = asRowType(inputs[0]->type()); + std::vector partitionChannels(numKeys); + std::iota(partitionChannels.begin(), partitionChannels.end(), 0); + std::vector> hashers; + hashers.reserve(numKeys); + for (auto channel : partitionChannels) { + hashers.emplace_back( + exec::VectorHasher::create(inputType->childAt(channel), channel)); + } + + std::vector> inputsByGroup{ + static_cast(numGroups)}; + raw_vector groupHashes; + std::vector groupRows(numGroups); + std::vector rawGroupRows(numGroups); + std::vector groupSizes(numGroups, 0); + SelectivityVector inputRows; + + for (const auto& input : inputs) { + const int numRows = input->size(); + inputRows.resize(numRows); + inputRows.setAll(); + groupHashes.resize(numRows); + std::fill(groupSizes.begin(), groupSizes.end(), 0); + std::fill(groupHashes.begin(), groupHashes.end(), 0); + + for (auto i = 0; i < hashers.size(); ++i) { + auto& hasher = hashers[i]; + auto* keyVector = input->childAt(hashers[i]->channel())->loadedVector(); + hashers[i]->decode(*keyVector, inputRows); + if (hasher->channel() != kConstantChannel) { + hashers[i]->hash(inputRows, i > 0, groupHashes); + } else { + hashers[i]->hashPrecomputed(inputRows, i > 0, groupHashes); + } + } + + for (int row = 0; row < numRows; ++row) { + const int32_t groupId = groupHashes[row] % numGroups; + if (groupRows[groupId] == nullptr || + (groupRows[groupId]->capacity() < numRows * sizeof(vector_size_t))) { + groupRows[groupId] = allocateIndices(numRows, pool_.get()); + rawGroupRows[groupId] = groupRows[groupId]->asMutable(); + } + rawGroupRows[groupId][groupSizes[groupId]++] = row; + } + + for (int32_t groupId = 0; groupId < numGroups; ++groupId) { + const size_t groupSize = groupSizes[groupId]; + if (groupSize != 0) { + VELOX_CHECK_NOT_NULL(groupRows[groupId]); + groupRows[groupId]->setSize( + groupSizes[groupId] * sizeof(vector_size_t)); + inputsByGroup[groupId].push_back( + (groupSize == numRows) + ? 
input + : exec::wrap(groupSize, std::move(groupRows[groupId]), input)); + } + } + } + return inputsByGroup; +} + void JoinFuzzer::go() { VELOX_CHECK( FLAGS_steps > 0 || FLAGS_duration_sec > 0, "Either --steps or --duration_sec needs to be greater than zero.") - auto startTime = std::chrono::system_clock::now(); + const auto startTime = std::chrono::system_clock::now(); size_t iteration = 0; while (!isDone(iteration, startTime)) { @@ -875,7 +1161,7 @@ void JoinFuzzer::go() { << iteration << " (seed: " << currentSeed_ << ")"; // Pick join type. - auto joinType = pickJoinType(); + const auto joinType = pickJoinType(); verify(joinType); diff --git a/velox/exec/tests/JoinFuzzerRunner.h b/velox/exec/tests/JoinFuzzerRunner.h index 15d388915a8f..524240fe58ee 100644 --- a/velox/exec/tests/JoinFuzzerRunner.h +++ b/velox/exec/tests/JoinFuzzerRunner.h @@ -18,6 +18,8 @@ #include #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/MemoryReclaimer.h" #include "velox/exec/tests/JoinFuzzer.h" #include "velox/serializers/PrestoSerializer.h" @@ -56,7 +58,7 @@ class JoinFuzzerRunner { public: static int run(size_t seed) { - facebook::velox::memory::MemoryManager::initialize({}); + setupMemory(); facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); facebook::velox::filesystems::registerLocalFileSystem(); @@ -64,4 +66,20 @@ class JoinFuzzerRunner { facebook::velox::exec::test::joinFuzzer(seed); return RUN_ALL_TESTS(); } + + private: + // Invoked to set up memory system with arbitration. 
+ static void setupMemory() { + FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; + FLAGS_velox_memory_leak_check_enabled = true; + facebook::velox::memory::SharedArbitrator::registerFactory(); + facebook::velox::memory::MemoryManagerOptions options; + options.allocatorCapacity = 8L << 30; + options.arbitratorCapacity = 6L << 30; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = + facebook::velox::exec::memoryArbitrationStateCheck; + facebook::velox::memory::MemoryManager::initialize(options); + } }; diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 863dc6867bb9..be7bbd387168 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -118,7 +118,31 @@ class AssertQueryBuilder { return *this; } - // Methods to run the query and verify the results. + /// Methods to configure the group execution mode. + AssertQueryBuilder& executionStrategy( + core::ExecutionStrategy executionStrategy) { + params_.executionStrategy = executionStrategy; + return *this; + } + + AssertQueryBuilder& numSplitGroups(int numSplitGroups) { + params_.numSplitGroups = numSplitGroups; + return *this; + } + + AssertQueryBuilder& numConcurrentSplitGroups( + int32_t numConcurrentSplitGroups) { + params_.numConcurrentSplitGroups = numConcurrentSplitGroups; + return *this; + } + + AssertQueryBuilder& groupedExecutionLeafNodeIds( + const std::unordered_set& groupedExecutionLeafNodeIds) { + params_.groupedExecutionLeafNodeIds = groupedExecutionLeafNodeIds; + return *this; + } + + /// Methods to run the query and verify the results. /// Run the query and verify results against DuckDB. Requires /// duckDbQueryRunner to be provided in the constructor. @@ -158,7 +182,7 @@ class AssertQueryBuilder { // Used by the created task as the default driver executor. 
std::unique_ptr executor_{ new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())}; - DuckDbQueryRunner* FOLLY_NULLABLE const duckDbQueryRunner_; + DuckDbQueryRunner* const duckDbQueryRunner_; CursorParameters params_; std::unordered_map configs_; std::unordered_map> diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index fe1c3e17c04a..6f4a0de7d2a9 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -46,15 +46,15 @@ struct CursorParameters { uint64_t bufferedBytes = 512 * 1024; - // Ungrouped (by default) or grouped (bucketed) execution. + /// Ungrouped (by default) or grouped (bucketed) execution. core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; /// Contains leaf plan nodes that need to be executed in the grouped mode. std::unordered_set groupedExecutionLeafNodeIds; - // Number of splits groups the task will be processing. Must be 1 for - // ungrouped execution. + /// Number of splits groups the task will be processing. Must be 1 for + /// ungrouped execution. int numSplitGroups{1}; /// Spilling directory, if not empty, then the task's spilling directory would