Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Shims for adding Databricks 14.3 Support [databricks] #11635

Open
wants to merge 21 commits into
base: branch-24.12
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8f7be42
Added 350db shims
razajafri Oct 17, 2024
cceaa9b
Merge remote-tracking branch 'origin/branch-24.12' into HEAD
razajafri Oct 30, 2024
93c7ff5
changed 350db to 350db143 to plan for the upcoming Databricks release…
razajafri Oct 30, 2024
49e665a
removed RapidsShuffleManager because it's shimmed
razajafri Oct 30, 2024
5bf423d
Added pom changes
razajafri Oct 30, 2024
5aca66f
Merge remote-tracking branch 'upstream/branch-24.12' into HEAD
razajafri Oct 30, 2024
d4d879b
Fixed Errors due to upmerge
razajafri Oct 30, 2024
8a874ca
Append Databricks version to the spark version for 14.3 programmatically
razajafri Oct 30, 2024
848e9aa
Generated Scala 2.13 poms
razajafri Oct 31, 2024
1938c2d
addressed review comments
razajafri Nov 2, 2024
7fe0830
updated copyrights
razajafri Nov 4, 2024
6c982f3
Merge remote-tracking branch 'origin/branch-24.12' into HEAD
razajafri Nov 4, 2024
7f03c67
Removed 350db143 from build and included missing 353 shims
razajafri Nov 5, 2024
bf9ef67
regenerated Scala 2.13 poms
razajafri Nov 5, 2024
883b1af
Handle the change that breaks the assumption that all Databricks rele…
razajafri Nov 5, 2024
e12e554
regenerated 2.13 poms
razajafri Nov 5, 2024
0594e0d
Merge branch 'branch-24.12' into SP-10661-db-14.3-2
razajafri Nov 6, 2024
f08fba1
add supportedExprs.csv for Spark 4.0.0
razajafri Nov 6, 2024
74ce118
Add Databricks 14.3 to premerge
razajafri Nov 7, 2024
c844b5d
Added missed Spark 344 shims
razajafri Nov 7, 2024
4e4f5cb
Temporarily skip tests on Databricks 14.3 until the test failures are…
NvTimLiu Nov 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions aggregator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,23 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>release350db143</id>
<activation>
<property>
<name>buildver</name>
<value>350db143</value>
</property>
</activation>
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-delta-spark350db143_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>${spark.version.classifier}</classifier>
</dependency>
</dependencies>
</profile>
<profile>
<id>release351</id>
<activation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
{"spark": "342"}
{"spark": "343"}
{"spark": "350"}
{"spark": "350db143"}
{"spark": "351"}
{"spark": "352"}
{"spark": "353"}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -68,7 +68,7 @@ object GpuDeltaLog {
dataPath: String,
options: Map[String, String],
rapidsConf: RapidsConf): GpuDeltaLog = {
val deltaLog = DeltaLog.forTable(spark, dataPath, options)
val deltaLog = DeltaLog.forTable(spark, new Path(dataPath), options)
new GpuDeltaLog(deltaLog, rapidsConf)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -337,7 +337,7 @@ class DeltaCreatableRelationProviderMeta(
}
val path = saveCmd.options.get("path")
if (path.isDefined) {
val deltaLog = DeltaLog.forTable(SparkSession.active, path.get, saveCmd.options)
val deltaLog = DeltaLog.forTable(SparkSession.active, new Path(path.get), saveCmd.options)
RapidsDeltaUtils.tagForDeltaWrite(this, saveCmd.query.schema, Some(deltaLog),
saveCmd.options, SparkSession.active)
} else {
Expand All @@ -346,4 +346,4 @@ class DeltaCreatableRelationProviderMeta(
}

override def convertToGpu(): GpuCreatableRelationProvider = new GpuDeltaDataSource(conf)
}
}
gerashegalov marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* This file was derived from OptimisticTransaction.scala and TransactionalWrite.scala
* in the Delta Lake project at https://github.com/delta-io/delta.
*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.sql.transaction.tahoe.rapids

import com.databricks.sql.transaction.tahoe._
import com.databricks.sql.transaction.tahoe.actions.FileAction
import com.databricks.sql.transaction.tahoe.constraints.{Constraint, DeltaInvariantCheckerExec}
import com.databricks.sql.transaction.tahoe.files.TahoeBatchFileIndex
import com.databricks.sql.transaction.tahoe.metering.DeltaLogging
import com.databricks.sql.transaction.tahoe.sources.DeltaSQLConf
import com.nvidia.spark.rapids._

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.rapids.GpuShuffleEnv
import org.apache.spark.sql.rapids.GpuV1WriteUtils.GpuEmpty2Null
import org.apache.spark.sql.rapids.delta.{DeltaShufflePartitionsUtil, GpuOptimizeWriteExchangeExec, OptimizeWriteExchangeExec}
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.Clock

/**
* Used to perform a set of reads in a transaction and then commit a set of updates to the
* state of the log. All reads from the DeltaLog, MUST go through this instance rather
* than directly to the DeltaLog otherwise they will not be check for logical conflicts
* with concurrent updates.
*
* This class is not thread-safe.
*
* @param deltaLog The Delta Log for the table this transaction is modifying.
* @param snapshot The snapshot that this transaction is reading at.
* @param rapidsConf RAPIDS Accelerator config settings.
*/
abstract class GpuOptimisticTransactionBase
(deltaLog: DeltaLog, snapshot: Snapshot, val rapidsConf: RapidsConf)
(implicit clock: Clock)
extends OptimisticTransaction(deltaLog, snapshot)(clock)
with DeltaLogging {

/**
* Adds checking of constraints on the table
* @param plan Plan to generate the table to check against constraints
* @param constraints Constraints to check on the table
* @return GPU columnar plan to execute
*/
protected def addInvariantChecks(plan: SparkPlan, constraints: Seq[Constraint]): SparkPlan = {
val cpuInvariants =
DeltaInvariantCheckerExec.buildInvariantChecks(plan.output, constraints, plan.session)
GpuCheckDeltaInvariant.maybeConvertToGpu(cpuInvariants, rapidsConf) match {
case Some(gpuInvariants) =>
val gpuPlan = convertToGpu(plan)
GpuDeltaInvariantCheckerExec(gpuPlan, gpuInvariants)
case None =>
val cpuPlan = convertToCpu(plan)
DeltaInvariantCheckerExec(cpuPlan, constraints)
}
}

/** GPU version of convertEmptyToNullIfNeeded */
private def gpuConvertEmptyToNullIfNeeded(
plan: GpuExec,
partCols: Seq[Attribute],
constraints: Seq[Constraint]): SparkPlan = {
if (!spark.conf.get(DeltaSQLConf.CONVERT_EMPTY_TO_NULL_FOR_STRING_PARTITION_COL)) {
return plan
}
// No need to convert if there are no constraints. The empty strings will be converted later by
// FileFormatWriter and FileFormatDataWriter. Note that we might still do unnecessary convert
// here as the constraints might not be related to the string partition columns. A precise
// check will need to walk the constraints to see if such columns are really involved. It
// doesn't seem to worth the effort.
if (constraints.isEmpty) return plan

val partSet = AttributeSet(partCols)
var needConvert = false
val projectList: Seq[NamedExpression] = plan.output.map {
case p if partSet.contains(p) && p.dataType == StringType =>
needConvert = true
GpuAlias(GpuEmpty2Null(p), p.name)()
case attr => attr
}
if (needConvert) GpuProjectExec(projectList.toList, plan) else plan
}

/**
* If there is any string partition column and there are constraints defined, add a projection to
* convert empty string to null for that column. The empty strings will be converted to null
* eventually even without this convert, but we want to do this earlier before check constraints
* so that empty strings are correctly rejected. Note that this should not cause the downstream
* logic in `FileFormatWriter` to add duplicate conversions because the logic there checks the
* partition column using the original plan's output. When the plan is modified with additional
* projections, the partition column check won't match and will not add more conversion.
*
* @param plan The original SparkPlan.
* @param partCols The partition columns.
* @param constraints The defined constraints.
* @return A SparkPlan potentially modified with an additional projection on top of `plan`
*/
override def convertEmptyToNullIfNeeded(
plan: SparkPlan,
partCols: Seq[Attribute],
constraints: Seq[Constraint]): SparkPlan = {
// Reuse the CPU implementation if the plan ends up on the CPU, otherwise do the
// equivalent on the GPU.
plan match {
case g: GpuExec => gpuConvertEmptyToNullIfNeeded(g, partCols, constraints)
case _ => super.convertEmptyToNullIfNeeded(plan, partCols, constraints)
}
}

override def writeFiles(
inputData: Dataset[_],
additionalConstraints: Seq[Constraint]): Seq[FileAction] = {
writeFiles(inputData, None, additionalConstraints)
}

protected def applyOptimizeWriteIfNeeded(
spark: SparkSession,
physicalPlan: SparkPlan,
partitionSchema: StructType,
isOptimize: Boolean): SparkPlan = {
val optimizeWriteEnabled = !isOptimize &&
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_ENABLED)
.orElse(DeltaConfigs.OPTIMIZE_WRITE.fromMetaData(metadata)).getOrElse(false)
if (optimizeWriteEnabled) {
val planWithoutTopRepartition =
DeltaShufflePartitionsUtil.removeTopRepartition(physicalPlan)
val partitioning = DeltaShufflePartitionsUtil.partitioningForRebalance(
physicalPlan.output, partitionSchema, spark.sessionState.conf.numShufflePartitions)
planWithoutTopRepartition match {
case p: GpuExec =>
val partMeta = GpuOverrides.wrapPart(partitioning, rapidsConf, None)
partMeta.tagForGpu()
if (partMeta.canThisBeReplaced) {
val plan = GpuOptimizeWriteExchangeExec(partMeta.convertToGpu(), p)
if (GpuShuffleEnv.useGPUShuffle(rapidsConf)) {
GpuCoalesceBatches(plan, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
} else {
GpuShuffleCoalesceExec(plan, rapidsConf.gpuTargetBatchSizeBytes)
}
} else {
GpuColumnarToRowExec(OptimizeWriteExchangeExec(partitioning, p))
}
case p =>
OptimizeWriteExchangeExec(partitioning, p)
}
} else {
physicalPlan
}
}

protected def isOptimizeCommand(plan: LogicalPlan): Boolean = {
val leaves = plan.collectLeaves()
leaves.size == 1 && leaves.head.collect {
case LogicalRelation(HadoopFsRelation(
index: TahoeBatchFileIndex, _, _, _, _, _), _, _, _) =>
index.actionType.equals("Optimize")
}.headOption.getOrElse(false)
}

protected def convertToCpu(plan: SparkPlan): SparkPlan = plan match {
case GpuRowToColumnarExec(p, _) => p
case p: GpuExec => GpuColumnarToRowExec(p)
case p => p
}

protected def convertToGpu(plan: SparkPlan): SparkPlan = plan match {
case GpuColumnarToRowExec(p, _) => p
case p: GpuExec => p
case p => GpuRowToColumnarExec(p, TargetSize(rapidsConf.gpuTargetBatchSizeBytes))
}
}
Loading
Loading