Language Integration
[[TOC]]
Language Integration encompasses capabilities that extend Apache Spark so that, on the one hand, native Oracle functions and data structures become usable in Spark expressions (through user-defined registration of functions/types/operators), and, on the other hand, custom code can be translated into equivalent Oracle SQL/PL-SQL so that larger parts of Spark pipelines are pushed down to the Oracle DB for processing.
Oracle provides a large library of SQL and PL-SQL functions. Users should be able to apply these functions in queries on data processed in the Oracle DB. For example, the function `SYS_CONTEXT` from the `STANDARD` PL-SQL package should be usable in Spark SQL queries over Oracle tables.
We enable this by providing an implicit extension of the `SparkSession`. One of the extension functions is the ability to register Oracle functions.
import org.apache.spark.sql.oracle._
sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT")
Now users can use the `sys_context` function like this:
select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;
Function usage is only supported for operations that are pushed to the Oracle database.
In other cases an `UnsupportedOperationException` is thrown with a 'Cannot generate code for expression..' error. This is because the Oracle native function call is resolved into an `OraNativeRowFuncInvoke` expression (see below for details) that cannot be evaluated in Spark; it is only a marker for the Oracle pushdown planner extensions. So the following will throw an error:
set spark.sql.oracle.enable.pushdown=false
select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;
java.lang.UnsupportedOperationException: Cannot generate code for expression: oranativerowfuncinvoke(name=STANDARD....
Under the covers, when `registerOracleFunction` is issued we extract the function definition
from the `ALL_PROCEDURES` and `ALL_ARGUMENTS` Oracle dictionary views.
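For reference, this signature information lives in the standard Oracle dictionary views; a lookup along the following lines (an illustrative sketch, not necessarily the exact query the connector issues, with placeholder connection details) returns one row per overload and argument:

```scala
// Illustrative only: list the overloads (SUBPROGRAM_ID) and arguments of
// STANDARD.SYS_CONTEXT straight from the Oracle dictionary views over JDBC.
// The URL and credentials below are placeholders.
val overloads = sparkSession.read
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/service")  // hypothetical URL
  .option("user", "scott")                                   // hypothetical credentials
  .option("password", "tiger")
  .option("query",
    """select p.subprogram_id, a.argument_name, a.position, a.data_type, a.in_out
      |from all_procedures p
      |join all_arguments a
      |  on  a.owner         = p.owner
      |  and a.package_name  = p.object_name
      |  and a.object_name   = p.procedure_name
      |  and a.subprogram_id = p.subprogram_id
      |where p.owner = 'SYS'
      |  and p.object_name = 'STANDARD'
      |  and p.procedure_name = 'SYS_CONTEXT'
      |order by p.subprogram_id, a.position""".stripMargin)
  .load()
overloads.show(false)
```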
Oracle functions can be overloaded; so we capture all supported signatures into an internal `OraFuncDef`
structure. Only signatures that meet the following criteria are supported:
- the return type and argument types must be one of the supported types, see `OraDataType`;
- only `IN` arguments are allowed;
- for a given overloading and argument position, only one entry must exist in the `ALL_ARGUMENTS` table.

The `registerOracleFunction` call outputs the `OraFuncDef` details, for example:
name=STANDARD.SYS_CONTEXT,isAggregate=false
subProgramId=260, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType}
subProgramId=261, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType,NEWOPTIONAL:StringType}
This shows which overloaded signatures (subProgramIds from ALL_PROCEDURES) are made available in Spark.
We then register a user-defined function with Spark (see Spark `FunctionRegistry`).
By default the function is registered with `FunctionIdentifier(name = {oraFuncName}, database = "oracle")`.
So in Spark expressions refer to the function as `oracle.{oraFuncName}`.
A different name can be provided for the function by specifying an override, such as:
sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT", Some("ora_context"))
In this case refer to the function as `oracle.ora_context`.
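A quick usage check might look like this (a sketch reusing the `sparktest.unit_test` table from the earlier example; `SESSION_USER` is one of the documented `USERENV` attributes):

```scala
// Assumes the registerOracleFunction call above has been issued and
// sparktest.unit_test resolves to an Oracle table.
sparkSession.sql(
  """select oracle.ora_context('USERENV', 'SESSION_USER') ora_session_user
    |from sparktest.unit_test
    |limit 1""".stripMargin
).show()
```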
When a particular function invocation is resolved, our registered FunctionBuilder is invoked. It sets up an `OraNativeRowFuncInvoke` expression by choosing the first signature whose argument datatypes the call-site's child expressions can be cast to.
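Conceptually the selection works along these lines (a simplified sketch; `OraFuncSig` and `chooseSignature` are illustrative names, not the connector's actual classes):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.types.DataType

// Illustrative only: one overload of an Oracle function, described by the
// argument datatypes it expects (names are hypothetical).
case class OraFuncSig(subProgramId: Int, argTypes: Seq[DataType])

// Pick the first overload whose expected argument types the call-site
// child expressions can be cast to.
def chooseSignature(
    sigs: Seq[OraFuncSig],
    args: Seq[Expression]): Option[OraFuncSig] =
  sigs.find { sig =>
    sig.argTypes.size == args.size &&
      sig.argTypes.zip(args).forall { case (expected, arg) =>
        Cast.canCast(arg.dataType, expected)
      }
  }
```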
As a convenience, multiple functions can be registered in one call using `registerOracleFunctions`.
For example:
sparkSession.registerOracleFunctions(Some("STANDARD"), "USER", ("SYS_CONTEXT", "ORA_CONTEXT"))
Each argument can be a string or a tuple of the form `(oracle_func_name, name_in_spark)`.
Aggregate Functions: Oracle Native Aggregate functions can be registered in a similar fashion, for example:
-- StringAggregate concatenates all String values in a group
sparkSession.registerOracleFunction(None, "STRAGG")
sparkSession.sql(
"""
|select c_char_5, oracle.stragg(c_char_1)
|from sparktest.unit_test
|group by c_char_5""".stripMargin
).show()
- ML
- Geospatial
This is a generic Spark capability developed by us.
Spark SQL Macros provide the ability to register custom functions into a Spark Session, similar to Spark's custom UDF registration capability. The difference is that the SQL Macros registration mechanism attempts to generate an equivalent Spark Catalyst expression for the function body.
We explain the capability with an example.
A custom function can be registered in the following way. Consider a very simple function that adds 2 to its argument.
spark.udf.register("intUDF", (i: Int) => {
val j = 2
i + j
})
Under the covers, an `Invoke` Catalyst expression is associated with the function name in
Spark's Function Registry.
Then the following Spark SQL query (assuming `sparktest.unit_test` has a column `c_int : Int`):
select intUDF(c_int)
from sparktest.unit_test
where intUDF(c_int) < 0
generates the following physical plan:
== Physical Plan ==
Project (3)
+- * Filter (2)
+- BatchScan (1)
(1) BatchScan
Output [1]: [C_INT#2271]
OraPlan: 00 OraSingleQueryBlock [C_INT#2271], [oracolumnref(C_INT#2271)]
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2271]
ReadSchema: struct<C_INT:int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownSQL: select "C_INT"
from SPARKTEST.UNIT_TEST
(2) Filter [codegen id : 1]
Input [1]: [C_INT#2271]
Condition : (if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) < 0)
(3) Project [codegen id : 1]
Output [1]: [if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) AS intUDF(c_int)#2278]
Input [1]: [C_INT#2271]
- The `intUDF` is invoked in the Filter operator to evaluate the `intUDF(c_int) < 0` predicate;
- The `intUDF` is invoked in the Project operator to evaluate the projection `intUDF(c_int)`.
Logically, the evaluation of these function calls involves moving values between Catalyst and Scala and making a function call on the JVM.
Catalyst codegen (and Java JIT) does a good job of optimizing away the SerDe and function calls for simple functions, but in general this cannot be avoided. Besides, since the function code is a black box, no further optimization, like expression pushdown to Oracle, is possible.
The `intUDF` is a trivial function that just adds 2 to its argument.
It would be nice if we could convert the function body into a `+ 2` expression.
With Spark SQL Macros you can register the function as a macro like this:
import org.apache.spark.sql.defineMacros._
spark.registerMacro("intUDM", spark.udm((i: Int) => {
val j = 2
i + j
}))
This is almost identical to the registration process for custom functions in Spark. But under the covers, we leverage the macro mechanics of the Scala compiler to analyze the Scala AST of the function body and try to generate an equivalent Catalyst expression for it.
If we succeed, then what is registered in the Function Registry is a plain old Catalyst expression with holes that get replaced with the argument expressions at any call-site of the function.
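For intuition, the registered artifact for `intUDM` behaves roughly like a function from an argument expression to a Catalyst expression (a simplified sketch using public Catalyst classes; the actual macro machinery is more involved):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, Expression, Literal}

// Simplified sketch: the "macro" for intUDM is essentially a template with a
// hole for its argument; at each call-site the hole is filled with the
// argument expression, yielding a plain Catalyst expression such as c_int + 2.
val intUDMTemplate: Expression => Expression =
  (arg: Expression) => Add(arg, Literal(2))
```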
The query:
select intUDM(c_int)
from sparktest.unit_test
where intUDM(c_int) < 0
generates the following physical plan:
== Physical Plan ==
Project (2)
+- BatchScan (1)
(1) BatchScan
Output [1]: [(c_int + 2)#2316]
OraPlan: 00 OraSingleQueryBlock [(C_INT#2309 + 2) AS (c_int + 2)#2316], [oraalias((C_INT#2309 + 2) AS (c_int + 2)#2316)], orabinaryopexpression((((C_INT#2309 + 2) < 0) AND isnotnull(C_INT#2309)))
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2309]
ReadSchema: struct<(c_int + 2):int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownBindValues: 2, 0
oraPushdownSQL: select ("C_INT" + 2) AS "(c_int + 2)"
from SPARKTEST.UNIT_TEST
where ((("C_INT" + ?) < ?) AND "C_INT" IS NOT NULL)
(2) Project [codegen id : 1]
Output [1]: [(c_int + 2)#2316]
Input [1]: [(c_int + 2)#2316]
- The predicate `intUDM(c_int) < 0` becomes `("C_INT" + ?) < ?` (the literal in a predicate is converted to a bind value);
- the projection `intUDM(c_int)` becomes `"C_INT" + 2`;
- since macro calls are just plain old Catalyst expressions, the Project and Filter are pushable to Oracle, so the entire query is collapsed into an Oracle scan.
We plan translation support for the following Scala constructs (see the sketch after this list for a macro exercising some of them):
- Primitive datatypes, Case Classes in Scala, Arrays, Collections and Maps
- Value Definitions
- Arbitrary References at the macro call-site that can be evaluated at macro definition time
- Arithmetic, Datetime, String, decimal functions supported in Catalyst
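For example, a macro along these lines, which combines a value definition with arithmetic, would be expected to fold into a single Catalyst expression (a hypothetical example built on the registration API shown above; `withTax` and the tax-rate logic are illustrative, not part of the connector):

```scala
import org.apache.spark.sql.defineMacros._

// Hypothetical macro: price with a fixed tax rate. The value definition and
// the arithmetic should translate to a Catalyst expression equivalent to
// price * 1.08 at each call-site, and so remain pushable to Oracle.
spark.registerMacro("withTax", spark.udm((price: Double) => {
  val taxRate = 0.08
  price * (1 + taxRate)
}))
```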
In addition to supporting user-defined macros, we also plan to provide a way for the argument functions passed to the DataFrame `map` and `flatMap` higher-order functions to be converted to equivalent Catalyst expressions.