diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index 42da1fbbd6fd..9ae79de3745e 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -77,18 +77,12 @@ HashBuild::HashBuild( const auto numKeys = joinNode_->rightKeys().size(); keyChannels_.reserve(numKeys); - std::vector names; - names.reserve(inputType->size()); - std::vector types; - types.reserve(inputType->size()); for (int i = 0; i < numKeys; ++i) { auto& key = joinNode_->rightKeys()[i]; auto channel = exprToChannel(key.get(), inputType); keyChannelMap_[channel] = i; keyChannels_.emplace_back(channel); - names.emplace_back(inputType->nameOf(channel)); - types.emplace_back(inputType->childAt(channel)); } // Identify the non-key build side columns and make a decoder for each. @@ -106,12 +100,10 @@ HashBuild::HashBuild( if (keyChannelMap_.find(i) == keyChannelMap_.end()) { dependentChannels_.emplace_back(i); decoders_.emplace_back(std::make_unique()); - names.emplace_back(inputType->nameOf(i)); - types.emplace_back(inputType->childAt(i)); } } - tableType_ = ROW(std::move(names), std::move(types)); + tableType_ = hashJoinTableType(joinNode_); setupTable(); setupSpiller(); stateCleared_ = false; @@ -1053,13 +1045,13 @@ void HashBuild::reclaim( VELOX_CHECK_NOT_NULL(driver); VELOX_CHECK(!nonReclaimableSection_); + const auto* config = spillConfig(); + VELOX_CHECK_NOT_NULL(config); if (UNLIKELY(exceededMaxSpillLevelLimit_)) { // 'canReclaim()' already checks the spill limit is not exceeding max, there // is only a small chance from the time 'canReclaim()' is checked to the // actual reclaim happens that the operator has spilled such that the spill // level exceeds max. 
- const auto* config = spillConfig(); - VELOX_CHECK_NOT_NULL(config); LOG(WARNING) << "Can't reclaim from hash build operator, exceeded maximum spill " "level of " @@ -1089,6 +1081,7 @@ void HashBuild::reclaim( VELOX_CHECK(task->pauseRequested()); const std::vector operators = task->findPeerOperators(operatorCtx_->driverCtx()->pipelineId, this); + for (auto* op : operators) { HashBuild* buildOp = dynamic_cast(op); VELOX_CHECK_NOT_NULL(buildOp); @@ -1106,53 +1099,18 @@ void HashBuild::reclaim( } } - struct SpillResult { - const std::exception_ptr error{nullptr}; - - explicit SpillResult(std::exception_ptr _error) : error(_error) {} - }; - - std::vector>> spillTasks; - auto* spillExecutor = spillConfig()->executor; + std::vector spillers; for (auto* op : operators) { HashBuild* buildOp = static_cast(op); - spillTasks.push_back( - memory::createAsyncMemoryReclaimTask([buildOp]() { - try { - buildOp->spiller_->spill(); - buildOp->table_->clear(true); - // Release the minimum reserved memory. - buildOp->pool()->release(); - return std::make_unique(nullptr); - } catch (const std::exception& e) { - LOG(ERROR) << "Spill from hash build pool " - << buildOp->pool()->name() << " failed: " << e.what(); - // The exception is captured and thrown by the caller. - return std::make_unique(std::current_exception()); - } - })); - if ((operators.size() > 1) && (spillExecutor != nullptr)) { - spillExecutor->add([source = spillTasks.back()]() { source->prepare(); }); - } + spillers.push_back(buildOp->spiller_.get()); } - auto syncGuard = folly::makeGuard([&]() { - for (auto& spillTask : spillTasks) { - // We consume the result for the pending tasks. This is a cleanup in the - // guard and must not throw. The first error is already captured before - // this runs. 
- try { - spillTask->move(); - } catch (const std::exception&) { - } - } - }); + spillHashJoinTable(spillers, config); - for (auto& spillTask : spillTasks) { - const auto result = spillTask->move(); - if (result->error) { - std::rethrow_exception(result->error); - } + for (auto* op : operators) { + HashBuild* buildOp = static_cast(op); + buildOp->table_->clear(true); + buildOp->pool()->release(); } } diff --git a/velox/exec/HashBuild.h b/velox/exec/HashBuild.h index f3534706c0f4..74be0f4cef81 100644 --- a/velox/exec/HashBuild.h +++ b/velox/exec/HashBuild.h @@ -26,15 +26,15 @@ namespace facebook::velox::exec { -// Builds a hash table for use in HashProbe. This is the final -// Operator in a build side Driver. The build side pipeline has -// multiple Drivers, each with its own HashBuild. The build finishes -// when the last Driver of the build pipeline finishes. Hence finishHashBuild() -// has a barrier where the last one to enter gathers the data -// accumulated by the other Drivers and makes the join hash -// table. This table is then passed to the probe side pipeline via -// JoinBridge. After this, all build side Drivers finish and free -// their state. +/// Builds a hash table for use in HashProbe. This is the final +/// Operator in a build side Driver. The build side pipeline has +/// multiple Drivers, each with its own HashBuild. The build finishes +/// when the last Driver of the build pipeline finishes. Hence finishHashBuild() +/// has a barrier where the last one to enter gathers the data +/// accumulated by the other Drivers and makes the join hash +/// table. This table is then passed to the probe side pipeline via +/// JoinBridge. After this, all build side Drivers finish and free +/// their state. class HashBuild final : public Operator { public: /// Define the internal execution state for hash build. 
diff --git a/velox/exec/HashJoinBridge.cpp b/velox/exec/HashJoinBridge.cpp index 733d096940af..eebda2edddef 100644 --- a/velox/exec/HashJoinBridge.cpp +++ b/velox/exec/HashJoinBridge.cpp @@ -15,12 +15,43 @@ */ #include "velox/exec/HashJoinBridge.h" +#include "velox/common/memory/MemoryArbitrator.h" namespace facebook::velox::exec { namespace { static const char* kSpillProbedFlagColumnName = "__probedFlag"; } +RowTypePtr hashJoinTableType( + const std::shared_ptr& joinNode) { + const auto inputType = joinNode->sources()[1]->outputType(); + const auto numKeys = joinNode->rightKeys().size(); + + std::vector names; + names.reserve(inputType->size()); + std::vector types; + types.reserve(inputType->size()); + std::unordered_set keyChannelSet; + keyChannelSet.reserve(inputType->size()); + + for (int i = 0; i < numKeys; ++i) { + auto& key = joinNode->rightKeys()[i]; + auto channel = exprToChannel(key.get(), inputType); + keyChannelSet.insert(channel); + names.emplace_back(inputType->nameOf(channel)); + types.emplace_back(inputType->childAt(channel)); + } + + for (auto i = 0; i < inputType->size(); ++i) { + if (keyChannelSet.find(i) == keyChannelSet.end()) { + names.emplace_back(inputType->nameOf(i)); + types.emplace_back(inputType->childAt(i)); + } + } + + return ROW(std::move(names), std::move(types)); +} + void HashJoinBridge::start() { std::lock_guard l(mutex_); started_ = true; @@ -33,6 +64,127 @@ void HashJoinBridge::addBuilder() { ++numBuilders_; } +namespace { +// Create spiller for spilling the row container from one of the sub-table from +// 'table' to parallelize the table spilling. The function spills all the rows +// from the row container and returns the spiller for the caller to collect the +// spilled partitions and stats. 
+std::unique_ptr createSpiller( + RowContainer* subTableRows, + core::JoinType joinType, + const RowTypePtr& tableType, + const HashBitRange& hashBitRange, + const common::SpillConfig* spillConfig, + folly::Synchronized* stats) { + return std::make_unique( + Spiller::Type::kHashJoinBuild, + joinType, + subTableRows, + hashJoinTableSpillType(tableType, joinType), + hashBitRange, + spillConfig, + stats); +} +} // namespace + +std::vector> spillHashJoinTable( + const std::vector& spillers, + const common::SpillConfig* spillConfig) { + VELOX_CHECK_NOT_NULL(spillConfig); + auto spillExecutor = spillConfig->executor; + std::vector>> + spillTasks; + for (auto* spiller : spillers) { + spillTasks.push_back( + memory::createAsyncMemoryReclaimTask( + [spiller]() { + try { + spiller->spill(); + return std::make_unique(spiller); + } catch (const std::exception& e) { + LOG(ERROR) << "Spill from hash join bridge failed: " + << e.what(); + // The exception is captured and thrown by the caller. + return std::make_unique( + std::current_exception()); + } + })); + if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) { + spillExecutor->add([source = spillTasks.back()]() { source->prepare(); }); + } + } + + auto syncGuard = folly::makeGuard([&]() { + for (auto& spillTask : spillTasks) { + // We consume the result for the pending tasks. This is a cleanup in the + // guard and must not throw. The first error is already captured before + // this runs. 
+ try { + spillTask->move(); + } catch (const std::exception&) { + } + } + }); + + std::vector> spillResults; + for (auto& spillTask : spillTasks) { + auto result = spillTask->move(); + if (result->error) { + std::rethrow_exception(result->error); + } + spillResults.push_back(std::move(result)); + } + return spillResults; +} + +SpillPartitionSet spillHashJoinTable( + std::shared_ptr table, + const HashBitRange& hashBitRange, + const std::shared_ptr& joinNode, + const common::SpillConfig* spillConfig, + folly::Synchronized* stats) { + VELOX_CHECK_NOT_NULL(table); + VELOX_CHECK_NOT_NULL(spillConfig); + if (table->numDistinct() == 0) { + // Empty build side. + return {}; + } + + std::vector> spillersHolder; + std::vector spillers; + const auto rowContainers = table->allRows(); + const auto tableType = hashJoinTableType(joinNode); + for (auto* rowContainer : rowContainers) { + if (rowContainer->numRows() == 0) { + continue; + } + spillersHolder.push_back(createSpiller( + rowContainer, + joinNode->joinType(), + tableType, + hashBitRange, + spillConfig, + stats)); + spillers.push_back(spillersHolder.back().get()); + } + if (spillersHolder.empty()) { + return {}; + } + + const auto spillResults = spillHashJoinTable(spillers, spillConfig); + + SpillPartitionSet spillPartitions; + for (const auto& spillResult : spillResults) { + VELOX_CHECK_NULL(spillResult->error); + spillResult->spiller->finishSpill(spillPartitions); + } + + // Remove the spilled partitions which are empty so as we don't need to + // trigger unnecessary spilling at hash probe side. 
+ removeEmptyPartitions(spillPartitions); + return spillPartitions; +} + void HashJoinBridge::setHashTable( std::unique_ptr table, SpillPartitionSet spillPartitionSet, diff --git a/velox/exec/HashJoinBridge.h b/velox/exec/HashJoinBridge.h index 6661b5927f0e..a869684c5094 100644 --- a/velox/exec/HashJoinBridge.h +++ b/velox/exec/HashJoinBridge.h @@ -55,12 +55,10 @@ class HashJoinBridge : public JoinBridge { void setAntiJoinHasNullKeys(); - /// Represents the result of HashBuild operators: a hash table, an optional - /// restored spill partition id associated with the table, and the spilled - /// partitions while building the table if not empty. In case of an anti join, - /// a build side entry with a null in a join key makes the join return - /// nothing. In this case, HashBuild operators finishes early without - /// processing all the input and without finishing building the hash table. + /// Represents the result of HashBuild operators. In case of an anti join, a + /// build side entry with a null in a join key makes the join return nothing. + /// In this case, HashBuild operators finish early without processing all + /// the input and without finishing building the hash table. struct HashBuildResult { HashBuildResult( std::shared_ptr _table, @@ -76,7 +74,14 @@ class HashJoinBridge : public JoinBridge { bool hasNullKeys; std::shared_ptr table; + + /// Restored spill partition id associated with 'table', null if 'table' is + /// not built from restoration. std::optional restoredPartitionId; + + /// Spilled partitions while building hash table. Since we don't support + /// fine-grained spilling for hash table, either 'table' is empty or + /// 'spillPartitionIds' is empty. SpillPartitionIdSet spillPartitionIds; }; @@ -115,6 +120,8 @@ class HashJoinBridge : public JoinBridge { private: uint32_t numBuilders_{0}; + // The result of the build side. It is set by the last build operator when + // build is done. 
std::optional buildResult_; // restoringSpillPartitionXxx member variables are populated by the @@ -123,7 +130,7 @@ class HashJoinBridge : public JoinBridge { // among the HashBuild operators and notifies these operators that they can // start building HashTables from these shards. - // If not null, set to the currently restoring spill partition id. + // If not null, set to the currently restoring table spill partition id. std::optional restoringSpillPartitionId_; // If 'restoringSpillPartitionId_' is not null, this set to the restoring @@ -172,6 +179,35 @@ bool isHashProbeMemoryPool(const memory::MemoryPool& pool); bool needRightSideJoin(core::JoinType joinType); +/// Returns the type of the hash table associated with this join. +RowTypePtr hashJoinTableType( + const std::shared_ptr& joinNode); + +struct HashJoinTableSpillResult { + Spiller* spiller{nullptr}; + const std::exception_ptr error{nullptr}; + + explicit HashJoinTableSpillResult(std::exception_ptr _error) + : error(_error) {} + explicit HashJoinTableSpillResult(Spiller* _spiller) : spiller(_spiller) {} +}; + +/// Invoked to spill the hash table from a set of spillers. If the executor in +/// 'spillConfig' is set, then we do parallel spill. This is used by hash build +/// to spill a partially built hash join table. +std::vector> spillHashJoinTable( + const std::vector& spillers, + const common::SpillConfig* spillConfig); + +/// Invoked to spill 'table' and returns spilled partitions. This is used by +/// hash probe or hash join bridge to spill a fully built table. +SpillPartitionSet spillHashJoinTable( + std::shared_ptr table, + const HashBitRange& hashBitRange, + const std::shared_ptr& joinNode, + const common::SpillConfig* spillConfig, + folly::Synchronized* stats); + /// Returns the type used to spill a given hash table type. The function /// might attach a boolean column at the end of 'tableType' if 'joinType' needs /// right side join processing. 
It is used by the hash join table spilling diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp index 3b9dc93f1c78..c2e6eb0b4531 100644 --- a/velox/exec/HashProbe.cpp +++ b/velox/exec/HashProbe.cpp @@ -270,7 +270,7 @@ void HashProbe::maybeSetupInputSpiller( // spill input, then we shall just finish the spiller and records the spilled // partition set accordingly. if (noMoreSpillInput_) { - inputSpiller_->finishSpill(spillPartitionSet_); + inputSpiller_->finishSpill(inputSpillPartitionSet_); } } @@ -283,13 +283,13 @@ void HashProbe::maybeSetupSpillInputReader( // If 'restoredPartitionId' is not null, then 'table_' is built from the // spilled build data. Create an unsorted reader to read the probe inputs from // the corresponding spilled probe partition on disk. - auto iter = spillPartitionSet_.find(restoredPartitionId.value()); - VELOX_CHECK(iter != spillPartitionSet_.end()); + auto iter = inputSpillPartitionSet_.find(restoredPartitionId.value()); + VELOX_CHECK(iter != inputSpillPartitionSet_.end()); auto partition = std::move(iter->second); VELOX_CHECK_EQ(partition->id(), restoredPartitionId.value()); spillInputReader_ = partition->createUnorderedReader( spillConfig_->readBufferSize, pool(), &spillStats_); - spillPartitionSet_.erase(iter); + inputSpillPartitionSet_.erase(iter); } void HashProbe::initializeResultIter() { @@ -337,7 +337,7 @@ void HashProbe::asyncWaitForHashTable() { // Null-aware anti join with null keys on the build side without a filter // always returns nothing. // The flag must be set on the first (and only) built 'table_'. 
- VELOX_CHECK(spillPartitionSet_.empty()); + VELOX_CHECK(inputSpillPartitionSet_.empty()); noMoreInput(); return; } @@ -351,11 +351,11 @@ void HashProbe::asyncWaitForHashTable() { maybeSetupSpillInputReader(hashBuildResult->restoredPartitionId); maybeSetupInputSpiller(hashBuildResult->spillPartitionIds); - prepareTableSpill(hashBuildResult->restoredPartitionId); + checkMaxSpillLevel(hashBuildResult->restoredPartitionId); if (table_->numDistinct() == 0) { if (skipProbeOnEmptyBuild()) { - if (!needSpillInput()) { + if (!needToSpillInput()) { if (isSpillInput() || operatorCtx_->driverCtx() ->queryConfig() @@ -467,7 +467,7 @@ void HashProbe::addSpillInput() { } void HashProbe::spillInput(RowVectorPtr& input) { - VELOX_CHECK(needSpillInput()); + VELOX_CHECK(needToSpillInput()); const auto numInput = input->size(); prepareInputIndicesBuffers( @@ -622,7 +622,7 @@ void HashProbe::addInput(RowVectorPtr input) { bool hasDecoded = false; - if (needSpillInput()) { + if (needToSpillInput()) { if (isRightSemiProjectJoin(joinType_) && !probeSideHasNullKeys_) { decodeAndDetectNonNullKeys(); hasDecoded = true; @@ -637,7 +637,7 @@ void HashProbe::addInput(RowVectorPtr input) { if (table_->numDistinct() == 0) { if (skipProbeOnEmptyBuild()) { - VELOX_CHECK(needSpillInput()); + VELOX_CHECK(needToSpillInput()); input_ = nullptr; return; } @@ -882,11 +882,11 @@ bool HashProbe::canSpill() const { } bool HashProbe::hasMoreSpillData() const { - VELOX_CHECK(spillPartitionSet_.empty() || canSpill()); - return !spillPartitionSet_.empty() || needSpillInput(); + VELOX_CHECK(inputSpillPartitionSet_.empty() || canSpill()); + return !inputSpillPartitionSet_.empty() || needToSpillInput(); } -bool HashProbe::needSpillInput() const { +bool HashProbe::needToSpillInput() const { VELOX_CHECK(spillInputPartitionIds_.empty() || canSpill()); VELOX_CHECK_EQ(spillInputPartitionIds_.empty(), inputSpiller_ == nullptr); @@ -1573,7 +1573,7 @@ void HashProbe::noMoreInputInternal() { VELOX_CHECK_EQ( 
spillInputPartitionIds_.size(), inputSpiller_->spilledPartitionSet().size()); - inputSpiller_->finishSpill(spillPartitionSet_); + inputSpiller_->finishSpill(inputSpillPartitionSet_); VELOX_CHECK_EQ(spillStats_.rlock()->spillSortTimeNanos, 0); } @@ -1750,7 +1750,8 @@ void HashProbe::reclaim( if (hasMoreProbeInput) { // Only spill hash table if any hash probe operators still has input probe // data, otherwise we skip this step. - spillPartitionSet = spillTable(); + spillPartitionSet = spillHashJoinTable( + table_, tableSpillHashBits_, joinNode_, spillConfig(), &spillStats_); VELOX_CHECK(!spillPartitionSet.empty()); } const auto spillPartitionIdSet = toSpillPartitionIdSet(spillPartitionSet); @@ -1881,82 +1882,7 @@ void HashProbe::maybeSetupSpillOutputReader() { spillOutputPartitionSet_.clear(); } -SpillPartitionSet HashProbe::spillTable() { - struct SpillResult { - std::unique_ptr spiller{nullptr}; - const std::exception_ptr error{nullptr}; - - explicit SpillResult(std::exception_ptr _error) : error(_error) {} - explicit SpillResult(std::unique_ptr _spiller) - : spiller(std::move(_spiller)) {} - }; - - const std::vector rowContainers = table_->allRows(); - std::vector>> spillTasks; - auto* spillExecutor = spillConfig()->executor; - for (auto* rowContainer : rowContainers) { - if (rowContainer->numRows() == 0) { - continue; - } - spillTasks.push_back(memory::createAsyncMemoryReclaimTask( - [this, rowContainer]() { - try { - return std::make_unique(spillTable(rowContainer)); - } catch (const std::exception& e) { - LOG(ERROR) << "Spill sub-table from hash probe pool " - << pool()->name() << " failed: " << e.what(); - // The exception is captured and thrown by the caller. 
- return std::make_unique(std::current_exception()); - } - })); - if ((spillTasks.size() > 1) && (spillExecutor != nullptr)) { - spillExecutor->add([source = spillTasks.back()]() { source->prepare(); }); - } - } - - auto syncGuard = folly::makeGuard([&]() { - for (auto& spillTask : spillTasks) { - // We consume the result for the pending tasks. This is a cleanup in the - // guard and must not throw. The first error is already captured before - // this runs. - try { - spillTask->move(); - } catch (const std::exception&) { - } - } - }); - - SpillPartitionSet spillPartitions; - for (auto& spillTask : spillTasks) { - const auto result = spillTask->move(); - if (result->error) { - std::rethrow_exception(result->error); - } - result->spiller->finishSpill(spillPartitions); - } - - // Remove the spilled partitions which are empty so as we don't need to - // trigger unnecessary spilling at hash probe side. - removeEmptyPartitions(spillPartitions); - return spillPartitions; -} - -std::unique_ptr HashProbe::spillTable(RowContainer* subTableRows) { - VELOX_CHECK_NOT_NULL(tableSpillType_); - - auto tableSpiller = std::make_unique( - Spiller::Type::kHashJoinBuild, - joinType_, - subTableRows, - tableSpillType_, - std::move(tableSpillHashBits_), - spillConfig(), - &spillStats_); - tableSpiller->spill(); - return tableSpiller; -} - -void HashProbe::prepareTableSpill( +void HashProbe::checkMaxSpillLevel( const std::optional& restoredPartitionId) { if (!canSpill()) { return; @@ -1980,40 +1906,8 @@ void HashProbe::prepareTableSpill( } } exceededMaxSpillLevelLimit_ = false; - tableSpillHashBits_ = HashBitRange( startPartitionBit, startPartitionBit + config->numPartitionBits); - - // NOTE: we only need to init 'tableSpillType_' once. 
- if (tableSpillType_ != nullptr) { - return; - } - - const auto& tableInputType = joinNode_->sources()[1]->outputType(); - std::vector names; - names.reserve(tableInputType->size()); - std::vector types; - types.reserve(tableInputType->size()); - const auto numKeys = joinNode_->rightKeys().size(); - - // Identify the non-key build side columns. - folly::F14FastMap keyChannelMap; - for (int i = 0; i < numKeys; ++i) { - const auto& key = joinNode_->rightKeys()[i]; - const auto channel = exprToChannel(key.get(), tableInputType); - keyChannelMap[channel] = i; - names.emplace_back(tableInputType->nameOf(channel)); - types.emplace_back(tableInputType->childAt(channel)); - } - const auto numDependents = tableInputType->size() - numKeys; - for (auto i = 0; i < tableInputType->size(); ++i) { - if (keyChannelMap.find(i) == keyChannelMap.end()) { - names.emplace_back(tableInputType->nameOf(i)); - types.emplace_back(tableInputType->childAt(i)); - } - } - tableSpillType_ = hashJoinTableSpillType( - ROW(std::move(names), std::move(types)), joinType_); } void HashProbe::close() { diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h index 865048356074..121120f991f5 100644 --- a/velox/exec/HashProbe.h +++ b/velox/exec/HashProbe.h @@ -201,9 +201,9 @@ class HashProbe : public Operator { void maybeSetupSpillInputReader( const std::optional& restoredSpillPartitionId); - // Prepares the table spill by checking the spill level limit, setting spill - // partition bits and table spill type. - void prepareTableSpill( + // Checks the hash table's spill level limit from the restored table. Sets the + // 'exceededMaxSpillLevelLimit_' accordingly. + void checkMaxSpillLevel( const std::optional& restoredPartitionId); bool canSpill() const override; @@ -218,7 +218,7 @@ class HashProbe : public Operator { // Indicates if the operator needs to spill probe inputs. It is true if parts // of the build-side rows have been spilled. 
Hence, the probe operator needs // to spill the corresponding probe-side rows as well. - bool needSpillInput() const; + bool needToSpillInput() const; // This ensures there is sufficient buffer reserved to produce the next output // batch. This might trigger memory arbitration underneath and the probe @@ -252,14 +252,6 @@ class HashProbe : public Operator { // Produces and spills output from this probe operator. void spillOutput(); - // Spills the composed 'table_' from the built side. - SpillPartitionSet spillTable(); - // Spills the row container from one of the sub-table from 'table_' to - // parallelize the table spilling. The function spills all the rows from the - // row container and returns the spiller for the caller to collect the spilled - // partitions and stats. - std::unique_ptr spillTable(RowContainer* subTableRows); - // Invoked to spill rows in 'input' to disk directly if the corresponding // partitions have been spilled at the build side. // @@ -300,7 +292,7 @@ class HashProbe : public Operator { // restore. Also note that the spilled partition at build side must not be // empty. bool emptyBuildSide() const { - return table_->numDistinct() == 0 && spillPartitionSet_.empty() && + return table_->numDistinct() == 0 && inputSpillPartitionSet_.empty() && spillInputPartitionIds_.empty(); } @@ -373,8 +365,8 @@ class HashProbe : public Operator { std::vector> hashers_; - // Table shared between other HashProbes in other Drivers of the same - // pipeline. + // Current working hash table that is shared between other HashProbes in other + // Drivers of the same pipeline. std::shared_ptr table_; // Indicates whether there was no input. Used for right semi join project. @@ -624,8 +616,6 @@ class HashProbe : public Operator { // The partition bits used to spill the hash table. HashBitRange tableSpillHashBits_; - // The row type used to spill hash table on disk. 
- RowTypePtr tableSpillType_; // The spilled output partition set which is cleared after setup // 'spillOutputReader_'. @@ -668,7 +658,7 @@ class HashProbe : public Operator { bool noMoreSpillInput_{false}; // The spilled probe partitions remaining to restore. - SpillPartitionSet spillPartitionSet_; + SpillPartitionSet inputSpillPartitionSet_; }; inline std::ostream& operator<<(std::ostream& os, ProbeOperatorState state) { diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h index 7f504c58e42b..434ca9570976 100644 --- a/velox/exec/HashTable.h +++ b/velox/exec/HashTable.h @@ -342,8 +342,8 @@ class BaseHashTable { int8_t spillInputStartPartitionBit, bool disableRangeArrayHash = false) = 0; - // Removes 'rows' from the hash table and its RowContainer. 'rows' must exist - // and be unique. + /// Removes 'rows' from the hash table and its RowContainer. 'rows' must exist + /// and be unique. virtual void erase(folly::Range rows) = 0; /// Returns a brief description for use in debugging. diff --git a/velox/exec/Operator.cpp b/velox/exec/Operator.cpp index 3b65536ba04c..04e5bfbec9fc 100644 --- a/velox/exec/Operator.cpp +++ b/velox/exec/Operator.cpp @@ -486,9 +486,8 @@ column_index_t exprToChannel( if (dynamic_cast(expr)) { return kConstantChannel; } - VELOX_FAIL( + VELOX_UNREACHABLE( "Expression must be field access or constant, got: {}", expr->toString()); - return 0; // not reached. 
} std::vector calculateOutputChannels( diff --git a/velox/exec/tests/HashJoinBridgeTest.cpp b/velox/exec/tests/HashJoinBridgeTest.cpp index fce7c01303f8..fd4024805241 100644 --- a/velox/exec/tests/HashJoinBridgeTest.cpp +++ b/velox/exec/tests/HashJoinBridgeTest.cpp @@ -586,6 +586,92 @@ TEST(HashJoinBridgeTest, needRightSideJoin) { } } +TEST_P(HashJoinBridgeTest, hashJoinTableType) { + core::TypedExprPtr filter{ + std::make_shared(BOOLEAN(), true)}; + struct TestSetting { + core::JoinType joinType; + RowTypePtr probeKeyType; + RowTypePtr buildKeyType; + RowTypePtr probeSourceType; + RowTypePtr buildSourceType; + std::string debugString() const { + return fmt::format( + "joinType {} probeKeyType {} buildKeyType {} probeSourceType {} buildSourceType {}", + joinType, + probeKeyType->toString(), + buildKeyType->toString(), + probeSourceType->toString(), + buildSourceType->toString()); + } + }; + std::vector testSettings{ + {core::JoinType::kInner, + ROW({"p0"}, {BIGINT()}), + ROW({"b0"}, {BIGINT()}), + ROW({"p0", "p1"}, {BIGINT(), BIGINT()}), + ROW({"b0", "b1"}, {BIGINT(), BIGINT()})}, + {core::JoinType::kRight, + ROW({"p1", "p0"}, {BIGINT(), BIGINT()}), + ROW({"b1", "b0"}, {BIGINT(), BIGINT()}), + ROW({"p0", "p1"}, {BIGINT(), BIGINT()}), + ROW({"b0", "b1"}, {BIGINT(), BIGINT()})}, + {core::JoinType::kLeft, + ROW({"p1"}, {BIGINT()}), + ROW({"b1"}, {BIGINT()}), + ROW({"p0", "p1"}, {BIGINT(), BIGINT()}), + ROW({"b0", "b1"}, {BIGINT(), BIGINT()})}}; + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + const auto emptyBuildVector = {std::make_shared( + pool_.get(), + testData.buildSourceType, + nullptr, // nulls + 0, + std::vector{})}; + const auto buildValueNode = + std::make_shared("buildValueNode", emptyBuildVector); + + const auto emptyProbeVectors = {std::make_shared( + pool_.get(), + testData.probeSourceType, + nullptr, // nulls + 0, + std::vector{})}; + const auto probeValueNode = + std::make_shared("probeValueNode", 
emptyProbeVectors); + + std::vector buildKeys; + std::vector probeKeys; + for (uint32_t i = 0; i < testData.buildKeyType->size(); i++) { + buildKeys.push_back(std::make_shared( + testData.buildKeyType->childAt(i), testData.buildKeyType->nameOf(i))); + } + for (uint32_t i = 0; i < testData.probeKeyType->size(); i++) { + probeKeys.push_back(std::make_shared( + testData.probeKeyType->childAt(i), testData.probeKeyType->nameOf(i))); + } + const auto joinNode = std::make_shared( + "join-bridge-test", + testData.joinType, + false, + probeKeys, + buildKeys, + filter, + probeValueNode, + buildValueNode, + ROW({})); + + auto tableType = hashJoinTableType(joinNode); + ASSERT_EQ(tableType->size(), testData.buildSourceType->size()); + // Key columns must lead the table type, in join key order. + for (uint32_t i = 0; i < buildKeys.size(); i++) { + ASSERT_EQ(tableType->nameOf(i), testData.buildKeyType->nameOf(i)); + ASSERT_EQ(tableType->childAt(i), testData.buildKeyType->childAt(i)); + } + } +} + TEST(HashJoinBridgeTest, hashJoinTableSpillType) { const RowTypePtr tableType = ROW({"k1", "k2"}, {BIGINT(), BIGINT()}); const RowTypePtr spillTypeWithProbedFlag =