
Language Integration

Harish Butani edited this page Jan 6, 2022 · 8 revisions

Language Integration encompasses capabilities that extend Apache Spark so that, on the one hand, native Oracle functions and data structures are usable in Spark expressions (through user-defined registration of functions/types/operators); and, on the other hand, custom code can be translated into equivalent Oracle SQL/PL-SQL so that larger parts of Spark pipelines are pushed down to the Oracle DB for processing.

Registration and Use of Oracle Native Functions (Row and Aggregate) in Spark

Oracle provides a large library of SQL and PL-SQL functions. Users should be able to apply these functions in queries on data processed in the Oracle DB. For example, the function SYS_CONTEXT from the STANDARD PL-SQL package should be usable in Spark SQL queries on Oracle tables. We enable this by providing an implicit extension of the SparkSession; one of the extension methods is the ability to register Oracle functions.

import org.apache.spark.sql.oracle._
sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT")

Now users can use the sys_context function like this:

select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;

Function usage is only supported for operations that are pushed down to the Oracle database. In other cases an UnsupportedOperationException is thrown with a 'Cannot generate code for expression..' error. This is because the Oracle native function call is resolved into an OraNativeRowFuncInvoke expression (see below for details) that cannot be evaluated in Spark; it is only a marker for the Oracle pushdown planner extensions. So the following will throw an error:

set spark.sql.oracle.enable.pushdown=false
select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;

java.lang.UnsupportedOperationException: Cannot generate code for expression: oranativerowfuncinvoke(name=STANDARD....

Under the covers, when registerOracleFunction is invoked we extract the function definition from the ALL_PROCEDURES and ALL_ARGUMENTS Oracle dictionary views. Oracle functions can be overloaded, so we capture all supported signatures into an internal OraFuncDef structure. Only signatures that meet the following criteria are supported:

  • return type and argument types must be one of the supported types, see OraDataType.
  • only IN arguments are allowed.
  • for a given overloading and argument position only 1 entry must exist in the ALL_ARGUMENTS table.

registerOracleFunction outputs the OraFuncDef details, for example:
name=STANDARD.SYS_CONTEXT,isAggregate=false
  subProgramId=260, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType}
  subProgramId=261, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType,NEWOPTIONAL:StringType}

This shows which overloaded signatures (subProgramIds from ALL_PROCEDURES) are made available in Spark.
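The shape of this structure can be sketched in plain Scala. This is a hypothetical model built only from the output above; the field and class names here are assumptions, not the connector's actual OraFuncDef definition (which holds Spark DataType instances rather than strings):

```scala
// Hypothetical sketch of the information OraFuncDef captures; one
// OraFuncSig per supported overload, keyed by subProgramId.
case class OraFuncSig(
  subProgramId: Int,          // overload id from ALL_PROCEDURES
  retType: String,            // mapped Spark type, e.g. "StringType"
  args: Seq[(String, String)] // argument name -> mapped Spark type
)

case class OraFuncDef(
  name: String,               // e.g. "STANDARD.SYS_CONTEXT"
  isAggregate: Boolean,
  sigs: Seq[OraFuncSig]
)

// The SYS_CONTEXT registration printed above, re-expressed in this model.
val sysContext = OraFuncDef(
  "STANDARD.SYS_CONTEXT",
  isAggregate = false,
  Seq(
    OraFuncSig(260, "StringType",
      Seq("NAMESPACE" -> "StringType", "ATTRIBUTE" -> "StringType")),
    OraFuncSig(261, "StringType",
      Seq("NAMESPACE" -> "StringType", "ATTRIBUTE" -> "StringType",
          "NEWOPTIONAL" -> "StringType"))
  )
)
```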

We then register a user-defined function with Spark (see Spark FunctionRegistry). By default the function is registered with FunctionIdentifier(name = {oraFuncName}, database = "oracle"), so Spark expressions refer to the function as oracle.{oraFuncName}. A different name can be provided by passing an override, for example: sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT", Some("ora_context")). In this case refer to the function as oracle.ora_context.

When a particular function invocation is resolved, our registered FunctionBuilder is invoked. It sets up an OraNativeRowFuncInvoke expression by choosing the first signature whose argument datatypes the call-site child expressions can be cast to.
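The selection rule above can be sketched in isolation. This is a simplified stand-in, not the actual planner code: the types and the cast rule are toy placeholders, whereas the real implementation works with Spark DataTypes and Catalyst's cast rules.

```scala
// Toy type lattice standing in for Spark DataTypes.
sealed trait DT
case object StrT extends DT
case object IntT extends DT

// Simplified cast rule: identical types, or Int -> String.
def canCast(from: DT, to: DT): Boolean =
  from == to || (from == IntT && to == StrT)

case class Sig(subProgramId: Int, argTypes: Seq[DT])

// Pick the first overload with matching arity whose argument types
// the call-site expression types can be cast to.
def chooseSig(sigs: Seq[Sig], callArgs: Seq[DT]): Option[Sig] =
  sigs.find { s =>
    s.argTypes.length == callArgs.length &&
      callArgs.zip(s.argTypes).forall { case (c, a) => canCast(c, a) }
  }
```

With the two SYS_CONTEXT overloads from the earlier output, a two-argument call would select subProgramId 260 and a three-argument call subProgramId 261.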

As a convenience, multiple functions can be registered in one call using registerOracleFunctions. For example: sparkSession.registerOracleFunctions(Some("STANDARD"), "USER", ("SYS_CONTEXT", "ORA_CONTEXT")). Each argument can be a string or a tuple of the form (oracle_func_name, name_in_spark).
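The string-or-tuple convention can be illustrated with a small normalization sketch. This is a hypothetical helper, not the connector's code; it only shows how each entry might be reduced to an (oracle_func_name, name_in_spark) pair, with a plain string registered under its own name:

```scala
// Hypothetical normalization of registerOracleFunctions varargs:
// a bare name maps to itself; a tuple supplies the Spark-side name.
def normalize(specs: Seq[Any]): Seq[(String, String)] =
  specs.map {
    case n: String              => (n, n)
    case (o: String, s: String) => (o, s)
  }
```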

Aggregate Functions: Oracle Native Aggregate functions can be registered in a similar fashion, for example:

-- StringAggregate concatenates all String values in a group
sparkSession.registerOracleFunction(None, "STRAGG")

sparkSession.sql(
"""
      |select c_char_5, oracle.stragg(c_char_1)
      |from sparktest.unit_test
      |group by c_char_5""".stripMargin
).show()

Oracle Types and DB-Options

  • ML
  • Geospatial

Table Functions

Spark Macros: translation of Scala code into Catalyst Expressions

See Spark SQL Macros doc