Skip to content

Commit

Permalink
[Spark] Skip collecting commit stats to prevent computing Snapshot State
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Before this PR, Delta computes a
[SnapshotState](https://github.com/delta-io/delta/blob/v3.1.0/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala#L46-L58)
during every commit.  Computing a SnapshotState is fairly slow and
expensive, because it involves reading the entirety of a checkpoint,
sidecars, and log segment.

For many types of commit, it should be unnecessary to compute the
SnapshotState.

After this PR, an transaction can avoid computing the SnapshotState of
a newly created snapshot. Skipping the computation is enabled via a
spark configuration option `spark.databricks.delta.commitStats.collect=false`

This change can have a big performance impact when writing into a Delta
Table.  Especially when the table comprises a large number of underlying
data files.

## How was this patch tested?

- Locally built delta-spark
- Ran a small spark job to insert rows into a delta table
- Inspected log4j output to see if snapshot state was computed
- Repeated again, this time setting `spark.databricks.delta.commitStats.collect=false`

## Does this PR introduce _any_ user-facing changes?

Yes, after this PR the user can set spark config option
`spark.databricks.delta.commitStats.collect=false` to avoid computing
SnapshotState after a commit.

Signed-off-by: Ian Streeter <[email protected]>

# Please enter the commit message for your changes. Lines starting
# with '#' will be kept; you may remove them yourself if you want to.
# An empty message aborts the commit.
#
# Date:      Mon Mar 4 20:43:45 2024 +0000
#
# HEAD detached from 564e6ba
# Changes to be committed:
#	modified:   spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
#	modified:   spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala
#	modified:   spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
#	modified:   spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
#	modified:   spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuiteBase.scala
#
  • Loading branch information
istreeter committed Mar 27, 2024
1 parent dbed7a9 commit 278595f
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,37 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val info = currentTransactionInfo.commitInfo
.map(_.copy(readVersion = None, isolationLevel = None)).orNull
setNeedsCheckpoint(attemptVersion, postCommitSnapshot)

val numFilesTotal =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces snapshot state reconstruction
postCommitSnapshot.numOfFiles
} else -1L

val sizeInBytesTotal =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces snapshot state reconstruction
postCommitSnapshot.sizeInBytes
} else -1L

val protocol =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces protocolAndMetadata reconstruction
postCommitSnapshot.protocol
} else currentTransactionInfo.protocol

val checkpointSizeInBytes =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// This might block waiting on a Future that has not yet completed.
postCommitSnapshot.checkpointSizeInBytes()
} else -1L

val numPartitionColumnsInTable =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces protocolAndMetadata reconstruction
postCommitSnapshot.metadata.partitionColumns.size
} else -1

val stats = CommitStats(
startVersion = snapshot.version,
commitVersion = attemptVersion,
Expand All @@ -1861,20 +1892,20 @@ trait OptimisticTransactionImpl extends TransactionalWrite
numRemove = numRemove,
numSetTransaction = numSetTransaction,
bytesNew = bytesNew,
numFilesTotal = postCommitSnapshot.numOfFiles,
sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
numFilesTotal = numFilesTotal,
sizeInBytesTotal = sizeInBytesTotal,
numCdcFiles = numCdcFiles,
cdcBytesNew = cdcBytesNew,
protocol = postCommitSnapshot.protocol,
protocol = protocol,
commitSizeBytes = jsonActions.map(_.size).sum,
checkpointSizeBytes = postCommitSnapshot.checkpointSizeInBytes(),
checkpointSizeBytes = checkpointSizeInBytes,
totalCommitsSizeSinceLastCheckpoint = postCommitSnapshot.deltaFileSizeInBytes(),
checkpointAttempt = needsCheckpoint,
info = info,
newMetadata = newMetadata,
numAbsolutePathsInAdd = numAbsolutePaths,
numDistinctPartitionsInAdd = distinctPartitions.size,
numPartitionColumnsInTable = postCommitSnapshot.metadata.partitionColumns.size,
numPartitionColumnsInTable = numPartitionColumnsInTable,
isolationLevel = isolationLevel.toString,
numOfDomainMetadatas = numOfDomainMetadatas,
txnId = Some(txnId))
Expand Down
6 changes: 5 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ object RowId {
* Extracts the high watermark of row IDs from a snapshot.
*/
private[delta] def extractHighWatermark(snapshot: Snapshot): Option[Long] =
RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
if (isSupported(snapshot.protocol)) {
RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
} else {
None
}

/** Base Row ID column name */
val BASE_ROW_ID = "base_row_id"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ trait DeltaSQLConfBase {
.stringConf
.createOptional

val DELTA_COLLECT_COMMIT_STATS =
buildConf("commitStats.collect")
.internal()
.doc(
"""When true, commit statistics are collected for logging purposes.
| Enabling this feature might require the Snapshot State to be computed, which is
| potentially expensive.
""".stripMargin)
.booleanConf
.createWithDefault(true)

val DELTA_CONVERT_USE_METADATA_LOG =
buildConf("convert.useMetadataLog")
.doc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,4 +806,30 @@ class OptimisticTransactionSuite
}
}
}

test("Skip computing state of post-commit snapshot") {
withTempDir { tableDir =>
val df = Seq((1, 0), (2, 1)).toDF("key", "value")
df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)

val deltaLog = DeltaLog.forTable(spark, tableDir)
val snapshot = deltaLog.update()

assert(!snapshot.stateReconstructionTriggered)
}
}

test("Skip computing state of pre-commit snapshot") {
withTempDir { tableDir =>
val df = Seq((1, 0), (2, 1)).toDF("key", "value")
df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)

val deltaLog = DeltaLog.forTable(spark, tableDir)
val preCommitSnapshot = deltaLog.update()

df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)

assert(!preCommitSnapshot.stateReconstructionTriggered)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ trait OptimisticTransactionSuiteBase
with SharedSparkSession
with DeletionVectorsTestUtils {

override def sparkConf =
super.sparkConf.set("spark.databricks.delta.commitStats.collect", "false")


/**
* Check whether the test transaction conflict with the concurrent writes by executing the
Expand Down

0 comments on commit 278595f

Please sign in to comment.