Skip to content

Commit

Permalink
Implement toIntermediate for merging HLL (facebookincubator#9421)
Browse files Browse the repository at this point in the history
Summary:

For queries that read HLL digests from a table and call `merge` on
them, we used to create an empty accumulator, merge it with the serialized
digest, and then serialize the merged accumulator again, which wasted
CPU.  Fix this by passing the serialized digests through directly when
partial aggregation is abandoned.

Reviewed By: mbasmanova

Differential Revision: D55935663
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 10, 2024
1 parent 22279f9 commit e50d1d9
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 18 deletions.
20 changes: 20 additions & 0 deletions velox/exec/Aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,24 @@ void Aggregate::clearInternal() {
numNulls_ = 0;
}

void Aggregate::singleInputAsIntermediate(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const {
VELOX_CHECK_EQ(args.size(), 1);
const auto& input = args[0];
if (rows.isAllSelected()) {
result = input;
return;
}
VELOX_CHECK_NOT_NULL(result);
// Set result to NULL for rows that are masked out.
{
auto nulls = allocateNulls(rows.size(), allocator_->pool(), bits::kNull);
rows.clearNulls(nulls);
result->setNulls(nulls);
}
result->copy(input.get(), rows, nullptr);
}

} // namespace facebook::velox::exec
7 changes: 7 additions & 0 deletions velox/exec/Aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,13 @@ class Aggregate {
// 'groups'. No-op for fixed length accumulators.
virtual void destroyInternal(folly::Range<char**> groups) {}

// Helper function to pass single input argument directly as intermediate
// result.
//
// 'args' must contain exactly one vector. When all rows in 'rows' are
// selected, 'result' is set to that input vector. Otherwise 'result' must
// already be non-null: rows that are masked out are set to NULL in 'result'
// and the selected rows are copied into it from the input.
void singleInputAsIntermediate(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const;

// Shorthand for maintaining accumulator variable length size in
// accumulator update methods. Use like: { auto tracker =
// trackRowSize(group); update(group); }
Expand Down
11 changes: 11 additions & 0 deletions velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,17 @@ class ApproxDistinctAggregate : public exec::Aggregate {
return false;
}

// Reports whether toIntermediate() may be used for this aggregate. The answer
// is the hllAsRawInput_ flag.
// NOTE(review): presumably that flag is set when the raw input is already a
// serialized HLL digest (e.g. for `merge`) - confirm where it is initialized.
bool supportsToIntermediate() const final {
return hllAsRawInput_;
}

// Converts raw input to the intermediate result by forwarding to
// singleInputAsIntermediate(), i.e. the single input argument in 'args' is
// passed along as 'result' rather than being run through an accumulator.
void toIntermediate(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const final {
singleInputAsIntermediate(rows, args, result);
}

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override {
if (hllAsFinalResult_) {
Expand Down
19 changes: 1 addition & 18 deletions velox/functions/prestosql/aggregates/MinMaxAggregates.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,24 +55,7 @@ class MinMaxAggregate : public SimpleNumericAggregate<T, T, T> {
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
VectorPtr& result) const override {
const auto& input = args[0];
if (rows.isAllSelected()) {
result = input;
return;
}

auto* pool = BaseAggregate::allocator_->pool();

result = BaseVector::create(input->type(), rows.size(), pool);

// Set result to NULL for rows that are masked out.
{
BufferPtr nulls = allocateNulls(rows.size(), pool, bits::kNull);
rows.clearNulls(nulls);
result->setNulls(nulls);
}

result->copy(input.get(), rows, nullptr);
this->singleInputAsIntermediate(rows, args, result);
}

void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
Expand Down
15 changes: 15 additions & 0 deletions velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,5 +407,20 @@ TEST_F(ApproxDistinctTest, mergeWithEmpty) {
ASSERT_EQ(readSingleValue(op).value<TypeKind::BIGINT>(), 499);
}

// Builds one digest per grouping key with approx_set(), then checks that
// merge() over those digests followed by cardinality() reproduces the original
// (c0, c1) rows.
TEST_F(ApproxDistinctTest, toIntermediate) {
  constexpr int kSize = 1000;

  // c0: grouping key produced by folly::identity; c1: the constant 1.
  auto data = makeRowVector({
      makeFlatVector<int32_t>(kSize, folly::identity),
      makeConstant<int64_t>(1, kSize),
  });

  // First stage: compute approx_set(c1) grouped by c0.
  auto builder = PlanBuilder();
  auto digestPlan = builder.values({data})
                        .singleAggregation({"c0"}, {"approx_set(c1)"})
                        .planNode();
  auto digestRows = AssertQueryBuilder(digestPlan).copyResults(pool());

  // Second stage: feed the digests, divided by split(..., 2), into merge().
  auto digestBatches = split(digestRows, 2);
  testAggregations(
      digestBatches,
      {"c0"},
      {"merge(a0)"},
      {"c0", "cardinality(a0)"},
      {data});
}

} // namespace
} // namespace facebook::velox::aggregate::test

0 comments on commit e50d1d9

Please sign in to comment.