Skip to content

Commit

Permalink
[Spark] Replaces startTransaction(Snapshot) calls with catalogTable overload
Browse files Browse the repository at this point in the history

This PR replaces all calls to DeltaLog.startTransaction(Snapshot) with calls to DeltaLog.startTransaction(Option[CatalogTable], Snapshot). This PR is part of #2105 and a follow-up to #2125. It makes sure that transactions have a valid catalogTable attached to them so Uniform can correctly update the table in the catalog.

This is a small refactoring change so existing test coverage is sufficient.

User-facing changes: No

Closes #2126

GitOrigin-RevId: d82787c64979a2dd4a363bf92a1640b7635ec02e
  • Loading branch information
LukasRupprecht authored and vkorukanti committed Oct 6, 2023
1 parent 5f9b98e commit 4e7cb5f
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class DeltaLog private(
* the new protocol version is not a superset of the original one used by the snapshot.
*/
def upgradeProtocol(
catalogTable: Option[CatalogTable],
snapshot: Snapshot,
newVersion: Protocol): Unit = {
val currentVersion = snapshot.protocol
Expand All @@ -277,7 +278,7 @@ class DeltaLog private(
throw new ProtocolDowngradeException(currentVersion, newVersion)
}

val txn = startTransaction(Some(snapshot))
val txn = startTransaction(catalogTable, Some(snapshot))
try {
SchemaMergingUtils.checkColumnNameDuplication(txn.metadata.schema, "in the table schema")
} catch {
Expand All @@ -288,11 +289,6 @@ class DeltaLog private(
logConsole(s"Upgraded table at $dataPath to $newVersion.")
}

// Test-only!!
private[delta] def upgradeProtocol(newVersion: Protocol): Unit = {
upgradeProtocol(unsafeVolatileSnapshot, newVersion)
}

/**
* Get all actions starting from "startVersion" (inclusive). If `startVersion` doesn't exist,
* return an empty Iterator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ case class CreateDeltaTableCommand(
deltaLog: DeltaLog,
tableWithLocation: CatalogTable,
snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
val txn = deltaLog.startTransaction(snapshotOpt)
val txn = deltaLog.startTransaction(None, snapshotOpt)

// During CREATE/REPLACE, we synchronously run conversion (if Uniform is enabled) so
// we always remove the post commit hook here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class RestoreTableCommand(
require(versionToRestore < latestVersion, s"Version to restore ($versionToRestore)" +
s"should be less then last available version ($latestVersion)")

deltaLog.withNewTransaction { txn =>
deltaLog.withNewTransaction(sourceTable.catalogTable) { txn =>
val latestSnapshot = txn.snapshot
val snapshotToRestore = deltaLog.getSnapshotAt(versionToRestore)
val latestSnapshotFiles = latestSnapshot.allFiles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ case class AlterTableDropFeatureDeltaCommand(
val log = table.deltaLog
val snapshot = log.update(checkIfUpdatedSinceTs = Some(snapshotRefreshStartTime))
val emptyCommitTS = System.nanoTime()
log.startTransaction(Some(snapshot)).commit(Nil, DeltaOperations.EmptyCommit)
log.startTransaction(table.catalogTable, Some(snapshot))
.commit(Nil, DeltaOperations.EmptyCommit)
log.checkpoint(log.update(checkIfUpdatedSinceTs = Some(emptyCommitTS)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,8 @@ trait DeltaAlterTableByPathTests extends DeltaAlterTableTestBase {
override protected def createTable(schema: String, tblProperties: Map[String, String]): String = {
val tmpDir = Utils.createTempDir().getCanonicalPath
val (deltaLog, snapshot) = getDeltaLogWithSnapshot(tmpDir)
val txn = deltaLog.startTransaction(Some(snapshot))
// This is a path-based table so we don't need to pass the catalogTable here
val txn = deltaLog.startTransaction(None, Some(snapshot))
val metadata = Metadata(
schemaString = StructType.fromDDL(schema).json,
configuration = tblProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.rowtracking
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, RowId, RowTrackingFeature}
import org.apache.spark.sql.delta.actions.{Action, AddFile}
import org.apache.spark.sql.delta.rowid.RowIdTestUtils
import org.apache.spark.sql.delta.test.DeltaTestImplicits._

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ object DeltaTestImplicits {
// Forwards to `deltaLog.enableExpiredLogCleanup`, passing the metadata of `snapshot`.
// NOTE(review): `snapshot` is defined outside this hunk; confirm which snapshot it refers to.
def enableExpiredLogCleanup(): Boolean = {
deltaLog.enableExpiredLogCleanup(snapshot.metadata)
}

/**
 * Test helper overload: upgrades the protocol to `newVersion`, using
 * `deltaLog.unsafeVolatileSnapshot` as the snapshot argument.
 *
 * @param newVersion the protocol to upgrade to
 */
def upgradeProtocol(newVersion: Protocol): Unit = {
upgradeProtocol(deltaLog.unsafeVolatileSnapshot, newVersion)
}

/**
 * Test helper overload: calls `deltaLog.upgradeProtocol` with `None` as the
 * catalog table, so no catalog table is attached to the upgrade.
 *
 * @param snapshot   snapshot forwarded to `deltaLog.upgradeProtocol`
 * @param newVersion protocol forwarded as the version to upgrade to
 */
def upgradeProtocol(snapshot: Snapshot, newVersion: Protocol): Unit = {
deltaLog.upgradeProtocol(None, snapshot, newVersion)
}
}

implicit class DeltaTableV2ObjectTestHelper(dt: DeltaTableV2.type) {
Expand Down

0 comments on commit 4e7cb5f

Please sign in to comment.