[VL] Support ignoreNulls for NthValue window function (#3857)
PHILO-HE authored Nov 29, 2023
1 parent e0f197f commit 2528841
Showing 5 changed files with 65 additions and 24 deletions.
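With this change, nth_value with IGNORE NULLS is accepted by the Velox backend instead of forcing a fallback: the BackendSettings check below previously only matched NthValue(_, _, false). A minimal usage sketch follows, assuming a SparkSession built with the Gluten plugin and the Velox backend enabled and a TPC-H-style lineitem table registered (as in the test suite changed below); the alias nth_orderkey is illustrative.

import org.apache.spark.sql.SparkSession

// Assumes Gluten/Velox is already configured on the session (e.g. via spark.plugins
// pointing at Gluten's plugin class) and that a `lineitem` table is registered.
val spark = SparkSession.builder().getOrCreate()

val df = spark.sql(
  """
    |select l_suppkey, l_orderkey,
    |       nth_value(l_orderkey, 2) IGNORE NULLS over
    |         (partition by l_suppkey order by l_orderkey) as nth_orderkey
    |from lineitem
    |""".stripMargin)

// With this commit the window should be planned as a WindowExecTransformer
// (offloaded to Velox) rather than falling back to Spark's own Window operator.
df.explain()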
@@ -178,9 +178,8 @@ object BackendSettings extends BackendSettingsApi {
          case _ =>
        }
        windowExpression.windowFunction match {
-         // 'ignoreNulls=true' is not supported in Velox for 'NthValue'.
          case _: RowNumber | _: AggregateExpression | _: Rank | _: CumeDist | _: DenseRank |
-             _: PercentRank | _ @NthValue(_, _, false) =>
+             _: PercentRank | _: NthValue =>
          case _ =>
            allSupported = false
        }
@@ -19,7 +19,7 @@ package io.glutenproject.execution
import io.glutenproject.GlutenConfig

import org.apache.spark.SparkConf
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.{GenerateExec, RDDScanExec}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.{avg, col, udf}
@@ -197,16 +197,31 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
}

test("window expression") {
+    def assertWindowOffloaded: DataFrame => Unit = {
+      df =>
+        {
+          assert(
+            getExecutedPlan(df).count(
+              plan => {
+                plan.isInstanceOf[WindowExecTransformer]
+              }) > 0)
+        }
+    }

Seq("sort", "streaming").foreach {
windowType =>
withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> windowType.toString) {
withSQLConf("spark.gluten.sql.columnar.backend.velox.window.type" -> windowType) {
runQueryAndCompare(
"select row_number() over" +
" (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
" (partition by l_suppkey order by l_orderkey) from lineitem ") {
assertWindowOffloaded
}

runQueryAndCompare(
"select rank() over" +
" (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
" (partition by l_suppkey order by l_orderkey) from lineitem ") {
assertWindowOffloaded
}

runQueryAndCompare(
"select dense_rank() over" +
@@ -223,31 +238,38 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
runQueryAndCompare(
"select l_suppkey, l_orderkey, nth_value(l_orderkey, 2) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem ") {
-          df =>
-            {
-              assert(
-                getExecutedPlan(df).count(
-                  plan => {
-                    plan.isInstanceOf[WindowExecTransformer]
-                  }) > 0)
-            }
+          assertWindowOffloaded
}

+        runQueryAndCompare(
+          "select l_suppkey, l_orderkey, nth_value(l_orderkey, 2) IGNORE NULLS over" +
+            " (partition by l_suppkey order by l_orderkey) from lineitem ") {
+          assertWindowOffloaded
+        }

runQueryAndCompare(
"select sum(l_partkey + 1) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem") { _ => }
" (partition by l_suppkey order by l_orderkey) from lineitem") {
assertWindowOffloaded
}

runQueryAndCompare(
"select max(l_partkey) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
" (partition by l_suppkey order by l_orderkey) from lineitem ") {
assertWindowOffloaded
}

runQueryAndCompare(
"select min(l_partkey) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
" (partition by l_suppkey order by l_orderkey) from lineitem ") {
assertWindowOffloaded
}

runQueryAndCompare(
"select avg(l_partkey) over" +
" (partition by l_suppkey order by l_orderkey) from lineitem ") { _ => }
" (partition by l_suppkey order by l_orderkey) from lineitem ") {
assertWindowOffloaded
}
}
}
}
26 changes: 21 additions & 5 deletions cpp/velox/substrait/SubstraitToVeloxPlan.cc
@@ -604,7 +604,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::

// Parse measures and get the window expressions.
// Each measure represents one window expression.
-  bool ignoreNullKeys = false;
+  bool ignoreNulls = false;
std::vector<core::WindowNode::Function> windowNodeFunctions;
std::vector<std::string> windowColumnNames;

@@ -613,9 +613,25 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
const auto& windowFunction = smea.measure();
std::string funcName = SubstraitParser::findVeloxFunction(functionMap_, windowFunction.function_reference());
std::vector<core::TypedExprPtr> windowParams;
-    windowParams.reserve(windowFunction.arguments().size());
-    for (const auto& arg : windowFunction.arguments()) {
-      windowParams.emplace_back(exprConverter_->toVeloxExpr(arg.value(), inputType));
+    auto& argumentList = windowFunction.arguments();
+    windowParams.reserve(argumentList.size());
+    // For functions in kOffsetWindowFunctions (see Spark OffsetWindowFunctions),
+    // we expect the last arg is passed for setting ignoreNulls.
+    if (kOffsetWindowFunctions.find(funcName) != kOffsetWindowFunctions.end()) {
+      int i = 0;
+      for (; i < argumentList.size() - 1; i++) {
+        windowParams.emplace_back(exprConverter_->toVeloxExpr(argumentList[i].value(), inputType));
+      }
+      auto constantTypedExpr = exprConverter_->toVeloxExpr(argumentList[i].value().literal());
+      auto variant = constantTypedExpr->value();
+      if (!variant.hasValue()) {
+        VELOX_FAIL("Value is expected in variant for setting ignoreNulls.");
+      }
+      ignoreNulls = variant.value<bool>();
+    } else {
+      for (const auto& arg : argumentList) {
+        windowParams.emplace_back(exprConverter_->toVeloxExpr(arg.value(), inputType));
+      }
}
auto windowVeloxType = SubstraitParser::parseType(windowFunction.output_type());
auto windowCall = std::make_shared<const core::CallTypedExpr>(windowVeloxType, std::move(windowParams), funcName);
@@ -626,7 +642,7 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::toVeloxPlan(const ::substrait::
windowColumnNames.push_back(windowFunction.column_name());

windowNodeFunctions.push_back(
-        {std::move(windowCall), createWindowFrame(lowerBound, upperBound, type), ignoreNullKeys});
+        {std::move(windowCall), createWindowFrame(lowerBound, upperBound, type), ignoreNulls});
}

// Construct partitionKeys
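The branch above assumes a fixed argument layout for offset window functions: the Scala side (see the SparkPlanExecApi change at the bottom of this commit) appends the ignoreNulls flag as a trailing boolean literal, so nth_value arrives over Substrait with the arguments [input, offset, ignoreNulls], and only the leading arguments become window parameters. A small sketch of the producing side, with a hypothetical AttributeReference standing in for l_orderkey:

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal, NthValue}
import org.apache.spark.sql.types.LongType

// Catalyst models IGNORE NULLS as a flag on NthValue rather than as a child
// expression, e.g. nth_value(l_orderkey, 2) IGNORE NULLS:
val input = AttributeReference("l_orderkey", LongType)()
val nthValue = NthValue(input, Literal(2), ignoreNulls = true)

// The Gluten transformer flattens this into three Substrait arguments,
// [input, offset, Literal(ignoreNulls)]; that trailing boolean literal is what
// the converter above reads back and stores on the Velox window function.

Passing the flag as an extra argument keeps the Substrait window-function message unchanged; on the Velox side it ends up as the ignoreNulls member of core::WindowNode::Function, set in the push_back above.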
3 changes: 3 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
@@ -26,6 +26,9 @@

namespace gluten {

+// Holds names of Spark OffsetWindowFunctions.
+static const std::unordered_set<std::string> kOffsetWindowFunctions = {"nth_value"};

struct SplitInfo {
/// Whether the split comes from arrow array stream node.
bool isStream = false;
@@ -444,14 +444,15 @@ trait SparkPlanExecApi {
frame.frameType.sql
)
windowExpressionNodes.add(windowFunctionNode)
-        case wf @ NthValue(input, offset: Literal, _) =>
+        case wf @ NthValue(input, offset: Literal, ignoreNulls: Boolean) =>
val frame = wExpression.windowSpec.frameSpecification.asInstanceOf[SpecifiedWindowFrame]
val childrenNodeList = new JArrayList[ExpressionNode]()
childrenNodeList.add(
ExpressionConverter
.replaceWithExpressionTransformer(input, attributeSeq = originalInputAttributes)
.doTransform(args))
childrenNodeList.add(LiteralTransformer(offset).doTransform(args))
+          childrenNodeList.add(LiteralTransformer(Literal(ignoreNulls)).doTransform(args))
val windowFunctionNode = ExpressionBuilder.makeWindowFunction(
WindowFunctionsBuilder.create(args, wf).toInt,
childrenNodeList,
