diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala index 3d5edc20031..47c103a76a0 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala @@ -57,7 +57,8 @@ private[delta] case class TypeChange( private[delta] object TypeChange { // tableVersion was a field present during the preview and removed afterwards. We preserve it if - // it's already present in the type change metadata of the table to avoid breaking older clients. + // it's already present in the type change metadata of the table to avoid breaking older clients + // that use it to decide which files must be rewritten when dropping the feature. val TABLE_VERSION_METADATA_KEY: String = "tableVersion" val FROM_TYPE_METADATA_KEY: String = "fromType" val TO_TYPE_METADATA_KEY: String = "toType" @@ -140,10 +141,9 @@ private[delta] object TypeWideningMetadata extends DeltaLogging { val schemaWithMetadata = SchemaMergingUtils.transformColumns(schema, oldSchema) { case (_, newField, Some(oldField), _) => var typeChanges = collectTypeChanges(oldField.dataType, newField.dataType) - // The version field isn't used anymore but we need to populate it in case the table doesn't - // use the stable feature, as preview clients may then still access the table and rely on - // the field being present. - if (!txn.protocol.isFeatureSupported(TypeWideningTableFeature)) { + // The version field isn't used anymore but we need to populate it in case the table uses + // the preview feature, as preview clients may then rely on the field being present. 
+ if (txn.protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) { typeChanges = typeChanges.map { change => change.copy(version = Some(txn.getFirstAttemptVersion)) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala index 906fcfd4418..ef317b7f261 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningMetadataSuite.scala @@ -52,6 +52,9 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { super.beforeAll() sql(s"CREATE TABLE $testTableName (a int) USING delta TBLPROPERTIES (" + s"'${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true', " + + // Force the stable feature to be used by default in tests instead of the preview feature, + // which is the one currently enabled by default. Tests that cover aspects specific to the + // preview - e.g. populating the `tableVersion` field - explicitly enable the preview feature. s"'${propertyKey(TypeWideningTableFeature)}' = 'supported')") } @@ -490,6 +493,33 @@ trait TypeWideningMetadataTests extends QueryTest with DeltaSQLCommandTest { assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === newSchema -> Seq.empty) } + test("addTypeWideningMetadata/removeTypeWideningMetadata with preview feature") { + val newSchema = StructType.fromDDL("a short") + val oldSchema = StructType.fromDDL("a byte") + + // Create a new transaction with the preview feature supported. 
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(testTableName)) + val txn = DeltaLog.forTable(spark, TableIdentifier(testTableName)) + .startTransaction(catalogTableOpt = Some(table)) + txn.updateProtocol(txn.protocol.withFeature(TypeWideningPreviewTableFeature)) + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) + + // Type widening metadata is added with field `tableVersion` populated as this uses the preview + // feature. That field is deprecated in the stable version of the feature. + assert(schema("a") === StructField("a", ShortType, + metadata = new MetadataBuilder() + .putMetadataArray("delta.typeChanges", Array( + new MetadataBuilder() + .putLong("tableVersion", 1) + .putString("fromType", "byte") + .putString("toType", "short") + .build() + )).build() + )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + newSchema -> Seq(Seq.empty -> schema("a"))) + } + test("updateTypeChangeVersion with no type changes") { val schema = new StructType().add("a", IntegerType) assert(TypeWideningMetadata.updateTypeChangeVersion(schema, 1, 4) === schema) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala index 2354a04ef8b..4f48123b156 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala @@ -724,6 +724,8 @@ trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideni .build() )).build())) + // It's allowed to manually add both the preview and stable feature to the same table - the + // specs are compatible. In that case, we still populate the `tableVersion` field. 
addTableFeature(tempPath, TypeWideningTableFeature) sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") assert(readDeltaTable(tempPath).schema === new StructType() @@ -735,6 +737,7 @@ trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideni .putString("toType", "short") .build(), new MetadataBuilder() + .putLong("tableVersion", 6) .putString("fromType", "short") .putString("toType", "integer") .build()