diff --git a/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/TypeWideningUniformSuite.scala b/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/TypeWideningUniformSuite.scala new file mode 100644 index 00000000000..05c3d5793e6 --- /dev/null +++ b/iceberg/src/test/scala/org/apache/spark/sql/delta/uniform/TypeWideningUniformSuite.scala @@ -0,0 +1,24 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta.uniform + +import org.apache.spark.sql.delta.typewidening.TypeWideningUniformTests + +/** + * Suite running Uniform Iceberg + type widening tests against HMS. + */ +class TypeWideningUniformSuite extends TypeWideningUniformTests with WriteDeltaHMSReadIceberg diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json index c028865d2e7..c193b6a15b9 100644 --- a/spark/src/main/resources/error/delta-error-classes.json +++ b/spark/src/main/resources/error/delta-error-classes.json @@ -1221,6 +1221,12 @@ "" ] }, + "UNSUPPORTED_TYPE_WIDENING" : { + "message" : [ + "IcebergCompatV<version> is incompatible with a type change applied to this table:", + "Field <fieldPath> was changed from <prevType> to <newType>." + ] + }, "VERSION_MUTUAL_EXCLUSIVE" : { "message" : [ "Only one IcebergCompat version can be enabled, please explicitly disable all other IcebergCompat versions that are not needed." 
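The placeholders above are filled in by the new DeltaErrors.icebergCompatUnsupportedTypeWideningException helper added further down in this diff. As a hedged sketch (not part of the patch) of how the error is expected to surface for a change that Delta supports but Iceberg does not, such as date -> timestamp_ntz; the exact type rendering and the surrounding DELTA_ICEBERG_COMPAT_VIOLATION wrapper text are assumptions:

// Sketch only: relies on the helper defined below in DeltaErrors.scala.
import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.types.{DateType, TimestampNTZType}

val e = DeltaErrors.icebergCompatUnsupportedTypeWideningException(
  version = 2,
  fieldPath = Seq("a"),
  oldType = DateType,
  newType = TimestampNTZType)
// Expected to render roughly as:
//   IcebergCompatV2 is incompatible with a type change applied to this table:
//   Field a was changed from "DATE" to "TIMESTAMP_NTZ".
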
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala index 012d88a036a..f2a2e1168ed 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala @@ -82,8 +82,8 @@ class DeltaAnalysis(session: SparkSession) override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown { // INSERT INTO by ordinal and df.insertInto() case a @ AppendDelta(r, d) if !a.isByName && - needsSchemaAdjustmentByOrdinal(d, a.query, r.schema) => - val projection = resolveQueryColumnsByOrdinal(a.query, r.output, d) + needsSchemaAdjustmentByOrdinal(d, a.query, r.schema, a.writeOptions) => + val projection = resolveQueryColumnsByOrdinal(a.query, r.output, d, a.writeOptions) if (projection != a.query) { a.copy(query = projection) } else { @@ -94,9 +94,9 @@ class DeltaAnalysis(session: SparkSession) // INSERT INTO by name // AppendData.byName is also used for DataFrame append so we check for the SQL origin text // since we only want to up-cast for SQL insert into by name - case a @ AppendDelta(r, d) if a.isByName && - a.origin.sqlText.nonEmpty && needsSchemaAdjustmentByName(a.query, r.output, d) => - val projection = resolveQueryColumnsByName(a.query, r.output, d) + case a @ AppendDelta(r, d) if a.isByName && a.origin.sqlText.nonEmpty && + needsSchemaAdjustmentByName(a.query, r.output, d, a.writeOptions) => + val projection = resolveQueryColumnsByName(a.query, r.output, d, a.writeOptions) if (projection != a.query) { a.copy(query = projection) } else { @@ -214,8 +214,8 @@ class DeltaAnalysis(session: SparkSession) // INSERT OVERWRITE by ordinal and df.insertInto() case o @ OverwriteDelta(r, d) if !o.isByName && - needsSchemaAdjustmentByOrdinal(d, o.query, r.schema) => - val projection = resolveQueryColumnsByOrdinal(o.query, r.output, d) + needsSchemaAdjustmentByOrdinal(d, o.query, r.schema, o.writeOptions) => + val projection = resolveQueryColumnsByOrdinal(o.query, r.output, d, o.writeOptions) if (projection != o.query) { val aliases = AttributeMap(o.query.output.zip(projection.output).collect { case (l: AttributeReference, r: AttributeReference) if !l.sameRef(r) => (l, r) @@ -231,9 +231,9 @@ class DeltaAnalysis(session: SparkSession) // INSERT OVERWRITE by name // OverwriteDelta.byName is also used for DataFrame append so we check for the SQL origin text // since we only want to up-cast for SQL insert into by name - case o @ OverwriteDelta(r, d) if o.isByName && - o.origin.sqlText.nonEmpty && needsSchemaAdjustmentByName(o.query, r.output, d) => - val projection = resolveQueryColumnsByName(o.query, r.output, d) + case o @ OverwriteDelta(r, d) if o.isByName && o.origin.sqlText.nonEmpty && + needsSchemaAdjustmentByName(o.query, r.output, d, o.writeOptions) => + val projection = resolveQueryColumnsByName(o.query, r.output, d, o.writeOptions) if (projection != o.query) { val aliases = AttributeMap(o.query.output.zip(projection.output).collect { case (l: AttributeReference, r: AttributeReference) if !l.sameRef(r) => (l, r) @@ -251,15 +251,15 @@ class DeltaAnalysis(session: SparkSession) case o @ DynamicPartitionOverwriteDelta(r, d) if o.resolved => val adjustedQuery = if (!o.isByName && - needsSchemaAdjustmentByOrdinal(d, o.query, r.schema)) { + needsSchemaAdjustmentByOrdinal(d, o.query, r.schema, o.writeOptions)) { // INSERT OVERWRITE by ordinal and df.insertInto() - resolveQueryColumnsByOrdinal(o.query, r.output, d) + 
resolveQueryColumnsByOrdinal(o.query, r.output, d, o.writeOptions) } else if (o.isByName && o.origin.sqlText.nonEmpty && - needsSchemaAdjustmentByName(o.query, r.output, d)) { + needsSchemaAdjustmentByName(o.query, r.output, d, o.writeOptions)) { // INSERT OVERWRITE by name // OverwriteDelta.byName is also used for DataFrame append so we check for the SQL origin // text since we only want to up-cast for SQL insert into by name - resolveQueryColumnsByName(o.query, r.output, d) + resolveQueryColumnsByName(o.query, r.output, d, o.writeOptions) } else { o.query } @@ -864,13 +864,16 @@ class DeltaAnalysis(session: SparkSession) * type column/field. */ private def resolveQueryColumnsByOrdinal( - query: LogicalPlan, targetAttrs: Seq[Attribute], deltaTable: DeltaTableV2): LogicalPlan = { + query: LogicalPlan, + targetAttrs: Seq[Attribute], + deltaTable: DeltaTableV2, + writeOptions: Map[String, String]): LogicalPlan = { // always add a Cast. it will be removed in the optimizer if it is unnecessary. val project = query.output.zipWithIndex.map { case (attr, i) => if (i < targetAttrs.length) { val targetAttr = targetAttrs(i) addCastToColumn(attr, targetAttr, deltaTable.name(), - allowTypeWidening = allowTypeWidening(deltaTable) + typeWideningMode = getTypeWideningMode(deltaTable, writeOptions) ) } else { attr @@ -889,7 +892,10 @@ class DeltaAnalysis(session: SparkSession) * columns. */ private def resolveQueryColumnsByName( - query: LogicalPlan, targetAttrs: Seq[Attribute], deltaTable: DeltaTableV2): LogicalPlan = { + query: LogicalPlan, + targetAttrs: Seq[Attribute], + deltaTable: DeltaTableV2, + writeOptions: Map[String, String]): LogicalPlan = { insertIntoByNameMissingColumn(query, targetAttrs, deltaTable) // This is called before resolveOutputColumns in postHocResolutionRules, so we need to duplicate @@ -907,7 +913,7 @@ class DeltaAnalysis(session: SparkSession) throw DeltaErrors.missingColumn(attr, targetAttrs) } addCastToColumn(attr, targetAttr, deltaTable.name(), - allowTypeWidening = allowTypeWidening(deltaTable) + typeWideningMode = getTypeWideningMode(deltaTable, writeOptions) ) } Project(project, query) @@ -917,25 +923,25 @@ class DeltaAnalysis(session: SparkSession) attr: NamedExpression, targetAttr: NamedExpression, tblName: String, - allowTypeWidening: Boolean): NamedExpression = { + typeWideningMode: TypeWideningMode): NamedExpression = { val expr = (attr.dataType, targetAttr.dataType) match { case (s, t) if s == t => attr case (s: StructType, t: StructType) if s != t => - addCastsToStructs(tblName, attr, s, t, allowTypeWidening) + addCastsToStructs(tblName, attr, s, t, typeWideningMode) case (ArrayType(s: StructType, sNull: Boolean), ArrayType(t: StructType, tNull: Boolean)) - if s != t && sNull == tNull => - addCastsToArrayStructs(tblName, attr, s, t, sNull, allowTypeWidening) + if s != t && sNull == tNull => + addCastsToArrayStructs(tblName, attr, s, t, sNull, typeWideningMode) case (s: AtomicType, t: AtomicType) - if allowTypeWidening && TypeWidening.isTypeChangeSupportedForSchemaEvolution(t, s) => + if typeWideningMode.shouldWidenType(fromType = t, toType = s) => // Keep the type from the query, the target schema will be updated to widen the existing // type to match it. 
attr case (s: MapType, t: MapType) - if !DataType.equalsStructurally(s, t, ignoreNullability = true) || allowTypeWidening => - // only trigger addCastsToMaps if exists differences like extra fields, renaming - // Or allowTypeWidening is enabled - addCastsToMaps(tblName, attr, s, t, allowTypeWidening) + if !DataType.equalsStructurally(s, t, ignoreNullability = true) => + // only trigger addCastsToMaps if exists differences like extra fields, renaming or type + // differences. + addCastsToMaps(tblName, attr, s, t, typeWideningMode) case _ => getCastFunction(attr, targetAttr.dataType, targetAttr.name) } @@ -943,16 +949,26 @@ class DeltaAnalysis(session: SparkSession) } /** - * Whether inserting values that have a wider type than the table has is allowed. In that case, - * values are not downcasted to the current table type and the table schema is updated instead to - * use the wider type. + * Returns the type widening mode to use for the given delta table. A type widening mode indicates + * for (fromType, toType) tuples whether `fromType` is eligible to be automatically widened to + * `toType` when ingesting data. If it is, the table schema is updated to `toType` before + * ingestion and values are written using their original `toType` type. Otherwise, the table type + * `fromType` is retained and values are downcasted on write. */ - private def allowTypeWidening(deltaTable: DeltaTableV2): Boolean = { - val options = new DeltaOptions(Map.empty[String, String], conf) - options.canMergeSchema && TypeWidening.isEnabled( - deltaTable.initialSnapshot.protocol, - deltaTable.initialSnapshot.metadata - ) + private def getTypeWideningMode( + deltaTable: DeltaTableV2, + writeOptions: Map[String, String]): TypeWideningMode = { + val options = new DeltaOptions(deltaTable.options ++ writeOptions, conf) + val snapshot = deltaTable.initialSnapshot + val typeWideningEnabled = TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata) + val schemaEvolutionEnabled = options.canMergeSchema + + if (typeWideningEnabled && schemaEvolutionEnabled) { + TypeWideningMode.TypeEvolution( + uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(snapshot.metadata)) + } else { + TypeWideningMode.NoTypeWidening + } } /** @@ -963,7 +979,8 @@ class DeltaAnalysis(session: SparkSession) private def needsSchemaAdjustmentByOrdinal( deltaTable: DeltaTableV2, query: LogicalPlan, - schema: StructType): Boolean = { + schema: StructType, + writeOptions: Map[String, String]): Boolean = { val output = query.output if (output.length < schema.length) { throw DeltaErrors.notEnoughColumnsInInsert(deltaTable.name(), output.length, schema.length) @@ -973,7 +990,7 @@ class DeltaAnalysis(session: SparkSession) val existingSchemaOutput = output.take(schema.length) existingSchemaOutput.map(_.name) != schema.map(_.name) || !SchemaUtils.isReadCompatible(schema.asNullable, existingSchemaOutput.toStructType, - allowTypeWidening = allowTypeWidening(deltaTable)) + typeWideningMode = getTypeWideningMode(deltaTable, writeOptions)) } /** @@ -1017,8 +1034,11 @@ class DeltaAnalysis(session: SparkSession) * name queries. We also check that any columns not in the list of user-specified columns must * have a default expression. 
*/ - private def needsSchemaAdjustmentByName(query: LogicalPlan, targetAttrs: Seq[Attribute], - deltaTable: DeltaTableV2): Boolean = { + private def needsSchemaAdjustmentByName( + query: LogicalPlan, + targetAttrs: Seq[Attribute], + deltaTable: DeltaTableV2, + writeOptions: Map[String, String]): Boolean = { insertIntoByNameMissingColumn(query, targetAttrs, deltaTable) val userSpecifiedNames = if (session.sessionState.conf.caseSensitiveAnalysis) { query.output.map(a => (a.name, a)).toMap @@ -1029,7 +1049,7 @@ class DeltaAnalysis(session: SparkSession) !SchemaUtils.isReadCompatible( specifiedTargetAttrs.toStructType.asNullable, query.output.toStructType, - allowTypeWidening = allowTypeWidening(deltaTable) + typeWideningMode = getTypeWideningMode(deltaTable, writeOptions) ) } @@ -1060,44 +1080,47 @@ class DeltaAnalysis(session: SparkSession) parent: NamedExpression, source: StructType, target: StructType, - allowTypeWidening: Boolean): NamedExpression = { + typeWideningMode: TypeWideningMode): NamedExpression = { if (source.length < target.length) { throw DeltaErrors.notEnoughColumnsInInsert( tableName, source.length, target.length, Some(parent.qualifiedName)) } + // Extracts the field at a given index in the target schema. Only matches if the index is valid. + object TargetIndex { + def unapply(index: Int): Option[StructField] = target.lift(index) + } + val fields = source.zipWithIndex.map { - case (StructField(name, nested: StructType, _, metadata), i) if i < target.length => - target(i).dataType match { + case (StructField(name, nested: StructType, _, metadata), i @ TargetIndex(targetField)) => + targetField.dataType match { case t: StructType => - val subField = Alias(GetStructField(parent, i, Option(name)), target(i).name)( + val subField = Alias(GetStructField(parent, i, Option(name)), targetField.name)( explicitMetadata = Option(metadata)) - addCastsToStructs(tableName, subField, nested, t, allowTypeWidening) + addCastsToStructs(tableName, subField, nested, t, typeWideningMode) case o => val field = parent.qualifiedName + "." + name - val targetName = parent.qualifiedName + "." + target(i).name + val targetName = parent.qualifiedName + "." + targetField.name throw DeltaErrors.cannotInsertIntoColumn(tableName, field, targetName, o.simpleString) } - case (StructField(name, dt: AtomicType, _, _), i) if i < target.length && allowTypeWidening && - TypeWidening.isTypeChangeSupportedForSchemaEvolution( - target(i).dataType.asInstanceOf[AtomicType], dt) => - val targetAttr = target(i) + case (StructField(name, sourceType: AtomicType, _, _), + i @ TargetIndex(StructField(targetName, targetType: AtomicType, _, targetMetadata))) + if typeWideningMode.shouldWidenType(fromType = targetType, toType = sourceType) => Alias( GetStructField(parent, i, Option(name)), - targetAttr.name)(explicitMetadata = Option(targetAttr.metadata)) - case (other, i) if i < target.length => - val targetAttr = target(i) + targetName)(explicitMetadata = Option(targetMetadata)) + case (sourceField, i @ TargetIndex(targetField)) => Alias( - getCastFunction(GetStructField(parent, i, Option(other.name)), - targetAttr.dataType, targetAttr.name), - targetAttr.name)(explicitMetadata = Option(targetAttr.metadata)) + getCastFunction(GetStructField(parent, i, Option(sourceField.name)), + targetField.dataType, targetField.name), + targetField.name)(explicitMetadata = Option(targetField.metadata)) - case (other, i) => + case (sourceField, i) => // This is a new column, so leave to schema evolution as is. 
Do not lose it's name so // wrap with an alias Alias( - GetStructField(parent, i, Option(other.name)), - other.name)(explicitMetadata = Option(other.metadata)) + GetStructField(parent, i, Option(sourceField.name)), + sourceField.name)(explicitMetadata = Option(sourceField.metadata)) } Alias(CreateStruct(fields), parent.name)( parent.exprId, parent.qualifier, Option(parent.metadata)) @@ -1109,10 +1132,10 @@ class DeltaAnalysis(session: SparkSession) source: StructType, target: StructType, sourceNullable: Boolean, - allowTypeWidening: Boolean): Expression = { + typeWideningMode: TypeWideningMode): Expression = { val structConverter: (Expression, Expression) => Expression = (_, i) => addCastsToStructs( - tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target, allowTypeWidening) + tableName, Alias(GetArrayItem(parent, i), i.toString)(), source, target, typeWideningMode) val transformLambdaFunc = { val elementVar = NamedLambdaVariable("elementVar", source, sourceNullable) val indexVar = NamedLambdaVariable("indexVar", IntegerType, false) @@ -1137,7 +1160,7 @@ class DeltaAnalysis(session: SparkSession) parent: NamedExpression, sourceMapType: MapType, targetMapType: MapType, - allowTypeWidening: Boolean): Expression = { + typeWideningMode: TypeWideningMode): Expression = { val transformedKeys = if (sourceMapType.keyType != targetMapType.keyType) { // Create a transformation for the keys @@ -1153,7 +1176,7 @@ class DeltaAnalysis(session: SparkSession) key, keyAttr, tableName, - allowTypeWidening + typeWideningMode ) LambdaFunction(castedKey, Seq(key)) }) @@ -1176,7 +1199,7 @@ class DeltaAnalysis(session: SparkSession) value, valueAttr, tableName, - allowTypeWidening + typeWideningMode ) LambdaFunction(castedValue, Seq(value)) }) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index b90b4524558..9f005e2bc89 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -3402,6 +3402,23 @@ trait DeltaErrorsBase ) } + def icebergCompatUnsupportedTypeWideningException( + version: Int, + fieldPath: Seq[String], + oldType: DataType, + newType: DataType): Throwable = { + new DeltaUnsupportedOperationException( + errorClass = "DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_TYPE_WIDENING", + messageParameters = Array( + version.toString, + version.toString, + SchemaUtils.prettyFieldName(fieldPath), + toSQLType(oldType), + toSQLType(newType) + ) + ) + } + def universalFormatConversionFailedException( failedOnCommitVersion: Long, format: String, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala index 014fb3d4aea..0df246d399f 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/IcebergCompat.scala @@ -21,8 +21,10 @@ import org.apache.spark.sql.delta.commands.DeletionVectorUtils import org.apache.spark.sql.delta.logging.DeltaLogKeys import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.internal.MDC +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.types._ /** @@ -47,7 +49,8 @@ object IcebergCompatV1 extends IcebergCompat( CheckAddFileHasStats, CheckNoPartitionEvolution, 
CheckNoListMapNullType, - CheckDeletionVectorDisabled + CheckDeletionVectorDisabled, + CheckTypeWideningSupported ) ) @@ -62,7 +65,8 @@ object IcebergCompatV2 extends IcebergCompat( CheckTypeInV2AllowList, CheckPartitionDataTypeInV2AllowList, CheckNoPartitionEvolution, - CheckDeletionVectorDisabled + CheckDeletionVectorDisabled, + CheckTypeWideningSupported ) ) @@ -104,6 +108,7 @@ case class IcebergCompat( * updates need to be applied, will return None. */ def enforceInvariantsAndDependencies( + spark: SparkSession, prevSnapshot: Snapshot, newestProtocol: Protocol, newestMetadata: Metadata, @@ -190,6 +195,7 @@ case class IcebergCompat( // Apply additional checks val context = IcebergCompatContext( + spark, prevSnapshot, protocolResult.getOrElse(newestProtocol), metadataResult.getOrElse(newestMetadata), @@ -301,6 +307,7 @@ object RequireColumnMapping extends RequiredDeltaTableProperty( } case class IcebergCompatContext( + spark: SparkSession, prevSnapshot: Snapshot, newestProtocol: Protocol, newestMetadata: Metadata, @@ -457,3 +464,31 @@ object CheckDeletionVectorDisabled extends IcebergCompatCheck { } } } + +/** + * Checks that the table didn't go through any type changes that Iceberg doesn't support. See + * `TypeWidening.isTypeChangeSupportedByIceberg()` for supported type changes. + * Note that this check covers both: + * - When the table had an unsupported type change applied in the past and Uniform is being enabled. + * - When Uniform is enabled and a new, unsupported type change is being applied. + */ +object CheckTypeWideningSupported extends IcebergCompatCheck { + override def apply(context: IcebergCompatContext): Unit = { + val skipCheck = context.spark.sessionState.conf + .getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES) + + if (skipCheck || !TypeWidening.isSupported(context.newestProtocol)) return + + TypeWideningMetadata.getAllTypeChanges(context.newestMetadata.schema).foreach { + case (fieldPath, TypeChange(_, fromType: AtomicType, toType: AtomicType, _)) + // We ignore type changes that are not generally supported with type widening to reduce the + // risk of this check misfiring. These are handled by `TypeWidening.assertTableReadable()`. + // The error here only captures type changes that are supported in Delta but not Iceberg. + if TypeWidening.isTypeChangeSupported(fromType, toType) && + !TypeWidening.isTypeChangeSupportedByIceberg(fromType, toType) => + throw DeltaErrors.icebergCompatUnsupportedTypeWideningException( + context.version, fieldPath, fromType, toType) + case _ => () // ignore + } + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala index 91af0fb4049..8722ce249fb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala @@ -1970,6 +1970,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite val (protocolUpdate1, metadataUpdate1) = UniversalFormat.enforceInvariantsAndDependencies( + spark, // Note: if this txn has no protocol or metadata updates, then `prev` will equal `newest`. 
snapshot, newestProtocol = protocol, // Note: this will try to use `newProtocol` diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index 49fa154e22b..d891d74a1c4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.{AtomicType, StructField, StructType} /** * Implements logic to resolve conditions and actions in MERGE clauses and handles schema evolution. @@ -292,11 +292,13 @@ object ResolveDeltaMergeInto { }) val migrationSchema = filterSchema(source.schema, Seq.empty) - val allowTypeWidening = target.exists { - case DeltaTable(fileIndex) => - TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata) - case _ => false - } + + val typeWideningMode = + target.collectFirst { + case DeltaTable(index) if TypeWidening.isEnabled(index.protocol, index.metadata) => + TypeWideningMode.TypeEvolution( + uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(index.metadata)) + }.getOrElse(TypeWideningMode.NoTypeWidening) // The implicit conversions flag allows any type to be merged from source to target if Spark // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas @@ -306,7 +308,7 @@ object ResolveDeltaMergeInto { target.schema, migrationSchema, allowImplicitConversions = true, - allowTypeWidening = allowTypeWidening + typeWideningMode = typeWideningMode ) } else { target.schema diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index 8c7e39eee63..49ddaa800ea 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -68,14 +68,41 @@ object TypeWidening { * It is the responsibility of the caller to recurse into structs, maps and arrays. */ def isTypeChangeSupported(fromType: AtomicType, toType: AtomicType): Boolean = - TypeWideningShims.isTypeChangeSupported(fromType, toType) + TypeWideningShims.isTypeChangeSupported(fromType = fromType, toType = toType) /** * Returns whether the given type change can be applied during schema evolution. Only a * subset of supported type changes are considered for schema evolution. */ - def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean = - TypeWideningShims.isTypeChangeSupportedForSchemaEvolution(fromType, toType) + def isTypeChangeSupportedForSchemaEvolution( + fromType: AtomicType, + toType: AtomicType, + uniformIcebergCompatibleOnly: Boolean): Boolean = + TypeWideningShims.isTypeChangeSupportedForSchemaEvolution( + fromType = fromType, + toType = toType + ) && ( + !uniformIcebergCompatibleOnly || + isTypeChangeSupportedByIceberg(fromType = fromType, toType = toType) + ) + + /** + * Returns whether the given type change is supported by Iceberg, and by extension can be read + * using Uniform. See https://iceberg.apache.org/spec/#schema-evolution. 
+ * Note that these are type promotions supported by Iceberg V1 & V2 (both support the same type + * promotions). Iceberg V3 will add support for date -> timestamp_ntz and void -> any but Uniform + * doesn't currently support Iceberg V3. + */ + def isTypeChangeSupportedByIceberg(fromType: AtomicType, toType: AtomicType): Boolean = + (fromType, toType) match { + case (from, to) if from == to => true + case (from, to) if !isTypeChangeSupported(from, to) => false + case (from: IntegralType, to: IntegralType) => from.defaultSize <= to.defaultSize + case (FloatType, DoubleType) => true + case (from: DecimalType, to: DecimalType) + if from.scale == to.scale && from.precision <= to.precision => true + case _ => false + } /** * Asserts that the given table doesn't contain any unsupported type changes. This should never diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala new file mode 100644 index 00000000000..4d3a15dbf9b --- /dev/null +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMode.scala @@ -0,0 +1,52 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.types.AtomicType + +/** + * A type widening mode captures a specific set of type changes that are allowed to be applied. + * Currently: + * - NoTypeWidening: No type change is allowed. + * - TypeEvolution(uniformIcebergCompatibleOnly = true): Type changes that are eligible to be + * applied automatically during schema evolution and that are supported by Iceberg are allowed. + * - TypeEvolution(uniformIcebergCompatibleOnly = false): Type changes that are eligible to be + * applied automatically during schema evolution are allowed, even if they are not supported by + * Iceberg. + */ +sealed trait TypeWideningMode { + def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean +} + +object TypeWideningMode { + /** + * No type change allowed. Typically because type widening and/or schema evolution isn't enabled. + */ + case object NoTypeWidening extends TypeWideningMode { + override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = false + } + + /** + * Type changes that are eligible to be applied automatically during schema evolution are allowed. + * Can be restricted to only type changes supported by Iceberg. 
+ */ + case class TypeEvolution(uniformIcebergCompatibleOnly: Boolean) extends TypeWideningMode { + override def shouldWidenType(fromType: AtomicType, toType: AtomicType): Boolean = + TypeWidening.isTypeChangeSupportedForSchemaEvolution( + fromType = fromType, toType = toType, uniformIcebergCompatibleOnly) + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala index 65698d669f7..80b9294be8d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala @@ -109,6 +109,7 @@ object UniversalFormat extends DeltaLogging { * updates need to be applied, will return None. */ def enforceInvariantsAndDependencies( + spark: SparkSession, snapshot: Snapshot, newestProtocol: Protocol, newestMetadata: Metadata, @@ -116,7 +117,7 @@ object UniversalFormat extends DeltaLogging { actions: Seq[Action]): (Option[Protocol], Option[Metadata]) = { enforceHudiDependencies(newestMetadata, snapshot) enforceIcebergInvariantsAndDependencies( - snapshot, newestProtocol, newestMetadata, operation, actions) + spark, snapshot, newestProtocol, newestMetadata, operation, actions) } /** @@ -151,6 +152,7 @@ object UniversalFormat extends DeltaLogging { * updates need to be applied, will return None. */ def enforceIcebergInvariantsAndDependencies( + spark: SparkSession, snapshot: Snapshot, newestProtocol: Protocol, newestMetadata: Metadata, @@ -200,6 +202,7 @@ object UniversalFormat extends DeltaLogging { changed = uniformProtocol.nonEmpty || uniformMetadata.nonEmpty val (v1protocolUpdate, v1metadataUpdate) = IcebergCompatV1.enforceInvariantsAndDependencies( + spark, snapshot, newestProtocol = protocolToCheck, newestMetadata = metadataToCheck, @@ -211,6 +214,7 @@ object UniversalFormat extends DeltaLogging { changed ||= v1protocolUpdate.nonEmpty || v1metadataUpdate.nonEmpty val (v2protocolUpdate, v2metadataUpdate) = IcebergCompatV2.enforceInvariantsAndDependencies( + spark, snapshot, newestProtocol = protocolToCheck, newestMetadata = metadataToCheck, @@ -238,12 +242,14 @@ object UniversalFormat extends DeltaLogging { * otherwise the original configuration. 
*/ def enforceDependenciesInConfiguration( + spark: SparkSession, configuration: Map[String, String], snapshot: Snapshot): Map[String, String] = { var metadata = snapshot.metadata.copy(configuration = configuration) // Check UniversalFormat related property dependencies val (_, universalMetadata) = UniversalFormat.enforceInvariantsAndDependencies( + spark, snapshot, newestProtocol = snapshot.protocol, newestMetadata = metadata, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index b0740c38f9c..7487c717545 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -284,6 +284,7 @@ case class CreateDeltaTableCommand( txn, tableWithLocation, options, + sparkSession, schema) } var taggedCommitData = deltaWriter.writeAndReturnCommitData( @@ -319,8 +320,11 @@ case class CreateDeltaTableCommand( ) (taggedCommitData, op) } - val updatedConfiguration = UniversalFormat - .enforceDependenciesInConfiguration(deltaWriter.configuration, txn.snapshot) + val updatedConfiguration = UniversalFormat.enforceDependenciesInConfiguration( + sparkSession, + deltaWriter.configuration, + txn.snapshot + ) val updatedWriter = deltaWriter.withNewWriterConfiguration(updatedConfiguration) // We are either appending/overwriting with saveAsTable or creating a new table with CTAS if (!hasBeenExecuted(txn, sparkSession, Some(options))) { @@ -382,7 +386,10 @@ case class CreateDeltaTableCommand( getProvidedMetadata(tableWithLocation, table.schema.json) newMetadata = newMetadata.copy(configuration = UniversalFormat.enforceDependenciesInConfiguration( - newMetadata.configuration, txn.snapshot)) + sparkSession, + newMetadata.configuration, + txn.snapshot + )) txn.updateMetadataForNewTable(newMetadata) protocol.foreach { protocol => @@ -425,6 +432,7 @@ case class CreateDeltaTableCommand( txn, tableWithLocation, options, + sparkSession, tableWithLocation.schema) // Truncate the table val operationTimestamp = System.currentTimeMillis() @@ -753,6 +761,7 @@ case class CreateDeltaTableCommand( txn: OptimisticTransaction, tableDesc: CatalogTable, options: DeltaOptions, + sparkSession: SparkSession, schema: StructType): Unit = { // If a user explicitly specifies not to overwrite the schema, during a replace, we should // tell them that it's not supported @@ -766,6 +775,7 @@ case class CreateDeltaTableCommand( // or createOrReplace a table, we blindly overwrite the metadata. 
var newMetadata = getProvidedMetadata(table, schema.json) val updatedConfig = UniversalFormat.enforceDependenciesInConfiguration( + sparkSession, newMetadata.configuration, txn.snapshot) newMetadata = newMetadata.copy(configuration = updatedConfig) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala index cbaf4fdee0e..e66e68d81fa 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala @@ -29,7 +29,7 @@ import org.apache.spark.internal.MDC import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions.FileSourceGeneratedMetadataStructField import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes -import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, AtomicType, DataType, MapType, StructType} /** * A trait that writers into Delta can extend to update the schema and/or partitioning of the table. @@ -220,10 +220,17 @@ object ImplicitMetadataOperation { } else { checkDependentExpressions(spark, txn.protocol, txn.metadata, dataSchema) + val typeWideningMode = if (TypeWidening.isEnabled(txn.protocol, txn.metadata)) { + TypeWideningMode.TypeEvolution( + uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(txn.metadata)) + } else { + TypeWideningMode.NoTypeWidening + } + SchemaMergingUtils.mergeSchemas( txn.metadata.schema, dataSchema, - allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata)) + typeWideningMode = typeWideningMode) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala index c428ffcd108..ada30d2e8b9 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.delta.schema import scala.util.control.NonFatal -import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWidening} +import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWideningMode} import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.Literal @@ -153,7 +153,7 @@ object SchemaMergingUtils { dataSchema: StructType, allowImplicitConversions: Boolean = false, keepExistingType: Boolean = false, - allowTypeWidening: Boolean = false, + typeWideningMode: TypeWideningMode = TypeWideningMode.NoTypeWidening, caseSensitive: Boolean = false): StructType = { checkColumnNameDuplication(dataSchema, "in the data to save", caseSensitive) mergeDataTypes( @@ -161,7 +161,7 @@ object SchemaMergingUtils { dataSchema, allowImplicitConversions, keepExistingType, - allowTypeWidening, + typeWideningMode, caseSensitive, allowOverride = false ).asInstanceOf[StructType] @@ -178,6 +178,9 @@ object SchemaMergingUtils { * merge will succeed, because once we get to write time Spark SQL * will support implicitly converting the int to a string. * @param keepExistingType Whether to keep existing types instead of trying to merge types. 
+ * @param typeWideningMode Identifies the (current, update) type tuples where `current` can be + * widened to `update`, in which case `update` is used. See + * [[TypeWideningMode]]. * @param caseSensitive Whether we should keep field mapping case-sensitively. * This should default to false for Delta, which is case insensitive. * @param allowOverride Whether to let incoming type override the existing type if unmatched. @@ -187,7 +190,7 @@ object SchemaMergingUtils { update: DataType, allowImplicitConversions: Boolean, keepExistingType: Boolean, - allowTypeWidening: Boolean, + typeWideningMode: TypeWideningMode, caseSensitive: Boolean, allowOverride: Boolean): DataType = { def merge(current: DataType, update: DataType): DataType = { @@ -236,9 +239,10 @@ object SchemaMergingUtils { merge(currentElementType, updateElementType), currentContainsNull) - // If allowTypeWidening is true and supported, it takes precedence over keepExistingType - case (current: AtomicType, update: AtomicType) if allowTypeWidening && - TypeWidening.isTypeChangeSupportedForSchemaEvolution(current, update) => update + // If type widening is enabled and the type can be widened, it takes precedence over + // keepExistingType. + case (current: AtomicType, update: AtomicType) + if typeWideningMode.shouldWidenType(fromType = current, toType = update) => update // Simply keeps the existing type for primitive types case (current, _) if keepExistingType => current diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala index 2b9b619eb20..c43890c97e7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaUtils.scala @@ -20,7 +20,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal -import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening} +import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TypeWidening, TypeWideningMode} import org.apache.spark.sql.delta.{RowCommitVersion, RowId} import org.apache.spark.sql.delta.actions.Protocol import org.apache.spark.sql.delta.commands.cdc.CDCReader @@ -381,8 +381,8 @@ def normalizeColumnNamesInDataType( * new schema of a Delta table can be used with a previously analyzed LogicalPlan. Our * rules are to return false if: * - Dropping any column that was present in the existing schema, if not allowMissingColumns - * - Any change of datatype, if not allowTypeWidening. Any non-widening change of datatype - * otherwise. + * - Any change of datatype, unless eligible for widening. The caller specifies eligible type + * changes via `typeWideningMode`. * - Change of partition columns. Although analyzed LogicalPlan is not changed, * physical structure of data is changed and thus is considered not read compatible. 
* - If `forbidTightenNullability` = true: @@ -403,7 +403,7 @@ def normalizeColumnNamesInDataType( readSchema: StructType, forbidTightenNullability: Boolean = false, allowMissingColumns: Boolean = false, - allowTypeWidening: Boolean = false, + typeWideningMode: TypeWideningMode = TypeWideningMode.NoTypeWidening, newPartitionColumns: Seq[String] = Seq.empty, oldPartitionColumns: Seq[String] = Seq.empty): Boolean = { @@ -418,7 +418,7 @@ def normalizeColumnNamesInDataType( def isDatatypeReadCompatible(existing: DataType, newtype: DataType): Boolean = { (existing, newtype) match { case (e: StructType, n: StructType) => - isReadCompatible(e, n, forbidTightenNullability, allowTypeWidening = allowTypeWidening) + isReadCompatible(e, n, forbidTightenNullability, typeWideningMode = typeWideningMode) case (e: ArrayType, n: ArrayType) => // if existing elements are non-nullable, so should be the new element isNullabilityCompatible(e.containsNull, n.containsNull) && @@ -428,8 +428,8 @@ def normalizeColumnNamesInDataType( isNullabilityCompatible(e.valueContainsNull, n.valueContainsNull) && isDatatypeReadCompatible(e.keyType, n.keyType) && isDatatypeReadCompatible(e.valueType, n.valueType) - case (e: AtomicType, n: AtomicType) if allowTypeWidening => - TypeWidening.isTypeChangeSupportedForSchemaEvolution(e, n) + case (e: AtomicType, n: AtomicType) + if typeWideningMode.shouldWidenType(fromType = e, toType = n) => true case (a, b) => a == b } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index e3b30c7248c..754212f45a3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1205,6 +1205,22 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(false) + /** + * Internal config to bypass check that prevents applying type changes that are not supported by + * Iceberg when Uniform is enabled with Iceberg compatibility. + */ + val DELTA_TYPE_WIDENING_ALLOW_UNSUPPORTED_ICEBERG_TYPE_CHANGES = + buildConf("typeWidening.allowUnsupportedIcebergTypeChanges") + .internal() + .doc( + """ + |By default, type changes that aren't supported by Iceberg are rejected when Uniform is + |enabled with Iceberg compatibility. This config allows bypassing this restriction, but + |reading the affected column with Iceberg clients will likely fail or behave erratically. 
+ |""".stripMargin) + .booleanConf + .createWithDefault(false) + val DELTA_IS_DELTA_TABLE_THROW_ON_ERROR = buildConf("isDeltaTable.throwOnError") .internal() diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala index c65eabb6e02..a412aa47c2e 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSink.scala @@ -176,11 +176,17 @@ case class DeltaSink( if (canOverwriteSchema) return dataSchema + val typeWideningMode = if (canMergeSchema && TypeWidening.isEnabled(protocol, metadata)) { + TypeWideningMode.TypeEvolution( + uniformIcebergCompatibleOnly = UniversalFormat.icebergEnabled(metadata)) + } else { + TypeWideningMode.NoTypeWidening + } SchemaMergingUtils.mergeSchemas( metadata.schema, dataSchema, allowImplicitConversions = true, - allowTypeWidening = canMergeSchema && TypeWidening.isEnabled(protocol, metadata) + typeWideningMode = typeWideningMode ) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala index 5d89b28420f..0f4b6a42436 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta import scala.collection.mutable import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.hadoop.fs.Path import org.apache.spark.{DebugFilesystem, SparkThrowable} import org.apache.spark.sql.{DataFrame, QueryTest, SaveMode} @@ -221,10 +222,11 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL val isSQL: Boolean = false def runInsert(columns: Seq[String], whereCol: String, whereValue: Int): Unit = { val tablePath = DeltaLog.forTable(spark, TableIdentifier("target")).dataPath + val checkpointLocation = new Path(tablePath, "_checkpoint") val query = spark.readStream .table("source") .writeStream - .option("checkpointLocation", tablePath.toString) + .option("checkpointLocation", checkpointLocation.toString) .format("delta") .trigger(Trigger.AvailableNow()) .toTable("target") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/UniversalFormatSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/UniversalFormatSuiteBase.scala index 2401dde1ed4..5f64234dcd7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/UniversalFormatSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/UniversalFormatSuiteBase.scala @@ -450,7 +450,7 @@ trait UniversalFormatMiscSuiteBase extends IcebergCompatUtilsBase { // The enforce is not lossy. It will do nothing if there is no Universal related key. 
def getUpdatedConfiguration(conf: Map[String, String]): Map[String, String] = - UniversalFormat.enforceDependenciesInConfiguration(conf, snapshot) + UniversalFormat.enforceDependenciesInConfiguration(spark, conf, snapshot) var updatedConfiguration = getUpdatedConfiguration(configurationUnderTest) assert(configurationUnderTest == configurationUnderTest) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala index 8126460fcca..c6939677d71 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala @@ -22,7 +22,7 @@ import java.util.regex.Pattern import scala.annotation.tailrec -import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaLog, DeltaTestUtils} +import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaLog, DeltaTestUtils, TypeWideningMode} import org.apache.spark.sql.delta.RowCommitVersion import org.apache.spark.sql.delta.RowId import org.apache.spark.sql.delta.commands.cdc.CDCReader @@ -2433,10 +2433,16 @@ class SchemaUtilsSuite extends QueryTest // Array root type val base1 = ArrayType(new StructType().add("a", IntegerType)) val update1 = ArrayType(new StructType().add("b", IntegerType)) - - assert(mergeDataTypes( - base1, update1, false, false, false, false, allowOverride = false) === - ArrayType(new StructType().add("a", IntegerType).add("b", IntegerType))) + val mergedType1 = + mergeDataTypes( + current = base1, + update = update1, + allowImplicitConversions = false, + keepExistingType = false, + typeWideningMode = TypeWideningMode.NoTypeWidening, + caseSensitive = false, + allowOverride = false) + assert(mergedType1 === ArrayType(new StructType().add("a", IntegerType).add("b", IntegerType))) // Map root type val base2 = MapType( @@ -2447,9 +2453,16 @@ class SchemaUtilsSuite extends QueryTest new StructType().add("b", IntegerType), new StructType().add("c", IntegerType) ) - - assert(mergeDataTypes( - base2, update2, false, false, false, false, allowOverride = false) === + val mergedType2 = + mergeDataTypes( + current = base2, + update = update2, + allowImplicitConversions = false, + keepExistingType = false, + typeWideningMode = TypeWideningMode.NoTypeWidening, + caseSensitive = false, + allowOverride = false) + assert(mergedType2 === MapType( new StructType().add("a", IntegerType).add("b", IntegerType), new StructType().add("b", IntegerType).add("c", IntegerType) @@ -2460,20 +2473,34 @@ class SchemaUtilsSuite extends QueryTest // override root type val base1 = new StructType().add("a", IntegerType) val update1 = ArrayType(LongType) - - assert(mergeDataTypes( - base1, update1, false, false, false, false, allowOverride = true) === ArrayType(LongType)) + val mergedSchema1 = + mergeDataTypes( + current = base1, + update = update1, + allowImplicitConversions = false, + keepExistingType = false, + typeWideningMode = TypeWideningMode.NoTypeWidening, + caseSensitive = false, + allowOverride = true) + assert(mergedSchema1 === ArrayType(LongType)) // override nested type val base2 = ArrayType(new StructType().add("a", IntegerType).add("b", StringType)) val update2 = ArrayType(new StructType().add("a", MapType(StringType, StringType))) - - assert(mergeDataTypes( - base2, update2, false, false, false, false, allowOverride = true) === + val mergedSchema2 = + mergeDataTypes( + current = base2, + update = update2, + allowImplicitConversions = false, 
+ keepExistingType = false, + typeWideningMode = TypeWideningMode.NoTypeWidening, + caseSensitive = false, + allowOverride = true) + assert(mergedSchema2 === ArrayType(new StructType().add("a", MapType(StringType, StringType)).add("b", StringType))) } - test("keepExistingType and allowTypeWidening both true allows both widening and " + + test("keepExistingType and typeWideningMode both set allows both widening and " + "preserving non-widenable existing types") { val base = new StructType() .add("widened", ShortType) @@ -2501,9 +2528,15 @@ class SchemaUtilsSuite extends QueryTest .add("map", MapType(IntegerType, IntegerType)) .add("array", ArrayType(StringType)) .add("nonwidened", IntegerType) - assert( - mergeSchemas(base, update, allowTypeWidening = true, keepExistingType = true) === expected - ) + + val mergedSchema = + mergeSchemas( + base, + update, + typeWideningMode = TypeWideningMode.TypeEvolution(uniformIcebergCompatibleOnly = false), + keepExistingType = true + ) + assert(mergedSchema === expected) } //////////////////////////// diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala index 487a6609814..49099315b08 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningInsertSchemaEvolutionSuite.scala @@ -122,6 +122,23 @@ trait TypeWideningInsertSchemaEvolutionTests Seq(1, 2).toDF("a").select($"a".cast(ShortType))) } + test("INSERT - type widening is triggered when schema evolution is enabled via option") { + val tableName = "type_widening_insert_into_table" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (a short) USING DELTA") + Seq(1, 2).toDF("a") + .write + .format("delta") + .mode("append") + .option("mergeSchema", "true") + .insertInto(tableName) + + val result = spark.read.format("delta").table(tableName) + assert(result.schema("a").dataType === IntegerType) + checkAnswer(result, Seq(1, 2).toDF("a")) + } + } + /** * Short-hand to create a logical plan to insert into the table. This captures the state of the * table at the time the method is called, e.p. the type widening property value that will be used diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningUniformTests.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningUniformTests.scala new file mode 100644 index 00000000000..23d5d78bfe5 --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningUniformTests.scala @@ -0,0 +1,247 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.delta.typewidening + +import org.apache.spark.sql.delta.{DeltaInsertIntoTest, DeltaUnsupportedOperationException, IcebergCompat, IcebergCompatV1, IcebergCompatV2} +import org.apache.spark.sql.delta.DeltaErrors.toSQLType +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.scalatest.GivenWhenThen + +import org.apache.spark.sql.{QueryTest, SaveMode} +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.types.{DataType, DateType, DecimalType} + +/** Trait collecting tests covering type widening + Uniform Iceberg compatibility. */ +trait TypeWideningUniformTests extends QueryTest + with TypeWideningTestMixin + with TypeWideningTestCases + with DeltaInsertIntoTest + with GivenWhenThen { + + // Iceberg supports all base type changes eligible for widening during schema evolution except + // for date -> timestampNtz and decimal scale changes. + private val icebergSupportedTestCases = supportedTestCases.filter { + case SupportedTypeEvolutionTestCase(_ : DateType, _, _, _) => false + case SupportedTypeEvolutionTestCase(from: DecimalType, to: DecimalType, _, _) => + from.scale == to.scale + case _ => true + } + + // Unsupported type changes are all base changes that aren't supported above and all changes that + // are not eligible for schema evolution: int -> double, int -> decimal + private val icebergUnsupportedTestCases = + supportedTestCases.diff(icebergSupportedTestCases) ++ alterTableOnlySupportedTestCases + + /** Helper to enable Uniform with Iceberg compatibility on the given table. */ + private def enableIcebergUniform(tableName: String, compat: IcebergCompat): Unit = + sql( + s""" + |ALTER TABLE $tableName SET TBLPROPERTIES ( + | 'delta.columnMapping.mode' = 'name', + | '${compat.config.key}' = 'true', + | 'delta.universalFormat.enabledFormats' = 'iceberg' + |) + """.stripMargin) + + /** Helper to check that the given function violates Uniform compatibility with type widening. 
*/ + private def checkIcebergCompatViolation( + compat: IcebergCompat, + fromType: DataType, + toType: DataType)(f: => Unit): Unit = { + Given(s"iceberg compat ${compat.getClass.getSimpleName}") + checkError( + exception = intercept[DeltaUnsupportedOperationException] { + f + }, + "DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_TYPE_WIDENING", + parameters = Map( + "version" -> compat.version.toString, + "prevType" -> toSQLType(fromType), + "newType" -> toSQLType(toType), + "fieldPath" -> "a" + ) + ) + } + + test("apply supported type change then enable uniform") { + for (testCase <- icebergSupportedTestCases) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + val tableName = "type_widening_uniform_supported_table" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (a ${testCase.fromType.sql}) USING DELTA") + sql(s"ALTER TABLE $tableName CHANGE COLUMN a TYPE ${testCase.toType.sql}") + enableIcebergUniform(tableName, IcebergCompatV2) + } + } + } + + test("apply unsupported type change then enable uniform") { + for (testCase <- icebergUnsupportedTestCases) { + val tableName = "type_widening_uniform_unsupported_table" + withTable(tableName) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + sql(s"CREATE TABLE $tableName (a ${testCase.fromType.sql}) USING DELTA") + sql(s"ALTER TABLE $tableName CHANGE COLUMN a TYPE ${testCase.toType.sql}") + checkIcebergCompatViolation(IcebergCompatV1, testCase.fromType, testCase.toType) { + enableIcebergUniform(tableName, IcebergCompatV1) + } + checkIcebergCompatViolation(IcebergCompatV2, testCase.fromType, testCase.toType) { + enableIcebergUniform(tableName, IcebergCompatV2) + } + } + } + } + + test("enable uniform then apply supported type change - ALTER TABLE") { + for (testCase <- icebergSupportedTestCases) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + val tableName = "type_widening_uniform_manual_supported_table" + withTable(tableName) { + sql(s"CREATE TABLE $tableName (a ${testCase.fromType.sql}) USING DELTA") + enableIcebergUniform(tableName, IcebergCompatV2) + sql(s"ALTER TABLE $tableName CHANGE COLUMN a TYPE ${testCase.toType.sql}") + } + } + } + + test("enable uniform then apply unsupported type change - ALTER TABLE") { + for (testCase <- icebergUnsupportedTestCases) { + val tableName = "type_widening_uniform_manual_unsupported_table" + withTable(tableName) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + sql(s"CREATE TABLE $tableName (a ${testCase.fromType.sql}) USING DELTA") + enableIcebergUniform(tableName, IcebergCompatV2) + checkIcebergCompatViolation(IcebergCompatV2, testCase.fromType, testCase.toType) { + sql(s"ALTER TABLE $tableName CHANGE COLUMN a TYPE ${testCase.toType.sql}") + } + } + } + } + + + test("enable uniform then apply supported type change - MERGE") { + for (testCase <- icebergSupportedTestCases) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + withTable("source", "target") { + testCase.initialValuesDF.write.format("delta").saveAsTable("target") + testCase.additionalValuesDF.write.format("delta").saveAsTable("source") + enableIcebergUniform("target", IcebergCompatV2) + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + sql( + s""" + |MERGE INTO target + |USING source + |ON 0 = 1 + |WHEN NOT MATCHED THEN INSERT * + """.stripMargin) + } + val result = sql(s"SELECT * FROM target") + assert(result.schema("value").dataType === testCase.toType) + checkAnswer(result, 
testCase.expectedResult) + } + } + } + + test("enable uniform then apply unsupported type change - MERGE") { + for (testCase <- icebergUnsupportedTestCases) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + withTable("source", "target") { + testCase.initialValuesDF.write.format("delta").saveAsTable("target") + // Here we use a source for MERGE that contains the same data that is already present in the + // target, except that it uses a wider type. Since Uniform is enabled and Iceberg doesn't + // support the given type change, we will keep the existing narrower type and downcast + // values. `testCase.additionalValueDF` contains values that would overflow, which would + // just fail, hence why we use `testCase.initialValueDF` instead. + testCase.initialValuesDF + .select(col("value").cast(testCase.toType)) + .write + .format("delta") + .saveAsTable("source") + enableIcebergUniform("target", IcebergCompatV2) + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + sql( + s""" + |MERGE INTO target + |USING source + |ON 0 = 1 + |WHEN NOT MATCHED THEN INSERT * + """.stripMargin) + } + val result = sql(s"SELECT * FROM target") + val expected = testCase.initialValuesDF.union(testCase.initialValuesDF) + assert(result.schema("value").dataType === testCase.fromType) + checkAnswer(result, expected) + } + } + } + + + for (insert <- Set( + // Cover only a subset of all INSERTs. There's little value in testing all of them and it + // quickly gets expensive. + SQLInsertByPosition(SaveMode.Append), + SQLInsertByName(SaveMode.Append), + DFv1InsertInto(SaveMode.Append), + StreamingInsert)) { + test(s"enable uniform then apply supported type change - ${insert.name}") { + for (testCase <- icebergSupportedTestCases) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + withTable("source", "target") { + testCase.initialValuesDF.write.format("delta").saveAsTable("target") + testCase.additionalValuesDF.write.format("delta").saveAsTable("source") + enableIcebergUniform("target", IcebergCompatV2) + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + insert.runInsert(columns = Seq("value"), whereCol = "value", whereValue = 1) + } + val result = sql(s"SELECT * FROM target") + assert(result.schema("value").dataType === testCase.toType) + checkAnswer(result, testCase.expectedResult) + } + } + } + + test(s"enable uniform then apply unsupported type change - ${insert.name}") { + for (testCase <- icebergUnsupportedTestCases) { + Given(s"changing ${testCase.fromType.sql} -> ${testCase.toType.sql}") + withTable("source", "target") { + testCase.initialValuesDF.write.format("delta").saveAsTable("target") + // Here we use a source for INSERT that contains the same data that is already present in + // the target, except that it uses a wider type. Since Uniform is enabled and Iceberg + // doesn't support the given type change, we will keep the existing narrower type and + // downcast values. `testCase.additionalValueDF` contains values that would overflow, + // which would just fail, hence why we use `testCase.initialValueDF` instead. 
+ testCase.initialValuesDF + .select(col("value").cast(testCase.toType)) + .write + .format("delta") + .saveAsTable("source") + enableIcebergUniform("target", IcebergCompatV2) + withSQLConf( + DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true", + DeltaSQLConf.DELTA_STREAMING_SINK_ALLOW_IMPLICIT_CASTS.key -> "true" + ) { + insert.runInsert(columns = Seq("value"), whereCol = "value", whereValue = 1) + } + val result = sql(s"SELECT * FROM target") + val expected = testCase.initialValuesDF.union(testCase.initialValuesDF) + assert(result.schema("value").dataType === testCase.fromType) + checkAnswer(result, expected) + } + } + } + } +}
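
For reviewers, a minimal end-to-end sketch of the behavior the new suite exercises. This is an illustration under stated assumptions, not part of the patch: the table name is made up, and the property and conf keys mirror the enableIcebergUniform test helper and the typeWidening.allowUnsupportedIcebergTypeChanges conf introduced above.

// Minimal sketch, assuming a Spark session with Delta and Uniform/Iceberg support available.
spark.sql("CREATE TABLE t (a SMALLINT) USING DELTA " +
  "TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("""
  ALTER TABLE t SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.enableIcebergCompatV2' = 'true',
    'delta.universalFormat.enabledFormats' = 'iceberg'
  )""")

// SMALLINT -> INT maps onto an Iceberg-supported promotion, so the column is widened in place.
spark.sql("ALTER TABLE t CHANGE COLUMN a TYPE INT")

// A change Delta supports but Iceberg does not (e.g. INT -> DECIMAL(10, 0)) is now rejected with
// DELTA_ICEBERG_COMPAT_VIOLATION.UNSUPPORTED_TYPE_WIDENING, unless the internal
// typeWidening.allowUnsupportedIcebergTypeChanges escape hatch is explicitly enabled.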