Skip to content

Commit

Permalink
[Spark] Skip collecting commit stats to prevent computing Snapshot State
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Before this PR, Delta computes a
[SnapshotState](https://github.com/delta-io/delta/blob/v3.1.0/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala#L46-L58)
during every commit.  Computing a SnapshotState is fairly slow and
expensive, because it involves reading the entirety of a checkpoint,
sidecars, and log segment.

For many types of commit, it should be unnecessary to compute the
SnapshotState.

After this PR, an transaction can avoid computing the SnapshotState of
a newly created snapshot. Skipping the computation is enabled via a
spark configuration option `spark.databricks.delta.commitStats.collect=false`

This change can have a big performance impact when writing into a Delta
Table.  Especially when the table comprises a large number of underlying
data files.

## How was this patch tested?

- Locally built delta-spark
- Ran a small spark job to insert rows into a delta table
- Inspected log4j output to see if snapshot state was computed
- Repeated again, this time setting `spark.databricks.delta.commitStats.collect=false`

## Does this PR introduce _any_ user-facing changes?

Yes, after this PR the user can set spark config option
`spark.databricks.delta.commitStats.collect=false` to avoid computing
SnapshotState after a commit.

Signed-off-by: Ian Streeter <[email protected]>
  • Loading branch information
istreeter committed Mar 7, 2024
1 parent f50bd83 commit 564e6ba
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1811,6 +1811,37 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val info = currentTransactionInfo.commitInfo
.map(_.copy(readVersion = None, isolationLevel = None)).orNull
setNeedsCheckpoint(attemptVersion, postCommitSnapshot)

val numFilesTotal =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces snapshot state reconstruction
postCommitSnapshot.numOfFiles
} else -1L

val sizeInBytesTotal =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces snapshot state reconstruction
postCommitSnapshot.sizeInBytes
} else -1L

val protocol =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces protocolAndMetadata reconstruction
postCommitSnapshot.protocol
} else currentTransactionInfo.protocol

val checkpointSizeInBytes =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// This might block waiting on a Future that has not yet completed.
postCommitSnapshot.checkpointSizeInBytes()
} else -1L

val numPartitionColumnsInTable =
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_COLLECT_COMMIT_STATS)) {
// Forces protocolAndMetadata reconstruction
postCommitSnapshot.metadata.partitionColumns.size
} else -1

val stats = CommitStats(
startVersion = snapshot.version,
commitVersion = attemptVersion,
Expand All @@ -1824,20 +1855,20 @@ trait OptimisticTransactionImpl extends TransactionalWrite
numRemove = numRemove,
numSetTransaction = numSetTransaction,
bytesNew = bytesNew,
numFilesTotal = postCommitSnapshot.numOfFiles,
sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
numFilesTotal = numFilesTotal,
sizeInBytesTotal = sizeInBytesTotal,
numCdcFiles = numCdcFiles,
cdcBytesNew = cdcBytesNew,
protocol = postCommitSnapshot.protocol,
protocol = protocol,
commitSizeBytes = jsonActions.map(_.size).sum,
checkpointSizeBytes = postCommitSnapshot.checkpointSizeInBytes(),
checkpointSizeBytes = checkpointSizeInBytes,
totalCommitsSizeSinceLastCheckpoint = postCommitSnapshot.deltaFileSizeInBytes(),
checkpointAttempt = needsCheckpoint,
info = info,
newMetadata = newMetadata,
numAbsolutePathsInAdd = numAbsolutePaths,
numDistinctPartitionsInAdd = distinctPartitions.size,
numPartitionColumnsInTable = postCommitSnapshot.metadata.partitionColumns.size,
numPartitionColumnsInTable = numPartitionColumnsInTable,
isolationLevel = isolationLevel.toString,
numOfDomainMetadatas = numOfDomainMetadatas,
txnId = Some(txnId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ trait DeltaSQLConfBase {
.stringConf
.createOptional

val DELTA_COLLECT_COMMIT_STATS =
buildConf("commitStats.collect")
.internal()
.doc(
"""When true, commit statistics are collected for logging purposes.
| Enabling this feature might require the Snapshot State to be computed, which is
| potentially expensive.
""".stripMargin)
.booleanConf
.createWithDefault(true)

val DELTA_CONVERT_USE_METADATA_LOG =
buildConf("convert.useMetadataLog")
.doc(
Expand Down

0 comments on commit 564e6ba

Please sign in to comment.