Skip to content

Commit

Permalink
Add RowNumber spill partition bits (facebookincubator#8978)
Browse files Browse the repository at this point in the history
Summary:
Add `rowNumberPartitionBits` in SpillConfig for RowNumber,
and consolidate the `spillLevel` and `exceedMaxSpillLevel`
methods.

Pull Request resolved: facebookincubator#8978

Reviewed By: tanjialiang

Differential Revision: D54934695

Pulled By: xiaoxmeng

fbshipit-source-id: 615d64cb33a33c87dda3efecbd41c7bd69cb985d
  • Loading branch information
duanmeng authored and Joe-Abraham committed Jun 7, 2024
1 parent bd95801 commit cef6f2d
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 20 deletions.
15 changes: 10 additions & 5 deletions velox/common/base/SpillConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ SpillConfig::SpillConfig(
int32_t _spillableReservationGrowthPct,
uint8_t _startPartitionBit,
uint8_t _joinPartitionBits,
uint8_t _rowNumberPartitionBits,
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
Expand All @@ -48,6 +49,7 @@ SpillConfig::SpillConfig(
spillableReservationGrowthPct(_spillableReservationGrowthPct),
startPartitionBit(_startPartitionBit),
joinPartitionBits(_joinPartitionBits),
rowNumberPartitionBits(_rowNumberPartitionBits),
maxSpillLevel(_maxSpillLevel),
maxSpillRunRows(_maxSpillRunRows),
writerFlushThresholdSize(_writerFlushThresholdSize),
Expand All @@ -59,8 +61,9 @@ SpillConfig::SpillConfig(
"Spillable memory reservation growth pct should not be lower than minimum available pct");
}

int32_t SpillConfig::joinSpillLevel(uint8_t startBitOffset) const {
const auto numPartitionBits = joinPartitionBits;
int32_t SpillConfig::spillLevel(
uint8_t startBitOffset,
uint8_t numPartitionBits) const {
VELOX_CHECK_LE(
startBitOffset + numPartitionBits,
64,
Expand All @@ -78,13 +81,15 @@ int32_t SpillConfig::joinSpillLevel(uint8_t startBitOffset) const {
return deltaBits / numPartitionBits;
}

bool SpillConfig::exceedJoinSpillLevelLimit(uint8_t startBitOffset) const {
if (startBitOffset + joinPartitionBits > 64) {
bool SpillConfig::exceedSpillLevelLimit(
uint8_t startBitOffset,
uint8_t numPartitionBits) const {
if (startBitOffset + numPartitionBits > 64) {
return true;
}
if (maxSpillLevel == -1) {
return false;
}
return joinSpillLevel(startBitOffset) > maxSpillLevel;
return spillLevel(startBitOffset, numPartitionBits) > maxSpillLevel;
}
} // namespace facebook::velox::common
19 changes: 13 additions & 6 deletions velox/common/base/SpillConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,21 +57,24 @@ struct SpillConfig {
int32_t _spillableReservationGrowthPct,
uint8_t _startPartitionBit,
uint8_t _joinPartitionBits,
uint8_t _rowNumberPartitionBits,
int32_t _maxSpillLevel,
uint64_t _maxSpillRunRows,
uint64_t _writerFlushThresholdSize,
const std::string& _compressionKind,
const std::string& _fileCreateConfig = {});

/// Returns the hash join spilling level with given 'startBitOffset'.
/// Returns the spilling level with given 'startBitOffset' and
/// 'numPartitionBits'.
///
/// NOTE: we advance (or right shift) the partition bit offset when goes to
/// the next level of recursive spilling.
int32_t joinSpillLevel(uint8_t startBitOffset) const;
int32_t spillLevel(uint8_t startBitOffset, uint8_t numPartitionBits) const;

/// Checks if the given 'startBitOffset' has exceeded the max hash join
/// spill limit.
bool exceedJoinSpillLevelLimit(uint8_t startBitOffset) const;
/// Checks if the given 'startBitOffset' and 'numPartitionBits' has exceeded
/// the max hash join spill limit.
bool exceedSpillLevelLimit(uint8_t startBitOffset, uint8_t numPartitionBits)
const;

/// A callback function that returns the spill directory path. Implementations
/// can use it to ensure the path exists before returning.
Expand Down Expand Up @@ -113,13 +116,17 @@ struct SpillConfig {
/// memory usage.
int32_t spillableReservationGrowthPct;

/// Used to calculate spill partition number.
/// The start partition bit offset of the top (the first level) partitions.
uint8_t startPartitionBit;

/// Used to calculate the spill hash partition number for hash join with
/// 'startPartitionBit'.
uint8_t joinPartitionBits;

/// Used to calculate the spill partition number of the hash table in
/// RowNumber with 'startPartitionBit'.
uint8_t rowNumberPartitionBits;

/// The max allowed spilling level with zero being the initial spilling
/// level. This only applies for hash build spilling which needs recursive
/// spilling when the build table is too big. If it is set to -1, then there
Expand Down
11 changes: 8 additions & 3 deletions velox/common/base/tests/SpillConfigTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ TEST(SpillConfig, spillLevel) {
0,
kInitialBitOffset,
kNumPartitionsBits,
kNumPartitionsBits,
0,
0,
0,
Expand All @@ -63,10 +64,12 @@ TEST(SpillConfig, spillLevel) {
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());
if (testData.expectedLevel == -1) {
ASSERT_ANY_THROW(config.joinSpillLevel(testData.bitOffset));
ASSERT_ANY_THROW(
config.spillLevel(testData.bitOffset, kNumPartitionsBits));
} else {
ASSERT_EQ(
config.joinSpillLevel(testData.bitOffset), testData.expectedLevel);
config.spillLevel(testData.bitOffset, kNumPartitionsBits),
testData.expectedLevel);
}
}
}
Expand Down Expand Up @@ -123,14 +126,15 @@ TEST(SpillConfig, spillLevelLimit) {
0,
testData.startBitOffset,
testData.numBits,
testData.numBits,
testData.maxSpillLevel,
0,
0,
"none");

ASSERT_EQ(
testData.expectedExceeds,
config.exceedJoinSpillLevelLimit(testData.bitOffset));
config.exceedSpillLevelLimit(testData.bitOffset, testData.numBits));
}
}

Expand Down Expand Up @@ -171,6 +175,7 @@ TEST(SpillConfig, spillableReservationPercentages) {
0,
0,
0,
0,
1'000'000,
0,
"none");
Expand Down
1 change: 1 addition & 0 deletions velox/connectors/hive/tests/HiveDataSinkTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class HiveDataSinkTest : public exec::test::HiveConnectorTestBase {
0,
0,
0,
0,
writerFlushThreshold,
"none");
}
Expand Down
19 changes: 16 additions & 3 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ class QueryConfig {
static constexpr const char* kJoinSpillPartitionBits =
"join_spiller_partition_bits";

static constexpr const char* kRowNumberSpillPartitionBits =
"row_number_spiller_partition_bits";

static constexpr const char* kMinSpillableReservationPct =
"min_spillable_reservation_pct";

Expand Down Expand Up @@ -577,9 +580,8 @@ class QueryConfig {
return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
}

/// Returns the number of bits used to calculate the spilling partition
/// number for hash join. The number of spilling partitions will be power of
/// two.
/// Returns the number of bits used to calculate the spill partition number
/// for hash join. The number of spill partitions will be power of two.
///
/// NOTE: as for now, we only support up to 8-way spill partitioning.
uint8_t joinSpillPartitionBits() const {
Expand All @@ -589,6 +591,17 @@ class QueryConfig {
kMaxBits, get<uint8_t>(kJoinSpillPartitionBits, kDefaultBits));
}

/// Returns the number of bits used to calculate the spill partition number
/// for RowNumber. The number of spill partitions will be power of two.
///
/// NOTE: as for now, we only support up to 8-way spill partitioning.
uint8_t rowNumberSpillPartitionBits() const {
constexpr uint8_t kDefaultBits = 3;
constexpr uint8_t kMaxBits = 3;
return std::min(
kMaxBits, get<uint8_t>(kRowNumberSpillPartitionBits, kDefaultBits));
}

uint64_t writerFlushThresholdBytes() const {
return get<uint64_t>(kWriterFlushThresholdBytes, 96L << 20);
}
Expand Down
1 change: 1 addition & 0 deletions velox/dwio/dwrf/test/E2EWriterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ class E2EWriterTest : public testing::Test {
0,
0,
0,
0,
writerFlushThresholdSize,
"none");
}
Expand Down
1 change: 1 addition & 0 deletions velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
queryConfig.spillableReservationGrowthPct(),
queryConfig.spillStartPartitionBit(),
queryConfig.joinSpillPartitionBits(),
queryConfig.rowNumberSpillPartitionBits(),
queryConfig.maxSpillLevel(),
queryConfig.maxSpillRunRows(),
queryConfig.writerFlushThresholdBytes(),
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,8 @@ void HashBuild::setupSpiller(SpillPartition* spillPartition) {
spillConfig.joinPartitionBits;
// Disable spilling if exceeding the max spill level and the query might run
// out of memory if the restored partition still can't fit in memory.
if (spillConfig.exceedJoinSpillLevelLimit(startBit)) {
if (spillConfig.exceedSpillLevelLimit(
startBit, spillConfig.joinPartitionBits)) {
RECORD_METRIC_VALUE(kMetricMaxSpillLevelExceededCount);
LOG(WARNING) << "Exceeded spill level limit: "
<< spillConfig.maxSpillLevel
Expand Down Expand Up @@ -905,8 +906,8 @@ void HashBuild::addRuntimeStats() {
if (spiller_ != nullptr && spiller_->isAnySpilled()) {
lockedStats->addRuntimeStat(
"maxSpillLevel",
RuntimeCounter(
spillConfig()->joinSpillLevel(spiller_->hashBits().begin())));
RuntimeCounter(spillConfig()->spillLevel(
spiller_->hashBits().begin(), spillConfig()->joinPartitionBits)));
}
}

Expand Down
3 changes: 3 additions & 0 deletions velox/exec/tests/SortBufferTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class SortBufferTest : public OperatorTestBase {
0,
0,
0,
0,
"none");
}

Expand Down Expand Up @@ -300,6 +301,7 @@ TEST_F(SortBufferTest, batchOutput) {
0,
0,
0,
0,
"none");
auto sortBuffer = std::make_unique<SortBuffer>(
inputType_,
Expand Down Expand Up @@ -395,6 +397,7 @@ TEST_F(SortBufferTest, spill) {
0,
0,
0,
0,
"none");
auto sortBuffer = std::make_unique<SortBuffer>(
inputType_,
Expand Down

0 comments on commit cef6f2d

Please sign in to comment.