
Language Integration

Harish Butani edited this page Jan 13, 2022 · 8 revisions

Language integration encompasses capabilities that extend Apache Spark so that, on the one hand, native Oracle functions and data structures are usable in Spark expressions (through user-defined registration of functions/types/operators) and, on the other hand, custom code can be translated into equivalent Oracle SQL/PL-SQL, so that larger parts of Spark pipelines are pushed down to the Oracle Database for processing.

Registration and Use of Oracle Native Functions (Row and Aggregate) in Spark

Oracle provides a large library of SQL and PL/SQL functions. Users should be able to invoke these functions in queries on data processed in the Oracle Database. For example, the function SYS_CONTEXT from the STANDARD PL/SQL package should be usable in Spark SQL queries over Oracle tables. We enable this by providing an implicit extension of the SparkSession; one of the extension methods registers Oracle functions:

import org.apache.spark.sql.oracle._
sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT")

Now users can use the sys_context function like this:

select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;

Function usage is only supported in operations that are pushed down to the Oracle database. In other cases an UnsupportedOperationException is thrown with a 'Cannot generate code for expression..' error. This is because the Oracle native function call is resolved into an OraNativeRowFuncInvoke expression (see below for details) that cannot be evaluated in Spark; it is only a marker for the Oracle pushdown planner extensions. So the following will throw an error:

set spark.sql.oracle.enable.pushdown=false
select oracle.sys_context('USERENV', 'CLIENT_PROGRAM_NAME') ora_client_pgm
from sparktest.unit_test
limit 1;

java.lang.UnsupportedOperationException: Cannot generate code for expression: oranativerowfuncinvoke(name=STANDARD....

Under the covers, when registerOracleFunction is invoked we extract the function definition from the ALL_PROCEDURES and ALL_ARGUMENTS Oracle dictionary views. Oracle functions can be overloaded, so we capture all supported signatures into an internal OraFuncDef structure. Only signatures that meet the following criteria are supported:

  • Return type and argument types must be among the supported types. See OraDataType.
  • Only IN arguments are allowed.
  • For a given overloading and argument position, only one entry must exist in the ALL_ARGUMENTS view.

registerOracleFunction outputs the OraFuncDef details, for example:
name=STANDARD.SYS_CONTEXT,isAggregate=false
  subProgramId=260, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType}
  subProgramId=261, retType=StringType, args={NAMESPACE:StringType,ATTRIBUTE:StringType,NEWOPTIONAL:StringType}

This shows which overloaded signatures (subProgramIds from ALL_PROCEDURES) are made available in Spark.
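Roughly, the signature extraction corresponds to a dictionary query like the sketch below. ALL_PROCEDURES and ALL_ARGUMENTS are real Oracle dictionary views, but the helper name and the exact join are illustrative assumptions, not the connector's actual code.

```scala
// Hypothetical sketch of the dictionary lookup behind registerOracleFunction.
// For a packaged function, ALL_PROCEDURES.OBJECT_NAME holds the package name
// and PROCEDURE_NAME the function name; ALL_ARGUMENTS is joined on the
// SUBPROGRAM_ID to enumerate each overload's arguments in POSITION order.
object OraDictionarySketch {

  /** Build a query listing each overload (SUBPROGRAM_ID) and its arguments. */
  def signatureQuery(packageName: String, funcName: String): String =
    s"""SELECT p.SUBPROGRAM_ID, a.ARGUMENT_NAME, a.POSITION, a.DATA_TYPE, a.IN_OUT
       |FROM ALL_PROCEDURES p
       |JOIN ALL_ARGUMENTS a
       |  ON a.OBJECT_NAME = p.PROCEDURE_NAME
       | AND a.PACKAGE_NAME = p.OBJECT_NAME
       | AND a.SUBPROGRAM_ID = p.SUBPROGRAM_ID
       |WHERE p.OBJECT_NAME = '$packageName'
       |  AND p.PROCEDURE_NAME = '$funcName'
       |ORDER BY p.SUBPROGRAM_ID, a.POSITION""".stripMargin

  def main(args: Array[String]): Unit =
    println(signatureQuery("STANDARD", "SYS_CONTEXT"))
}
```

Rows returned per SUBPROGRAM_ID would correspond to one overload; the IN_OUT column is what the "Only IN arguments" criterion above is checked against.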

We then register a user-defined function with Spark (see Spark FunctionRegistry). By default the function is registered as FunctionIdentifier(name = {oraFuncName}, database = "oracle"), so Spark expressions refer to the function as oracle.{oraFuncName}. A different name can be given by providing an override:

sparkSession.registerOracleFunction(Some("STANDARD"), "SYS_CONTEXT", Some("ora_context"))

In this case refer to the function as oracle.ora_context.

When resolving a particular function invocation, our registered FunctionBuilder is invoked. It sets up an OraNativeRowFuncInvoke expression by choosing the first signature to whose argument datatypes the child expressions at the call site can be cast.
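The "first matching signature" selection can be sketched in isolation. The names below (OraFuncSig, resolveOverload, the toy canCast rules) are illustrative assumptions for this sketch, not the connector's actual API.

```scala
// Simplified sketch of overload resolution: pick the first registered
// signature whose arity matches and whose declared argument types accept
// the call-site argument types. All names here are hypothetical.
object OverloadResolutionSketch {
  sealed trait DType
  case object StringType extends DType
  case object IntType    extends DType
  case object DoubleType extends DType

  // Toy implicit-cast rules, standing in for Spark's Cast.canCast.
  def canCast(from: DType, to: DType): Boolean = (from, to) match {
    case (a, b) if a == b      => true
    case (IntType, DoubleType) => true // widening numeric cast
    case (_, StringType)       => true // anything renders as a string
    case _                     => false
  }

  final case class OraFuncSig(subProgramId: Int, args: Seq[DType], ret: DType)

  // Scan signatures in subProgramId order; take the first one where every
  // call-site argument can be cast to the declared argument type.
  def resolveOverload(sigs: Seq[OraFuncSig], callArgs: Seq[DType]): Option[OraFuncSig] =
    sigs.find { s =>
      s.args.size == callArgs.size &&
        s.args.zip(callArgs).forall { case (declared, actual) => canCast(actual, declared) }
    }

  def main(args: Array[String]): Unit = {
    // Mirror the two SYS_CONTEXT signatures shown in the OraFuncDef output above.
    val sigs = Seq(
      OraFuncSig(260, Seq(StringType, StringType), StringType),
      OraFuncSig(261, Seq(StringType, StringType, StringType), StringType)
    )
    println(resolveOverload(sigs, Seq(StringType, StringType)).map(_.subProgramId))          // Some(260)
    println(resolveOverload(sigs, Seq(StringType, StringType, IntType)).map(_.subProgramId)) // Some(261)
  }
}
```

Note that a first-match strategy means the order of signatures matters: a two-argument call binds to subProgramId 260 even if a later signature would also accept it after casts.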

As a convenience, multiple functions can be registered in one call using registerOracleFunctions. Each argument can be a string or a tuple of the form (oracle_func_name, name_in_spark). For example:

sparkSession.registerOracleFunctions(Some("STANDARD"), "USER", ("SYS_CONTEXT", "ORA_CONTEXT"))

Aggregate Functions: Oracle Native Aggregate functions can be registered in a similar fashion, for example:

// STRAGG (string aggregate) concatenates all String values in a group
sparkSession.registerOracleFunction(None, "STRAGG")

sparkSession.sql(
  """select c_char_5, oracle.stragg(c_char_1)
    |from sparktest.unit_test
    |group by c_char_5""".stripMargin
).show()

Oracle Types and DB-Options

  • ML
  • Geospatial

Table Functions

Spark Macros: translation of Scala code into Catalyst Expressions

See Spark SQL Macros doc