Skip to content

Commit

Permalink
Free up memory resource after hash probe spill (#10879)
Browse files Browse the repository at this point in the history
Summary:
The join fuzzer detects the memory usage of hash probe operator increased after spill and
this mostly happen on the first hash probe input spill which creates buffers and spilled output
reader which consumes memory from the hash probe operator pool. This PR fixes this by
clear the buffers at the end of spill as well as defer the spilled output reader creation.

Pull Request resolved: #10879

Test Plan: Join fuzzer tests run through >5hrs with opt build and debug build with oom injection

Reviewed By: Yuhta, bikramSingh91

Differential Revision: D61928578

Pulled By: xiaoxmeng

fbshipit-source-id: 78de4345c0b4c4aaaeb72773c6e1436f29ddcec1
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Aug 29, 2024
1 parent 5f935a7 commit 00194ad
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 16 deletions.
34 changes: 25 additions & 9 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,8 @@ RowVectorPtr HashProbe::getOutputInternal(bool toSpillOutput) {
}

bool HashProbe::maybeReadSpillOutput() {
maybeSetupSpillOutputReader();

if (spillOutputReader_ == nullptr) {
return false;
}
Expand Down Expand Up @@ -1734,6 +1736,7 @@ void HashProbe::reclaim(

for (auto* probeOp : probeOps) {
VELOX_CHECK_NOT_NULL(probeOp);
probeOp->clearBuffers();
// Setup all the probe operators to spill the rest of probe inputs if the
// table has been spilled.
if (!spillPartitionSet.empty()) {
Expand Down Expand Up @@ -1837,16 +1840,24 @@ void HashProbe::spillOutput() {
}
VELOX_CHECK_LE(outputSpiller->spilledPartitionSet().size(), 1);

SpillPartitionSet outputSpillSet;
outputSpiller->finishSpill(outputSpillSet);
VELOX_CHECK_EQ(outputSpillSet.size(), 1);
VELOX_CHECK(spillOutputPartitionSet_.empty());
outputSpiller->finishSpill(spillOutputPartitionSet_);
VELOX_CHECK_EQ(spillOutputPartitionSet_.size(), 1);

removeEmptyPartitions(spillOutputPartitionSet_);
}

removeEmptyPartitions(outputSpillSet);
if (outputSpillSet.empty()) {
void HashProbe::maybeSetupSpillOutputReader() {
if (spillOutputPartitionSet_.empty()) {
return;
}
spillOutputReader_ = outputSpillSet.begin()->second->createUnorderedReader(
spillConfig_->readBufferSize, pool(), &spillStats_);
VELOX_CHECK_EQ(spillOutputPartitionSet_.size(), 1);
VELOX_CHECK_NULL(spillOutputReader_);

spillOutputReader_ =
spillOutputPartitionSet_.begin()->second->createUnorderedReader(
spillConfig_->readBufferSize, pool(), &spillStats_);
spillOutputPartitionSet_.clear();
}

SpillPartitionSet HashProbe::spillTable() {
Expand Down Expand Up @@ -1991,15 +2002,20 @@ void HashProbe::close() {
joinBridge_.reset();
inputSpiller_.reset();
table_.reset();
spillInputReader_.reset();
spillOutputPartitionSet_.clear();
spillOutputReader_.reset();
clearBuffers();
}

void HashProbe::clearBuffers() {
outputRowMapping_.reset();
tempOutputRowMapping_.reset();
outputTableRows_.reset();
tempOutputTableRows_.reset();
output_.reset();
nonSpillInputIndicesBuffer_.reset();
spillInputIndicesBuffers_.clear();
spillInputReader_.reset();
spillOutputReader_.reset();
}

} // namespace facebook::velox::exec
11 changes: 10 additions & 1 deletion velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ class HashProbe : public Operator {
// operator is set to reclaimable at this stage.
void ensureOutputFits();

// Setups spilled output reader if 'spillOutputPartitionSet_' is not empty.
void maybeSetupSpillOutputReader();

// Reads from the spilled output if the spilling has been triggered during the
// middle of an input processing. The latter produces all the outputs and
// spill them on to disk in case the output is too large to fit in memory in
Expand Down Expand Up @@ -303,7 +306,9 @@ class HashProbe : public Operator {
// Wake up the peer hash probe operators when last probe operator finishes.
void wakeupPeerOperators();

// std::vector<Operator*> findPeerOperators();
// Invoked to release internal buffers to free up memory resources after
// memory reclamation or operator close.
void clearBuffers();

// TODO: Define batch size as bytes based on RowContainer row sizes.
const uint32_t outputBatchSize_;
Expand Down Expand Up @@ -618,6 +623,10 @@ class HashProbe : public Operator {
// The row type used to spill hash table on disk.
RowTypePtr tableSpillType_;

// The spilled output partition set which is cleared after setup
// 'spillOutputReader_'.
SpillPartitionSet spillOutputPartitionSet_;

// The reader used to read the spilled output produced by pending input during
// the spill processing.
std::unique_ptr<UnorderedStreamReader<BatchStream>> spillOutputReader_;
Expand Down
6 changes: 0 additions & 6 deletions velox/exec/Operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,12 +656,6 @@ uint64_t Operator::MemoryReclaimer::reclaim(
memory::ScopedReclaimedBytesRecorder recoder(pool, &reclaimedBytes);
op_->reclaim(targetBytes, stats);
}
// NOTE: the parallel hash build is running at the background thread
// pool which won't stop during memory reclamation so the operator's
// memory usage might increase in such case. memory usage.
if (op_->operatorType() == "HashBuild") {
reclaimedBytes = std::max<int64_t>(0, reclaimedBytes);
}
return reclaimedBytes;
},
stats);
Expand Down

0 comments on commit 00194ad

Please sign in to comment.