Skip to content

Commit

Permalink
Add support for kurtosis Spark aggregate function (facebookincubator#…
Browse files Browse the repository at this point in the history
…9233)

Summary:
Spark uses Pearson's formula for calculating the kurtosis coefficient, which is
m4 / (m2^2) - 3. Meanwhile, Presto employs the sample kurtosis calculation
formula. The results from the two methods are completely different,
necessitating the implementation of a separate ResultAccessor for Spark's
kurtosis.

Spark's implementation:
https://github.com/apache/spark/blob/e22ddcbd852c95375d39fd6074627e1b5a91c6e7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala#L333-L336

Pull Request resolved: facebookincubator#9233

Reviewed By: mbasmanova

Differential Revision: D55331913

Pulled By: Yuhta

fbshipit-source-id: 82eb077325d828bc083da17596a9df56ad06bf56
  • Loading branch information
liujiayi771 authored and facebook-github-bot committed Mar 26, 2024
1 parent 2d832ee commit 494b888
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 8 deletions.
6 changes: 6 additions & 0 deletions velox/docs/functions/spark/aggregate.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ General Aggregate Functions
Returns the first non-null value of `x`.

.. spark:function:: kurtosis(x) -> double
Returns the Pearson's kurtosis of all input values. A non-null output is generated when
there is at least one input value `x`, except when the value of `m2` in the accumulator
is 0, in which case a null output is generated.

.. spark:function:: last(x) -> x
Returns the last value of `x`.
Expand Down
15 changes: 15 additions & 0 deletions velox/functions/sparksql/aggregates/CentralMomentsAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ struct SkewnessResultAccessor {
}
};

/// Result accessor used to register the Spark "kurtosis" aggregate. It derives
/// the final value from the count(), m2() and m4() values exposed by the
/// central-moments accumulator.
struct KurtosisResultAccessor {
  /// Tells whether a non-null result can be produced.
  ///
  /// @param accumulator Accumulator holding the aggregated state.
  /// @return true only when count() is at least 1 and m2() is non-zero;
  /// otherwise the aggregate yields NULL. m2() is consulted only after the
  /// count check has passed.
  static bool hasResult(const CentralMomentsAccumulator& accumulator) {
    if (accumulator.count() >= 1) {
      return accumulator.m2() != 0;
    }
    return false;
  }

  /// Computes the kurtosis value as count * m4 / (m2 * m2) - 3.0.
  ///
  /// @param accumulator Accumulator holding the aggregated state. Callers are
  /// expected to check hasResult() first; m2() == 0 is not guarded here.
  /// @return The kurtosis coefficient as a double.
  static double result(const CentralMomentsAccumulator& accumulator) {
    // Widen the count to double so the whole expression is evaluated in
    // floating point.
    const double n = accumulator.count();
    const double secondMoment = accumulator.m2();
    const double fourthMoment = accumulator.m4();
    const double ratio = n * fourthMoment / (secondMoment * secondMoment);
    return ratio - 3.0;
  }
};

template <typename TResultAccessor>
exec::AggregateRegistrationResult registerCentralMoments(
const std::string& name,
Expand Down Expand Up @@ -109,6 +122,8 @@ void registerCentralMomentsAggregate(
bool overwrite) {
registerCentralMoments<SkewnessResultAccessor>(
prefix + "skewness", withCompanionFunctions, overwrite);
registerCentralMoments<KurtosisResultAccessor>(
prefix + "kurtosis", withCompanionFunctions, overwrite);
}

} // namespace facebook::velox::functions::aggregate::sparksql
Original file line number Diff line number Diff line change
Expand Up @@ -31,29 +31,53 @@ class CentralMomentsAggregationTest : public AggregationTestBase {
registerAggregateFunctions("spark_");
}

void testSkewnessResult(
void testCenteralMomentsAggResult(
const std::string& agg,
const RowVectorPtr& input,
const RowVectorPtr& expected) {
PlanBuilder builder(pool());
builder.values({input});
builder.singleAggregation({}, {"spark_skewness(c0)"});
AssertQueryBuilder queryBuilder(
builder.planNode(), this->duckDbQueryRunner_);
queryBuilder.assertResults({expected});
builder.singleAggregation({}, {fmt::format("spark_{}(c0)", agg)});
AssertQueryBuilder(builder.planNode()).assertResults({expected});
}
};

TEST_F(CentralMomentsAggregationTest, skewnessHasResult) {
auto agg = "skewness";
auto input = makeRowVector({makeFlatVector<int32_t>({1, 2})});
// Even when the count is 2, Spark still produces output.
auto expected =
makeRowVector({makeFlatVector<double>(std::vector<double>{0.0})});
testSkewnessResult(input, expected);
testCenteralMomentsAggResult(agg, input, expected);

input = makeRowVector({makeFlatVector<int32_t>({1, 1})});
expected = makeRowVector({makeNullableFlatVector<double>(
std::vector<std::optional<double>>{std::nullopt})});
testSkewnessResult(input, expected);
testCenteralMomentsAggResult(agg, input, expected);
}

// Verifies the results of the Spark kurtosis aggregate for several inputs,
// including the small-count and zero-m2 edge cases.
TEST_F(CentralMomentsAggregationTest, pearsonKurtosis) {
  const std::string agg{"kurtosis"};

  // Five values with a single large outlier.
  {
    const auto input =
        makeRowVector({makeFlatVector<int32_t>({1, 10, 100, 10, 1})});
    const auto expected = makeRowVector(
        {makeFlatVector<double>(std::vector<double>{0.19432323191699075})});
    testCenteralMomentsAggResult(agg, input, expected);
  }

  // Mix of negative and positive values.
  {
    const auto input =
        makeRowVector({makeFlatVector<int32_t>({-10, -20, 100, 1000})});
    const auto expected = makeRowVector(
        {makeFlatVector<double>(std::vector<double>{-0.7014368047529627})});
    testCenteralMomentsAggResult(agg, input, expected);
  }

  // Spark still produces a non-null result when the count is only 2.
  {
    const auto input = makeRowVector({makeFlatVector<int32_t>({1, 2})});
    const auto expected =
        makeRowVector({makeFlatVector<double>(std::vector<double>{-2.0})});
    testCenteralMomentsAggResult(agg, input, expected);
  }

  // The result is NULL when m2 equals 0 (all inputs identical).
  {
    const auto input = makeRowVector({makeFlatVector<int32_t>({1, 1})});
    const auto expected = makeRowVector({makeNullableFlatVector<double>(
        std::vector<std::optional<double>>{std::nullopt})});
    testCenteralMomentsAggResult(agg, input, expected);
  }
}

} // namespace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ int main(int argc, char** argv) {
{"first_ignore_null", nullptr},
{"max_by", nullptr},
{"min_by", nullptr},
{"skewness", nullptr}};
{"skewness", nullptr},
{"kurtosis", nullptr}};

size_t initialSeed = FLAGS_seed == 0 ? std::time(nullptr) : FLAGS_seed;
auto duckQueryRunner =
Expand All @@ -83,6 +84,10 @@ int main(int argc, char** argv) {
// algorithms.
// https://github.com/facebookincubator/velox/issues/4845
"skewness",
// Spark's kurtosis uses Pearson's formula for calculating the kurtosis
// coefficient. Meanwhile, DuckDB employs the sample kurtosis calculation
// formula. The results from the two methods are completely different.
"kurtosis",
});

using Runner = facebook::velox::exec::test::AggregationFuzzerRunner;
Expand Down

0 comments on commit 494b888

Please sign in to comment.