Put DF_UDF plugin code into the main uber jar. (#11634)
Signed-off-by: Robert (Bobby) Evans <[email protected]>
revans2 authored Oct 24, 2024
1 parent db15a61 commit 05f40b5
Showing 20 changed files with 562 additions and 249 deletions.
35 changes: 31 additions & 4 deletions df_udf/README.md → DF_UDF_README.md
@@ -15,9 +15,14 @@ commands.

## Setup

-To do this include com.nvidia:df_udf_plugin as a dependency for your project and also include it on the
-classpath for your Apache Spark environment. Then include `com.nvidia.spark.DFUDFPlugin` in the config
-`spark.sql.extensions`. Now you can implement a UDF in terms of Dataframe operations.
+The dataframe UDF plugin is packaged in the same jar as the RAPIDS Accelerator for Apache Spark. This jar needs to
+be added as a compile-time dependency for code that uses this feature, and added to your Spark classpath just as
+you would for GPU acceleration.
+
+If you do not plan to use GPU-accelerated processing but still want dataframe UDF support for CPU applications,
+add `com.nvidia.spark.DFUDFPlugin` to the `spark.sql.extensions` config. If you do use GPU-accelerated processing,
+the RAPIDS Plugin enables this automatically: you don't need to set the `spark.sql.extensions` config, but it
+won't hurt anything if you do. Now you can implement a UDF in terms of Dataframe operations.
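
For example, a CPU-only application could enable the plugin when building its session. A minimal sketch (the jar path below is a placeholder for wherever you deploy the RAPIDS Accelerator jar):

```scala
import org.apache.spark.sql.SparkSession

// CPU-only setup: put the RAPIDS Accelerator jar on the classpath and
// register the dataframe UDF extension explicitly.
val spark = SparkSession.builder()
  .config("spark.jars", "/path/to/rapids-4-spark.jar") // placeholder path
  .config("spark.sql.extensions", "com.nvidia.spark.DFUDFPlugin")
  .getOrCreate()
```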

## Usage

@@ -48,6 +53,28 @@ Seq(Array(1L, 2L, 3L)).toDF("data").selectExpr("sum_array(data) as result").show
+------+
```
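
The context line above uses a `sum_array` UDF defined earlier in the README, in lines elided from this hunk. A sketch of how such a UDF might be written purely in terms of Dataframe operations (assuming `df_udf` accepts a `Column => Column` function, as the Java example below suggests):

```scala
import com.nvidia.spark.functions._ // provides df_udf

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._

// Sum the elements of an array column using only Dataframe operations:
// fold the array with Spark's higher-order aggregate function.
val sum_array = df_udf((longArray: Column) =>
  aggregate(longArray, lit(0L), (lhs, rhs) => lhs + rhs))
spark.udf.register("sum_array", sum_array)
```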

Java APIs are also supported and should work the same way as Spark's UDFs.

```java
// df_udf is assumed to be exposed as a static method on com.nvidia.spark.functions.
import static com.nvidia.spark.functions.df_udf;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.expressions.UserDefinedFunction;

// Java's Column class has no "+" operator; use Column.plus instead.
UserDefinedFunction myAdd = df_udf((Column lhs, Column rhs) -> lhs.plus(rhs));
spark.udf().register("myadd", myAdd);

spark.sql("SELECT myadd(1, 1) as r").show();
// +---+
// |  r|
// +---+
// |  2|
// +---+
```
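
A registered `UserDefinedFunction` can also be applied directly to `Column`s instead of going through SQL. A minimal sketch in Scala, reusing `myAdd` from the example above:

```scala
import org.apache.spark.sql.functions.lit

// Call the UDF through the Dataframe API rather than a SQL string.
spark.range(1).select(myAdd(lit(1), lit(1)).as("r")).show()
```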

## Type Checks

DataFrame APIs do not provide type safety when writing the code, and that is the same here. There are no builtin type checks.
@@ -87,4 +114,4 @@ at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveUpCast$$anonfun$apply$57$$anonfun$applyOrElse$234.applyOrElse(Analyzer.scala:3654)
```

-We hope to add optional type checks in the future.
\ No newline at end of file
+We hope to add optional type checks in the future.
88 changes: 0 additions & 88 deletions df_udf/pom.xml

This file was deleted.

@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package com.nvidia.spark
+package com.nvidia.spark.rapids

import com.nvidia.spark.functions._

1 change: 0 additions & 1 deletion pom.xml
@@ -73,7 +73,6 @@
    <modules>
        <module>aggregator</module>
        <module>datagen</module>
-       <module>df_udf</module>
        <module>dist</module>
        <module>integration_tests</module>
        <module>shuffle-plugin</module>
88 changes: 0 additions & 88 deletions scala2.13/df_udf/pom.xml

This file was deleted.

1 change: 0 additions & 1 deletion scala2.13/pom.xml
@@ -73,7 +73,6 @@
    <modules>
        <module>aggregator</module>
        <module>datagen</module>
-       <module>df_udf</module>
        <module>dist</module>
        <module>integration_tests</module>
        <module>shuffle-plugin</module>
@@ -16,16 +16,18 @@

package com.nvidia.spark

+import com.nvidia.spark.rapids.{DFUDFPluginAPI, ShimLoader}
+
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

class DFUDFPlugin extends (SparkSessionExtensions => Unit) {
-  override def apply(extensions: SparkSessionExtensions): Unit = {
-    extensions.injectResolutionRule(logicalPlanRules)
-  }
+  private lazy val impl: DFUDFPluginAPI = ShimLoader.newDFUDFImpl()
+
+  override def apply(extensions: SparkSessionExtensions): Unit =
+    impl(extensions)

-  def logicalPlanRules(sparkSession: SparkSession): Rule[LogicalPlan] = {
-    org.apache.spark.sql.nvidia.LogicalPlanRules()
-  }
+  def logicalPlanRules(sparkSession: SparkSession): Rule[LogicalPlan] =
+    impl.logicalPlanRules(sparkSession)
}
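
The delegation above implies an interface with this rough shape (a hypothetical sketch inferred from the two call sites; the real `DFUDFPluginAPI` lives behind `ShimLoader` so the uber jar can pick a Spark-version-specific implementation):

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Inferred shape: callable on SparkSessionExtensions (impl(extensions))
// and able to hand back the resolution rule (impl.logicalPlanRules(...)).
trait DFUDFPluginAPI extends (SparkSessionExtensions => Unit) {
  def logicalPlanRules(sparkSession: SparkSession): Rule[LogicalPlan]
}
```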