Skip to content

Commit

Permalink
add spiller test
Browse files Browse the repository at this point in the history
  • Loading branch information
duanmeng committed Nov 29, 2023
1 parent 6c4396b commit 4a6013c
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 3 deletions.
5 changes: 4 additions & 1 deletion velox/exec/Spiller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,10 @@ void Spiller::spill(const RowContainerIterator* startRowIter) {

void Spiller::spill(std::vector<char*> rows) {
CHECK_NOT_FINALIZED();
VELOX_CHECK_NE(type_, Type::kHashJoinProbe);
VELOX_CHECK_EQ(type_, Type::kOrderByOutput);
if (rows.empty()) {
return;
}

// Marks all the partitions as spilled, as we don't support fine-grained
// spilling for now.
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/Spiller.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ class Spiller {
/// The caller needs to erase them from the row container.
void spill(const RowContainerIterator& startRowIter);

/// Invoked to spill. Spill all rows pointed by the pointers in sortedRows.
/// Invoked to spill. Spills all rows pointed to by the pointers in
/// 'sortedRows'. This is only used by 'kOrderByOutput' spiller type to spill
/// during the order by output processing. Similarly, the spilled rows still
/// stay in the row container. The caller needs to erase them from the row
/// container.
void spill(std::vector<char*> sortedRows);

/// Append 'spillVector' into the spill file of given 'partition'. It is now
Expand Down
88 changes: 87 additions & 1 deletion velox/exec/tests/SpillerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1277,7 +1277,6 @@ TEST_P(AggregationOutputOnly, basic) {
ASSERT_EQ(rowContainer_->numRows(), numRows);
rowContainer_->clear();

rowContainer_->clear();
auto spillPartition = spiller_->finishSpill();
ASSERT_TRUE(spiller_->finalized());

Expand Down Expand Up @@ -1309,6 +1308,88 @@ TEST_P(AggregationOutputOnly, basic) {
}
}

/// Parameterized fixture for spiller tests that exclude every spiller type
/// listed in getTestParams() (presumably leaving only 'kOrderByOutput' --
/// confirm against the full Spiller::Type enum).
class OrderByOutputOnly : public SpillerTest,
                          public testing::WithParamInterface<TestParam> {
 public:
  OrderByOutputOnly() : SpillerTest(GetParam()) {}

  /// Builds the list of test parameters with the aggregation, hash join and
  /// order by input spiller types filtered out.
  static std::vector<TestParam> getTestParams() {
    TestParamsBuilder builder{
        .typesToExclude = {
            Spiller::Type::kAggregateInput,
            Spiller::Type::kAggregateOutput,
            Spiller::Type::kHashJoinBuild,
            Spiller::Type::kHashJoinProbe,
            Spiller::Type::kOrderByInput}};
    return builder.getTestParams();
  }
};

// Verifies spilling of an explicit list of row pointers through
// Spiller::spill(std::vector<char*>): for each setting, lists up to
// 'numSpillRows' rows from the row container, spills them, and checks that the
// spilled data read back through the ordered reader matches the expected rows
// and that the spiller stats reflect the number of spilled rows.
TEST_P(OrderByOutputOnly, basic) {
  const int numRows = 5'000;
  struct {
    // Upper bound on the number of rows listed from the row container and
    // spilled; may be zero or exceed 'numRows'.
    int numSpillRows;

    std::string debugString() const {
      return fmt::format("numSpillRows {}", numSpillRows);
    }
  } testSettings[] = {{0}, {1000}, {5000}, {5000 - 1}, {5000 + 1}, {50000 * 2}};

  for (const auto& testData : testSettings) {
    SCOPED_TRACE(testData.debugString());

    setupSpillData(rowType_, numKeys_, numRows, 0);
    sortSpillData();
    // NOTE: target file size is expected to be ignored by the order by output
    // spiller type; a single spill file is asserted below.
    setupSpiller(0, 1'000'000, 0, false);

    RowContainerIterator rowIter;
    std::vector<char*> rows(numRows);
    const int numListedRows =
        rowContainer_->listRows(&rowIter, testData.numSpillRows, rows.data());
    ASSERT_LE(numListedRows, numRows);

    // The vector-append spill API is not supported by this spiller type.
    {
      RowVectorPtr dummy;
      VELOX_ASSERT_THROW(
          spiller_->spill(0, dummy),
          "Unexpected spiller type: ORDER_BY_OUTPUT");
    }

    auto spillRows =
        std::vector<char*>(rows.begin(), rows.begin() + numListedRows);
    spiller_->spill(std::move(spillRows));
    // Spilling does not remove the rows from the row container; the caller
    // (here, the test) erases them.
    ASSERT_EQ(rowContainer_->numRows(), numRows);
    rowContainer_->clear();

    auto spillPartition = spiller_->finishSpill();
    ASSERT_TRUE(spiller_->finalized());

    const int expectedNumSpilledRows = numListedRows;
    auto merge = spillPartition.createOrderedReader(pool());
    if (expectedNumSpilledRows == 0) {
      // Nothing was spilled, so no reader is created.
      ASSERT_TRUE(merge == nullptr);
    } else {
      for (auto i = 0; i < expectedNumSpilledRows; ++i) {
        auto* stream = merge->next();
        ASSERT_TRUE(stream != nullptr);
        ASSERT_TRUE(rowVector_->equalValueAt(
            &stream->current(), partitions_[0][i], stream->currentIndex()));
        stream->pop();
      }
    }

    const auto stats = spiller_->stats();
    if (expectedNumSpilledRows == 0) {
      ASSERT_EQ(stats.spilledFiles, 0) << stats.toString();
      ASSERT_EQ(stats.spilledRows, 0) << stats.toString();
    } else {
      ASSERT_EQ(stats.spilledFiles, 1) << stats.toString();
      ASSERT_EQ(stats.spilledRows, expectedNumSpilledRows) << stats.toString();
    }
    // No sort time is expected to be recorded for this spiller type.
    ASSERT_EQ(stats.spillSortTimeUs, 0);
  }
}

VELOX_INSTANTIATE_TEST_SUITE_P(
SpillerTest,
AllTypes,
Expand All @@ -1328,3 +1409,8 @@ VELOX_INSTANTIATE_TEST_SUITE_P(
SpillerTest,
AggregationOutputOnly,
testing::ValuesIn(AggregationOutputOnly::getTestParams()));

// Instantiates the OrderByOutputOnly parameterized tests under the
// 'SpillerTest' prefix, one instance per parameter returned by
// OrderByOutputOnly::getTestParams().
VELOX_INSTANTIATE_TEST_SUITE_P(
SpillerTest,
OrderByOutputOnly,
testing::ValuesIn(OrderByOutputOnly::getTestParams()));

0 comments on commit 4a6013c

Please sign in to comment.