From 6cdacab4649fcbf81582e061878b7fbd76d95e1b Mon Sep 17 00:00:00 2001
From: Prakhar Jain
Date: Tue, 5 Mar 2024 15:35:41 -0800
Subject: [PATCH] Use FileSystemCommitStore to simplify code when managed commits is not enabled

Use FileSystemCommitStore to simplify code when managed commits is not enabled

Closes https://github.com/delta-io/delta/pull/2721

GitOrigin-RevId: 5dbdb0ce0f357ba60e932000d83c31731915c24b
---
 .../sql/delta/OptimisticTransaction.scala | 145 +++++++++++-------
 .../spark/sql/delta/SnapshotManagement.scala | 41 ++++-
 .../AbstractBatchBackfillingCommitStore.scala | 43 +++---
 .../sql/delta/managedcommit/CommitStore.scala | 22 +--
 .../managedcommit/InMemoryCommitStore.scala | 26 ++--
 .../sql/delta/SnapshotManagementSuite.scala | 1 +
 .../managedcommit/CommitStoreSuite.scala | 4 +-
 .../InMemoryCommitStoreSuite.scala | 100 ++++++------
 .../managedcommit/ManagedCommitSuite.scala | 59 ++++++-
 .../ManagedCommitTestUtils.scala | 8 +-
 10 files changed, 287 insertions(+), 162 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index b2aea75cf3b..8c01fbe5e01 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -35,12 +35,14 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.files._ import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory} import org.apache.spark.sql.delta.implicits.addFileEncoder -import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitStore, UpdatedActions} +import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, UpdatedActions} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats._ +import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.util.ScalaExtensions._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.SparkException @@ -1309,27 +1311,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite val fsWriteStartNano = System.nanoTime() val jsonActions = allActions.map(_.json) val hadoopConf = deltaLog.newDeltaHadoopConf() - val commitOpt = preCommitCommitStoreOpt match { - case Some(preCommitCommitStore) => - val commitResponse = preCommitCommitStore.commit( - deltaLog.store, - hadoopConf, - deltaLog.dataPath, - attemptVersion, - jsonActions, - UpdatedActions(commitInfo, newMetadata, newProtocolOpt)) - // TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is - // merged.
- Some(commitResponse.commit) - case None => - deltaLog.store.write( - deltaFile(deltaLog.logPath, attemptVersion), - jsonActions, - overwrite = false, - hadoopConf) - None - } - + val commitStore = preCommitCommitStoreOpt.getOrElse(new FileSystemBasedCommitStore(deltaLog)) + val commitResponse = commitStore.commit( + deltaLog.store, + hadoopConf, + deltaLog.logPath, + attemptVersion, + jsonActions, + UpdatedActions(commitInfo, newMetadata, newProtocolOpt)) + // TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is + // merged. acStatsCollector.finalizeStats(deltaLog.tableId) spark.sessionState.conf.setConf( DeltaSQLConf.DELTA_LAST_COMMIT_VERSION_IN_SESSION, @@ -1339,7 +1330,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite // NOTE: commitLarge cannot run postCommitHooks (such as the CheckpointHook). // Instead, manually run any necessary actions in updateAndCheckpoint. val postCommitSnapshot = updateAndCheckpoint( - spark, deltaLog, commitSize, attemptVersion, commitOpt, txnId) + spark, deltaLog, commitSize, attemptVersion, commitResponse.commit, txnId) val postCommitReconstructionTime = System.nanoTime() var stats = CommitStats( startVersion = readVersion, @@ -1408,9 +1399,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite deltaLog: DeltaLog, commitSize: Int, attemptVersion: Long, - commitOpt: Option[Commit], + commit: Commit, txnId: String): Snapshot = { - val currentSnapshot = deltaLog.update() + + val currentSnapshot = deltaLog.updateAfterCommit( + attemptVersion, + commit, + newChecksumOpt = None, + preCommitLogSegment = preCommitLogSegment) if (currentSnapshot.version != attemptVersion) { throw DeltaErrors.invalidCommittedVersion(attemptVersion, currentSnapshot.version) } @@ -1800,7 +1796,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite val fsWriteStartNano = System.nanoTime() val jsonActions = actions.map(_.json) - val (newChecksumOpt, commitOpt) = + val (newChecksumOpt, commit) = writeCommitFile(attemptVersion, jsonActions.toIterator, currentTransactionInfo) spark.sessionState.conf.setConf( @@ -1811,6 +1807,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite val postCommitSnapshot = deltaLog.updateAfterCommit( attemptVersion, + commit, newChecksumOpt, preCommitLogSegment ) @@ -1887,6 +1884,41 @@ trait OptimisticTransactionImpl extends TransactionalWrite postCommitSnapshot } + class FileSystemBasedCommitStore(deltaLog: DeltaLog) extends CommitStore { + override def commit( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + commitVersion: Long, + actions: Iterator[String], + updatedActions: UpdatedActions): CommitResponse = { + val commitFile = util.FileNames.deltaFile(logPath, commitVersion) + val commitFileStatus = + doCommit(logStore, hadoopConf, logPath, commitFile, commitVersion, actions) + // TODO(managed-commits): Integrate with ICT and pass the correct commitTimestamp + CommitResponse(Commit( + commitVersion, + fileStatus = commitFileStatus, + commitTimestamp = commitFileStatus.getModificationTime + )) + } + + protected def doCommit( + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + commitFile: Path, + commitVersion: Long, + actions: Iterator[String]): FileStatus = { + logStore.write(commitFile, actions, overwrite = false, hadoopConf) + logPath.getFileSystem(hadoopConf).getFileStatus(commitFile) + } + + + override def getCommits( + tablePath: Path, startVersion: Long, endVersion: Option[Long]): Seq[Commit] = Seq.empty + } + /** * Writes the json 
actions provided to the commit file corresponding to attemptVersion. * If managed-commits are enabled, this method must return a non-empty [[Commit]] @@ -1896,35 +1928,39 @@ trait OptimisticTransactionImpl extends TransactionalWrite attemptVersion: Long, jsonActions: Iterator[String], currentTransactionInfo: CurrentTransactionInfo) - : (Option[VersionChecksum], Option[Commit]) = preCommitCommitStoreOpt match { - case Some(preCommitCommitStore) => - // managed-commit - val updatedActions = currentTransactionInfo.getUpdateActions() - val commitResponse = preCommitCommitStore.commit( - deltaLog.store, - deltaLog.newDeltaHadoopConf(), - deltaLog.dataPath, - attemptVersion, - jsonActions, - updatedActions) - if (attemptVersion == 0L) { - val expectedPathForCommitZero = deltaFile(deltaLog.logPath, version = 0L).toUri - val actualCommitPath = commitResponse.commit.fileStatus.getPath.toUri - if (actualCommitPath != expectedPathForCommitZero) { - throw new IllegalStateException("Expected 0th commit to be written to " + - s"$expectedPathForCommitZero but was written to $actualCommitPath") - } + : (Option[VersionChecksum], Commit) = { + val commitStore = preCommitCommitStoreOpt.getOrElse(new FileSystemBasedCommitStore(deltaLog)) + val commitFile = + writeCommitFileImpl(attemptVersion, jsonActions, commitStore, currentTransactionInfo) + (None, commitFile) + } + + protected def writeCommitFileImpl( + attemptVersion: Long, + jsonActions: Iterator[String], + commitStore: CommitStore, + currentTransactionInfo: CurrentTransactionInfo + ): Commit = { + val commitResponse = commitStore.commit( + deltaLog.store, + deltaLog.newDeltaHadoopConf(), + deltaLog.logPath, + attemptVersion, + jsonActions, + currentTransactionInfo.getUpdateActions()) + // TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is + // merged. + val commitTimestamp = commitResponse.commit.fileStatus.getModificationTime + val commitFile = commitResponse.commit.copy(commitTimestamp = commitTimestamp) + if (attemptVersion == 0L) { + val expectedPathForCommitZero = deltaFile(deltaLog.logPath, version = 0L).toUri + val actualCommitPath = commitResponse.commit.fileStatus.getPath.toUri + if (actualCommitPath != expectedPathForCommitZero) { + throw new IllegalStateException("Expected 0th commit to be written to " + + s"$expectedPathForCommitZero but was written to $actualCommitPath") } - (None, Some(commitResponse.commit)) - case None => - // filesystem based commit - deltaLog.store.write( - deltaFile(deltaLog.logPath, attemptVersion), - jsonActions, - overwrite = false, - deltaLog.newDeltaHadoopConf()) - // No VersionChecksum and commitInfo available yet - (None, None) + } + commitFile } /** @@ -2126,5 +2162,4 @@ trait OptimisticTransactionImpl extends TransactionalWrite case _ => } } - } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index ec65943e35f..721a183b0b3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -166,9 +166,10 @@ trait SnapshotManagement { self: DeltaLog => // TODO(managed-commits): Make sure all usage of `listDeltaCompactedDeltaAndCheckpointFiles` // are replaced with this method. 
val resultFromCommitStore = recordFrameProfile("DeltaLog", "CommitStore.getCommits") { - commitStoreOpt - .map(_.getCommits(dataPath, startVersion, endVersion = versionToLoad)) - .getOrElse(Seq.empty) + commitStoreOpt match { + case Some(cs) => cs.getCommits(logPath, startVersion, endVersion = versionToLoad) + case None => Seq.empty + } } var maxDeltaVersionSeen = startVersion - 1 @@ -657,7 +658,33 @@ trait SnapshotManagement { self: DeltaLog => } } - /** Used to compute the LogSegment after a commit */ + /** + * Used to compute the LogSegment after a commit, by adding the delta file with the specified + * version to the preCommitLogSegment (which must match the immediately preceding version). + */ + protected[delta] def getLogSegmentAfterCommit( + committedVersion: Long, + newChecksumOpt: Option[VersionChecksum], + preCommitLogSegment: LogSegment, + commit: Commit, + commitStoreOpt: Option[CommitStore], + oldCheckpointProvider: CheckpointProvider): LogSegment = recordFrameProfile( + "Delta", "SnapshotManagement.getLogSegmentAfterCommit") { + // If the table doesn't have any competing updates, then go ahead and use the optimized + // incremental logSegment computation to fetch the LogSegment for the committedVersion. + // See the comment in the getLogSegmentAfterCommit overload for why we can't always safely + // return the committedVersion's snapshot when there is contention. + val useFastSnapshotConstruction = !snapshotLock.hasQueuedThreads + if (useFastSnapshotConstruction) { + SnapshotManagement.appendCommitToLogSegment( + preCommitLogSegment, commit.fileStatus, committedVersion) + } else { + val latestCheckpointProvider = + Seq(preCommitLogSegment.checkpointProvider, oldCheckpointProvider).maxBy(_.version) + getLogSegmentAfterCommit(commitStoreOpt, latestCheckpointProvider) + } + } + protected[delta] def getLogSegmentAfterCommit( commitStoreOpt: Option[CommitStore], oldCheckpointProvider: UninitializedCheckpointProvider): LogSegment = { @@ -958,12 +985,14 @@ trait SnapshotManagement { self: DeltaLog => * Called after committing a transaction and updating the state of the table. * * @param committedVersion the version that was committed + * @param commit information about the commit file. * @param newChecksumOpt the checksum for the new commit, if available. * Usually None, since the commit would have just finished. 
* @param preCommitLogSegment the log segment of the table prior to commit */ def updateAfterCommit( committedVersion: Long, + commit: Commit, newChecksumOpt: Option[VersionChecksum], preCommitLogSegment: LogSegment): Snapshot = withSnapshotLockInterruptibly { recordDeltaOperation(this, "delta.log.updateAfterCommit") { @@ -972,6 +1001,10 @@ trait SnapshotManagement { self: DeltaLog => // Somebody else could have already updated the snapshot while we waited for the lock if (committedVersion <= previousSnapshot.version) return previousSnapshot val segment = getLogSegmentAfterCommit( + committedVersion, + newChecksumOpt, + preCommitLogSegment, + commit, previousSnapshot.commitStoreOpt, previousSnapshot.checkpointProvider) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala index 9bac1ff62ef..20184d39a92 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/AbstractBatchBackfillingCommitStore.scala @@ -40,13 +40,13 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { val batchSize: Long /** - * Commit a given `commitFile` to the table represented by given `tablePath` at the + * Commit a given `commitFile` to the table represented by given `logPath` at the * given `commitVersion` */ protected def commitImpl( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, commitFile: FileStatus, commitTimestamp: Long): CommitResponse @@ -54,40 +54,40 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { override def commit( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, actions: Iterator[String], updatedActions: UpdatedActions): CommitResponse = { - + val tablePath = getTablePath(logPath) logInfo(s"Attempting to commit version $commitVersion on table $tablePath") - val fs = tablePath.getFileSystem(hadoopConf) + val fs = logPath.getFileSystem(hadoopConf) if (batchSize <= 1) { // Backfill until `commitVersion - 1` logInfo(s"Making sure commits are backfilled until $commitVersion version for" + s" table ${tablePath.toString}") - backfillToVersion(logStore, hadoopConf, tablePath) + backfillToVersion(logStore, hadoopConf, logPath) } // Write new commit file in _commits directory - val fileStatus = writeCommitFile(logStore, hadoopConf, tablePath, commitVersion, actions) + val fileStatus = writeCommitFile(logStore, hadoopConf, logPath, commitVersion, actions) // Do the actual commit val commitTimestamp = updatedActions.commitInfo.getTimestamp var commitResponse = - commitImpl(logStore, hadoopConf, tablePath, commitVersion, fileStatus, commitTimestamp) + commitImpl(logStore, hadoopConf, logPath, commitVersion, fileStatus, commitTimestamp) // Backfill if needed if (commitVersion == 0 || batchSize <= 1) { // Always backfill zeroth commit or when batch size is configured as 1 - backfill(logStore, hadoopConf, tablePath, commitVersion, fileStatus) - val targetFile = FileNames.deltaFile(logPath(tablePath), commitVersion) + backfill(logStore, hadoopConf, logPath, commitVersion, fileStatus) + val targetFile = FileNames.deltaFile(logPath, commitVersion) val targetFileStatus = fs.getFileStatus(targetFile) val newCommit = commitResponse.commit.copy(fileStatus = 
targetFileStatus) commitResponse = commitResponse.copy(commit = newCommit) } else if (commitVersion % batchSize == 0) { logInfo(s"Making sure commits are backfilled till $commitVersion version for" + s"table ${tablePath.toString}") - backfillToVersion(logStore, hadoopConf, tablePath) + backfillToVersion(logStore, hadoopConf, logPath) } logInfo(s"Commit $commitVersion done successfully on table $tablePath") commitResponse @@ -96,11 +96,11 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { protected def writeCommitFile( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, actions: Iterator[String]): FileStatus = { val uuidStr = generateUUID() - val commitPath = FileNames.uuidDeltaFile(logPath(tablePath), commitVersion, Some(uuidStr)) + val commitPath = FileNames.uuidDeltaFile(logPath, commitVersion, Some(uuidStr)) logStore.write(commitPath, actions, overwrite = false, hadoopConf) commitPath.getFileSystem(hadoopConf).getFileStatus(commitPath) } @@ -111,10 +111,9 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { protected def backfillToVersion( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path): Unit = { - getCommits(tablePath, startVersion = 0).foreach { case commit => - val fileStatus = commit.fileStatus - backfill(logStore, hadoopConf, tablePath, commit.version, fileStatus) + logPath: Path): Unit = { + getCommits(logPath, startVersion = 0).foreach { commit => + backfill(logStore, hadoopConf, logPath, commit.version, commit.fileStatus) } } @@ -122,10 +121,10 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { protected def backfill( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, version: Long, fileStatus: FileStatus): Unit = { - val targetFile = FileNames.deltaFile(logPath(tablePath), version) + val targetFile = FileNames.deltaFile(logPath, version) logInfo(s"Backfilling commit ${fileStatus.getPath} to ${targetFile.toString}") val commitContentIterator = logStore.readAsIterator(fileStatus, hadoopConf) try { @@ -134,7 +133,7 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { commitContentIterator, overwrite = false, hadoopConf) - registerBackfill(tablePath, version) + registerBackfill(logPath, version) } catch { case _: FileAlreadyExistsException => logInfo(s"The backfilled file $targetFile already exists.") @@ -145,8 +144,8 @@ trait AbstractBatchBackfillingCommitStore extends CommitStore with Logging { /** Callback to tell the CommitStore that all commits <= `backfilledVersion` are backfilled. 
*/ protected[delta] def registerBackfill( - tablePath: Path, + logPath: Path, backfilledVersion: Long): Unit - protected def logPath(tablePath: Path): Path = new Path(tablePath, DeltaLog.LOG_DIR_NAME) + protected def getTablePath(logPath: Path): Path = logPath.getParent } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala index 51b4abad261..1479cc38cf7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/CommitStore.scala @@ -60,22 +60,22 @@ case class UpdatedActions( */ trait CommitStore { /** - * API to commit the given set of `actions` to the table represented by given `tablePath` at the + * API to commit the given set of `actions` to the table represented by given `logPath` at the * given `commitVersion`. * @return CommitResponse which contains the file status of the commit file. If the commit is * already backfilled, then the fileStatus could be omitted from response and the client * could get the info by themselves. */ def commit( - logStore: LogStore, - hadoopConf: Configuration, - tablePath: Path, - commitVersion: Long, - actions: Iterator[String], - updatedActions: UpdatedActions): CommitResponse + logStore: LogStore, + hadoopConf: Configuration, + logPath: Path, + commitVersion: Long, + actions: Iterator[String], + updatedActions: UpdatedActions): CommitResponse /** - * API to get the un-backfilled commits for the table represented by the given `tablePath`. + * API to get the un-backfilled commits for the table represented by the given `logPath`. * Commits older than `startVersion`, or newer than `endVersion` (if given), are ignored. The * returned commits are contiguous and in ascending version order. * Note that the first version returned by this API may not be equal to the `startVersion`. This @@ -85,9 +85,9 @@ trait CommitStore { * @return a sequence of [[Commit]] which are tracked by [[CommitStore]]. 
*/ def getCommits( - tablePath: Path, - startVersion: Long, - endVersion: Option[Long] = None): Seq[Commit] + logPath: Path, + startVersion: Long, + endVersion: Option[Long] = None): Seq[Commit] } /** A builder interface for CommitStore */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala index 59bec7f6e6f..613adaf24c9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStore.scala @@ -38,9 +38,9 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC private[managedcommit] val perTableMap = new ConcurrentHashMap[Path, PerTableData]() - private[managedcommit] def withWriteLock[T](tablePath: Path)(operation: => T): T = { + private[managedcommit] def withWriteLock[T](logPath: Path)(operation: => T): T = { val lock = perTableMap - .computeIfAbsent(tablePath, _ => new PerTableData()) // computeIfAbsent is atomic + .computeIfAbsent(logPath, _ => new PerTableData()) // computeIfAbsent is atomic .lock .writeLock() lock.lock() @@ -51,9 +51,9 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC } } - private[managedcommit] def withReadLock[T](tablePath: Path)(operation: => T): T = { + private[managedcommit] def withReadLock[T](logPath: Path)(operation: => T): T = { val lock = perTableMap - .computeIfAbsent(tablePath, _ => new PerTableData()) // computeIfAbsent is atomic + .computeIfAbsent(logPath, _ => new PerTableData()) // computeIfAbsent is atomic .lock .readLock() lock.lock() @@ -74,12 +74,12 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC protected def commitImpl( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, commitFile: FileStatus, commitTimestamp: Long): CommitResponse = { - withWriteLock[CommitResponse](tablePath) { - val tableData = perTableMap.get(tablePath) + withWriteLock[CommitResponse](logPath) { + val tableData = perTableMap.get(logPath) val expectedVersion = tableData.maxCommitVersion + 1 if (commitVersion != expectedVersion) { throw new CommitFailedException( @@ -98,11 +98,11 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC } override def getCommits( - tablePath: Path, + logPath: Path, startVersion: Long, endVersion: Option[Long]): Seq[Commit] = { - withReadLock[Seq[Commit]](tablePath) { - val tableData = perTableMap.get(tablePath) + withReadLock[Seq[Commit]](logPath) { + val tableData = perTableMap.get(logPath) // Calculate the end version for the range, or use the last key if endVersion is not provided val effectiveEndVersion = endVersion.getOrElse(tableData.commitsMap.lastOption.map(_._1).getOrElse(startVersion)) @@ -112,10 +112,10 @@ class InMemoryCommitStore(val batchSize: Long) extends AbstractBatchBackfillingC } override protected[delta] def registerBackfill( - tablePath: Path, + logPath: Path, backfilledVersion: Long): Unit = { - withWriteLock(tablePath) { - val tableData = perTableMap.get(tablePath) + withWriteLock(logPath) { + val tableData = perTableMap.get(logPath) if (backfilledVersion > tableData.maxCommitVersion) { throw new IllegalArgumentException( s"Unexpected backfill version: $backfilledVersion. 
" + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala index e1acb1ddc52..17ee49433e3 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta import java.io.{File, FileNotFoundException, RandomAccessFile} import java.util.concurrent.ExecutionException +import org.apache.spark.sql.delta.managedcommit.Commit import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala index 6151b90c5e7..9f14cf2ae44 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/CommitStoreSuite.scala @@ -34,7 +34,7 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark override def commit( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, actions: Iterator[String], updatedActions: UpdatedActions): CommitResponse = { @@ -42,7 +42,7 @@ class CommitStoreSuite extends QueryTest with DeltaSQLTestUtils with SharedSpark } override def getCommits( - tablePath: Path, + logPath: Path, startVersion: Long, endVersion: Option[Long] = None): Seq[Commit] = Seq.empty } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala index 2a528990743..75eea849151 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/InMemoryCommitStoreSuite.scala @@ -57,12 +57,12 @@ class InMemoryCommitStoreSuite extends QueryTest version: Long, timestamp: Long, cs: CommitStore, - tablePath: Path): Commit = { + logPath: Path): Commit = { val commitInfo = CommitInfo.empty(version = Some(version)).withTimestamp(timestamp) cs.commit( store, sessionHadoopConf, - tablePath, + logPath, version, Iterator(s"$version", s"$timestamp"), UpdatedActions(commitInfo, None, None)).commit @@ -70,9 +70,8 @@ class InMemoryCommitStoreSuite extends QueryTest private def assertBackfilled( version: Long, - tablePath: Path, + logPath: Path, timestampOpt: Option[Long] = None): Unit = { - val logPath = new Path(tablePath, DeltaLog.LOG_DIR_NAME) val delta = FileNames.deltaFile(logPath, version) if (timestampOpt.isDefined) { assert(store.read(delta, sessionHadoopConf) == Seq(s"$version", s"${timestampOpt.get}")) @@ -97,11 +96,11 @@ class InMemoryCommitStoreSuite extends QueryTest } private def assertInvariants( - tablePath: Path, + logPath: Path, cs: InMemoryCommitStore, commitTimestampsOpt: Option[Array[Long]] = None): Unit = { - val maxUntrackedVersion: Int = cs.withReadLock[Int](tablePath) { - val tableData = cs.perTableMap.get(tablePath) + val maxUntrackedVersion: Int = cs.withReadLock[Int](logPath) { + val tableData = cs.perTableMap.get(logPath) if (tableData.commitsMap.isEmpty) { tableData.maxCommitVersion.toInt } else { @@ -117,7 +116,7 @@ class 
InMemoryCommitStoreSuite extends QueryTest } } (0 to maxUntrackedVersion).foreach { version => - assertBackfilled(version, tablePath, commitTimestampsOpt.map(_(version)))} + assertBackfilled(version, logPath, commitTimestampsOpt.map(_(version)))} } test("in-memory-commit-store-builder works as expected") { @@ -146,97 +145,101 @@ class InMemoryCommitStoreSuite extends QueryTest test("test basic commit and backfill functionality") { withTempTableDir { tempDir => val tablePath = new Path(tempDir.getCanonicalPath) + val logPath = new Path(tablePath, DeltaLog.LOG_DIR_NAME) val cs = InMemoryCommitStoreBuilder(batchSize = 3).build(Map.empty) // Commit 0 is always immediately backfilled - val c0 = commit(0, 0, cs, tablePath) - assert(cs.getCommits(tablePath, 0) == Seq.empty) - assertBackfilled(0, tablePath, Some(0)) + val c0 = commit(0, 0, cs, logPath) + assert(cs.getCommits(logPath, 0) == Seq.empty) + assertBackfilled(0, logPath, Some(0)) - val c1 = commit(1, 1, cs, tablePath) - val c2 = commit(2, 2, cs, tablePath) - assert(cs.getCommits(tablePath, 0).takeRight(2) == Seq(c1, c2)) + val c1 = commit(1, 1, cs, logPath) + val c2 = commit(2, 2, cs, logPath) + assert(cs.getCommits(logPath, 0).takeRight(2) == Seq(c1, c2)) // All 3 commits are backfilled since batchSize == 3 - val c3 = commit(3, 3, cs, tablePath) - assert(cs.getCommits(tablePath, 0) == Seq.empty) - (1 to 3).foreach(i => assertBackfilled(i, tablePath, Some(i))) + val c3 = commit(3, 3, cs, logPath) + assert(cs.getCommits(logPath, 0) == Seq.empty) + (1 to 3).foreach(i => assertBackfilled(i, logPath, Some(i))) // Test that startVersion and endVersion are respected in getCommits - val c4 = commit(4, 4, cs, tablePath) - val c5 = commit(5, 5, cs, tablePath) - assert(cs.getCommits(tablePath, 4) == Seq(c4, c5)) - assert(cs.getCommits(tablePath, 4, Some(4)) == Seq(c4)) - assert(cs.getCommits(tablePath, 5) == Seq(c5)) + val c4 = commit(4, 4, cs, logPath) + val c5 = commit(5, 5, cs, logPath) + assert(cs.getCommits(logPath, 4) == Seq(c4, c5)) + assert(cs.getCommits(logPath, 4, Some(4)) == Seq(c4)) + assert(cs.getCommits(logPath, 5) == Seq(c5)) // Commit [4, 6] are backfilled since batchSize == 3 - val c6 = commit(6, 6, cs, tablePath) - assert(cs.getCommits(tablePath, 0) == Seq.empty) - (4 to 6).foreach(i => assertBackfilled(i, tablePath, Some(i))) - assertInvariants(tablePath, cs.asInstanceOf[InMemoryCommitStore]) + val c6 = commit(6, 6, cs, logPath) + assert(cs.getCommits(logPath, 0) == Seq.empty) + (4 to 6).foreach(i => assertBackfilled(i, logPath, Some(i))) + assertInvariants(logPath, cs.asInstanceOf[InMemoryCommitStore]) } } test("test basic commit and backfill functionality with 1 batch size") { withTempTableDir { tempDir => val tablePath = new Path(tempDir.getCanonicalPath) + val logPath = new Path(tablePath, DeltaLog.LOG_DIR_NAME) val cs = InMemoryCommitStoreBuilder(batchSize = 1).build(Map.empty) // Test that all commits are immediately backfilled (0 to 3).foreach { version => - commit(version, version, cs, tablePath) - assert(cs.getCommits(tablePath, 0) == Seq.empty) - assertBackfilled(version, tablePath, Some(version)) + commit(version, version, cs, logPath) + assert(cs.getCommits(logPath, 0) == Seq.empty) + assertBackfilled(version, logPath, Some(version)) } // Test that out-of-order backfill is rejected intercept[IllegalArgumentException] { cs.asInstanceOf[InMemoryCommitStore] - .registerBackfill(tablePath, 5) + .registerBackfill(logPath, 5) } - assertInvariants(tablePath, cs.asInstanceOf[InMemoryCommitStore]) + assertInvariants(logPath, 
cs.asInstanceOf[InMemoryCommitStore]) } } test("test out-of-order commits are rejected") { withTempTableDir { tempDir => val tablePath = new Path(tempDir.getCanonicalPath) + val logPath = new Path(tablePath, DeltaLog.LOG_DIR_NAME) val cs = InMemoryCommitStoreBuilder(batchSize = 5).build(Map.empty) // Anything other than version-0 should be rejected as the first commit - assertCommitFail(1, 0, retryable = false, commit(1, 0, cs, tablePath)) + assertCommitFail(1, 0, retryable = false, commit(1, 0, cs, logPath)) // Verify that conflict-checker rejects out-of-order commits. - (0 to 4).foreach(i => commit(i, i, cs, tablePath)) - assertCommitFail(0, 5, retryable = true, commit(0, 5, cs, tablePath)) - assertCommitFail(4, 5, retryable = true, commit(4, 6, cs, tablePath)) + (0 to 4).foreach(i => commit(i, i, cs, logPath)) + assertCommitFail(0, 5, retryable = true, commit(0, 5, cs, logPath)) + assertCommitFail(4, 5, retryable = true, commit(4, 6, cs, logPath)) // Verify that the conflict-checker still works even when everything has been backfilled - commit(5, 5, cs, tablePath) - assert(cs.getCommits(tablePath, 0) == Seq.empty) - assertCommitFail(5, 6, retryable = true, commit(5, 5, cs, tablePath)) - assertCommitFail(7, 6, retryable = false, commit(7, 7, cs, tablePath)) + commit(5, 5, cs, logPath) + assert(cs.getCommits(logPath, 0) == Seq.empty) + assertCommitFail(5, 6, retryable = true, commit(5, 5, cs, logPath)) + assertCommitFail(7, 6, retryable = false, commit(7, 7, cs, logPath)) - assertInvariants(tablePath, cs.asInstanceOf[InMemoryCommitStore]) + assertInvariants(logPath, cs.asInstanceOf[InMemoryCommitStore]) } } test("test out-of-order backfills are rejected") { withTempTableDir { tempDir => val tablePath = new Path(tempDir.getCanonicalPath) + val logPath = new Path(tablePath, DeltaLog.LOG_DIR_NAME) val cs = InMemoryCommitStoreBuilder(batchSize = 5).build(Map.empty) intercept[IllegalArgumentException] { - cs.asInstanceOf[InMemoryCommitStore].registerBackfill(tablePath, 0) + cs.asInstanceOf[InMemoryCommitStore].registerBackfill(logPath, 0) } - (0 to 3).foreach(i => commit(i, i, cs, tablePath)) + (0 to 3).foreach(i => commit(i, i, cs, logPath)) // Test that backfilling is idempotent for already-backfilled commits. - cs.asInstanceOf[InMemoryCommitStore].registerBackfill(tablePath, 2) - cs.asInstanceOf[InMemoryCommitStore].registerBackfill(tablePath, 2) + cs.asInstanceOf[InMemoryCommitStore].registerBackfill(logPath, 2) + cs.asInstanceOf[InMemoryCommitStore].registerBackfill(logPath, 2) // Test that backfilling uncommited commits fail. 
intercept[IllegalArgumentException] { - cs.asInstanceOf[InMemoryCommitStore].registerBackfill(tablePath, 4) + cs.asInstanceOf[InMemoryCommitStore].registerBackfill(logPath, 4) } } } @@ -244,6 +247,7 @@ class InMemoryCommitStoreSuite extends QueryTest test("should handle concurrent readers and writers") { withTempTableDir { tempDir => val tablePath = new Path(tempDir.getCanonicalPath) + val logPath = new Path(tablePath, DeltaLog.LOG_DIR_NAME) val batchSize = 6 val cs = InMemoryCommitStoreBuilder(batchSize).build(Map.empty) @@ -267,12 +271,12 @@ class InMemoryCommitStoreSuite extends QueryTest while (currentWriterCommits < numberOfCommitsPerWriter) { val nextVersion = cs - .getCommits(tablePath, 0) + .getCommits(logPath, 0) .lastOption.map(_.version + 1) .getOrElse(actualCommits.get().toLong) try { val currentTimestamp = runningTimestamp.getAndIncrement() - val commitResponse = commit(nextVersion, currentTimestamp, cs, tablePath) + val commitResponse = commit(nextVersion, currentTimestamp, cs, logPath) currentWriterCommits += 1 actualCommits.getAndIncrement() assert(commitResponse.commitTimestamp == currentTimestamp) @@ -285,7 +289,7 @@ class InMemoryCommitStoreSuite extends QueryTest commitFailedExceptions.getAndIncrement() } finally { assertInvariants( - tablePath, + logPath, cs.asInstanceOf[InMemoryCommitStore], Some(commitTimestamp)) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala index 8b1ae5c6808..b49cf371fac 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitSuite.scala @@ -63,16 +63,16 @@ class ManagedCommitSuite override def commit( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, actions: Iterator[String], updatedActions: UpdatedActions): CommitResponse = { val uuidFile = - FileNames.uuidDeltaFile(new Path(tablePath, "_delta_log"), commitVersion) + FileNames.uuidDeltaFile(logPath, commitVersion) logStore.write(uuidFile, actions, overwrite = false, hadoopConf) val uuidFileStatus = uuidFile.getFileSystem(hadoopConf).getFileStatus(uuidFile) val commitTime = uuidFileStatus.getModificationTime - commitImpl(logStore, hadoopConf, tablePath, commitVersion, uuidFileStatus, commitTime) + commitImpl(logStore, hadoopConf, logPath, commitVersion, uuidFileStatus, commitTime) CommitResponse(Commit(commitVersion, uuidFileStatus, commitTime)) } @@ -115,4 +115,57 @@ class ManagedCommitSuite checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(2), Row(3))) } } + + testWithDifferentBackfillInterval("post commit snapshot creation") { backfillInterval => + withTempDir { tempDir => + val tablePath = tempDir.getAbsolutePath + + def getDeltasInPostCommitSnapshot(log: DeltaLog): Seq[String] = { + log + .unsafeVolatileSnapshot + .logSegment.deltas + .map(_.getPath.getName.replace("0000000000000000000", "")) + } + + // Commit 0 + Seq(1).toDF.write.format("delta").mode("overwrite").save(tablePath) + val log = DeltaLog.forTable(spark, tablePath) + assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json")) + log.update() + assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json")) + + // Commit 1 + Seq(2).toDF.write.format("delta").mode("append").save(tablePath) // version 1 + val commit1 = if (backfillInterval < 2) "1.json" else "1.uuid-1.json" + 
assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json", commit1)) + log.update() + assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json", commit1)) + + // Commit 2 + Seq(3).toDF.write.format("delta").mode("append").save(tablePath) // version 2 + val commit2 = if (backfillInterval < 2) "2.json" else "2.uuid-2.json" + assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json", commit1, commit2)) + log.update() + if (backfillInterval <= 2) { + // backfill would have happened at commit 2. Next deltaLog.update will pickup the backfilled + // files. + assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json", "1.json", "2.json")) + } else { + assert(getDeltasInPostCommitSnapshot(log) === + Seq("0.json", "1.uuid-1.json", "2.uuid-2.json")) + } + + // Commit 3 + Seq(4).toDF.write.format("delta").mode("append").save(tablePath) + val commit3 = if (backfillInterval < 2) "3.json" else "3.uuid-3.json" + if (backfillInterval <= 2) { + assert(getDeltasInPostCommitSnapshot(log) === Seq("0.json", "1.json", "2.json", commit3)) + } else { + assert(getDeltasInPostCommitSnapshot(log) === + Seq("0.json", "1.uuid-1.json", "2.uuid-2.json", commit3)) + } + + checkAnswer(sql(s"SELECT * FROM delta.`$tablePath`"), Seq(Row(1), Row(2), Row(3), Row(4))) + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala index 91970c008d8..92e4fafea4b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/managedcommit/ManagedCommitTestUtils.scala @@ -79,18 +79,18 @@ case class TrackingInMemoryCommitStore( override def commit( logStore: LogStore, hadoopConf: Configuration, - tablePath: Path, + logPath: Path, commitVersion: Long, actions: Iterator[String], updatedActions: UpdatedActions): CommitResponse = recordOperation("commit") { - super.commit(logStore, hadoopConf, tablePath, commitVersion, actions, updatedActions) + super.commit(logStore, hadoopConf, logPath, commitVersion, actions, updatedActions) } override def getCommits( - tablePath: Path, + logPath: Path, startVersion: Long, endVersion: Option[Long] = None): Seq[Commit] = recordOperation("getCommits") { - super.getCommits(tablePath, startVersion, endVersion) + super.getCommits(logPath, startVersion, endVersion) } var nextUuidSuffix = 0L
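
Illustrative usage, not part of the patch: a minimal sketch mirroring the InMemoryCommitStoreSuite helpers to show the CommitStore surface this change standardizes on, where commit and getCommits are keyed by the _delta_log path rather than the table root. The LogStore, Hadoop Configuration and table root are assumed to be supplied by the caller; the batch size and the action string are arbitrary placeholders.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.CommitInfo
import org.apache.spark.sql.delta.managedcommit.{CommitStore, InMemoryCommitStoreBuilder, UpdatedActions}
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.FileNames

// `store`, `hadoopConf` and `tableRoot` are assumed to come from the surrounding harness.
def commitVersionZero(store: LogStore, hadoopConf: Configuration, tableRoot: Path): Unit = {
  // CommitStore APIs now take the _delta_log path, not the table root.
  val logPath = new Path(tableRoot, "_delta_log")
  val cs: CommitStore = InMemoryCommitStoreBuilder(batchSize = 3).build(Map.empty)

  val commitInfo = CommitInfo.empty(version = Some(0L)).withTimestamp(0L)
  val response = cs.commit(
    store, hadoopConf, logPath, commitVersion = 0L,
    actions = Iterator("""{"commitInfo":{}}"""),
    updatedActions = UpdatedActions(commitInfo, None, None))

  // Version 0 is always backfilled immediately, so nothing is left un-backfilled and the
  // returned Commit points at the backfilled 00000000000000000000.json file.
  assert(cs.getCommits(logPath, startVersion = 0).isEmpty)
  assert(response.commit.fileStatus.getPath.getName ==
    FileNames.deltaFile(logPath, version = 0L).getName)
}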