Commit e8572ae
fixup
fixup

fixup

fixup

fixup

fixup

Revert "fixup"

This reverts commit f9efc1a.

fixup

fixup

fixup

fixup

test

This reverts commit 73bf319b76d74a794d2fcffa3b992f581d69f6a1.
zhztheplayer committed Nov 16, 2023
1 parent 6401b7b commit e8572ae
Showing 6 changed files with 153 additions and 10 deletions.
velox/core/PlanNode.cpp: 3 changes (1 addition, 2 deletions)
@@ -237,8 +237,7 @@ bool AggregationNode::canSpill(const QueryConfig& queryConfig) const {
}
// TODO: add spilling for pre-grouped aggregation later:
// https://github.com/facebookincubator/velox/issues/3264
- return (isFinal() || isSingle()) && preGroupedKeys().empty() &&
-     queryConfig.aggregationSpillEnabled();
+ return preGroupedKeys().empty() && queryConfig.aggregationSpillEnabled();
}

void AggregationNode::addDetails(std::stringstream& stream) const {
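Taken together, this hunk means AggregationNode::canSpill() no longer restricts spilling to final or single aggregation steps; any aggregation with no pre-grouped keys reports itself as spillable once the query config enables aggregation spilling. A minimal caller sketch under that reading, assuming the public Velox headers below; the helper name is hypothetical and not part of this commit:

#include "velox/core/PlanNode.h"
#include "velox/core/QueryConfig.h"

// Hypothetical helper: with the relaxed rule the aggregation step
// (partial/intermediate/final/single) no longer matters, only the absence
// of pre-grouped keys and the aggregation-spill config flag.
bool maySpill(
    const facebook::velox::core::AggregationNode& node,
    const facebook::velox::core::QueryConfig& config) {
  return node.canSpill(config);
}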
velox/core/tests/PlanFragmentTest.cpp: 8 changes (4 additions, 4 deletions)
@@ -159,14 +159,14 @@ TEST_F(PlanFragmentTest, aggregationCanSpill) {
{AggregationNode::Step::kSingle, true, true, false, false, true},
{AggregationNode::Step::kIntermediate, false, true, false, false, false},
{AggregationNode::Step::kIntermediate, true, false, false, false, false},
- {AggregationNode::Step::kIntermediate, true, true, true, false, false},
+ {AggregationNode::Step::kIntermediate, true, true, true, false, true},
{AggregationNode::Step::kIntermediate, true, true, false, true, false},
- {AggregationNode::Step::kIntermediate, true, true, false, false, false},
+ {AggregationNode::Step::kIntermediate, true, true, false, false, true},
{AggregationNode::Step::kPartial, false, true, false, false, false},
{AggregationNode::Step::kPartial, true, false, false, false, false},
- {AggregationNode::Step::kPartial, true, true, true, false, false},
+ {AggregationNode::Step::kPartial, true, true, true, false, true},
{AggregationNode::Step::kPartial, true, true, false, true, false},
- {AggregationNode::Step::kPartial, true, true, false, false, false},
+ {AggregationNode::Step::kPartial, true, true, false, false, true},
{AggregationNode::Step::kFinal, false, true, false, false, false},
{AggregationNode::Step::kFinal, true, false, false, false, false},
{AggregationNode::Step::kFinal, true, true, true, false, true},
velox/exec/GroupingSet.cpp: 8 changes (6 additions, 2 deletions)
@@ -830,7 +830,7 @@ const HashLookup& GroupingSet::hashLookup() const {
void GroupingSet::ensureInputFits(const RowVectorPtr& input) {
// Spilling is considered if this is a final or single aggregation and
// spillPath is set.
- if (isPartial_ || spillConfig_ == nullptr) {
+ if (spillConfig_ == nullptr) {
return;
}

@@ -913,7 +913,7 @@ void GroupingSet::ensureOutputFits() {
// to reserve memory for the output as we can't reclaim much memory from this
// operator itself. The output processing can reclaim memory from the other
// operator or query through memory arbitration.
- if (isPartial_ || spillConfig_ == nullptr || hasSpilled()) {
+ if (spillConfig_ == nullptr || hasSpilled()) {
return;
}

@@ -960,6 +960,10 @@ void GroupingSet::spill() {
return;
}

+ if (hasSpilled() && spiller_->finalized()) {
+   return;
+ }

if (!hasSpilled()) {
auto rows = table_->rows();
VELOX_DCHECK(pool_.trackUsage());
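The new block in GroupingSet::spill() makes a repeated spill request a no-op once the spiller has been finalized, rather than pushing more rows into a finalized spiller; removing the isPartial_ checks in ensureInputFits() and ensureOutputFits() likewise lets partial aggregations take the same reservation and spill path as final and single ones. A condensed, self-contained sketch of the added guard follows; the parameters stand in for the real GroupingSet members and the actual spilling work is elided:

// Hypothetical stand-in for GroupingSet::spill(): shows only the control
// flow of the guard added in this commit.
void spillSketch(bool hasSpilled, bool spillerFinalized) {
  if (hasSpilled && spillerFinalized) {
    // A finalized spiller cannot accept more rows, so a second spill
    // request after finalization simply returns.
    return;
  }
  if (!hasSpilled) {
    // First spill: set up the spiller over the row container, as in the
    // lines following the guard in the diff.
  }
  // ... hand rows to the spiller ...
}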
velox/exec/tests/AggregationTest.cpp: 126 changes (126 additions, 0 deletions)
@@ -20,6 +20,7 @@
#include "folly/experimental/EventCount.h"
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/common/testutil/TestValue.h"
#include "velox/dwio/common/tests/utils/BatchMaker.h"
#include "velox/exec/Aggregate.h"
@@ -397,6 +398,33 @@ class AggregationTest : public OperatorTestBase {
VARCHAR()})};
folly::Random::DefaultGenerator rng_;
memory::MemoryReclaimer::Stats reclaimerStats_;

std::shared_ptr<core::QueryCtx> newQueryCtx(
int64_t memoryCapacity = memory::kMaxMemory) {
std::unordered_map<std::string, std::shared_ptr<Config>> configs;
std::shared_ptr<memory::MemoryPool> pool = memoryManager_->addRootPool(
"", memoryCapacity, MemoryReclaimer::create());
auto queryCtx = std::make_shared<core::QueryCtx>(
executor_.get(),
core::QueryConfig({}),
configs,
cache::AsyncDataCache::getInstance(),
std::move(pool));
return queryCtx;
}

void setupMemory() {
memory::MemoryManagerOptions options;
options.arbitratorKind = "SHARED";
options.checkUsageLeak = true;
memoryAllocator_ = memory::MemoryAllocator::createDefaultInstance();
options.allocator = memoryAllocator_.get();
memoryManager_ = std::make_unique<memory::MemoryManager>(options);
}

private:
std::shared_ptr<memory::MemoryAllocator> memoryAllocator_;
std::unique_ptr<memory::MemoryManager> memoryManager_;
};

template <>
@@ -847,6 +875,104 @@ TEST_F(AggregationTest, partialAggregationMemoryLimit) {
.customStats.count("flushRowCount"));
}

// TODO move to arbitrator test
TEST_F(AggregationTest, partialAggregationSpill) {
VectorFuzzer::Options fuzzerOpts;
fuzzerOpts.vectorSize = 128;
RowTypePtr rowType = ROW(
{{"c0", INTEGER()},
{"c1", INTEGER()},
{"c2", INTEGER()},
{"c3", INTEGER()},
{"c4", INTEGER()},
{"c5", INTEGER()},
{"c6", INTEGER()},
{"c7", INTEGER()},
{"c8", INTEGER()},
{"c9", INTEGER()},
{"c10", INTEGER()}});
VectorFuzzer fuzzer(std::move(fuzzerOpts), pool());

std::vector<RowVectorPtr> vectors;

const int32_t numVectors = 2000;
for (int i = 0; i < numVectors; i++) {
vectors.push_back(fuzzer.fuzzRow(rowType));
}

createDuckDbTable(vectors);

setupMemory();

core::PlanNodeId partialAggNodeId;
core::PlanNodeId finalAggNodeId;
// Set an artificially low limit on the amount of data to accumulate in
// the partial aggregation.

// Distinct aggregation.
auto spillDirectory1 = exec::test::TempDirectoryPath::create();
auto task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(newQueryCtx(10LL << 10 << 10))
.spillDirectory(spillDirectory1->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(0)) // always spill on final agg
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT distinct c0 FROM tmp");

checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true);
checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true);

// Count aggregation.
auto spillDirectory2 = exec::test::TempDirectoryPath::create();
task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(newQueryCtx(10LL << 10 << 10))
.spillDirectory(spillDirectory2->path)
.config(QueryConfig::kSpillEnabled, "true")
.config(QueryConfig::kAggregationSpillEnabled, "true")
.config(
QueryConfig::kAggregationSpillMemoryThreshold,
std::to_string(0)) // always spill on final agg
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({"c0"}, {"count(1)"})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT c0, count(1) FROM tmp GROUP BY 1");

checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), true);
checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), true);

// Global aggregation.
task = AssertQueryBuilder(duckDbQueryRunner_)
.queryCtx(newQueryCtx(10LL << 10 << 10))
.plan(PlanBuilder()
.values(vectors)
.partialAggregation({}, {"sum(c0)"})
.capturePlanNodeId(partialAggNodeId)
.finalAggregation()
.capturePlanNodeId(finalAggNodeId)
.planNode())
.assertResults("SELECT sum(c0) FROM tmp");
EXPECT_EQ(
0,
toPlanStats(task->taskStats())
.at(partialAggNodeId)
.customStats.count("flushRowCount"));
checkSpillStats(toPlanStats(task->taskStats()).at(partialAggNodeId), false);
checkSpillStats(toPlanStats(task->taskStats()).at(finalAggNodeId), false);
}

TEST_F(AggregationTest, partialDistinctWithAbandon) {
auto vectors = {
// 1st batch will produce 100 distinct groups from 10 rows.
velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp: 13 changes (11 additions, 2 deletions)
@@ -625,8 +625,17 @@ class ApproxPercentileAggregate : public exec::Aggregate {
auto rowVec = decoded.base()->as<RowVector>();
if constexpr (checkIntermediateInputs) {
VELOX_USER_CHECK(rowVec);
- for (int i = kPercentiles; i <= kAccuracy; ++i) {
-   VELOX_USER_CHECK(rowVec->childAt(i)->isConstantEncoding());
+ for (int i = kPercentiles; i <= kPercentiles; ++i) {
+   VELOX_USER_CHECK(
+       rowVec->childAt(i)->isConstantEncoding() ||
+       rowVec->childAt(i)
+           ->isArrayEncoding()); // spilling flats constant encoding
  }
+ for (int i = kPercentilesIsArray; i <= kAccuracy; ++i) {
+   VELOX_USER_CHECK(
+       rowVec->childAt(i)->isConstantEncoding() ||
+       rowVec->childAt(i)
+           ->isFlatEncoding()); // spilling flats constant encoding
+ }
for (int i = kK; i <= kMaxValue; ++i) {
VELOX_USER_CHECK(rowVec->childAt(i)->isFlatEncoding());
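This hunk splits the intermediate-input validation in two: the percentiles child may now arrive either constant-encoded or ARRAY-encoded, and the remaining scalar children either constant or flat, because spilling serializes intermediate results and restores constants as flattened vectors (the "spilling flats constant encoding" comments). An illustrative check along those lines, assuming the headers below; the helper itself is hypothetical, while rowVec and kPercentiles are taken from the diff:

#include "velox/common/base/Exceptions.h"
#include "velox/vector/ComplexVector.h"

// Hypothetical helper mirroring the relaxed check for the percentiles child.
void checkPercentilesChild(
    const facebook::velox::RowVector* rowVec, int kPercentiles) {
  const auto& child = rowVec->childAt(kPercentiles);
  if (child->isConstantEncoding()) {
    // Direct (non-spilled) intermediate input: still a constant vector.
  } else if (child->isArrayEncoding()) {
    // Intermediate input restored from spill files: the constant has been
    // flattened into a plain ARRAY-encoded vector.
  } else {
    VELOX_USER_FAIL("unexpected encoding for percentiles child");
  }
}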
velox/vector/BaseVector.h: 5 changes (5 additions, 0 deletions)
Expand Up @@ -340,6 +340,11 @@ class BaseVector {
*/
virtual std::unique_ptr<SimpleVector<uint64_t>> hashAll() const = 0;

+ /// Returns true if this vector is encoded as array
+ bool isArrayEncoding() const {
+   return encoding_ == VectorEncoding::Simple::ARRAY;
+ }

/// Returns true if this vector is encoded as flat (FlatVector).
bool isFlatEncoding() const {
return encoding_ == VectorEncoding::Simple::FLAT;
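The new BaseVector::isArrayEncoding() mirrors the existing isFlatEncoding() and simply compares the stored encoding against VectorEncoding::Simple::ARRAY. A small usage sketch; the helper here is hypothetical, not from this commit:

#include "velox/vector/BaseVector.h"

// Hypothetical helper: branch on a vector's encoding using the new check
// alongside the existing flat-encoding check.
void describeEncoding(const facebook::velox::VectorPtr& vector) {
  if (vector->isArrayEncoding()) {
    // Backed by an ArrayVector (VectorEncoding::Simple::ARRAY).
  } else if (vector->isFlatEncoding()) {
    // Backed by a FlatVector (VectorEncoding::Simple::FLAT).
  }
}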
