diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index d2f8c6a98abe6..be8855962b4f8 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -500,18 +500,17 @@ void HashBuild::spillInput(const RowVectorPtr& input) { computeSpillPartitions(input); vector_size_t numSpillInputs = 0; - for (auto rowIdx = 0; rowIdx < numInput; ++rowIdx) { - const auto partition = spillPartitions_[rowIdx]; - if (FOLLY_UNLIKELY(!activeRows_.isValid(rowIdx))) { + for (auto row = 0; row < numInput; ++row) { + const auto partition = spillPartitions_[row]; + if (FOLLY_UNLIKELY(!activeRows_.isValid(row))) { continue; } if (!spiller_->isSpilled(partition)) { continue; } - activeRows_.setValid(rowIdx, false); + activeRows_.setValid(row, false); ++numSpillInputs; - rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = - rowIdx; + rawSpillInputIndicesBuffers_[partition][numSpillInputs_[partition]++] = row; } if (numSpillInputs == 0) { return; diff --git a/velox/exec/tests/JoinFuzzer.cpp b/velox/exec/tests/JoinFuzzer.cpp index 4d1b04255af68..1efba1810c1c8 100644 --- a/velox/exec/tests/JoinFuzzer.cpp +++ b/velox/exec/tests/JoinFuzzer.cpp @@ -18,8 +18,10 @@ #include "velox/common/file/FileSystems.h" #include "velox/connectors/hive/HiveConnector.h" #include "velox/connectors/hive/HiveConnectorSplit.h" +#include "velox/connectors/hive/PartitionIdGenerator.h" #include "velox/dwio/dwrf/reader/DwrfReader.h" #include "velox/dwio/dwrf/writer/Writer.h" +#include "velox/exec/OperatorUtils.h" #include "velox/exec/tests/utils/AssertQueryBuilder.h" #include "velox/exec/tests/utils/PlanBuilder.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -38,7 +40,7 @@ DEFINE_int32( 100, "The number of elements on each generated vector."); -DEFINE_int32(num_batches, 5, "The number of generated vectors."); +DEFINE_int32(num_batches, 10, "The number of generated vectors."); DEFINE_double( null_ratio, @@ -60,10 +62,10 @@ class JoinFuzzer { struct PlanWithSplits { core::PlanNodePtr plan; - std::unordered_map< - core::PlanNodeId, - std::vector>> + std::unordered_map> splits; + core::ExecutionStrategy executionStrategy{ + core::ExecutionStrategy::kUngrouped}; }; private: @@ -76,6 +78,12 @@ class JoinFuzzer { return opts; } + static inline const std::string kHiveConnectorId = "test-hive"; + + // Makes a connector split from a file path on storage. + static std::shared_ptr makeSplit( + const std::string& filePath); + void seed(size_t seed) { currentSeed_ = seed; vectorFuzzer_.reSeed(seed); @@ -86,55 +94,116 @@ class JoinFuzzer { seed(rng_()); } - /// Randomly pick a join type to test. - core::JoinType pickJoinType(); + // Randomly pick a join type to test. + void pickJoinType(); + + // Makes the query plan with default settings in JoinFuzzer and value inputs + // for both probe and build sides. + // + // NOTE: 'probeInput' and 'buildInput' could either input rows with lazy + // vectors or flatten ones. + JoinFuzzer::PlanWithSplits makeDefaultPlan( + const std::vector& probeInput, + const std::vector& buildInput); - void verify(core::JoinType joinType); + // Makes the default query plan with table scan as inputs for both probe and + // build sides. + JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan(); - /// Returns a list of randomly generated join key types. + // Makes the query plan from 'planWithTableScan' with grouped execution mode. + // Correspondingly, it replaces the table scan input splits with grouped ones. + JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan); + + // Runs one test iteration from query plans generations, executions and result + // verifications. + void verify(); + + // Cleanup the test settings at the end of one test iteration. + void cleanup(); + + // Generates inputs for join plan generation including join keys, probe/build + // input types and input data in vectors. + void generateJoinPlanInputs(); + + // Returns a list of randomly generated join key types. std::vector generateJoinKeyTypes(int32_t numKeys); - /// Returns randomly generated probe input with upto 3 additional payload - /// columns. - std::vector generateProbeInput( - const std::vector& keyNames, - const std::vector& keyTypes); + // Returns randomly generated probe input with upto 3 additional payload + // columns. + void generateProbeInput(); - /// Same as generateProbeInput() but copies over 10% of the input in the probe - /// columns to ensure some matches during joining. Also generates an empty - /// input with a 10% chance. - std::vector generateBuildInput( - const std::vector& probeInput, - const std::vector& probeKeys, - const std::vector& buildKeys); + // Same as generateProbeInput() but copies over 10% of the input in the probe + // columns to ensure some matches during joining. Also generates an empty + // input with a 10% chance. + void generateBuildInput(); + + void shuffleJoinKeys(); - void shuffleJoinKeys( - std::vector& probeKeys, - std::vector& buildKeys); + void addPlansWithTableScan(std::vector& altPlans); + + // Generates the splits for group execution plans. The function splits the + // inputs into a number of groups by partitioning on the join keys. It then + // writes the partitioned inputs into separate files to generate grouped + // splits. + void generateTableScanSplitsWithGroup(); + + // Splits the input into groups by partitioning on the join keys. If 'isProbe' + // is true, this splits the input for probe side, otherwise for build. + std::vector> splitInputByGroup(bool isProbe); + + // Generates the grouped splits. If 'isProbe' is true, this generates grouped + // splits for probe side, otherwise for build. + void generateSplitsWithGroup(bool isProbe); RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); std::optional computeDuckDbResult( - const std::vector& probeInput, - const std::vector& buildInput, const core::PlanNodePtr& plan); int32_t randInt(int32_t min, int32_t max) { return boost::random::uniform_int_distribution(min, max)(rng_); } - static inline const std::string kHiveConnectorId = "test-hive"; - - static std::shared_ptr makeSplit( - const std::string& filePath); - FuzzerGenerator rng_; size_t currentSeed_{0}; std::shared_ptr rootPool_{ - memory::memoryManager()->addRootPool()}; - std::shared_ptr pool_{rootPool_->addLeafChild("leaf")}; + memory::memoryManager()->addRootPool( + "joinFuzzer", + memory::kMaxMemory, + memory::MemoryReclaimer::create())}; + std::shared_ptr pool_{rootPool_->addLeafChild( + "joinFuzzerLeaf", + true, + exec::MemoryReclaimer::create())}; + std::shared_ptr writerPool_{rootPool_->addAggregateChild( + "joinFuzzerWriter", + exec::MemoryReclaimer::create())}; + VectorFuzzer vectorFuzzer_; + + // The following fields are reset cross test iterations. + core::JoinType joinType_; + bool nullAware_; + std::vector keyTypes_; + std::vector probeKeyNames_; + std::vector buildKeyNames_; + RowTypePtr probeInputType_; + std::vector probeInputVectors_; + std::vector flatProbeInputVectors_; + std::vector> probeScanSplits_; + std::vector probeScanSplitsWithGroup_; + core::PlanNodeId probeScanNodeId_; + RowTypePtr buildInputType_; + std::vector buildInputVectors_; + std::vector flatBuildInputVectors_; + std::vector> buildScanSplits_; + std::vector buildScanSplitsWithGroup_; + core::PlanNodeId buildScanNodeId_; + std::vector outputColumnNames_; + int32_t numGroups_{0}; + std::shared_ptr tableScanDir_; }; JoinFuzzer::JoinFuzzer(size_t initialSeed) @@ -164,7 +233,7 @@ bool isDone(size_t i, T startTime) { return i >= FLAGS_steps; } -core::JoinType JoinFuzzer::pickJoinType() { +void JoinFuzzer::pickJoinType() { // Right joins are tested by flipping sides of the left joins. static std::vector kJoinTypes = { core::JoinType::kInner, @@ -172,11 +241,10 @@ core::JoinType JoinFuzzer::pickJoinType() { core::JoinType::kFull, core::JoinType::kLeftSemiFilter, core::JoinType::kLeftSemiProject, - core::JoinType::kAnti, - }; + core::JoinType::kAnti}; - size_t idx = randInt(0, kJoinTypes.size() - 1); - return kJoinTypes[idx]; + const size_t idx = randInt(0, kJoinTypes.size() - 1); + joinType_ = kJoinTypes[idx]; } std::vector JoinFuzzer::generateJoinKeyTypes(int32_t numKeys) { @@ -189,50 +257,46 @@ std::vector JoinFuzzer::generateJoinKeyTypes(int32_t numKeys) { return types; } -std::vector JoinFuzzer::generateProbeInput( - const std::vector& keyNames, - const std::vector& keyTypes) { - std::vector names = keyNames; - std::vector types = keyTypes; +void JoinFuzzer::generateProbeInput() { + std::vector names = probeKeyNames_; + std::vector types = keyTypes_; // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); + const auto numPayload = randInt(0, 3); for (auto i = 0; i < numPayload; ++i) { - names.push_back(fmt::format("tp{}", i + keyNames.size())); - types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); + names.push_back(fmt::format("tp{}", i + keyTypes_.size())); + types.push_back(vectorFuzzer_.randType(/*maxDepth=*/2)); } - auto inputType = ROW(std::move(names), std::move(types)); - std::vector input; + probeInputType_ = ROW(std::move(names), std::move(types)); + VELOX_CHECK(probeInputVectors_.empty()); for (auto i = 0; i < FLAGS_num_batches; ++i) { - input.push_back(vectorFuzzer_.fuzzInputRow(inputType)); + probeInputVectors_.push_back(vectorFuzzer_.fuzzInputRow(probeInputType_)); } - return input; } -std::vector JoinFuzzer::generateBuildInput( - const std::vector& probeInput, - const std::vector& probeKeys, - const std::vector& buildKeys) { - std::vector names = buildKeys; - std::vector types; - for (const auto& key : probeKeys) { - types.push_back(asRowType(probeInput[0]->type())->findChild(key)); - } +void JoinFuzzer::generateBuildInput() { + std::vector names = buildKeyNames_; + std::vector types = keyTypes_; // Add up to 3 payload columns. - auto numPayload = randInt(0, 3); - for (auto i = 0; i < numPayload; ++i) { - names.push_back(fmt::format("bp{}", i + buildKeys.size())); - types.push_back(vectorFuzzer_.randType(2 /*maxDepth*/)); + auto numPayloadColumns = randInt(0, 3); + for (auto i = 0; i < numPayloadColumns; ++i) { + names.push_back(fmt::format("bp{}", i + keyTypes_.size())); + types.push_back(vectorFuzzer_.randType(/*maxDepth=*/2)); } - auto rowType = ROW(std::move(names), std::move(types)); + buildInputType_ = ROW(std::move(names), std::move(types)); + + VELOX_CHECK(buildInputVectors_.empty()); // 1 in 10 times use empty build. + // // TODO Use non-empty build with no matches sometimes. if (vectorFuzzer_.coinToss(0.1)) { - return {BaseVector::create(rowType, 0, pool_.get())}; + buildInputVectors_ = { + BaseVector::create(buildInputType_, 0, pool_.get())}; + return; } // TODO Remove the assumption that probeKeys are the first columns in @@ -240,11 +304,12 @@ std::vector JoinFuzzer::generateBuildInput( // To ensure there are some matches, sample with replacement 10% of probe join // keys and use these as build keys. + // // TODO Add a few random rows as well. - std::vector input; - for (const auto& probe : probeInput) { - auto numRows = 1 + probe->size() / 10; - auto build = BaseVector::create(rowType, numRows, probe->pool()); + for (const auto& probe : probeInputVectors_) { + const auto numRows = 1 + probe->size() / 10; + auto buildInput = + BaseVector::create(buildInputType_, numRows, pool_.get()); // Pick probe side rows to copy. std::vector rowNumbers(numRows); @@ -253,21 +318,20 @@ std::vector JoinFuzzer::generateBuildInput( } SelectivityVector rows(numRows); - for (auto i = 0; i < probeKeys.size(); ++i) { - build->childAt(i)->resize(numRows); - build->childAt(i)->copy(probe->childAt(i).get(), rows, rowNumbers.data()); + for (auto i = 0; i < keyTypes_.size(); ++i) { + buildInput->childAt(i)->resize(numRows); + buildInput->childAt(i)->copy( + probe->childAt(i).get(), rows, rowNumbers.data()); } - for (auto i = 0; i < numPayload; ++i) { - auto column = i + probeKeys.size(); - build->childAt(column) = - vectorFuzzer_.fuzz(rowType->childAt(column), numRows); + for (auto i = 0; i < numPayloadColumns; ++i) { + const auto columnChannel = i + keyTypes_.size(); + buildInput->childAt(columnChannel) = + vectorFuzzer_.fuzz(buildInputType_->childAt(columnChannel), numRows); } - input.push_back(build); + buildInputVectors_.push_back(buildInput); } - - return input; } std::vector flatten(const std::vector& vectors) { @@ -283,12 +347,25 @@ std::vector flatten(const std::vector& vectors) { } RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { - LOG(INFO) << "Executing query plan: " << std::endl + LOG(INFO) << "Executing query plan with " + << executionStrategyToString(plan.executionStrategy) << " strategy[" + << (plan.executionStrategy == core::ExecutionStrategy::kGrouped + ? numGroups_ + : 0) + << " groups]" << (injectSpill ? " and spilling injection" : "") + << ": " << std::endl << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [nodeId, nodeSplits] : plan.splits) { - builder.splits(nodeId, nodeSplits); + for (const auto& [planNodeId, nodeSplits] : plan.splits) { + builder.splits(planNodeId, nodeSplits); + } + + if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { + builder.executionStrategy(core::ExecutionStrategy::kGrouped); + builder.groupedExecutionLeafNodeIds({probeScanNodeId_, buildScanNodeId_}); + builder.numSplitGroups(numGroups_); + builder.numConcurrentSplitGroups(1); } std::shared_ptr spillDirectory; @@ -302,11 +379,15 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { } TestScopedSpillInjection scopedSpillInjection(spillPct); - auto result = builder.maxDrivers(2).copyResults(pool_.get()); + const auto result = builder.maxDrivers(2).copyResults(pool_.get()); LOG(INFO) << "Results: " << result->toString(); if (VLOG_IS_ON(1)) { VLOG(1) << std::endl << result->toString(0, result->size()); } + // Wait for the task to be destroyed before start next query execution to + // avoid the potential interference of the background activities across query + // executions. + waitForAllTasksToBeDeleted(); return result; } @@ -389,25 +470,23 @@ bool containsUnsupportedTypes(const TypePtr& type) { } std::optional JoinFuzzer::computeDuckDbResult( - const std::vector& probeInput, - const std::vector& buildInput, const core::PlanNodePtr& plan) { - if (containsUnsupportedTypes(probeInput[0]->type())) { + if (containsUnsupportedTypes(probeInputVectors_[0]->type())) { return std::nullopt; } - if (containsUnsupportedTypes(buildInput[0]->type())) { + if (containsUnsupportedTypes(buildInputVectors_[0]->type())) { return std::nullopt; } DuckDbQueryRunner queryRunner; - queryRunner.createTable("t", probeInput); - queryRunner.createTable("u", buildInput); + queryRunner.createTable("t", probeInputVectors_); + queryRunner.createTable("u", buildInputVectors_); - auto joinNode = dynamic_cast(plan.get()); + auto* joinNode = dynamic_cast(plan.get()); VELOX_CHECK_NOT_NULL(joinNode); - auto joinKeysToSql = [](auto keys) { + const auto joinKeysToSql = [](auto keys) { std::stringstream out; for (auto i = 0; i < keys.size(); ++i) { if (i > 0) { @@ -418,7 +497,7 @@ std::optional JoinFuzzer::computeDuckDbResult( return out.str(); }; - auto equiClausesToSql = [](auto joinNode) { + const auto equiClausesToSql = [](auto joinNode) { std::stringstream out; for (auto i = 0; i < joinNode->leftKeys().size(); ++i) { if (i > 0) { @@ -478,7 +557,8 @@ std::optional JoinFuzzer::computeDuckDbResult( } break; default: - VELOX_UNREACHABLE(); + VELOX_UNREACHABLE( + "Unknown join type: {}", static_cast(joinNode->joinType())); } return queryRunner.execute(sql.str(), plan->outputType()); @@ -494,61 +574,66 @@ std::vector fieldNames( return names; } -JoinFuzzer::PlanWithSplits makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& output) { + const std::vector& buildInput) { auto planNodeIdGenerator = std::make_shared(); auto plan = PlanBuilder(planNodeIdGenerator) .values(probeInput) .hashJoin( - probeKeys, - buildKeys, + probeKeyNames_, + buildKeyNames_, PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - "" /*filter*/, - output, - joinType, - nullAware) + /*filter=*/"", + outputColumnNames_, + joinType_, + nullAware_) .planNode(); return {plan, {}}; } -JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const RowTypePtr& probeInputType, - const std::vector>& - probeSplits, - const RowTypePtr& buildInputType, - const std::vector>& - buildSplits, - const std::vector& output) { +std::vector fromConnectorSplits( + std::vector> connectorSplits, + int32_t groupId = -1) { + std::vector splits; + splits.reserve(connectorSplits.size()); + for (auto& connectorSplit : connectorSplits) { + splits.emplace_back(exec::Split{std::move(connectorSplit), groupId}); + } + return splits; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan() { auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeId; - core::PlanNodeId buildId; auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeInputType) - .capturePlanNodeId(probeId) + .tableScan(probeInputType_) + .capturePlanNodeId(probeScanNodeId_) .hashJoin( - probeKeys, - buildKeys, + probeKeyNames_, + buildKeyNames_, PlanBuilder(planNodeIdGenerator) - .tableScan(buildInputType) - .capturePlanNodeId(buildId) + .tableScan(buildInputType_) + .capturePlanNodeId(buildScanNodeId_) .planNode(), - "" /*filter*/, - output, - joinType, - nullAware) + /*filter=*/"", + outputColumnNames_, + joinType_, + nullAware_) .planNode(); - return {plan, {{probeId, probeSplits}, {buildId, buildSplits}}}; + return { + plan, + {{probeScanNodeId_, fromConnectorSplits(probeScanSplits_)}, + {buildScanNodeId_, fromConnectorSplits(buildScanSplits_)}}}; +} + +JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( + const JoinFuzzer::PlanWithSplits& planWithTableScan) { + return { + planWithTableScan.plan, + {{probeScanNodeId_, probeScanSplitsWithGroup_}, + {buildScanNodeId_, buildScanSplitsWithGroup_}}, + core::ExecutionStrategy::kGrouped}; } std::vector makeSources( @@ -583,8 +668,8 @@ void makeAlternativePlans( } // Parallelize probe and build sides. - auto probeKeys = fieldNames(joinNode->leftKeys()); - auto buildKeys = fieldNames(joinNode->rightKeys()); + const auto probeKeys = fieldNames(joinNode->leftKeys()); + const auto buildKeys = fieldNames(joinNode->rightKeys()); auto planNodeIdGenerator = std::make_shared(); plans.push_back( @@ -598,7 +683,7 @@ void makeAlternativePlans( .localPartitionRoundRobin( makeSources(buildInput, planNodeIdGenerator)) .planNode(), - "" /*filter*/, + /*filter=*/"", joinNode->outputType()->names(), joinNode->joinType(), joinNode->isNullAware()) @@ -619,7 +704,7 @@ void makeAlternativePlans( .values(buildInput) .orderBy(buildKeys, false) .planNode(), - "" /*filter*/, + /*filter=*/"", asRowType(joinNode->outputType())->names(), joinNode->joinType()) .planNode(), @@ -636,10 +721,10 @@ std::vector makeNames(const std::string& prefix, size_t n) { return names; } -void JoinFuzzer::shuffleJoinKeys( - std::vector& probeKeys, - std::vector& buildKeys) { - auto numKeys = probeKeys.size(); +void JoinFuzzer::shuffleJoinKeys() { + VELOX_CHECK(!keyTypes_.empty()); + + const auto numKeys = keyTypes_.size(); if (numKeys == 1) { return; } @@ -648,12 +733,12 @@ void JoinFuzzer::shuffleJoinKeys( std::iota(columnIndices.begin(), columnIndices.end(), 0); std::shuffle(columnIndices.begin(), columnIndices.end(), rng_); - auto copyProbeKeys = probeKeys; - auto copyBuildKeys = buildKeys; + const auto copyProbeKeys = probeKeyNames_; + const auto copyBuildKeys = buildKeyNames_; for (auto i = 0; i < numKeys; ++i) { - probeKeys[i] = copyProbeKeys[columnIndices[i]]; - buildKeys[i] = copyBuildKeys[columnIndices[i]]; + probeKeyNames_[i] = copyProbeKeys[columnIndices[i]]; + buildKeyNames_[i] = copyBuildKeys[columnIndices[i]]; } } @@ -708,138 +793,89 @@ bool isTableScanSupported(const TypePtr& type) { return true; } -void JoinFuzzer::verify(core::JoinType joinType) { - const bool nullAware = - isNullAwareSupported(joinType) && vectorFuzzer_.coinToss(0.5); - - const auto numKeys = nullAware ? 1 : randInt(1, 5); +void JoinFuzzer::generateJoinPlanInputs() { + const auto numKeys = nullAware_ ? 1 : randInt(1, 5); // Pick number and types of join keys. - std::vector keyTypes = generateJoinKeyTypes(numKeys); - std::vector probeKeys = makeNames("t", keyTypes.size()); - std::vector buildKeys = makeNames("u", keyTypes.size()); + keyTypes_ = generateJoinKeyTypes(numKeys); + probeKeyNames_ = makeNames("t", keyTypes_.size()); + buildKeyNames_ = makeNames("u", keyTypes_.size()); - auto probeInput = generateProbeInput(probeKeys, keyTypes); - auto buildInput = generateBuildInput(probeInput, probeKeys, buildKeys); + generateProbeInput(); + generateBuildInput(); // Flatten inputs. - auto flatProbeInput = flatten(probeInput); - auto flatBuildInput = flatten(buildInput); + flatProbeInputVectors_ = flatten(probeInputVectors_); + flatBuildInputVectors_ = flatten(buildInputVectors_); if (VLOG_IS_ON(1)) { - VLOG(1) << "Probe input: " << probeInput[0]->toString(); - for (const auto& v : flatProbeInput) { + VLOG(1) << "Probe input: " << probeInputVectors_[0]->toString(); + for (const auto& v : flatProbeInputVectors_) { VLOG(1) << std::endl << v->toString(0, v->size()); } - VLOG(1) << "Build input: " << buildInput[0]->toString(); - for (const auto& v : flatBuildInput) { + VLOG(1) << "Build input: " << buildInputVectors_[0]->toString(); + for (const auto& v : flatBuildInputVectors_) { VLOG(1) << std::endl << v->toString(0, v->size()); } } - auto output = - (core::isLeftSemiProjectJoin(joinType) || - core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) - ? asRowType(probeInput[0]->type())->names() - : concat( - asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) - ->names(); + outputColumnNames_ = + (core::isLeftSemiProjectJoin(joinType_) || + core::isLeftSemiFilterJoin(joinType_) || core::isAntiJoin(joinType_)) + ? probeInputType_->names() + : concat(probeInputType_, buildInputType_)->names(); // Shuffle output columns. - std::shuffle(output.begin(), output.end(), rng_); + std::shuffle(outputColumnNames_.begin(), outputColumnNames_.end(), rng_); // Remove some output columns. - auto numOutput = randInt(1, output.size()); - output.resize(numOutput); + const auto numOutput = randInt(1, outputColumnNames_.size()); + // CHECK(numOutput > 0); + outputColumnNames_.resize(numOutput); - if (core::isLeftSemiProjectJoin(joinType) || - core::isRightSemiProjectJoin(joinType)) { - output.push_back("match"); + if (core::isLeftSemiProjectJoin(joinType_) || + core::isRightSemiProjectJoin(joinType_)) { + outputColumnNames_.push_back("match"); } - shuffleJoinKeys(probeKeys, buildKeys); + shuffleJoinKeys(); +} + +void JoinFuzzer::verify() { + nullAware_ = isNullAwareSupported(joinType_) && vectorFuzzer_.coinToss(0.5); - auto plan = makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - probeInput, - buildInput, - output); + generateJoinPlanInputs(); - auto expected = execute(plan, false /*injectSpill*/); + const auto defaultPlan = + makeDefaultPlan(probeInputVectors_, buildInputVectors_); + + const auto expected = execute(defaultPlan, /*injectSpill=*/false); // Verify results against DuckDB. - if (auto duckDbResult = - computeDuckDbResult(probeInput, buildInput, plan.plan)) { + if (auto duckDbResult = computeDuckDbResult(defaultPlan.plan)) { VELOX_CHECK( assertEqualResults( - duckDbResult.value(), plan.plan->outputType(), {expected}), + duckDbResult.value(), defaultPlan.plan->outputType(), {expected}), "Velox and DuckDB results don't match"); } std::vector altPlans; - altPlans.push_back(makeDefaultPlan( - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - output)); - makeAlternativePlans(plan.plan, probeInput, buildInput, altPlans); - makeAlternativePlans(plan.plan, flatProbeInput, flatBuildInput, altPlans); - - auto directory = exec::test::TempDirectoryPath::create(); - - if (isTableScanSupported(probeInput[0]->type()) && - isTableScanSupported(buildInput[0]->type())) { - auto writerPool = rootPool_->addAggregateChild("writer"); - - std::vector> probeSplits; - for (auto i = 0; i < probeInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/probe{}", directory->path, i); - writeToFile(filePath, probeInput[i], writerPool.get()); - probeSplits.push_back(makeSplit(filePath)); - } - - std::vector> buildSplits; - for (auto i = 0; i < buildInput.size(); ++i) { - const std::string filePath = - fmt::format("{}/build{}", directory->path, i); - writeToFile(filePath, buildInput[i], writerPool.get()); - buildSplits.push_back(makeSplit(filePath)); - } - - auto tableScanPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeKeys, - buildKeys, - asRowType(probeInput[0]->type()), - probeSplits, - asRowType(buildInput[0]->type()), - buildSplits, - output); - - altPlans.push_back(tableScanPlan); - - auto joinNode = - std::dynamic_pointer_cast(tableScanPlan.plan); - VELOX_CHECK_NOT_NULL(joinNode); - - // Flip join sides. - if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { - altPlans.push_back({flippedPlan, tableScanPlan.splits}); - } - } + altPlans.push_back( + makeDefaultPlan(flatProbeInputVectors_, flatBuildInputVectors_)); + makeAlternativePlans( + defaultPlan.plan, probeInputVectors_, buildInputVectors_, altPlans); + makeAlternativePlans( + defaultPlan.plan, + flatProbeInputVectors_, + flatBuildInputVectors_, + altPlans); + + addPlansWithTableScan(altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; - auto actual = execute(altPlans[i], false /*injectSpill*/); + auto actual = execute(altPlans[i], /*injectSpill=*/false); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -854,7 +890,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { } LOG(INFO) << "Testing plan #" << i << " with spilling"; - actual = execute(altPlans[i], true /*injectSpill*/); + actual = execute(altPlans[i], /*=injectSpill=*/true); VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); @@ -862,12 +898,161 @@ void JoinFuzzer::verify(core::JoinType joinType) { } } +void JoinFuzzer::addPlansWithTableScan(std::vector& altPlans) { + VELOX_CHECK_NULL(tableScanDir_); + if (!isTableScanSupported(probeInputVectors_[0]->type()) || + !isTableScanSupported(buildInputVectors_[0]->type())) { + return; + } + + tableScanDir_ = exec::test::TempDirectoryPath::create(); + + VELOX_CHECK(probeScanSplits_.empty()); + for (auto i = 0; i < probeInputVectors_.size(); ++i) { + const std::string filePath = + fmt::format("{}/probe{}", tableScanDir_->path, i); + writeToFile(filePath, probeInputVectors_[i], writerPool_.get()); + probeScanSplits_.push_back(makeSplit(filePath)); + } + + VELOX_CHECK(buildScanSplits_.empty()); + for (auto i = 0; i < buildInputVectors_.size(); ++i) { + const std::string filePath = + fmt::format("{}/build{}", tableScanDir_->path, i); + writeToFile(filePath, buildInputVectors_[i], writerPool_.get()); + buildScanSplits_.push_back(makeSplit(filePath)); + } + + std::vector plansWithTableScan; + auto defaultPlan = makeDefaultPlanWithTableScan(); + plansWithTableScan.push_back(defaultPlan); + + auto joinNode = + std::dynamic_pointer_cast(defaultPlan.plan); + VELOX_CHECK_NOT_NULL(joinNode); + + // Flip join sides. + if (auto flippedPlan = tryFlipJoinSides(*joinNode)) { + plansWithTableScan.push_back({flippedPlan, defaultPlan.splits}); + } + + generateTableScanSplitsWithGroup(); + + for (const auto& planWithTableScan : plansWithTableScan) { + altPlans.push_back(planWithTableScan); + altPlans.push_back( + makeGroupedExecutionPlanWithTableScan(planWithTableScan)); + } +} + +void JoinFuzzer::generateSplitsWithGroup(bool isProbe) { + const std::vector> inputVectorsByGroup = + splitInputByGroup(isProbe); + std::vector& splitsWithGroup = + isProbe ? probeScanSplitsWithGroup_ : buildScanSplitsWithGroup_; + + for (int32_t groupId = 0; groupId < numGroups_; ++groupId) { + for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { + const std::string filePath = fmt::format( + "{}/grouped[{}].{}.{}", + tableScanDir_->path, + groupId, + isProbe ? "probe" : "build", + i); + writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); + splitsWithGroup.push_back(exec::Split{makeSplit(filePath), groupId}); + } + splitsWithGroup.push_back(exec::Split{nullptr, groupId}); + } +} + +std::vector> JoinFuzzer::splitInputByGroup( + bool isProbe) { + const std::vector& inputVectors = + isProbe ? probeInputVectors_ : buildInputVectors_; + if (numGroups_ == 1) { + return {inputVectors}; + } + + // Partition 'inputVectors' based on the join keys for group execution with + // one partition per each group. + const RowTypePtr& inputType = isProbe ? probeInputType_ : buildInputType_; + std::vector partitionChannels(keyTypes_.size()); + std::iota(partitionChannels.begin(), partitionChannels.end(), 0); + std::vector> hashers; + hashers.reserve(keyTypes_.size()); + for (auto channel : partitionChannels) { + hashers.emplace_back( + exec::VectorHasher::create(inputType->childAt(channel), channel)); + } + + std::vector> inputVectorsByGroup{ + static_cast(numGroups_)}; + raw_vector groupHashes; + std::vector groupRows(numGroups_); + std::vector rawGroupRows(numGroups_); + std::vector groupSizes(numGroups_, 0); + SelectivityVector inputRows; + + for (const auto& inputVector : inputVectors) { + const int numRows = inputVector->size(); + inputRows.resize(numRows); + inputRows.setAll(); + groupHashes.resize(numRows); + std::fill(groupSizes.begin(), groupSizes.end(), 0); + std::fill(groupHashes.begin(), groupHashes.end(), 0); + + for (auto i = 0; i < hashers.size(); ++i) { + auto& hasher = hashers[i]; + auto* keyVector = + inputVector->childAt(hashers[i]->channel())->loadedVector(); + hashers[i]->decode(*keyVector, inputRows); + if (hasher->channel() != kConstantChannel) { + hashers[i]->hash(inputRows, i > 0, groupHashes); + } else { + hashers[i]->hashPrecomputed(inputRows, i > 0, groupHashes); + } + } + + for (int row = 0; row < numRows; ++row) { + const int32_t groupId = groupHashes[row] % numGroups_; + if (groupRows[groupId] == nullptr || + (groupRows[groupId]->capacity() < numRows * sizeof(vector_size_t))) { + groupRows[groupId] = allocateIndices(numRows, pool_.get()); + rawGroupRows[groupId] = groupRows[groupId]->asMutable(); + } + rawGroupRows[groupId][groupSizes[groupId]++] = row; + } + + for (int32_t groupId = 0; groupId < numGroups_; ++groupId) { + const size_t groupSize = groupSizes[groupId]; + if (groupSize != 0) { + VELOX_CHECK_NOT_NULL(groupRows[groupId]); + groupRows[groupId]->setSize( + groupSizes[groupId] * sizeof(vector_size_t)); + inputVectorsByGroup[groupId].push_back( + (groupSize == numRows) + ? inputVector + : exec::wrap( + groupSize, std::move(groupRows[groupId]), inputVector)); + } + } + } + return inputVectorsByGroup; +} + +void JoinFuzzer::generateTableScanSplitsWithGroup() { + numGroups_ = randInt(1, probeScanSplits_.size()); + generateSplitsWithGroup(/*isProbe=*/true); + generateSplitsWithGroup(/*isProbe=*/false); +} + void JoinFuzzer::go() { VELOX_CHECK( FLAGS_steps > 0 || FLAGS_duration_sec > 0, "Either --steps or --duration_sec needs to be greater than zero.") - auto startTime = std::chrono::system_clock::now(); + const auto startTime = std::chrono::system_clock::now(); size_t iteration = 0; while (!isDone(iteration, startTime)) { @@ -875,9 +1060,11 @@ void JoinFuzzer::go() { << iteration << " (seed: " << currentSeed_ << ")"; // Pick join type. - auto joinType = pickJoinType(); + pickJoinType(); - verify(joinType); + verify(); + + cleanup(); LOG(INFO) << "==============================> Done with iteration " << iteration; @@ -887,6 +1074,25 @@ void JoinFuzzer::go() { } } +void JoinFuzzer::cleanup() { + keyTypes_.clear(); + probeKeyNames_.clear(); + buildKeyNames_.clear(); + probeInputType_.reset(); + probeInputVectors_.clear(); + flatProbeInputVectors_.clear(); + probeScanSplits_.clear(); + probeScanSplitsWithGroup_.clear(); + buildInputType_.reset(); + buildInputVectors_.clear(); + flatBuildInputVectors_.clear(); + buildScanSplits_.clear(); + buildScanSplitsWithGroup_.clear(); + outputColumnNames_.clear(); + numGroups_ = 0; + tableScanDir_.reset(); +} + } // namespace void joinFuzzer(size_t seed) { diff --git a/velox/exec/tests/JoinFuzzerRunner.h b/velox/exec/tests/JoinFuzzerRunner.h index 15d388915a8f5..524240fe58ee4 100644 --- a/velox/exec/tests/JoinFuzzerRunner.h +++ b/velox/exec/tests/JoinFuzzerRunner.h @@ -18,6 +18,8 @@ #include #include "velox/common/file/FileSystems.h" +#include "velox/common/memory/SharedArbitrator.h" +#include "velox/exec/MemoryReclaimer.h" #include "velox/exec/tests/JoinFuzzer.h" #include "velox/serializers/PrestoSerializer.h" @@ -56,7 +58,7 @@ class JoinFuzzerRunner { public: static int run(size_t seed) { - facebook::velox::memory::MemoryManager::initialize({}); + setupMemory(); facebook::velox::serializer::presto::PrestoVectorSerde:: registerVectorSerde(); facebook::velox::filesystems::registerLocalFileSystem(); @@ -64,4 +66,20 @@ class JoinFuzzerRunner { facebook::velox::exec::test::joinFuzzer(seed); return RUN_ALL_TESTS(); } + + private: + // Invoked to set up memory system with arbitration. + static void setupMemory() { + FLAGS_velox_enable_memory_usage_track_in_default_memory_pool = true; + FLAGS_velox_memory_leak_check_enabled = true; + facebook::velox::memory::SharedArbitrator::registerFactory(); + facebook::velox::memory::MemoryManagerOptions options; + options.allocatorCapacity = 8L << 30; + options.arbitratorCapacity = 6L << 30; + options.arbitratorKind = "SHARED"; + options.checkUsageLeak = true; + options.arbitrationStateCheckCb = + facebook::velox::exec::memoryArbitrationStateCheck; + facebook::velox::memory::MemoryManager::initialize(options); + } }; diff --git a/velox/exec/tests/utils/AssertQueryBuilder.h b/velox/exec/tests/utils/AssertQueryBuilder.h index 863dc6867bb97..be7bbd3871682 100644 --- a/velox/exec/tests/utils/AssertQueryBuilder.h +++ b/velox/exec/tests/utils/AssertQueryBuilder.h @@ -118,7 +118,31 @@ class AssertQueryBuilder { return *this; } - // Methods to run the query and verify the results. + /// Methods to configure the group execution mode. + AssertQueryBuilder& executionStrategy( + core::ExecutionStrategy executionStrategy) { + params_.executionStrategy = executionStrategy; + return *this; + } + + AssertQueryBuilder& numSplitGroups(int numSplitGroups) { + params_.numSplitGroups = numSplitGroups; + return *this; + } + + AssertQueryBuilder& numConcurrentSplitGroups( + int32_t numConcurrentSplitGroups) { + params_.numConcurrentSplitGroups = numConcurrentSplitGroups; + return *this; + } + + AssertQueryBuilder& groupedExecutionLeafNodeIds( + const std::unordered_set& groupedExecutionLeafNodeIds) { + params_.groupedExecutionLeafNodeIds = groupedExecutionLeafNodeIds; + return *this; + } + + /// Methods to run the query and verify the results. /// Run the query and verify results against DuckDB. Requires /// duckDbQueryRunner to be provided in the constructor. @@ -158,7 +182,7 @@ class AssertQueryBuilder { // Used by the created task as the default driver executor. std::unique_ptr executor_{ new folly::CPUThreadPoolExecutor(std::thread::hardware_concurrency())}; - DuckDbQueryRunner* FOLLY_NULLABLE const duckDbQueryRunner_; + DuckDbQueryRunner* const duckDbQueryRunner_; CursorParameters params_; std::unordered_map configs_; std::unordered_map> diff --git a/velox/exec/tests/utils/Cursor.h b/velox/exec/tests/utils/Cursor.h index fe1c3e17c04a4..6f4a0de7d2a91 100644 --- a/velox/exec/tests/utils/Cursor.h +++ b/velox/exec/tests/utils/Cursor.h @@ -46,15 +46,15 @@ struct CursorParameters { uint64_t bufferedBytes = 512 * 1024; - // Ungrouped (by default) or grouped (bucketed) execution. + /// Ungrouped (by default) or grouped (bucketed) execution. core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; /// Contains leaf plan nodes that need to be executed in the grouped mode. std::unordered_set groupedExecutionLeafNodeIds; - // Number of splits groups the task will be processing. Must be 1 for - // ungrouped execution. + /// Number of splits groups the task will be processing. Must be 1 for + /// ungrouped execution. int numSplitGroups{1}; /// Spilling directory, if not empty, then the task's spilling directory would