Free hash table after grouping set/row number spill to release memory, plus a hash table fix (#11180)

Summary:

Shadow testing found that hash aggregation can keep a non-trivial amount of memory (a couple hundred MB) allocated after reclaim because of the hash table held by the grouping set. Currently we only clear the hash table contents in the grouping set but do not free the table itself (we only free the groups). The same applies to the row number operator.
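
To make the distinction concrete, here is a minimal, self-contained sketch of the two clearing modes. MiniTable is a hypothetical stand-in, not the Velox HashTable; only the clear(bool freeTable) shape and the table_/capacity_ members mirror the diff below.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>

// Hypothetical stand-in for the hash table: clearing the contents keeps the
// bucket array (and its memory) alive; freeing the table releases it.
class MiniTable {
 public:
  void allocate(size_t capacity) {
    capacity_ = capacity;
    table_ = std::make_unique<char*[]>(capacity_);
    std::memset(table_.get(), 0, capacity_ * sizeof(char*));
  }

  // Mirrors the clear(bool freeTable) semantics used in this PR.
  void clear(bool freeTable) {
    if (table_ == nullptr) {
      return; // Safe to call before any allocation.
    }
    if (!freeTable) {
      // Keep the bucket array for reuse; only zero out the slots.
      std::memset(table_.get(), 0, capacity_ * sizeof(char*));
      return;
    }
    table_.reset(); // Free the bucket array itself.
    capacity_ = 0;  // Reset capacity too; a stale value is the bug fixed in (3) below.
  }

  size_t capacity() const {
    return capacity_;
  }

 private:
  std::unique_ptr<char*[]> table_;
  size_t capacity_{0};
};

int main() {
  MiniTable table;
  table.allocate(1 << 20); // 1M slots: 8MB of bucket memory on 64-bit.
  table.clear(/*freeTable=*/false);
  assert(table.capacity() == 1 << 20); // Bucket memory is still held.
  table.clear(/*freeTable=*/true);
  assert(table.capacity() == 0); // Bucket memory is released.
  return 0;
}
```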

This PR includes the following changes:
(1) Free the table after spill for both the row number operator and the grouping set, to make memory reclamation and arbitration efficient; this shows a significant improvement in global arbitration shadow testing.
(2) Free the row number result vector on row number spill to allow a stricter test check (see the sketch after this list); we assume a single vector is small, so this frees only about 1MB per operator in real workloads.
(3) Fix HashTable::clear() with freeTable set, which did not reset the table capacity, and add a unit test to cover it.
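
A short sketch of change (2), assuming RowNumber caches its output vectors in a results_ member as in the diff below; FakeRowVector is a placeholder, not the real velox::RowVector:

```cpp
#include <memory>
#include <vector>

struct FakeRowVector {}; // Placeholder for velox::RowVector.
using RowVectorPtr = std::shared_ptr<FakeRowVector>;

std::vector<RowVectorPtr> results_; // Cached output vectors.
bool generateRowNumber_{true};

// On spill, drop the cached result vectors but keep a single empty slot so
// the output path can lazily re-create its vector. Each real vector is
// assumed to be small, so this releases only about 1MB per operator.
void freeResultsOnSpill() {
  if (generateRowNumber_) {
    results_.clear();
    results_.resize(1);
  }
}

int main() {
  results_.resize(8, std::make_shared<FakeRowVector>());
  freeResultsOnSpill();
  return results_.size() == 1 ? 0 : 1;
}
```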

Reviewed By: oerling

Differential Revision: D63964822
xiaoxmeng authored and facebook-github-bot committed Oct 7, 2024
1 parent 13c18db commit 6d15c10
Showing 9 changed files with 77 additions and 25 deletions.
10 changes: 5 additions & 5 deletions velox/exec/GroupingSet.cpp
@@ -742,7 +742,7 @@ bool GroupingSet::getOutput(
           : 0;
   if (numGroups == 0) {
     if (table_ != nullptr) {
-      table_->clear();
+      table_->clear(/*freeTable=*/true);
     }
     return false;
   }
@@ -789,9 +789,9 @@ void GroupingSet::extractGroups(
   }
 }

-void GroupingSet::resetTable() {
+void GroupingSet::resetTable(bool freeTable) {
   if (table_ != nullptr) {
-    table_->clear();
+    table_->clear(freeTable);
   }
 }

@@ -1012,7 +1012,7 @@ void GroupingSet::spill() {
   if (sortedAggregations_) {
     sortedAggregations_->clear();
   }
-  table_->clear();
+  table_->clear(/*freeTable=*/true);
 }

 void GroupingSet::spill(const RowContainerIterator& rowIterator) {
@@ -1038,7 +1038,7 @@ void GroupingSet::spill(const RowContainerIterator& rowIterator) {
   // guarantee we don't accidentally enter an unsafe situation.
   rows->stringAllocator().freezeAndExecute(
       [&]() { spiller_->spill(rowIterator); });
-  table_->clear();
+  table_->clear(/*freeTable=*/true);
 }

 bool GroupingSet::getOutputWithSpill(
5 changes: 3 additions & 2 deletions velox/exec/GroupingSet.h
@@ -78,8 +78,9 @@ class GroupingSet {

   /// Resets the hash table inside the grouping set when partial aggregation
   /// is full or reclaims memory from distinct aggregation after it has
-  /// received all the inputs.
-  void resetTable();
+  /// received all the inputs. If 'freeTable' is false, the hash table itself
+  /// is not freed, only its contents.
+  void resetTable(bool freeTable = false);

   /// Returns true if 'this' should start producing partial
   /// aggregation results. Checks the memory consumption against
2 changes: 1 addition & 1 deletion velox/exec/HashAggregation.cpp
@@ -412,7 +412,7 @@ void HashAggregation::reclaim(
   }
   if (isDistinct_) {
     // Since we have seen all the input, we can safely reset the hash table.
-    groupingSet_->resetTable();
+    groupingSet_->resetTable(/*freeTable=*/true);
     // Release the minimum reserved memory.
     pool()->release();
     return;
3 changes: 2 additions & 1 deletion velox/exec/HashTable.cpp
@@ -728,7 +728,7 @@ void HashTable<ignoreNullKeys>::allocateTables(
       memory::AllocationTraits::numPages(size * tableSlotSize());
   rows_->pool()->allocateContiguous(numPages, tableAllocation_);
   table_ = tableAllocation_.data<char*>();
-  memset(table_, 0, capacity_ * sizeof(char*));
+  ::memset(table_, 0, capacity_ * sizeof(char*));
 }

 template <bool ignoreNullKeys>
@@ -743,6 +743,7 @@ void HashTable<ignoreNullKeys>::clear(bool freeTable) {
     } else {
       rows_->pool()->freeContiguous(tableAllocation_);
       table_ = nullptr;
+      capacity_ = 0;
     }
   }
   numDistinct_ = 0;
4 changes: 4 additions & 0 deletions velox/exec/HashTable.h
@@ -643,6 +643,10 @@ class HashTable : public BaseHashTable {
     return rehashSize();
   }

+  char** testingTable() const {
+    return table_;
+  }
+
   void extractColumn(
       folly::Range<char* const*> rows,
       int32_t columnIndex,
8 changes: 6 additions & 2 deletions velox/exec/RowNumber.cpp
@@ -327,7 +327,7 @@ RowVectorPtr RowNumber::getOutput() {
     addInput(std::move(unspilledInput));
   } else {
     spillInputReader_ = nullptr;
-    table_->clear();
+    table_->clear(/*freeTable=*/true);
     restoreNextSpillPartition();
   }
 }
@@ -412,7 +412,7 @@ SpillPartitionNumSet RowNumber::spillHashTable() {
   hashTableSpiller->spill();
   hashTableSpiller->finishSpill(spillHashTablePartitionSet_);

-  table_->clear();
+  table_->clear(/*freeTable=*/true);
   pool()->release();
   return hashTableSpiller->state().spilledPartitionSet();
 }
@@ -455,6 +455,10 @@ void RowNumber::spill() {
     spillInput(input_, memory::spillMemoryPool());
     input_ = nullptr;
   }
+  if (generateRowNumber_) {
+    results_.clear();
+    results_.resize(1);
+  }
 }

 void RowNumber::spillInput(
27 changes: 15 additions & 12 deletions velox/exec/tests/AggregationTest.cpp
@@ -2028,6 +2028,7 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimDuringInputProcessing) {
     }
   } testSettings[] = {
       {0, true, true}, {0, false, false}, {1, true, true}, {1, false, false}};
+
   for (const auto& testData : testSettings) {
     SCOPED_TRACE(testData.debugString());

@@ -2045,12 +2046,12 @@
                     .copyResults(pool_.get());

     folly::EventCount driverWait;
-    auto driverWaitKey = driverWait.prepareWait();
+    std::atomic_bool driverWaitFlag{true};
     folly::EventCount testWait;
-    auto testWaitKey = testWait.prepareWait();
+    std::atomic_bool testWaitFlag{true};

     std::atomic_int numInputs{0};
-    Operator* op;
+    Operator* op{nullptr};
     SCOPED_TESTVALUE_SET(
         "facebook::velox::exec::Driver::runInternal::addInput",
         std::function<void(Operator*)>(([&](Operator* testOp) {
@@ -2079,8 +2080,9 @@
           } else {
             ASSERT_EQ(reclaimableBytes, 0);
           }
-          testWait.notify();
-          driverWait.wait(driverWaitKey);
+          testWaitFlag = false;
+          testWait.notifyAll();
+          driverWait.await([&] { return !driverWaitFlag.load(); });
         })));

     std::thread taskThread([&]() {
@@ -2108,11 +2110,13 @@
       }
     });

-    testWait.wait(testWaitKey);
+    testWait.await([&]() { return !testWaitFlag.load(); });
+    ASSERT_TRUE(op != nullptr);
     auto task = op->testingOperatorCtx()->task();
     auto taskPauseWait = task->requestPause();
-    driverWait.notify();
+
+    driverWaitFlag = false;
+    driverWait.notifyAll();
     taskPauseWait.wait();

     uint64_t reclaimableBytes{0};
@@ -2126,7 +2130,6 @@
     }

     if (testData.expectedReclaimable) {
-      const auto usedMemory = op->pool()->usedBytes();
       {
         memory::ScopedMemoryArbitrationContext ctx(op->pool());
         op->pool()->reclaim(
@@ -2137,9 +2140,8 @@
       ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0);
       ASSERT_GT(reclaimerStats_.reclaimedBytes, 0);
       reclaimerStats_.reset();
-      // The hash table itself in the grouping set is not cleared so it still
-      // uses some memory.
-      ASSERT_LT(op->pool()->usedBytes(), usedMemory);
+      // We expect all the memory to have been freed from the hash table.
+      ASSERT_EQ(op->pool()->usedBytes(), 0);
     } else {
       {
         memory::ScopedMemoryArbitrationContext ctx(op->pool());
@@ -3140,7 +3142,8 @@ DEBUG_ONLY_TEST_F(AggregationTest, reclaimEmptyOutput) {
           task->pool()->reclaim(kMaxBytes, 0, stats);
           ASSERT_EQ(stats.numNonReclaimableAttempts, 0);
           ASSERT_GT(stats.reclaimExecTimeUs, 0);
-          ASSERT_EQ(stats.reclaimedBytes, 0);
+          // We expect to reclaim the memory from the hash table.
+          ASSERT_GT(stats.reclaimedBytes, 0);
           ASSERT_GT(stats.reclaimWaitTimeUs, 0);
         }
       })));
41 changes: 39 additions & 2 deletions velox/exec/tests/HashTableTest.cpp
@@ -632,7 +632,7 @@ TEST_P(HashTableTest, mixed6Sparse) {
 }

 // It should be safe to call clear() before we insert any data into HashTable.
-TEST_P(HashTableTest, clear) {
+TEST_P(HashTableTest, clearBeforeInsert) {
   std::vector<std::unique_ptr<VectorHasher>> keyHashers;
   keyHashers.push_back(std::make_unique<VectorHasher>(BIGINT(), 0 /*channel*/));
   core::QueryConfig config({});
@@ -644,9 +644,46 @@
       config);

   for (const bool clearTable : {false, true}) {
-    auto table = HashTable<true>::createForAggregation(
+    const auto table = HashTable<true>::createForAggregation(
         std::move(keyHashers), {Accumulator{aggregate.get(), nullptr}}, pool());
     ASSERT_NO_THROW(table->clear(clearTable));
+    // Before any insert no table has been allocated, so the outcome is the
+    // same whether or not clear() is asked to free the table.
+    ASSERT_EQ(table->testingTable(), nullptr);
+    ASSERT_EQ(table->capacity(), 0);
   }
 }

+TEST_P(HashTableTest, clearAfterInsert) {
+  const auto rowType =
+      ROW({"a", "b", "c", "d"}, {BIGINT(), BIGINT(), BIGINT(), BIGINT()});
+  const auto numKeys = 4;
+
+  const int numBatches = 5;
+  std::vector<RowVectorPtr> inputBatches;
+  for (int i = 0; i < numBatches; ++i) {
+    VectorFuzzer fuzzer({}, pool());
+    inputBatches.push_back(fuzzer.fuzzRow(rowType));
+  }
+  for (const bool clearTable : {false, true}) {
+    const auto table = createHashTableForAggregation(rowType, numKeys);
+    auto lookup = std::make_unique<HashLookup>(table->hashers());
+    for (const auto& batch : inputBatches) {
+      lookup->reset(batch->size());
+      insertGroups(*batch, *lookup, *table);
+    }
+    const uint64_t capacityBeforeClear = table->capacity();
+    ASSERT_NO_THROW(table->clear(clearTable));
+    if (clearTable) {
+      ASSERT_EQ(table->testingTable(), nullptr);
+      ASSERT_EQ(table->capacity(), 0);
+    } else {
+      ASSERT_NE(table->testingTable(), nullptr);
+      ASSERT_EQ(table->capacity(), capacityBeforeClear);
+    }
+  }
+}
2 changes: 2 additions & 0 deletions velox/exec/tests/RowNumberTest.cpp
@@ -390,6 +390,8 @@ DEBUG_ONLY_TEST_F(RowNumberTest, spillOnlyDuringInputOrOutput) {
         }

         testingRunArbitration(op->pool(), 0);
+        // We expect all the memory to be freed after the spill.
+        ASSERT_EQ(op->pool()->usedBytes(), 0);
       })));

 core::PlanNodeId rowNumberPlanNodeId;
