[Spark][Version Checksum] Add DV-related metrics #3953

Merged: 1 commit, Jan 2, 2025

Changes from all commits
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -845,6 +845,12 @@
],
"sqlState" : "42601"
},
"DELTA_DV_HISTOGRAM_DESERIALIZATON" : {
"message" : [
"Could not deserialize the deleted record counts histogram during table integrity verification."
],
"sqlState" : "22000"
},
"DELTA_DYNAMIC_PARTITION_OVERWRITE_DISABLED" : {
"message" : [
"Dynamic partition overwrite mode is specified by session config or write options, but it is disabled by `delta.dynamicPartitionOverwrite.enabled=false`."
86 changes: 84 additions & 2 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
@@ -27,9 +27,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.DeletionVectorUtils
import org.apache.spark.sql.delta.logging.DeltaLogKeys
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram
import org.apache.spark.sql.delta.stats.FileSizeHistogram
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{FileNames, JsonUtils}
@@ -50,14 +52,22 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
* @param txnId Optional transaction identifier
* @param tableSizeBytes The size of the table in bytes
* @param numFiles Number of `AddFile` actions in the snapshot
* @param numDeletedRecordsOpt The number of records marked as deleted by Deletion Vectors.
* @param numDeletionVectorsOpt The number of Deletion Vectors present in the snapshot.
* @param numMetadata Number of `Metadata` actions in the snapshot
* @param numProtocol Number of `Protocol` actions in the snapshot
* @param histogramOpt Optional file size histogram
* @param deletedRecordCountsHistogramOpt A histogram of the deleted records count distribution
* for all the files in the snapshot.
*/
case class VersionChecksum(
txnId: Option[String],
tableSizeBytes: Long,
numFiles: Long,
@JsonDeserialize(contentAs = classOf[Long])
numDeletedRecordsOpt: Option[Long],
@JsonDeserialize(contentAs = classOf[Long])
numDeletionVectorsOpt: Option[Long],
numMetadata: Long,
numProtocol: Long,
@JsonDeserialize(contentAs = classOf[Long])
@@ -67,6 +77,7 @@ case class VersionChecksum(
metadata: Metadata,
protocol: Protocol,
histogramOpt: Option[FileSizeHistogram],
deletedRecordCountsHistogramOpt: Option[DeletedRecordCountsHistogram],
allFiles: Option[Seq[AddFile]])
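
The @JsonDeserialize(contentAs = classOf[Long]) annotations on the new fields matter because the checksum is round-tripped through JSON: with jackson-module-scala, the element type of an Option[Long] is erased, so small values come back as a boxed java.lang.Integer and fail at the first use as a Long. A minimal sketch of the pitfall, using a hypothetical DvMetrics class rather than the real checksum:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical stand-in for a single checksum field; the annotation is the point.
case class DvMetrics(
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    numDeletedRecordsOpt: Option[Long])

object DvMetricsRoundTrip extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  val parsed = mapper.readValue("""{"numDeletedRecordsOpt": 42}""", classOf[DvMetrics])
  // Without contentAs, the Option would hold a java.lang.Integer at runtime,
  // and extracting it as a Long would throw a ClassCastException.
  val n: Long = parsed.numDeletedRecordsOpt.get
  println(n) // 42
}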

/**
@@ -202,6 +213,10 @@ trait RecordChecksum extends DeltaLogging {
// Incrementally compute the new version checksum, if the old one is available.
val ignoreAddFilesInOperation =
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)
val persistentDVsOnTableReadable =
DeletionVectorUtils.deletionVectorsReadable(protocol, metadata)
val persistentDVsOnTableWritable =
DeletionVectorUtils.deletionVectorsWritable(protocol, metadata)

computeNewChecksum(
versionToCompute,
@@ -211,7 +226,9 @@ trait RecordChecksum extends DeltaLogging {
oldSnapshot,
actions,
ignoreAddFilesInOperation,
includeAddFilesInCrc
includeAddFilesInCrc,
persistentDVsOnTableReadable,
persistentDVsOnTableWritable
)
}

@@ -227,6 +244,10 @@
* @param actions used to incrementally compute new checksum.
* @param ignoreAddFiles for transactions whose add file actions refer to already-existing files
* e.g., [[DeltaOperations.ComputeStats]] transactions.
* @param persistentDVsOnTableReadable Indicates whether commands modifying this table are allowed
* to read deletion vectors.
* @param persistentDVsOnTableWritable Indicates whether commands modifying this table are allowed
* to create new deletion vectors.
* @return Either the new checksum, or an error code string if the checksum could not be
* computed incrementally.
*/
@@ -239,7 +260,9 @@
oldSnapshot: Option[Snapshot],
actions: Seq[Action],
ignoreAddFiles: Boolean,
includeAllFilesInCRC: Boolean
includeAllFilesInCRC: Boolean,
persistentDVsOnTableReadable: Boolean,
persistentDVsOnTableWritable: Boolean
) : Either[String, VersionChecksum] = {
// scalastyle:on argcount
oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1)))
@@ -248,19 +271,66 @@
var protocol = oldVersionChecksum.protocol
var metadata = oldVersionChecksum.metadata

// In incremental computation, tables initialized with DVs disabled carry None for the DV
// statistics. The statistics remain None even if DVs are enabled at a later point in the
// table's lifecycle; that only changes if a full snapshot recomputation is invoked while
// DVs are enabled for the table.
val conf = spark.sessionState.conf
val isFirstVersion = oldSnapshot.forall(_.version == -1)
val checksumDVMetricsEnabled = conf.getConf(DeltaSQLConf.DELTA_CHECKSUM_DV_METRICS_ENABLED)
val deletedRecordCountsHistogramEnabled =
conf.getConf(DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED)

// For tables where DVs were disabled later in the table lifecycle, we still want to
// maintain DV statistics.
val computeDVMetricsWhenDVsNotWritable = persistentDVsOnTableReadable &&
oldVersionChecksum.numDeletionVectorsOpt.isDefined && !isFirstVersion

val computeDVMetrics = checksumDVMetricsEnabled &&
(persistentDVsOnTableWritable || computeDVMetricsWhenDVsNotWritable)

// DV-related metrics. When the old checksum does not contain DV statistics, we attempt to
// pick them up from the old snapshot.
var numDeletedRecordsOpt = if (computeDVMetrics) {
oldVersionChecksum.numDeletedRecordsOpt
.orElse(oldSnapshot.flatMap(_.numDeletedRecordsOpt))
} else None
var numDeletionVectorsOpt = if (computeDVMetrics) {
oldVersionChecksum.numDeletionVectorsOpt
.orElse(oldSnapshot.flatMap(_.numDeletionVectorsOpt))
} else None
val deletedRecordCountsHistogramOpt =
if (computeDVMetrics && deletedRecordCountsHistogramEnabled) {
oldVersionChecksum.deletedRecordCountsHistogramOpt
.orElse(oldSnapshot.flatMap(_.deletedRecordCountsHistogramOpt))
.map(h => DeletedRecordCountsHistogram(h.deletedRecordCounts.clone()))
} else None

var inCommitTimestamp : Option[Long] = None
actions.foreach {
case a: AddFile if !ignoreAddFiles =>
tableSizeBytes += a.size
numFiles += 1

// Only accumulate DV statistics when base stats are not None.
val (dvCount, dvCardinality) =
Option(a.deletionVector).map(1L -> _.cardinality).getOrElse(0L -> 0L)
numDeletedRecordsOpt = numDeletedRecordsOpt.map(_ + dvCardinality)
numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ + dvCount)
deletedRecordCountsHistogramOpt.foreach(_.insert(dvCardinality))

// extendedFileMetadata == true implies fields partitionValues, size, and tags are present
case r: RemoveFile if r.extendedFileMetadata == Some(true) =>
val size = r.size.get
tableSizeBytes -= size
numFiles -= 1

// Only accumulate DV statistics when base stats are not None.
val (dvCount, dvCardinality) =
Option(r.deletionVector).map(1L -> _.cardinality).getOrElse(0L -> 0L)
numDeletedRecordsOpt = numDeletedRecordsOpt.map(_ - dvCardinality)
numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ - dvCount)
deletedRecordCountsHistogramOpt.foreach(_.remove(dvCardinality))

case r: RemoveFile =>
// Report the failure to usage logs.
@@ -359,6 +429,8 @@
txnId = txnIdOpt,
tableSizeBytes = tableSizeBytes,
numFiles = numFiles,
numDeletedRecordsOpt = numDeletedRecordsOpt,
numDeletionVectorsOpt = numDeletionVectorsOpt,
numMetadata = 1,
numProtocol = 1,
inCommitTimestampOpt = inCommitTimestamp,
@@ -367,6 +439,7 @@
setTransactions = setTransactions,
domainMetadata = domainMetadata,
allFiles = allFiles,
deletedRecordCountsHistogramOpt = deletedRecordCountsHistogramOpt,
histogramOpt = None
))
}
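
The bookkeeping above deliberately keeps the DV metrics as Options: map over None is a no-op, so a table whose baseline never carried DV statistics cannot fabricate them mid-stream. A self-contained toy model of the accumulation, with simplified stand-ins for the real Delta actions:

final case class Dv(cardinality: Long)
sealed trait FileAction { def dv: Option[Dv] }
final case class Add(dv: Option[Dv]) extends FileAction
final case class Remove(dv: Option[Dv]) extends FileAction

def applyIncrementally(
    numDeletedRecords: Option[Long],
    numDeletionVectors: Option[Long],
    actions: Seq[FileAction]): (Option[Long], Option[Long]) =
  actions.foldLeft((numDeletedRecords, numDeletionVectors)) {
    case ((records, vectors), action) =>
      // An added file with a DV contributes (+1 vector, +cardinality records);
      // a removed file subtracts the same amounts.
      val (dvCount, dvCardinality) =
        action.dv.map(d => (1L, d.cardinality)).getOrElse((0L, 0L))
      val sign = action match { case _: Add => 1L; case _: Remove => -1L }
      (records.map(_ + sign * dvCardinality), vectors.map(_ + sign * dvCount))
  }

// applyIncrementally(Some(0L), Some(0L), Seq(Add(Some(Dv(5))), Add(None)))
// returns (Some(5), Some(1)); with None baselines it returns (None, None).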
@@ -794,6 +867,15 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot =>
detailedErrorMapForUsageLogs += ("domainMetadata" -> JsonUtils.toJson(eventData))
}
}
// Deletion vectors metrics.
if (DeletionVectorUtils.deletionVectorsReadable(self)) {
(checksum.numDeletedRecordsOpt zip computedState.numDeletedRecordsOpt).foreach {
case (a, b) => compare(a, b, "Number of deleted records", "numDeletedRecordsOpt")
}
(checksum.numDeletionVectorsOpt zip computedState.numDeletionVectorsOpt).foreach {
case (a, b) => compare(a, b, "Number of deleted vectors", "numDeletionVectorsOpt")
}
}
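
The zip idiom above means a comparison only fires when both the stored checksum and the recomputed state carry a value; a statistic that is absent on either side is skipped rather than reported as corruption. In isolation, with hypothetical values and a plain assertion standing in for compare:

val stored: Option[Long] = Some(10L)
val computed: Option[Long] = None

// Nothing runs here: zipping Some with None yields no pair, so an absent
// statistic on either side never trips the integrity check.
(stored zip computed).foreach { case (a, b) =>
  assert(a == b, s"DV metric mismatch: stored=$a computed=$b")
}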

compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata")
compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol")

spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -447,6 +447,13 @@ trait DeltaErrorsBase
DeltaConfigs.CHANGE_DATA_FEED.key))
}

def deletedRecordCountsHistogramDeserializationException(): Throwable = {
new DeltaChecksumException(
errorClass = "DELTA_DV_HISTOGRAM_DESERIALIZATON",
messageParameters = Array.empty,
pos = 0)
}
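
A sketch of the intended call site, with the decoding step passed in as a function because the real deserialization helper is not shown in this diff: any failure while reading the persisted histogram is rethrown as this checksum exception instead of an opaque Jackson error.

import scala.util.control.NonFatal

def readHistogram(
    json: String,
    parse: String => DeletedRecordCountsHistogram): DeletedRecordCountsHistogram =
  try parse(json)
  catch {
    case NonFatal(_) =>
      throw DeltaErrors.deletedRecordCountsHistogramDeserializationException()
  }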

/**
* Throwable used for invalid CDC 'start' and 'end' options, where end < start
*/

spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.encoderFor
@@ -45,6 +46,10 @@ object DeltaUDF {
def stringFromMap(f: Map[String, String] => String): UserDefinedFunction =
createUdfFromTemplateUnsafe(stringFromMapTemplate, f, udf(f))

def deletedRecordCountsHistogramFromArrayLong(
f: Array[Long] => DeletedRecordCountsHistogram): UserDefinedFunction =
createUdfFromTemplateUnsafe(deletedRecordCountsHistogramFromArrayLongTemplate, f, udf(f))

def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction =
createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f))

@@ -65,6 +70,10 @@ object DeltaUDF {
private lazy val stringFromMapTemplate =
udf((_: Map[String, String]) => "").asInstanceOf[SparkUserDefinedFunction]

private lazy val deletedRecordCountsHistogramFromArrayLongTemplate =
udf((_: Array[Long]) => DeletedRecordCountsHistogram(Array.empty))
.asInstanceOf[SparkUserDefinedFunction]

private lazy val booleanFromMapTemplate =
udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction]

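For intuition, the template above appears to be a pre-resolved stand-in for an ordinary Spark UDF of the same shape, letting Delta reuse one resolved SparkUserDefinedFunction instead of re-deriving it at every call site. A rough standalone equivalent, without that reuse:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram

// Maps an array of per-bucket deleted-record counts to a histogram value.
val toHistogram = udf((counts: Array[Long]) => DeletedRecordCountsHistogram(counts))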

spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -577,6 +577,15 @@ class Snapshot(
},
domainMetadata = checksumOpt.flatMap(_.domainMetadata)
.orElse(Option.when(_computedStateTriggered)(domainMetadata)),
numDeletedRecordsOpt = checksumOpt.flatMap(_.numDeletedRecordsOpt)
.orElse(Option.when(_computedStateTriggered)(numDeletedRecordsOpt).flatten)
.filter(_ => deletionVectorsReadableAndMetricsEnabled),
numDeletionVectorsOpt = checksumOpt.flatMap(_.numDeletionVectorsOpt)
.orElse(Option.when(_computedStateTriggered)(numDeletionVectorsOpt).flatten)
.filter(_ => deletionVectorsReadableAndMetricsEnabled),
deletedRecordCountsHistogramOpt = checksumOpt.flatMap(_.deletedRecordCountsHistogramOpt)
.orElse(Option.when(_computedStateTriggered)(deletedRecordCountsHistogramOpt).flatten)
.filter(_ => deletionVectorsReadableAndHistogramEnabled),
histogramOpt = checksumOpt.flatMap(_.histogramOpt)
)

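All three new fields in the construction above follow the same precedence rule, factored here into a hypothetical helper purely to make the rule explicit: prefer the value persisted in the checksum; otherwise fall back to the computed state, but only if state computation has already been triggered; and drop the value entirely when the corresponding feature gate is off.

// Hypothetical helper; not part of the PR.
def resolveMetric[T](
    fromChecksum: Option[T],
    computedStateTriggered: Boolean,
    fromComputedState: => Option[T],
    gateEnabled: Boolean): Option[T] =
  fromChecksum
    .orElse(Option.when(computedStateTriggered)(fromComputedState).flatten)
    .filter(_ => gateEnabled)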