Skip to content

Commit

Permalink
Fix default error for approx_set and empty_approx_set (facebookincuba…
Browse files Browse the repository at this point in the history
…tor#9399)

Summary:

In Presto, the default standard error for `approx_set`
(and `empty_approx_set`) is
[0.01625](https://prestodb.io/docs/current/functions/hyperloglog.html#approx_set),
which is different from `approx_distinct`
([0.023](https://prestodb.io/docs/current/functions/aggregate.html#approx_distinct)).
In Velox we are using 0.023 for both, this causes problem (in addition to the
precision loss) that when `empty_approx_set` is constant folded by the
coordinator, it has a different standard error and failing the sanity check.
Fix this by aligning the default error value with Presto.

Differential Revision: D55814871
  • Loading branch information
Yuhta authored and facebook-github-bot committed Apr 6, 2024
1 parent 568a96c commit 9392bf2
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 36 deletions.
3 changes: 2 additions & 1 deletion velox/common/hyperloglog/HllUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ namespace facebook::velox::common::hll {

constexpr double kLowestMaxStandardError = 0.0040625;
constexpr double kHighestMaxStandardError = 0.26000;
constexpr double kDefaultStandardError = 0.023;
constexpr double kDefaultApproxDistinctStandardError = 0.023;
constexpr double kDefaultApproxSetStandardError = 0.01625;

const int8_t kPrestoSparseV2 = 2;
const int8_t kPrestoDenseV2 = 3;
Expand Down
2 changes: 1 addition & 1 deletion velox/functions/prestosql/HyperLogLogFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ struct EmptyApproxSetFunction {

FOLLY_ALWAYS_INLINE bool call(out_type<HyperLogLog>& result) {
static const std::string kEmpty =
common::hll::SparseHll::serializeEmpty(11);
common::hll::SparseHll::serializeEmpty(12);

result.resize(kEmpty.size());
memcpy(result.data(), kEmpty.data(), kEmpty.size());
Expand Down
41 changes: 29 additions & 12 deletions velox/functions/prestosql/aggregates/ApproxDistinctAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ class ApproxDistinctAggregate : public exec::Aggregate {
explicit ApproxDistinctAggregate(
const TypePtr& resultType,
bool hllAsFinalResult,
bool hllAsRawInput)
bool hllAsRawInput,
double defaultError)
: exec::Aggregate(resultType),
hllAsFinalResult_{hllAsFinalResult},
hllAsRawInput_{hllAsRawInput} {}
hllAsRawInput_{hllAsRawInput},
indexBitLength_{common::hll::toIndexBitLength(defaultError)} {}

int32_t accumulatorFixedWidthSize() const override {
return sizeof(HllAccumulator);
Expand Down Expand Up @@ -405,8 +407,7 @@ class ApproxDistinctAggregate : public exec::Aggregate {
/// serialized HLLs.
const bool hllAsRawInput_;

int8_t indexBitLength_{
common::hll::toIndexBitLength(common::hll::kDefaultStandardError)};
int8_t indexBitLength_;
double maxStandardError_{-1};
DecodedVector decodedValue_;
DecodedVector decodedMaxStandardError_;
Expand All @@ -417,18 +418,20 @@ template <TypeKind kind>
std::unique_ptr<exec::Aggregate> createApproxDistinct(
const TypePtr& resultType,
bool hllAsFinalResult,
bool hllAsRawInput) {
bool hllAsRawInput,
double defaultError) {
using T = typename TypeTraits<kind>::NativeType;
return std::make_unique<ApproxDistinctAggregate<T>>(
resultType, hllAsFinalResult, hllAsRawInput);
resultType, hllAsFinalResult, hllAsRawInput, defaultError);
}

exec::AggregateRegistrationResult registerApproxDistinct(
const std::string& name,
bool hllAsFinalResult,
bool hllAsRawInput,
bool withCompanionFunctions,
bool overwrite) {
bool overwrite,
double defaultError) {
auto returnType = hllAsFinalResult ? "hyperloglog" : "bigint";

std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
Expand Down Expand Up @@ -484,7 +487,7 @@ exec::AggregateRegistrationResult registerApproxDistinct(
return exec::registerAggregateFunction(
name,
std::move(signatures),
[name, hllAsFinalResult, hllAsRawInput](
[name, hllAsFinalResult, hllAsRawInput, defaultError](
core::AggregationNode::Step /*step*/,
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType,
Expand All @@ -496,7 +499,8 @@ exec::AggregateRegistrationResult registerApproxDistinct(
type->kind(),
resultType,
hllAsFinalResult,
hllAsRawInput);
hllAsRawInput,
defaultError);
},
withCompanionFunctions,
overwrite);
Expand All @@ -516,11 +520,24 @@ void registerApproxDistinctAggregates(
false,
false,
withCompanionFunctions,
overwrite);
overwrite,
common::hll::kDefaultApproxDistinctStandardError);
// approx_set and merge are already companion functions themselves. Don't
// register companion functions for them.
registerApproxDistinct(prefix + kApproxSet, true, false, false, overwrite);
registerApproxDistinct(prefix + kMerge, true, true, false, overwrite);
registerApproxDistinct(
prefix + kApproxSet,
true,
false,
false,
overwrite,
common::hll::kDefaultApproxSetStandardError);
registerApproxDistinct(
prefix + kMerge,
true,
true,
false,
overwrite,
common::hll::kDefaultApproxSetStandardError);
}

} // namespace facebook::velox::aggregate::prestosql
85 changes: 63 additions & 22 deletions velox/functions/prestosql/aggregates/tests/ApproxDistinctTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ class ApproxDistinctTest : public AggregationTestBase {
void testGlobalAgg(
const VectorPtr& values,
double maxStandardError,
int64_t expectedResult,
bool testWithTableScan = true) {
int64_t expectedResult) {
auto vectors = makeRowVector({values});
auto expected =
makeRowVector({makeNullableFlatVector<int64_t>({expectedResult})});
Expand Down Expand Up @@ -63,7 +62,7 @@ class ApproxDistinctTest : public AggregationTestBase {
void testGlobalAgg(
const VectorPtr& values,
int64_t expectedResult,
bool testWithTableScan = true) {
bool testApproxSet = true) {
auto vectors = makeRowVector({values});
auto expected =
makeRowVector({makeNullableFlatVector<int64_t>({expectedResult})});
Expand All @@ -78,8 +77,10 @@ class ApproxDistinctTest : public AggregationTestBase {
{},
{expected});

testAggregations(
{vectors}, {}, {"approx_set(c0)"}, {"cardinality(a0)"}, {expected});
if (testApproxSet) {
testAggregations(
{vectors}, {}, {"approx_set(c0)"}, {"cardinality(a0)"}, {expected});
}
}

template <typename T, typename U>
Expand All @@ -100,7 +101,8 @@ class ApproxDistinctTest : public AggregationTestBase {
void testGroupByAgg(
const VectorPtr& keys,
const VectorPtr& values,
const std::unordered_map<int32_t, int64_t>& expectedResults) {
const std::unordered_map<int32_t, int64_t>& expectedResults,
bool testApproxSet = true) {
auto vectors = makeRowVector({keys, values});
auto expected = toRowVector(expectedResults);

Expand All @@ -114,12 +116,14 @@ class ApproxDistinctTest : public AggregationTestBase {
{},
{expected});

testAggregations(
{vectors},
{"c0"},
{"approx_set(c1)"},
{"c0", "cardinality(a0)"},
{expected});
if (testApproxSet) {
testAggregations(
{vectors},
{"c0"},
{"approx_set(c1)"},
{"c0", "cardinality(a0)"},
{expected});
}
}
};

Expand Down Expand Up @@ -170,7 +174,13 @@ TEST_F(ApproxDistinctTest, groupByHighCardinalityIntegers) {
auto keys = makeFlatVector<int32_t>(size, [](auto row) { return row % 2; });
auto values = makeFlatVector<int32_t>(size, [](auto row) { return row; });

testGroupByAgg(keys, values, {{0, 488}, {1, 493}});
testGroupByAgg(keys, values, {{0, 488}, {1, 493}}, false);
testAggregations(
{makeRowVector({keys, values})},
{"c0"},
{"approx_set(c1)"},
{"c0", "cardinality(a0)"},
{toRowVector<int32_t, int64_t>({{0, 500}, {1, 500}})});
}

TEST_F(ApproxDistinctTest, groupByVeryLowCardinalityIntegers) {
Expand Down Expand Up @@ -230,7 +240,13 @@ TEST_F(ApproxDistinctTest, globalAggHighCardinalityIntegers) {
vector_size_t size = 1'000;
auto values = makeFlatVector<int32_t>(size, [](auto row) { return row; });

testGlobalAgg(values, 977);
testGlobalAgg(values, 977, false);
testAggregations(
{makeRowVector({values})},
{},
{"approx_set(c0)"},
{"cardinality(a0)"},
{makeRowVector({makeFlatVector<int64_t>(std::vector<int64_t>({997}))})});
}

TEST_F(ApproxDistinctTest, globalAggVeryLowCardinalityIntegers) {
Expand All @@ -244,7 +260,13 @@ TEST_F(ApproxDistinctTest, toIndexBitLength) {
ASSERT_EQ(
common::hll::toIndexBitLength(common::hll::kHighestMaxStandardError), 4);
ASSERT_EQ(
common::hll::toIndexBitLength(common::hll::kDefaultStandardError), 11);
common::hll::toIndexBitLength(
common::hll::kDefaultApproxDistinctStandardError),
11);
ASSERT_EQ(
common::hll::toIndexBitLength(
common::hll::kDefaultApproxSetStandardError),
12);
ASSERT_EQ(
common::hll::toIndexBitLength(common::hll::kLowestMaxStandardError), 16);

Expand Down Expand Up @@ -317,14 +339,16 @@ TEST_F(ApproxDistinctTest, globalAggAllNulls) {
TEST_F(ApproxDistinctTest, hugeInt) {
auto hugeIntValues =
makeFlatVector<int128_t>(50000, [](auto row) { return row; });
// Last param is set false to disable tablescan test
// as DWRF writer doesn't have hugeint support.
// Refer:https://github.com/facebookincubator/velox/issues/7775
testGlobalAgg(hugeIntValues, 49669, false);
testGlobalAgg(
hugeIntValues, common::hll::kLowestMaxStandardError, 50110, false);
testGlobalAgg(
hugeIntValues, common::hll::kHighestMaxStandardError, 41741, false);
testAggregations(
{makeRowVector({hugeIntValues})},
{},
{"approx_set(c0)"},
{"cardinality(a0)"},
{makeRowVector(
{makeFlatVector<int64_t>(std::vector<int64_t>({49958}))})});
testGlobalAgg(hugeIntValues, common::hll::kLowestMaxStandardError, 50110);
testGlobalAgg(hugeIntValues, common::hll::kHighestMaxStandardError, 41741);
}

TEST_F(ApproxDistinctTest, streaming) {
Expand Down Expand Up @@ -366,5 +390,22 @@ TEST_F(ApproxDistinctTest, memoryLeakInMerge) {
toPlanStats(task->taskStats()).at(finalAgg).peakMemoryBytes, 180'000);
}

TEST_F(ApproxDistinctTest, mergeWithEmpty) {
constexpr int kSize = 500;
auto input = makeRowVector({
makeFlatVector<int32_t>(kSize, [](auto i) { return std::min(i, 1); }),
makeFlatVector<int32_t>(
kSize, folly::identity, [](auto i) { return i == 0; }),
});
auto op = PlanBuilder()
.values({input})
.singleAggregation({"c0"}, {"approx_set(c1)"})
.project({"coalesce(a0, empty_approx_set())"})
.singleAggregation({}, {"merge(p0)"})
.project({"cardinality(a0)"})
.planNode();
ASSERT_EQ(readSingleValue(op).value<TypeKind::BIGINT>(), 499);
}

} // namespace
} // namespace facebook::velox::aggregate::test

0 comments on commit 9392bf2

Please sign in to comment.