
[Spark] Fix auto-conflict handling logic in Optimize to handle DVs (#3981)


#### Which Delta project/connector is this regarding?


- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Bug: There was a long-standing bug where the custom conflict detection
logic in Optimize did not catch concurrent transactions that add DVs to
the files being compacted, e.g. `AddFile(path='a')` -> `AddFile(path='a', dv='dv1')`.

Fix: Updated the conflict resolution to key each file on the composite
`(path, dvId)` instead of on the path alone.
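To make the failure mode concrete, here is a minimal self-contained sketch; `FileEntry` is an illustrative stand-in for Delta's `AddFile`, not code from this PR:

```scala
// Illustrative stand-in for AddFile: a path plus an optional DV unique id.
case class FileEntry(path: String, dvId: Option[String])

object DvConflictSketch extends App {
  val before = Set(FileEntry("a", None), FileEntry("b", None))
  // A concurrent DELETE attaches a deletion vector to file 'a' without
  // rewriting it: same path, different logical content.
  val after = Set(FileEntry("a", Some("dv1")), FileEntry("b", None))

  // Old check, keyed on path only: the sets look identical, so the
  // conflicting DELETE goes unnoticed and Optimize retries unsafely.
  assert(before.map(_.path) == after.map(_.path))

  // Fixed check, keyed on (path, dvId): the sets differ, so the
  // conflict is detected and the retry attempt is aborted.
  assert(before.map(f => (f.path, f.dvId)) != after.map(f => (f.path, f.dvId)))
}
```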

## How was this patch tested?
- unit tests: a new `OptimizeConflictSuite` exercising the Optimize/DELETE race (see the diff below)

## Does this PR introduce _any_ user-facing changes?
no
rahulsmahadev authored Dec 17, 2024
1 parent fc81d12 commit f577290
Showing 2 changed files with 106 additions and 2 deletions.
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.delta.actions.InMemoryLogReplay.UniqueFileActionTuple
 
 /** Base class defining abstract optimize command */
 abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -453,12 +454,17 @@ class OptimizeExecutor(
     val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
     commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
       val newPartitionSchema = newTxn.metadata.partitionSchema
-      val candidateSetOld = filesToProcess.map(_.path).toSet
+      // Note: When checking if the candidate set is the same, we need to consider (Path, DV)
+      // as the key.
+      val candidateSetOld = filesToProcess.
+        map(f => UniqueFileActionTuple(f.pathAsUri, f.getDeletionVectorUniqueId)).toSet
+
       // We specifically don't list the files through the transaction since we are potentially
       // only processing a subset of them below. If the transaction is still valid, we will
       // register the files and predicate below
       val candidateSetNew =
-        newTxn.snapshot.filesForScan(partitionPredicate).files.map(_.path).toSet
+        newTxn.snapshot.filesForScan(partitionPredicate).files
+          .map(f => UniqueFileActionTuple(f.pathAsUri, f.getDeletionVectorUniqueId)).toSet
 
       // As long as all of the files that we compacted are still part of the table,
       // and the partitioning has not changed it is valid to continue to try
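The hunk is truncated just before the check these sets feed into; per the trailing comment, the retry stays valid only while every compacted file is still in the table unchanged. A minimal sketch of that rule, with illustrative names (`Key`, `isRetryValid`) that are not from the Delta source:

```scala
object RetryValiditySketch {
  // (path, DV unique id): the composite key both candidate sets are built from.
  type Key = (java.net.URI, Option[String])

  // The retry is valid only if every file Optimize compacted is still present
  // with the same DV attached and the partitioning is unchanged. A DV added by
  // a concurrent DELETE changes the key, so the subset test fails.
  def isRetryValid(
      candidateSetOld: Set[Key],
      candidateSetNew: Set[Key],
      partitionSchemaUnchanged: Boolean): Boolean =
    candidateSetOld.subsetOf(candidateSetNew) && partitionSchemaUnchanged
}
```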
@@ -0,0 +1,98 @@
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.delta.optimize

import java.io.File

import scala.concurrent.duration.Duration

import org.apache.spark.SparkException
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.concurrency.PhaseLockingTestMixin
import org.apache.spark.sql.delta.concurrency.TransactionExecutionTestMixin
import org.apache.spark.sql.delta.fuzzer.{OptimisticTransactionPhases, PhaseLockingTransactionExecutionObserver}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ThreadUtils

class OptimizeConflictSuite extends QueryTest
  with SharedSparkSession
  with PhaseLockingTestMixin
  with TransactionExecutionTestMixin
  with DeltaSQLCommandTest {

  protected def appendRows(dir: File, numRows: Int, numFiles: Int): Unit = {
    spark.range(start = 0, end = numRows, step = 1, numPartitions = numFiles)
      .write.format("delta").mode("append").save(dir.getAbsolutePath)
  }

  test("conflict handling between Optimize and Business Txn") {
    withTempDir { tempDir =>

      // Create table with 100 rows.
      appendRows(tempDir, numRows = 100, numFiles = 10)

      // Enable DVs.
      sql(s"ALTER TABLE delta.`${tempDir.toString}` " +
        "SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
      val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)

      def optimizeTxn(): Array[Row] = {
        deltaTable.optimize().executeCompaction()
        Array.empty
      }

      def deleteTxn(): Array[Row] = {
        // Delete 50% of the rows.
        sql(s"DELETE FROM delta.`${tempDir}` WHERE id%2 = 0").collect()
      }

      val Seq(future) = runFunctionsWithOrderingFromObserver(Seq(optimizeTxn)) {
        case (optimizeObserver :: Nil) =>
          // Create a replacement observer for the retry thread of Optimize.
          val retryObserver = new PhaseLockingTransactionExecutionObserver(
            OptimisticTransactionPhases.forName("test-replacement-txn"))

          // Block Optimize during the first commit attempt.
          optimizeObserver.setNextObserver(retryObserver, autoAdvance = true)
          unblockUntilPreCommit(optimizeObserver)
          busyWaitFor(optimizeObserver.phases.preparePhase.hasEntered, timeout)

          // Delete starts and finishes.
          deleteTxn()

          // Allow Optimize to resume.
          unblockCommit(optimizeObserver)
          busyWaitFor(optimizeObserver.phases.commitPhase.hasLeft, timeout)
          optimizeObserver.phases.postCommitPhase.exitBarrier.unblock()

          // The first txn will not commit as there was a conflicting commit
          // (deleteTxn). Optimize will attempt to auto-resolve and retry.
          // Resume the retry txn and let it run to completion.
          unblockAllPhases(retryObserver)
      }
      val e = intercept[SparkException] {
        ThreadUtils.awaitResult(future, timeout)
      }
      // The retry txn should fail as the same files are modified (DVs added) by
      // the delete txn.
      assert(e.getCause.getMessage.contains("DELTA_CONCURRENT_DELETE_READ"))
      assert(sql(s"SELECT * FROM delta.`${tempDir}`").count() == 50)
    }
  }
}
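As a possible follow-up assertion (not in the PR), one could also confirm the precondition the test relies on, namely that the DELETE attached deletion vectors rather than rewriting files. A sketch assuming Delta's internal APIs (`DeltaLog.forTable`, `Snapshot.allFiles`, `AddFile.deletionVector`) and reusing `spark` and `tempDir` from the test's scope:

```scala
import org.apache.spark.sql.delta.DeltaLog

// After the DELETE wins, surviving AddFiles should carry deletion vectors;
// that (path, dvId) change is exactly what the fixed comparison detects.
val snapshot = DeltaLog.forTable(spark, tempDir.getAbsolutePath).update()
val filesWithDVs = snapshot.allFiles.collect().count(_.deletionVector != null)
assert(filesWithDVs > 0)
```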
