
Language Integration


[[TOC]]

Language Integration encompasses capabilities that extend Apache Spark in two ways: on the one hand, native Oracle functions and data structures become usable in Spark expressions (through user-defined registration of functions/types/operators); on the other hand, custom code can be translated into equivalent Oracle SQL/PL-SQL so that larger parts of Spark pipelines are pushed down to the Oracle DB for processing.

Registration and Use of Oracle Native Functions (Row and Aggregate) in Spark

Oracle provides a large library of SQL and PL-SQL functions. Users should be able to use these functions in queries on data processed in the Oracle DB. For example, the function SYS_CONTEXT from the STANDARD PL-SQL package should be usable in Spark SQL queries on Oracle tables. We enable this by providing an implicit extension of the SparkSession; one of the extension functions is the ability to register Oracle functions.

import org.apache.spark.sql.oracle._
sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT")

Now users can use the sys_context function like this:

select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;

Function usage is only supported for operations that are pushed to the Oracle database. In other cases an UnsupportedOperationException is thrown with a 'Cannot generate code for expression..' error. This is because the Oracle native function call is resolved into an OraNativeRowFuncInvoke expression (see below for details) that cannot be evaluated in Spark; it is only a marker for the Oracle pushdown planner extensions. So the following will throw an error:

set spark.sql.oracle.enable.pushdown=false
select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;

java.lang.UnsupportedOperationException: Cannot generate code for expression: oranativerowfuncinvoke(name=STANDARD....

Under the covers, when registerOracleFunction is issued we extract the function definition from the ALL_PROCEDURES and ALL_ARGUMENTS Oracle dictionary views. Oracle functions can be overloaded, so we capture all supported signatures into an internal OraFuncDef structure. Only signatures that meet the following criteria are supported:

  • return type and argument types must be one of the supported types, see OraDataType.
  • only IN arguments are allowed.
  • for a given overloading and argument position, only one entry must exist in the ALL_ARGUMENTS table.

The registerOracleFunction call outputs the OraFuncDef details, for example:
name=STANDARD.SYS_CONTEXT,isAggregate=false
  subProgramId=260, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType}
  subProgramId=261, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType,NEWOPTIONAL:StringType}

This shows which overloaded signatures (subProgramIds from ALL_PROCEDURES) are made available in Spark.

We then register a user-defined function with Spark (see Spark FunctionRegistry). By default the function is registered with FunctionIdentifier(name = {oraFuncName}, database = "oracle"), so in Spark expressions refer to the function as oracle.{oraFuncName}. A different name can be provided for the function by supplying an override, for example: sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT", Some("ora_context")). In this case refer to the function as oracle.ora_context.
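
For illustration, a minimal sketch of registering under an overridden name and then invoking it from Spark SQL (the table and query are the same ones used in the SYS_CONTEXT example above):

import org.apache.spark.sql.oracle._

// Register SYS_CONTEXT under the Spark-side name 'ora_context'
sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT", Some("ora_context"))

// Invoke it through the 'oracle' database qualifier
sparkSession.sql(
  """select oracle.ora_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
    |from sparktest.unit_test
    |limit 1""".stripMargin
).show()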

When a particular function invocation is resolved, our registered FunctionBuilder is invoked. This sets up an OraNativeRowFuncInvoke expression by choosing the first signature whose argument datatypes the child expressions at the call-site can be cast to.

As a convenience, multiple functions can be registered in one call using registerOracleFunctions. For example: sparkSession.registerOracleFunctions(Some("STANDARD"), "USER", ("SYS_CONTEXT", "ORA_CONTEXT")). Each argument can be a string or a tuple of the form (oracle_func_name, name_in_spark).
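
Written out as code, the same call shows the two argument forms side by side:

import org.apache.spark.sql.oracle._

// USER keeps its Oracle name; SYS_CONTEXT is given the Spark-side name ORA_CONTEXT
sparkSession.registerOracleFunctions(
  Some("STANDARD"),
  "USER",                          // plain string: keep the Oracle name
  ("SYS_CONTEXT", "ORA_CONTEXT")   // tuple: (oracle_func_name, name_in_spark)
)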

Aggregate Functions: Oracle Native Aggregate functions can be registered in a similar fashion, for example:

// STRAGG (string aggregate) concatenates all String values in a group
sparkSession.registerOracleFunction(None, "STRAGG")

sparkSession.sql(
"""
      |select c_char_5, oracle.stragg(c_char_1)
      |from sparktest.unit_test
      |group by c_char_5""".stripMargin
).show()

Oracle Types and DB-Options

  • ML
  • Geospatial

Table Functions

Spark Macros: translation of Scala code into Catalyst Expressions

This is a generic Spark capability developed by us.

Spark SQL Macros provide the ability to register custom functions into a Spark Session, similar to Spark's custom UDF registration capability. The difference is that the SQL Macros registration mechanism attempts to generate an equivalent Spark Catalyst Expression for the function body.

We explain the capability with an example.

Vanilla Spark behavior

Function registration

A custom function can be registered in the following way. Consider a very simple function that adds 2 to its argument.

  spark.udf.register("intUDF", (i: Int) => {
    val j = 2
    i + j
  })

Under the covers, an Invoke Catalyst Expression is associated with the function name in Spark's Function Registry. At runtime the Invoke Catalyst Expression runs the associated function body.

[Diagram: spark-func-reg]

Spark Plan containing a custom Spark function

Then the following Spark SQL query (assuming sparktest.unit_test has a column c_int : Int):

  select intUDF(c_int)
  from sparktest.unit_test
  where intUDF(c_int) < 0

generates the following physical plan:


== Physical Plan ==
Project (3)
+- * Filter (2)
   +- BatchScan (1)


(1) BatchScan
Output [1]: [C_INT#2271]
OraPlan: 00 OraSingleQueryBlock [C_INT#2271], [oracolumnref(C_INT#2271)]
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2271]
ReadSchema: struct<C_INT:int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownSQL: select "C_INT"
from SPARKTEST.UNIT_TEST

(2) Filter [codegen id : 1]
Input [1]: [C_INT#2271]
Condition : (if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) < 0)

(3) Project [codegen id : 1]
Output [1]: [if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) AS intUDF(c_int)#2278]
Input [1]: [C_INT#2271]

Plan characteristics

  • The intUDF is invoked in the Filter operator for evaluating the intUDF(c_int) < 0 predicate;
  • The intUDF is invoked in the Project operator to evaluate the projection intUDF(c_int)

Logically, the evaluation of these function calls involves moving values between Catalyst and Scala and making a function call on the JVM. Catalyst CodeGen (and the Java JIT) does a good job of optimizing away the serde and function calls for simple functions, but in general this cannot be avoided. Besides, since the function code is a black box, no further optimization, such as expression pushdown to Oracle, is possible.

[Diagram: spark-cust-func-eval]

The intUDF is a trivial function that just adds 2 to its argument. It would be nice if we could convert the function body into an equivalent 'argument + 2' expression.

Spark SQL Macro behavior

With Spark SQL Macros you can register the function as a macro like this:

import org.apache.spark.sql.defineMacros._

spark.registerMacro("intUDM", spark.udm((i: Int) => {
  val j = 2
  i + j
}))

This is almost identical to the registration process for custom functions in Spark. But under the covers, we leverage the macro mechanics of the Scala compiler to analyze the Scala AST of the function body and try to generate an equivalent Catalyst Expression for it.

[Diagram: spark-macro-reg]

If we succeed, then what is registered in the Function Registry is a plain old Catalyst Expression with holes that get replaced by the argument expressions at any call-site of the function.
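
To make the idea concrete, here is a rough sketch of the expression that intUDM effectively expands to at the call-site intUDM(c_int). This is only an illustration of the concept, not the library's internal representation:

import org.apache.spark.sql.catalyst.expressions.{Add, Literal}
import org.apache.spark.sql.functions.col

// The macro body '(i: Int) => i + 2' corresponds to an Add expression with a
// hole for 'i'; at the call-site intUDM(c_int) the hole is filled with the
// Catalyst expression for the c_int column.
val callSiteArg = col("c_int").expr
val expanded    = Add(callSiteArg, Literal(2))   // what the optimizer sees: c_int + 2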

Spark Plan containing a Spark SQL macro

The query:

  select intUDM(c_int)
  from sparktest.unit_test
  where intUDM(c_int) < 0

generates the following physical plan:

== Physical Plan ==
Project (2)
+- BatchScan (1)


(1) BatchScan
Output [1]: [(c_int + 2)#2316]
OraPlan: 00 OraSingleQueryBlock [(C_INT#2309 + 2) AS (c_int + 2)#2316], [oraalias((C_INT#2309 + 2) AS (c_int + 2)#2316)], orabinaryopexpression((((C_INT#2309 + 2) < 0) AND isnotnull(C_INT#2309)))
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2309]
ReadSchema: struct<(c_int + 2):int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownBindValues: 2, 0
oraPushdownSQL: select ("C_INT" + 2) AS "(c_int + 2)"
from SPARKTEST.UNIT_TEST
where ((("C_INT" + ?) < ?) AND "C_INT" IS NOT NULL)

(2) Project [codegen id : 1]
Output [1]: [(c_int + 2)#2316]
Input [1]: [(c_int + 2)#2316]

Plan characteristics

  • The predicate intUDM(c_int) < 0 becomes ("C_INT" + ?) < ? (literals in a predicate are converted to bind values).
  • The projection intUDM(c_int) becomes "C_INT" + 2.
  • Since macro calls are just plain old Catalyst expressions, the Project and Filter are pushable to Oracle, so the entire query is collapsed into an Oracle scan.

Spark Macro: planned initial Scala translation

We plan translation support for the following Scala constructs (a hypothetical macro body touching several of them is sketched after the list):

  • Primitive datatypes, Case Classes in Scala, Arrays, Collections and Maps
  • Value Definitions
  • Arbitrary References at the macro call-site that can be evaluated at macro definition time
  • Arithmetic, Datetime, String, decimal functions supported in Catalyst
  • Scala Case and If statements.
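
For illustration only, a hypothetical macro body that exercises some of these constructs (value definitions, an if statement, and arithmetic); whether a given body translates will depend on the support actually implemented:

import org.apache.spark.sql.defineMacros._

// Hypothetical example combining value definitions, an if statement and
// arithmetic, all of which fall under the planned translation support above.
spark.registerMacro("discounted", spark.udm((amount: Double) => {
  val threshold = 100.0                  // value definition
  if (amount >= threshold) amount * 0.9  // if statement + arithmetic
  else amount
}))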

In addition to supporting user-defined macros, we also plan to provide a way for the argument functions of the DataFrame map and flatMap higher-order functions to be converted to equivalent Catalyst expressions.
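
As a purely hypothetical illustration of the kind of argument function this refers to (no such translation exists yet), the lambda passed to map below is the sort of code that would be a candidate for conversion:

import spark.implicits._

// Today the lambda body runs as opaque JVM code for each row; the plan is to
// translate simple bodies like 'i + 2' into equivalent Catalyst expressions.
val ds     = Seq(1, 2, 3).toDS()
val mapped = ds.map(i => i + 2)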