From 6e334dc418ee82d156ff2d6adc37c7d08b9e968a Mon Sep 17 00:00:00 2001 From: duanmeng Date: Thu, 7 Dec 2023 16:13:41 +0800 Subject: [PATCH] resolve comments --- velox/exec/SortBuffer.cpp | 72 ++++++++++++++++++--------------------- velox/exec/SortBuffer.h | 7 ++-- velox/exec/Spiller.h | 8 ++--- 3 files changed, 42 insertions(+), 45 deletions(-) diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index ee5ecd32c8a9..aeca6590b1d3 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -166,28 +166,9 @@ void SortBuffer::spill() { updateEstimatedOutputRowSize(); if (sortedRows_.empty()) { - // Input stage spill. - if (spiller_ == nullptr) { - spiller_ = makeSpiller(Spiller::Type::kOrderByInput); - } - spiller_->spill(); - data_->clear(); + spillInput(); } else { - // Output stage spill. - if (spiller_ != nullptr) { - // Already spilled. - return; - } - - spiller_ = makeSpiller(Spiller::Type::kOrderByOutput); - auto spillRows = std::vector( - sortedRows_.begin() + numOutputRows_, sortedRows_.end()); - spiller_->spill(spillRows); - data_->clear(); - sortedRows_.clear(); - // Finish right after spilling as the output spiller only spills at most - // once. - finishSpill(); + spillOutput(); } } @@ -279,10 +260,10 @@ void SortBuffer::updateEstimatedOutputRowSize() { } } -std::unique_ptr SortBuffer::makeSpiller(Spiller::Type type) { - std::unique_ptr spiller; - if (type == Spiller::Type::kOrderByInput) { - spiller = std::make_unique( +void SortBuffer::spillInput() { + if (spiller_ == nullptr) { + VELOX_CHECK(!noMoreInput_); + spiller_ = std::make_unique( Spiller::Type::kOrderByInput, data_.get(), spillerStoreType_, @@ -294,20 +275,35 @@ std::unique_ptr SortBuffer::makeSpiller(Spiller::Type type) { spillConfig_->compressionKind, memory::spillMemoryPool(), spillConfig_->executor); - } else { - spiller = std::make_unique( - Spiller::Type::kOrderByOutput, - data_.get(), - spillerStoreType_, - spillConfig_->getSpillDirPathCb, - spillConfig_->fileNamePrefix, - spillConfig_->writeBufferSize, - spillConfig_->compressionKind, - memory::spillMemoryPool(), - spillConfig_->executor); } - VELOX_CHECK_EQ(spiller->state().maxPartitions(), 1); - return spiller; + spiller_->spill(); + data_->clear(); +} + +void SortBuffer::spillOutput() { + if (spiller_ != nullptr) { + // Already spilled. + return; + } + + spiller_ = std::make_unique( + Spiller::Type::kOrderByOutput, + data_.get(), + spillerStoreType_, + spillConfig_->getSpillDirPathCb, + spillConfig_->fileNamePrefix, + spillConfig_->writeBufferSize, + spillConfig_->compressionKind, + memory::spillMemoryPool(), + spillConfig_->executor); + auto spillRows = std::vector( + sortedRows_.begin() + numOutputRows_, sortedRows_.end()); + spiller_->spill(spillRows); + data_->clear(); + sortedRows_.clear(); + // Finish right after spilling as the output spiller only spills at most + // once. + finishSpill(); } void SortBuffer::prepareOutput(uint32_t maxOutputRows) { diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index 2ed3bfbda48b..fa62460d203a 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -80,13 +80,14 @@ class SortBuffer { void prepareOutput(uint32_t maxOutputRows); void getOutputWithoutSpill(); void getOutputWithSpill(); - + // Spill during input stage. + void spillInput(); + // Spill during output stage. + void spillOutput(); // Finish spill, and we shouldn't get any rows from non-spilled partition as // there is only one hash partition for SortBuffer. void finishSpill(); - std::unique_ptr makeSpiller(Spiller::Type type); - const RowTypePtr input_; const std::vector sortCompareFlags_; velox::memory::MemoryPool* const pool_; diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index def850133251..815361bf8033 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -283,8 +283,8 @@ class Spiller { void checkEmptySpillRuns() const; - /// Marks all the partitions have been spilled as we don't support - /// fine-grained spilling as for now. + // Marks all the partitions have been spilled as we don't support + // fine-grained spilling as for now. void markAllPartitionsSpilled(); // Prepares spill runs for the spillable data from all the hash partitions. @@ -292,8 +292,8 @@ class Spiller { // pointed by 'startRowIter'. void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr); - /// Prepares spill run of a single partition for the spillable data from the - /// rows. + // Prepares spill run of a single partition for the spillable data from the + // rows. void fillSpillRun(std::vector& rows); // Writes out all the rows collected in spillRuns_.