[Spark][3.3] Make Identity Column High Water Mark updates consistent
c27kwan committed Dec 19, 2024
1 parent 6177288 commit 297f2a7
Showing 9 changed files with 753 additions and 44 deletions.
211 changes: 176 additions & 35 deletions spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala
@@ -16,8 +16,11 @@

package org.apache.spark.sql.delta

import scala.collection.mutable

import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSourceUtils._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
@@ -31,6 +34,17 @@ import org.apache.spark.sql.execution.datasources.WriteTaskStats
import org.apache.spark.sql.functions.{array, max, min, to_json}
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}

/**
* This object holds String constants used in the `debugInfo` field when
* logging [[IdentityColumn.opTypeHighWaterMarkUpdate]].
* Each string represents an unexpected or notable event while calculating the high water mark.
*/
object IdentityColumnHighWaterMarkUpdateInfo {
val EXISTING_WATER_MARK_BEFORE_START = "existing_water_mark_before_start"
val CANDIDATE_HIGH_WATER_MARK_ROUNDED = "candidate_high_watermark_rounded"
val CANDIDATE_HIGH_WATER_MARK_BEFORE_START = "candidate_high_water_mark_before_start"
}
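// For example (illustrative only), a single high water mark update that both rounds the
// candidate and detects an existing watermark before the start would be logged with
// debugInfo = "candidate_high_watermark_rounded, existing_water_mark_before_start".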

/**
* Provide utility methods related to IDENTITY column support for Delta.
*/
@@ -46,6 +60,8 @@ object IdentityColumn extends DeltaLogging {
val opTypeWrite = "delta.identityColumn.write"
// When IDENTITY column update causes transaction to abort.
val opTypeAbort = "delta.identityColumn.abort"
// When we update the high watermark of an IDENTITY column.
val opTypeHighWaterMarkUpdate = "delta.identityColumn.highWaterMarkUpdate"

// Return true if `field` is an identity column that allows explicit insert. Caller must ensure
// `isIdentityColumn(field)` is true.
@@ -134,32 +150,151 @@ object IdentityColumn extends DeltaLogging {
))
}

/** Round `value` to the next value that follows start and step configuration. */
protected[delta] def roundToNext(start: Long, step: Long, value: Long): Long = {
val valueOffset = Math.subtractExact(value, start)
if (valueOffset % step == 0) {
value
} else {
// An identity value follows the formula start + step * n. So n = (value - start) / step.
// Since the value doesn't follow this formula, we need to ceil n.
// corrected value = start + step * ceil(n).
// However, we can't cast to Double for division because it's only accurate up to 54 bits.
// Instead, we do a floored division and add 1. Because the offset is not an exact multiple
// of the step, floor(n) + 1 == ceil(n), regardless of the sign of the step.
// start + step * (floorDiv(value - start, step) + 1)
val stepMultiple = Math.addExact(Math.floorDiv(valueOffset, step), 1L)
Math.addExact(
start,
Math.multiplyExact(step, stepMultiple)
)
}
}
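// Worked examples for the rounding above (values chosen purely for illustration):
//   roundToNext(start = 100, step = 3, value = 104)  returns 106 (104 sits between 103 and 106)
//   roundToNext(start = 100, step = -3, value = 89)  returns 88  (next on-grid value in the step direction)
//   roundToNext(start = 100, step = 3, value = 106)  returns 106 (already on the grid, returned as is)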

/**
* Update the high water mark of the IDENTITY column based on `candidateHighWaterMark`.
*
* We validate the candidate against the identity column definition (start, step) and may store
* a high water mark that differs from `candidateHighWaterMark` if the candidate is not valid.
* This method may also leave the high water mark unchanged if the candidate doesn't respect the
* start, is behind the current high water mark in the step direction, or if the update would be
* a no-op.
*
* @param field The IDENTITY column to update.
* @param candidateHighWaterMark The candidate high water mark to update to.
* @param allowLoweringHighWaterMarkForSyncIdentity Whether to allow lowering the high water mark.
* Lowering the high water mark is NOT SAFE in
* general, but may be a valid operation in SYNC
* IDENTITY (e.g. repair a high water mark after
* a bad sync).
* @return A new `StructField` with the high water mark updated to `candidateHighWaterMark` and
* a Seq[String] that contains debug information for logging.
*/
protected[delta] def updateToValidHighWaterMark(
field: StructField,
candidateHighWaterMark: Long,
allowLoweringHighWaterMarkForSyncIdentity: Boolean
): (StructField, Seq[String]) = {
require(ColumnWithDefaultExprUtils.isIdentityColumn(field))

val info = getIdentityInfo(field)
val positiveStep = info.step > 0
val orderInStepDirection = if (positiveStep) Ordering.Long else Ordering.Long.reverse

val loggingBuffer = new mutable.ArrayBuffer[String]

// We check `candidateHighWaterMark` and not `newHighWaterMark` because
// newHighWaterMark may not be part of the column. E.g. a generated by default column
// has candidateHighWaterMark = 9, start = 10, step = 3, and previous highWaterMark = None.
// We don't want to bump the high water mark to 10 because the next value generated will
// be 13, and we'll miss the specified start entirely.
val isBeforeStart = orderInStepDirection.lt(candidateHighWaterMark, info.start)
if (isBeforeStart) {
loggingBuffer.append(
IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_BEFORE_START)
}

// We must round in the GENERATED BY DEFAULT case because the candidate may be a user-inserted
// value that doesn't follow the identity column definition. We don't skip this rounding for the
// GENERATED ALWAYS case: it's effectively a no-op there, since generated-always values should
// theoretically always respect the identity column definition, but if the high water mark was
// wrong for some reason, this is our chance to fix it.
val roundedCandidateHighWaterMark = roundToNext(info.start, info.step, candidateHighWaterMark)
if (roundedCandidateHighWaterMark != candidateHighWaterMark) {
loggingBuffer.append(IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_ROUNDED)
}

// If allowLoweringHighWaterMarkForSyncIdentity is true, we can ignore the existing high water
// mark.
val newHighWaterMark = info.highWaterMark match {
case Some(oldWaterMark) if !allowLoweringHighWaterMarkForSyncIdentity =>
orderInStepDirection.max(oldWaterMark, roundedCandidateHighWaterMark)
case _ => roundedCandidateHighWaterMark
}

val tableHasBadHighWaterMark = info.highWaterMark.exists(oldWaterMark =>
orderInStepDirection.lt(oldWaterMark, info.start))
if (tableHasBadHighWaterMark) {
loggingBuffer.append(
IdentityColumnHighWaterMarkUpdateInfo.EXISTING_WATER_MARK_BEFORE_START)
}

val isChanged = !info.highWaterMark.contains(newHighWaterMark)

// If a table already has a bad high water mark, we shouldn't prevent it from being updated.
// Always try to update to newHighWaterMark, which is guaranteed to be at least as good a choice
// as the existing one since we take the max in the step direction.
// Note that this means if a table has a bad high water mark, we may set the high water mark to
// the start due to the rounding logic.
// Otherwise, don't update if the candidate is before the start or the high water mark is
// unchanged.
if (tableHasBadHighWaterMark || (!isBeforeStart && isChanged)) {
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
.build()
(field.copy(metadata = newMetadata), loggingBuffer.toIndexedSeq)
} else {
// If we don't update the high watermark, we don't need to log the update.
(field, Nil)
}
}
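// Worked example for updateToValidHighWaterMark (values chosen purely for illustration): for a
// GENERATED BY DEFAULT column with start = 1, step = 2 and an existing high water mark of 7,
// a candidate of 10 is rounded to 11 and becomes the new high water mark (the debug info then
// contains "candidate_high_watermark_rounded"), while a candidate of 5 is on the grid but behind
// the existing mark, so the field is returned unchanged with empty debug info.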

/**
* Return a new schema with IDENTITY high water marks updated in the schema.
* The new high watermarks are decided based on the `updatedIdentityHighWaterMarks` and old high
* watermark values present in the passed `schema`.
*/
def updateSchema(
deltaLog: DeltaLog,
schema: StructType,
updatedIdentityHighWaterMarks: Seq[(String, Long)]) : StructType = {
updatedIdentityHighWaterMarks: Seq[(String, Long)]
): StructType = {
val updatedIdentityHighWaterMarksGrouped =
updatedIdentityHighWaterMarks.groupBy(_._1).mapValues(v => v.map(_._2))
StructType(schema.map { f =>
updatedIdentityHighWaterMarksGrouped.get(DeltaColumnMapping.getPhysicalName(f)) match {
case Some(newWatermarks) if ColumnWithDefaultExprUtils.isIdentityColumn(f) =>
val oldIdentityInfo = getIdentityInfo(f)
val positiveStep = oldIdentityInfo.step > 0
val newHighWaterMark = if (positiveStep) {
oldIdentityInfo.highWaterMark.map(Math.max(_, newWatermarks.max))
.getOrElse(newWatermarks.max)
val candidateHighWaterMark = if (positiveStep) {
newWatermarks.max
} else {
oldIdentityInfo.highWaterMark.map(Math.min(_, newWatermarks.min))
.getOrElse(newWatermarks.min)
newWatermarks.min
}
val (newField, loggingSeq) = updateToValidHighWaterMark(
f, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity = false)
if (loggingSeq.nonEmpty) {
recordDeltaEvent(
deltaLog = deltaLog,
opType = opTypeHighWaterMarkUpdate,
data = Map(
"columnName" -> f.name,
"debugInfo" -> loggingSeq.mkString(", "),
"oldHighWaterMark" -> oldIdentityInfo.highWaterMark,
"candidateHighWaterMark" -> candidateHighWaterMark,
"updatedFrom" -> "updateSchema"
)
)
}
val builder = new MetadataBuilder()
.withMetadata(f.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
f.copy(metadata = builder.build())
newField
case _ =>
f
}
@@ -252,19 +387,12 @@ object IdentityColumn extends DeltaLogging {

// Calculate the synced IDENTITY high water mark based on the actual data and return a
// potentially updated `StructField`.
def syncIdentity(field: StructField, df: DataFrame): StructField = {
// Round `value` to the next value that follows start and step configuration.
def roundToNext(start: Long, step: Long, value: Long): Long = {
if (Math.subtractExact(value, start) % step == 0) {
value
} else {
// start + step * ((value - start) / step + 1)
Math.addExact(
Math.multiplyExact(Math.addExact(Math.subtractExact(value, start) / step, 1), step),
start)
}
}

def syncIdentity(
deltaLog: DeltaLog,
field: StructField,
df: DataFrame,
allowLoweringHighWaterMarkForSyncIdentity: Boolean
): StructField = {
assert(ColumnWithDefaultExprUtils.isIdentityColumn(field))
// Run a query to get the actual high water mark (max or min value of the IDENTITY column) from
// the actual data.
Expand All @@ -274,17 +402,23 @@ object IdentityColumn extends DeltaLogging {
val resultRow = df.select(expr).collect().head

if (!resultRow.isNullAt(0)) {
val result = resultRow.getLong(0)
val isBeforeStart = if (positiveStep) result < info.start else result > info.start
val newHighWaterMark = roundToNext(info.start, info.step, result)
if (isBeforeStart || info.highWaterMark.contains(newHighWaterMark)) {
field
} else {
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
.build()
field.copy(metadata = newMetadata)
val candidateHighWaterMark = resultRow.getLong(0)
val (newField, loggingSeq) = updateToValidHighWaterMark(
field, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity)
if (loggingSeq.nonEmpty) {
recordDeltaEvent(
deltaLog = deltaLog,
opType = opTypeHighWaterMarkUpdate,
data = Map(
"columnName" -> field.name,
"debugInfo" -> loggingSeq.mkString(", "),
"oldHighWaterMark" -> info.highWaterMark,
"candidateHighWaterMark" -> candidateHighWaterMark,
"updatedFrom" -> "syncIdentity"
)
)
}
newField
} else {
field
}
@@ -295,12 +429,19 @@
* been merged with the corresponding high water marks of `schemaWithHighWaterMarksToMerge`.
*/
def copySchemaWithMergedHighWaterMarks(
schemaToCopy: StructType, schemaWithHighWaterMarksToMerge: StructType): StructType = {
deltaLog: DeltaLog,
schemaToCopy: StructType,
schemaWithHighWaterMarksToMerge: StructType
): StructType = {
val newHighWatermarks = getIdentityColumns(schemaWithHighWaterMarksToMerge).flatMap { f =>
val info = getIdentityInfo(f)
info.highWaterMark.map(waterMark => DeltaColumnMapping.getPhysicalName(f) -> waterMark)
}
updateSchema(schemaToCopy, newHighWatermarks)
updateSchema(
deltaLog,
schemaToCopy,
newHighWatermarks
)
}
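// Illustrative effect (example values only, assuming both marks respect the column's start and
// step): if identity column "id" carries a high water mark of 20 in `schemaToCopy` and 35 in
// `schemaWithHighWaterMarksToMerge` with a positive step, the returned schema carries 35, since
// updateSchema keeps whichever mark is further along the step direction.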


@@ -455,7 +455,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
def precommitUpdateSchemaWithIdentityHighWaterMarks(): Unit = {
if (updatedIdentityHighWaterMarks.nonEmpty) {
val newSchema = IdentityColumn.updateSchema(
metadata.schema, updatedIdentityHighWaterMarks.toSeq)
deltaLog,
metadata.schema,
updatedIdentityHighWaterMarks.toSeq
)
val updatedMetadata = metadata.copy(schemaString = newSchema.json)
updateMetadataAfterWrite(updatedMetadata)
}
@@ -308,8 +308,10 @@ abstract class CloneTableBase(
.toMap
val clonedSchema =
IdentityColumn.copySchemaWithMergedHighWaterMarks(
deltaLog = targetSnapshot.deltaLog,
schemaToCopy = clonedMetadata.schema,
schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema)
schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema
)
clonedMetadata.copy(configuration = filteredConfiguration, schemaString = clonedSchema.json)
}

@@ -199,8 +199,10 @@ case class RestoreTableCommand(sourceTable: DeltaTableV2)
// We need to merge the schema of the latest snapshot with the schema of the snapshot
// we're restoring to ensure that the high water mark is correct.
val mergedSchema = IdentityColumn.copySchemaWithMergedHighWaterMarks(
deltaLog = deltaLog,
schemaToCopy = snapshotToRestore.metadata.schema,
schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema)
schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema
)

txn.updateMetadata(snapshotToRestore.metadata.copy(schemaString = mergedSchema.json))

@@ -771,7 +771,14 @@ case class AlterTableChangeColumnDeltaCommand(
if (syncIdentity) {
assert(oldColumn == newColumn)
val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
val field = IdentityColumn.syncIdentity(newColumn, df)
val allowLoweringHighWaterMarkForSyncIdentity = sparkSession.conf
.get(DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK)
val field = IdentityColumn.syncIdentity(
deltaLog,
newColumn,
df,
allowLoweringHighWaterMarkForSyncIdentity
)
txn.setSyncIdentity()
txn.readWholeTable()
field
@@ -2261,6 +2261,18 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK =
buildConf("identityColumn.allowSyncIdentityToLowerHighWaterMark.enabled")
.internal()
.doc(
"""
| If true, the SYNC IDENTITY command may lower the high water mark of a Delta IDENTITY
| column. If false, the high water mark is only updated to a value that respects the
| column's specified start and step and never moves behind the existing high water mark.
|""".stripMargin)
.booleanConf
.createWithDefault(false)
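// Example usage (illustrative; assumes DeltaSQLConf's usual "spark.databricks.delta." key prefix
// and an existing Delta table `t` with IDENTITY column `id`):
//   spark.conf.set(
//     "spark.databricks.delta.identityColumn.allowSyncIdentityToLowerHighWaterMark.enabled",
//     "true")
//   spark.sql("ALTER TABLE t ALTER COLUMN id SYNC IDENTITY")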

///////////
// TESTING
///////////