diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h
index 774c87f52517a..39bf52cfdab92 100644
--- a/velox/core/QueryConfig.h
+++ b/velox/core/QueryConfig.h
@@ -578,7 +578,7 @@ class QueryConfig {
   /// calculate the spilling partition number for join spill or aggregation
   /// spill.
   uint8_t spillStartPartitionBit() const {
-    constexpr uint8_t kDefaultStartBit = 29;
+    constexpr uint8_t kDefaultStartBit = 48;
     return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
   }
 
diff --git a/velox/exec/GroupingSet.cpp b/velox/exec/GroupingSet.cpp
index e45f4e5cb8c68..7484e3ba3f741 100644
--- a/velox/exec/GroupingSet.cpp
+++ b/velox/exec/GroupingSet.cpp
@@ -224,7 +224,8 @@ void GroupingSet::addInputForActiveRows(
   TestValue::adjust(
       "facebook::velox::exec::GroupingSet::addInputForActiveRows", this);
 
-  table_->prepareForGroupProbe(*lookup_, input, activeRows_, ignoreNullKeys_);
+  table_->prepareForGroupProbe(
+      *lookup_, input, activeRows_, ignoreNullKeys_, -1);
   if (lookup_->rows.empty()) {
     // No rows to probe. Can happen when ignoreNullKeys_ is true and all rows
     // have null keys.
diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp
index ce8a2f76ceb82..fa178718f72bc 100644
--- a/velox/exec/HashBuild.cpp
+++ b/velox/exec/HashBuild.cpp
@@ -823,7 +823,8 @@ bool HashBuild::finishHashBuild() {
   table_->prepareJoinTable(
       std::move(otherTables),
       allowParallelJoinBuild ? operatorCtx_->task()->queryCtx()->executor()
-                             : nullptr);
+                             : nullptr,
+      isInputFromSpill() ? spillConfig()->startPartitionBit : -1);
   addRuntimeStats();
   if (joinBridge_->setHashTable(
           std::move(table_), std::move(spillPartitions), joinHasNullKeys_)) {
diff --git a/velox/exec/HashTable.cpp b/velox/exec/HashTable.cpp
index ce89fe1d60dd3..a820c6ba8d2d3 100644
--- a/velox/exec/HashTable.cpp
+++ b/velox/exec/HashTable.cpp
@@ -1607,7 +1607,8 @@ bool mayUseValueIds(const BaseHashTable& table) {
 template <bool ignoreNullKeys>
 void HashTable<ignoreNullKeys>::prepareJoinTable(
     std::vector<std::unique_ptr<BaseHashTable>> tables,
-    folly::Executor* executor) {
+    folly::Executor* executor,
+    int spillInputStartPartitionBit) {
   buildExecutor_ = executor;
   otherTables_.reserve(tables.size());
   for (auto& table : tables) {
@@ -1650,6 +1651,7 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
   } else {
     decideHashMode(0);
   }
+  checkHashBitsOverlap(spillInputStartPartitionBit);
 }
 
 template <bool ignoreNullKeys>
@@ -1982,7 +1984,9 @@ void BaseHashTable::prepareForGroupProbe(
     HashLookup& lookup,
     const RowVectorPtr& input,
     SelectivityVector& rows,
-    bool ignoreNullKeys) {
+    bool ignoreNullKeys,
+    int spillInputStartPartitionBit) {
+  checkHashBitsOverlap(spillInputStartPartitionBit);
   auto& hashers = lookup.hashers;
 
   for (auto& hasher : hashers) {
@@ -2015,7 +2019,8 @@ void BaseHashTable::prepareForGroupProbe(
     decideHashMode(input->size());
     // Do not forward 'ignoreNullKeys' to avoid redundant evaluation of
     // deselectRowsWithNulls.
-    prepareForGroupProbe(lookup, input, rows, false);
+    prepareForGroupProbe(
+        lookup, input, rows, false, spillInputStartPartitionBit);
     return;
   }
 }
diff --git a/velox/exec/HashTable.h b/velox/exec/HashTable.h
index 5dc6e128934cd..aa931d3692b4c 100644
--- a/velox/exec/HashTable.h
+++ b/velox/exec/HashTable.h
@@ -181,7 +181,8 @@ class BaseHashTable {
       HashLookup& lookup,
       const RowVectorPtr& input,
       SelectivityVector& rows,
-      bool ignoreNullKeys);
+      bool ignoreNullKeys,
+      int spillInputStartPartitionBit);
 
   /// Finds or creates a group for each key in 'lookup'. The keys are
   /// returned in 'lookup.hits'.
@@ -248,7 +249,8 @@ class BaseHashTable {
 
   virtual void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) = 0;
+      folly::Executor* executor = nullptr,
+      int spillInputStartPartitionBit = -1) = 0;
 
   /// Returns the memory footprint in bytes for any data structures
   /// owned by 'this'.
@@ -328,7 +330,10 @@ class BaseHashTable {
 
   /// Extracts a 7 bit tag from a hash number. The high bit is always set.
   static uint8_t hashTag(uint64_t hash) {
-    return static_cast<uint8_t>(hash >> 32) | 0x80;
+    // NOTE: this is likely all 0 for small key types (<= 32 bits). It is
+    // probably not a severe problem as we are likely running in array mode in
+    // such cases.
+    return static_cast<uint8_t>(hash >> 38) | 0x80;
   }
 
   /// Loads a vector of tags for bulk comparison. Disables tsan errors
@@ -365,6 +370,19 @@ class BaseHashTable {
 
   virtual void setHashMode(HashMode mode, int32_t numNew) = 0;
 
+  virtual int sizeBits() const = 0;
+
+  // We don't want any overlap between the bit ranges used by the bucket index
+  // and those used by spill partitioning; otherwise, because we receive data
+  // from only one partition, the overlapped bits would be the same and only a
+  // fraction of the buckets would be used. This would make insertion take a
+  // very long time and block driver threads.
+  void checkHashBitsOverlap(int spillInputStartPartitionBit) {
+    if (spillInputStartPartitionBit >= 0 && hashMode() != HashMode::kArray) {
+      VELOX_CHECK_LE(sizeBits(), spillInputStartPartitionBit);
+    }
+  }
+
   std::vector<std::unique_ptr<VectorHasher>> hashers_;
   std::unique_ptr<RowContainer> rows_;
 
@@ -525,7 +543,8 @@ class HashTable : public BaseHashTable {
   // and VectorHashers and decides the hash mode and representation.
   void prepareJoinTable(
       std::vector<std::unique_ptr<BaseHashTable>> tables,
-      folly::Executor* executor = nullptr) override;
+      folly::Executor* executor = nullptr,
+      int spillInputStartPartitionBit = -1) override;
 
   uint64_t hashTableSizeIncrease(int32_t numNewDistinct) const override {
     if (numDistinct_ + numNewDistinct > rehashSize()) {
@@ -587,10 +606,6 @@ class HashTable : public BaseHashTable {
   // occupy exactly two (64 bytes) cache lines.
   class Bucket {
    public:
-    Bucket() {
-      static_assert(sizeof(Bucket) == 128);
-    }
-
     uint8_t tagAt(int32_t slotIndex) {
       return reinterpret_cast<uint8_t*>(&tags_)[slotIndex];
     }
@@ -622,6 +637,7 @@ class HashTable : public BaseHashTable {
     char padding_[16];
   };
 
+  static_assert(sizeof(Bucket) == 128);
   static constexpr uint64_t kBucketSize = sizeof(Bucket);
 
   // Returns the bucket at byte offset 'offset' from 'table_'.
@@ -881,6 +897,10 @@ class HashTable : public BaseHashTable {
     }
   }
 
+  int sizeBits() const final {
+    return sizeBits_;
+  }
+
   // The min table size in row to trigger parallel join table build.
   const uint32_t minTableSizeForParallelJoinBuild_;
 
@@ -938,7 +958,7 @@ class HashTable : public BaseHashTable {
 
   // Executor for parallelizing hash join build. This may be the
   // executor for Drivers. If this executor is indefinitely taken by
-  // other work, the thread of prepareJoinTables() will sequentially
+  // other work, the thread of prepareJoinTable() will sequentially
   // execute the parallel build steps.
   folly::Executor* buildExecutor_{nullptr};
diff --git a/velox/exec/RowNumber.cpp b/velox/exec/RowNumber.cpp
index f753cde0e38fa..501ab2b6bcf95 100644
--- a/velox/exec/RowNumber.cpp
+++ b/velox/exec/RowNumber.cpp
@@ -78,7 +78,7 @@ void RowNumber::addInput(RowVectorPtr input) {
   }
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(*lookup_, input, rows, false, -1);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions with zeros.
@@ -93,7 +93,8 @@ void RowNumber::addInput(RowVectorPtr input) {
 void RowNumber::addSpillInput() {
   const auto numInput = input_->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input_, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input_, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions with zeros.
@@ -157,7 +158,8 @@ void RowNumber::restoreNextSpillPartition() {
   const auto numInput = input->size();
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(
+      *lookup_, input, rows, false, spillConfig_->startPartitionBit);
   table_->groupProbe(*lookup_);
 
   auto* counts = data->children().back()->as<FlatVector<int64_t>>();
diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp
index ceb9dc2131e36..e429e13705e93 100644
--- a/velox/exec/TopNRowNumber.cpp
+++ b/velox/exec/TopNRowNumber.cpp
@@ -191,7 +191,7 @@ void TopNRowNumber::addInput(RowVectorPtr input) {
   ensureInputFits(input);
 
   SelectivityVector rows(numInput);
-  table_->prepareForGroupProbe(*lookup_, input, rows, false);
+  table_->prepareForGroupProbe(*lookup_, input, rows, false, -1);
   table_->groupProbe(*lookup_);
 
   // Initialize new partitions.
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 573c618989b4a..08dc5b8f76041 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -5038,6 +5038,22 @@ TEST_F(HashJoinTest, spillFileSize) {
   }
 }
 
+TEST_F(HashJoinTest, spillPartitionBitsOverlap) {
+  auto builder =
+      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+          .numDrivers(numDrivers_)
+          .keyTypes({BIGINT(), BIGINT()})
+          .probeVectors(2'000, 3)
+          .buildVectors(2'000, 3)
+          .referenceQuery(
+              "SELECT t_k0, t_k1, t_data, u_k0, u_k1, u_data FROM t, u WHERE t_k0 = u_k0 and t_k1 = u_k1")
+          .config(core::QueryConfig::kSpillStartPartitionBit, "8")
+          .config(core::QueryConfig::kJoinSpillPartitionBits, "1")
+          .checkSpillStats(false)
+          .maxSpillLevel(0);
+  VELOX_ASSERT_THROW(builder.run(), "vs. 8");
+}
+
 // The test is to verify if the hash build reservation has been released on
 // task error.
 DEBUG_ONLY_TEST_F(HashJoinTest, buildReservationReleaseCheck) {
@@ -5242,6 +5258,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringInputProcessing) {
           .spillDirectory(testData.spillEnabled ? tempDirectory->path : "")
tempDirectory->path : "") .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); if (testData.expectedReclaimable) { @@ -5394,6 +5411,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringReserve) { .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); ASSERT_GT(statsPair.first.spilledBytes, 0); @@ -5788,6 +5806,7 @@ DEBUG_ONLY_TEST_F(HashJoinTest, reclaimDuringWaitForProbe) { .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { const auto statsPair = taskSpilledStats(*task); ASSERT_GT(statsPair.first.spilledBytes, 0); @@ -6351,6 +6370,7 @@ TEST_F(HashJoinTest, exceededMaxSpillLevel) { .spillDirectory(tempDirectory->path) .referenceQuery( "SELECT t_k1, t_k2, t_v1, u_k1, u_k2, u_v1 FROM t, u WHERE t.t_k1 = u.u_k1") + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .verifier([&](const std::shared_ptr& task, bool /*unused*/) { auto joinStats = task->taskStats() .pipelineStats.back() diff --git a/velox/exec/tests/utils/ArbitratorTestUtil.cpp b/velox/exec/tests/utils/ArbitratorTestUtil.cpp index aeb75584beabc..62761c9f33d82 100644 --- a/velox/exec/tests/utils/ArbitratorTestUtil.cpp +++ b/velox/exec/tests/utils/ArbitratorTestUtil.cpp @@ -99,6 +99,7 @@ QueryTestResult runHashJoinTask( .spillDirectory(spillDirectory->path) .config(core::QueryConfig::kSpillEnabled, true) .config(core::QueryConfig::kJoinSpillEnabled, true) + .config(core::QueryConfig::kSpillStartPartitionBit, "29") .queryCtx(queryCtx) .maxDrivers(numDrivers) .copyResults(pool, result.task);