Skip to content

Commit

Permalink
[Spark] Add Preserving Row Tracking in Merge (delta-io#2936)
Browse files Browse the repository at this point in the history
<!--
Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md
2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]
Your PR title ...'.
  3. Be sure to keep the PR description updated to reflect all changes.
  4. Please write your PR title to summarize what this PR proposes.
5. If possible, provide a concise example to reproduce the issue for a
faster review.
6. If applicable, include the corresponding issue number in the PR title
and link it in the body.
-->

#### Which Delta project/connector is this regarding?
<!--
Please add the component selected below to the beginning of the pull
request title
For example: [Spark] Title of my pull request
-->

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Preserve row IDs in Merge by reading the metadata column and writing it
out to the physical column.
<!--
- Describe what this PR changes.
- Describe why we need the change.
 
If this PR resolves an issue be sure to include "Resolves #XXX" to
correctly link and close the issue upon merge.
-->

## How was this patch tested?
Existing UTs.
<!--
If tests were added, say they were added here. Please make sure to test
the changes thoroughly including negative and positive cases if
possible.
If the changes were tested in any way other than unit tests, please
clarify how you tested step by step (ideally copy and paste-able, so
that other reviewers can test and check, and descendants can verify in
the future).
If the changes were not tested, please explain why.
-->

## Does this PR introduce _any_ user-facing changes?
No.
<!--
If yes, please clarify the previous behavior and the change this PR
proposes - provide the console output, description and/or an example to
show the behavior difference if possible.
If possible, please also clarify if this is a user-facing change
compared to the released Delta Lake versions or within the unreleased
branches such as master.
If no, write 'No'.
-->
  • Loading branch information
longvu-db authored Apr 22, 2024
1 parent 7d31b4f commit e75e4f9
Show file tree
Hide file tree
Showing 4 changed files with 701 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,13 @@ case class MergeIntoCommand(
val finalActions = createSetTransaction(spark, targetDeltaLog).toSeq ++ mergeActions
deltaTxn.commitIfNeeded(
actions = finalActions,
DeltaOperations.Merge(
op = DeltaOperations.Merge(
predicate = Option(condition),
matchedPredicates = matchedClauses.map(DeltaOperations.MergePredicate(_)),
notMatchedPredicates = notMatchedClauses.map(DeltaOperations.MergePredicate(_)),
notMatchedBySourcePredicates =
notMatchedBySourceClauses.map(DeltaOperations.MergePredicate(_))))
notMatchedBySourceClauses.map(DeltaOperations.MergePredicate(_))),
tags = RowTracking.addPreservedRowTrackingTagIfNotSet(deltaTxn.snapshot))
val stats = collectMergeStats(deltaTxn, materializeSourceReason)
recordDeltaEvent(targetDeltaLog, "delta.dml.merge.stats", data = stats)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,10 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
deltaTxn,
filesToRewrite,
columnsToDrop = Nil)
val baseTargetDF = Dataset.ofRows(spark, targetPlan)
val baseTargetDF = RowTracking.preserveRowTrackingColumns(
dfWithoutRowTrackingColumns = Dataset.ofRows(spark, targetPlan),
snapshot = deltaTxn.snapshot)

val joinType = if (writeUnmodifiedRows) {
if (shouldOptimizeMatchedOnlyMerge(spark)) {
"rightOuter"
Expand Down Expand Up @@ -386,30 +389,46 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
joinedDF,
clauses = matchedClauses ++ notMatchedClauses ++ notMatchedBySourceClauses)

// In case Row IDs are preserved, get the attribute expression of the Row ID column.
val rowIdColumnExpressionOpt =
MaterializedRowId.getAttribute(deltaTxn.snapshot, joinedAndPrecomputedConditionsDF)

val rowCommitVersionColumnExpressionOpt =
MaterializedRowCommitVersion.getAttribute(deltaTxn.snapshot, joinedAndPrecomputedConditionsDF)

// The target output columns need to be marked as nullable here, as they are going to be used
// to reference the output of an outer join.
val targetWriteCols = postEvolutionTargetExpressions(makeNullable = true)

// If there are N columns in the target table, the full outer join output will have:
// - N columns for target table
// - Two optional Row ID / Row commit version preservation columns with their physical name.
// - ROW_DROPPED_COL to define whether the generated row should be dropped or written
// - if CDC is enabled, also CDC_TYPE_COLUMN_NAME with the type of change being performed
// in a particular row
// (N+1 or N+2 columns depending on CDC disabled / enabled)
val outputColNames =
targetWriteCols.map(_.name) ++
rowIdColumnExpressionOpt.map(_.name) ++
rowCommitVersionColumnExpressionOpt.map(_.name) ++
Seq(ROW_DROPPED_COL) ++
(if (cdcEnabled) Some(CDC_TYPE_COLUMN_NAME) else None)

// Copy expressions to copy the existing target row and not drop it (ROW_DROPPED_COL=false),
// and in case CDC is enabled, set it to CDC_TYPE_NOT_CDC.
// (N+1 or N+2 or N+3 columns depending on CDC disabled / enabled and if Row IDs are preserved)
var noopCopyExprs = (targetWriteCols :+ incrNoopCountExpr) ++
(if (cdcEnabled) Some(CDC_TYPE_NOT_CDC) else None)
val noopCopyExprs =
targetWriteCols ++
rowIdColumnExpressionOpt ++
rowCommitVersionColumnExpressionOpt ++
Seq(incrNoopCountExpr) ++
(if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())

// Generate output columns.
val outputCols = generateWriteAllChangesOutputCols(
targetWriteCols,
rowIdColumnExpressionOpt,
rowCommitVersionColumnExpressionOpt,
outputColNames,
noopCopyExprs,
clausesWithPrecompConditions,
Expand All @@ -422,6 +441,8 @@ trait ClassicMergeExecutor extends MergeOutputGeneration {
outputCols,
outputColNames,
noopCopyExprs,
rowIdColumnExpressionOpt.map(_.name),
rowCommitVersionColumnExpressionOpt.map(_.name),
deduplicateCDFDeletes)
} else {
// change data capture is off, just output the normal data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.commands.merge

import scala.collection.mutable

import org.apache.spark.sql.delta.{RowCommitVersion, RowId}
import org.apache.spark.sql.delta.commands.MergeIntoCommandBase
import org.apache.spark.sql.delta.commands.cdc.CDCReader

Expand Down Expand Up @@ -98,6 +99,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
*/
protected def generateWriteAllChangesOutputCols(
targetWriteCols: Seq[Expression],
rowIdColumnExpressionOpt: Option[NamedExpression],
rowCommitVersionColumnExpressionOpt: Option[NamedExpression],
targetWriteColNames: Seq[String],
noopCopyExprs: Seq[Expression],
clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
Expand All @@ -109,6 +112,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
// ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions for MATCHED clauses ====
val processedMatchClauses: Seq[ProcessedClause] = generateAllActionExprs(
targetWriteCols,
rowIdColumnExpressionOpt,
rowCommitVersionColumnExpressionOpt,
clausesWithPrecompConditions.collect { case c: DeltaMergeIntoMatchedClause => c },
cdcEnabled,
shouldCountDeletedRows)
Expand All @@ -120,12 +125,19 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
// N + 1 (or N + 2 with CDC, N + 4 preserving Row Tracking and CDC) expressions to delete the
// unmatched source row when it should not be inserted. `target.output` will produce NULLs
// which will get deleted eventually.
val deleteSourceRowExprs = (targetWriteCols :+ Literal.TrueLiteral) ++
(if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Nil)

val deleteSourceRowExprs =
(targetWriteCols ++
rowIdColumnExpressionOpt.map(_ => Literal(null)) ++
rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
Seq(Literal(true))) ++
(if (cdcEnabled) Seq(CDC_TYPE_NOT_CDC) else Seq())

// ==== Generate N + 2 (N + 4 preserving Row Tracking) expressions for NOT MATCHED clause ====
val processedNotMatchClauses: Seq[ProcessedClause] = generateAllActionExprs(
targetWriteCols,
rowIdColumnExpressionOpt,
rowCommitVersionColumnExpressionOpt,
clausesWithPrecompConditions.collect { case c: DeltaMergeIntoNotMatchedClause => c },
cdcEnabled,
shouldCountDeletedRows)
Expand All @@ -137,6 +149,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
// === Generate N + 2 (N + 4 with Row Tracking) expressions for NOT MATCHED BY SOURCE clause ===
val processedNotMatchBySourceClauses: Seq[ProcessedClause] = generateAllActionExprs(
targetWriteCols,
rowIdColumnExpressionOpt,
rowCommitVersionColumnExpressionOpt,
clausesWithPrecompConditions.collect { case c: DeltaMergeIntoNotMatchedBySourceClause => c },
cdcEnabled,
shouldCountDeletedRows)
Expand Down Expand Up @@ -187,7 +201,17 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
ifSourceRowNull -> notMatchedBySourceExprs(i),
ifTargetRowNull -> notMatchedExprs(i)),
/* otherwise */ matchedExprs(i))
Column(Alias(caseWhen, name)())
if (rowIdColumnExpressionOpt.exists(_.name == name)) {
// Add Row ID metadata to allow writing the column.
Column(Alias(caseWhen, name)(
explicitMetadata = Some(RowId.columnMetadata(name))))
} else if (rowCommitVersionColumnExpressionOpt.exists(_.name == name)) {
// Add Row Commit Versions metadata to allow writing the column.
Column(Alias(caseWhen, name)(
explicitMetadata = Some(RowCommitVersion.columnMetadata(name))))
} else {
Column(Alias(caseWhen, name)())
}
}
logDebug("writeAllChanges: join output expressions\n\t" + seqToString(outputCols.map(_.expr)))
outputCols
Expand All @@ -206,6 +230,11 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
* UPDATE, DELETE and/or INSERT action(s).
* @param targetWriteCols List of output column expressions from the target table. Used to
* generate CDC data for DELETE.
* @param rowIdColumnExpressionOpt The optional Row ID preservation column with the physical
* Row ID name, it stores stable Row IDs of the table.
* @param rowCommitVersionColumnExpressionOpt The optional Row Commit Version preservation
* column with the physical Row Commit Version name, it stores
* stable Row Commit Versions.
* @param clausesWithPrecompConditions List of merge clauses with precomputed conditions. Action
* expressions are generated for each of these clauses.
* @param cdcEnabled Whether the generated expressions should include CDC information.
Expand All @@ -217,6 +246,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
*/
protected def generateAllActionExprs(
targetWriteCols: Seq[Expression],
rowIdColumnExpressionOpt: Option[NamedExpression],
rowCommitVersionColumnExpressionOpt: Option[NamedExpression],
clausesWithPrecompConditions: Seq[DeltaMergeIntoClause],
cdcEnabled: Boolean,
shouldCountDeletedRows: Boolean): Seq[ProcessedClause] = {
Expand All @@ -230,6 +261,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
valueToReturn = false)
// Generate update expressions and set ROW_DROPPED_COL = false
u.resolvedActions.map(_.expr) ++
rowIdColumnExpressionOpt ++
rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
Seq(incrCountExpr) ++
(if (cdcEnabled) Some(Literal(CDC_TYPE_UPDATE_POSTIMAGE)) else None)
case u: DeltaMergeIntoNotMatchedBySourceUpdateClause =>
Expand All @@ -238,6 +271,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
valueToReturn = false)
// Generate update expressions and set ROW_DROPPED_COL = false
u.resolvedActions.map(_.expr) ++
rowIdColumnExpressionOpt ++
rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
Seq(incrCountExpr) ++
(if (cdcEnabled) Some(Literal(CDC_TYPE_UPDATE_POSTIMAGE)) else None)
case _: DeltaMergeIntoMatchedDeleteClause =>
Expand All @@ -252,6 +287,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
}
// Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE
targetWriteCols ++
rowIdColumnExpressionOpt ++
rowCommitVersionColumnExpressionOpt ++
Seq(incrCountExpr) ++
(if (cdcEnabled) Some(CDC_TYPE_DELETE) else None)
case _: DeltaMergeIntoNotMatchedBySourceDeleteClause =>
Expand All @@ -266,13 +303,17 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
}
// Generate expressions to set the ROW_DROPPED_COL = true and mark as a DELETE
targetWriteCols ++
rowIdColumnExpressionOpt ++
rowCommitVersionColumnExpressionOpt ++
Seq(incrCountExpr) ++
(if (cdcEnabled) Some(CDC_TYPE_DELETE) else None)
case i: DeltaMergeIntoNotMatchedInsertClause =>
val incrInsertedCountExpr = incrementMetricsAndReturnBool(
names = Seq("numTargetRowsInserted"),
valueToReturn = false)
i.resolvedActions.map(_.expr) ++
rowIdColumnExpressionOpt.map(_ => Literal(null)) ++
rowCommitVersionColumnExpressionOpt.map(_ => Literal(null)) ++
Seq(incrInsertedCountExpr) ++
(if (cdcEnabled) Some(Literal(CDC_TYPE_INSERT)) else None)
}
Expand Down Expand Up @@ -346,6 +387,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
outputCols: Seq[Column],
outputColNames: Seq[String],
noopCopyExprs: Seq[Expression],
rowIdColumnNameOpt: Option[String],
rowCommitVersionColumnNameOpt: Option[String],
deduplicateDeletes: DeduplicateCDFDeletes): DataFrame = {
import org.apache.spark.sql.delta.commands.cdc.CDCReader._
// The main partition just needs to swap in the CDC_TYPE_NOT_CDC value.
Expand Down Expand Up @@ -416,12 +459,16 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
sourceDf,
cdcArray,
cdcToMainDataArray,
rowIdColumnNameOpt,
rowCommitVersionColumnNameOpt,
outputColNames)
} else {
packAndExplodeCDCOutput(
sourceDf,
cdcArray,
cdcToMainDataArray,
rowIdColumnNameOpt,
rowCommitVersionColumnNameOpt,
outputColNames,
dedupColumns = Nil)
}
Expand All @@ -440,22 +487,37 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
* @param cdcToMainDataArray Transforms the packed CDC data to add the main data output, i.e. rows
* that are inserted or updated and will be written to the main
* partition.
* @param rowIdColumnNameOpt The optional Row ID preservation column with the physical Row ID
* name, it stores stable Row IDs.
* @param rowCommitVersionColumnNameOpt The optional Row Commit Version preservation column
* with the physical Row Commit Version name, it stores
* stable Row Commit Versions.
* @param outputColNames All the main and CDC columns to use in the output.
* @param dedupColumns Additional columns to add to enable deduplication.
*/
private def packAndExplodeCDCOutput(
sourceDf: DataFrame,
cdcArray: Column,
cdcToMainDataArray: Column,
rowIdColumnNameOpt: Option[String],
rowCommitVersionColumnNameOpt: Option[String],
outputColNames: Seq[String],
dedupColumns: Seq[Column]): DataFrame = {
val unpackedCols = outputColNames.map { name =>
if (rowIdColumnNameOpt.contains(name)) {
// Add metadata to allow writing the column although it is not part of the schema.
col(s"packedData.`$name`").as(name, RowId.columnMetadata(name))
} else if (rowCommitVersionColumnNameOpt.contains(name)) {
col(s"packedData.`$name`").as(name, RowCommitVersion.columnMetadata(name))
} else {
col(s"packedData.`$name`").as(name)
}
}

sourceDf
// `explode()` creates a [[Generator]] which can't handle non-deterministic expressions that
// we use to increment metric counters. We first project the CDC array so that the expressions
// are evaluated before we explode the array,
// are evaluated before we explode the array.
.select(cdcArray.as("projectedCDC") +: dedupColumns: _*)
.select(explode(col("projectedCDC")).as("packedCdc") +: dedupColumns: _*)
.select(explode(cdcToMainDataArray).as("packedData") +: dedupColumns: _*)
Expand All @@ -480,6 +542,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
df: DataFrame,
cdcArray: Column,
cdcToMainDataArray: Column,
rowIdColumnNameOpt: Option[String],
rowCommitVersionColumnNameOpt: Option[String],
outputColNames: Seq[String]): DataFrame = {
val dedupColumns = if (deduplicateDeletes.includesInserts) {
Seq(col(TARGET_ROW_INDEX_COL), col(SOURCE_ROW_INDEX_COL))
Expand All @@ -491,6 +555,8 @@ trait MergeOutputGeneration { self: MergeIntoCommandBase =>
df,
cdcArray,
cdcToMainDataArray,
rowIdColumnNameOpt,
rowCommitVersionColumnNameOpt,
outputColNames,
dedupColumns
)
Expand Down
Loading

0 comments on commit e75e4f9

Please sign in to comment.