
Spark_SQL_macros


This is a generic Spark capability developed by us.

Spark SQL Macros provide the ability to register custom functions into a Spark Session, similar to Spark's custom UDF registration capability. The difference is that the SQL Macros registration mechanism attempts to generate an equivalent Spark Catalyst expression for the function body.

We explain the capability with an example.

Vanilla Spark behavior

Function registration

A custom function can be registered in the following way. Consider a very simple function that adds 2 to its argument.

  spark.udf.register("intUDF", (i: Int) => {
    val j = 2
    i + j
  })

Under the covers an Invoke Catalyst Expression is associated with the function name in Spark's Function Registry. At runtime, an Invoke Catalyst Expression runs the associated function body.
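For example, you can confirm the registration with the standard Spark catalog API:

  // quick sanity check: the function name is now in the session's function registry
  spark.catalog.functionExists("intUDF")   // returns true after the registration above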

(Figure: spark-func-reg)

Spark Plan containing a custom Spark function

Then, the following spark-sql query (assuming sparktest.unit_test has a column c_int : Int):

  select intUDF(c_int)
  from sparktest.unit_test
  where intUDF(c_int) < 0

generates the following physical plan:


== Physical Plan ==
Project (3)
+- * Filter (2)
   +- BatchScan (1)


(1) BatchScan
Output [1]: [C_INT#2271]
OraPlan: 00 OraSingleQueryBlock [C_INT#2271], [oracolumnref(C_INT#2271)]
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2271]
ReadSchema: struct<C_INT:int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownSQL: select "C_INT"
from SPARKTEST.UNIT_TEST

(2) Filter [codegen id : 1]
Input [1]: [C_INT#2271]
Condition : (if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) < 0)

(3) Project [codegen id : 1]
Output [1]: [if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) AS intUDF(c_int)#2278]
Input [1]: [C_INT#2271]

Plan characteristics

  • The intUDF is invoked in the Filter operator to evaluate the intUDF(c_int) < 0 predicate.
  • The intUDF is invoked in the Project operator to evaluate the projection intUDF(c_int).

Logically, evaluating these function calls involves moving values between Catalyst and Scala representations and making a function call on the JVM. Catalyst code generation (and the Java JIT) does a good job of optimizing away the SerDe and function-call overhead for simple functions, but in general this cannot be avoided. Moreover, since the function code is a black box, no further optimization, such as expression pushdown to Oracle, is possible.

(Figure: spark-cust-func-eval)

The intUDF is a trivial function that just adds 2 to its argument. It would be nice if we could convert the function body into the expression i + 2.

Spark SQL Macro behavior

With Spark SQL Macros you can register the function as a macro like this:

import org.apache.spark.sql.defineMacros._

spark.registerMacro("intUDM", spark.udm((i: Int) => {
  val j = 2
  i + j
}))

This is almost identical to the registration process for custom functions in Spark. But under the covers, we leverage the macro mechanics of the Scala compiler to analyze the Scala AST of the function body and try to generate an equivalent Catalyst expression for it.

(Figure: spark-macro-reg)

If we succeed, what is registered in the Function Registry is a plain old Catalyst expression with holes that get replaced by the argument expressions at each call site of the function.
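As a rough sketch (using standard Catalyst classes; intUDMTemplate and hole are illustrative names, not the project's internal types), the registered template for intUDM conceptually looks like this:

import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}

// The body `{ val j = 2; i + j }` collapses to Add(<hole>, Literal(2)); the hole
// is filled with the argument expression (e.g. the column c_int) at each call site.
def intUDMTemplate(hole: Expression): Expression = Add(hole, Literal(2))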

Spark Plan containing a Spark SQL macro

The query:

  select intUDM(c_int)
  from sparktest.unit_test
  where intUDM(c_int) < 0

generates the following physical plan:

== Physical Plan ==
Project (2)
+- BatchScan (1)


(1) BatchScan
Output [1]: [(c_int + 2)#2316]
OraPlan: 00 OraSingleQueryBlock [(C_INT#2309 + 2) AS (c_int + 2)#2316], [oraalias((C_INT#2309 + 2) AS (c_int + 2)#2316)], orabinaryopexpression((((C_INT#2309 + 2) < 0) AND isnotnull(C_INT#2309)))
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2309]
ReadSchema: struct<(c_int + 2):int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownBindValues: 2, 0
oraPushdownSQL: select ("C_INT" + 2) AS "(c_int + 2)"
from SPARKTEST.UNIT_TEST
where ((("C_INT" + ?) < ?) AND "C_INT" IS NOT NULL)

(2) Project [codegen id : 1]
Output [1]: [(c_int + 2)#2316]
Input [1]: [(c_int + 2)#2316]

Plan characteristics

  • The predicate intUDM(c_int) < 0 becomes ("C_INT" + ?) < ? (literals in predicates are converted to bind values).
  • The projection intUDM(c_int) becomes "C_INT" + 2.
  • Since macro calls are just plain old Catalyst expressions, the Project and Filter operators are pushable to Oracle, so the entire query collapses into an Oracle scan.

Spark Macro: initial Scala translation support

We will provide translation support for the following Scala constructs:

  • Primitive datatypes, Case Classes in Scala, Tuples, Arrays, Collections and Maps
    • For Tuples and Case Classes we will translate field access
  • Value Definitions
  • Arbitrary References at the macro call-site that can be evaluated at macro definition time
  • Arithmetic, Math, Datetime, String, and Decimal functions supported in Catalyst
  • Recursive macro invocations
  • Scala Case and If statements.

See the Spark SQL Macro examples page.
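For instance, here is a sketch of a macro whose body exercises case-class field access and an if expression (Dims, width, height and areaUDM are made-up names for illustration; this assumes spark.udm accepts case-class-typed arguments, as listed above):

import org.apache.spark.sql.defineMacros._

case class Dims(width: Int, height: Int)

spark.registerMacro("areaUDM", spark.udm((d: Dims) => {
  val a = d.width * d.height      // case-class field access
  if (a < 0) 0 else a             // if expression
}))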

In addition to supporting user-defined macros, we also plan to provide a way for the argument functions passed to the DataFrame map and flatMap higher-order functions to be converted to equivalent Catalyst expressions.

Note on Performance of SQL Macros

There are many cases where Spark SQL macro-based plans will perform better than plans using equivalent user-defined functions.

As demonstrated in the Spark SQL Macro behavior section above, macro plans enable more of the processing to be pushed down to the data source. In such cases the macro-based plan generally provides a substantial performance gain.

But even without the advantage of pushdown, macro plans can perform better because the serialization/deserialization at function-call boundaries is avoided, as is the function call itself.

Consider the following calculation of tax and discount:

Let's define a Product data set:

case class Product(prod : String, prodCat : String, amt : Double)

val prods = for (i <- (0 until 1000000)) yield {
  Product(
    s"p_$i",
    { val m = i % 3; if (m == 0) "alcohol" else if (m == 1) "grocery" else "rest" },
    (i % 200).toDouble
  )
}

val prodDF = spark.createDataset(prods).coalesce(1).cache
prodDF.count

prodDF.createOrReplaceGlobalTempView("products")

For which tax and discount are calculated as follows:

  • No tax on groceries, alcohol is 10.5%, everything else is 9.5%
  • On Sundays give a discount of 5% on alcohol.

We can define this logic as a Spark UDF and as a Spark SQL Macro:

// function registration
spark.udf.register("taxAndDiscountF", {(prodCat : String, amt : Double) =>
  import org.apache.spark.sql.catalyst.util.DateTimeUtils._
  import java.time.ZoneId

  val taxRate = prodCat match {
    case "grocery" => 0.0
    case "alcohol" => 0.105
    case _ => 0.095
  }
  val currDate = currentDate(ZoneId.systemDefault())
  val discount = if (getDayOfWeek(currDate) == 1 && prodCat == "alcohol") 0.05 else 0.0

  amt * (1.0 - discount) * (1.0 + taxRate)
})

// macro registration
import org.apache.spark.sql.defineMacros._
import org.apache.spark.sql.sqlmacros.DateTimeUtils._
import java.time.ZoneId

spark.registerMacro("taxAndDiscountM", spark.udm({(prodCat : String, amt : Double) =>
  val taxRate = prodCat match {
    case "grocery" => 0.0
    case "alcohol" => 0.105
    case _ => 0.095
  }
  val currDate = currentDate(ZoneId.systemDefault())
  val discount = if (getDayOfWeek(currDate) == 1 && prodCat == "alcohol") 0.05 else 0.0

  amt * ( 1.0 - discount) * (1.0 + taxRate)
}))

Now consider the 2 queries:

// queries
val macroBasedResDF = sql("select prod, taxAndDiscountM(prod, amt) from  global_temp.products")
val funcBasedResDF =  sql("select prod, taxAndDiscountF(prod, amt) from  global_temp.products")

val funcBasedRes = funcBasedResDF.collect
val macroBasedRes = macroBasedResDF.collect
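To see the structural difference between the two plans, print them with the standard explain API:

  // the function-based plan contains the opaque taxAndDiscountF call,
  // while the macro-based plan contains the expanded arithmetic/CASE expressions
  funcBasedResDF.explain(true)
  macroBasedResDF.explain(true)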

In our testing we found the macro-based query to run around 2-3x faster (based on task times) than the function-based query.

Design Notes

Injection of Static Values

We allow macro call-site static values to be used in the macro code. These values need to be translated to Catalyst expression trees. Spark's org.apache.spark.sql.catalyst.ScalaReflection already provides a mechanism (via org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) for inferring and converting values of supported types to Catalyst expressions, and we leverage this mechanism. But in order to leverage it we need to stand up a runtime Universe inside the macro invocation. This is fine because SQLMacro is invoked in an environment that has all the Spark classes on the classpath. The only issue is that we cannot use the Thread Classloader of the macro invocation. For this reason MacrosScalaReflection is a copy of org.apache.spark.sql.catalyst.ScalaReflection with its mirror set up on org.apache.spark.util.Utils.getSparkClassLoader.
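As a rough illustration (standard Catalyst APIs, not the project's internal code), a call-site static value ultimately has to become a Literal expression that can be spliced into the macro's tree:

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{ArrayType, IntegerType}

// a simple constant becomes a plain literal ...
val simple = Literal(2)
// ... while a richer value (array, collection, case class) needs the reflection/encoder
// machinery to infer its Catalyst type and convert the value
val nested = Literal.create(Array(5, 7), ArrayType(IntegerType))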

Transferring Catalyst Expression Tree by Serialization

Instead of developing a new builder capability to construct macro-universe trees for catalyst expressions, we directly construct catalyst expressions. To lift these expressions back into the runtime world we use the serialization mechanism of catalyst expressions, so the SQLMacroExpressionBuilder is constructed with the serialized form of the catalyst expression. In the runtime world this serialized form is deserialized and, on macro invocation, MacroArg positions are replaced with the catalyst expressions at the invocation site.
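A minimal sketch of the round trip, assuming plain Java serialization of the (Serializable) catalyst expression classes:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}

// stands in for the macro's translated body
val tree: Expression = Add(Literal(1), Literal(2))

// macro-expansion side: serialize the tree so it can be embedded in the generated builder
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(tree)
oos.close()
val bytes = bos.toByteArray

// runtime side: rebuild the tree; the real builder then substitutes MacroArg positions
// with the argument expressions found at the invocation site
val restored = new ObjectInputStream(new ByteArrayInputStream(bytes))
  .readObject().asInstanceOf[Expression]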

Writing macros FAQ

Macro translation fails when referring to terms around the call-site

The following macro definition doesn't get translated.

val a = Array(5)
spark.registerMacro("intUDM", spark.udm((i: Int) => {
  val j = a(0)
  i + j
}))
  • Macro translation happens during the compilation of the above code snippet.
    • At macro translation time the value of a is unknown.
  • For identifiers that are neither macro parameters nor well-known symbols (such as java.lang.Math.abs), SQL expression translation attempts to replace them with their value at macro compilation time by issuing a macro context eval. Here that eval fails, so the overall macro translation falls back to registering a user-defined function (see the workaround sketch below).
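One possible workaround (a sketch; intAddUDM is just an illustrative name) is to make the call-site value an explicit macro parameter, so that it is bound at invocation time instead of needing to be evaluated when the macro is compiled:

import org.apache.spark.sql.defineMacros._

spark.registerMacro("intAddUDM", spark.udm((i: Int, j: Int) => i + j))

// and supply the value at the call site, e.g.:
//   select intAddUDM(c_int, 5) from sparktest.unit_test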