diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala index e5962d7ac54..b715b7a0be8 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ConflictChecker.scala @@ -560,7 +560,9 @@ private[delta] class ConflictChecker( * to handle the row tracking feature being enabled by the winning transaction. */ private def reassignRowCommitVersions(): Unit = { - if (!RowTracking.isSupported(currentTransactionInfo.protocol)) { + if (!RowTracking.isSupported(currentTransactionInfo.protocol) && + // Type widening relies on default row commit versions to be set. + !TypeWidening.isSupported(currentTransactionInfo.protocol)) { return } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala index 0cba4a05c09..e7046ee42fe 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DefaultRowCommitVersion.scala @@ -24,7 +24,8 @@ object DefaultRowCommitVersion { protocol: Protocol, actions: Iterator[Action], version: Long): Iterator[Action] = { - if (!RowTracking.isSupported(protocol)) { + // Type Widening relies on default row commit versions to be set. + if (!RowTracking.isSupported(protocol) && !TypeWidening.isSupported(protocol)) { return actions } actions.map { diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala index 9e43ac3e6d1..97c7ab60e9d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreDowngradeTableFeatureCommand.scala @@ -18,10 +18,12 @@ package org.apache.spark.sql.delta import java.util.concurrent.TimeUnit import org.apache.spark.sql.delta.catalog.DeltaTableV2 -import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand} +import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.util.{Utils => DeltaUtils} +import org.apache.spark.sql.catalyst.analysis.ResolvedTable + /** * A base class for implementing a preparation command for removing table features. * Must implement a run method. Note, the run method must be implemented in a way that when @@ -126,3 +128,88 @@ case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2) true } } + +case class TypeWideningPreDowngradeCommand(table: DeltaTableV2) + extends PreDowngradeTableFeatureCommand + with DeltaLogging { + + /** + * Unset the type widening table property to prevent new type changes to be applied to the table, + * then removes traces of the feature: + * - Rewrite files that have columns or fields with a different type than in the current table + * schema. These are all files not added or modified after the last type change. + * - Remove the type widening metadata attached to fields in the current table schema. + * + * @return Return true if files were rewritten or metadata was removed. False otherwise. + */ + override def removeFeatureTracesIfNeeded(): Boolean = { + if (TypeWideningTableFeature.validateRemoval(table.initialSnapshot)) return false + + val startTimeNs = System.nanoTime() + val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key) + AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark) + val numFilesRewritten = rewriteFilesIfNeeded() + val metadataRemoved = removeMetadataIfNeeded() + + recordDeltaEvent( + table.deltaLog, + opType = "delta.typeWideningFeatureRemovalMetrics", + data = Map( + "downgradeTimeMs" -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs), + "numFilesRewritten" -> numFilesRewritten, + "metadataRemoved" -> metadataRemoved + ) + ) + numFilesRewritten > 0 || metadataRemoved + } + + /** + * Rewrite files that have columns or fields with a different type than in the current table + * schema. These are all files not added or modified after the last type change. + * @return Return the number of files rewritten. + */ + private def rewriteFilesIfNeeded(): Long = { + val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot) + if (numFilesToRewrite == 0L) return 0L + + // Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg + // command. + import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + val tableId = table.spark + .sessionState + .sqlParser + .parseTableIdentifier(table.name).nameParts.asIdentifier + val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog + + val reorg = DeltaReorgTableCommand( + ResolvedTable.create( + catalog, + tableId, + table + ), + DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) + )(Nil) + + reorg.run(table.spark) + numFilesToRewrite + } + + /** + * Remove the type widening metadata attached to fields in the current table schema. + * @return Return true if any metadata was removed. False otherwise. + */ + private def removeMetadataIfNeeded(): Boolean = { + if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) { + return false + } + + val txn = table.startTransaction() + val metadata = txn.metadata + val (cleanedSchema, changes) = + TypeWideningMetadata.removeTypeWideningMetadata(metadata.schema) + txn.commit( + metadata.copy(schemaString = cleanedSchema.json) :: Nil, + DeltaOperations.UpdateColumnMetadata("DROP FEATURE", changes)) + true + } +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala index 8acb32d536e..1966d0ed176 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala @@ -629,7 +629,8 @@ object ManagedCommitTableFeature } object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev") - with FeatureAutomaticallyEnabledByMetadata { + with FeatureAutomaticallyEnabledByMetadata + with RemovableFeature { override def automaticallyUpdateProtocolOfExistingTables: Boolean = true private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean = @@ -638,6 +639,19 @@ object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening override def metadataRequiresFeatureToBeEnabled( metadata: Metadata, spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata) + + override def validateRemoval(snapshot: Snapshot): Boolean = + !isTypeWideningSupportNeededByMetadata(snapshot.metadata) && + !TypeWideningMetadata.containsTypeWideningMetadata(snapshot.metadata.schema) + + override def actionUsesFeature(action: Action): Boolean = + action match { + case m: Metadata => TypeWideningMetadata.containsTypeWideningMetadata(m.schema) + case _ => false + } + + override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand = + TypeWideningPreDowngradeCommand(table) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index b3dd6b2b1cc..2efbdf11e47 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -16,9 +16,10 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils} +import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils} import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.types._ object TypeWidening { @@ -60,4 +61,33 @@ object TypeWidening { case (ByteType | ShortType, IntegerType) => true case _ => false } + + /** + * Filter the given list of files to only keep files that were written before the latest type + * change, if any. These older files contain a column or field with a type that is different than + * in the current table schema and must be rewritten when dropping the type widening table feature + * to make the table readable by readers that don't support the feature. + */ + def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = + TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match { + case Some(latestVersion) => + files.filter(_.defaultRowCommitVersion match { + case Some(version) => version < latestVersion + // Files written before the type widening table feature was added to the table don't + // have a defaultRowCommitVersion. That does mean they were written before the latest + // type change. + case None => true + }) + case None => + Seq.empty + } + + + /** + * Return the number of files that were written before the latest type change and that then + * contain a column or field with a type that is different from the current able schema. + */ + def numFilesRequiringRewrite(snapshot: Snapshot): Long = { + filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala index 82accd624ac..87905035301 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWideningMetadata.scala @@ -16,7 +16,9 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import scala.collection.mutable + +import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.spark.sql.types._ @@ -156,12 +158,12 @@ private[delta] object TypeWideningMetadata { typeChange.copy(fieldPath = "element" +: typeChange.fieldPath) } case (fromType: AtomicType, toType: AtomicType) if fromType != toType => - Seq(TypeChange( - version, - fromType, - toType, - fieldPath = Seq.empty - )) + Seq(TypeChange( + version, + fromType, + toType, + fieldPath = Seq.empty + )) case (_: AtomicType, _: AtomicType) => Seq.empty // Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct // fields instead to only collect type changes inside these fields. @@ -192,4 +194,49 @@ private[delta] object TypeWideningMetadata { case None => field } } + + /** + * Remove the type widening metadata from all the fields in the given schema. + * Return the cleaned schema and a list of fields with their path that had type widening metadata. + */ + def removeTypeWideningMetadata(schema: StructType) + : (StructType, Seq[(Seq[String], StructField)]) = { + if (!containsTypeWideningMetadata(schema)) return (schema, Seq.empty) + + val changes = mutable.Buffer.empty[(Seq[String], StructField)] + val newSchema = SchemaMergingUtils.transformColumns(schema) { + case (fieldPath: Seq[String], field: StructField, _) + if field.metadata.contains(TYPE_CHANGES_METADATA_KEY) => + changes.append((fieldPath, field)) + val cleanMetadata = new MetadataBuilder() + .withMetadata(field.metadata) + .remove(TYPE_CHANGES_METADATA_KEY) + .build() + field.copy(metadata = cleanMetadata) + case (_, field: StructField, _) => field + } + newSchema -> changes.toSeq + } + + /** Recursively checks whether any struct field in the schema contains type widening metadata. */ + def containsTypeWideningMetadata(schema: StructType): Boolean = + schema.existsRecursively { + case s: StructType => s.exists(_.metadata.contains(TYPE_CHANGES_METADATA_KEY)) + case _ => false + } + + /** Return the version of the latest type change recorded in the schema metadata */ + def getLatestTypeChangeVersion(schema: StructType): Option[Long] = { + val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) { + _ => true + }.map(_._2) + + // Collect all type change versions from all struct fields. + val versions = allStructFields + .flatMap(TypeWideningMetadata.fromField) + .flatMap(_.typeChanges) + .map(_.version) + + if (versions.nonEmpty) Some(versions.max) else None + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala index c21efc5ddf0..f78ef60ae29 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaReorgTableCommand.scala @@ -16,13 +16,14 @@ package org.apache.spark.sql.delta.commands +import org.apache.spark.sql.delta.{Snapshot, TypeWidening} import org.apache.spark.sql.delta.actions.AddFile import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand} object DeltaReorgTableMode extends Enumeration { - val PURGE, UNIFORM_ICEBERG = Value + val PURGE, UNIFORM_ICEBERG, REWRITE_TYPE_WIDENING = Value } case class DeltaReorgTableSpec( @@ -70,7 +71,8 @@ case class DeltaReorgTableCommand( } override def run(sparkSession: SparkSession): Seq[Row] = reorgTableSpec match { - case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) => + case DeltaReorgTableSpec( + DeltaReorgTableMode.PURGE | DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) => optimizeByReorg(sparkSession) case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) => val table = getDeltaTable(target, "REORG") @@ -82,6 +84,8 @@ case class DeltaReorgTableCommand( new DeltaPurgeOperation() case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) => new DeltaUpgradeUniformOperation(icebergCompatVersion) + case DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) => + new DeltaRewriteTypeWideningOperation() } } @@ -93,14 +97,14 @@ sealed trait DeltaReorgOperation { * Collects files that need to be processed by the reorg operation from the list of candidate * files. */ - def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] + def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] } /** * Reorg operation to purge files with soft deleted rows. */ class DeltaPurgeOperation extends DeltaReorgOperation { - override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = + override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = files.filter { file => (file.deletionVector != null && file.numPhysicalRecords.isEmpty) || file.numDeletedRecords > 0L @@ -111,7 +115,7 @@ class DeltaPurgeOperation extends DeltaReorgOperation { * Reorg operation to upgrade the iceberg compatibility version of a table. */ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation { - override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = { + override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = { def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = { if (file.tags == null) return true val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0") @@ -120,3 +124,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg files.filter(shouldRewriteToBeIcebergCompatible) } } + +/** + * Internal reorg operation to rewrite files to conform to the current table schema when dropping + * the type widening table feature. + */ +class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation { + override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = + TypeWidening.filterFilesRequiringRewrite(snapshot, files) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala index 656ad0d8b72..a38967ee0aa 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala @@ -264,7 +264,7 @@ class OptimizeExecutor( val partitionSchema = txn.metadata.partitionSchema val filesToProcess = optimizeContext.reorg match { - case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles) + case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles) case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles) } val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala index 24dc88ed03c..ef1b9835a81 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningMetadataSuite.scala @@ -220,7 +220,7 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest )) } - test("addTypeWideningMetadata with no type changes") { + test("addTypeWideningMetadata/removeTypeWideningMetadata with no type changes") { for { (oldSchema, newSchema) <- Seq( ("a short", "a short"), @@ -240,19 +240,21 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest withClue(s"oldSchema = $oldSchema, newSchema = $newSchema") { val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) assert(schema === newSchema) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === schema -> Seq.empty) } } } - test("addTypeWideningMetadata on top-level fields") { - var schema = + test("addTypeWideningMetadata/removeTypeWideningMetadata on top-level fields") { + val schemaWithoutMetadata = StructType.fromDDL("i long, d decimal(15, 4), a array, m map") val firstOldSchema = StructType.fromDDL("i short, d decimal(6, 2), a array, m map") val secondOldSchema = StructType.fromDDL("i int, d decimal(10, 4), a array, m map") - schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, firstOldSchema) + var schema = + TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema) assert(schema("i") === StructField("i", LongType, metadata = new MetadataBuilder() @@ -282,6 +284,13 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq.empty -> schema("i"), + Seq.empty -> schema("d"), + Seq.empty -> schema("a"), + Seq.empty -> schema("m") + )) // Second type change on all fields. schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema) @@ -316,10 +325,18 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest typeChangeMetadata(version = 1, "byte", "integer", "value") )).build() )) + + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq.empty -> schema("i"), + Seq.empty -> schema("d"), + Seq.empty -> schema("a"), + Seq.empty -> schema("m") + )) } - test("addTypeWideningMetadata on nested fields") { - var schema = StructType.fromDDL( + test("addTypeWideningMetadata/removeTypeWideningMetadata on nested fields") { + val schemaWithoutMetadata = StructType.fromDDL( "s struct>, m: map, array>>") val firstOldSchema = StructType.fromDDL( "s struct>, m: map, array>>") @@ -327,7 +344,8 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest "s struct>, m: map, array>>") // First type change on all struct fields. - schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, firstOldSchema) + var schema = + TypeWideningMetadata.addTypeWideningMetadata(txn, schemaWithoutMetadata, firstOldSchema) var struct = schema("s").dataType.asInstanceOf[StructType] assert(struct("i") === StructField("i", LongType, @@ -352,6 +370,13 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq("s") -> struct("i"), + Seq("s") -> struct("a"), + Seq("s") -> struct("m") + )) + // Second type change on all struct fields. schema = TypeWideningMetadata.addTypeWideningMetadata(txn, schema, secondOldSchema) struct = schema("s").dataType.asInstanceOf[StructType] @@ -380,9 +405,15 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest typeChangeMetadata(version = 1, "integer", "long", "value.element") )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + schemaWithoutMetadata -> Seq( + Seq("s") -> struct("i"), + Seq("s") -> struct("a"), + Seq("s") -> struct("m") + )) } - test("addTypeWideningMetadata with added and removed fields") { + test("addTypeWideningMetadata/removeTypeWideningMetadata with added and removed fields") { val newSchema = StructType.fromDDL("a int, b long, d int") val oldSchema = StructType.fromDDL("a int, b int, c int") @@ -397,13 +428,16 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest typeChangeMetadata(version = 1, "integer", "long") )).build() )) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === + newSchema -> Seq(Seq.empty -> schema("b")) + ) } - test("addTypeWideningMetadata with different field position") { - val initialSchema = StructType.fromDDL("a short, b int, s struct") - val secondSchema = StructType.fromDDL("b int, a short, s struct") + test("addTypeWideningMetadata/removeTypeWideningMetadata with different field position") { + val newSchema = StructType.fromDDL("a short, b int, s struct") + val oldSchema = StructType.fromDDL("b int, a short, s struct") - val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, initialSchema, secondSchema) + val schema = TypeWideningMetadata.addTypeWideningMetadata(txn, newSchema, oldSchema) // No type widening metadata is added. assert(schema("a") === StructField("a", ShortType)) assert(schema("b") === StructField("b", IntegerType)) @@ -411,6 +445,7 @@ class DeltaTypeWideningMetadataSuite extends QueryTest with DeltaSQLCommandTest StructField("s", new StructType() .add("c", IntegerType) .add("d", LongType))) + assert(TypeWideningMetadata.removeTypeWideningMetadata(schema) === newSchema -> Seq.empty) } test("updateTypeChangeVersion with no type changes") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index fca8e3e046f..9fb52146ee2 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -16,17 +16,26 @@ package org.apache.spark.sql.delta +import java.util.concurrent.TimeUnit + +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.delta.catalog.DeltaTableV2 +import org.apache.spark.sql.delta.commands.AlterTableDropFeatureDeltaCommand +import org.apache.spark.sql.delta.rowtracking.RowTrackingTestUtils import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.hadoop.fs.Path +import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Encoder, QueryTest, Row} import org.apache.spark.sql.errors.QueryErrorsBase import org.apache.spark.sql.execution.datasources.parquet.ParquetTest -import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.functions.{col, lit} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ +import org.apache.spark.util.ManualClock /** * Suite covering the type widening table feature. @@ -35,6 +44,7 @@ class DeltaTypeWideningSuite extends QueryTest with ParquetTest with DeltaDMLTestUtils + with RowTrackingTestUtils with DeltaSQLCommandTest with DeltaTypeWideningTestMixin with DeltaTypeWideningAlterTableTests @@ -602,20 +612,104 @@ trait DeltaTypeWideningMetadataTests { ]}""".stripMargin) } -trait DeltaTypeWideningTableFeatureTests { - self: QueryTest with ParquetTest with DeltaDMLTestUtils with DeltaTypeWideningTestMixin - with SharedSparkSession => +/** + * Tests covering adding and removing the type widening table feature. Dropping the table feature + * also includes rewriting data files with the old type and removing type widening metadata. + */ +trait DeltaTypeWideningTableFeatureTests extends BeforeAndAfterEach { + self: QueryTest + with ParquetTest + with DeltaDMLTestUtils + with RowTrackingTestUtils + with DeltaTypeWideningTestMixin => + + import testImplicits._ + + /** Clock used to advance past the retention period when dropping the table feature. */ + var clock: ManualClock = _ + + override protected def beforeEach(): Unit = { + super.beforeEach() + clock = new ManualClock(System.currentTimeMillis()) + // Override the (cached) delta log with one using our manual clock. + DeltaLog.clearCache() + deltaLog = DeltaLog.forTable(spark, new Path(tempPath), clock) + } def isTypeWideningSupported: Boolean = { - val snapshot = DeltaLog.forTable(spark, tempPath).unsafeVolatileSnapshot - TypeWidening.isSupported(snapshot.protocol) + TypeWidening.isSupported(deltaLog.update().protocol) } def isTypeWideningEnabled: Boolean = { - val snapshot = DeltaLog.forTable(spark, tempPath).unsafeVolatileSnapshot + val snapshot = deltaLog.update() TypeWidening.isEnabled(snapshot.protocol, snapshot.metadata) } + /** Expected outcome of dropping the type widening table feature. */ + object ExpectedOutcome extends Enumeration { + val SUCCESS, FAIL_CURRENT_VERSION_USES_FEATURE, FAIL_HISTORICAL_VERSION_USES_FEATURE = Value + } + + /** Helper method to drop the type widening table feature and check for an expected outcome. */ + def dropTableFeature(expectedOutcome: ExpectedOutcome.Value): Unit = { + // Need to directly call ALTER TABLE command to pass our deltaLog with manual clock. + val dropFeature = AlterTableDropFeatureDeltaCommand( + DeltaTableV2(spark, deltaLog.dataPath), + TypeWideningTableFeature.name) + + expectedOutcome match { + case ExpectedOutcome.SUCCESS => + dropFeature.run(spark) + case ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE => + checkError( + exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) }, + errorClass = "DELTA_FEATURE_DROP_WAIT_FOR_RETENTION_PERIOD", + parameters = Map( + "feature" -> TypeWideningTableFeature.name, + "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key, + "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString, + "truncateHistoryLogRetentionPeriod" -> + DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString) + ) + case ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE => + checkError( + exception = intercept[DeltaTableFeatureException] { dropFeature.run(spark) }, + errorClass = "DELTA_FEATURE_DROP_HISTORICAL_VERSIONS_EXIST", + parameters = Map( + "feature" -> TypeWideningTableFeature.name, + "logRetentionPeriodKey" -> DeltaConfigs.LOG_RETENTION.key, + "logRetentionPeriod" -> DeltaConfigs.LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString, + "truncateHistoryLogRetentionPeriod" -> + DeltaConfigs.TABLE_FEATURE_DROP_TRUNCATE_HISTORY_LOG_RETENTION + .fromMetaData(deltaLog.unsafeVolatileMetadata).toString) + ) + } + } + + /** + * Use this after dropping the table feature to artificially move the current time to after + * the table retention period. + */ + def advancePastRetentionPeriod(): Unit = { + clock.advance( + deltaLog.deltaRetentionMillis(deltaLog.update().metadata) + + TimeUnit.MINUTES.toMillis(5)) + } + + def addSingleFile[T: Encoder](values: Seq[T], dataType: DataType): Unit = + append(values.toDF("a").select(col("a").cast(dataType)).repartition(1)) + + /** Get the number of AddFile actions committed since the given table version (included). */ + def getNumAddFilesSinceVersion(version: Long): Long = + deltaLog + .getChanges(startVersion = version) + .flatMap { case (_, actions) => actions } + .collect { case a: AddFile => a } + .size + test("enable type widening at table creation then disable it") { sql(s"CREATE TABLE delta.`$tempPath` (a int) USING DELTA " + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'true')") @@ -711,4 +805,156 @@ trait DeltaTypeWideningTableFeatureTests { enableTypeWidening(tempPath, enabled = false) sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE INT") } + + test("drop unused table feature on empty table") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version = 0) === 0) + checkAnswer(readDeltaTable(tempPath), Seq.empty) + } + + // Rewriting the data when dropping the table feature relies on the default row commit version + // being set even when row tracking isn't enabled. + for(rowTrackingEnabled <- BOOLEAN_DOMAIN) { + test(s"drop unused table feature on table with data, rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + addSingleFile(Seq(1, 2, 3), ByteType) + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + val version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test(s"drop unused table feature on table with data inserted before adding the table feature," + + s"rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA " + + s"TBLPROPERTIES ('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = 'false')") + addSingleFile(Seq(1, 2, 3), ByteType) + enableTypeWidening(tempPath) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test(s"drop table feature on table with data added only after type change, " + + s"rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempPath` (a byte) USING DELTA") + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN a TYPE int") + addSingleFile(Seq(1, 2, 3), IntegerType) + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + // We could actually drop the table feature directly here instead of failing by checking that + // there were no files added before the type change. This may be an expensive check for a rare + // scenario so we don't do it. + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + advancePastRetentionPeriod() + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test(s"drop table feature on table with data added before type change, " + + s"rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + addSingleFile(Seq(1, 2, 3), ByteType) + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 1) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3))) + } + + test(s"drop table feature on table with data added before type change and fully rewritten " + + s"after, rowTrackingEnabled=$rowTrackingEnabled") { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + addSingleFile(Seq(1, 2, 3), ByteType) + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + sql(s"UPDATE delta.`$tempDir` SET a = a + 10") + assert(getNumAddFilesSinceVersion(version = 0) === 2) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + // The file was already rewritten in UPDATE. + assert(getNumAddFilesSinceVersion(version + 1) === 0) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer(readDeltaTable(tempPath), Seq(Row(11), Row(12), Row(13))) + } + + test(s"drop table feature on table with data added before type change and partially " + + s"rewritten after, rowTrackingEnabled=$rowTrackingEnabled") { + withRowTrackingEnabled(rowTrackingEnabled) { + sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA") + addSingleFile(Seq(1, 2, 3), ByteType) + addSingleFile(Seq(4, 5, 6), ByteType) + sql(s"ALTER TABLE delta.`$tempDir` CHANGE COLUMN a TYPE int") + assert(getNumAddFilesSinceVersion(version = 0) === 2) + sql(s"UPDATE delta.`$tempDir` SET a = a + 10 WHERE a < 4") + assert(getNumAddFilesSinceVersion(version = 0) === 3) + + var version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_CURRENT_VERSION_USES_FEATURE) + // One file was already rewritten in UPDATE, leaving 1 file to rewrite. + assert(getNumAddFilesSinceVersion(version + 1) === 1) + assert(!TypeWideningMetadata.containsTypeWideningMetadata(deltaLog.update().schema)) + + version = deltaLog.update().version + dropTableFeature(ExpectedOutcome.FAIL_HISTORICAL_VERSION_USES_FEATURE) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + + version = deltaLog.update().version + advancePastRetentionPeriod() + dropTableFeature(ExpectedOutcome.SUCCESS) + assert(getNumAddFilesSinceVersion(version + 1) === 0) + checkAnswer( + readDeltaTable(tempPath), + Seq(Row(11), Row(12), Row(13), Row(4), Row(5), Row(6))) + } + } + } }