Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhztheplayer committed Dec 19, 2023
1 parent 0de210b commit 3fd188e
Showing 1 changed file with 96 additions and 35 deletions.
131 changes: 96 additions & 35 deletions velox/exec/tests/AggregationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,46 @@ TEST_F(AggregationTest, spillWithMemoryLimit) {
}
}

TEST_F(AggregationTest, spillAll) {
auto inputs = makeVectors(rowType_, 100, 10);

const auto numDistincts =
AssertQueryBuilder(PlanBuilder()
.values(inputs)
.singleAggregation({"c0"}, {}, {})
.planNode())
.copyResults(pool_.get())
->size();

auto plan = PlanBuilder()
.values(inputs)
.singleAggregation({"c0"}, {"array_agg(c1)"})
.planNode();

auto results = AssertQueryBuilder(plan).copyResults(pool_.get());

auto tempDirectory = exec::test::TempDirectoryPath::create();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
auto task = AssertQueryBuilder(plan)
.spillDirectory(tempDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
// Set one spill partition to avoid the test flakiness.
// Set the memory trigger limit to be a very small value.
.config(QueryConfig::kAggregationSpillMemoryThreshold, "1024")
.assertResults(results);

auto stats = task->taskStats().pipelineStats;
ASSERT_LT(0, stats[0].operatorStats[1].runtimeStats["spillRuns"].count);
// Check spilled bytes.
ASSERT_LT(0, stats[0].operatorStats[1].spilledInputBytes);
ASSERT_LT(0, stats[0].operatorStats[1].spilledBytes);
ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 1);
// Verifies all the rows have been spilled.
ASSERT_EQ(stats[0].operatorStats[1].spilledRows, numDistincts);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}

TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
constexpr int32_t kNumDistinct = 2000;
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
Expand All @@ -1139,9 +1179,9 @@ TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
core::PlanNodeId partialAggrNodeId;
const auto plan = PlanBuilder()
.values(batches)
.partialAggregation({"c0"}, {}, {})
.partialAggregation({"c0"}, {"count(1)", "sum(c1)"})
.capturePlanNodeId(partialAggrNodeId)
.finalAggregation({"c0"}, {}, {})
.finalAggregation()
.planNode();
const auto expectedResults =
AssertQueryBuilder(plan).copyResults(pool_.get());
Expand Down Expand Up @@ -1190,44 +1230,65 @@ TEST_F(AggregationTest, partialSpillWithMemoryLimit) {
}
}

TEST_F(AggregationTest, spillAll) {
auto inputs = makeVectors(rowType_, 100, 10);
TEST_F(AggregationTest, partialDistinctSpillWithMemoryLimit) {
constexpr int32_t kNumDistinct = 2000;
constexpr int64_t kMaxBytes = 1LL << 30; // 1GB
rng_.seed(1);
rowType_ = ROW({"c0", "c1", "c2"}, {INTEGER(), INTEGER(), INTEGER()});
auto batches = makeVectors(rowType_, 100, 5);

const auto numDistincts =
AssertQueryBuilder(PlanBuilder()
.values(inputs)
.singleAggregation({"c0"}, {}, {})
.planNode())
.copyResults(pool_.get())
->size();
core::PlanNodeId partialAggrNodeId;
const auto plan = PlanBuilder()
.values(batches)
.partialAggregation({"c0"}, {}, {})
.capturePlanNodeId(partialAggrNodeId)
.finalAggregation({"c0"}, {}, {})
.planNode();
const auto expectedResults =
AssertQueryBuilder(plan).copyResults(pool_.get());

auto plan = PlanBuilder()
.values(inputs)
.singleAggregation({"c0"}, {"array_agg(c1)"})
.planNode();
struct {
uint64_t aggregationMemLimit;
bool expectSpill;

auto results = AssertQueryBuilder(plan).copyResults(pool_.get());
std::string debugString() const {
return fmt::format(
"aggregationMemLimit:{}, expectSpill:{}",
aggregationMemLimit,
expectSpill);
}
} testSettings[] = {// Memory limit is disabled so spilling is not triggered.
{0, false},
// Memory limit is too small so always trigger spilling.
{1, true},
// Memory limit is too large so spilling is not triggered.
{1'000'000'000, false}};
for (const auto& testData : testSettings) {
SCOPED_TRACE(testData.debugString());

auto tempDirectory = exec::test::TempDirectoryPath::create();
auto queryCtx = std::make_shared<core::QueryCtx>(executor_.get());
auto task = AssertQueryBuilder(plan)
.spillDirectory(tempDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
// Set one spill partition to avoid the test flakiness.
// Set the memory trigger limit to be a very small value.
.config(QueryConfig::kAggregationSpillMemoryThreshold, "1024")
.assertResults(results);
auto spillDirectory = exec::test::TempDirectoryPath::create();
auto task =
AssertQueryBuilder(plan)
.spillDirectory(spillDirectory->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kPartialAggregationSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(testData.aggregationMemLimit))
.config(
QueryConfig::kAbandonPartialAggregationMinPct,
"200") // avoid abandoning
.config(
QueryConfig::kAbandonPartialAggregationMinRows,
std::to_string(1LL << 30)) // avoid abandoning
.assertResults(expectedResults);

auto stats = task->taskStats().pipelineStats;
ASSERT_LT(0, stats[0].operatorStats[1].runtimeStats["spillRuns"].count);
// Check spilled bytes.
ASSERT_LT(0, stats[0].operatorStats[1].spilledInputBytes);
ASSERT_LT(0, stats[0].operatorStats[1].spilledBytes);
ASSERT_EQ(stats[0].operatorStats[1].spilledPartitions, 1);
// Verifies all the rows have been spilled.
ASSERT_EQ(stats[0].operatorStats[1].spilledRows, numDistincts);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
auto taskStats = exec::toPlanStats(task->taskStats());
auto& stats = taskStats.at(partialAggrNodeId);
checkSpillStats(stats, testData.expectSpill);
OperatorTestBase::deleteTaskAndCheckSpillDirectory(task);
}
}

// Verify number of memory allocations in the HashAggregation operator.
Expand Down

0 comments on commit 3fd188e

Please sign in to comment.