Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Replaces startTransaction(Snapshot) calls with catalogTable overload #2126

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class DeltaLog private(
* the new protocol version is not a superset of the original one used by the snapshot.
*/
def upgradeProtocol(
catalogTable: Option[CatalogTable],
snapshot: Snapshot,
newVersion: Protocol): Unit = {
val currentVersion = snapshot.protocol
Expand All @@ -277,7 +278,7 @@ class DeltaLog private(
throw new ProtocolDowngradeException(currentVersion, newVersion)
}

val txn = startTransaction(Some(snapshot))
val txn = startTransaction(catalogTable, Some(snapshot))
try {
SchemaMergingUtils.checkColumnNameDuplication(txn.metadata.schema, "in the table schema")
} catch {
Expand All @@ -288,11 +289,6 @@ class DeltaLog private(
logConsole(s"Upgraded table at $dataPath to $newVersion.")
}

// Test-only!!
// Convenience overload: upgrades the protocol to `newVersion` by delegating to the
// snapshot-taking overload with `unsafeVolatileSnapshot` as the snapshot.
private[delta] def upgradeProtocol(newVersion: Protocol): Unit = {
upgradeProtocol(unsafeVolatileSnapshot, newVersion)
}

/**
* Get all actions starting from "startVersion" (inclusive). If `startVersion` doesn't exist,
* return an empty Iterator.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,7 @@ case class CreateDeltaTableCommand(
deltaLog: DeltaLog,
tableWithLocation: CatalogTable,
snapshotOpt: Option[Snapshot] = None): OptimisticTransaction = {
val txn = deltaLog.startTransaction(snapshotOpt)
val txn = deltaLog.startTransaction(None, snapshotOpt)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could use a comment explaining why we don't pass `tableWithLocation`. I believe the reason is that `tableWithLocation` is the proposed catalog table entry, which this command will install but which does not yet exist as of the start of the transaction?


// During CREATE/REPLACE, we synchronously run conversion (if Uniform is enabled) so
// we always remove the post commit hook here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class RestoreTableCommand(
require(versionToRestore < latestVersion, s"Version to restore ($versionToRestore)" +
s"should be less then last available version ($latestVersion)")

deltaLog.withNewTransaction { txn =>
deltaLog.withNewTransaction(sourceTable.catalogTable) { txn =>
val latestSnapshot = txn.snapshot
val snapshotToRestore = deltaLog.getSnapshotAt(versionToRestore)
val latestSnapshotFiles = latestSnapshot.allFiles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ case class AlterTableDropFeatureDeltaCommand(
val log = table.deltaLog
val snapshot = log.update(checkIfUpdatedSinceTs = Some(snapshotRefreshStartTime))
val emptyCommitTS = System.nanoTime()
log.startTransaction(Some(snapshot)).commit(Nil, DeltaOperations.EmptyCommit)
log.startTransaction(table.catalogTable, Some(snapshot))
.commit(Nil, DeltaOperations.EmptyCommit)
log.checkpoint(log.update(checkIfUpdatedSinceTs = Some(emptyCommitTS)))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1549,7 +1549,8 @@ trait DeltaAlterTableByPathTests extends DeltaAlterTableTestBase {
override protected def createTable(schema: String, tblProperties: Map[String, String]): String = {
val tmpDir = Utils.createTempDir().getCanonicalPath
val (deltaLog, snapshot) = getDeltaLogWithSnapshot(tmpDir)
val txn = deltaLog.startTransaction(Some(snapshot))
// This is a path-based table so we don't need to pass the catalogTable here
val txn = deltaLog.startTransaction(None, Some(snapshot))
val metadata = Metadata(
schemaString = StructType.fromDDL(schema).json,
configuration = tblProperties)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.rowtracking
import org.apache.spark.sql.delta.{DeltaLog, DeltaOperations, RowId, RowTrackingFeature}
import org.apache.spark.sql.delta.actions.{Action, AddFile}
import org.apache.spark.sql.delta.rowid.RowIdTestUtils
import org.apache.spark.sql.delta.test.DeltaTestImplicits._

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ object DeltaTestImplicits {
/**
 * Forwards to `deltaLog.enableExpiredLogCleanup`, passing the metadata of `snapshot`.
 * NOTE(review): `snapshot` is defined outside this view — confirm which snapshot it refers to.
 */
def enableExpiredLogCleanup(): Boolean = {
deltaLog.enableExpiredLogCleanup(snapshot.metadata)
}

/**
 * Upgrades the protocol to `newVersion`, using `deltaLog.unsafeVolatileSnapshot` as the
 * snapshot. Delegates to the `(snapshot, newVersion)` overload.
 */
def upgradeProtocol(newVersion: Protocol): Unit = {
upgradeProtocol(deltaLog.unsafeVolatileSnapshot, newVersion)
}

/**
 * Upgrades the protocol to `newVersion` against the given `snapshot` by calling
 * `deltaLog.upgradeProtocol` with `None` as the catalog table.
 */
def upgradeProtocol(snapshot: Snapshot, newVersion: Protocol): Unit = {
deltaLog.upgradeProtocol(None, snapshot, newVersion)
}
}

implicit class DeltaTableV2ObjectTestHelper(dt: DeltaTableV2.type) {
Expand Down
Loading