From 055ea2e43324db5a7157376b08b46861553344f3 Mon Sep 17 00:00:00 2001 From: yan ma Date: Fri, 8 Dec 2023 23:13:06 +0800 Subject: [PATCH] Revert "Add OderBy output stage spill (#7759)" This reverts commit ddc34718836f53a4ae1bcdaf7ed212a29ae935ac. --- velox/exec/OrderBy.cpp | 19 ++++- velox/exec/SortBuffer.cpp | 80 ++++++--------------- velox/exec/SortBuffer.h | 7 -- velox/exec/SortWindowBuild.cpp | 2 +- velox/exec/Spiller.cpp | 68 +++++------------- velox/exec/Spiller.h | 23 +----- velox/exec/TopNRowNumber.cpp | 2 +- velox/exec/tests/OrderByTest.cpp | 13 ++-- velox/exec/tests/SpillerTest.cpp | 118 +++---------------------------- 9 files changed, 81 insertions(+), 251 deletions(-) diff --git a/velox/exec/OrderBy.cpp b/velox/exec/OrderBy.cpp index f78b4517e671..37c5def34bf0 100644 --- a/velox/exec/OrderBy.cpp +++ b/velox/exec/OrderBy.cpp @@ -78,11 +78,24 @@ void OrderBy::reclaim( memory::MemoryReclaimer::Stats& stats) { VELOX_CHECK(canReclaim()); VELOX_CHECK(!nonReclaimableSection_); + auto* driver = operatorCtx_->driver(); - // TODO: support fine-grain disk spilling based on 'targetBytes' after - // having row container memory compaction support later. - sortBuffer_->spill(); + // NOTE: an order by operator is reclaimable if it hasn't started output + // processing and is not under non-reclaimable execution section. + if (noMoreInput_) { + // TODO: reduce the log frequency if it is too verbose. + ++stats.numNonReclaimableAttempts; + LOG(WARNING) + << "Can't reclaim from order by operator which has started producing output: " + << pool()->name() + << ", usage: " << succinctBytes(pool()->currentBytes()) + << ", reservation: " << succinctBytes(pool()->reservedBytes()); + return; + } + // TODO: support fine-grain disk spilling based on 'targetBytes' after having + // row container memory compaction support later. + sortBuffer_->spill(); // Release the minimum reserved memory. pool()->release(); } diff --git a/velox/exec/SortBuffer.cpp b/velox/exec/SortBuffer.cpp index aeca6590b1d3..7e981d1a90e4 100644 --- a/velox/exec/SortBuffer.cpp +++ b/velox/exec/SortBuffer.cpp @@ -16,6 +16,7 @@ #include "SortBuffer.h" #include "velox/exec/MemoryReclaimer.h" +#include "velox/vector/BaseVector.h" namespace facebook::velox::exec { @@ -132,7 +133,11 @@ void SortBuffer::noMoreInput() { // for now. spill(); - finishSpill(); + // Finish spill, and we shouldn't get any rows from non-spilled partition as + // there is only one hash partition for SortBuffer. + VELOX_CHECK_NULL(spillMerger_); + auto spillPartition = spiller_->finishSpill(); + spillMerger_ = spillPartition.createOrderedReader(pool()); } // Releases the unused memory reservation after procesing input. @@ -165,11 +170,24 @@ void SortBuffer::spill() { } updateEstimatedOutputRowSize(); - if (sortedRows_.empty()) { - spillInput(); - } else { - spillOutput(); + if (spiller_ == nullptr) { + spiller_ = std::make_unique( + Spiller::Type::kOrderBy, + data_.get(), + spillerStoreType_, + data_->keyTypes().size(), + sortCompareFlags_, + spillConfig_->getSpillDirPathCb, + spillConfig_->fileNamePrefix, + spillConfig_->writeBufferSize, + spillConfig_->compressionKind, + memory::spillMemoryPool(), + spillConfig_->executor); + VELOX_CHECK_EQ(spiller_->state().maxPartitions(), 1); } + + spiller_->spill(); + data_->clear(); } std::optional SortBuffer::estimateOutputRowSize() const { @@ -260,52 +278,6 @@ void SortBuffer::updateEstimatedOutputRowSize() { } } -void SortBuffer::spillInput() { - if (spiller_ == nullptr) { - VELOX_CHECK(!noMoreInput_); - spiller_ = std::make_unique( - Spiller::Type::kOrderByInput, - data_.get(), - spillerStoreType_, - data_->keyTypes().size(), - sortCompareFlags_, - spillConfig_->getSpillDirPathCb, - spillConfig_->fileNamePrefix, - spillConfig_->writeBufferSize, - spillConfig_->compressionKind, - memory::spillMemoryPool(), - spillConfig_->executor); - } - spiller_->spill(); - data_->clear(); -} - -void SortBuffer::spillOutput() { - if (spiller_ != nullptr) { - // Already spilled. - return; - } - - spiller_ = std::make_unique( - Spiller::Type::kOrderByOutput, - data_.get(), - spillerStoreType_, - spillConfig_->getSpillDirPathCb, - spillConfig_->fileNamePrefix, - spillConfig_->writeBufferSize, - spillConfig_->compressionKind, - memory::spillMemoryPool(), - spillConfig_->executor); - auto spillRows = std::vector( - sortedRows_.begin() + numOutputRows_, sortedRows_.end()); - spiller_->spill(spillRows); - data_->clear(); - sortedRows_.clear(); - // Finish right after spilling as the output spiller only spills at most - // once. - finishSpill(); -} - void SortBuffer::prepareOutput(uint32_t maxOutputRows) { VELOX_CHECK_GT(maxOutputRows, 0); VELOX_CHECK_GT(numInputRows_, numOutputRows_); @@ -392,10 +364,4 @@ void SortBuffer::getOutputWithSpill() { numOutputRows_ += output_->size(); } -void SortBuffer::finishSpill() { - VELOX_CHECK_NULL(spillMerger_); - auto spillPartition = spiller_->finishSpill(); - spillMerger_ = spillPartition.createOrderedReader(pool()); -} - } // namespace facebook::velox::exec diff --git a/velox/exec/SortBuffer.h b/velox/exec/SortBuffer.h index fa62460d203a..0fcd3817d25f 100644 --- a/velox/exec/SortBuffer.h +++ b/velox/exec/SortBuffer.h @@ -80,13 +80,6 @@ class SortBuffer { void prepareOutput(uint32_t maxOutputRows); void getOutputWithoutSpill(); void getOutputWithSpill(); - // Spill during input stage. - void spillInput(); - // Spill during output stage. - void spillOutput(); - // Finish spill, and we shouldn't get any rows from non-spilled partition as - // there is only one hash partition for SortBuffer. - void finishSpill(); const RowTypePtr input_; const std::vector sortCompareFlags_; diff --git a/velox/exec/SortWindowBuild.cpp b/velox/exec/SortWindowBuild.cpp index 7cb9e8667f7e..dd2fd71d4764 100644 --- a/velox/exec/SortWindowBuild.cpp +++ b/velox/exec/SortWindowBuild.cpp @@ -140,7 +140,7 @@ void SortWindowBuild::setupSpiller() { spiller_ = std::make_unique( // TODO Replace Spiller::Type::kOrderBy. - Spiller::Type::kOrderByInput, + Spiller::Type::kOrderBy, data_.get(), inputType_, spillCompareFlags_.size(), diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 3ffb6fd7fbd9..ca0b1582082b 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -61,7 +61,7 @@ Spiller::Spiller( executor, writeFileOptions) { VELOX_CHECK( - type_ == Type::kOrderByInput || type_ == Type::kAggregateInput, + type_ == Type::kOrderBy || type_ == Type::kAggregateInput, "Unexpected spiller type: {}", typeName(type_)); VELOX_CHECK_EQ(state_.maxPartitions(), 1); @@ -94,8 +94,9 @@ Spiller::Spiller( pool, executor, writeFileOptions) { - VELOX_CHECK( - type_ == Type::kAggregateOutput || type_ == Type::kOrderByOutput, + VELOX_CHECK_EQ( + type, + Type::kAggregateOutput, "Unexpected spiller type: {}", typeName(type_)); VELOX_CHECK_EQ(state_.maxPartitions(), 1); @@ -446,11 +447,9 @@ void Spiller::runSpill() { VELOX_CHECK_EQ(numWritten, run.rows.size()); run.clear(); // When a sorted run ends, we start with a new file next time. For - // aggregation output / orderby output spiller, we expect only one spill - // call to spill all the rows starting from the specified row offset. - if (needSort() || - (type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput)) { + // aggregation output spiller, we expect only one spill call to spill all + // the rows starting from the specified row offset. + if (needSort() || (type_ == Spiller::Type::kAggregateOutput)) { state_.finishFile(partition); } } @@ -468,7 +467,7 @@ void Spiller::updateSpillSortTime(uint64_t timeUs) { bool Spiller::needSort() const { return type_ != Type::kHashJoinProbe && type_ != Type::kHashJoinBuild && - type_ != Type::kAggregateOutput && type_ != Type::kOrderByOutput; + type_ != Type::kAggregateOutput; } void Spiller::spill() { @@ -484,23 +483,15 @@ void Spiller::spill(const RowContainerIterator* startRowIter) { CHECK_NOT_FINALIZED(); VELOX_CHECK_NE(type_, Type::kHashJoinProbe); - markAllPartitionsSpilled(); - - fillSpillRuns(startRowIter); - runSpill(); - checkEmptySpillRuns(); -} - -void Spiller::spill(std::vector& rows) { - CHECK_NOT_FINALIZED(); - VELOX_CHECK_EQ(type_, Type::kOrderByOutput); - if (rows.empty()) { - return; + // Marks all the partitions have been spilled as we don't support fine-grained + // spilling as for now. + for (auto partition = 0; partition < state_.maxPartitions(); ++partition) { + if (!state_.isPartitionSpilled(partition)) { + state_.setPartitionSpilled(partition); + } } - markAllPartitionsSpilled(); - - fillSpillRun(rows); + fillSpillRuns(startRowIter); runSpill(); checkEmptySpillRuns(); } @@ -511,14 +502,6 @@ void Spiller::checkEmptySpillRuns() const { } } -void Spiller::markAllPartitionsSpilled() { - for (auto partition = 0; partition < state_.maxPartitions(); ++partition) { - if (!state_.isPartitionSpilled(partition)) { - state_.setPartitionSpilled(partition); - } - } -} - void Spiller::spill(uint32_t partition, const RowVectorPtr& spillVector) { CHECK_NOT_FINALIZED(); VELOX_CHECK( @@ -615,21 +598,6 @@ void Spiller::fillSpillRuns(const RowContainerIterator* startRowIter) { updateSpillFillTime(execTimeUs); } -void Spiller::fillSpillRun(std::vector& rows) { - VELOX_CHECK_EQ(bits_.numPartitions(), 1); - checkEmptySpillRuns(); - uint64_t execTimeUs{0}; - { - MicrosecondTimer timer(&execTimeUs); - spillRuns_[0].rows = - SpillRows(rows.begin(), rows.end(), spillRuns_[0].rows.get_allocator()); - for (const auto* row : rows) { - spillRuns_[0].numBytes += container_->rowSize(row); - } - } - updateSpillFillTime(execTimeUs); -} - std::string Spiller::toString() const { return fmt::format( "{}\t{}\tMAX_PARTITIONS:{}\tFINALIZED:{}", @@ -642,10 +610,8 @@ std::string Spiller::toString() const { // static std::string Spiller::typeName(Type type) { switch (type) { - case Type::kOrderByInput: - return "ORDER_BY_INPUT"; - case Type::kOrderByOutput: - return "ORDER_BY_OUTPUT"; + case Type::kOrderBy: + return "ORDER_BY"; case Type::kHashJoinBuild: return "HASH_JOIN_BUILD"; case Type::kHashJoinProbe: diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index 815361bf8033..765950747a46 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -35,14 +35,11 @@ class Spiller { kHashJoinBuild = 2, // Used for hash join probe. kHashJoinProbe = 3, - // Used for order by input processing stage. - kOrderByInput = 4, - // Used for order by output processing stage. - kOrderByOutput = 5, + // Used for order by. + kOrderBy = 4, // Number of spiller types. - kNumTypes = 6, + kNumTypes = 5, }; - static std::string typeName(Type); using SpillRows = std::vector>; @@ -121,12 +118,6 @@ class Spiller { /// The caller needs to erase them from the row container. void spill(const RowContainerIterator& startRowIter); - /// Invoked to spill all the rows pointed by rows. This is used by - /// 'kOrderByOutput' spiller type to spill during the order by - /// output processing. Similarly, the spilled rows still stays in the row - /// container. The caller needs to erase them from the row container. - void spill(std::vector& rows); - /// Append 'spillVector' into the spill file of given 'partition'. It is now /// only used by the spilling operator which doesn't need data sort, such as /// hash join build and hash join probe. @@ -283,19 +274,11 @@ class Spiller { void checkEmptySpillRuns() const; - // Marks all the partitions have been spilled as we don't support - // fine-grained spilling as for now. - void markAllPartitionsSpilled(); - // Prepares spill runs for the spillable data from all the hash partitions. // If 'startRowIter' is not null, we prepare runs starting from the offset // pointed by 'startRowIter'. void fillSpillRuns(const RowContainerIterator* startRowIter = nullptr); - // Prepares spill run of a single partition for the spillable data from the - // rows. - void fillSpillRun(std::vector& rows); - // Writes out all the rows collected in spillRuns_. void runSpill(); diff --git a/velox/exec/TopNRowNumber.cpp b/velox/exec/TopNRowNumber.cpp index 5bb0791fbd2e..5bb8b4a9fb0d 100644 --- a/velox/exec/TopNRowNumber.cpp +++ b/velox/exec/TopNRowNumber.cpp @@ -737,7 +737,7 @@ void TopNRowNumber::setupSpiller() { spiller_ = std::make_unique( // TODO Replace Spiller::Type::kOrderBy. - Spiller::Type::kOrderByInput, + Spiller::Type::kOrderBy, data_.get(), inputType_, spillCompareFlags_.size(), diff --git a/velox/exec/tests/OrderByTest.cpp b/velox/exec/tests/OrderByTest.cpp index 205eaa8bd4d2..a3c07e45cfda 100644 --- a/velox/exec/tests/OrderByTest.cpp +++ b/velox/exec/tests/OrderByTest.cpp @@ -1082,10 +1082,15 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { if (enableSpilling) { ASSERT_GT(reclaimableBytes, 0); - reclaimerStats_.reset(); - reclaimAndRestoreCapacity(op, reclaimableBytes, reclaimerStats_); - ASSERT_EQ(reclaimerStats_.reclaimedBytes, reclaimableBytes); + const auto usedMemoryBytes = op->pool()->currentBytes(); + reclaimAndRestoreCapacity( + op, + folly::Random::oneIn(2) ? 0 : folly::Random::rand32(rng_), + reclaimerStats_); + ASSERT_GT(reclaimerStats_.reclaimedBytes, 0); ASSERT_GT(reclaimerStats_.reclaimExecTimeUs, 0); + // No reclaim as the operator has started output processing. + ASSERT_EQ(usedMemoryBytes, op->pool()->currentBytes()); } else { ASSERT_EQ(reclaimableBytes, 0); VELOX_ASSERT_THROW( @@ -1103,7 +1108,7 @@ DEBUG_ONLY_TEST_F(OrderByTest, reclaimDuringOutputProcessing) { ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 0); OperatorTestBase::deleteTaskAndCheckSpillDirectory(task); } - ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 0); + ASSERT_EQ(reclaimerStats_.numNonReclaimableAttempts, 1); } DEBUG_ONLY_TEST_F(OrderByTest, abortDuringOutputProcessing) { diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index 371cc6d3ce4a..c99e9e6cf7a0 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -120,8 +120,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { compressionKind_(param.compressionKind), hashBits_( 0, - (type_ == Spiller::Type::kOrderByInput || - type_ == Spiller::Type::kOrderByOutput || + (type_ == Spiller::Type::kOrderBy || type_ == Spiller::Type::kAggregateOutput || type_ == Spiller::Type::kAggregateInput) ? 0 @@ -385,7 +384,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { } } - void setupSpillContainer(const RowTypePtr& rowType, int32_t numKeys) { + void setupSpillContainer(RowTypePtr rowType, int32_t numKeys) { const auto& childTypes = rowType->children(); std::vector keys(childTypes.begin(), childTypes.begin() + numKeys); std::vector dependents; @@ -397,7 +396,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { rowType_ = rowType; } - void writeSpillData(const std::vector& batches) { + void writeSpillData(std::vector batches) { vector_size_t numRows = 0; for (const auto& batch : batches) { numRows += batch->size(); @@ -451,8 +450,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { // NOTE: for aggregation output type, we expect the merge read to produce // the output rows in the same order of the row insertion. So do need the // sort for testing. - if (type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput) { + if (type_ == Spiller::Type::kAggregateOutput) { return; } for (auto& partition : partitions_) { @@ -492,7 +490,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { pool_.get(), executor()); } else if ( - type_ == Spiller::Type::kOrderByInput || + type_ == Spiller::Type::kOrderBy || type_ == Spiller::Type::kAggregateInput) { // We spill 'data' in one partition in type of kOrderBy, otherwise in 4 // partitions. @@ -508,9 +506,7 @@ class SpillerTest : public exec::test::RowContainerTestBase { compressionKind_, pool_.get(), executor()); - } else if ( - type_ == Spiller::Type::kAggregateOutput || - type_ == Spiller::Type::kOrderByOutput) { + } else if (type_ == Spiller::Type::kAggregateOutput) { spiller_ = std::make_unique( type_, rowContainer_.get(), @@ -1048,9 +1044,7 @@ class NoHashJoin : public SpillerTest, static std::vector getTestParams() { return TestParamsBuilder{ .typesToExclude = - {Spiller::Type::kHashJoinProbe, - Spiller::Type::kHashJoinBuild, - Spiller::Type::kOrderByOutput}} + {Spiller::Type::kHashJoinProbe, Spiller::Type::kHashJoinBuild}} .getTestParams(); } }; @@ -1099,8 +1093,7 @@ TEST_P(NoHashJoin, error) { } TEST_P(AllTypes, nonSortedSpillFunctions) { - if (type_ == Spiller::Type::kOrderByInput || - type_ == Spiller::Type::kOrderByOutput || + if (type_ == Spiller::Type::kOrderBy || type_ == Spiller::Type::kAggregateInput || type_ == Spiller::Type::kAggregateOutput) { setupSpillData(rowType_, numKeys_, 1'000, 1, nullptr, {}); @@ -1120,7 +1113,6 @@ TEST_P(AllTypes, nonSortedSpillFunctions) { verifySortedSpillData(spillPartitionSet.begin()->second.get()); return; } - testNonSortedSpill(1, 1000, 1, 1); testNonSortedSpill(1, 1000, 10, 1); testNonSortedSpill(1, 1000, 1, 1'000'000'000); @@ -1144,8 +1136,7 @@ class HashJoinBuildOnly : public SpillerTest, {Spiller::Type::kAggregateInput, Spiller::Type::kAggregateOutput, Spiller::Type::kHashJoinProbe, - Spiller::Type::kOrderByInput, - Spiller::Type::kOrderByOutput}} + Spiller::Type::kOrderBy}} .getTestParams(); } }; @@ -1236,8 +1227,7 @@ class AggregationOutputOnly : public SpillerTest, {Spiller::Type::kAggregateInput, Spiller::Type::kHashJoinBuild, Spiller::Type::kHashJoinProbe, - Spiller::Type::kOrderByInput, - Spiller::Type::kOrderByOutput}} + Spiller::Type::kOrderBy}} .getTestParams(); } }; @@ -1278,6 +1268,7 @@ TEST_P(AggregationOutputOnly, basic) { ASSERT_EQ(rowContainer_->numRows(), numRows); rowContainer_->clear(); + rowContainer_->clear(); auto spillPartition = spiller_->finishSpill(); ASSERT_TRUE(spiller_->finalized()); @@ -1309,88 +1300,6 @@ TEST_P(AggregationOutputOnly, basic) { } } -class OrderByOutputOnly : public SpillerTest, - public testing::WithParamInterface { - public: - OrderByOutputOnly() : SpillerTest(GetParam()) {} - - static std::vector getTestParams() { - return TestParamsBuilder{ - .typesToExclude = - {Spiller::Type::kAggregateInput, - Spiller::Type::kAggregateOutput, - Spiller::Type::kHashJoinBuild, - Spiller::Type::kHashJoinProbe, - Spiller::Type::kOrderByInput}} - .getTestParams(); - } -}; - -TEST_P(OrderByOutputOnly, basic) { - const int numRows = 5'000; - struct { - int numSpillRows; - - std::string debugString() const { - return fmt::format("numSpillRows {}", numSpillRows); - } - } testSettings[] = {{0}, {1000}, {5000}, {5000 - 1}, {5000 + 1}, {50000 * 2}}; - - for (const auto& testData : testSettings) { - SCOPED_TRACE(testData.debugString()); - - setupSpillData(rowType_, numKeys_, numRows, 0); - sortSpillData(); - // NOTE: target file size is ignored by aggregation output spiller type. - setupSpiller(0, 1'000'000, 0, false); - RowContainerIterator rowIter; - std::vector rows(numRows); - int numListedRows{0}; - numListedRows = - rowContainer_->listRows(&rowIter, testData.numSpillRows, rows.data()); - ASSERT_LE(numListedRows, numRows); - { - RowVectorPtr dummy; - VELOX_ASSERT_THROW( - spiller_->spill(0, dummy), - "Unexpected spiller type: ORDER_BY_OUTPUT"); - } - auto spillRows = - std::vector(rows.begin(), rows.begin() + numListedRows); - spiller_->spill(spillRows); - ASSERT_EQ(rowContainer_->numRows(), numRows); - rowContainer_->clear(); - - rowContainer_->clear(); - auto spillPartition = spiller_->finishSpill(); - ASSERT_TRUE(spiller_->finalized()); - - const int expectedNumSpilledRows = numListedRows; - auto merge = spillPartition.createOrderedReader(pool()); - if (expectedNumSpilledRows == 0) { - ASSERT_TRUE(merge == nullptr); - } else { - for (auto i = 0; i < expectedNumSpilledRows; ++i) { - auto* stream = merge->next(); - ASSERT_TRUE(stream != nullptr); - ASSERT_TRUE(rowVector_->equalValueAt( - &stream->current(), partitions_[0][i], stream->currentIndex())); - stream->pop(); - } - } - - const auto stats = spiller_->stats(); - if (expectedNumSpilledRows == 0) { - ASSERT_EQ(stats.spilledFiles, 0) << stats.toString(); - ASSERT_EQ(stats.spilledRows, 0) << stats.toString(); - } else { - ASSERT_EQ(stats.spilledFiles, 1) << stats.toString(); - ASSERT_EQ(stats.spilledRows, expectedNumSpilledRows) << stats.toString(); - } - ASSERT_EQ(stats.spillSortTimeUs, 0); - } -} - VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest, AllTypes, @@ -1410,8 +1319,3 @@ VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest, AggregationOutputOnly, testing::ValuesIn(AggregationOutputOnly::getTestParams())); - -VELOX_INSTANTIATE_TEST_SUITE_P( - SpillerTest, - OrderByOutputOnly, - testing::ValuesIn(OrderByOutputOnly::getTestParams()));