Fix auto-conflict handling logic in Optimize to consider DVs
rahulsmahadev committed Dec 17, 2024
1 parent 05cdd3c commit 1bc3a8a
Showing 2 changed files with 106 additions and 3 deletions.
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
import org.apache.spark.sql.types._
import org.apache.spark.util.{SystemClock, ThreadUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.delta.actions.InMemoryLogReplay.UniqueFileActionTuple

/** Base class defining abstract optimize command */
abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -238,7 +239,6 @@ case class Bin(partitionValues: Map[String, String], files: Seq[AddFile])
* @param bins The set of bins to process in this transaction
*/
case class Batch(bins: Seq[Bin])

/**
* Optimize job which compacts small files into larger files to reduce
* the number of files and potentially allow more efficient reads.
@@ -453,12 +453,17 @@ class OptimizeExecutor(
val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
val newPartitionSchema = newTxn.metadata.partitionSchema
-val candidateSetOld = filesToProcess.map(_.path).toSet
+// Note: When checking if the candidate set is the same, we need to consider (Path, DV)
+// as the key.
+val candidateSetOld = filesToProcess.
+  map(f => UniqueFileActionTuple(f.pathAsUri, f.getDeletionVectorUniqueId)).toSet

// We specifically don't list the files through the transaction since we are potentially
// only processing a subset of them below. If the transaction is still valid, we will
// register the files and predicate below
val candidateSetNew =
-newTxn.snapshot.filesForScan(partitionPredicate).files.map(_.path).toSet
+newTxn.snapshot.filesForScan(partitionPredicate).files
+  .map(f => UniqueFileActionTuple(f.pathAsUri, f.getDeletionVectorUniqueId)).toSet

// As long as all of the files that we compacted are still part of the table,
// and the partitioning has not changed it is valid to continue to try
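The crux of the change above is the key used when Optimize's commitAndRetry re-validates its candidate files against the latest snapshot: comparing by path alone cannot detect a concurrent transaction that only attached deletion vectors to files Optimize already read. The following is a minimal, standalone sketch of that idea; FileWithDV and the stillValid* helpers are hypothetical names (not Delta APIs), and the subset check is an assumption based on the comment in the hunk above about all compacted files still being part of the table.

// Standalone sketch, not Delta code: FileWithDV and the stillValid* helpers are
// hypothetical names used only to illustrate the change of key.
object CandidateSetSketch {
  // A candidate file as Optimize sees it: its path plus the unique id of any
  // deletion vector attached to it (None when the file has no DV).
  final case class FileWithDV(path: String, deletionVectorId: Option[String])

  // Old behavior (path-only key): a concurrent DELETE that merely attaches DVs
  // to existing files leaves the path set unchanged, so the retry looks safe.
  def stillValidByPath(compacted: Seq[FileWithDV], latest: Seq[FileWithDV]): Boolean =
    compacted.map(_.path).toSet.subsetOf(latest.map(_.path).toSet)

  // New behavior ((path, DV) key, mirroring UniqueFileActionTuple): attaching a
  // DV changes the key, so the modified files are detected.
  def stillValidByPathAndDV(compacted: Seq[FileWithDV], latest: Seq[FileWithDV]): Boolean =
    compacted.map(f => (f.path, f.deletionVectorId)).toSet
      .subsetOf(latest.map(f => (f.path, f.deletionVectorId)).toSet)

  def main(args: Array[String]): Unit = {
    val compacted = Seq(FileWithDV("part-0001.parquet", None))
    // A concurrent DELETE attaches a deletion vector to the same file.
    val latest = Seq(FileWithDV("part-0001.parquet", Some("dv-42")))

    println(stillValidByPath(compacted, latest))      // true  -- conflict missed
    println(stillValidByPathAndDV(compacted, latest)) // false -- conflict detected
  }
}
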
@@ -0,0 +1,98 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.delta.optimize

import java.io.File

import scala.concurrent.duration.Duration

import org.apache.spark.SparkException
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.concurrency.PhaseLockingTestMixin
import org.apache.spark.sql.delta.concurrency.TransactionExecutionTestMixin
import org.apache.spark.sql.delta.fuzzer.{OptimisticTransactionPhases, PhaseLockingTransactionExecutionObserver}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ThreadUtils

class OptimizeConflictSuite extends QueryTest
with SharedSparkSession
with PhaseLockingTestMixin
with TransactionExecutionTestMixin
with DeltaSQLCommandTest {

protected def appendRows(dir: File, numRows: Int, numFiles: Int): Unit = {
spark.range(start = 0, end = numRows, step = 1, numPartitions = numFiles)
.write.format("delta").mode("append").save(dir.getAbsolutePath)
}

test("conflict handling between Optimize and Business Txn") {
withTempDir { tempDir =>

// Create table with 100 rows.
appendRows(tempDir, numRows = 100, numFiles = 10)

// Enable DVs.
sql(s"ALTER TABLE delta.`${tempDir.toString}` " +
"SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)

def optimizeTxn(): Array[Row] = {
deltaTable.optimize().executeCompaction()
Array.empty
}

def deleteTxn(): Array[Row] = {
// Delete 50% of the rows.
sql(s"DELETE FROM delta.`${tempDir}` WHERE id%2 = 0").collect()
}

val Seq(future) = runFunctionsWithOrderingFromObserver(Seq(optimizeTxn)) {
case (optimizeObserver :: Nil) =>
// Create a replacement observer for the retry thread of Optimize.
val retryObserver = new PhaseLockingTransactionExecutionObserver(
OptimisticTransactionPhases.forName("test-replacement-txn"))

// Block Optimize during the first commit attempt.
optimizeObserver.setNextObserver(retryObserver, autoAdvance = true)
unblockUntilPreCommit(optimizeObserver)
busyWaitFor(optimizeObserver.phases.preparePhase.hasEntered, timeout)

// Delete starts and finishes
deleteTxn()

// Allow Optimize to resume.
unblockCommit(optimizeObserver)
busyWaitFor(optimizeObserver.phases.commitPhase.hasLeft, timeout)
optimizeObserver.phases.postCommitPhase.exitBarrier.unblock()

// The first txn will not commit as there was a conflicting commit
// (deleteTxn). Optimize will attempt to auto-resolve and retry.
// Resume the retry txn and wait for it to finish.
unblockAllPhases(retryObserver)
}
val e = intercept[SparkException] {
ThreadUtils.awaitResult(future, timeout)
}
// The retry txn should fail as the same files are modified (DVs added) by
// the delete txn.
assert(e.getCause.getMessage.contains("DELTA_CONCURRENT_DELETE_READ"))
assert(sql(s"SELECT * FROM delta.`${tempDir}`").count() == 50)
}
}
}
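
For context on the assertion at the end of the test: DELTA_CONCURRENT_DELETE_READ is the error class Delta reports when a winning concurrent transaction removed or rewrote files that the losing transaction had read. The sketch below is a simplified, hypothetical model of that situation, not Delta's actual conflict checker; it only illustrates why the DVs added by the delete txn make the Optimize retry fail rather than succeed.

// Simplified, hypothetical model of the conflict situation, not Delta's conflict checker.
object ConflictCheckSketch {
  // Mirror the key used in the commit above: (path, deletion vector unique id).
  type FileKey = (String, Option[String])

  // If the winning transaction (the DELETE) removed or rewrote any file version
  // that the current transaction (the Optimize retry) read, the retry cannot
  // proceed; in Delta this surfaces as DELTA_CONCURRENT_DELETE_READ.
  def concurrentDeleteReadConflict(
      readByCurrentTxn: Set[FileKey],
      removedByWinner: Set[FileKey]): Boolean =
    readByCurrentTxn.intersect(removedByWinner).nonEmpty

  def main(args: Array[String]): Unit = {
    // Optimize read the DV-free version of the file; the DELETE removed that
    // version and re-added the same path with a deletion vector attached.
    val readByOptimize: Set[FileKey] = Set(("part-0001.parquet", None))
    val removedByDelete: Set[FileKey] = Set(("part-0001.parquet", None))
    println(concurrentDeleteReadConflict(readByOptimize, removedByDelete)) // true
  }
}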
