From 6862e271ef09ce583f06b31f31feb404d8a3c7a0 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Tue, 21 May 2024 10:51:03 -0700 Subject: [PATCH] [Spark] Integrate ICT into Managed Commits (#3108) #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description Resolves TODOs around ICT <-> MC integration. ## How was this patch tested? Updated existing tests. ## Does this PR introduce _any_ user-facing changes? No --- .../spark/sql/delta/OptimisticTransaction.scala | 17 ++++++++++------- ...tractBatchBackfillingCommitOwnerClient.scala | 3 ++- .../managedcommit/ManagedCommitSuite.scala | 11 +++++++++-- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 1a2256248e6..e8b8138fbd3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -2102,11 +2102,18 @@ trait OptimisticTransactionImpl extends TransactionalWrite val commitFileStatus = doCommit(logStore, hadoopConf, logPath, commitFile, commitVersion, actions) executionObserver.beginBackfill() - // TODO(managed-commits): Integrate with ICT and pass the correct commitTimestamp + val ictEnabled = updatedActions.getNewMetadata.getConfiguration.getOrElse( + DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key, "false") == "true" + val commitTimestamp = if (ictEnabled) { + // CommitInfo.getCommitTimestamp will return the inCommitTimestamp. + updatedActions.getCommitInfo.getCommitTimestamp + } else { + commitFileStatus.getModificationTime + } CommitResponse(Commit( commitVersion, fileStatus = commitFileStatus, - commitTimestamp = commitFileStatus.getModificationTime + commitTimestamp = commitTimestamp )) } @@ -2182,10 +2189,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite val commitResponse = TransactionExecutionObserver.withObserver(executionObserver) { tableCommitOwnerClient.commit(attemptVersion, jsonActions, updatedActions) } - // TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is - // merged. - val commitTimestamp = commitResponse.getCommit.getFileStatus.getModificationTime - val commitFile = commitResponse.getCommit.copy(commitTimestamp = commitTimestamp) if (attemptVersion == 0L) { val expectedPathForCommitZero = unsafeDeltaFile(deltaLog.logPath, version = 0L).toUri val actualCommitPath = commitResponse.getCommit.getFileStatus.getPath.toUri @@ -2194,7 +2197,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite s"$expectedPathForCommitZero but was written to $actualCommitPath") } } - commitFile + commitResponse.getCommit } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala index 9187f6b539f..8462c763adb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitOwnerClient.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.managedcommit import java.nio.file.FileAlreadyExistsException import java.util.UUID +import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.TransactionExecutionObserver import org.apache.spark.sql.delta.actions.CommitInfo import org.apache.spark.sql.delta.actions.Metadata @@ -87,7 +88,7 @@ trait AbstractBatchBackfillingCommitOwnerClient extends CommitOwnerClient with L logStore, hadoopConf, logPath, commitVersion, actions, generateUUID()) // Do the actual commit - val commitTimestamp = updatedActions.getCommitInfo.asInstanceOf[CommitInfo].getTimestamp + val commitTimestamp = updatedActions.getCommitInfo.getCommitTimestamp var commitResponse = commitImpl( logStore, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala index 64627b2071b..71f0fafac83 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer import com.databricks.spark.util.Log4jUsageLogger import org.apache.spark.sql.delta.{DeltaOperations, ManagedCommitTableFeature, V2CheckpointTableFeature} import org.apache.spark.sql.delta.CommitOwnerGetCommitsFailedException -import org.apache.spark.sql.delta.DeltaConfigs.{CHECKPOINT_INTERVAL, MANAGED_COMMIT_OWNER_CONF, MANAGED_COMMIT_OWNER_NAME, ROW_TRACKING_ENABLED} +import org.apache.spark.sql.delta.DeltaConfigs.{CHECKPOINT_INTERVAL, MANAGED_COMMIT_OWNER_CONF, MANAGED_COMMIT_OWNER_NAME, MANAGED_COMMIT_TABLE_CONF} import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.delta.DeltaTestUtils.createTestAddFile import org.apache.spark.sql.delta.InitialSnapshot @@ -774,7 +774,14 @@ class ManagedCommitSuite // Downgrade the table // [upgradeExistingTable = false] Commit-3 // [upgradeExistingTable = true] Commit-5 - val newMetadata2 = Metadata().copy(configuration = Map("downgraded_at" -> "v2")) + val commitOwnerConfKeys = Seq( + MANAGED_COMMIT_OWNER_NAME.key, + MANAGED_COMMIT_OWNER_CONF.key, + MANAGED_COMMIT_TABLE_CONF.key + ) + val newConfig = log.snapshot.metadata.configuration + .filterKeys(!commitOwnerConfKeys.contains(_)) ++ Map("downgraded_at" -> "v2") + val newMetadata2 = log.snapshot.metadata.copy(configuration = newConfig.toMap) log.startTransaction().commitManually(newMetadata2) assert(log.unsafeVolatileSnapshot.version === upgradeStartVersion + 3) assert(log.unsafeVolatileSnapshot.tableCommitOwnerClientOpt.isEmpty)