Skip to content

Commit

Permalink
Support HiveGenericUDF
Browse files Browse the repository at this point in the history
  • Loading branch information
WangGuangxin committed Mar 20, 2024
1 parent 6514392 commit 727a9f7
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.execution.{ScalarSubquery, _}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.hive.HiveSimpleUDFTransformer
import org.apache.spark.sql.hive.HiveUDFTransformer
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -109,8 +109,8 @@ object ExpressionConverter extends SQLConfHelper with Logging {
return replacePythonUDFWithExpressionTransformer(p, attributeSeq, expressionsMap)
case s: ScalaUDF =>
return replaceScalaUDFWithExpressionTransformer(s, attributeSeq, expressionsMap)
case _ if HiveSimpleUDFTransformer.isHiveSimpleUDF(expr) =>
return HiveSimpleUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
case _ if HiveUDFTransformer.isHiveUDF(expr) =>
return HiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
case _ =>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,37 @@ import org.apache.spark.sql.catalyst.expressions._

import java.util.Locale

object HiveSimpleUDFTransformer {
def isHiveSimpleUDF(expr: Expression): Boolean = {
object HiveUDFTransformer {
def isHiveUDF(expr: Expression): Boolean = {
expr match {
case _: HiveSimpleUDF => true
case _: HiveSimpleUDF | _: HiveGenericUDF => true
case _ => false
}
}

def replaceWithExpressionTransformer(
expr: Expression,
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
if (!isHiveSimpleUDF(expr)) {
throw new UnsupportedOperationException(s"Expression $expr is not a HiveSimpleUDF")
val udfName = expr match {
case s: HiveSimpleUDF =>
s.name.stripPrefix("default.")
case g: HiveGenericUDF =>
g.name.stripPrefix("default.")
case _ =>
throw new UnsupportedOperationException(
s"Expression $expr is not a HiveSimpleUDF or HiveGenericUDF")
}

val udf = expr.asInstanceOf[HiveSimpleUDF]
val substraitExprName =
UDFMappings.hiveUDFMap.get(udf.name.stripPrefix("default.").toLowerCase(Locale.ROOT))
substraitExprName match {
UDFMappings.hiveUDFMap.get(udfName.toLowerCase(Locale.ROOT)) match {
case Some(name) =>
GenericExpressionTransformer(
name,
ExpressionConverter.replaceWithExpressionTransformer(udf.children, attributeSeq),
udf)
ExpressionConverter.replaceWithExpressionTransformer(expr.children, attributeSeq),
expr)
case _ =>
throw new UnsupportedOperationException(
s"Not supported hive simple udf:$udf"
+ s" name:${udf.name} hiveUDFMap:${UDFMappings.hiveUDFMap}")
s"Not supported hive udf:$expr"
+ s" name:$udfName hiveUDFMap:${UDFMappings.hiveUDFMap}")
}
}
}

0 comments on commit 727a9f7

Please sign in to comment.