Skip to content

Commit

Permalink
[CORE] Add support for Spark url_decode function (#5070)
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyangxiaozhu authored Mar 22, 2024
1 parent 01c9813 commit 63e83bd
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ object CHExpressionUtil {
DATE_FROM_UNIX_DATE -> DefaultValidator(),
MONOTONICALLY_INCREASING_ID -> DefaultValidator(),
SPARK_PARTITION_ID -> DefaultValidator(),
URL_DECODE -> DefaultValidator(),
SKEWNESS -> DefaultValidator(),
BIT_LENGTH -> DefaultValidator()
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,21 @@ class VeloxFunctionsValidateSuite extends VeloxWholeStageTransformerSuite {
}
}

testWithSpecifiedSparkVersion("Test url_decode function", Some("3.4.2")) {
withTempPath {
path =>
Seq("https%3A%2F%2Fspark.apache.org")
.toDF("a")
.write
.parquet(path.getCanonicalPath)
spark.sparkContext.setLogLevel("info")
spark.read.parquet(path.getCanonicalPath).createOrReplaceTempView("url_tbl")
runQueryAndCompare("select url_decode(a) from url_tbl") {
checkOperatorMatch[ProjectExecTransformer]
}
}
}

test("Test hex function") {
runQueryAndCompare("SELECT hex(l_partkey), hex(l_shipmode) FROM lineitem limit 1") {
checkOperatorMatch[ProjectExecTransformer]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.execution.{ScalarSubquery, _}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
Expand Down Expand Up @@ -115,6 +116,19 @@ object ExpressionConverter extends SQLConfHelper with Logging {
return replaceScalaUDFWithExpressionTransformer(s, attributeSeq, expressionsMap)
case _ if HiveUDFTransformer.isHiveUDF(expr) =>
return HiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
case i: StaticInvoke =>
val objectName = i.staticObject.getName.stripSuffix("$")
if (objectName.endsWith("UrlCodec")) {
val child = i.arguments(0)
i.functionName match {
case "decode" =>
return GenericExpressionTransformer(
ExpressionNames.URL_DECODE,
child.map(
replaceWithExpressionTransformerInternal(_, attributeSeq, expressionsMap)),
i)
}
}
case _ =>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ object ExpressionNames {

// URL functions
final val PARSE_URL = "parse_url"
final val URL_DECODE = "url_decode"

// SparkSQL Math functions
final val ABS = "abs"
Expand Down

0 comments on commit 63e83bd

Please sign in to comment.