Skip to content

Commit

Permalink
resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Dec 7, 2023
1 parent 2ceee00 commit 6e334dc
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 45 deletions.
72 changes: 34 additions & 38 deletions velox/exec/SortBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,28 +166,9 @@ void SortBuffer::spill() {
updateEstimatedOutputRowSize();

if (sortedRows_.empty()) {
// Input stage spill.
if (spiller_ == nullptr) {
spiller_ = makeSpiller(Spiller::Type::kOrderByInput);
}
spiller_->spill();
data_->clear();
spillInput();
} else {
// Output stage spill.
if (spiller_ != nullptr) {
// Already spilled.
return;
}

spiller_ = makeSpiller(Spiller::Type::kOrderByOutput);
auto spillRows = std::vector<char*>(
sortedRows_.begin() + numOutputRows_, sortedRows_.end());
spiller_->spill(spillRows);
data_->clear();
sortedRows_.clear();
// Finish right after spilling as the output spiller only spills at most
// once.
finishSpill();
spillOutput();
}
}

Expand Down Expand Up @@ -279,10 +260,10 @@ void SortBuffer::updateEstimatedOutputRowSize() {
}
}

std::unique_ptr<Spiller> SortBuffer::makeSpiller(Spiller::Type type) {
std::unique_ptr<Spiller> spiller;
if (type == Spiller::Type::kOrderByInput) {
spiller = std::make_unique<Spiller>(
void SortBuffer::spillInput() {
if (spiller_ == nullptr) {
VELOX_CHECK(!noMoreInput_);
spiller_ = std::make_unique<Spiller>(
Spiller::Type::kOrderByInput,
data_.get(),
spillerStoreType_,
Expand All @@ -294,20 +275,35 @@ std::unique_ptr<Spiller> SortBuffer::makeSpiller(Spiller::Type type) {
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
} else {
spiller = std::make_unique<Spiller>(
Spiller::Type::kOrderByOutput,
data_.get(),
spillerStoreType_,
spillConfig_->getSpillDirPathCb,
spillConfig_->fileNamePrefix,
spillConfig_->writeBufferSize,
spillConfig_->compressionKind,
memory::spillMemoryPool(),
spillConfig_->executor);
}
VELOX_CHECK_EQ(spiller->state().maxPartitions(), 1);
return spiller;
spiller_->spill();
data_->clear();
}

// Output-stage spill. Does nothing when 'spiller_' is already set. Otherwise
// creates a kOrderByOutput spiller, spills the entries of 'sortedRows_' from
// index 'numOutputRows_' onward (presumably the rows not yet returned as
// output -- confirm against the output path), drops the in-memory rows and
// finishes the spill.
void SortBuffer::spillOutput() {
  if (spiller_ != nullptr) {
    // 'spiller_' is already set, which is treated as "already spilled".
    return;
  }

  spiller_ = std::make_unique<Spiller>(
      Spiller::Type::kOrderByOutput,
      data_.get(),
      spillerStoreType_,
      spillConfig_->getSpillDirPathCb,
      spillConfig_->fileNamePrefix,
      spillConfig_->writeBufferSize,
      spillConfig_->compressionKind,
      memory::spillMemoryPool(),
      spillConfig_->executor);

  // Copy out the tail of the sorted row pointers, starting at
  // 'numOutputRows_', and hand exactly those rows to the spiller.
  std::vector<char*> remainingRows(
      sortedRows_.begin() + numOutputRows_, sortedRows_.end());
  spiller_->spill(remainingRows);

  // The rows have been handed to the spiller; clear the row container and the
  // sorted row pointers.
  data_->clear();
  sortedRows_.clear();

  // The output spiller spills at most once, so finish the spill right away.
  finishSpill();
}

void SortBuffer::prepareOutput(uint32_t maxOutputRows) {
Expand Down
7 changes: 4 additions & 3 deletions velox/exec/SortBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,14 @@ class SortBuffer {
void prepareOutput(uint32_t maxOutputRows);
void getOutputWithoutSpill();
void getOutputWithSpill();

// Spill during input stage.
void spillInput();
// Spill during output stage.
void spillOutput();
// Finish spill, and we shouldn't get any rows from non-spilled partition as
// there is only one hash partition for SortBuffer.
void finishSpill();

std::unique_ptr<Spiller> makeSpiller(Spiller::Type type);

const RowTypePtr input_;
const std::vector<CompareFlags> sortCompareFlags_;
velox::memory::MemoryPool* const pool_;
Expand Down
8 changes: 4 additions & 4 deletions velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,17 +283,17 @@ class Spiller {

void checkEmptySpillRuns() const;

/// Marks all the partitions as spilled, since fine-grained spilling is not
/// supported for now.
// Marks all the partitions as spilled, since fine-grained spilling is not
// supported for now.
void markAllPartitionsSpilled();

// Prepares spill runs for the spillable data from all the hash partitions.
// If 'startRowIter' is not null, we prepare runs starting from the offset
// pointed by 'startRowIter'.
void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr);

/// Prepares spill run of a single partition for the spillable data from the
/// rows.
// Prepares spill run of a single partition for the spillable data from the
// rows.
void fillSpillRun(std::vector<char*>& rows);

// Writes out all the rows collected in spillRuns_.
Expand Down

0 comments on commit 6e334dc

Please sign in to comment.