diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala index 0a9d67ba362..47c103a76a0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala @@ -26,7 +26,8 @@ import org.apache.spark.sql.types._ /** * Information corresponding to a single type change. - * @param version The version of the table where the type change was made. + * @param version (Deprecated) The version of the table where the type change was made. This is + * only populated by clients using the preview of type widening. * @param fromType The original type before the type change. * @param toType The new type after the type change. * @param fieldPath The path inside nested maps and arrays to the field where the type change was @@ -34,7 +35,7 @@ import org.apache.spark.sql.types._ * arrays. The path is empty if the type change was applied inside a map or array. */ private[delta] case class TypeChange( - version: Long, + version: Option[Long], fromType: DataType, toType: DataType, fieldPath: Seq[String]) { @@ -43,7 +44,8 @@ private[delta] case class TypeChange( /** Serialize this type change to a [[Metadata]] object. */ def toMetadata: Metadata = { val builder = new MetadataBuilder() - .putLong(TABLE_VERSION_METADATA_KEY, version) + version.foreach(builder.putLong(TABLE_VERSION_METADATA_KEY, _)) + builder .putString(FROM_TYPE_METADATA_KEY, fromType.typeName) .putString(TO_TYPE_METADATA_KEY, toType.typeName) if (fieldPath.nonEmpty) { @@ -54,6 +56,9 @@ private[delta] case class TypeChange( } private[delta] object TypeChange { + // tableVersion was a field present during the preview and removed afterwards. We preserve it if + // it's already present in the type change metadata of the table to avoid breaking older clients + // that use it to decide which files must be rewritten when dropping the feature. val TABLE_VERSION_METADATA_KEY: String = "tableVersion" val FROM_TYPE_METADATA_KEY: String = "fromType" val TO_TYPE_METADATA_KEY: String = "toType" @@ -66,8 +71,13 @@ private[delta] object TypeChange { } else { Seq.empty } + val version = if (metadata.contains(TABLE_VERSION_METADATA_KEY)) { + Some(metadata.getLong(TABLE_VERSION_METADATA_KEY)) + } else { + None + } TypeChange( - version = metadata.getLong(TABLE_VERSION_METADATA_KEY), + version, fromType = DataType.fromDDL(metadata.getString(FROM_TYPE_METADATA_KEY)), toType = DataType.fromDDL(metadata.getString(TO_TYPE_METADATA_KEY)), fieldPath @@ -130,11 +140,15 @@ private[delta] object TypeWideningMetadata extends DeltaLogging { val changesToRecord = mutable.Buffer.empty[TypeChange] val schemaWithMetadata = SchemaMergingUtils.transformColumns(schema, oldSchema) { case (_, newField, Some(oldField), _) => - // Record the version the transaction will attempt to use in the type change metadata. If - // there's a conflict with another transaction, the version in the metadata will be updated - // during conflict resolution. See [[ConflictChecker.updateTypeWideningMetadata()]]. - val typeChanges = - collectTypeChanges(oldField.dataType, newField.dataType, txn.getFirstAttemptVersion) + var typeChanges = collectTypeChanges(oldField.dataType, newField.dataType) + // The version field isn't used anymore but we need to populate it in case the table uses + // the preview feature, as preview clients may then rely on the field being present. + if (txn.protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) { + typeChanges = typeChanges.map { change => + change.copy(version = Some(txn.getFirstAttemptVersion)) + } + } + changesToRecord ++= typeChanges TypeWideningMetadata(typeChanges).appendToField(newField) case (_, newField, None, _) => @@ -159,24 +173,24 @@ private[delta] object TypeWideningMetadata extends DeltaLogging { /** * Recursively collect primitive type changes inside nested maps and arrays between `fromType` and - * `toType`. The `version` is the version of the table where the type change was made. + * `toType`. */ - private def collectTypeChanges(fromType: DataType, toType: DataType, version: Long) + private def collectTypeChanges(fromType: DataType, toType: DataType) : Seq[TypeChange] = (fromType, toType) match { case (from: MapType, to: MapType) => - collectTypeChanges(from.keyType, to.keyType, version).map { typeChange => + collectTypeChanges(from.keyType, to.keyType).map { typeChange => typeChange.copy(fieldPath = "key" +: typeChange.fieldPath) } ++ - collectTypeChanges(from.valueType, to.valueType, version).map { typeChange => + collectTypeChanges(from.valueType, to.valueType).map { typeChange => typeChange.copy(fieldPath = "value" +: typeChange.fieldPath) } case (from: ArrayType, to: ArrayType) => - collectTypeChanges(from.elementType, to.elementType, version).map { typeChange => + collectTypeChanges(from.elementType, to.elementType).map { typeChange => typeChange.copy(fieldPath = "element" +: typeChange.fieldPath) } case (fromType: AtomicType, toType: AtomicType) if fromType != toType => Seq(TypeChange( - version, + version = None, fromType, toType, fieldPath = Seq.empty @@ -190,6 +204,10 @@ private[delta] object TypeWideningMetadata extends DeltaLogging { /** * Change the `tableVersion` value in the type change metadata present in `schema`. Used during * conflict resolution to update the version associated with the transaction is incremented. + * + * Note: The `tableVersion` field is only populated for tables that use the preview of type + * widening, we could remove this if/when there are no more tables using the preview of the + * feature. */ def updateTypeChangeVersion(schema: StructType, fromVersion: Long, toVersion: Long): StructType = SchemaMergingUtils.transformColumns(schema) { @@ -197,8 +215,8 @@ private[delta] object TypeWideningMetadata extends DeltaLogging { fromField(field) match { case Some(typeWideningMetadata) => val updatedTypeChanges = typeWideningMetadata.typeChanges.map { - case typeChange if typeChange.version == fromVersion => - typeChange.copy(version = toVersion) + case typeChange if typeChange.version.contains(fromVersion) => + typeChange.copy(version = Some(toVersion)) case olderTypeChange => olderTypeChange } val newMetadata = new MetadataBuilder().withMetadata(field.metadata) @@ -262,19 +280,4 @@ private[delta] object TypeWideningMetadata extends DeltaLogging { getTypeChanges(field).map((fieldPath :+ field.name, _)) } } - - /** Return the version of the latest type change recorded in the schema metadata */ - def getLatestTypeChangeVersion(schema: StructType): Option[Long] = { - val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) { - _ => true - }.map(_._2) - - // Collect all type change versions from all struct fields. - val versions = allStructFields - .flatMap(TypeWideningMetadata.fromField) - .flatMap(_.typeChanges) - .map(_.version) - - if (versions.nonEmpty) Some(versions.max) else None - } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala index c9edee95a06..ef317b7f261 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.delta.typewidening import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.actions.TableFeatureProtocolUtils.propertyKey import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.util.JsonUtils @@ -49,8 +50,12 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { override protected def beforeAll(): Unit = { super.beforeAll() - sql(s"CREATE TABLE $testTableName (a int) USING delta " + - s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')") + sql(s"CREATE TABLE $testTableName (a int) USING delta TBLPROPERTIES (" + + s"'${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true', " + + // Force the stable feature to be used by default in tests instead of the preview feature, + // which is the one currently enabled by default. Tests that cover aspects specific to the + // preview - e.p. populating the `tableVersion` field - explicitly enable the preview feature. + s"'${propertyKey(TypeWideningTableFeature)}' = 'supported')") } override protected def afterAll(): Unit = { @@ -62,12 +67,10 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { * Short-hand to build the metadata for a type change to cut down on repetition. */ private def typeChangeMetadata( - version: Long, fromType: String, toType: String, path: String = ""): Metadata = { val builder = new MetadataBuilder() - .putLong("tableVersion", version) .putString("fromType", fromType) .putString("toType", toType) if (path.nonEmpty) { @@ -77,15 +80,27 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { } test("toMetadata/fromMetadata with empty path") { - val typeChange = TypeChange(version = 1, IntegerType, LongType, Seq.empty) - assert(typeChange.toMetadata === typeChangeMetadata(version = 1, "integer", "long")) + val typeChange = TypeChange(version = None, IntegerType, LongType, Seq.empty) + assert(typeChange.toMetadata === typeChangeMetadata("integer", "long")) assert(TypeChange.fromMetadata(typeChange.toMetadata) === typeChange) } test("toMetadata/fromMetadata with non-empty path") { - val typeChange = TypeChange(10, DateType, TimestampNTZType, Seq("key", "element")) + val typeChange = + TypeChange(version = None, DateType, TimestampNTZType, Seq("key", "element")) assert(typeChange.toMetadata === - typeChangeMetadata(version = 10, "date", "timestamp_ntz", "key.element")) + typeChangeMetadata("date", "timestamp_ntz", "key.element")) + assert(TypeChange.fromMetadata(typeChange.toMetadata) === typeChange) + } + + test("toMetadata/fromMetadata with tableVersion") { + val typeChange = TypeChange(version = Some(1), ByteType, ShortType, Seq.empty) + val expectedMetadata = new MetadataBuilder() + .putLong("tableVersion", 1) + .putString("fromType", "byte") + .putString("toType", "short") + .build() + assert(typeChange.toMetadata === expectedMetadata) assert(TypeChange.fromMetadata(typeChange.toMetadata) === typeChange) } @@ -109,11 +124,12 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { test("fromField with single type change") { val field = StructField("a", IntegerType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("integer", "long") )).build() ) assert(TypeWideningMetadata.fromField(field) === - Some(TypeWideningMetadata(Seq(TypeChange(1, IntegerType, LongType, Seq.empty))))) + Some(TypeWideningMetadata(Seq( + TypeChange(version = None, IntegerType, LongType, Seq.empty))))) val otherField = StructField("a", IntegerType) assert(TypeWideningMetadata.fromField(field).get.appendToField(otherField) === field) } @@ -121,14 +137,32 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { test("fromField with multiple type changes") { val field = StructField("a", IntegerType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long"), - typeChangeMetadata(version = 10, "decimal(5,0)", "decimal(10,2)", "element.element") + typeChangeMetadata("integer", "long"), + typeChangeMetadata("decimal(5,0)", "decimal(10,2)", "element.element") )).build() ) assert(TypeWideningMetadata.fromField(field) === Some(TypeWideningMetadata(Seq( - TypeChange(1, IntegerType, LongType, Seq.empty), - TypeChange(10, DecimalType(5, 0), DecimalType(10, 2), Seq("element", "element")))))) + TypeChange(version = None, IntegerType, LongType, Seq.empty), + TypeChange( + version = None, DecimalType(5, 0), DecimalType(10, 2), Seq("element", "element")))))) + val otherField = StructField("a", IntegerType) + assert(TypeWideningMetadata.fromField(field).get.appendToField(otherField) === field) + } + + test("fromField with tableVersion") { + val typeChange = new MetadataBuilder() + .putLong("tableVersion", 1) + .putString("fromType", "integer") + .putString("toType", "long") + .build() + val field = StructField("a", IntegerType, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array(typeChange)) + .build() + ) + assert(TypeWideningMetadata.fromField(field) === + Some(TypeWideningMetadata(Seq( + TypeChange(version = Some(1), IntegerType, LongType, Seq.empty))))) val otherField = StructField("a", IntegerType) assert(TypeWideningMetadata.fromField(field).get.appendToField(otherField) === field) } @@ -142,11 +176,11 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { // Adding single type change should add the metadata to the field and not otherwise change it. val singleMetadata = TypeWideningMetadata(Seq( - TypeChange(1, IntegerType, LongType, Seq.empty))) + TypeChange(version = None, IntegerType, LongType, Seq.empty))) assert(singleMetadata.appendToField(field) === field.copy(metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("integer", "long") )).build() ) ) @@ -157,13 +191,13 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { // Adding multiple type changes should add the metadata to the field and not otherwise change // it. val multipleMetadata = TypeWideningMetadata(Seq( - TypeChange(1, IntegerType, LongType, Seq.empty), - TypeChange(6, FloatType, DoubleType, Seq("value")))) + TypeChange(version = None, IntegerType, LongType, Seq.empty), + TypeChange(version = None, FloatType, DoubleType, Seq("value")))) assert(multipleMetadata.appendToField(field) === field.copy(metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long"), - typeChangeMetadata(version = 6, "float", "double", "value") + typeChangeMetadata("integer", "long"), + typeChangeMetadata("float", "double", "value") )).build() ) ) @@ -177,7 +211,7 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { val field = StructField("a", IntegerType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("integer", "long") )).build() ) // Adding empty type widening metadata should not change the field. @@ -185,49 +219,49 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(emptyMetadata.appendToField(field) === field) assert(TypeWideningMetadata.fromField(emptyMetadata.appendToField(field)).contains( TypeWideningMetadata(Seq( - TypeChange(1, IntegerType, LongType, Seq.empty))) + TypeChange(version = None, IntegerType, LongType, Seq.empty))) )) // Adding single type change should add the metadata to the field and not otherwise change it. val singleMetadata = TypeWideningMetadata(Seq( - TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty))) + TypeChange(version = None, DecimalType(18, 0), DecimalType(19, 0), Seq.empty))) assert(singleMetadata.appendToField(field) === field.copy( metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long"), - typeChangeMetadata(version = 5, "decimal(18,0)", "decimal(19,0)") + typeChangeMetadata("integer", "long"), + typeChangeMetadata("decimal(18,0)", "decimal(19,0)") )).build() )) val singleMetadataFromField = TypeWideningMetadata.fromField(singleMetadata.appendToField(field)) assert(singleMetadataFromField.contains(TypeWideningMetadata(Seq( - TypeChange(1, IntegerType, LongType, Seq.empty), - TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty))) + TypeChange(version = None, IntegerType, LongType, Seq.empty), + TypeChange(version = None, DecimalType(18, 0), DecimalType(19, 0), Seq.empty))) )) // Adding multiple type changes should add the metadata to the field and not otherwise change // it. val multipleMetadata = TypeWideningMetadata(Seq( - TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty), - TypeChange(6, FloatType, DoubleType, Seq("value")))) + TypeChange(version = None, DecimalType(18, 0), DecimalType(19, 0), Seq.empty), + TypeChange(version = None, FloatType, DoubleType, Seq("value")))) assert(multipleMetadata.appendToField(field) === field.copy( metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long"), - typeChangeMetadata(version = 5, "decimal(18,0)", "decimal(19,0)"), - typeChangeMetadata(version = 6, "float", "double", "value") + typeChangeMetadata("integer", "long"), + typeChangeMetadata("decimal(18,0)", "decimal(19,0)"), + typeChangeMetadata("float", "double", "value") )).build() )) val multipleMetadataFromField = TypeWideningMetadata.fromField(multipleMetadata.appendToField(field)) assert(multipleMetadataFromField.contains(TypeWideningMetadata(Seq( - TypeChange(1, IntegerType, LongType, Seq.empty), - TypeChange(5, DecimalType(18, 0), DecimalType(19, 0), Seq.empty), - TypeChange(6, FloatType, DoubleType, Seq("value")))) + TypeChange(version = None, IntegerType, LongType, Seq.empty), + TypeChange(version = None, DecimalType(18, 0), DecimalType(19, 0), Seq.empty), + TypeChange(version = None, FloatType, DoubleType, Seq("value")))) )) } @@ -270,28 +304,28 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(schema("i") === StructField("i", LongType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "short", "long") + typeChangeMetadata("short", "long") )).build() )) assert(schema("d") === StructField("d", DecimalType(15, 4), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "decimal(6,2)", "decimal(15,4)") + typeChangeMetadata("decimal(6,2)", "decimal(15,4)") )).build() )) assert(schema("a") === StructField("a", ArrayType(DoubleType), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "byte", "double", "element") + typeChangeMetadata("byte", "double", "element") )).build() )) assert(schema("m") === StructField("m", MapType(ShortType, IntegerType), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "byte", "short", "key") + typeChangeMetadata("byte", "short", "key") )).build() )) @@ -308,32 +342,32 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(schema("i") === StructField("i", LongType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "short", "long"), - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("short", "long"), + typeChangeMetadata("integer", "long") )).build() )) assert(schema("d") === StructField("d", DecimalType(15, 4), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "decimal(6,2)", "decimal(15,4)"), - typeChangeMetadata(version = 1, "decimal(10,4)", "decimal(15,4)") + typeChangeMetadata("decimal(6,2)", "decimal(15,4)"), + typeChangeMetadata("decimal(10,4)", "decimal(15,4)") )).build() )) assert(schema("a") === StructField("a", ArrayType(DoubleType), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "byte", "double", "element"), - typeChangeMetadata(version = 1, "integer", "double", "element") + typeChangeMetadata("byte", "double", "element"), + typeChangeMetadata("integer", "double", "element") )).build() )) assert(schema("m") === StructField("m", MapType(ShortType, IntegerType), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "byte", "short", "key"), - typeChangeMetadata(version = 1, "byte", "integer", "value") + typeChangeMetadata("byte", "short", "key"), + typeChangeMetadata("byte", "integer", "value") )).build() )) @@ -362,14 +396,14 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(struct("i") === StructField("i", LongType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "short", "long") + typeChangeMetadata("short", "long") )).build() )) assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "byte", "integer", "element.key") + typeChangeMetadata("byte", "integer", "element.key") )).build() )) @@ -377,7 +411,7 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { MapType(MapType(LongType, IntegerType), ArrayType(LongType)), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long", "key.key") + typeChangeMetadata("integer", "long", "key.key") )).build() )) @@ -395,16 +429,16 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(struct("i") === StructField("i", LongType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "short", "long"), - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("short", "long"), + typeChangeMetadata("integer", "long") )).build() )) assert(struct("a") === StructField("a", ArrayType(MapType(IntegerType, LongType)), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "byte", "integer", "element.key"), - typeChangeMetadata(version = 1, "integer", "long", "element.value") + typeChangeMetadata("byte", "integer", "element.key"), + typeChangeMetadata("integer", "long", "element.value") )).build() )) @@ -412,8 +446,8 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { MapType(MapType(LongType, IntegerType), ArrayType(LongType)), metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long", "key.key"), - typeChangeMetadata(version = 1, "integer", "long", "value.element") + typeChangeMetadata("integer", "long", "key.key"), + typeChangeMetadata("integer", "long", "value.element") )).build() )) assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === @@ -436,7 +470,7 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(schema("b") === StructField("b", LongType, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("integer", "long") )).build() )) assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === @@ -459,6 +493,33 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === newSchema -> Seq.empty) } + test("addTypeWideningMetadata/removeTypeWideningMetadata with preview feature") { + val newSchema = StructType.fromDDL("a short") + val oldSchema = StructType.fromDDL("a byte") + + // Create a new transaction with the preview feature supported. + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testTableName)) + val txn = DeltaLog.forTable(spark, TableIdentifier(testTableName)) + .startTransaction(catalogTableOpt = Some(table)) + txn.updateProtocol(txn.protocol.withFeature(TypeWideningPreviewTableFeature)) + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) + + // Type widening metadata is added with field `tableVersion` populated as this uses the preview + // feature. That field is deprecated in the stable version of the feature. + assert(schema("a") === StructField("a", ShortType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putLong("tableVersion", 1) + .putString("fromType", "byte") + .putString("toType", "short") + .build() + )).build() + )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + newSchema -> Seq(Seq.empty -> schema("a"))) + } + test("updateTypeChangeVersion with no type changes") { val schema = new StructType().add("a", IntegerType) assert(TypeWideningMetadata.updateTypeChangeVersion(schema, 1, 4) === schema) @@ -468,7 +529,7 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { val schema = new StructType() .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("integer", "long") )) .build() ) @@ -477,7 +538,7 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { new StructType() .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 4, "integer", "long") + typeChangeMetadata("integer", "long") )) .build() ) @@ -488,8 +549,8 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { val schema = new StructType() .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long"), - typeChangeMetadata(version = 6, "float", "double", "value") + typeChangeMetadata("integer", "long"), + typeChangeMetadata("float", "double", "value") )) .build() ) @@ -499,8 +560,8 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { new StructType() .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 4, "integer", "long"), - typeChangeMetadata(version = 6, "float", "double", "value") + typeChangeMetadata("integer", "long"), + typeChangeMetadata("float", "double", "value") )) .build() ) @@ -516,12 +577,12 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { val schema = new StructType() .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "integer", "long") + typeChangeMetadata("integer", "long") )) .build()) .add("b", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 1, "short", "integer", "element") + typeChangeMetadata("short", "integer", "element") )) .build()) @@ -530,12 +591,12 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { new StructType() .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 4, "integer", "long") + typeChangeMetadata("integer", "long") )) .build()) .add("b", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder() .putMetadataArray("delta.typeChanges", Array( - typeChangeMetadata(version = 4, "short", "integer", "element") + typeChangeMetadata("short", "integer", "element") )) .build()) ) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala index 9d8c804b259..4f48123b156 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala @@ -704,4 +704,61 @@ trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideni ) assertFeatureSupported(preview = false, stable = false) } + + test("tableVersion metadata is correctly set and preserved when using the preview feature") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + + addTableFeature(tempPath, TypeWideningPreviewTableFeature) + enableTypeWidening(tempPath) + addSingleFile(Seq(1), ByteType) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE short") + + assert(readDeltaTable(tempPath).schema === new StructType() + .add("a", ShortType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putLong("tableVersion", 4) + .putString("fromType", "byte") + .putString("toType", "short") + .build() + )).build())) + + // It's allowed to manually add both the preview and stable feature to the same table - the + // specs are compatible. In that case, we still populate the `tableVersion` field. + addTableFeature(tempPath, TypeWideningTableFeature) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") + assert(readDeltaTable(tempPath).schema === new StructType() + .add("a", IntegerType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putLong("tableVersion", 4) + .putString("fromType", "byte") + .putString("toType", "short") + .build(), + new MetadataBuilder() + .putLong("tableVersion", 6) + .putString("fromType", "short") + .putString("toType", "integer") + .build() + )).build())) + } + + test("tableVersion isn't set when using the stable feature") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + + addTableFeature(tempPath, TypeWideningTableFeature) + enableTypeWidening(tempPath) + addSingleFile(Seq(1), ByteType) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE short") + assert(readDeltaTable(tempPath).schema === new StructType() + .add("a", ShortType, nullable = true, metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putString("fromType", "byte") + .putString("toType", "short") + .build() + )).build())) + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala index 7144c9f5057..b33bb3ad077 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTestMixin.scala @@ -72,7 +72,7 @@ trait TypeWideningTestMixin extends DeltaSQLCommandTest with DeltaDMLTestUtils { path: Seq[String] = Seq.empty): Metadata = new MetadataBuilder() .putMetadataArray( - "delta.typeChanges", Array(TypeChange(version, from, to, path).toMetadata)) + "delta.typeChanges", Array(TypeChange(Some(version), from, to, path).toMetadata)) .build() def addSingleFile[T: Encoder](values: Seq[T], dataType: DataType): Unit =