From bb3b7cab238830b83d04d770550ab96d1a556df8 Mon Sep 17 00:00:00 2001 From: Jialiang Tan Date: Tue, 22 Oct 2024 10:50:27 -0700 Subject: [PATCH] Centralize join probe/build table spill to join bridge (#11294) Summary: Consolidates table spill logic from hash probe and hash build to a centralized place, hash join bridge. This effort will reduce duplicate and unnecessary code, as well as making reclaiming from higher level possible (directly from join bridge under some circumstances). Pull Request resolved: https://github.com/facebookincubator/velox/pull/11294 Reviewed By: xiaoxmeng Differential Revision: D64568888 Pulled By: tanjialiang fbshipit-source-id: b532f778b412362ccc3a33233eaa0eb0bfa404c3 --- velox/exec/HashBuild.cpp | 64 ++-------- velox/exec/HashBuild.h | 18 +-- velox/exec/HashJoinBridge.cpp | 152 ++++++++++++++++++++++++ velox/exec/HashJoinBridge.h | 50 ++++++-- velox/exec/HashProbe.cpp | 140 +++------------------- velox/exec/HashProbe.h | 26 ++-- velox/exec/HashTable.h | 4 +- velox/exec/Operator.cpp | 3 +- velox/exec/tests/HashJoinBridgeTest.cpp | 84 +++++++++++++ 9 files changed, 327 insertions(+), 214 deletions(-) diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 42da1fbbd6fd..9ae79de3745e 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -77,18 +77,12 @@ HashBuild::HashBuild( const auto numKeys = joinNode_->rightKeys().size(); keyChannels_.reserve(numKeys); - std::vector names; - names.reserve(inputType->size()); - std::vector types; - types.reserve(inputType->size()); for (int i = 0; i < numKeys; ++i) { auto& key = joinNode_->rightKeys()[i]; auto channel = exprToChannel(key.get(), inputType); keyChannelMap_[channel] = i; keyChannels_.emplace_back(channel); - names.emplace_back(inputType->nameOf(channel)); - types.emplace_back(inputType->childAt(channel)); } // Identify the non-key build side columns and make a decoder for each. 
@@ -106,12 +100,10 @@ HashBuild::HashBuild( if (keyChannelMap_.find(i) == keyChannelMap_.end()) { dependentChannels_.emplace_back(i); decoders_.emplace_back(std::make_unique()); - names.emplace_back(inputType->nameOf(i)); - types.emplace_back(inputType->childAt(i)); } } - tableType_ = ROW(std::move(names), std::move(types)); + tableType_ = hashJoinTableType(joinNode_); setupTable(); setupSpiller(); stateCleared_ = false; @@ -1053,13 +1045,13 @@ void HashBuild::reclaim( VELOX_CHECK_NOT_NULL(driver); VELOX_CHECK(!nonReclaimableSection_); + const auto* config = spillConfig(); + VELOX_CHECK_NOT_NULL(config); if (UNLIKELY(exceededMaxSpillLevelLimit_)) { // 'canReclaim()' already checks the spill limit is not exceeding max, there // is only a small chance from the time 'canReclaim()' is checked to the // actual reclaim happens that the operator has spilled such that the spill // level exceeds max. - const auto* config = spillConfig(); - VELOX_CHECK_NOT_NULL(config); LOG(WARNING) << "Can't reclaim from hash build operator, exceeded maximum spill " "level of " @@ -1089,6 +1081,7 @@ void HashBuild::reclaim( VELOX_CHECK(task->pauseRequested()); const std::vector operators = task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this); + for (auto* op : operators) { HashBuild* buildOp = dynamic_cast(op); VELOX_CHECK_NOT_NULL(buildOp); @@ -1106,53 +1099,18 @@ void HashBuild::reclaim( } } - struct SpillResult { - const std::exception_ptr error{nullptr}; - - explicit SpillResult(std::exception_ptr _error) : error(_error) {} - }; - - std::vector>> spillTasks; - auto* spillExecutor = spillConfig()->executor; + std::vector spillers; for (auto* op : operators) { HashBuild* buildOp = static_cast(op); - spillTasks.push_back( - memory::createAsyncMemoryReclaimTask([buildOp]() { - try { - buildOp->spiller_->spill(); - buildOp->table_->clear(true); - // Release the minimum reserved memory. 
- buildOp->pool()->release(); - return std::make_unique(nullptr); - } catch (const std::exception& e) { - LOG(ERROR) << "Spill from hash build pool " - << buildOp->pool()->name() << " failed: " << e.what(); - // The exception is captured and thrown by the caller. - return std::make_unique(std::current_exception()); - } - })); - if ((operators.size() > 1) && (spillExecutor != nullptr)) { - spillExecutor->add([source = spillTasks.back()]() { source->prepare(); }); - } + spillers.push_back(buildOp->spiller_.get()); } - auto syncGuard = folly::makeGuard([&]() { - for (auto& spillTask : spillTasks) { - // We consume the result for the pending tasks. This is a cleanup in the - // guard and must not throw. The first error is already captured before - // this runs. - try { - spillTask->move(); - } catch (const std::exception&) { - } - } - }); + spillHashJoinTable(spillers, config); - for (auto& spillTask : spillTasks) { - const auto result = spillTask->move(); - if (result->error) { - std::rethrow_exception(result->error); - } + for (auto* op : operators) { + HashBuild* buildOp = static_cast(op); + buildOp->table_->clear(true); + buildOp->pool()->release(); } } diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index f3534706c0f4..74be0f4cef81 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -26,15 +26,15 @@ namespace facebook::velox::exec { -// Builds a hash table for use in HashProbe. This is the final -// Operator in a build side Driver. The build side pipeline has -// multiple Drivers, each with its own HashBuild. The build finishes -// when the last Driver of the build pipeline finishes. Hence finishHashBuild() -// has a barrier where the last one to enter gathers the data -// accumulated by the other Drivers and makes the join hash -// table. This table is then passed to the probe side pipeline via -// JoinBridge. After this, all build side Drivers finish and free -// their state. +/// Builds a hash table for use in HashProbe. 
This is the final +/// Operator in a build side Driver. The build side pipeline has +/// multiple Drivers, each with its own HashBuild. The build finishes +/// when the last Driver of the build pipeline finishes. Hence finishHashBuild() +/// has a barrier where the last one to enter gathers the data +/// accumulated by the other Drivers and makes the join hash +/// table. This table is then passed to the probe side pipeline via +/// JoinBridge. After this, all build side Drivers finish and free +/// their state. class HashBuild final : public Operator { public: /// Define the internal execution state for hash build. diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index 733d096940af..eebda2edddef 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -15,12 +15,43 @@ */ #include "velox/exec/HashJoinBridge.h" +#include "velox/common/memory/MemoryArbitrator.h" namespace facebook::velox::exec { namespace { static const char* kSpillProbedFlagColumnName = "__probedFlag"; } +RowTypePtr hashJoinTableType( + const std::shared_ptr& joinNode) { + const auto inputType = joinNode->sources()[1]->outputType(); + const auto numKeys = joinNode->rightKeys().size(); + + std::vector names; + names.reserve(inputType->size()); + std::vector types; + types.reserve(inputType->size()); + std::unordered_set keyChannelSet; + keyChannelSet.reserve(inputType->size()); + + for (int i = 0; i < numKeys; ++i) { + auto& key = joinNode->rightKeys()[i]; + auto channel = exprToChannel(key.get(), inputType); + keyChannelSet.insert(channel); + names.emplace_back(inputType->nameOf(channel)); + types.emplace_back(inputType->childAt(channel)); + } + + for (auto i = 0; i < inputType->size(); ++i) { + if (keyChannelSet.find(i) == keyChannelSet.end()) { + names.emplace_back(inputType->nameOf(i)); + types.emplace_back(inputType->childAt(i)); + } + } + + return ROW(std::move(names), std::move(types)); +} + void HashJoinBridge::start() { std::lock_guard 
l(mutex_); started_ = true; @@ -33,6 +64,127 @@ void HashJoinBridge::addBuilder() { ++numBuilders_; } +namespace { +// Creates the spiller for the row container of one of the sub-tables of +// 'table' to parallelize the table spilling. The spiller is only constructed +// here and does not spill anything yet: the caller runs spill() on it and +// then collects the spilled partitions from it. +std::unique_ptr createSpiller( + RowContainer* subTableRows, + core::JoinType joinType, + const RowTypePtr& tableType, + const HashBitRange& hashBitRange, + const common::SpillConfig* spillConfig, + folly::Synchronized* stats) { + return std::make_unique( + Spiller::Type::kHashJoinBuild, + joinType, + subTableRows, + hashJoinTableSpillType(tableType, joinType), + hashBitRange, + spillConfig, + stats); +} +} // namespace + +std::vector> spillHashJoinTable( + const std::vector& spillers, + const common::SpillConfig* spillConfig) { + VELOX_CHECK_NOT_NULL(spillConfig); + auto spillExecutor = spillConfig->executor; + std::vector>> + spillTasks; + for (auto* spiller : spillers) { + spillTasks.push_back( + memory::createAsyncMemoryReclaimTask( + [spiller]() { + try { + spiller->spill(); + return std::make_unique(spiller); + } catch (const std::exception& e) { + LOG(ERROR) << "Spill from hash join bridge failed: " + << e.what(); + // The exception is captured and thrown by the caller. + return std::make_unique( + std::current_exception()); + } + })); + if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) { + spillExecutor->add([source = spillTasks.back()]() { source->prepare(); }); + } + } + + auto syncGuard = folly::makeGuard([&]() { + for (auto& spillTask : spillTasks) { + // We consume the result for the pending tasks. This is a cleanup in the + // guard and must not throw. The first error is already captured before + // this runs. 
+ try { + spillTask->move(); + } catch (const std::exception&) { + } + } + }); + + std::vector> spillResults; + for (auto& spillTask : spillTasks) { + auto result = spillTask->move(); + if (result->error) { + std::rethrow_exception(result->error); + } + spillResults.push_back(std::move(result)); + } + return spillResults; +} + +SpillPartitionSet spillHashJoinTable( + std::shared_ptr table, + const HashBitRange& hashBitRange, + const std::shared_ptr& joinNode, + const common::SpillConfig* spillConfig, + folly::Synchronized* stats) { + VELOX_CHECK_NOT_NULL(table); + VELOX_CHECK_NOT_NULL(spillConfig); + if (table->numDistinct() == 0) { + // Empty build side. + return {}; + } + + std::vector> spillersHolder; + std::vector spillers; + const auto rowContainers = table->allRows(); + const auto tableType = hashJoinTableType(joinNode); + for (auto* rowContainer : rowContainers) { + if (rowContainer->numRows() == 0) { + continue; + } + spillersHolder.push_back(createSpiller( + rowContainer, + joinNode->joinType(), + tableType, + hashBitRange, + spillConfig, + stats)); + spillers.push_back(spillersHolder.back().get()); + } + if (spillersHolder.empty()) { + return {}; + } + + const auto spillResults = spillHashJoinTable(spillers, spillConfig); + + SpillPartitionSet spillPartitions; + for (const auto& spillResult : spillResults) { + VELOX_CHECK_NULL(spillResult->error); + spillResult->spiller->finishSpill(spillPartitions); + } + + // Remove the spilled partitions which are empty so as we don't need to + // trigger unnecessary spilling at hash probe side. 
+ removeEmptyPartitions(spillPartitions); + return spillPartitions; +} + void HashJoinBridge::setHashTable( std::unique_ptr table, SpillPartitionSet spillPartitionSet, diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 6661b5927f0e..a869684c5094 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -55,12 +55,10 @@ class HashJoinBridge : public JoinBridge { void setAntiJoinHasNullKeys(); - /// Represents the result of HashBuild operators: a hash table, an optional - /// restored spill partition id associated with the table, and the spilled - /// partitions while building the table if not empty. In case of an anti join, - /// a build side entry with a null in a join key makes the join return - /// nothing. In this case, HashBuild operators finishes early without - /// processing all the input and without finishing building the hash table. + /// Represents the result of HashBuild operators. In case of an anti join, a + /// build side entry with a null in a join key makes the join return nothing. + /// In this case, HashBuild operators finishes early without processing all + /// the input and without finishing building the hash table. struct HashBuildResult { HashBuildResult( std::shared_ptr _table, @@ -76,7 +74,14 @@ class HashJoinBridge : public JoinBridge { bool hasNullKeys; std::shared_ptr table; + + /// Restored spill partition id associated with 'table', null if 'table' is + /// not built from restoration. std::optional restoredPartitionId; + + /// Spilled partitions while building hash table. Since we don't support + /// fine-grained spilling for hash table, either 'table' is empty or + /// 'spillPartitionIds' is empty. SpillPartitionIdSet spillPartitionIds; }; @@ -115,6 +120,8 @@ class HashJoinBridge : public JoinBridge { private: uint32_t numBuilders_{0}; + // The result of the build side. It is set by the last build operator when + // build is done. 
std::optional buildResult_; // restoringSpillPartitionXxx member variables are populated by the @@ -123,7 +130,7 @@ class HashJoinBridge : public JoinBridge { // among the HashBuild operators and notifies these operators that they can // start building HashTables from these shards. - // If not null, set to the currently restoring spill partition id. + // If not null, set to the currently restoring table spill partition id. std::optional restoringSpillPartitionId_; // If 'restoringSpillPartitionId_' is not null, this set to the restoring @@ -172,6 +179,35 @@ bool isHashProbeMemoryPool(const memory::MemoryPool& pool); bool needRightSideJoin(core::JoinType joinType); +/// Returns the type of the hash table associated with this join. +RowTypePtr hashJoinTableType( + const std::shared_ptr& joinNode); + +struct HashJoinTableSpillResult { + Spiller* spiller{nullptr}; + const std::exception_ptr error{nullptr}; + + explicit HashJoinTableSpillResult(std::exception_ptr _error) + : error(_error) {} + explicit HashJoinTableSpillResult(Spiller* _spiller) : spiller(_spiller) {} +}; + +/// Invoked to spill the hash table from a set of spillers. The spills run in +/// parallel if 'spillConfig->executor' is set. This is used by hash build to +/// spill a partially built hash join table. +std::vector> spillHashJoinTable( + const std::vector& spillers, + const common::SpillConfig* spillConfig); + +/// Invoked to spill 'table' and returns spilled partitions. This is used by +/// hash probe or hash join bridge to spill a fully built table. +SpillPartitionSet spillHashJoinTable( + std::shared_ptr table, + const HashBitRange& hashBitRange, + const std::shared_ptr& joinNode, + const common::SpillConfig* spillConfig, + folly::Synchronized* stats); + /// Returns the type used to spill a given hash table type. The function /// might attach a boolean column at the end of 'tableType' if 'joinType' needs /// right side join processing. 
It is used by the hash join table spilling diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 3b9dc93f1c78..c2e6eb0b4531 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -270,7 +270,7 @@ void HashProbe::maybeSetupInputSpiller( // spill input, then we shall just finish the spiller and records the spilled // partition set accordingly. if (noMoreSpillInput_) { - inputSpiller_->finishSpill(spillPartitionSet_); + inputSpiller_->finishSpill(inputSpillPartitionSet_); } } @@ -283,13 +283,13 @@ void HashProbe::maybeSetupSpillInputReader( // If 'restoredPartitionId' is not null, then 'table_' is built from the // spilled build data. Create an unsorted reader to read the probe inputs from // the corresponding spilled probe partition on disk. - auto iter = spillPartitionSet_.find(restoredPartitionId.value()); - VELOX_CHECK(iter != spillPartitionSet_.end()); + auto iter = inputSpillPartitionSet_.find(restoredPartitionId.value()); + VELOX_CHECK(iter != inputSpillPartitionSet_.end()); auto partition = std::move(iter->second); VELOX_CHECK_EQ(partition->id(), restoredPartitionId.value()); spillInputReader_ = partition->createUnorderedReader( spillConfig_->readBufferSize, pool(), &spillStats_); - spillPartitionSet_.erase(iter); + inputSpillPartitionSet_.erase(iter); } void HashProbe::initializeResultIter() { @@ -337,7 +337,7 @@ void HashProbe::asyncWaitForHashTable() { // Null-aware anti join with null keys on the build side without a filter // always returns nothing. // The flag must be set on the first (and only) built 'table_'. 
- VELOX_CHECK(spillPartitionSet_.empty()); + VELOX_CHECK(inputSpillPartitionSet_.empty()); noMoreInput(); return; } @@ -351,11 +351,11 @@ void HashProbe::asyncWaitForHashTable() { maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId); maybeSetupInputSpiller(hashBuildResult->spillPartitionIds); - prepareTableSpill(hashBuildResult->restoredPartitionId); + checkMaxSpillLevel(hashBuildResult->restoredPartitionId); if (table_->numDistinct() == 0) { if (skipProbeOnEmptyBuild()) { - if (!needSpillInput()) { + if (!needToSpillInput()) { if (isSpillInput() || operatorCtx_->driverCtx() ->queryConfig() @@ -467,7 +467,7 @@ void HashProbe::addSpillInput() { } void HashProbe::spillInput(RowVectorPtr& input) { - VELOX_CHECK(needSpillInput()); + VELOX_CHECK(needToSpillInput()); const auto numInput = input->size(); prepareInputIndicesBuffers( @@ -622,7 +622,7 @@ void HashProbe::addInput(RowVectorPtr input) { bool hasDecoded = false; - if (needSpillInput()) { + if (needToSpillInput()) { if (isRightSemiProjectJoin(joinType_) && !probeSideHasNullKeys_) { decodeAndDetectNonNullKeys(); hasDecoded = true; @@ -637,7 +637,7 @@ void HashProbe::addInput(RowVectorPtr input) { if (table_->numDistinct() == 0) { if (skipProbeOnEmptyBuild()) { - VELOX_CHECK(needSpillInput()); + VELOX_CHECK(needToSpillInput()); input_ = nullptr; return; } @@ -882,11 +882,11 @@ bool HashProbe::canSpill() const { } bool HashProbe::hasMoreSpillData() const { - VELOX_CHECK(spillPartitionSet_.empty() || canSpill()); - return !spillPartitionSet_.empty() || needSpillInput(); + VELOX_CHECK(inputSpillPartitionSet_.empty() || canSpill()); + return !inputSpillPartitionSet_.empty() || needToSpillInput(); } -bool HashProbe::needSpillInput() const { +bool HashProbe::needToSpillInput() const { VELOX_CHECK(spillInputPartitionIds_.empty() || canSpill()); VELOX_CHECK_EQ(spillInputPartitionIds_.empty(), inputSpiller_ == nullptr); @@ -1573,7 +1573,7 @@ void HashProbe::noMoreInputInternal() { VELOX_CHECK_EQ( 
spillInputPartitionIds_.size(), inputSpiller_->spilledPartitionSet().size()); - inputSpiller_->finishSpill(spillPartitionSet_); + inputSpiller_->finishSpill(inputSpillPartitionSet_); VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeNanos, 0); } @@ -1750,7 +1750,8 @@ void HashProbe::reclaim( if (hasMoreProbeInput) { // Only spill hash table if any hash probe operators still has input probe // data, otherwise we skip this step. - spillPartitionSet = spillTable(); + spillPartitionSet = spillHashJoinTable( + table_, tableSpillHashBits_, joinNode_, spillConfig(), &spillStats_); VELOX_CHECK(!spillPartitionSet.empty()); } const auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet); @@ -1881,82 +1882,7 @@ void HashProbe::maybeSetupSpillOutputReader() { spillOutputPartitionSet_.clear(); } -SpillPartitionSet HashProbe::spillTable() { - struct SpillResult { - std::unique_ptr spiller{nullptr}; - const std::exception_ptr error{nullptr}; - - explicit SpillResult(std::exception_ptr _error) : error(_error) {} - explicit SpillResult(std::unique_ptr _spiller) - : spiller(std::move(_spiller)) {} - }; - - const std::vector rowContainers = table_->allRows(); - std::vector>> spillTasks; - auto* spillExecutor = spillConfig()->executor; - for (auto* rowContainer : rowContainers) { - if (rowContainer->numRows() == 0) { - continue; - } - spillTasks.push_back(memory::createAsyncMemoryReclaimTask( - [this, rowContainer]() { - try { - return std::make_unique(spillTable(rowContainer)); - } catch (const std::exception& e) { - LOG(ERROR) << "Spill sub-table from hash probe pool " - << pool()->name() << " failed: " << e.what(); - // The exception is captured and thrown by the caller. 
- return std::make_unique(std::current_exception()); - } - })); - if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) { - spillExecutor->add([source = spillTasks.back()]() { source->prepare(); }); - } - } - - auto syncGuard = folly::makeGuard([&]() { - for (auto& spillTask : spillTasks) { - // We consume the result for the pending tasks. This is a cleanup in the - // guard and must not throw. The first error is already captured before - // this runs. - try { - spillTask->move(); - } catch (const std::exception&) { - } - } - }); - - SpillPartitionSet spillPartitions; - for (auto& spillTask : spillTasks) { - const auto result = spillTask->move(); - if (result->error) { - std::rethrow_exception(result->error); - } - result->spiller->finishSpill(spillPartitions); - } - - // Remove the spilled partitions which are empty so as we don't need to - // trigger unnecessary spilling at hash probe side. - removeEmptyPartitions(spillPartitions); - return spillPartitions; -} - -std::unique_ptr HashProbe::spillTable(RowContainer* subTableRows) { - VELOX_CHECK_NOT_NULL(tableSpillType_); - - auto tableSpiller = std::make_unique( - Spiller::Type::kHashJoinBuild, - joinType_, - subTableRows, - tableSpillType_, - std::move(tableSpillHashBits_), - spillConfig(), - &spillStats_); - tableSpiller->spill(); - return tableSpiller; -} - -void HashProbe::prepareTableSpill( +void HashProbe::checkMaxSpillLevel( const std::optional& restoredPartitionId) { if (!canSpill()) { return; @@ -1980,40 +1906,8 @@ void HashProbe::prepareTableSpill( } } exceededMaxSpillLevelLimit_ = false; - tableSpillHashBits_ = HashBitRange( startPartitionBit, startPartitionBit + config->numPartitionBits); - - // NOTE: we only need to init 'tableSpillType_' once. 
- if (tableSpillType_ != nullptr) { - return; - } - - const auto& tableInputType = joinNode_->sources()[1]->outputType(); - std::vector names; - names.reserve(tableInputType->size()); - std::vector types; - types.reserve(tableInputType->size()); - const auto numKeys = joinNode_->rightKeys().size(); - - // Identify the non-key build side columns. - folly::F14FastMap keyChannelMap; - for (int i = 0; i < numKeys; ++i) { - const auto& key = joinNode_->rightKeys()[i]; - const auto channel = exprToChannel(key.get(), tableInputType); - keyChannelMap[channel] = i; - names.emplace_back(tableInputType->nameOf(channel)); - types.emplace_back(tableInputType->childAt(channel)); - } - const auto numDependents = tableInputType->size() - numKeys; - for (auto i = 0; i < tableInputType->size(); ++i) { - if (keyChannelMap.find(i) == keyChannelMap.end()) { - names.emplace_back(tableInputType->nameOf(i)); - types.emplace_back(tableInputType->childAt(i)); - } - } - tableSpillType_ = hashJoinTableSpillType( - ROW(std::move(names), std::move(types)), joinType_); } void HashProbe::close() { diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 865048356074..121120f991f5 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -201,9 +201,9 @@ class HashProbe : public Operator { void maybeSetupSpillInputReader( const std::optional& restoredSpillPartitionId); - // Prepares the table spill by checking the spill level limit, setting spill - // partition bits and table spill type. - void prepareTableSpill( + // Checks the hash table's spill level limit from the restored table. Sets the + // 'exceededMaxSpillLevelLimit_' accordingly. + void checkMaxSpillLevel( const std::optional& restoredPartitionId); bool canSpill() const override; @@ -218,7 +218,7 @@ class HashProbe : public Operator { // Indicates if the operator needs to spill probe inputs. It is true if parts // of the build-side rows have been spilled. 
Hence, the probe operator needs // to spill the corresponding probe-side rows as well. - bool needSpillInput() const; + bool needToSpillInput() const; // This ensures there is sufficient buffer reserved to produce the next output // batch. This might trigger memory arbitration underneath and the probe @@ -252,14 +252,6 @@ class HashProbe : public Operator { // Produces and spills output from this probe operator. void spillOutput(); - // Spills the composed 'table_' from the built side. - SpillPartitionSet spillTable(); - // Spills the row container from one of the sub-table from 'table_' to - // parallelize the table spilling. The function spills all the rows from the - // row container and returns the spiller for the caller to collect the spilled - // partitions and stats. - std::unique_ptr spillTable(RowContainer* subTableRows); - // Invoked to spill rows in 'input' to disk directly if the corresponding // partitions have been spilled at the build side. // @@ -300,7 +292,7 @@ class HashProbe : public Operator { // restore. Also note that the spilled partition at build side must not be // empty. bool emptyBuildSide() const { - return table_->numDistinct() == 0 && spillPartitionSet_.empty() && + return table_->numDistinct() == 0 && inputSpillPartitionSet_.empty() && spillInputPartitionIds_.empty(); } @@ -373,8 +365,8 @@ class HashProbe : public Operator { std::vector> hashers_; - // Table shared between other HashProbes in other Drivers of the same - // pipeline. + // Current working hash table that is shared between other HashProbes in other + // Drivers of the same pipeline. std::shared_ptr table_; // Indicates whether there was no input. Used for right semi join project. @@ -624,8 +616,6 @@ class HashProbe : public Operator { // The partition bits used to spill the hash table. HashBitRange tableSpillHashBits_; - // The row type used to spill hash table on disk. 
- RowTypePtr tableSpillType_; // The spilled output partition set which is cleared after setup // 'spillOutputReader_'. @@ -668,7 +658,7 @@ class HashProbe : public Operator { bool noMoreSpillInput_{false}; // The spilled probe partitions remaining to restore. - SpillPartitionSet spillPartitionSet_; + SpillPartitionSet inputSpillPartitionSet_; }; inline std::ostream& operator<<(std::ostream& os, ProbeOperatorState state) { diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 7f504c58e42b..434ca9570976 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -342,8 +342,8 @@ class BaseHashTable { int8_t spillInputStartPartitionBit, bool disableRangeArrayHash = false) = 0; - // Removes 'rows' from the hash table and its RowContainer. 'rows' must exist - // and be unique. + /// Removes 'rows' from the hash table and its RowContainer. 'rows' must exist + /// and be unique. virtual void erase(folly::Range rows) = 0; /// Returns a brief description for use in debugging. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 3b65536ba04c..04e5bfbec9fc 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -486,9 +486,8 @@ column_index_t exprToChannel( if (dynamic_cast(expr)) { return kConstantChannel; } - VELOX_FAIL( + VELOX_UNREACHABLE( "Expression must be field access or constant, got: {}", expr->toString()); - return 0; // not reached. 
} std::vector calculateOutputChannels( diff --git a/velox/exec/tests/HashJoinBridgeTest.cpp b/velox/exec/tests/HashJoinBridgeTest.cpp index fce7c01303f8..fd4024805241 100644 --- a/velox/exec/tests/HashJoinBridgeTest.cpp +++ b/velox/exec/tests/HashJoinBridgeTest.cpp @@ -586,6 +586,90 @@ TEST(HashJoinBridgeTest, needRightSideJoin) { } } +TEST_P(HashJoinBridgeTest, hashJoinTableType) { + core::TypedExprPtr filter{ + std::make_shared(BOOLEAN(), true)}; + struct TestSetting { + core::JoinType joinType; + RowTypePtr probeKeyType; + RowTypePtr buildKeyType; + RowTypePtr probeSourceType; + RowTypePtr buildSourceType; + std::string debugString() const { + return fmt::format( + "joinType {} probeKeyType {} buildKeyType {} probeSourceType {} buildSourceType {}", + joinType, + probeKeyType->toString(), + buildKeyType->toString(), + probeSourceType->toString(), + buildSourceType->toString()); + } + }; + std::vector testSettings{ + {core::JoinType::kInner, + ROW({"p0"}, {BIGINT()}), + ROW({"b0"}, {BIGINT()}), + ROW({"p0", "p1"}, {BIGINT(), BIGINT()}), + ROW({"b0", "b1"}, {BIGINT(), BIGINT()})}, + {core::JoinType::kRight, + ROW({"p1", "p0"}, {BIGINT(), BIGINT()}), + ROW({"b1", "b0"}, {BIGINT(), BIGINT()}), + ROW({"p0", "p1"}, {BIGINT(), BIGINT()}), + ROW({"b0", "b1"}, {BIGINT(), BIGINT()})}, + {core::JoinType::kLeft, + ROW({"p1"}, {BIGINT()}), + ROW({"b1"}, {BIGINT()}), + ROW({"p0", "p1"}, {BIGINT(), BIGINT()}), + ROW({"b0", "b1"}, {BIGINT(), BIGINT()})}}; + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto emptyBuildVector = {std::make_shared( + pool_.get(), + testData.buildSourceType, + nullptr, // nulls + 0, + std::vector{})}; + const auto buildValueNode = + std::make_shared("buildValueNode", emptyBuildVector); + + const auto emptyProbeVectors = {std::make_shared( + pool_.get(), + testData.probeSourceType, + nullptr, // nulls + 0, + std::vector{})}; + const auto probeValueNode = + 
std::make_shared("probeValueNode", 
emptyProbeVectors); + + std::vector buildKeys; + std::vector probeKeys; + for (uint32_t i = 0; i < testData.buildKeyType->size(); i++) { + buildKeys.push_back(std::make_shared( + testData.buildKeyType->childAt(i), testData.buildKeyType->nameOf(i))); + } + for (uint32_t i = 0; i < testData.probeKeyType->size(); i++) { + probeKeys.push_back(std::make_shared( + testData.probeKeyType->childAt(i), testData.probeKeyType->nameOf(i))); + } + const auto joinNode = std::make_shared( + "join-bridge-test", + testData.joinType, + false, + probeKeys, + buildKeys, + filter, + probeValueNode, + buildValueNode, + ROW({})); + + auto tableType = hashJoinTableType(joinNode); + ASSERT_EQ(tableType->size(), testData.buildSourceType->size()); + for (uint32_t i = 0; i < buildKeys.size(); i++) { + ASSERT_EQ(tableType->childAt(i), testData.buildKeyType->childAt(i)); + } + } +} + TEST(HashJoinBridgeTest, hashJoinTableSpillType) { const RowTypePtr tableType = ROW({"k1", "k2"}, {BIGINT(), BIGINT()}); const RowTypePtr spillTypeWithProbedFlag =