Enable type widening in Delta streaming source
johanl-db committed Jan 13, 2025
1 parent a920885 commit 070cf92
Showing 14 changed files with 1,390 additions and 87 deletions.
16 changes: 15 additions & 1 deletion spark/src/main/resources/error/delta-error-classes.json
@@ -2309,7 +2309,7 @@
"DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_POST_SCHEMA_EVOLUTION" : {
"message" : [
"We've detected one or more non-additive schema change(s) (<opType>) between Delta version <previousSchemaChangeVersion> and <currentSchemaChangeVersion> in the Delta streaming source.",
"Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at <currentSchemaChangeVersion>.",
"Please check if you want to manually propagate the schema change(s) to the sink table before we proceed with stream processing using the finalized schema at version <currentSchemaChangeVersion>.",
"Once you have fixed the schema of the sink table or have decided there is no need to fix, you can set (one of) the following SQL configurations to unblock the non-additive schema change(s) and continue stream processing.",
"To unblock for this particular stream just for this series of schema change(s): set `<allowCkptVerKey> = <allowCkptVerValue>`.",
"To unblock for this particular stream: set `<allowCkptKey> = <allowCkptValue>`",
@@ -2318,6 +2318,20 @@
],
"sqlState" : "KD002"
},
"DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_TYPE_WIDENING" : {
"message" : [
"We've detected one or more data types being widened between Delta version <previousSchemaChangeVersion> and <currentSchemaChangeVersion>:",
"<wideningTypeChanges>",
"",
"Your streaming query contains operations that may fail or produce different results after the type change(s).",
"Please check if you want to update your streaming query before we proceed with stream processing using the finalized schema at <currentSchemaChangeVersion>.",
"Once you have updated your streaming query or have decided there is no need to update it, you can set (one of) the following SQL configurations to unblock the type change(s) and continue stream processing.",
"To unblock for this particular stream just for this series of type change(s): set `<allowCkptVerKey> = <allowCkptVerValue>`.",
"To unblock for this particular stream: set `<allowCkptKey> = <allowCkptValue>`",
"To unblock for all streams: set `<allowAllKey> = <allowAllValue>`."
],
"sqlState" : "KD002"
},
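The message above points the user at SQL configurations that unblock the stream. As a rough sketch of that flow (the key names below are placeholders, not real configuration keys: the actual key is derived from the allowAllSqlConfKey and checkpoint hash passed to DeltaErrors.cannotContinueStreamingTypeWidening later in this commit):

spark.conf.set("<allowAllSqlConfKey>.ckpt_<checkpointHash>", "<schemaChangeVersion>") // this stream, this series of type changes only
spark.conf.set("<allowAllSqlConfKey>.ckpt_<checkpointHash>", "always")                // this stream, permanently
spark.conf.set("<allowAllSqlConfKey>", "always")                                      // all streams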
"DELTA_STREAMING_CHECK_COLUMN_MAPPING_NO_SNAPSHOT" : {
"message" : [
"Failed to obtain Delta log snapshot for the start version when checking column mapping schema changes. Please choose a different start version, or force enable streaming read at your own risk by setting '<config>' to 'true'."
32 changes: 32 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -3130,6 +3130,38 @@ trait DeltaErrorsBase
)
}

def cannotContinueStreamingTypeWidening(
previousSchemaChangeVersion: Long,
currentSchemaChangeVersion: Long,
checkpointHash: Int,
allowAllSqlConfKey: String,
wideningTypeChanges: Seq[TypeChange]): Throwable = {

val wideningTypeChangesStr = wideningTypeChanges.map { change =>
s" ${SchemaUtils.prettyFieldName(change.fieldPath)}: ${change.fromType.sql} -> " +
s"${change.toType.sql}"
}.mkString("\n")

new DeltaRuntimeException(
errorClass = "DELTA_STREAMING_CANNOT_CONTINUE_PROCESSING_TYPE_WIDENING",
messageParameters = Array(
previousSchemaChangeVersion.toString,
currentSchemaChangeVersion.toString,
wideningTypeChangesStr,
currentSchemaChangeVersion.toString,
// Allow this stream to pass for this particular version
s"$allowAllSqlConfKey.ckpt_$checkpointHash",
currentSchemaChangeVersion.toString,
// Allow this stream to pass
s"$allowAllSqlConfKey.ckpt_$checkpointHash",
"always",
// Allow all streams to pass
allowAllSqlConfKey,
"always"
)
)
}
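For illustration (field names invented), the wideningTypeChangesStr interpolated into the message would render roughly as:

  user.id: INT -> BIGINT
  items.element.price: DECIMAL(10,2) -> DECIMAL(12,2)

Each line combines SchemaUtils.prettyFieldName applied to the change's fieldPath with the SQL names of the source and target types.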

def cannotReconstructPathFromURI(uri: String): Throwable =
new DeltaRuntimeException(
errorClass = "DELTA_CANNOT_RECONSTRUCT_PATH_FROM_URI",
16 changes: 16 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -162,4 +162,20 @@
case (LongType, d: DecimalType) => d.isWiderThan(LongType)
case _ => false
}

/**
* Compares `from` and `to` and returns whether the type was widened, or, for nested types,
* whether one of the nested fields was widened.
*/
def containsWideningTypeChanges(from: DataType, to: DataType): Boolean = (from, to) match {
case (from: StructType, to: StructType) =>
TypeWideningMetadata.collectTypeChanges(from, to).nonEmpty
case (from: MapType, to: MapType) =>
containsWideningTypeChanges(from.keyType, to.keyType) ||
containsWideningTypeChanges(from.valueType, to.valueType)
case (from: ArrayType, to: ArrayType) =>
containsWideningTypeChanges(from.elementType, to.elementType)
case (from: AtomicType, to: AtomicType) =>
isTypeChangeSupported(from, to)
// Any other combination, e.g. an atomic type replaced by a struct, is not a widening change.
case _ => false
}
}
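A minimal usage sketch of the new helper, assuming the standard Spark SQL type constructors (schemas and field names invented):

import org.apache.spark.sql.types._

val oldSchema = StructType(Seq(
  StructField("id", IntegerType),
  StructField("scores", ArrayType(ShortType))))
val newSchema = StructType(Seq(
  StructField("id", LongType),                  // INT -> BIGINT is a supported widening
  StructField("scores", ArrayType(ShortType)))) // unchanged

TypeWidening.containsWideningTypeChanges(oldSchema, newSchema) // true: "id" was widened
TypeWidening.containsWideningTypeChanges(newSchema, newSchema) // false: no type changed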
@@ -140,7 +140,11 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {
val changesToRecord = mutable.Buffer.empty[TypeChange]
val schemaWithMetadata = SchemaMergingUtils.transformColumns(schema, oldSchema) {
case (_, newField, Some(oldField), _) =>
var typeChanges = collectTypeChanges(oldField.dataType, newField.dataType)
var typeChanges = collectTypeChanges(
oldField.dataType,
newField.dataType,
logNonWideningChanges = true
)
// The version field isn't used anymore but we need to populate it in case the table uses
// the preview feature, as preview clients may then rely on the field being present.
if (txn.protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {
@@ -171,23 +175,48 @@
schemaWithMetadata
}

/**
* Recursively compare `from` and `to` to collect all primitive type widening changes, including
* in nested structs, maps and arrays.
*/
def collectTypeChanges(from: StructType, to: StructType): Seq[TypeChange] = {
val changes = mutable.Buffer.empty[TypeChange]

SchemaMergingUtils.transformColumns(schema = to, other = from) {
case (path, newField, Some(oldField), _) =>
changes ++= collectTypeChanges(
oldField.dataType,
newField.dataType,
logNonWideningChanges = false
).map { change =>
change.copy(fieldPath = path ++ Seq(newField.name) ++ change.fieldPath)
}
newField
case (_, field, _, _) => field
}
changes
}
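For intuition, a sketch of the field paths this overload produces (schemas invented; the "value" and "element" segments come from the map/array handling shown below):

// old: struct<a: struct<b: smallint>, m: map<int, smallint>>
// new: struct<a: struct<b: int>,      m: map<int, int>>
// collectTypeChanges(old, new) would report two changes:
//   fieldPath Seq("a", "b")     for the nested struct field, and
//   fieldPath Seq("m", "value") for the widened map value type,
// both with fromType = ShortType and toType = IntegerType.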

/**
* Recursively collect primitive type changes inside nested maps and arrays between `fromType` and
* `toType`.
*/
private def collectTypeChanges(fromType: DataType, toType: DataType)
: Seq[TypeChange] = (fromType, toType) match {
private def collectTypeChanges(
fromType: DataType,
toType: DataType,
logNonWideningChanges: Boolean): Seq[TypeChange] = (fromType, toType) match {
case (from: MapType, to: MapType) =>
collectTypeChanges(from.keyType, to.keyType).map { typeChange =>
collectTypeChanges(from.keyType, to.keyType, logNonWideningChanges).map { typeChange =>
typeChange.copy(fieldPath = "key" +: typeChange.fieldPath)
} ++
collectTypeChanges(from.valueType, to.valueType).map { typeChange =>
collectTypeChanges(from.valueType, to.valueType, logNonWideningChanges).map { typeChange =>
typeChange.copy(fieldPath = "value" +: typeChange.fieldPath)
}
case (from: ArrayType, to: ArrayType) =>
collectTypeChanges(from.elementType, to.elementType).map { typeChange =>
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
collectTypeChanges(from.elementType, to.elementType, logNonWideningChanges)
.map { typeChange =>
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
case (fromType: AtomicType, toType: AtomicType) if fromType != toType &&
TypeWidening.isTypeChangeSupported(fromType, toType) =>
Seq(TypeChange(
@@ -202,7 +231,7 @@
case (fromType: AtomicType, toType: AtomicType) if isStringTypeChange(fromType, toType) =>
Seq.empty
case (_: AtomicType, _: AtomicType) =>
deltaAssert(fromType == toType,
deltaAssert(!logNonWideningChanges || fromType == toType,
name = "typeWidening.unexpectedTypeChange",
msg = s"Trying to apply an unsupported type change: $fromType to $toType",
data = Map(
@@ -40,6 +40,12 @@ object TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false
}

/** All supported type widening changes are allowed. */
case object AllTypeWidening extends TypeWideningMode {
override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean =
TypeWidening.isTypeChangeSupported(fromType = fromType, toType = toType)
}
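A hedged sketch of how the two modes differ for a caller (the exact call sites are outside this hunk):

import org.apache.spark.sql.types._

TypeWideningMode.NoTypeWidening.shouldWidenType(IntegerType, LongType)  // false: widening never accepted
TypeWideningMode.AllTypeWidening.shouldWidenType(IntegerType, LongType) // true: INT -> BIGINT is supported
TypeWideningMode.AllTypeWidening.shouldWidenType(LongType, IntegerType) // false: narrowing is never a widening change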

/**
* Type changes that are eligible to be applied automatically during schema evolution are allowed.
* Can be restricted to only type changes supported by Iceberg.
@@ -380,7 +380,10 @@ def normalizeColumnNamesInDataType(
* As the Delta snapshots update, the schema may change as well. This method defines whether the
* new schema of a Delta table can be used with a previously analyzed LogicalPlan. Our
* rules are to return false if:
* - Dropping any column that was present in the existing schema, if not allowMissingColumns
* - Dropping any column that was present in the existing schema, if not
* allowMissingColumns/allowMissingStructFields.
* Note: for historical reasons, this is configured separately for top-level columns
* (allowMissingColumns) and nested struct fields (allowMissingStructFields).
* - Any change of datatype, unless eligible for widening. The caller specifies eligible type
* changes via `typeWideningMode`.
* - Change of partition columns. Although analyzed LogicalPlan is not changed,
@@ -403,6 +406,7 @@
readSchema: StructType,
forbidTightenNullability: Boolean = false,
allowMissingColumns: Boolean = false,
allowMissingStructFields: Boolean = false,
typeWideningMode: TypeWideningMode = TypeWideningMode.NoTypeWidening,
newPartitionColumns: Seq[String] = Seq.empty,
oldPartitionColumns: Seq[String] = Seq.empty): Boolean = {
@@ -418,7 +422,12 @@
def isDatatypeReadCompatible(existing: DataType, newtype: DataType): Boolean = {
(existing, newtype) match {
case (e: StructType, n: StructType) =>
isReadCompatible(e, n, forbidTightenNullability, typeWideningMode = typeWideningMode)
isReadCompatible(e, n,
forbidTightenNullability,
typeWideningMode = typeWideningMode,
allowMissingColumns = allowMissingStructFields,
allowMissingStructFields = allowMissingStructFields
)
case (e: ArrayType, n: ArrayType) =>
// if existing elements are non-nullable, so should be the new element
isNullabilityCompatible(e.containsNull, n.containsNull) &&
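A hypothetical call showing the new flag together with the type widening mode (variable names invented; the first parameter, the previously analyzed schema, is declared above this hunk and is passed positionally here):

// Accept a snapshot schema that widened nested field types and dropped a nested struct field,
// relative to the schema the stream was analyzed with.
val compatible = SchemaUtils.isReadCompatible(
  analyzedSchema,
  newSnapshotSchema,
  typeWideningMode = TypeWideningMode.AllTypeWidening,
  allowMissingStructFields = true)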
@@ -1224,6 +1224,22 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val DELTA_ALLOW_TYPE_WIDENING_STREAMING_SOURCE =
buildConf("typeWidening.allowTypeChangeStreamingDeltaSource")
.doc("Accept incoming widening type changes when streaming from a Delta source.")
.internal()
.booleanConf
.createWithDefault(true)

val DELTA_TYPE_WIDENING_BYPASS_STREAMING_TYPE_CHANGE_CHECK =
buildConf("typeWidening.bypassStreamingTypeChangeCheck")
.doc("Controls the check performed when a type change is detected when streaming from a " +
"Delta source. This check fails the streaming query in case a type change may impact the " +
"semantics of the query and requests user intervention.")
.internal()
.booleanConf
.createWithDefault(false)
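Assuming DeltaSQLConf.buildConf prefixes keys with "spark.databricks.delta." as it does for the other entries in this file, the two new internal flags would be toggled like this (users normally should not need to, since the defaults match the intended behavior):

spark.conf.set("spark.databricks.delta.typeWidening.allowTypeChangeStreamingDeltaSource", "false")
spark.conf.set("spark.databricks.delta.typeWidening.bypassStreamingTypeChangeCheck", "true")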

/**
* Internal config to bypass check that prevents applying type changes that are not supported by
* Iceberg when Uniform is enabled with Iceberg compatibility.
@@ -155,6 +155,18 @@ trait DeltaSourceBase extends Source
protected val isStreamingFromColumnMappingTable: Boolean =
snapshotAtSourceInit.metadata.columnMappingMode != NoMapping

/**
* Internal flag to allow widening type changes to be propagated from Delta sources.
*/
protected lazy val allowTypeWidening: Boolean =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_ALLOW_TYPE_WIDENING_STREAMING_SOURCE)

/**
* Whether we are streaming from a table that supports the type widening table feature.
*/
protected val isStreamingFromTypeWideningTable: Boolean =
TypeWidening.isSupported(snapshotAtSourceInit.protocol)

/**
* The persisted schema from the schema log that must be used to read data files in this Delta
* streaming source.
@@ -561,6 +573,7 @@ trait DeltaSourceBase extends Source

// Perform schema check if we need to, considering all escape flags.
if (!allowUnsafeStreamingReadOnColumnMappingSchemaChanges ||
(allowTypeWidening && isStreamingFromTypeWideningTable) ||
!forceEnableStreamingReadOnReadIncompatibleSchemaChangesDuringStreamStart) {
startVersionSnapshotOpt.foreach { snapshot =>
checkReadIncompatibleSchemaChanges(
@@ -620,18 +633,25 @@
(metadata, snapshotAtSourceInit.metadata)
}

// Column mapping schema changes
if (!allowUnsafeStreamingReadOnColumnMappingSchemaChanges) {
assert(!trackingMetadataChange, "should not check schema change while tracking it")

if (!DeltaColumnMapping.hasNoColumnMappingSchemaChanges(newMetadata, oldMetadata,
allowUnsafeStreamingReadOnPartitionColumnChanges)) {
throw DeltaErrors.blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges(
spark,
oldMetadata.schema,
newMetadata.schema,
detectedDuringStreaming = !validatedDuringStreamStart)
def shouldTrackSchema: Boolean =
if (isStreamingFromTypeWideningTable && allowTypeWidening &&
TypeWidening.containsWideningTypeChanges(oldMetadata.schema, newMetadata.schema)) {
true
} else if (allowUnsafeStreamingReadOnColumnMappingSchemaChanges) {
false
} else {
// Column mapping schema changes
assert(!trackingMetadataChange, "should not check schema change while tracking it")
!DeltaColumnMapping.hasNoColumnMappingSchemaChanges(newMetadata, oldMetadata,
allowUnsafeStreamingReadOnPartitionColumnChanges)
}

if (shouldTrackSchema) {
throw DeltaErrors.blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges(
spark,
oldMetadata.schema,
newMetadata.schema,
detectedDuringStreaming = !validatedDuringStreamStart)
}

// Other standard read compatibility changes
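To make the refactored control flow above easier to scan, an informal summary (not code from this commit; behavior inferred from the hunk):

// shouldTrackSchema is true when:
//  - a widening type change is detected, the table supports type widening, and
//    typeWidening.allowTypeChangeStreamingDeltaSource is enabled; or
//  - unsafe reads on column mapping schema changes are NOT allowed and an incompatible
//    column mapping schema change is detected.
// When true, the source throws blockStreamingReadsWithIncompatibleColumnMappingSchemaChanges,
// which directs the user to unblock the stream (typically via schema tracking) before continuing.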