diff --git a/velox/exec/Spiller.cpp b/velox/exec/Spiller.cpp index 94daf1c76f1ea..c59987f94a72a 100644 --- a/velox/exec/Spiller.cpp +++ b/velox/exec/Spiller.cpp @@ -490,7 +490,10 @@ void Spiller::spill(const RowContainerIterator* startRowIter) { void Spiller::spill(std::vector<char*> rows) { CHECK_NOT_FINALIZED(); - VELOX_CHECK_NE(type_, Type::kHashJoinProbe); + VELOX_CHECK_EQ(type_, Type::kOrderByOutput); + if (rows.empty()) { + return; + } // Marks all the partitions have been spilled as we don't support fine-grained // spilling as for now. diff --git a/velox/exec/Spiller.h b/velox/exec/Spiller.h index d4542d6033f3b..dd7e9270544d7 100644 --- a/velox/exec/Spiller.h +++ b/velox/exec/Spiller.h @@ -119,7 +119,10 @@ class Spiller { /// The caller needs to erase them from the row container. void spill(const RowContainerIterator& startRowIter); - /// Invoked to spill. Spill all rows pointed by the pointers in sortedRows. + /// Invoked to spill.Spill all rows pointed by the pointers in sortedRows.This + /// is only used by 'kOrderByOutput' spiller type to spill during the order by + /// output processing. Similarly, the spilled rows still stays in the row + /// container.The caller needs to erase them from the row container. void spill(std::vector<char*> sortedRows); /// Append 'spillVector' into the spill file of given 'partition'. It is now diff --git a/velox/exec/tests/SpillerTest.cpp b/velox/exec/tests/SpillerTest.cpp index eececf9b27829..e1d3bf0f1eb00 100644 --- a/velox/exec/tests/SpillerTest.cpp +++ b/velox/exec/tests/SpillerTest.cpp @@ -1277,7 +1277,6 @@ TEST_P(AggregationOutputOnly, basic) { ASSERT_EQ(rowContainer_->numRows(), numRows); rowContainer_->clear(); - rowContainer_->clear(); auto spillPartition = spiller_->finishSpill(); ASSERT_TRUE(spiller_->finalized()); @@ -1309,6 +1308,88 @@ TEST_P(AggregationOutputOnly, basic) { } } +class OrderByOutputOnly : public SpillerTest, + public testing::WithParamInterface<TestParam> { + public: + OrderByOutputOnly() : SpillerTest(GetParam()) {} + + static std::vector<TestParam> getTestParams() { + return TestParamsBuilder{ + .typesToExclude = + {Spiller::Type::kAggregateInput, + Spiller::Type::kAggregateOutput, + Spiller::Type::kHashJoinBuild, + Spiller::Type::kHashJoinProbe, + Spiller::Type::kOrderByInput}} + .getTestParams(); + } +}; + +TEST_P(OrderByOutputOnly, basic) { + const int numRows = 5'000; + struct { + int numSpillRows; + + std::string debugString() const { + return fmt::format("numSpillRows {}", numSpillRows); + } + } testSettings[] = {{0}, {1000}, {5000}, {5000 - 1}, {5000 + 1}, {50000 * 2}}; + + for (const auto& testData : testSettings) { + SCOPED_TRACE(testData.debugString()); + + setupSpillData(rowType_, numKeys_, numRows, 0); + sortSpillData(); + // NOTE: target file size is ignored by aggregation output spiller type. + setupSpiller(0, 1'000'000, 0, false); + RowContainerIterator rowIter; + std::vector<char*> rows(numRows); + int numListedRows{0}; + numListedRows = + rowContainer_->listRows(&rowIter, testData.numSpillRows, rows.data()); + ASSERT_LE(numListedRows, numRows); + { + RowVectorPtr dummy; + VELOX_ASSERT_THROW( + spiller_->spill(0, dummy), + "Unexpected spiller type: ORDER_BY_OUTPUT"); + } + auto spillRows = + std::vector<char*>(rows.begin(), rows.begin() + numListedRows); + spiller_->spill(std::move(spillRows)); + ASSERT_EQ(rowContainer_->numRows(), numRows); + rowContainer_->clear(); + + rowContainer_->clear(); + auto spillPartition = spiller_->finishSpill(); + ASSERT_TRUE(spiller_->finalized()); + + const int expectedNumSpilledRows = numListedRows; + auto merge = spillPartition.createOrderedReader(pool()); + if (expectedNumSpilledRows == 0) { + ASSERT_TRUE(merge == nullptr); + } else { + for (auto i = 0; i < expectedNumSpilledRows; ++i) { + auto* stream = merge->next(); + ASSERT_TRUE(stream != nullptr); + ASSERT_TRUE(rowVector_->equalValueAt( + &stream->current(), partitions_[0][i], stream->currentIndex())); + stream->pop(); + } + } + + const auto stats = spiller_->stats(); + if (expectedNumSpilledRows == 0) { + ASSERT_EQ(stats.spilledFiles, 0) << stats.toString(); + ASSERT_EQ(stats.spilledRows, 0) << stats.toString(); + } else { + ASSERT_EQ(stats.spilledFiles, 1) << stats.toString(); + ASSERT_EQ(stats.spilledRows, expectedNumSpilledRows) << stats.toString(); + } + ASSERT_EQ(stats.spillSortTimeUs, 0); + } +} + VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest, AllTypes, @@ -1328,3 +1409,8 @@ VELOX_INSTANTIATE_TEST_SUITE_P( SpillerTest, AggregationOutputOnly, testing::ValuesIn(AggregationOutputOnly::getTestParams())); + +VELOX_INSTANTIATE_TEST_SUITE_P( + SpillerTest, + OrderByOutputOnly, + testing::ValuesIn(OrderByOutputOnly::getTestParams()));