diff --git a/velox/common/base/SpillConfig.cpp b/velox/common/base/SpillConfig.cpp index 71188fc5c6dc..45d9f104b1f6 100644 --- a/velox/common/base/SpillConfig.cpp +++ b/velox/common/base/SpillConfig.cpp @@ -30,6 +30,7 @@ SpillConfig::SpillConfig( int32_t _spillableReservationGrowthPct, uint8_t _startPartitionBit, uint8_t _joinPartitionBits, + uint8_t _rowNumberPartitionBits, int32_t _maxSpillLevel, uint64_t _maxSpillRunRows, uint64_t _writerFlushThresholdSize, @@ -48,6 +49,7 @@ SpillConfig::SpillConfig( spillableReservationGrowthPct(_spillableReservationGrowthPct), startPartitionBit(_startPartitionBit), joinPartitionBits(_joinPartitionBits), + rowNumberPartitionBits(_rowNumberPartitionBits), maxSpillLevel(_maxSpillLevel), maxSpillRunRows(_maxSpillRunRows), writerFlushThresholdSize(_writerFlushThresholdSize), @@ -59,8 +61,9 @@ SpillConfig::SpillConfig( "Spillable memory reservation growth pct should not be lower than minimum available pct"); } -int32_t SpillConfig::joinSpillLevel(uint8_t startBitOffset) const { - const auto numPartitionBits = joinPartitionBits; +int32_t SpillConfig::spillLevel( + uint8_t startBitOffset, + uint8_t numPartitionBits) const { VELOX_CHECK_LE( startBitOffset + numPartitionBits, 64, @@ -78,13 +81,15 @@ int32_t SpillConfig::joinSpillLevel(uint8_t startBitOffset) const { return deltaBits / numPartitionBits; } -bool SpillConfig::exceedJoinSpillLevelLimit(uint8_t startBitOffset) const { - if (startBitOffset + joinPartitionBits > 64) { +bool SpillConfig::exceedSpillLevelLimit( + uint8_t startBitOffset, + uint8_t numPartitionBits) const { + if (startBitOffset + numPartitionBits > 64) { return true; } if (maxSpillLevel == -1) { return false; } - return joinSpillLevel(startBitOffset) > maxSpillLevel; + return spillLevel(startBitOffset, numPartitionBits) > maxSpillLevel; } } // namespace facebook::velox::common diff --git a/velox/common/base/SpillConfig.h b/velox/common/base/SpillConfig.h index e018f2ad2f0b..7d122f12ec92 100644 --- a/velox/common/base/SpillConfig.h +++ b/velox/common/base/SpillConfig.h @@ -57,21 +57,24 @@ struct SpillConfig { int32_t _spillableReservationGrowthPct, uint8_t _startPartitionBit, uint8_t _joinPartitionBits, + uint8_t _rowNumberPartitionBits, int32_t _maxSpillLevel, uint64_t _maxSpillRunRows, uint64_t _writerFlushThresholdSize, const std::string& _compressionKind, const std::string& _fileCreateConfig = {}); - /// Returns the hash join spilling level with given 'startBitOffset'. + /// Returns the spilling level with given 'startBitOffset' and + /// 'numPartitionBits'. /// /// NOTE: we advance (or right shift) the partition bit offset when goes to /// the next level of recursive spilling. - int32_t joinSpillLevel(uint8_t startBitOffset) const; + int32_t spillLevel(uint8_t startBitOffset, uint8_t numPartitionBits) const; - /// Checks if the given 'startBitOffset' has exceeded the max hash join - /// spill limit. - bool exceedJoinSpillLevelLimit(uint8_t startBitOffset) const; + /// Checks if the given 'startBitOffset' and 'numPartitionBits' has exceeded + /// the max hash join spill limit. + bool exceedSpillLevelLimit(uint8_t startBitOffset, uint8_t numPartitionBits) + const; /// A callback function that returns the spill directory path. Implementations /// can use it to ensure the path exists before returning. @@ -113,13 +116,17 @@ struct SpillConfig { /// memory usage. int32_t spillableReservationGrowthPct; - /// Used to calculate spill partition number. + /// The start partition bit offset of the top (the first level) partitions. uint8_t startPartitionBit; /// Used to calculate the spill hash partition number for hash join with /// 'startPartitionBit'. uint8_t joinPartitionBits; + /// Used to calculate the spill partition number of the hash table in + /// RowNumber with 'startPartitionBit'. + uint8_t rowNumberPartitionBits; + /// The max allowed spilling level with zero being the initial spilling /// level. This only applies for hash build spilling which needs recursive /// spilling when the build table is too big. If it is set to -1, then there diff --git a/velox/common/base/tests/SpillConfigTest.cpp b/velox/common/base/tests/SpillConfigTest.cpp index 11b1f4274856..b3e0229bc54b 100644 --- a/velox/common/base/tests/SpillConfigTest.cpp +++ b/velox/common/base/tests/SpillConfigTest.cpp @@ -37,6 +37,7 @@ TEST(SpillConfig, spillLevel) { 0, kInitialBitOffset, kNumPartitionsBits, + kNumPartitionsBits, 0, 0, 0, @@ -63,10 +64,12 @@ TEST(SpillConfig, spillLevel) { for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); if (testData.expectedLevel == -1) { - ASSERT_ANY_THROW(config.joinSpillLevel(testData.bitOffset)); + ASSERT_ANY_THROW( + config.spillLevel(testData.bitOffset, kNumPartitionsBits)); } else { ASSERT_EQ( - config.joinSpillLevel(testData.bitOffset), testData.expectedLevel); + config.spillLevel(testData.bitOffset, kNumPartitionsBits), + testData.expectedLevel); } } } @@ -123,6 +126,7 @@ TEST(SpillConfig, spillLevelLimit) { 0, testData.startBitOffset, testData.numBits, + testData.numBits, testData.maxSpillLevel, 0, 0, @@ -130,7 +134,7 @@ TEST(SpillConfig, spillLevelLimit) { ASSERT_EQ( testData.expectedExceeds, - config.exceedJoinSpillLevelLimit(testData.bitOffset)); + config.exceedSpillLevelLimit(testData.bitOffset, testData.numBits)); } } @@ -171,6 +175,7 @@ TEST(SpillConfig, spillableReservationPercentages) { 0, 0, 0, + 0, 1'000'000, 0, "none"); diff --git a/velox/connectors/hive/tests/HiveDataSinkTest.cpp b/velox/connectors/hive/tests/HiveDataSinkTest.cpp index 12515f2de998..1945164c110b 100644 --- a/velox/connectors/hive/tests/HiveDataSinkTest.cpp +++ b/velox/connectors/hive/tests/HiveDataSinkTest.cpp @@ -97,6 +97,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase { 0, 0, 0, + 0, writerFlushThreshold, "none"); } diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 72109b3817c5..d54a23d8ad5e 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -273,6 +273,9 @@ class QueryConfig { static constexpr const char* kJoinSpillPartitionBits = "join_spiller_partition_bits"; + static constexpr const char* kRowNumberSpillPartitionBits = + "row_number_spiller_partition_bits"; + static constexpr const char* kMinSpillableReservationPct = "min_spillable_reservation_pct"; @@ -577,9 +580,8 @@ class QueryConfig { return get(kSpillStartPartitionBit, kDefaultStartBit); } - /// Returns the number of bits used to calculate the spilling partition - /// number for hash join. The number of spilling partitions will be power of - /// two. + /// Returns the number of bits used to calculate the spill partition number + /// for hash join. The number of spill partitions will be power of two. /// /// NOTE: as for now, we only support up to 8-way spill partitioning. uint8_t joinSpillPartitionBits() const { @@ -589,6 +591,17 @@ class QueryConfig { kMaxBits, get(kJoinSpillPartitionBits, kDefaultBits)); } + /// Returns the number of bits used to calculate the spill partition number + /// for RowNumber. The number of spill partitions will be power of two. + /// + /// NOTE: as for now, we only support up to 8-way spill partitioning. + uint8_t rowNumberSpillPartitionBits() const { + constexpr uint8_t kDefaultBits = 3; + constexpr uint8_t kMaxBits = 3; + return std::min( + kMaxBits, get(kRowNumberSpillPartitionBits, kDefaultBits)); + } + uint64_t writerFlushThresholdBytes() const { return get(kWriterFlushThresholdBytes, 96L << 20); } diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 4aa4bc86d317..55776b8fddad 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -257,6 +257,7 @@ class E2EWriterTest : public testing::Test { 0, 0, 0, + 0, writerFlushThresholdSize, "none"); } diff --git a/velox/exec/Driver.cpp b/velox/exec/Driver.cpp index e37022f97ff6..c21634a86fda 100644 --- a/velox/exec/Driver.cpp +++ b/velox/exec/Driver.cpp @@ -156,6 +156,7 @@ std::optional DriverCtx::makeSpillConfig( queryConfig.spillableReservationGrowthPct(), queryConfig.spillStartPartitionBit(), queryConfig.joinSpillPartitionBits(), + queryConfig.rowNumberSpillPartitionBits(), queryConfig.maxSpillLevel(), queryConfig.maxSpillRunRows(), queryConfig.writerFlushThresholdBytes(), diff --git a/velox/exec/HashBuild.cpp b/velox/exec/HashBuild.cpp index f2f830204416..f9d1598c737d 100644 --- a/velox/exec/HashBuild.cpp +++ b/velox/exec/HashBuild.cpp @@ -225,7 +225,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) { spillConfig.joinPartitionBits; // Disable spilling if exceeding the max spill level and the query might run // out of memory if the restored partition still can't fit in memory. - if (spillConfig.exceedJoinSpillLevelLimit(startBit)) { + if (spillConfig.exceedSpillLevelLimit( + startBit, spillConfig.joinPartitionBits)) { RECORD_METRIC_VALUE(kMetricMaxSpillLevelExceededCount); LOG(WARNING) << "Exceeded spill level limit: " << spillConfig.maxSpillLevel @@ -905,8 +906,8 @@ void HashBuild::addRuntimeStats() { if (spiller_ != nullptr && spiller_->isAnySpilled()) { lockedStats->addRuntimeStat( "maxSpillLevel", - RuntimeCounter( - spillConfig()->joinSpillLevel(spiller_->hashBits().begin()))); + RuntimeCounter(spillConfig()->spillLevel( + spiller_->hashBits().begin(), spillConfig()->joinPartitionBits))); } } diff --git a/velox/exec/tests/SortBufferTest.cpp b/velox/exec/tests/SortBufferTest.cpp index f965652651ac..838501735161 100644 --- a/velox/exec/tests/SortBufferTest.cpp +++ b/velox/exec/tests/SortBufferTest.cpp @@ -58,6 +58,7 @@ class SortBufferTest : public OperatorTestBase { 0, 0, 0, + 0, "none"); } @@ -300,6 +301,7 @@ TEST_F(SortBufferTest, batchOutput) { 0, 0, 0, + 0, "none"); auto sortBuffer = std::make_unique( inputType_, @@ -395,6 +397,7 @@ TEST_F(SortBufferTest, spill) { 0, 0, 0, + 0, "none"); auto sortBuffer = std::make_unique( inputType_,