Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark][3.3] Make Identity Column High Water Mark updates consistent #3990

Merged
merged 2 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 183 additions & 35 deletions spark/src/main/scala/org/apache/spark/sql/delta/IdentityColumn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@

package org.apache.spark.sql.delta

import scala.collection.mutable

import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSourceUtils._
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.{DeltaFileStatistics, DeltaJobStatisticsTracker}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.conf.Configuration
Expand All @@ -31,6 +34,17 @@ import org.apache.spark.sql.execution.datasources.WriteTaskStats
import org.apache.spark.sql.functions.{array, max, min, to_json}
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}

/**
* This object holds String constants used in the field `debugInfo` when
* logging [[IdentityColumn.opTypeHighWaterMarkUpdate]] events.
* Each string represents an unexpected or notable event while calculating the high water mark.
* NOTE: these values are emitted in telemetry; do not change them without considering
* downstream consumers of the log events.
*/
object IdentityColumnHighWaterMarkUpdateInfo {
// The table's pre-existing high water mark does not respect the column's configured start.
val EXISTING_WATER_MARK_BEFORE_START = "existing_water_mark_before_start"
// The candidate value did not land on `start + step * n` and was rounded to the next valid value.
val CANDIDATE_HIGH_WATER_MARK_ROUNDED = "candidate_high_watermark_rounded"
// The candidate value precedes the configured start (in the direction of `step`).
val CANDIDATE_HIGH_WATER_MARK_BEFORE_START = "candidate_high_water_mark_before_start"
}

/**
* Provide utility methods related to IDENTITY column support for Delta.
*/
Expand All @@ -46,6 +60,8 @@ object IdentityColumn extends DeltaLogging {
val opTypeWrite = "delta.identityColumn.write"
// When IDENTITY column update causes transaction to abort.
val opTypeAbort = "delta.identityColumn.abort"
// When we update the high watermark of an IDENTITY column.
val opTypeHighWaterMarkUpdate = "delta.identityColumn.highWaterMarkUpdate"

// Return true if `field` is an identity column that allows explicit insert. Caller must ensure
// `isIdentityColumn(field)` is true.
Expand Down Expand Up @@ -134,32 +150,158 @@ object IdentityColumn extends DeltaLogging {
))
}

/**
* Round `value`, in the direction of `step`, to the nearest value of the form
* `start + step * n`; `value` is returned unchanged when it already has that form.
* Uses the exact `Math.*Exact` arithmetic so overflow raises instead of wrapping.
*/
protected[delta] def roundToNext(start: Long, step: Long, value: Long): Long = {
  val offsetFromStart = Math.subtractExact(value, start)
  if (offsetFromStart % step == 0) return value
  // A valid identity value satisfies value = start + step * n for a non-negative integer n,
  // i.e. n = (value - start) / step. Since `value` does not satisfy the formula, n must be
  // rounded up. Double division is avoided (it is exact only to ~54 bits); instead we use
  // floored integer division and conditionally bump the quotient by one.
  val flooredQuotient = offsetFromStart / step
  val sameDirection = Math.signum(offsetFromStart) == Math.signum(step)
  // When the offset points the same way as the step, the floored quotient undershoots by one
  // step. Otherwise adding one would round twice and could skip past the start.
  val stepCount =
    if (sameDirection) Math.addExact(flooredQuotient, 1L) else flooredQuotient
  Math.addExact(start, Math.multiplyExact(step, stepCount))
}

/**
* Update the high water mark of the IDENTITY column based on `candidateHighWaterMark`.
*
* We validate against the identity column definition (start, step) and may insert a high
* watermark that's different from `candidateHighWaterMark` if it's not valid. This method
* may also not update the high watermark if the candidate doesn't respect the start, is
* below the current watermark or is a NOOP.
*
* @param field The IDENTITY column to update.
* @param candidateHighWaterMark The candidate high water mark to update to.
* @param allowLoweringHighWaterMarkForSyncIdentity Whether to allow lowering the high water mark.
* Lowering the high water mark is NOT SAFE in
* general, but may be a valid operation in SYNC
* IDENTITY (e.g. repair a high water mark after
* a bad sync).
* @return A new `StructField` with the high water mark updated to `candidateHighWaterMark` and
* a Seq[String] that contains debug information for logging. The Seq is empty when no
* metadata update was made (callers use this to skip logging).
*/
protected[delta] def updateToValidHighWaterMark(
field: StructField,
candidateHighWaterMark: Long,
allowLoweringHighWaterMarkForSyncIdentity: Boolean
): (StructField, Seq[String]) = {
require(ColumnWithDefaultExprUtils.isIdentityColumn(field))

val info = getIdentityInfo(field)
val positiveStep = info.step > 0
// Compare in the direction the sequence grows: for a negative step, a "later" high water
// mark is numerically smaller, so all lt/max comparisons below use this ordering.
val orderInStepDirection = if (positiveStep) Ordering.Long else Ordering.Long.reverse

// Accumulates IdentityColumnHighWaterMarkUpdateInfo tags for the caller to log.
val loggingBuffer = new mutable.ArrayBuffer[String]

// We check `candidateHighWaterMark` and not `newHighWaterMark` because
// newHighWaterMark may not be part of the column. E.g. a generated by default column
// has candidateHighWaterMark = 9, start = 10, step = 3, and previous highWaterMark = None.
// We don't want to bump the high water mark to 10 because the next value generated will
// be 13, and we'll miss the specified start entirely.
val isBeforeStart = orderInStepDirection.lt(candidateHighWaterMark, info.start)
if (isBeforeStart) {
loggingBuffer.append(
IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_BEFORE_START)
}

// We must round on the generated by default case because the candidate may be a user inserted
// value and may not follow the identity column definition. We're not skipping this check
// for the generated always case. It's effectively a NOOP since generated always values should
// theoretically always respect the identity column definition. If the high watermark was
// wrong (for some reason), this is our chance to fix it.
val roundedCandidateHighWaterMark = roundToNext(info.start, info.step, candidateHighWaterMark)
if (roundedCandidateHighWaterMark != candidateHighWaterMark) {
loggingBuffer.append(IdentityColumnHighWaterMarkUpdateInfo.CANDIDATE_HIGH_WATER_MARK_ROUNDED)
}

// If allowLoweringHighWaterMarkForSyncIdentity is true, we can ignore the existing high water
// mark.
val newHighWaterMark = info.highWaterMark match {
case Some(oldWaterMark) if !allowLoweringHighWaterMarkForSyncIdentity =>
// Never move backwards: keep whichever of old/new is further along the sequence.
orderInStepDirection.max(oldWaterMark, roundedCandidateHighWaterMark)
case _ => roundedCandidateHighWaterMark
}

// A pre-existing watermark behind the start indicates prior corruption; detected here so we
// can both log it and force a repair below.
val tableHasBadHighWaterMark = info.highWaterMark.exists(oldWaterMark =>
orderInStepDirection.lt(oldWaterMark, info.start))
if (tableHasBadHighWaterMark) {
loggingBuffer.append(
IdentityColumnHighWaterMarkUpdateInfo.EXISTING_WATER_MARK_BEFORE_START)
}

val isChanged = !info.highWaterMark.contains(newHighWaterMark)

// If a table already has a bad high water mark, we shouldn't prevent them from updating the
// high water mark. Always try to update to newHighWaterMark, which is guaranteed to be a better
// choice than the existing one since we do a max().
// Note that means if a table has bad water mark, we can set the high water to the start due to
// the rounding logic.
// Don't update if it's before start or the high watermark is the same.
if (tableHasBadHighWaterMark || (!isBeforeStart && isChanged)) {
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
.build()
(field.copy(metadata = newMetadata), loggingBuffer.toIndexedSeq)
} else {
// If we don't update the high watermark, we don't need to log the update.
(field, Nil)
}
}

/**
* Return a new schema with IDENTITY high water marks updated in the schema.
* The new high watermarks are decided based on the `updatedIdentityHighWaterMarks` and old high
* watermark values present in the passed `schema`.
*/
def updateSchema(
deltaLog: DeltaLog,
schema: StructType,
updatedIdentityHighWaterMarks: Seq[(String, Long)]) : StructType = {
updatedIdentityHighWaterMarks: Seq[(String, Long)]
): StructType = {
val updatedIdentityHighWaterMarksGrouped =
updatedIdentityHighWaterMarks.groupBy(_._1).mapValues(v => v.map(_._2))
StructType(schema.map { f =>
updatedIdentityHighWaterMarksGrouped.get(DeltaColumnMapping.getPhysicalName(f)) match {
case Some(newWatermarks) if ColumnWithDefaultExprUtils.isIdentityColumn(f) =>
val oldIdentityInfo = getIdentityInfo(f)
val positiveStep = oldIdentityInfo.step > 0
val newHighWaterMark = if (positiveStep) {
oldIdentityInfo.highWaterMark.map(Math.max(_, newWatermarks.max))
.getOrElse(newWatermarks.max)
val candidateHighWaterMark = if (positiveStep) {
newWatermarks.max
} else {
oldIdentityInfo.highWaterMark.map(Math.min(_, newWatermarks.min))
.getOrElse(newWatermarks.min)
newWatermarks.min
}
val (newField, loggingSeq) = updateToValidHighWaterMark(
f, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity = false)
if (loggingSeq.nonEmpty) {
recordDeltaEvent(
deltaLog = deltaLog,
opType = opTypeHighWaterMarkUpdate,
data = Map(
"columnName" -> f.name,
"debugInfo" -> loggingSeq.mkString(", "),
"oldHighWaterMark" -> oldIdentityInfo.highWaterMark,
"candidateHighWaterMark" -> candidateHighWaterMark,
"updatedFrom" -> "updateSchema"
)
)
}
val builder = new MetadataBuilder()
.withMetadata(f.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
f.copy(metadata = builder.build())
newField
case _ =>
f
}
Expand Down Expand Up @@ -252,19 +394,12 @@ object IdentityColumn extends DeltaLogging {

// Calculate the sync'ed IDENTITY high water mark based on actual data and returns a
// potentially updated `StructField`.
def syncIdentity(field: StructField, df: DataFrame): StructField = {
// Round `value` to the next value that follows start and step configuration.
def roundToNext(start: Long, step: Long, value: Long): Long = {
if (Math.subtractExact(value, start) % step == 0) {
value
} else {
// start + step * ((value - start) / step + 1)
Math.addExact(
Math.multiplyExact(Math.addExact(Math.subtractExact(value, start) / step, 1), step),
start)
}
}

def syncIdentity(
deltaLog: DeltaLog,
field: StructField,
df: DataFrame,
allowLoweringHighWaterMarkForSyncIdentity: Boolean
): StructField = {
assert(ColumnWithDefaultExprUtils.isIdentityColumn(field))
// Run a query to get the actual high water mark (max or min value of the IDENTITY column) from
// the actual data.
Expand All @@ -274,17 +409,23 @@ object IdentityColumn extends DeltaLogging {
val resultRow = df.select(expr).collect().head

if (!resultRow.isNullAt(0)) {
val result = resultRow.getLong(0)
val isBeforeStart = if (positiveStep) result < info.start else result > info.start
val newHighWaterMark = roundToNext(info.start, info.step, result)
if (isBeforeStart || info.highWaterMark.contains(newHighWaterMark)) {
field
} else {
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
.putLong(IDENTITY_INFO_HIGHWATERMARK, newHighWaterMark)
.build()
field.copy(metadata = newMetadata)
val candidateHighWaterMark = resultRow.getLong(0)
val (newField, loggingSeq) = updateToValidHighWaterMark(
field, candidateHighWaterMark, allowLoweringHighWaterMarkForSyncIdentity)
if (loggingSeq.nonEmpty) {
recordDeltaEvent(
deltaLog = deltaLog,
opType = opTypeHighWaterMarkUpdate,
data = Map(
"columnName" -> field.name,
"debugInfo" -> loggingSeq.mkString(", "),
"oldHighWaterMark" -> info.highWaterMark,
"candidateHighWaterMark" -> candidateHighWaterMark,
"updatedFrom" -> "syncIdentity"
)
)
}
newField
} else {
field
}
Expand All @@ -295,12 +436,19 @@ object IdentityColumn extends DeltaLogging {
* been merged with the corresponding high water marks of `schemaWithHighWaterMarksToMerge`.
*/
def copySchemaWithMergedHighWaterMarks(
schemaToCopy: StructType, schemaWithHighWaterMarksToMerge: StructType): StructType = {
deltaLog: DeltaLog,
schemaToCopy: StructType,
schemaWithHighWaterMarksToMerge: StructType
): StructType = {
val newHighWatermarks = getIdentityColumns(schemaWithHighWaterMarksToMerge).flatMap { f =>
val info = getIdentityInfo(f)
info.highWaterMark.map(waterMark => DeltaColumnMapping.getPhysicalName(f) -> waterMark)
}
updateSchema(schemaToCopy, newHighWatermarks)
updateSchema(
deltaLog,
schemaToCopy,
newHighWatermarks
)
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,10 @@ trait OptimisticTransactionImpl extends TransactionalWrite
def precommitUpdateSchemaWithIdentityHighWaterMarks(): Unit = {
if (updatedIdentityHighWaterMarks.nonEmpty) {
val newSchema = IdentityColumn.updateSchema(
metadata.schema, updatedIdentityHighWaterMarks.toSeq)
deltaLog,
metadata.schema,
updatedIdentityHighWaterMarks.toSeq
)
val updatedMetadata = metadata.copy(schemaString = newSchema.json)
updateMetadataAfterWrite(updatedMetadata)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,10 @@ abstract class CloneTableBase(
.toMap
val clonedSchema =
IdentityColumn.copySchemaWithMergedHighWaterMarks(
deltaLog = targetSnapshot.deltaLog,
schemaToCopy = clonedMetadata.schema,
schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema)
schemaWithHighWaterMarksToMerge = targetSnapshot.metadata.schema
)
clonedMetadata.copy(configuration = filteredConfiguration, schemaString = clonedSchema.json)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,10 @@ case class RestoreTableCommand(sourceTable: DeltaTableV2)
// We need to merge the schema of the latest snapshot with the schema of the snapshot
// we're restoring to ensure that the high water mark is correct.
val mergedSchema = IdentityColumn.copySchemaWithMergedHighWaterMarks(
deltaLog = deltaLog,
schemaToCopy = snapshotToRestore.metadata.schema,
schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema)
schemaWithHighWaterMarksToMerge = latestSnapshot.metadata.schema
)

txn.updateMetadata(snapshotToRestore.metadata.copy(schemaString = mergedSchema.json))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,14 @@ case class AlterTableChangeColumnDeltaCommand(
if (syncIdentity) {
assert(oldColumn == newColumn)
val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
val field = IdentityColumn.syncIdentity(newColumn, df)
val allowLoweringHighWaterMarkForSyncIdentity = sparkSession.conf
.get(DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK)
val field = IdentityColumn.syncIdentity(
deltaLog,
newColumn,
df,
allowLoweringHighWaterMarkForSyncIdentity
)
txn.setSyncIdentity()
txn.readWholeTable()
field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2261,6 +2261,18 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

// Escape hatch read by AlterTableChangeColumnDeltaCommand when `syncIdentity` is requested:
// it is passed to IdentityColumn.syncIdentity as `allowLoweringHighWaterMarkForSyncIdentity`.
// Lowering a high water mark is unsafe in general (it can cause duplicate identity values),
// so this is internal and defaults to false; enable only to repair a bad high water mark.
val DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK =
buildConf("identityColumn.allowSyncIdentityToLowerHighWaterMark.enabled")
.internal()
.doc(
"""
| If true, the SYNC IDENTITY command can reduce the high water mark in a Delta IDENTITY
| column. If false, the high water mark will only be updated if it
| respects the column's specified start, step, and existing high watermark value.
|""".stripMargin)
.booleanConf
.createWithDefault(false)

///////////
// TESTING
///////////
Expand Down
Loading
Loading