Skip to content

Commit

Permalink
fix: Fix SortBuffer ensureOutputFits estimateOutputSize inaccurate (#…
Browse files Browse the repository at this point in the history
…11534)

Summary:
The output batch reserved size should be rowSize * numRows, missed numRows before.

Pull Request resolved: #11534

Reviewed By: zacw7

Differential Revision: D66104290

Pulled By: xiaoxmeng

fbshipit-source-id: cb1c382737cc6d2f51ef43d8e23c15d73a3b744d
  • Loading branch information
jinchengchenghh authored and facebook-github-bot committed Nov 19, 2024
1 parent f03ac2b commit 7a47fb4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 16 deletions.
27 changes: 13 additions & 14 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,12 @@ RowVectorPtr SortBuffer::getOutput(vector_size_t maxOutputRows) {
if (numOutputRows_ == numInputRows_) {
return nullptr;
}

ensureOutputFits();
prepareOutput(maxOutputRows);
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);
const vector_size_t batchSize =
std::min<uint64_t>(numInputRows_ - numOutputRows_, maxOutputRows);
ensureOutputFits(batchSize);
prepareOutput(batchSize);
if (spiller_ != nullptr) {
getOutputWithSpill();
} else {
Expand Down Expand Up @@ -247,7 +250,8 @@ void SortBuffer::ensureInputFits(const VectorPtr& input) {
<< ", reservation: " << succinctBytes(pool()->reservedBytes());
}

void SortBuffer::ensureOutputFits() {
void SortBuffer::ensureOutputFits(vector_size_t batchSize) {
VELOX_CHECK_GT(batchSize, 0);
// Check if spilling is enabled or not.
if (spillConfig_ == nullptr) {
return;
Expand All @@ -259,12 +263,12 @@ void SortBuffer::ensureOutputFits() {
return;
}

if (!estimatedOutputRowSize_.has_value()) {
if (!estimatedOutputRowSize_.has_value() || spiller_ != nullptr) {
return;
}

const uint64_t outputBufferSizeToReserve =
estimatedOutputRowSize_.value() * 1.2;
estimatedOutputRowSize_.value() * batchSize * 1.2;
{
memory::ReclaimableSectionGuard guard(nonReclaimableSection_);
if (pool_->maybeReserve(outputBufferSizeToReserve)) {
Expand Down Expand Up @@ -373,11 +377,7 @@ void SortBuffer::spillOutput() {
finishSpill();
}

void SortBuffer::prepareOutput(vector_size_t maxOutputRows) {
VELOX_CHECK_GT(maxOutputRows, 0);
VELOX_CHECK_GT(numInputRows_, numOutputRows_);
const vector_size_t batchSize =
std::min<uint64_t>(numInputRows_ - numOutputRows_, maxOutputRows);
void SortBuffer::prepareOutput(vector_size_t batchSize) {
if (output_ != nullptr) {
VectorPtr output = std::move(output_);
BaseVector::prepareForReuse(output, batchSize);
Expand All @@ -392,13 +392,12 @@ void SortBuffer::prepareOutput(vector_size_t maxOutputRows) {
}

if (spiller_ != nullptr) {
spillSources_.resize(maxOutputRows);
spillSourceRows_.resize(maxOutputRows);
spillSources_.resize(batchSize);
spillSourceRows_.resize(batchSize);
prepareOutputWithSpill();
}

VELOX_CHECK_GT(output_->size(), 0);
VELOX_DCHECK_LE(output_->size(), maxOutputRows);
VELOX_CHECK_LE(output_->size() + numOutputRows_, numInputRows_);
}

Expand Down
4 changes: 2 additions & 2 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,13 @@ class SortBuffer {
void ensureInputFits(const VectorPtr& input);
// Reserves memory for output processing. If reservation cannot be increased,
// spills enough to make output fit.
void ensureOutputFits();
void ensureOutputFits(vector_size_t outputBatchSize);
// Reserves memory for sort. If reservation cannot be increased,
// spills enough to make output fit.
void ensureSortFits();
void updateEstimatedOutputRowSize();
// Invoked to initialize or reset the reusable output buffer to get output.
void prepareOutput(vector_size_t maxOutputRows);
void prepareOutput(vector_size_t outputBatchSize);
// Invoked to initialize reader to read the spilled data from storage for
// output processing.
void prepareOutputWithSpill();
Expand Down

0 comments on commit 7a47fb4

Please sign in to comment.