Add min, max Spark aggregate functions (facebookincubator#9868)
Summary:
There are two semantic differences between Presto and Spark.
1. Nested NULLs are compared as values in Spark and as "unknown value" in
Presto.
2. The timestamp type represents a time instant in microsecond precision in
Spark, but millisecond precision in Presto.

Therefore, Spark needs its own min and max functions. This PR:
1. Moves the Presto `min` and `max` aggregate function implementations to the
lib folder.
2. Adds `getMinFunctionFactory` and `getMaxFunctionFactory`, which allow callers
to register min and max functions with different behaviors.
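
The first semantic difference can be illustrated with a minimal, self-contained sketch. This is not Velox code: `NullableArray`, `NullMode` and `compareArrays` are hypothetical names chosen for illustration, and treating a null element as smaller than any non-null element is an assumption of the sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical stand-in for an ARRAY(INTEGER) value with nullable elements.
using NullableArray = std::vector<std::optional<int>>;

// Hypothetical mirror of the two behaviors described in the summary.
enum class NullMode { kNullAsValue, kNullAsIndeterminate };

// Returns <0, 0 or >0 when the ordering is known, and std::nullopt when a
// nested null makes the ordering indeterminate.
std::optional<int> compareArrays(
    const NullableArray& a,
    const NullableArray& b,
    NullMode mode) {
  const std::size_t n = std::min(a.size(), b.size());
  for (std::size_t i = 0; i < n; ++i) {
    const bool aNull = !a[i].has_value();
    const bool bNull = !b[i].has_value();
    if (aNull || bNull) {
      if (mode == NullMode::kNullAsIndeterminate) {
        return std::nullopt; // Presto-like: the result is unknown.
      }
      if (aNull && bNull) {
        continue; // Spark-like: two nulls are equal values.
      }
      return aNull ? -1 : 1; // Assumption: null sorts before non-null.
    }
    if (*a[i] != *b[i]) {
      return *a[i] < *b[i] ? -1 : 1;
    }
  }
  if (a.size() == b.size()) {
    return 0;
  }
  return a.size() < b.size() ? -1 : 1;
}

// Small usage example for the sketch above.
void exampleNestedNulls() {
  const NullableArray withNull{1, std::nullopt};
  const NullableArray withoutNull{1, 2};
  // Spark-like mode yields a definite ordering.
  assert(compareArrays(withNull, withoutNull, NullMode::kNullAsValue).has_value());
  // Presto-like mode cannot order the two arrays.
  assert(!compareArrays(withNull, withoutNull, NullMode::kNullAsIndeterminate).has_value());
}
```

In the indeterminate case a caller would have to decide what to do with the missing result; the `compare` helper touched by this commit reports it as a user error.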

Pull Request resolved: facebookincubator#9868

Reviewed By: mbasmanova

Differential Revision: D60051468

Pulled By: kevinwilfong

fbshipit-source-id: 1f056420d6909174a35d336e4e1b413a87ef7665
zhli1142015 authored and facebook-github-bot committed Jul 24, 2024
1 parent 709dab2 commit e5671c0
Showing 16 changed files with 1,157 additions and 583 deletions.
10 changes: 10 additions & 0 deletions velox/docs/functions/spark/aggregate.rst
@@ -80,6 +80,11 @@ General Aggregate Functions
Returns the last non-null value of `x`.

.. spark:function:: max(x) -> [same as x]
Returns the maximum value of ``x``.
``x`` must be an orderable type.

.. spark:function:: max_by(x, y) -> [same as x]
Returns the value of `x` associated with the maximum value of `y`.
@@ -97,6 +102,11 @@ General Aggregate Functions

Returns c

.. spark:function:: min(x) -> [same as x]
Returns the minimum value of ``x``.
``x`` must be an orderable type.

.. spark:function:: min_by(x, y) -> [same as x]
Returns the value of `x` associated with the minimum value of `y`.
2 changes: 2 additions & 0 deletions velox/functions/lib/aggregates/CMakeLists.txt
@@ -16,6 +16,8 @@ velox_add_library(
velox_functions_aggregates
AverageAggregateBase.cpp
CentralMomentsAggregatesBase.cpp
Compare.cpp
MinMaxAggregateBase.cpp
SingleValueAccumulator.cpp
ValueList.cpp
ValueSet.cpp)
@@ -14,21 +14,20 @@
* limitations under the License.
*/

#include "velox/functions/prestosql/aggregates/Compare.h"
#include "velox/functions/lib/aggregates/Compare.h"

using namespace facebook::velox::functions::aggregate;

namespace facebook::velox::aggregate::prestosql {
namespace facebook::velox::functions::aggregate {

int32_t compare(
const SingleValueAccumulator* accumulator,
const DecodedVector& decoded,
vector_size_t index) {
vector_size_t index,
CompareFlags::NullHandlingMode nullHandlingMode) {
static const CompareFlags kCompareFlags{
true, // nullsFirst
true, // ascending
false, // equalsOnly
CompareFlags::NullHandlingMode::kNullAsIndeterminate};
nullHandlingMode};

auto result = accumulator->compare(decoded, index, kCompareFlags);
VELOX_USER_CHECK(
@@ -38,4 +37,4 @@ int32_t compare(
mapTypeKindToName(decoded.base()->typeKind())));
return result.value();
}
} // namespace facebook::velox::aggregate::prestosql
} // namespace facebook::velox::functions::aggregate
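
A note on reading the hunk above: as shown here, `kCompareFlags` is a function-local `static const` whose initializer uses the `nullHandlingMode` parameter. In C++ such a variable is initialized once, on the first call, so later calls with a different argument would still see the first value. Whether this matters in Velox depends on how callers use `compare`, which this page does not show. The sketch below only demonstrates the language rule; `Flags`, `NullMode`, `modeWithStaticFlags` and `modeWithLocalFlags` are hypothetical names, not Velox APIs.

```cpp
#include <cassert>

// Hypothetical stand-ins; not the Velox types.
enum class NullMode { kNullAsValue, kNullAsIndeterminate };

struct Flags {
  NullMode mode;
};

// The function-local static is initialized on the first call only, so it
// keeps whatever mode the first caller requested.
NullMode modeWithStaticFlags(NullMode requested) {
  static const Flags kFlags{requested};
  return kFlags.mode;
}

// A plain local is rebuilt on every call and always reflects the argument.
NullMode modeWithLocalFlags(NullMode requested) {
  const Flags flags{requested};
  return flags.mode;
}

// Assumes this is the first caller of modeWithStaticFlags in the process.
void demonstrateLatch() {
  assert(modeWithStaticFlags(NullMode::kNullAsValue) == NullMode::kNullAsValue);
  // The second call asks for a different mode but still gets the first one.
  assert(
      modeWithStaticFlags(NullMode::kNullAsIndeterminate) ==
      NullMode::kNullAsValue);
  assert(
      modeWithLocalFlags(NullMode::kNullAsIndeterminate) ==
      NullMode::kNullAsIndeterminate);
}
```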
@@ -20,17 +20,21 @@
#include "velox/functions/lib/aggregates/SingleValueAccumulator.h"
#include "velox/vector/DecodedVector.h"

namespace facebook::velox::aggregate::prestosql {
namespace facebook::velox::functions::aggregate {

/// Compare the new value of the DecodedVector at the given index with the value
/// stored in the SingleValueAccumulator. Returns 0 if stored and new values are
/// equal; <0 if stored value is less than new value; >0 if stored value is
/// greater than new value.
///
/// The default nullHandlingMode in Presto is StopAtNull so it will throw an
/// exception when complex type values contain nulls.
/// If nullHandlingMode is NullAsValue, nested nulls are handled as value. If
/// nullHandlingMode is StopAtNull, it will throw an exception when complex
/// type values contain nulls.
/// Note: the default nullHandlingMode in Presto is StopAtNull, while the
/// default nullHandlingMode in Spark is NullAsValue.
int32_t compare(
const velox::functions::aggregate::SingleValueAccumulator* accumulator,
const DecodedVector& decoded,
vector_size_t index);
} // namespace facebook::velox::aggregate::prestosql
vector_size_t index,
CompareFlags::NullHandlingMode nullHandlingMode);
} // namespace facebook::velox::functions::aggregate
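
The sign convention documented for `compare` (0 if equal, <0 if the stored value is less than the new value, >0 if it is greater) is what lets one helper serve both min and max. The following is a minimal sketch of that idea under stated assumptions: `IntAccumulator`, `Kind` and `aggregate` are hypothetical names, the accumulator holds a plain `int` instead of a serialized value, and nulls are not modeled.

```cpp
#include <cassert>
#include <optional>
#include <vector>

// Hypothetical stand-in for an accumulator that stores a single value.
struct IntAccumulator {
  std::optional<int> stored;

  bool hasValue() const {
    return stored.has_value();
  }

  // Same sign convention as the documented compare(): 0 if equal, <0 if the
  // stored value is less than newValue, >0 if it is greater.
  int compare(int newValue) const {
    assert(stored.has_value());
    return (*stored > newValue) - (*stored < newValue);
  }
};

enum class Kind { kMin, kMax };

// Folds the input into a single min or max; returns nullopt for empty input.
std::optional<int> aggregate(const std::vector<int>& input, Kind kind) {
  IntAccumulator accumulator;
  for (const int value : input) {
    if (!accumulator.hasValue()) {
      accumulator.stored = value;
      continue;
    }
    const int result = accumulator.compare(value);
    // min replaces when stored > new; max replaces when stored < new.
    const bool replace = (kind == Kind::kMin) ? result > 0 : result < 0;
    if (replace) {
      accumulator.stored = value;
    }
  }
  return accumulator.stored;
}
```

For the input `{3, 1, 2}` this sketch yields 1 for `Kind::kMin` and 3 for `Kind::kMax`.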