Skip to content

Commit

Permalink
Use FileSystemCommitStore to simplify code when managed commits is not …
Browse files Browse the repository at this point in the history
…enabled

Use FileSystemCommitStore to simplify code when managed commits is not enabled

Closes #2721

GitOrigin-RevId: 5dbdb0ce0f357ba60e932000d83c31731915c24b
  • Loading branch information
prakharjain09 authored and allisonport-db committed Mar 7, 2024
1 parent 06d87d9 commit 6cdacab
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 162 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.files._
import org.apache.spark.sql.delta.hooks.{CheckpointHook, GenerateSymlinkManifest, IcebergConverterHook, PostCommitHook, UpdateCatalogFactory}
import org.apache.spark.sql.delta.implicits.addFileEncoder
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitStore, UpdatedActions}
import org.apache.spark.sql.delta.managedcommit.{Commit, CommitFailedException, CommitResponse, CommitStore, UpdatedActions}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats._
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkException
Expand Down Expand Up @@ -1309,27 +1311,16 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val fsWriteStartNano = System.nanoTime()
val jsonActions = allActions.map(_.json)
val hadoopConf = deltaLog.newDeltaHadoopConf()
val commitOpt = preCommitCommitStoreOpt match {
case Some(preCommitCommitStore) =>
val commitResponse = preCommitCommitStore.commit(
deltaLog.store,
hadoopConf,
deltaLog.dataPath,
attemptVersion,
jsonActions,
UpdatedActions(commitInfo, newMetadata, newProtocolOpt))
// TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is
// merged.
Some(commitResponse.commit)
case None =>
deltaLog.store.write(
deltaFile(deltaLog.logPath, attemptVersion),
jsonActions,
overwrite = false,
hadoopConf)
None
}

val commitStore = preCommitCommitStoreOpt.getOrElse(new FileSystemBasedCommitStore(deltaLog))
val commitResponse = commitStore.commit(
deltaLog.store,
hadoopConf,
deltaLog.logPath,
attemptVersion,
jsonActions,
UpdatedActions(commitInfo, newMetadata, newProtocolOpt))
// TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is
// merged.
acStatsCollector.finalizeStats(deltaLog.tableId)
spark.sessionState.conf.setConf(
DeltaSQLConf.DELTA_LAST_COMMIT_VERSION_IN_SESSION,
Expand All @@ -1339,7 +1330,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// NOTE: commitLarge cannot run postCommitHooks (such as the CheckpointHook).
// Instead, manually run any necessary actions in updateAndCheckpoint.
val postCommitSnapshot = updateAndCheckpoint(
spark, deltaLog, commitSize, attemptVersion, commitOpt, txnId)
spark, deltaLog, commitSize, attemptVersion, commitResponse.commit, txnId)
val postCommitReconstructionTime = System.nanoTime()
var stats = CommitStats(
startVersion = readVersion,
Expand Down Expand Up @@ -1408,9 +1399,14 @@ trait OptimisticTransactionImpl extends TransactionalWrite
deltaLog: DeltaLog,
commitSize: Int,
attemptVersion: Long,
commitOpt: Option[Commit],
commit: Commit,
txnId: String): Snapshot = {
val currentSnapshot = deltaLog.update()

val currentSnapshot = deltaLog.updateAfterCommit(
attemptVersion,
commit,
newChecksumOpt = None,
preCommitLogSegment = preCommitLogSegment)
if (currentSnapshot.version != attemptVersion) {
throw DeltaErrors.invalidCommittedVersion(attemptVersion, currentSnapshot.version)
}
Expand Down Expand Up @@ -1800,7 +1796,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val fsWriteStartNano = System.nanoTime()
val jsonActions = actions.map(_.json)

val (newChecksumOpt, commitOpt) =
val (newChecksumOpt, commit) =
writeCommitFile(attemptVersion, jsonActions.toIterator, currentTransactionInfo)

spark.sessionState.conf.setConf(
Expand All @@ -1811,6 +1807,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite

val postCommitSnapshot = deltaLog.updateAfterCommit(
attemptVersion,
commit,
newChecksumOpt,
preCommitLogSegment
)
Expand Down Expand Up @@ -1887,6 +1884,41 @@ trait OptimisticTransactionImpl extends TransactionalWrite
postCommitSnapshot
}

/**
 * A [[CommitStore]] whose `commit` writes the delta file for the requested version through the
 * supplied [[LogStore]] and reports the resulting file back as a [[Commit]].
 *
 * It is used as the fallback store when no pre-commit [[CommitStore]] is configured, so that
 * callers can always go through `CommitStore.commit`.
 */
class FileSystemBasedCommitStore(deltaLog: DeltaLog) extends CommitStore {

  /**
   * Writes `actions` to the delta file for `commitVersion` under `logPath`.
   *
   * `updatedActions` is accepted to satisfy the [[CommitStore]] contract but is not consulted by
   * this implementation.
   *
   * @return a [[CommitResponse]] wrapping a [[Commit]] whose file status is that of the written
   *         file and whose timestamp is that file's modification time.
   */
  override def commit(
      logStore: LogStore,
      hadoopConf: Configuration,
      logPath: Path,
      commitVersion: Long,
      actions: Iterator[String],
      updatedActions: UpdatedActions): CommitResponse = {
    val targetFile = util.FileNames.deltaFile(logPath, commitVersion)
    val writtenStatus =
      doCommit(logStore, hadoopConf, logPath, targetFile, commitVersion, actions)
    // TODO(managed-commits): Integrate with ICT and pass the correct commitTimestamp
    val committed = Commit(
      commitVersion,
      fileStatus = writtenStatus,
      commitTimestamp = writtenStatus.getModificationTime)
    CommitResponse(committed)
  }

  /**
   * Performs the actual write of `actions` to `commitFile` (with overwrite disabled) and then
   * looks up the status of the file that was just written, using the file system resolved from
   * `logPath`. Kept as a separate protected hook so subclasses can override it.
   */
  protected def doCommit(
      logStore: LogStore,
      hadoopConf: Configuration,
      logPath: Path,
      commitFile: Path,
      commitVersion: Long,
      actions: Iterator[String]): FileStatus = {
    logStore.write(commitFile, actions, overwrite = false, hadoopConf)
    val fs = logPath.getFileSystem(hadoopConf)
    fs.getFileStatus(commitFile)
  }

  /** This store tracks no commits of its own, so the result is always empty. */
  override def getCommits(
      tablePath: Path,
      startVersion: Long,
      endVersion: Option[Long]): Seq[Commit] = {
    Seq.empty
  }
}

/**
* Writes the json actions provided to the commit file corresponding to attemptVersion.
* If managed-commits are enabled, this method must return a non-empty [[Commit]]
Expand All @@ -1896,35 +1928,39 @@ trait OptimisticTransactionImpl extends TransactionalWrite
attemptVersion: Long,
jsonActions: Iterator[String],
currentTransactionInfo: CurrentTransactionInfo)
: (Option[VersionChecksum], Option[Commit]) = preCommitCommitStoreOpt match {
case Some(preCommitCommitStore) =>
// managed-commit
val updatedActions = currentTransactionInfo.getUpdateActions()
val commitResponse = preCommitCommitStore.commit(
deltaLog.store,
deltaLog.newDeltaHadoopConf(),
deltaLog.dataPath,
attemptVersion,
jsonActions,
updatedActions)
if (attemptVersion == 0L) {
val expectedPathForCommitZero = deltaFile(deltaLog.logPath, version = 0L).toUri
val actualCommitPath = commitResponse.commit.fileStatus.getPath.toUri
if (actualCommitPath != expectedPathForCommitZero) {
throw new IllegalStateException("Expected 0th commit to be written to " +
s"$expectedPathForCommitZero but was written to $actualCommitPath")
}
: (Option[VersionChecksum], Commit) = {
val commitStore = preCommitCommitStoreOpt.getOrElse(new FileSystemBasedCommitStore(deltaLog))
val commitFile =
writeCommitFileImpl(attemptVersion, jsonActions, commitStore, currentTransactionInfo)
(None, commitFile)
}

protected def writeCommitFileImpl(
attemptVersion: Long,
jsonActions: Iterator[String],
commitStore: CommitStore,
currentTransactionInfo: CurrentTransactionInfo
): Commit = {
val commitResponse = commitStore.commit(
deltaLog.store,
deltaLog.newDeltaHadoopConf(),
deltaLog.logPath,
attemptVersion,
jsonActions,
currentTransactionInfo.getUpdateActions())
// TODO(managed-commits): Use the right timestamp method on top of CommitInfo once ICT is
// merged.
val commitTimestamp = commitResponse.commit.fileStatus.getModificationTime
val commitFile = commitResponse.commit.copy(commitTimestamp = commitTimestamp)
if (attemptVersion == 0L) {
val expectedPathForCommitZero = deltaFile(deltaLog.logPath, version = 0L).toUri
val actualCommitPath = commitResponse.commit.fileStatus.getPath.toUri
if (actualCommitPath != expectedPathForCommitZero) {
throw new IllegalStateException("Expected 0th commit to be written to " +
s"$expectedPathForCommitZero but was written to $actualCommitPath")
}
(None, Some(commitResponse.commit))
case None =>
// filesystem based commit
deltaLog.store.write(
deltaFile(deltaLog.logPath, attemptVersion),
jsonActions,
overwrite = false,
deltaLog.newDeltaHadoopConf())
// No VersionChecksum and commitInfo available yet
(None, None)
}
commitFile
}

/**
Expand Down Expand Up @@ -2126,5 +2162,4 @@ trait OptimisticTransactionImpl extends TransactionalWrite
case _ =>
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,10 @@ trait SnapshotManagement { self: DeltaLog =>
// TODO(managed-commits): Make sure all usage of `listDeltaCompactedDeltaAndCheckpointFiles`
// are replaced with this method.
val resultFromCommitStore = recordFrameProfile("DeltaLog", "CommitStore.getCommits") {
commitStoreOpt
.map(_.getCommits(dataPath, startVersion, endVersion = versionToLoad))
.getOrElse(Seq.empty)
commitStoreOpt match {
case Some(cs) => cs.getCommits(logPath, startVersion, endVersion = versionToLoad)
case None => Seq.empty
}
}

var maxDeltaVersionSeen = startVersion - 1
Expand Down Expand Up @@ -657,7 +658,33 @@ trait SnapshotManagement { self: DeltaLog =>
}
}

/** Used to compute the LogSegment after a commit */
/**
* Used to compute the LogSegment after a commit, by adding the delta file with the specified
* version to the preCommitLogSegment (which must match the immediately preceding version).
*/
protected[delta] def getLogSegmentAfterCommit(
    committedVersion: Long,
    newChecksumOpt: Option[VersionChecksum],
    preCommitLogSegment: LogSegment,
    commit: Commit,
    commitStoreOpt: Option[CommitStore],
    oldCheckpointProvider: CheckpointProvider): LogSegment = recordFrameProfile(
    "Delta", "SnapshotManagement.getLogSegmentAfterCommit") {
  // Threads queued on the snapshot lock are taken as a sign of competing updates. In that case
  // the committedVersion's snapshot cannot always be returned safely (see the comment on the
  // other getLogSegmentAfterCommit overload), so the segment is recomputed from the newest
  // known checkpoint provider instead.
  val contended = snapshotLock.hasQueuedThreads
  if (contended) {
    val knownProviders = Seq(preCommitLogSegment.checkpointProvider, oldCheckpointProvider)
    val latestCheckpointProvider = knownProviders.maxBy(_.version)
    getLogSegmentAfterCommit(commitStoreOpt, latestCheckpointProvider)
  } else {
    // No competing updates: use the optimized incremental computation, which appends the
    // just-written commit file to the pre-commit log segment.
    SnapshotManagement.appendCommitToLogSegment(
      preCommitLogSegment, commit.fileStatus, committedVersion)
  }
}

protected[delta] def getLogSegmentAfterCommit(
commitStoreOpt: Option[CommitStore],
oldCheckpointProvider: UninitializedCheckpointProvider): LogSegment = {
Expand Down Expand Up @@ -958,12 +985,14 @@ trait SnapshotManagement { self: DeltaLog =>
* Called after committing a transaction and updating the state of the table.
*
* @param committedVersion the version that was committed
* @param commit information about the commit file.
* @param newChecksumOpt the checksum for the new commit, if available.
* Usually None, since the commit would have just finished.
* @param preCommitLogSegment the log segment of the table prior to commit
*/
def updateAfterCommit(
committedVersion: Long,
commit: Commit,
newChecksumOpt: Option[VersionChecksum],
preCommitLogSegment: LogSegment): Snapshot = withSnapshotLockInterruptibly {
recordDeltaOperation(this, "delta.log.updateAfterCommit") {
Expand All @@ -972,6 +1001,10 @@ trait SnapshotManagement { self: DeltaLog =>
// Somebody else could have already updated the snapshot while we waited for the lock
if (committedVersion <= previousSnapshot.version) return previousSnapshot
val segment = getLogSegmentAfterCommit(
committedVersion,
newChecksumOpt,
preCommitLogSegment,
commit,
previousSnapshot.commitStoreOpt,
previousSnapshot.checkpointProvider)

Expand Down
Loading

0 comments on commit 6cdacab

Please sign in to comment.