Skip to content

Commit

Permalink
Release memory when reservation triggers spilling (facebookincubator#…
Browse files Browse the repository at this point in the history
…11063)

Summary:
Spillable operator might trigger spill when it reserve memory from the arbitrator. If that happens, we shall release the reserved memory if the reserved memory is no longer needed after spilling.

Pull Request resolved: facebookincubator#11063

Reviewed By: xiaoxmeng

Differential Revision: D63207407

Pulled By: tanjialiang

fbshipit-source-id: 8a956a25be69ca6fccc55e929a62034d4b9e8c5b
  • Loading branch information
tanjialiang authored and facebook-github-bot committed Sep 24, 2024
1 parent b48c5dd commit a95acb7
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 8 deletions.
8 changes: 8 additions & 0 deletions velox/exec/GroupingSet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,14 @@ void GroupingSet::ensureOutputFits() {
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_.maybeReserve(outputBufferSizeToReserve)) {
if (hasSpilled()) {
// If reservation triggers spilling on the 'GroupingSet' itself, we will
// no longer need the reserved memory for output processing as the
// output processing will be conducted from unspilled data through
// 'getOutputWithSpill()', and it does not require this amount of memory
// to process.
pool_.release();
}
return;
}
}
Expand Down
26 changes: 19 additions & 7 deletions velox/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,21 @@ void HashBuild::ensureInputFits(RowVectorPtr& input) {

{
Operator::ReclaimableSectionGuard guard(this);
if (!pool()->maybeReserve(targetIncrementBytes)) {
LOG(WARNING) << "Failed to reserve "
<< succinctBytes(targetIncrementBytes) << " for memory pool "
<< pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: "
<< succinctBytes(pool()->reservedBytes());
if (pool()->maybeReserve(targetIncrementBytes)) {
// If above reservation triggers the spilling of 'HashBuild' operator
// itself, we will no longer need the reserved memory for building hash
// table as the table is spilled, and the input will be directly spilled,
// too.
if (spiller_->isAllSpilled()) {
pool()->release();
}
return;
}
}
LOG(WARNING) << "Failed to reserve " << succinctBytes(targetIncrementBytes)
<< " for memory pool " << pool()->name()
<< ", usage: " << succinctBytes(pool()->usedBytes())
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}

void HashBuild::spillInput(const RowVectorPtr& input) {
Expand Down Expand Up @@ -806,6 +812,12 @@ void HashBuild::ensureTableFits(uint64_t numRows) {
{
Operator::ReclaimableSectionGuard guard(this);
if (pool()->maybeReserve(memoryBytesToReserve)) {
// If reservation triggers the spilling of 'HashBuild' operator itself, we
// will no longer need the reserved memory for building hash table as the
// table is spilled.
if (spiller_->isAllSpilled()) {
pool()->release();
}
return;
}
}
Expand Down
12 changes: 11 additions & 1 deletion velox/exec/RowNumber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,10 @@ void RowNumber::addInput(RowVectorPtr input) {

void RowNumber::addSpillInput() {
VELOX_CHECK_NOT_NULL(input_);
VELOX_CHECK_NULL(inputSpiller_);
ensureInputFits(input_);
if (input_ == nullptr) {
VELOX_CHECK_NOT_NULL(inputSpiller_);
// Memory arbitration might be triggered by ensureInputFits() which will
// spill 'input_'.
return;
Expand All @@ -124,6 +126,7 @@ void RowNumber::noMoreInput() {

if (inputSpiller_ != nullptr) {
inputSpiller_->finishSpill(spillInputPartitionSet_);
inputSpiller_.reset();
removeEmptyPartitions(spillInputPartitionSet_);
restoreNextSpillPartition();
}
Expand Down Expand Up @@ -185,7 +188,7 @@ void RowNumber::restoreNextSpillPartition() {
}

void RowNumber::ensureInputFits(const RowVectorPtr& input) {
if (!spillEnabled()) {
if (!spillEnabled() || inputSpiller_ != nullptr) {
// Spilling is disabled.
return;
}
Expand Down Expand Up @@ -242,6 +245,12 @@ void RowNumber::ensureInputFits(const RowVectorPtr& input) {
{
Operator::ReclaimableSectionGuard guard(this);
if (pool()->maybeReserve(targetIncrementBytes)) {
// If reservation triggers the spilling of 'RowNumber' operator itself, we
// will no longer need the reserved memory for building hash table as the
// table is spilled.
if (inputSpiller_ != nullptr) {
pool()->release();
}
return;
}
}
Expand Down Expand Up @@ -524,6 +533,7 @@ void RowNumber::recursiveSpillInput() {
}

inputSpiller_->finishSpill(spillInputPartitionSet_);
inputSpiller_.reset();
spillInputReader_ = nullptr;

removeEmptyPartitions(spillInputPartitionSet_);
Expand Down

0 comments on commit a95acb7

Please sign in to comment.