From aeb768f78ee5f1415cbef3e2bc189395405e6654 Mon Sep 17 00:00:00 2001 From: xiaoxmeng Date: Tue, 12 Mar 2024 15:46:33 -0700 Subject: [PATCH] Extend join fuzzer to cover group execution mode Extend join fuzzer to cover group execution mode. To do this we split the probe and build input by partitioning on the join keys with one partition per group. Then we write the split inputs into separate files and create table scan splits from the generated files. For each existing query plan with table scan input, we create a corresponding grouped execution plan. Also refactor the join fuzzer by moving the test iteration settings into the JoinFuzzer class to ease implementation. Extend AssertQueryBuilder to support group execution configuration. --- velox/exec/HashBuild.cpp | 11 +- velox/exec/tests/JoinFuzzer.cpp | 694 +++++++++++++------- velox/exec/tests/JoinFuzzerRunner.h | 20 +- velox/exec/tests/utils/AssertQueryBuilder.h | 28 +- velox/exec/tests/utils/Cursor.h | 6 +- 5 files changed, 503 insertions(+), 256 deletions(-) diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index d2f8c6a98abe6..be8855962b4f8 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -500,18 +500,17 @@ void HashBuild::spillInput(const RowVectorPtr& input) { computeSpillPartitions(input); vector_size_t numSpillInputs = 0; - for (auto rowIdx = 0; rowIdx < numInput; ++rowIdx) { - const auto partition = spillPartitions_[rowIdx]; - if (FOLLY_UNLIKELY(!activeRows_.isValid(rowIdx))) { + for (auto row = 0; row < numInput; ++row) { + const auto partition = spillPartitions_[row]; + if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) { continue; } if (!spiller_->isSpilled(partition)) { continue; } - activeRows_.setValid(rowIdx, false); + activeRows_.setValid(row, false); ++numSpillInputs; - rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = - rowIdx; + rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = row; } if (numSpillInputs == 0) { 
return; diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 4d1b04255af68..1efba1810c1c8 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -18,8 +18,10 @@ #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorUtils.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -38,7 +40,7 @@ DEFINE_int32( 100, "The number of elements on each generated vector."); -DEFINE_int32(num_batches, 5, "The number of generated vectors."); +DEFINE_int32(num_batches, 10, "The number of generated vectors."); DEFINE_double( null_ratio, @@ -60,10 +62,10 @@ class JoinFuzzer { struct PlanWithSplits { core::PlanNodePtr plan; - std::unordered_map< - core::PlanNodeId, - std::vector>> + std::unordered_map> splits; + core::ExecutionStrategy executionStrategy{ + core::ExecutionStrategy::kUngrouped}; }; private: @@ -76,6 +78,12 @@ class JoinFuzzer { return opts; } + static inline const std::string kHiveConnectorId = "test-hive"; + + // Makes a connector split from a file path on storage. + static std::shared_ptr makeSplit( + const std::string& filePath); + void seed(size_t seed) { currentSeed_ = seed; vectorFuzzer_.reSeed(seed); @@ -86,55 +94,116 @@ class JoinFuzzer { seed(rng_()); } - /// Randomly pick a join type to test. - core::JoinType pickJoinType(); + // Randomly pick a join type to test. + void pickJoinType(); + + // Makes the query plan with default settings in JoinFuzzer and value inputs + // for both probe and build sides. + // + // NOTE: 'probeInput' and 'buildInput' could either input rows with lazy + // vectors or flatten ones. 
+ JoinFuzzer::PlanWithSplits makeDefaultPlan( + const std::vector& probeInput, + const std::vector& buildInput); - void verify(core::JoinType joinType); + // Makes the default query plan with table scan as inputs for both probe and + // build sides. + JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan(); - /// Returns a list of randomly generated join key types. + // Makes the query plan from 'planWithTableScan' with grouped execution mode. + // Correspondingly, it replaces the table scan input splits with grouped ones. + JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan); + + // Runs one test iteration from query plans generations, executions and result + // verifications. + void verify(); + + // Cleanup the test settings at the end of one test iteration. + void cleanup(); + + // Generates inputs for join plan generation including join keys, probe/build + // input types and input data in vectors. + void generateJoinPlanInputs(); + + // Returns a list of randomly generated join key types. std::vector generateJoinKeyTypes(int32_t numKeys); - /// Returns randomly generated probe input with upto 3 additional payload - /// columns. - std::vector generateProbeInput( - const std::vector& keyNames, - const std::vector& keyTypes); + // Returns randomly generated probe input with upto 3 additional payload + // columns. + void generateProbeInput(); - /// Same as generateProbeInput() but copies over 10% of the input in the probe - /// columns to ensure some matches during joining. Also generates an empty - /// input with a 10% chance. - std::vector generateBuildInput( - const std::vector& probeInput, - const std::vector& probeKeys, - const std::vector& buildKeys); + // Same as generateProbeInput() but copies over 10% of the input in the probe + // columns to ensure some matches during joining. Also generates an empty + // input with a 10% chance. 
+ void generateBuildInput(); + + void shuffleJoinKeys(); - void shuffleJoinKeys( - std::vector& probeKeys, - std::vector& buildKeys); + void addPlansWithTableScan(std::vector& altPlans); + + // Generates the splits for group execution plans. The function splits the + // inputs into a number of groups by partitioning on the join keys. It then + // writes the partitioned inputs into separate files to generate grouped + // splits. + void generateTableScanSplitsWithGroup(); + + // Splits the input into groups by partitioning on the join keys. If 'isProbe' + // is true, this splits the input for probe side, otherwise for build. + std::vector> splitInputByGroup(bool isProbe); + + // Generates the grouped splits. If 'isProbe' is true, this generates grouped + // splits for probe side, otherwise for build. + void generateSplitsWithGroup(bool isProbe); RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); std::optional computeDuckDbResult( - const std::vector& probeInput, - const std::vector& buildInput, const core::PlanNodePtr& plan); int32_t randInt(int32_t min, int32_t max) { return boost::random::uniform_int_distribution(min, max)(rng_); } - static inline const std::string kHiveConnectorId = "test-hive"; - - static std::shared_ptr makeSplit( - const std::string& filePath); - FuzzerGenerator rng_; size_t currentSeed_{0}; std::shared_ptr rootPool_{ - memory::memoryManager()->addRootPool()}; - std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + memory::memoryManager()->addRootPool( + "joinFuzzer", + memory::kMaxMemory, + memory::MemoryReclaimer::create())}; + std::shared_ptr pool_{rootPool_->addLeafChild( + "joinFuzzerLeaf", + true, + exec::MemoryReclaimer::create())}; + std::shared_ptr writerPool_{rootPool_->addAggregateChild( + "joinFuzzerWriter", + exec::MemoryReclaimer::create())}; + VectorFuzzer vectorFuzzer_; + + // The following fields are reset cross test iterations. 
+ core::JoinType joinType_; + bool nullAware_; + std::vector keyTypes_; + std::vector probeKeyNames_; + std::vector buildKeyNames_; + RowTypePtr probeInputType_; + std::vector probeInputVectors_; + std::vector flatProbeInputVectors_; + std::vector> probeScanSplits_; + std::vector probeScanSplitsWithGroup_; + core::PlanNodeId probeScanNodeId_; + RowTypePtr buildInputType_; + std::vector buildInputVectors_; + std::vector flatBuildInputVectors_; + std::vector> buildScanSplits_; + std::vector buildScanSplitsWithGroup_; + core::PlanNodeId buildScanNodeId_; + std::vector outputColumnNames_; + int32_t numGroups_{0}; + std::shared_ptr tableScanDir_; }; JoinFuzzer::JoinFuzzer(size_t initialSeed) @@ -164,7 +233,7 @@ bool isDone(size_t i, T startTime) { return i >= FLAGS_steps; } -core::JoinType JoinFuzzer::pickJoinType() { +void JoinFuzzer::pickJoinType() { // Right joins are tested by flipping sides of the left joins. static std::vector kJoinTypes = { core::JoinType::kInner, @@ -172,11 +241,10 @@ core::JoinType JoinFuzzer::pickJoinType() { core::JoinType::kFull, core::JoinType::kLeftSemiFilter, core::JoinType::kLeftSemiProject, - core::JoinType::kAnti, - }; + core::JoinType::kAnti}; - size_t idx = randInt(0, kJoinTypes.size() - 1); - return kJoinTypes[idx]; + const size_t idx = randInt(0, kJoinTypes.size() - 1); + joinType_ = kJoinTypes[idx]; } std::vector JoinFuzzer::generateJoinKeyTypes(int32_t numKeys) { @@ -189,50 +257,46 @@ std::vector JoinFuzzer::generateJoinKeyTypes(int32_t numKeys) { return types; } -std::vector JoinFuzzer::generateProbeInput( - const std::vector& keyNames, - const std::vector& keyTypes) { - std::vector names = keyNames; - std::vector types = keyTypes; +void JoinFuzzer::generateProbeInput() { + std::vector names = probeKeyNames_; + std::vector types = keyTypes_; // Add up to 3 payload columns. 
- auto numPayload = randInt(0, 3); + const auto numPayload = randInt(0, 3); for (auto i = 0; i < numPayload; ++i) { - names.push_back(fmt::format("tp{}", i + keyNames.size())); - types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); + names.push_back(fmt::format("tp{}", i + keyTypes_.size())); + types.push_back(vectorFuzzer_.randType(/*maxDepth=*/2)); } - auto inputType = ROW(std::move(names), std::move(types)); - std::vector input; + probeInputType_ = ROW(std::move(names), std::move(types)); + VELOX_CHECK(probeInputVectors_.empty()); for (auto i = 0; i < FLAGS_num_batches; ++i) { - input.push_back(vectorFuzzer_.fuzzInputRow(inputType)); + probeInputVectors_.push_back(vectorFuzzer_.fuzzInputRow(probeInputType_)); } - return input; } -std::vector JoinFuzzer::generateBuildInput( - const std::vector& probeInput, - const std::vector& probeKeys, - const std::vector& buildKeys) { - std::vector names = buildKeys; - std::vector types; - for (const auto& key : probeKeys) { - types.push_back(asRowType(probeInput[0]->type())->findChild(key)); - } +void JoinFuzzer::generateBuildInput() { + std::vector names = buildKeyNames_; + std::vector types = keyTypes_; // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); - for (auto i = 0; i < numPayload; ++i) { - names.push_back(fmt::format("bp{}", i + buildKeys.size())); - types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); + auto numPayloadColumns = randInt(0, 3); + for (auto i = 0; i < numPayloadColumns; ++i) { + names.push_back(fmt::format("bp{}", i + keyTypes_.size())); + types.push_back(vectorFuzzer_.randType(/*maxDepth=*/2)); } - auto rowType = ROW(std::move(names), std::move(types)); + buildInputType_ = ROW(std::move(names), std::move(types)); + + VELOX_CHECK(buildInputVectors_.empty()); // 1 in 10 times use empty build. + // // TODO Use non-empty build with no matches sometimes. 
if (vectorFuzzer_.coinToss(0.1)) { - return {BaseVector::create(rowType, 0, pool_.get())}; + buildInputVectors_ = { + BaseVector::create(buildInputType_, 0, pool_.get())}; + return; } // TODO Remove the assumption that probeKeys are the first columns in @@ -240,11 +304,12 @@ std::vector JoinFuzzer::generateBuildInput( // To ensure there are some matches, sample with replacement 10% of probe join // keys and use these as build keys. + // // TODO Add a few random rows as well. - std::vector input; - for (const auto& probe : probeInput) { - auto numRows = 1 + probe->size() / 10; - auto build = BaseVector::create(rowType, numRows, probe->pool()); + for (const auto& probe : probeInputVectors_) { + const auto numRows = 1 + probe->size() / 10; + auto buildInput = + BaseVector::create(buildInputType_, numRows, pool_.get()); // Pick probe side rows to copy. std::vector rowNumbers(numRows); @@ -253,21 +318,20 @@ std::vector JoinFuzzer::generateBuildInput( } SelectivityVector rows(numRows); - for (auto i = 0; i < probeKeys.size(); ++i) { - build->childAt(i)->resize(numRows); - build->childAt(i)->copy(probe->childAt(i).get(), rows, rowNumbers.data()); + for (auto i = 0; i < keyTypes_.size(); ++i) { + buildInput->childAt(i)->resize(numRows); + buildInput->childAt(i)->copy( + probe->childAt(i).get(), rows, rowNumbers.data()); } - for (auto i = 0; i < numPayload; ++i) { - auto column = i + probeKeys.size(); - build->childAt(column) = - vectorFuzzer_.fuzz(rowType->childAt(column), numRows); + for (auto i = 0; i < numPayloadColumns; ++i) { + const auto columnChannel = i + keyTypes_.size(); + buildInput->childAt(columnChannel) = + vectorFuzzer_.fuzz(buildInputType_->childAt(columnChannel), numRows); } - input.push_back(build); + buildInputVectors_.push_back(buildInput); } - - return input; } std::vector flatten(const std::vector& vectors) { @@ -283,12 +347,25 @@ std::vector flatten(const std::vector& vectors) { } RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool 
injectSpill) { - LOG(INFO) << "Executing query plan: " << std::endl + LOG(INFO) << "Executing query plan with " + << executionStrategyToString(plan.executionStrategy) << " strategy[" + << (plan.executionStrategy == core::ExecutionStrategy::kGrouped + ? numGroups_ + : 0) + << " groups]" << (injectSpill ? " and spilling injection" : "") + << ": " << std::endl << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [nodeId, nodeSplits] : plan.splits) { - builder.splits(nodeId, nodeSplits); + for (const auto& [planNodeId, nodeSplits] : plan.splits) { + builder.splits(planNodeId, nodeSplits); + } + + if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { + builder.executionStrategy(core::ExecutionStrategy::kGrouped); + builder.groupedExecutionLeafNodeIds({probeScanNodeId_, buildScanNodeId_}); + builder.numSplitGroups(numGroups_); + builder.numConcurrentSplitGroups(1); } std::shared_ptr spillDirectory; @@ -302,11 +379,15 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { } TestScopedSpillInjection scopedSpillInjection(spillPct); - auto result = builder.maxDrivers(2).copyResults(pool_.get()); + const auto result = builder.maxDrivers(2).copyResults(pool_.get()); LOG(INFO) << "Results: " << result->toString(); if (VLOG_IS_ON(1)) { VLOG(1) << std::endl << result->toString(0, result->size()); } + // Wait for the task to be destroyed before start next query execution to + // avoid the potential interference of the background activities across query + // executions. 
+ waitForAllTasksToBeDeleted(); return result; } @@ -389,25 +470,23 @@ bool containsUnsupportedTypes(const TypePtr& type) { } std::optional JoinFuzzer::computeDuckDbResult( - const std::vector& probeInput, - const std::vector& buildInput, const core::PlanNodePtr& plan) { - if (containsUnsupportedTypes(probeInput[0]->type())) { + if (containsUnsupportedTypes(probeInputVectors_[0]->type())) { return std::nullopt; } - if (containsUnsupportedTypes(buildInput[0]->type())) { + if (containsUnsupportedTypes(buildInputVectors_[0]->type())) { return std::nullopt; } DuckDbQueryRunner queryRunner; - queryRunner.createTable("t", probeInput); - queryRunner.createTable("u", buildInput); + queryRunner.createTable("t", probeInputVectors_); + queryRunner.createTable("u", buildInputVectors_); - auto joinNode = dynamic_cast(plan.get()); + auto* joinNode = dynamic_cast(plan.get()); VELOX_CHECK_NOT_NULL(joinNode); - auto joinKeysToSql = [](auto keys) { + const auto joinKeysToSql = [](auto keys) { std::stringstream out; for (auto i = 0; i < keys.size(); ++i) { if (i > 0) { @@ -418,7 +497,7 @@ std::optional JoinFuzzer::computeDuckDbResult( return out.str(); }; - auto equiClausesToSql = [](auto joinNode) { + const auto equiClausesToSql = [](auto joinNode) { std::stringstream out; for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { if (i > 0) { @@ -478,7 +557,8 @@ std::optional JoinFuzzer::computeDuckDbResult( } break; default: - VELOX_UNREACHABLE(); + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); } return queryRunner.execute(sql.str(), plan->outputType()); @@ -494,61 +574,66 @@ std::vector fieldNames( return names; } -JoinFuzzer::PlanWithSplits makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& output) { + const std::vector& buildInput) 
{ auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) .values(probeInput) .hashJoin( - probeKeys, - buildKeys, + probeKeyNames_, + buildKeyNames_, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - "" /*filter*/, - output, - joinType, - nullAware) + /*filter=*/"", + outputColumnNames_, + joinType_, + nullAware_) .planNode(); return {plan, {}}; } -JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const RowTypePtr& probeInputType, - const std::vector>& - probeSplits, - const RowTypePtr& buildInputType, - const std::vector>& - buildSplits, - const std::vector& output) { +std::vector fromConnectorSplits( + std::vector> connectorSplits, + int32_t groupId = -1) { + std::vector splits; + splits.reserve(connectorSplits.size()); + for (auto& connectorSplit : connectorSplits) { + splits.emplace_back(exec::Split{std::move(connectorSplit), groupId}); + } + return splits; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan() { auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeId; - core::PlanNodeId buildId; auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeInputType) - .capturePlanNodeId(probeId) + .tableScan(probeInputType_) + .capturePlanNodeId(probeScanNodeId_) .hashJoin( - probeKeys, - buildKeys, + probeKeyNames_, + buildKeyNames_, PlanBuilder(planNodeIdGenerator) - .tableScan(buildInputType) - .capturePlanNodeId(buildId) + .tableScan(buildInputType_) + .capturePlanNodeId(buildScanNodeId_) .planNode(), - "" /*filter*/, - output, - joinType, - nullAware) + /*filter=*/"", + outputColumnNames_, + joinType_, + nullAware_) .planNode(); - return {plan, {{probeId, probeSplits}, {buildId, buildSplits}}}; + return { + plan, + {{probeScanNodeId_, fromConnectorSplits(probeScanSplits_)}, + {buildScanNodeId_, fromConnectorSplits(buildScanSplits_)}}}; +} + 
+JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan) { + return { + planWithTableScan.plan, + {{probeScanNodeId_, probeScanSplitsWithGroup_}, + {buildScanNodeId_, buildScanSplitsWithGroup_}}, + core::ExecutionStrategy::kGrouped}; } std::vector makeSources( @@ -583,8 +668,8 @@ void makeAlternativePlans( } // Parallelize probe and build sides. - auto probeKeys = fieldNames(joinNode->leftKeys()); - auto buildKeys = fieldNames(joinNode->rightKeys()); + const auto probeKeys = fieldNames(joinNode->leftKeys()); + const auto buildKeys = fieldNames(joinNode->rightKeys()); auto planNodeIdGenerator = std::make_shared(); plans.push_back( @@ -598,7 +683,7 @@ void makeAlternativePlans( .localPartitionRoundRobin( makeSources(buildInput, planNodeIdGenerator)) .planNode(), - "" /*filter*/, + /*filter=*/"", joinNode->outputType()->names(), joinNode->joinType(), joinNode->isNullAware()) @@ -619,7 +704,7 @@ void makeAlternativePlans( .values(buildInput) .orderBy(buildKeys, false) .planNode(), - "" /*filter*/, + /*filter=*/"", asRowType(joinNode->outputType())->names(), joinNode->joinType()) .planNode(), @@ -636,10 +721,10 @@ std::vector makeNames(const std::string& prefix, size_t n) { return names; } -void JoinFuzzer::shuffleJoinKeys( - std::vector& probeKeys, - std::vector& buildKeys) { - auto numKeys = probeKeys.size(); +void JoinFuzzer::shuffleJoinKeys() { + VELOX_CHECK(!keyTypes_.empty()); + + const auto numKeys = keyTypes_.size(); if (numKeys == 1) { return; } @@ -648,12 +733,12 @@ void JoinFuzzer::shuffleJoinKeys( std::iota(columnIndices.begin(), columnIndices.end(), 0); std::shuffle(columnIndices.begin(), columnIndices.end(), rng_); - auto copyProbeKeys = probeKeys; - auto copyBuildKeys = buildKeys; + const auto copyProbeKeys = probeKeyNames_; + const auto copyBuildKeys = buildKeyNames_; for (auto i = 0; i < numKeys; ++i) { - probeKeys[i] = copyProbeKeys[columnIndices[i]]; - buildKeys[i] = 
copyBuildKeys[columnIndices[i]]; + probeKeyNames_[i] = copyProbeKeys[columnIndices[i]]; + buildKeyNames_[i] = copyBuildKeys[columnIndices[i]]; } } @@ -708,138 +793,89 @@ bool isTableScanSupported(const TypePtr& type) { return true; } -void JoinFuzzer::verify(core::JoinType joinType) { - const bool nullAware = - isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5); - - const auto numKeys = nullAware ? 1 : randInt(1, 5); +void JoinFuzzer::generateJoinPlanInputs() { + const auto numKeys = nullAware_ ? 1 : randInt(1, 5); // Pick number and types of join keys. - std::vector keyTypes = generateJoinKeyTypes(numKeys); - std::vector probeKeys = makeNames("t", keyTypes.size()); - std::vector buildKeys = makeNames("u", keyTypes.size()); + keyTypes_ = generateJoinKeyTypes(numKeys); + probeKeyNames_ = makeNames("t", keyTypes_.size()); + buildKeyNames_ = makeNames("u", keyTypes_.size()); - auto probeInput = generateProbeInput(probeKeys, keyTypes); - auto buildInput = generateBuildInput(probeInput, probeKeys, buildKeys); + generateProbeInput(); + generateBuildInput(); // Flatten inputs. - auto flatProbeInput = flatten(probeInput); - auto flatBuildInput = flatten(buildInput); + flatProbeInputVectors_ = flatten(probeInputVectors_); + flatBuildInputVectors_ = flatten(buildInputVectors_); if (VLOG_IS_ON(1)) { - VLOG(1) << "Probe input: " << probeInput[0]->toString(); - for (const auto& v : flatProbeInput) { + VLOG(1) << "Probe input: " << probeInputVectors_[0]->toString(); + for (const auto& v : flatProbeInputVectors_) { VLOG(1) << std::endl << v->toString(0, v->size()); } - VLOG(1) << "Build input: " << buildInput[0]->toString(); - for (const auto& v : flatBuildInput) { + VLOG(1) << "Build input: " << buildInputVectors_[0]->toString(); + for (const auto& v : flatBuildInputVectors_) { VLOG(1) << std::endl << v->toString(0, v->size()); } } - auto output = - (core::isLeftSemiProjectJoin(joinType) || - core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) - ? 
asRowType(probeInput[0]->type())->names() - : concat( - asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) - ->names(); + outputColumnNames_ = + (core::isLeftSemiProjectJoin(joinType_) || + core::isLeftSemiFilterJoin(joinType_) || core::isAntiJoin(joinType_)) + ? probeInputType_->names() + : concat(probeInputType_, buildInputType_)->names(); // Shuffle output columns. - std::shuffle(output.begin(), output.end(), rng_); + std::shuffle(outputColumnNames_.begin(), outputColumnNames_.end(), rng_); // Remove some output columns. - auto numOutput = randInt(1, output.size()); - output.resize(numOutput); + const auto numOutput = randInt(1, outputColumnNames_.size()); + // CHECK(numOutput > 0); + outputColumnNames_.resize(numOutput); - if (core::isLeftSemiProjectJoin(joinType) || - core::isRightSemiProjectJoin(joinType)) { - output.push_back("match"); + if (core::isLeftSemiProjectJoin(joinType_) || + core::isRightSemiProjectJoin(joinType_)) { + outputColumnNames_.push_back("match"); } - shuffleJoinKeys(probeKeys, buildKeys); + shuffleJoinKeys(); +} + +void JoinFuzzer::verify() { + nullAware_ = isNullAwareSupported(joinType_) && vectorFuzzer_.coinToss(0.5); - auto plan = makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - probeInput, - buildInput, - output); + generateJoinPlanInputs(); - auto expected = execute(plan, false /*injectSpill*/); + const auto defaultPlan = + makeDefaultPlan(probeInputVectors_, buildInputVectors_); + + const auto expected = execute(defaultPlan, /*injectSpill=*/false); // Verify results against DuckDB. 
- if (auto duckDbResult = - computeDuckDbResult(probeInput, buildInput, plan.plan)) { + if (auto duckDbResult = computeDuckDbResult(defaultPlan.plan)) { VELOX_CHECK( assertEqualResults( - duckDbResult.value(), plan.plan->outputType(), {expected}), + duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), "Velox and DuckDB results don't match"); } std::vector altPlans; - altPlans.push_back(makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - output)); - makeAlternativePlans(plan.plan, probeInput, buildInput, altPlans); - makeAlternativePlans(plan.plan, flatProbeInput, flatBuildInput, altPlans); - - auto directory = exec::test::TempDirectoryPath::create(); - - if (isTableScanSupported(probeInput[0]->type()) && - isTableScanSupported(buildInput[0]->type())) { - auto writerPool = rootPool_->addAggregateChild("writer"); - - std::vector> probeSplits; - for (auto i = 0; i < probeInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/probe{}", directory->path, i); - writeToFile(filePath, probeInput[i], writerPool.get()); - probeSplits.push_back(makeSplit(filePath)); - } - - std::vector> buildSplits; - for (auto i = 0; i < buildInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/build{}", directory->path, i); - writeToFile(filePath, buildInput[i], writerPool.get()); - buildSplits.push_back(makeSplit(filePath)); - } - - auto tableScanPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeKeys, - buildKeys, - asRowType(probeInput[0]->type()), - probeSplits, - asRowType(buildInput[0]->type()), - buildSplits, - output); - - altPlans.push_back(tableScanPlan); - - auto joinNode = - std::dynamic_pointer_cast(tableScanPlan.plan); - VELOX_CHECK_NOT_NULL(joinNode); - - // Flip join sides. 
- if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { - altPlans.push_back({flippedPlan, tableScanPlan.splits}); - } - } + altPlans.push_back( + makeDefaultPlan(flatProbeInputVectors_, flatBuildInputVectors_)); + makeAlternativePlans( + defaultPlan.plan, probeInputVectors_, buildInputVectors_, altPlans); + makeAlternativePlans( + defaultPlan.plan, + flatProbeInputVectors_, + flatBuildInputVectors_, + altPlans); + + addPlansWithTableScan(altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; - auto actual = execute(altPlans[i], false /*injectSpill*/); + auto actual = execute(altPlans[i], /*injectSpill=*/false); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -854,7 +890,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } LOG(INFO) << "Testing plan #" << i << " with spilling"; - actual = execute(altPlans[i], true /*injectSpill*/); + actual = execute(altPlans[i], /*=injectSpill=*/true); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -862,12 +898,161 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } +void JoinFuzzer::addPlansWithTableScan(std::vector& altPlans) { + VELOX_CHECK_NULL(tableScanDir_); + if (!isTableScanSupported(probeInputVectors_[0]->type()) || + !isTableScanSupported(buildInputVectors_[0]->type())) { + return; + } + + tableScanDir_ = exec::test::TempDirectoryPath::create(); + + VELOX_CHECK(probeScanSplits_.empty()); + for (auto i = 0; i < probeInputVectors_.size(); ++i) { + const std::string filePath = + fmt::format("{}/probe{}", tableScanDir_->path, i); + writeToFile(filePath, probeInputVectors_[i], writerPool_.get()); + probeScanSplits_.push_back(makeSplit(filePath)); + } + + VELOX_CHECK(buildScanSplits_.empty()); + for (auto i = 0; i < buildInputVectors_.size(); ++i) { + const std::string filePath = + fmt::format("{}/build{}", tableScanDir_->path, i); + 
writeToFile(filePath, buildInputVectors_[i], writerPool_.get()); + buildScanSplits_.push_back(makeSplit(filePath)); + } + + std::vector plansWithTableScan; + auto defaultPlan = makeDefaultPlanWithTableScan(); + plansWithTableScan.push_back(defaultPlan); + + auto joinNode = + std::dynamic_pointer_cast(defaultPlan.plan); + VELOX_CHECK_NOT_NULL(joinNode); + + // Flip join sides. + if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { + plansWithTableScan.push_back({flippedPlan, defaultPlan.splits}); + } + + generateTableScanSplitsWithGroup(); + + for (const auto& planWithTableScan : plansWithTableScan) { + altPlans.push_back(planWithTableScan); + altPlans.push_back( + makeGroupedExecutionPlanWithTableScan(planWithTableScan)); + } +} + +void JoinFuzzer::generateSplitsWithGroup(bool isProbe) { + const std::vector> inputVectorsByGroup = + splitInputByGroup(isProbe); + std::vector& splitsWithGroup = + isProbe ? probeScanSplitsWithGroup_ : buildScanSplitsWithGroup_; + + for (int32_t groupId = 0; groupId < numGroups_; ++groupId) { + for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { + const std::string filePath = fmt::format( + "{}/grouped[{}].{}.{}", + tableScanDir_->path, + groupId, + isProbe ? "probe" : "build", + i); + writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); + splitsWithGroup.push_back(exec::Split{makeSplit(filePath), groupId}); + } + splitsWithGroup.push_back(exec::Split{nullptr, groupId}); + } +} + +std::vector> JoinFuzzer::splitInputByGroup( + bool isProbe) { + const std::vector& inputVectors = + isProbe ? probeInputVectors_ : buildInputVectors_; + if (numGroups_ == 1) { + return {inputVectors}; + } + + // Partition 'inputVectors' based on the join keys for group execution with + // one partition per each group. + const RowTypePtr& inputType = isProbe ? 
probeInputType_ : buildInputType_; + std::vector partitionChannels(keyTypes_.size()); + std::iota(partitionChannels.begin(), partitionChannels.end(), 0); + std::vector> hashers; + hashers.reserve(keyTypes_.size()); + for (auto channel : partitionChannels) { + hashers.emplace_back( + exec::VectorHasher::create(inputType->childAt(channel), channel)); + } + + std::vector> inputVectorsByGroup{ + static_cast(numGroups_)}; + raw_vector groupHashes; + std::vector groupRows(numGroups_); + std::vector rawGroupRows(numGroups_); + std::vector groupSizes(numGroups_, 0); + SelectivityVector inputRows; + + for (const auto& inputVector : inputVectors) { + const int numRows = inputVector->size(); + inputRows.resize(numRows); + inputRows.setAll(); + groupHashes.resize(numRows); + std::fill(groupSizes.begin(), groupSizes.end(), 0); + std::fill(groupHashes.begin(), groupHashes.end(), 0); + + for (auto i = 0; i < hashers.size(); ++i) { + auto& hasher = hashers[i]; + auto* keyVector = + inputVector->childAt(hashers[i]->channel())->loadedVector(); + hashers[i]->decode(*keyVector, inputRows); + if (hasher->channel() != kConstantChannel) { + hashers[i]->hash(inputRows, i > 0, groupHashes); + } else { + hashers[i]->hashPrecomputed(inputRows, i > 0, groupHashes); + } + } + + for (int row = 0; row < numRows; ++row) { + const int32_t groupId = groupHashes[row] % numGroups_; + if (groupRows[groupId] == nullptr || + (groupRows[groupId]->capacity() < numRows * sizeof(vector_size_t))) { + groupRows[groupId] = allocateIndices(numRows, pool_.get()); + rawGroupRows[groupId] = groupRows[groupId]->asMutable(); + } + rawGroupRows[groupId][groupSizes[groupId]++] = row; + } + + for (int32_t groupId = 0; groupId < numGroups_; ++groupId) { + const size_t groupSize = groupSizes[groupId]; + if (groupSize != 0) { + VELOX_CHECK_NOT_NULL(groupRows[groupId]); + groupRows[groupId]->setSize( + groupSizes[groupId] * sizeof(vector_size_t)); + inputVectorsByGroup[groupId].push_back( + (groupSize == numRows) + ? 
inputVector + : exec::wrap( + groupSize, std::move(groupRows[groupId]), inputVector)); + } + } + } + return inputVectorsByGroup; +} + +void JoinFuzzer::generateTableScanSplitsWithGroup() { + numGroups_ = randInt(1, probeScanSplits_.size()); + generateSplitsWithGroup(/*isProbe=*/true); + generateSplitsWithGroup(/*isProbe=*/false); +} + void JoinFuzzer::go() { VELOX_CHECK( FLAGS_steps > 0 || FLAGS_duration_sec > 0, "Either --steps or --duration_sec needs to be greater than zero.") - auto startTime = std::chrono::system_clock::now(); + const auto startTime = std::chrono::system_clock::now(); size_t iteration = 0; while (!isDone(iteration, startTime)) { @@ -875,9 +1060,11 @@ void JoinFuzzer::go() { << iteration << " (seed: " << currentSeed_ << ")"; // Pick join type. - auto joinType = pickJoinType(); + pickJoinType(); - verify(joinType); + verify(); + + cleanup(); LOG(INFO) << "==============================> Done with iteration " << iteration; @@ -887,6 +1074,25 @@ void JoinFuzzer::go() { } } +void JoinFuzzer::cleanup() { + keyTypes_.clear(); + probeKeyNames_.clear(); + buildKeyNames_.clear(); + probeInputType_.reset(); + probeInputVectors_.clear(); + flatProbeInputVectors_.clear(); + probeScanSplits_.clear(); + probeScanSplitsWithGroup_.clear(); + buildInputType_.reset(); + buildInputVectors_.clear(); + flatBuildInputVectors_.clear(); + buildScanSplits_.clear(); + buildScanSplitsWithGroup_.clear(); + outputColumnNames_.clear(); + numGroups_ = 0; + tableScanDir_.reset(); +} + } // namespace void joinFuzzer(size_t seed) { diff --git a/velox/exec/tests/JoinFuzzerRunner.h b/velox/exec/tests/JoinFuzzerRunner.h index 15d388915a8f5..524240fe58ee4 100644 --- a/velox/exec/tests/JoinFuzzerRunner.h +++ b/velox/exec/tests/JoinFuzzerRunner.h @@ -18,6 +18,8 @@ #include #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/MemoryReclaimer.h" #include "velox/exec/tests/JoinFuzzer.h" #include 
"velox/serializers/PrestoSerializer.h" @@ -56,7 +58,7 @@ class JoinFuzzerRunner { public: static int run(size_t seed) { - facebook::velox::memory::MemoryManager::initialize({}); + setupMemory(); facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); facebook::velox::filesystems::registerLocalFileSystem(); @@ -64,4 +66,20 @@ class JoinFuzzerRunner { facebook::velox::exec::test::joinFuzzer(seed); return RUN_ALL_TESTS(); } + + private: + // Invoked to set up memory system with arbitration. + static void setupMemory() { + FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; + FLAGS_velox_memory_leak_check_enabled = true; + facebook::velox::memory::SharedArbitrator::registerFactory(); + facebook::velox::memory::MemoryManagerOptions options; + options.allocatorCapacity = 8L << 30; + options.arbitratorCapacity = 6L << 30; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = + facebook::velox::exec::memoryArbitrationStateCheck; + facebook::velox::memory::MemoryManager::initialize(options); + } }; diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 863dc6867bb97..be7bbd3871682 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -118,7 +118,31 @@ class AssertQueryBuilder { return *this; } - // Methods to run the query and verify the results. + /// Methods to configure the group execution mode. 
+ AssertQueryBuilder& executionStrategy( + core::ExecutionStrategy executionStrategy) { + params_.executionStrategy = executionStrategy; + return *this; + } + + AssertQueryBuilder& numSplitGroups(int numSplitGroups) { + params_.numSplitGroups = numSplitGroups; + return *this; + } + + AssertQueryBuilder& numConcurrentSplitGroups( + int32_t numConcurrentSplitGroups) { + params_.numConcurrentSplitGroups = numConcurrentSplitGroups; + return *this; + } + + AssertQueryBuilder& groupedExecutionLeafNodeIds( + const std::unordered_set& groupedExecutionLeafNodeIds) { + params_.groupedExecutionLeafNodeIds = groupedExecutionLeafNodeIds; + return *this; + } + + /// Methods to run the query and verify the results. /// Run the query and verify results against DuckDB. Requires /// duckDbQueryRunner to be provided in the constructor. @@ -158,7 +182,7 @@ class AssertQueryBuilder { // Used by the created task as the default driver executor. std::unique_ptr executor_{ new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())}; - DuckDbQueryRunner* FOLLY_NULLABLE const duckDbQueryRunner_; + DuckDbQueryRunner* const duckDbQueryRunner_; CursorParameters params_; std::unordered_map configs_; std::unordered_map> diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index fe1c3e17c04a4..6f4a0de7d2a91 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -46,15 +46,15 @@ struct CursorParameters { uint64_t bufferedBytes = 512 * 1024; - // Ungrouped (by default) or grouped (bucketed) execution. + /// Ungrouped (by default) or grouped (bucketed) execution. core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; /// Contains leaf plan nodes that need to be executed in the grouped mode. std::unordered_set groupedExecutionLeafNodeIds; - // Number of splits groups the task will be processing. Must be 1 for - // ungrouped execution. + /// Number of splits groups the task will be processing. 
Must be 1 for + /// ungrouped execution. int numSplitGroups{1}; /// Spilling directory, if not empty, then the task's spilling directory would