[Spark] Skip collecting commit stats to prevent computing Snapshot State #2718

Merged (2 commits) on Apr 17, 2024
@@ -1874,6 +1874,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val info = currentTransactionInfo.commitInfo
.map(_.copy(readVersion = None, isolationLevel = None)).orNull
setNeedsCheckpoint(attemptVersion, postCommitSnapshot)
+      val doCollectCommitStats =
+        needsCheckpoint || spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_FORCE_ALL_COMMIT_STATS)
+
+      // Stats that force an expensive snapshot state reconstruction:
+      val numFilesTotal = if (doCollectCommitStats) postCommitSnapshot.numOfFiles else -1L
+      val sizeInBytesTotal = if (doCollectCommitStats) postCommitSnapshot.sizeInBytes else -1L
+
val stats = CommitStats(
startVersion = snapshot.version,
commitVersion = attemptVersion,
@@ -1887,8 +1894,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
numRemove = numRemove,
numSetTransaction = numSetTransaction,
bytesNew = bytesNew,
-      numFilesTotal = postCommitSnapshot.numOfFiles,
-      sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
+      numFilesTotal = numFilesTotal,
+      sizeInBytesTotal = sizeInBytesTotal,
numCdcFiles = numCdcFiles,
cdcBytesNew = cdcBytesNew,
protocol = postCommitSnapshot.protocol,
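The gist of this hunk: numFilesTotal and sizeInBytesTotal are the only CommitStats fields that require a full snapshot state reconstruction, so they are now collected only when a checkpoint is due anyway (the state gets computed regardless) or when the new DELTA_FORCE_ALL_COMMIT_STATS escape hatch is set; otherwise -1L is recorded as a "not collected" sentinel. A minimal sketch of the gating pattern, with illustrative names rather than the actual Delta internals:

// Sketch of the gating pattern (names are illustrative, not Delta's API).
case class Totals(numFiles: Long, sizeInBytes: Long)

def commitTotals(needsCheckpoint: Boolean, forceAllCommitStats: Boolean)
                (computeTotals: => Totals): Totals = {
  val doCollect = needsCheckpoint || forceAllCommitStats
  if (doCollect) computeTotals                   // may trigger state reconstruction
  else Totals(numFiles = -1L, sizeInBytes = -1L) // sentinel: stats skipped
}

The by-name computeTotals parameter keeps the expensive work lazy, so the skipped branch never touches the snapshot state.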
6 changes: 5 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala
@@ -114,7 +114,11 @@ object RowId
* Extracts the high watermark of row IDs from a snapshot.
*/
private[delta] def extractHighWatermark(snapshot: Snapshot): Option[Long] =
-    RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
+    if (isSupported(snapshot.protocol)) {
+      RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
+    } else {
+      None
+    }

/** Base Row ID column name */
val BASE_ROW_ID = "base_row_id"
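The protocol check here is cheap (it reads the in-memory Protocol action), whereas RowTrackingMetadataDomain.fromSnapshot reads domain metadata out of the snapshot state, so short-circuiting on tables without row tracking avoids the expensive path entirely. A minimal sketch of the guard, with a hypothetical stand-in for the Delta types:

// Illustrative guard: evaluate the cheap check first and only run the
// snapshot-backed lookup (passed by name) when it can possibly succeed.
def guardedLookup(supported: Boolean)(lookup: => Option[Long]): Option[Long] =
  if (supported) lookup else None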
@@ -83,6 +83,17 @@ trait DeltaSQLConfBase
.stringConf
.createOptional

+  val DELTA_FORCE_ALL_COMMIT_STATS =
+    buildConf("commitStats.force")
+      .internal()
+      .doc(
+        """When true, forces commit statistics to be collected for logging purposes.
+          | Enabling this feature requires the Snapshot State to be computed, which is
+          | potentially expensive.
+        """.stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+
val DELTA_CONVERT_USE_METADATA_LOG =
buildConf("convert.useMetadataLog")
.doc(
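For ad-hoc debugging, the new knob can be flipped per session. Assuming buildConf applies the usual spark.databricks.delta. key prefix, the full property name would be spark.databricks.delta.commitStats.force; in a session where spark is the active SparkSession:

// Hypothetical usage: opt back into full commit stats for one session,
// accepting the snapshot state reconstruction cost on every commit.
spark.conf.set("spark.databricks.delta.commitStats.force", "true")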
@@ -326,6 +326,8 @@ abstract class DeleteSuiteBase extends QueryTest
test("schema pruning on data condition") {
val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
append(input, Nil)
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF

val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
checkDelete(Some("key = 2"),
@@ -347,6 +349,8 @@ abstract class DeleteSuiteBase extends QueryTest
val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
.select(struct("key", "value").alias("nested"))
append(input, Nil)
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF

val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
checkDelete(Some("nested.key = 2"),
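The deltaLog.update().stateDF lines added to these tests materialize (and cache) the snapshot state before plan capture begins, so the plans captured by withPhysicalPlansCaptured come only from the DELETE under test and not from a lazily triggered state reconstruction. A hypothetical helper that makes the warm-up explicit (not part of this PR; stateDF is the same Snapshot member used in the diff above):

import org.apache.spark.sql.delta.DeltaLog

// Illustrative test helper: touch stateDF so the state reconstruction
// runs and is cached before any physical plans are captured.
def warmSnapshotState(deltaLog: DeltaLog): Unit = {
  deltaLog.update().stateDF
}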
@@ -888,4 +888,22 @@ class OptimisticTransactionSuite
}
}
}

test("Append does not trigger snapshot state computation") {
withTempDir { tableDir =>
val df = Seq((1, 0), (2, 1)).toDF("key", "value")
df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)

val deltaLog = DeltaLog.forTable(spark, tableDir)
val preCommitSnapshot = deltaLog.update()
assert(!preCommitSnapshot.stateReconstructionTriggered)

df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)

val postCommitSnapshot = deltaLog.update()
assert(!preCommitSnapshot.stateReconstructionTriggered)
assert(!postCommitSnapshot.stateReconstructionTriggered)
}
}

}
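A hypothetical counterpart to the test above (not part of this PR, and reusing its df, tableDir, and deltaLog names): with the force flag enabled, collecting full commit stats should touch the snapshot state again, assuming the spark.databricks.delta. key prefix and the withSQLConf helper from Spark's SQLTestUtils:

// Sketch only: with commitStats.force on, the append is expected to
// trigger state reconstruction on the post-commit snapshot.
withSQLConf("spark.databricks.delta.commitStats.force" -> "true") {
  df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
  assert(deltaLog.update().stateReconstructionTriggered)
}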
@@ -696,6 +696,8 @@ abstract class UpdateSuiteBase

test("schema pruning on finding files to update") {
append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF

val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
checkUpdate(condition = Some("key = 2"), setClauses = "key = 1, value = 3",
@@ -717,6 +719,8 @@ abstract class UpdateSuiteBase
test("nested schema pruning on finding files to update") {
append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
.select(struct("key", "value").alias("nested")))
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF

val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
checkUpdate(condition = Some("nested.key = 2"),