Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
PHILO-HE committed Mar 20, 2024
1 parent 6514392 commit 55fb67f
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -529,26 +529,11 @@ trait SparkPlanExecApi {
attributeSeq = originalInputAttributes)
.doTransform(args))
// Spark only accepts foldable offset. Converts it to LongType literal.
var offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
if (wf.isInstanceOf[Lead]) {
if (offset < 0) {
// Velox always expects non-negative offset.
throw new UnsupportedOperationException(
s"${wf.nodeName} does not support negative offset: $offset")
}
} else {
// For Lag
// Spark would use `-inputOffset` as offset, so here we forbid positive offset.
// Which means the inputOffset is negative.
if (offset > 0) {
// Velox always expects non-negative offset.
throw new UnsupportedOperationException(
s"${wf.nodeName} does not support negative offset: $offset")
}
// Revert the Spark change and use the original input offset
offset = -offset
}
val offsetNode = ExpressionBuilder.makeLiteral(offset.toLong, LongType, false)
val offset = offsetWf.offset.eval(EmptyRow).asInstanceOf[Int]
// Velox only allows negative offset. WindowFunctionsBuilder#create converts
// lag/lead with negative offset to the function with positive offset. So just
// makes offsetNode store positive value.
val offsetNode = ExpressionBuilder.makeLiteral(Math.abs(offset.toLong), LongType, false)
childrenNodeList.add(offsetNode)
// NullType means Null is the default value. Don't pass it to native.
if (offsetWf.default.dataType != NullType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ case class WindowExecTransformer(
val childCtx = child.asInstanceOf[TransformSupport].doTransform(context)
val operatorId = context.nextOperatorId(this.nodeName)
if (windowExpression == null || windowExpression.isEmpty) {
// The computing for this project is not needed.
// The computing for this operator is not needed.
context.registerEmptyRelToOperator(operatorId)
return childCtx
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@ package io.glutenproject.expression

import io.glutenproject.exception.GlutenNotSupportException
import io.glutenproject.expression.ConverterUtils.FunctionConfig
import io.glutenproject.expression.ExpressionNames.{LAG, LEAD}
import io.glutenproject.substrait.expression.ExpressionBuilder

import org.apache.spark.sql.catalyst.expressions.{Expression, WindowExpression, WindowFunction}
import org.apache.spark.sql.catalyst.expressions.{EmptyRow, Expression, Lag, Lead, WindowExpression, WindowFunction}

import scala.util.control.Breaks.{break, breakable}

object WindowFunctionsBuilder {
def create(args: java.lang.Object, windowFunc: WindowFunction): Long = {
val functionMap = args.asInstanceOf[java.util.HashMap[String, java.lang.Long]]
val substraitFunc = ExpressionMappings.expressionsMap.get(windowFunc.getClass)
val substraitFunc = windowFunc match {
// Handle lag with negative inputOffset, e.g., converts lag(c1, -1) to lead(c1, 1).
// Spark uses `-inputOffset` as `offset` for Lag function.
case lag: Lag if lag.offset.eval(EmptyRow).asInstanceOf[Int] > 0 =>
Some(LEAD)
// Handle lead with negative offset, e.g., converts lead(c1, -1) to lag(c1, 1).
case lead: Lead if lead.offset.eval(EmptyRow).asInstanceOf[Int] < 0 =>
Some(LAG)
case _ =>
ExpressionMappings.expressionsMap.get(windowFunc.getClass)
}
if (substraitFunc.isEmpty) {
throw new GlutenNotSupportException(
s"not currently supported: ${windowFunc.getClass.getName}.")
Expand Down

0 comments on commit 55fb67f

Please sign in to comment.