From bc160dfdeb414b4e60d5f2c0581f3fff7d04959b Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Thu, 2 Jan 2025 11:08:57 -0800 Subject: [PATCH] add DV metrics --- .../resources/error/delta-error-classes.json | 6 + .../org/apache/spark/sql/delta/Checksum.scala | 86 +++- .../apache/spark/sql/delta/DeltaErrors.scala | 7 + .../org/apache/spark/sql/delta/DeltaUDF.scala | 9 + .../org/apache/spark/sql/delta/Snapshot.scala | 9 + .../spark/sql/delta/SnapshotState.scala | 71 +++- .../sql/delta/sources/DeltaSQLConf.scala | 19 + .../stats/DeletedRecordCountsHistogram.scala | 79 ++++ .../DeletedRecordCountsHistogramUtils.scala | 159 ++++++++ .../sql/delta/ChecksumDVMetricsSuite.scala | 368 ++++++++++++++++++ 10 files changed, 807 insertions(+), 6 deletions(-) create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogram.scala create mode 100644 spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogramUtils.scala create mode 100644 spark/src/test/scala/org/apache/spark/sql/delta/ChecksumDVMetricsSuite.scala diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index c193b6a15b9..b2ae5cdabbc 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -845,6 +845,12 @@ ], "sqlState" : "42601" }, + "DELTA_DV_HISTOGRAM_DESERIALIZATON" : { + "message" : [ + "Could not deserialize the deleted record counts histogram during table integrity verification." + ], + "sqlState" : "22000" + }, "DELTA_DYNAMIC_PARTITION_OVERWRITE_DISABLED" : { "message" : [ "Dynamic partition overwrite mode is specified by session config or write options, but it is disabled by `delta.dynamicPartitionOverwrite.enabled=false`." diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala index 150b57c0376..867fc2f038a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala @@ -27,9 +27,11 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import org.apache.spark.sql.delta.actions._ +import org.apache.spark.sql.delta.commands.DeletionVectorUtils import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram import org.apache.spark.sql.delta.stats.FileSizeHistogram import org.apache.spark.sql.delta.storage.LogStore import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} @@ -50,14 +52,22 @@ import org.apache.spark.util.{SerializableConfiguration, Utils} * @param txnId Optional transaction identifier * @param tableSizeBytes The size of the table in bytes * @param numFiles Number of `AddFile` actions in the snapshot + * @param numDeletedRecordsOpt The number of deleted records with Deletion Vectors. + * @param numDeletionVectorsOpt The number of Deletion Vectors present in the snapshot. * @param numMetadata Number of `Metadata` actions in the snapshot * @param numProtocol Number of `Protocol` actions in the snapshot * @param histogramOpt Optional file size histogram + * @param deletedRecordCountsHistogramOpt A histogram of the deleted records count distribution + * for all the files in the snapshot. 
*/ case class VersionChecksum( txnId: Option[String], tableSizeBytes: Long, numFiles: Long, + @JsonDeserialize(contentAs = classOf[Long]) + numDeletedRecordsOpt: Option[Long], + @JsonDeserialize(contentAs = classOf[Long]) + numDeletionVectorsOpt: Option[Long], numMetadata: Long, numProtocol: Long, @JsonDeserialize(contentAs = classOf[Long]) @@ -67,6 +77,7 @@ case class VersionChecksum( metadata: Metadata, protocol: Protocol, histogramOpt: Option[FileSizeHistogram], + deletedRecordCountsHistogramOpt: Option[DeletedRecordCountsHistogram], allFiles: Option[Seq[AddFile]]) /** @@ -202,6 +213,10 @@ trait RecordChecksum extends DeltaLogging { // Incrementally compute the new version checksum, if the old one is available. val ignoreAddFilesInOperation = RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName) + val persistentDVsOnTableReadable = + DeletionVectorUtils.deletionVectorsReadable(protocol, metadata) + val persistentDVsOnTableWritable = + DeletionVectorUtils.deletionVectorsWritable(protocol, metadata) computeNewChecksum( versionToCompute, @@ -211,7 +226,9 @@ trait RecordChecksum extends DeltaLogging { oldSnapshot, actions, ignoreAddFilesInOperation, - includeAddFilesInCrc + includeAddFilesInCrc, + persistentDVsOnTableReadable, + persistentDVsOnTableWritable ) } @@ -227,6 +244,10 @@ trait RecordChecksum extends DeltaLogging { * @param actions used to incrementally compute new checksum. * @param ignoreAddFiles for transactions whose add file actions refer to already-existing files * e.g., [[DeltaOperations.ComputeStats]] transactions. + * @param persistentDVsOnTableReadable Indicates whether commands modifying this table are allowed + * to read deletion vectors. + * @param persistentDVsOnTableWritable Indicates whether commands modifying this table are allowed + * to create new deletion vectors. * @return Either the new checksum or error code string if the checksum could not be computed * incrementally due to some reason. */ @@ -239,7 +260,9 @@ trait RecordChecksum extends DeltaLogging { oldSnapshot: Option[Snapshot], actions: Seq[Action], ignoreAddFiles: Boolean, - includeAllFilesInCRC: Boolean + includeAllFilesInCRC: Boolean, + persistentDVsOnTableReadable: Boolean, + persistentDVsOnTableWritable: Boolean ) : Either[String, VersionChecksum] = { // scalastyle:on argcount oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1))) @@ -248,12 +271,53 @@ trait RecordChecksum extends DeltaLogging { var protocol = oldVersionChecksum.protocol var metadata = oldVersionChecksum.metadata + // In incremental computation, tables initialized with DVs disabled contain None DV + // statistics. DV statistics remain None even if DVs are enabled at a random point + // during the lifecycle of a table. That can only change if a full snapshot recomputation + // is invoked while DVs are enabled for the table. + val conf = spark.sessionState.conf + val isFirstVersion = oldSnapshot.forall(_.version == -1) + val checksumDVMetricsEnabled = conf.getConf(DeltaSQLConf.DELTA_CHECKSUM_DV_METRICS_ENABLED) + val deletedRecordCountsHistogramEnabled = + conf.getConf(DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED) + + // For tables where DVs were disabled later on in the table lifecycle we want to maintain DV + // statistics. 
+ val computeDVMetricsWhenDVsNotWritable = persistentDVsOnTableReadable && + oldVersionChecksum.numDeletionVectorsOpt.isDefined && !isFirstVersion + + val computeDVMetrics = checksumDVMetricsEnabled && + (persistentDVsOnTableWritable || computeDVMetricsWhenDVsNotWritable) + + // DV-related metrics. When the old checksum does not contain DV statistics, we attempt to + // pick them up from the old snapshot. + var numDeletedRecordsOpt = if (computeDVMetrics) { + oldVersionChecksum.numDeletedRecordsOpt + .orElse(oldSnapshot.flatMap(_.numDeletedRecordsOpt)) + } else None + var numDeletionVectorsOpt = if (computeDVMetrics) { + oldVersionChecksum.numDeletionVectorsOpt + .orElse(oldSnapshot.flatMap(_.numDeletionVectorsOpt)) + } else None + val deletedRecordCountsHistogramOpt = + if (computeDVMetrics && deletedRecordCountsHistogramEnabled) { + oldVersionChecksum.deletedRecordCountsHistogramOpt + .orElse(oldSnapshot.flatMap(_.deletedRecordCountsHistogramOpt)) + .map(h => DeletedRecordCountsHistogram(h.deletedRecordCounts.clone())) + } else None + var inCommitTimestamp : Option[Long] = None actions.foreach { case a: AddFile if !ignoreAddFiles => tableSizeBytes += a.size numFiles += 1 + // Only accumulate DV statistics when base stats are not None. + val (dvCount, dvCardinality) = + Option(a.deletionVector).map(1L -> _.cardinality).getOrElse(0L -> 0L) + numDeletedRecordsOpt = numDeletedRecordsOpt.map(_ + dvCardinality) + numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ + dvCount) + deletedRecordCountsHistogramOpt.foreach(_.insert(dvCardinality)) // extendedFileMetadata == true implies fields partitionValues, size, and tags are present case r: RemoveFile if r.extendedFileMetadata == Some(true) => @@ -261,6 +325,12 @@ trait RecordChecksum extends DeltaLogging { tableSizeBytes -= size numFiles -= 1 + // Only accumulate DV statistics when base stats are not None. + val (dvCount, dvCardinality) = + Option(r.deletionVector).map(1L -> _.cardinality).getOrElse(0L -> 0L) + numDeletedRecordsOpt = numDeletedRecordsOpt.map(_ - dvCardinality) + numDeletionVectorsOpt = numDeletionVectorsOpt.map(_ - dvCount) + deletedRecordCountsHistogramOpt.foreach(_.remove(dvCardinality)) case r: RemoveFile => // Report the failure to usage logs. @@ -359,6 +429,8 @@ trait RecordChecksum extends DeltaLogging { txnId = txnIdOpt, tableSizeBytes = tableSizeBytes, numFiles = numFiles, + numDeletedRecordsOpt = numDeletedRecordsOpt, + numDeletionVectorsOpt = numDeletionVectorsOpt, numMetadata = 1, numProtocol = 1, inCommitTimestampOpt = inCommitTimestamp, @@ -367,6 +439,7 @@ trait RecordChecksum extends DeltaLogging { setTransactions = setTransactions, domainMetadata = domainMetadata, allFiles = allFiles, + deletedRecordCountsHistogramOpt = deletedRecordCountsHistogramOpt, histogramOpt = None )) } @@ -794,6 +867,15 @@ trait ValidateChecksum extends DeltaLogging { self: Snapshot => detailedErrorMapForUsageLogs += ("domainMetadata" -> JsonUtils.toJson(eventData)) } } + // Deletion vectors metrics. 
+ if (DeletionVectorUtils.deletionVectorsReadable(self)) { + (checksum.numDeletedRecordsOpt zip computedState.numDeletedRecordsOpt).foreach { + case (a, b) => compare(a, b, "Number of deleted records", "numDeletedRecordsOpt") + } + (checksum.numDeletionVectorsOpt zip computedState.numDeletionVectorsOpt).foreach { + case (a, b) => compare(a, b, "Number of deleted vectors", "numDeletionVectorsOpt") + } + } compareAction(checksum.metadata, computedStateToCheckAgainst.metadata, "Metadata", "metadata") compareAction(checksum.protocol, computedStateToCheckAgainst.protocol, "Protocol", "protocol") diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 9f005e2bc89..5253e50c133 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -447,6 +447,13 @@ trait DeltaErrorsBase DeltaConfigs.CHANGE_DATA_FEED.key)) } + def deletedRecordCountsHistogramDeserializationException(): Throwable = { + new DeltaChecksumException( + errorClass = "DELTA_DV_HISTOGRAM_DESERIALIZATON", + messageParameters = Array.empty, + pos = 0) + } + /** * Throwable used for invalid CDC 'start' and 'end' options, where end < start */ diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala index 4c26595ead8..018c9a506f4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaUDF.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.encoderFor @@ -45,6 +46,10 @@ object DeltaUDF { def stringFromMap(f: Map[String, String] => String): UserDefinedFunction = createUdfFromTemplateUnsafe(stringFromMapTemplate, f, udf(f)) + def deletedRecordCountsHistogramFromArrayLong( + f: Array[Long] => DeletedRecordCountsHistogram): UserDefinedFunction = + createUdfFromTemplateUnsafe(deletedRecordCountsHistogramFromArrayLongTemplate, f, udf(f)) + def booleanFromMap(f: Map[String, String] => Boolean): UserDefinedFunction = createUdfFromTemplateUnsafe(booleanFromMapTemplate, f, udf(f)) @@ -65,6 +70,10 @@ object DeltaUDF { private lazy val stringFromMapTemplate = udf((_: Map[String, String]) => "").asInstanceOf[SparkUserDefinedFunction] + private lazy val deletedRecordCountsHistogramFromArrayLongTemplate = + udf((_: Array[Long]) => DeletedRecordCountsHistogram(Array.empty)) + .asInstanceOf[SparkUserDefinedFunction] + private lazy val booleanFromMapTemplate = udf((_: Map[String, String]) => true).asInstanceOf[SparkUserDefinedFunction] diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index 9a93fdb05fd..930e1eed499 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -577,6 +577,15 @@ class Snapshot( }, domainMetadata = checksumOpt.flatMap(_.domainMetadata) .orElse(Option.when(_computedStateTriggered)(domainMetadata)), + numDeletedRecordsOpt = checksumOpt.flatMap(_.numDeletedRecordsOpt) + .orElse(Option.when(_computedStateTriggered)(numDeletedRecordsOpt).flatten) + .filter(_ => 
deletionVectorsReadableAndMetricsEnabled), + numDeletionVectorsOpt = checksumOpt.flatMap(_.numDeletionVectorsOpt) + .orElse(Option.when(_computedStateTriggered)(numDeletionVectorsOpt).flatten) + .filter(_ => deletionVectorsReadableAndMetricsEnabled), + deletedRecordCountsHistogramOpt = checksumOpt.flatMap(_.deletedRecordCountsHistogramOpt) + .orElse(Option.when(_computedStateTriggered)(deletedRecordCountsHistogramOpt).flatten) + .filter(_ => deletionVectorsReadableAndHistogramEnabled), histogramOpt = checksumOpt.flatMap(_.histogramOpt) ) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala index 33fb47993d4..0a7e8f3e10f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala @@ -19,12 +19,15 @@ package org.apache.spark.sql.delta // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.actions.{Metadata, Protocol, SetTransaction} import org.apache.spark.sql.delta.actions.DomainMetadata +import org.apache.spark.sql.delta.commands.DeletionVectorUtils import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogram +import org.apache.spark.sql.delta.stats.DeletedRecordCountsHistogramUtils import org.apache.spark.sql.delta.stats.FileSizeHistogram import org.apache.spark.sql.{Column, DataFrame} -import org.apache.spark.sql.functions.{coalesce, col, collect_set, count, last, lit, sum} +import org.apache.spark.sql.functions.{coalesce, col, collect_set, count, last, lit, sum, when} import org.apache.spark.util.Utils @@ -35,6 +38,8 @@ import org.apache.spark.util.Utils * @param numOfSetTransactions Number of streams writing to this table. * @param numOfFiles The number of files in this table. * @param numOfRemoves The number of tombstones in the state. + * @param numDeletedRecordsOpt The total number of records deleted with Deletion Vectors. + * @param numDeletionVectorsOpt The number of Deletion Vectors present in the table. * @param numOfMetadata The number of metadata actions in the state. Should be 1. * @param numOfProtocol The number of protocol actions in the state. Should be 1. * @param setTransactions The streaming queries writing to this table. @@ -42,19 +47,24 @@ import org.apache.spark.util.Utils * @param protocol The protocol version of the Delta table. * @param fileSizeHistogram A Histogram class tracking the file counts and total bytes * in different size ranges. + * @param deletedRecordCountsHistogramOpt A histogram of deletion records counts distribution + * for all files. 
*/ case class SnapshotState( sizeInBytes: Long, numOfSetTransactions: Long, numOfFiles: Long, numOfRemoves: Long, + numDeletedRecordsOpt: Option[Long], + numDeletionVectorsOpt: Option[Long], numOfMetadata: Long, numOfProtocol: Long, setTransactions: Seq[SetTransaction], domainMetadata: Seq[DomainMetadata], metadata: Metadata, protocol: Protocol, - fileSizeHistogram: Option[FileSizeHistogram] = None + fileSizeHistogram: Option[FileSizeHistogram] = None, + deletedRecordCountsHistogramOpt: Option[DeletedRecordCountsHistogram] = None ) /** @@ -139,6 +149,33 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => * A Map of alias to aggregations which needs to be done to calculate the `computedState` */ protected def aggregationsToComputeState: Map[String, Column] = { + val checksumDVMetricsEnabled = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHECKSUM_DV_METRICS_ENABLED) + val deletedRecordCountsHistogramEnabled = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED) + lazy val persistentDVsOnTableSupported = + DeletionVectorUtils.deletionVectorsWritable(this) + + val computeChecksumDVMetrics = checksumDVMetricsEnabled && + persistentDVsOnTableSupported + val persistentDVsAggs = + if (computeChecksumDVMetrics) { + Map( + "numDeletedRecordsOpt" -> sum(coalesce(col("add.deletionVector.cardinality"), lit(0L))), + "numDeletionVectorsOpt" -> count(col("add.deletionVector"))) + } else { + Map("numDeletedRecordsOpt" -> lit(null), "numDeletionVectorsOpt" -> lit(null)) + } + + val histogramDVsAggExpr = if (computeChecksumDVMetrics && deletedRecordCountsHistogramEnabled) { + DeletedRecordCountsHistogramUtils.histogramAggregate( + when(col("add").isNotNull, coalesce(col("add.deletionVector.cardinality"), lit(0L)))) + } else { + lit(null).cast(DeletedRecordCountsHistogram.schema) + } + + val histogramDVsAgg = Seq("deletedRecordCountsHistogramOpt" -> histogramDVsAggExpr) + Map( // sum may return null for empty data set. 
"sizeInBytes" -> coalesce(sum(col("add.size")), lit(0L)), @@ -152,7 +189,7 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => "metadata" -> last(col("metaData"), ignoreNulls = true), "protocol" -> last(col("protocol"), ignoreNulls = true), "fileSizeHistogram" -> lit(null).cast(FileSizeHistogram.schema) - ) + ) ++ persistentDVsAggs ++ histogramDVsAgg } /** @@ -171,21 +208,47 @@ trait SnapshotStateManager extends DeltaLogging { self: Snapshot => protected[delta] def setTransactionsIfKnown: Option[Seq[SetTransaction]] = Some(setTransactions) protected[delta] def numOfFilesIfKnown: Option[Long] = Some(numOfFiles) protected[delta] def domainMetadatasIfKnown: Option[Seq[DomainMetadata]] = Some(domainMetadata) + def numDeletedRecordsOpt: Option[Long] = computedState.numDeletedRecordsOpt + def numDeletionVectorsOpt: Option[Long] = computedState.numDeletionVectorsOpt + def deletedRecordCountsHistogramOpt: Option[DeletedRecordCountsHistogram] = + computedState.deletedRecordCountsHistogramOpt + + protected def deletionVectorsReadableAndMetricsEnabled: Boolean = { + val checksumDVMetricsEnabled = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CHECKSUM_DV_METRICS_ENABLED) + val dvsReadable = DeletionVectorUtils.deletionVectorsReadable(snapshotToScan) + checksumDVMetricsEnabled && dvsReadable + } + + protected def deletionVectorsReadableAndHistogramEnabled: Boolean = { + val deletedRecordCountsHistogramEnabled = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED) + deletionVectorsReadableAndMetricsEnabled && deletedRecordCountsHistogramEnabled + } /** Generate a default SnapshotState of a new table given the table metadata and the protocol. */ protected def initialState(metadata: Metadata, protocol: Protocol): SnapshotState = { + val deletedRecordCountsHistogramOpt = if (spark.sessionState.conf.getConf( + DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED)) { + Some(DeletedRecordCountsHistogramUtils.emptyHistogram) + } else None SnapshotState( sizeInBytes = 0L, numOfSetTransactions = 0L, numOfFiles = 0L, numOfRemoves = 0L, + // DV metrics are initialized to Some(0) to allow incremental computation. For tables where + // DVs are disabled, there are turned to None by the incremental computation. + numDeletedRecordsOpt = Some(0), + numDeletionVectorsOpt = Some(0), numOfMetadata = 1L, numOfProtocol = 1L, setTransactions = Nil, domainMetadata = Nil, metadata = metadata, - protocol = protocol + protocol = protocol, + deletedRecordCountsHistogramOpt = deletedRecordCountsHistogramOpt ) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 323b144c616..256f9b83c45 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1153,6 +1153,25 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_CHECKSUM_DV_METRICS_ENABLED = + buildConf("checksumDVMetrics.enabled") + .internal() + .doc(s"""When enabled, each delta transaction includes vector metrics in the checksum. 
+ |Only applies to tables that use Deletion Vectors.""" + .stripMargin) + .booleanConf + .createWithDefault(true) + + val DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED = + buildConf("checksumDeletedRecordCountsHistogramMetrics.enabled") + .internal() + .doc(s"""When enabled, each delta transaction includes in the checksum the deleted + |record count distribution histogram for all the files. To enable this feature + |${DELTA_CHECKSUM_DV_METRICS_ENABLED.key} needs to be enabled as well. Only + |applies to tables that use Deletion Vectors.""".stripMargin) + .booleanConf + .createWithDefault(true) + val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED = buildConf("checkpoint.exceptionThrowing.enabled") .internal() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogram.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogram.scala new file mode 100644 index 00000000000..cf301d4e92a --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogram.scala @@ -0,0 +1,79 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import java.util.Arrays + +import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder +import org.apache.spark.sql.types.StructType + +/** + * A Histogram class tracking the deleted record count distribution for all files in a table. + * @param deletedRecordCounts An array with 10 bins where each slot represents the number of + * files where the number of deleted records falls within the range + * of the particular bin. The range of each bin is the following: + * bin1 -> [0,0] + * bin2 -> [1,9] + * bin3 -> [10,99] + * bin4 -> [100,999], + * bin5 -> [1000,9999] + * bin6 -> [10000,99999], + * bin7 -> [100000,999999], + * bin8 -> [1000000,9999999], + * bin9 -> [10000000,Int.Max - 1], + * bin10 -> [Int.Max,Long.Max]. + */ +case class DeletedRecordCountsHistogram(deletedRecordCounts: Array[Long]) { + require(deletedRecordCounts.length == DeletedRecordCountsHistogramUtils.NUMBER_OF_BINS, + s"There should be ${DeletedRecordCountsHistogramUtils.NUMBER_OF_BINS} bins in total") + + override def hashCode(): Int = + 31 * Arrays.hashCode(deletedRecordCounts) + getClass.getCanonicalName.hashCode + + override def equals(that: Any): Boolean = that match { + case DeletedRecordCountsHistogram(thatDP) => + java.util.Arrays.equals(deletedRecordCounts, thatDP) + case _ => false + } + + /** + * Insert a given value into the appropriate histogram bin. + */ + def insert(numDeletedRecords: Long): Unit = { + if (numDeletedRecords >= 0) { + val index = DeletedRecordCountsHistogramUtils.getHistogramBin(numDeletedRecords) + deletedRecordCounts(index) += 1 + } + } + + /** + * Remove a given value from the appropriate histogram bin. 
+ */ + def remove(numDeletedRecords: Long): Unit = { + if (numDeletedRecords >= 0) { + val index = DeletedRecordCountsHistogramUtils.getHistogramBin(numDeletedRecords) + deletedRecordCounts(index) -= 1 + } + } +} + +private[delta] object DeletedRecordCountsHistogram { + def apply(deletionPercentages: Array[Long]): DeletedRecordCountsHistogram = + new DeletedRecordCountsHistogram(deletionPercentages) + + lazy val schema: StructType = ExpressionEncoder[DeletedRecordCountsHistogram]().schema +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogramUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogramUtils.scala new file mode 100644 index 00000000000..144e317f8ec --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DeletedRecordCountsHistogramUtils.scala @@ -0,0 +1,159 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.stats + +import org.apache.spark.sql.delta.{DeltaErrors, DeltaUDF} + +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate.TypedImperativeAggregate +import org.apache.spark.sql.catalyst.trees.UnaryLike +import org.apache.spark.sql.catalyst.util.GenericArrayData +import org.apache.spark.sql.functions.udf +import org.apache.spark.sql.types.{ArrayType, DataType, LongType} +import org.apache.spark.unsafe.Platform + +/** + * This object contains helper functionality related to [[DeletedRecordCountsHistogram]]. + */ +private[delta] object DeletedRecordCountsHistogramUtils { + val BUCKET_BOUNDARIES = IndexedSeq( + 0L, 1L, 10L, 100L, 1000L, 10000L, 100000L, 1000000L, 10000000L, Int.MaxValue, Long.MaxValue) + val NUMBER_OF_BINS = BUCKET_BOUNDARIES.length - 1 + + def getDefaultBins: Array[Long] = Array.fill(NUMBER_OF_BINS)(0L) + + def emptyHistogram: DeletedRecordCountsHistogram = + DeletedRecordCountsHistogram.apply(getDefaultBins) + + def getHistogramBin(dvCardinality: Long): Int = { + import scala.collection.Searching._ + + require(dvCardinality >= 0) + + if (dvCardinality == Long.MaxValue) return NUMBER_OF_BINS - 1 + + BUCKET_BOUNDARIES.search(dvCardinality) match { + case Found(index) => + index + case InsertionPoint(insertionPoint) => + insertionPoint - 1 + } + } + + /** + * An imperative aggregate implementation of DeletedRecordCountsHistogram. + * + * The return type of this Imperative Aggregate is of ArrayType(LongType). The array + * represents a [[DeletedRecordCountsHistogram]]. 
+ * + */ + case class DeletedRecordCountsHistogramAgg( + child: Expression, + mutableAggBufferOffset: Int = 0, + inputAggBufferOffset: Int = 0) + extends TypedImperativeAggregate[Array[Long]] + with UnaryLike[Expression] { + override def createAggregationBuffer(): Array[Long] = getDefaultBins + + override val dataType: DataType = ArrayType(LongType) + + // This Aggregate doesn't return null. + override val nullable: Boolean = false + + override protected def withNewChildInternal( + newChild: Expression): DeletedRecordCountsHistogramAgg = copy(child = newChild) + + override def update(aggBuffer: Array[Long], input: InternalRow): Array[Long] = { + val value = child.eval(input) + + if (value != null) { + val dvCardinality = value.asInstanceOf[Long] + val index = getHistogramBin(dvCardinality) + aggBuffer(index) += 1 + } + aggBuffer + } + + override def merge(buffer: Array[Long], input: Array[Long]): Array[Long] = { + require(buffer.length == input.length) + for (index <- buffer.indices) { + buffer(index) += input(index) + } + buffer + } + + override def eval(buffer: Array[Long]): Any = new GenericArrayData(buffer) + + /** Serializes the aggregation buffer to Array[Byte]. */ + override def serialize(buffer: Array[Long]): Array[Byte] = { + require(buffer.length < 128) + val bytesPerLong = 8 + // One 8bit value stores the number of elements, the remaining are bucket values. + val serializedByteSize = (buffer.length * bytesPerLong) + 1 + val byteArray = new Array[Byte](serializedByteSize) + // Add buffer length for validation. + Platform.putByte(byteArray, Platform.BYTE_ARRAY_OFFSET, buffer.length.toByte) + + for (index <- buffer.indices) { + val offset = Platform.BYTE_ARRAY_OFFSET + 1 + index * bytesPerLong + Platform.putLong(byteArray, offset, buffer(index)) + } + byteArray + } + + /** De-serializes the serialized format Array[Byte], and produces aggregation buffer. */ + override def deserialize(bytes: Array[Byte]): Array[Long] = { + val bytesPerLong = 8 + // One 8bit value stores the number of elements, the remaining are bucket values. + val numElementsFromSerializedByteSize = (bytes.length - 1) / bytesPerLong + val aggBuffer = new Array[Long](numElementsFromSerializedByteSize) + // At the first byte we store the length of the deserialized buffer for validation purposes. + val numElementsFromSerializedState = Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET).toInt + if (numElementsFromSerializedByteSize != numElementsFromSerializedState) { + throw DeltaErrors.deletedRecordCountsHistogramDeserializationException() + } + for (index <- aggBuffer.indices) { + val offset = Platform.BYTE_ARRAY_OFFSET + 1 + index * bytesPerLong + aggBuffer(index) = Platform.getLong(bytes, offset) + } + aggBuffer + } + + override def withNewMutableAggBufferOffset(offset: Int): DeletedRecordCountsHistogramAgg = + copy(mutableAggBufferOffset = offset) + + override def withNewInputAggBufferOffset(offset: Int): DeletedRecordCountsHistogramAgg = + copy(inputAggBufferOffset = offset) + } + + /** + * A UDF to convert a long array (returned by [[DeletedRecordCountsHistogramAgg]]) to + * [[DeletedRecordCountsHistogram]]. 
+ */ + private lazy val HistogramAggrToHistogramUDF = { + DeltaUDF.deletedRecordCountsHistogramFromArrayLong { deletedRecordCountsHistogramArray => + new DeletedRecordCountsHistogram(deletedRecordCountsHistogramArray) } + } + + def histogramAggregate(dvCardinalityExpr: Column): Column = { + val aggregate = + Column(DeletedRecordCountsHistogramAgg(dvCardinalityExpr.expr).toAggregateExpression()) + DeletedRecordCountsHistogramUtils.HistogramAggrToHistogramUDF(aggregate) + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumDVMetricsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumDVMetricsSuite.scala new file mode 100644 index 00000000000..ebc695474ff --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumDVMetricsSuite.scala @@ -0,0 +1,368 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import scala.collection.mutable + +import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.{DeletedRecordCountsHistogram, DeletedRecordCountsHistogramUtils} +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.spark.sql.delta.util.DeltaEncoder + +import org.apache.spark.sql.{DataFrame, Encoder, QueryTest, Row} +import org.apache.spark.sql.functions.{coalesce, col, count, lit, sum} +import org.apache.spark.sql.test.SharedSparkSession + +case class StatsSchema( + numDeletedRecords: Long, + numDeletionVectors: Long, + deletedRecordCountsHistogramOpt: Option[DeletedRecordCountsHistogram]) + +class ChecksumDVMetricsSuite + extends QueryTest + with SharedSparkSession + with DeletionVectorsTestUtils + with DeltaSQLCommandTest { + + override def beforeAll(): Unit = { + super.beforeAll() + enableDeletionVectors(spark.conf) + } + + protected implicit def statsSchemaEncoder: Encoder[StatsSchema] = + (new DeltaEncoder[StatsSchema]).get + + /* + * Compare statistics in the checksum by comparing to the log statistics. 
+ */ + protected def validateChecksum( + snapshot: Snapshot, + statisticsExpected: Boolean = true, + histogramEnabled: Boolean = true): Unit = { + val checksum = snapshot.checksumOpt match { + case Some(checksum) => checksum + case None => snapshot.computeChecksum + } + + if (statisticsExpected) { + val histogramAggregationOpt = + if (histogramEnabled) { + Some(DeletedRecordCountsHistogramUtils.histogramAggregate( + coalesce(col("deletionVector.cardinality"), lit(0L)) + ).as("deletedRecordCountsHistogramOpt")) + } else { + Some(lit(null) + .cast(DeletedRecordCountsHistogram.schema) + .as("deletedRecordCountsHistogramOpt")) + } + + val aggregations = Seq( + sum(coalesce(col("deletionVector.cardinality"), lit(0))).as("numDeletedRecords"), + count(col("deletionVector")).as("numDeletionVectors")) ++ + histogramAggregationOpt + + val stats = snapshot.withStatsDeduplicated + .select(aggregations: _*) + .as[StatsSchema] + .first() + + val numDeletedRecords = stats.numDeletedRecords + val numDeletionVectors = stats.numDeletionVectors + val deletionVectorHistogram = stats.deletedRecordCountsHistogramOpt + + assert(checksum.numDeletedRecordsOpt === Some(numDeletedRecords)) + assert(checksum.numDeletionVectorsOpt === Some(numDeletionVectors)) + assert(checksum.deletedRecordCountsHistogramOpt === deletionVectorHistogram) + } else { + assert(checksum.numDeletedRecordsOpt === None) + assert(checksum.numDeletionVectorsOpt === None) + assert(checksum.deletedRecordCountsHistogramOpt === None) + } + } + + protected def runMerge( + target: io.delta.tables.DeltaTable, + source: DataFrame, + deleteFromID: Int): Unit = { + target.as("t").merge(source.as("s"), "t.id = s.id") + .whenMatched(s"s.id >= ${deleteFromID}").delete() + .execute() + } + + protected def runDelete( + target: io.delta.tables.DeltaTable, + source: DataFrame = null, + deleteFromID: Int): Unit = { + target.delete(s"id >= ${deleteFromID}") + } + + protected def runUpdate( + target: io.delta.tables.DeltaTable, + source: DataFrame = null, + deleteFromID: Int): Unit = { + target.update(col("id") >= lit(deleteFromID), Map("v" -> lit(-1))) + } + + for { + enableDVsOnTableDefault <- BOOLEAN_DOMAIN + enableDVCreation <- BOOLEAN_DOMAIN + enableIncrementalCommit <- BOOLEAN_DOMAIN + allowDVsOnOperation <- BOOLEAN_DOMAIN + } test(s"Commit checksum captures DV statistics " + + s"enableDVsOnTableDefault: ${enableDVsOnTableDefault} " + + s"enableDVCreation: ${enableDVCreation} " + + s"enableIncrementalCommit: ${enableIncrementalCommit} " + + s"allowDVsOnOperation: ${allowDVsOnOperation}") { + val targetDF = createTestDF(0, 100, 2) + val sourceDF = targetDF + + val operations: Seq[(io.delta.tables.DeltaTable, DataFrame, Int) => Unit] = + Seq(runMerge, runDelete, runUpdate) + + // We validate checksum validation for different feature combinations. + for (runOperation <- operations) { + withSQLConf( + DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.defaultTablePropertyKey -> + enableDVsOnTableDefault.toString, + DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> enableIncrementalCommit.toString) { + + withTempDeltaTable(targetDF, enableDVs = enableDVCreation) { (targetTable, targetLog) => + validateChecksum(targetLog.update(), enableDVCreation) + + // The first operation only deletes half the records from the second file. + runOperation(targetTable(), sourceDF, 75) + validateChecksum(targetLog.update(), enableDVCreation) + + // The second operation deletes the remaining records from the second file. 
+ withSQLConf( + DeltaSQLConf.DELETE_USE_PERSISTENT_DELETION_VECTORS.key -> + allowDVsOnOperation.toString, + DeltaSQLConf.UPDATE_USE_PERSISTENT_DELETION_VECTORS.key -> + allowDVsOnOperation.toString) { + runOperation(targetTable(), sourceDF, 50) + validateChecksum(targetLog.update(), enableDVCreation) + } + } + } + } + } + + test(s"Verify checksum DV statistics are not produced when the relevant config is disabled") { + val targetDF = createTestDF(0, 100, 2) + + withSQLConf(DeltaSQLConf.DELTA_DELETED_RECORD_COUNTS_HISTOGRAM_ENABLED.key -> false.toString) { + withTempDeltaTable(targetDF, enableDVs = true) { (targetTable, targetLog) => + runDelete(targetTable(), deleteFromID = 75) + validateChecksum(targetLog.update(), histogramEnabled = false) + } + } + } + + for { + enableDVCreation <- BOOLEAN_DOMAIN + enableIncrementalCommit <- BOOLEAN_DOMAIN + } test(s"Checksum is backward compatible " + + s"enableDVCreation: $enableDVCreation " + + s"enableIncrementalCommit: $enableIncrementalCommit") { + val targetDF = createTestDF(0, 100, 2) + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> enableIncrementalCommit.toString) { + withTempDeltaTable(targetDF, enableDVs = enableDVCreation) { (targetTable, targetLog) => + validateChecksum(targetLog.update(), statisticsExpected = enableDVCreation) + + runDelete(targetTable(), deleteFromID = 75) + validateChecksum(targetLog.update(), statisticsExpected = enableDVCreation) + + // Flip DV setting on an existing table. + enableDeletionVectorsInTable(targetLog, enable = !enableDVCreation) + + runDelete(targetTable(), deleteFromID = 50) + + // When INCREMENTAL_COMMIT_ENABLED, enabling DVs midway would normally yield + // empty stats. This is due to the incremental nature of the computation and due to the + // fact we do not store stats for tables with no DVs. However, in this scenario we try + // to take advantage any recent snapshot reconstruction and harvest the stats from there. + // In the opposite scenario, disabling DVs midway, we maintain the previously computed + // statistics so we do not lose incrementality if DVs are enabled again. + // When incremental commit is disabled, both enabling and disabling DVs + // midway is not an issue. When DVs are enabled we produce results and when DVs are + // disabled we do not. + validateChecksum(targetLog.update(), + statisticsExpected = !(enableDVCreation && !enableIncrementalCommit)) + } + } + } + + test("Checksum is computed in incremental commit when full state recomputation is triggered") { + val targetDF = createTestDF(0, 100, 2) + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> true.toString, + DeltaSQLConf.INCREMENTAL_COMMIT_VERIFY.key -> false.toString, + DeltaSQLConf.INCREMENTAL_COMMIT_FORCE_VERIFY_IN_TESTS.key -> false.toString, + DeltaSQLConf.DELTA_ALL_FILES_IN_CRC_VERIFICATION_MODE_ENABLED.key -> false.toString) { + withTempDeltaTable(targetDF, enableDVs = false) { (targetTable, targetLog) => + validateChecksum(targetLog.update(), statisticsExpected = false) + + runDelete(targetTable(), deleteFromID = 75) + validateChecksum(targetLog.update(), statisticsExpected = false) + + // Flip DV setting on an existing table. 
+ enableDeletionVectorsInTable(targetLog) + runDelete(targetTable(), deleteFromID = 60) + validateChecksum(targetLog.update(), statisticsExpected = true) + } + } + } + + for (enableIncrementalCommit <- BOOLEAN_DOMAIN) + test(s"Verify checksum validation " + + s"incrementalCommit: $enableIncrementalCommit") { + val targetDF = createTestDF(0, 100, 2) + + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> enableIncrementalCommit.toString) { + withTempDeltaTable(targetDF, enableDVs = true) { (targetTable, targetLog) => + runDelete(targetTable(), deleteFromID = 75) + verifyDVsExist(targetLog, 1) + + val snapshot = targetLog.update() + assert(snapshot.validateChecksum()) + } + } + } + + for (enableIncrementalCommit <- BOOLEAN_DOMAIN) + test(s"Verify checksum validation when DVs are enabled on existing tables " + + s"incrementalCommit: $enableIncrementalCommit") { + val targetDF = createTestDF(0, 100, 2) + + withSQLConf(DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> enableIncrementalCommit.toString) { + withTempDeltaTable(targetDF, enableDVs = false) { (targetTable, targetLog) => + runDelete(targetTable(), deleteFromID = 75) + + // This validation should not take into account DV statistics. + assert(targetLog.update().validateChecksum()) + + // Enable DVs, delete with DVs and validate checksum again. + enableDeletionVectorsInTable(targetLog) + runDelete(targetTable(), deleteFromID = 60) + verifyDVsExist(targetLog, 1) + + // When incremental commit is enabled DV statistics should remain None since DVs were + // enabled midway. Checksum validation should not include DV statistics in the + // validation process. + assert(targetLog.update().validateChecksum()) + } + } + } + + test("Verify DeletedRecordsCountHistogram correctness") { + val histogram1 = DeletedRecordCountsHistogramUtils.emptyHistogram + + // Initialize histogram with 100 files. + (1 to 100).foreach(_ => histogram1.insert(0)) + assert(histogram1.deletedRecordCounts === Seq(100, 0, 0, 0, 0, 0, 0, 0, 0, 0)) + + // Simulate record deletions from 10 files. This would generate 10 RemoveFile actions with + // zero DV cardinality. Then we generate 10 add files, 5 in the range 1-9 and 5 more in + // the range 1000-9999. + (1 to 10).foreach(_ => histogram1.remove(0)) + (5 to 9).foreach(n => histogram1.insert(n)) + (1000 to 1004).foreach(n => histogram1.insert(n)) + assert(histogram1.deletedRecordCounts === Seq(90, 5, 0, 0, 5, 0, 0, 0, 0, 0)) + + (1 to 5).foreach(_ => histogram1.remove(0)) + (100000 to 100004).foreach(n => histogram1.insert(n)) + assert(histogram1.deletedRecordCounts === Seq(85, 5, 0, 0, 5, 0, 5, 0, 0, 0)) + + // Negative values should be ignored. + histogram1.insert(-123) + histogram1.remove(-14) + assert(histogram1.deletedRecordCounts === Seq(85, 5, 0, 0, 5, 0, 5, 0, 0, 0)) + + // Verify small numbers "catch all" bucket works. + (1 to 5).foreach(_ => histogram1.remove(0)) + histogram1.insert(10000000L) + histogram1.insert(422290000L) + histogram1.insert(300000999L) + assert(histogram1.deletedRecordCounts === Seq(80, 5, 0, 0, 5, 0, 5, 0, 3, 0)) + + // Verify large numbers "catch all" bucket works. + histogram1.insert(252763333339L) + assert(histogram1.deletedRecordCounts === Seq(80, 5, 0, 0, 5, 0, 5, 0, 3, 1)) + + // Check edges. + val histogram2 = DeletedRecordCountsHistogramUtils.emptyHistogram + // Bin 1. + histogram2.insert(0) + assert(histogram2.deletedRecordCounts === Seq(1, 0, 0, 0, 0, 0, 0, 0, 0, 0)) + // Bin 2. 
+ histogram2.insert(1) + histogram2.insert(9) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 0, 0, 0, 0, 0, 0, 0, 0)) + // Bin 3. + histogram2.insert(10) + histogram2.insert(99) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 0, 0, 0, 0, 0, 0, 0)) + // Bin 4. + histogram2.insert(100) + histogram2.insert(999) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 0, 0, 0, 0, 0, 0)) + // Bin 5. + histogram2.insert(1000) + histogram2.insert(9999) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 2, 0, 0, 0, 0, 0)) + // Bin 6. + histogram2.insert(10000) + histogram2.insert(99999) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 2, 2, 0, 0, 0, 0)) + // Bin 7. + histogram2.insert(100000) + histogram2.insert(999999) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 2, 2, 2, 0, 0, 0)) + // Bin 8. + histogram2.insert(1000000) + histogram2.insert(9999999) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 2, 2, 2, 2, 0, 0)) + // Bin 9. + histogram2.insert(10000000) + histogram2.insert(100000000) + histogram2.insert(1000000000) + histogram2.insert(Int.MaxValue - 1) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 2, 2, 2, 2, 4, 0)) + // Bin 10. + histogram2.insert(Int.MaxValue) + histogram2.insert(Long.MaxValue) + assert(histogram2.deletedRecordCounts === Seq(1, 2, 2, 2, 2, 2, 2, 2, 4, 2)) + } + + test("Verify DeletedRecordsCountHistogram aggregate correctness") { + import org.apache.spark.sql.delta.implicits._ + val data = Seq( + 0L, 1L, 9L, 10L, 99L, 100L, 999L, 1000L, 9999L, 10000L, 99999L, 100000L, 999999L, + 1000000L, 9999999L, 10000000L, Int.MaxValue - 1, Int.MaxValue, Long.MaxValue) + + val df = spark.createDataset(data).toDF("dvCardinality") + val histogram = df + .select(DeletedRecordCountsHistogramUtils.histogramAggregate(col("dvCardinality"))) + .first() + + val deletedRecordCounts = histogram + .getAs[Row](0) + .getAs[mutable.WrappedArray[Long]]("deletedRecordCounts") + + assert(deletedRecordCounts === Seq(1, 2, 2, 2, 2, 2, 2, 2, 2, 2)) + } +}
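
Two small standalone Scala sketches follow to summarize the behavior this patch introduces; they are not part of the diff, and names such as DvCounters, applyFile and HistogramBinSketch are illustrative only.

The first sketch shows, under those assumptions, the Option-based accumulation that RecordChecksum.computeNewChecksum performs per AddFile/RemoveFile: when the previous checksum (and the optional old snapshot) carry no DV statistics, the counters start as None and map(...) leaves them None, which is why DV metrics stay absent until a full snapshot recomputation runs while DVs are enabled.

final case class DvCounters(numDeletedRecords: Option[Long], numDeletionVectors: Option[Long])

// dvCardinality is Some(deletionVector.cardinality) for a file carrying a DV, None otherwise;
// isAdd = true for an AddFile, false for a RemoveFile (which subtracts its contribution).
def applyFile(counters: DvCounters, dvCardinality: Option[Long], isAdd: Boolean): DvCounters = {
  val sign = if (isAdd) 1L else -1L
  val (dvCount, cardinality) = dvCardinality.map(1L -> _).getOrElse(0L -> 0L)
  DvCounters(
    numDeletedRecords = counters.numDeletedRecords.map(_ + sign * cardinality),
    numDeletionVectors = counters.numDeletionVectors.map(_ + sign * dvCount))
}

The second sketch reproduces the bin lookup of DeletedRecordCountsHistogramUtils.getHistogramBin with the same BUCKET_BOUNDARIES values, to make the ten bin ranges documented on DeletedRecordCountsHistogram concrete (bin1..bin10 in the scaladoc correspond to array indices 0..9 here).

object HistogramBinSketch {
  // Same boundaries as BUCKET_BOUNDARIES in the patch; ten bins, the last one covering
  // [Int.MaxValue, Long.MaxValue].
  val bucketBoundaries: IndexedSeq[Long] = IndexedSeq(
    0L, 1L, 10L, 100L, 1000L, 10000L, 100000L, 1000000L, 10000000L, Int.MaxValue, Long.MaxValue)
  val numberOfBins: Int = bucketBoundaries.length - 1

  // Binary-search the sorted boundaries: an exact hit maps to its own bin, any other value
  // falls into the bin to the left of its insertion point; Long.MaxValue is special-cased
  // because it is itself the last boundary.
  def getHistogramBin(dvCardinality: Long): Int = {
    import scala.collection.Searching._
    require(dvCardinality >= 0)
    if (dvCardinality == Long.MaxValue) return numberOfBins - 1
    bucketBoundaries.search(dvCardinality) match {
      case Found(index) => index
      case InsertionPoint(insertionPoint) => insertionPoint - 1
    }
  }

  def main(args: Array[String]): Unit = {
    // Expected bins: 0 -> 0, 5 -> 1, 42 -> 2, 9999 -> 4, 10000000 -> 8,
    // Int.MaxValue -> 9, Long.MaxValue -> 9.
    Seq(0L, 5L, 42L, 9999L, 10000000L, Int.MaxValue.toLong, Long.MaxValue)
      .foreach(c => println(s"cardinality=$c -> bin ${getHistogramBin(c)}"))
  }
}

These mappings match the assertions in the "Verify DeletedRecordsCountHistogram correctness" test above, where boundary values such as 10, 100, 1000, Int.MaxValue and Long.MaxValue each land at the start of their own bin.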