[Spark] Drop Type Widening Table Feature #2720

Merged
@@ -560,7 +560,9 @@ private[delta] class ConflictChecker(
* to handle the row tracking feature being enabled by the winning transaction.
*/
private def reassignRowCommitVersions(): Unit = {
if (!RowTracking.isSupported(currentTransactionInfo.protocol)) {
if (!RowTracking.isSupported(currentTransactionInfo.protocol) &&
// Type widening relies on default row commit versions to be set.
!TypeWidening.isSupported(currentTransactionInfo.protocol)) {
return
}

@@ -24,7 +24,8 @@ object DefaultRowCommitVersion {
protocol: Protocol,
actions: Iterator[Action],
version: Long): Iterator[Action] = {
if (!RowTracking.isSupported(protocol)) {
// Type Widening relies on default row commit versions to be set.
if (!RowTracking.isSupported(protocol) && !TypeWidening.isSupported(protocol)) {
return actions
}
actions.map {
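Review note: both guards above are relaxed for the same reason. The removal path introduced later in this PR identifies files that predate the latest type change by their default row commit version, so those versions must keep being assigned even when row tracking is not enabled. A minimal, self-contained sketch of the assignment rule, using a simplified stand-in for AddFile (the real logic lives in the DefaultRowCommitVersion object shown above):

// Simplified stand-in for AddFile; the real action types live in delta-spark.
case class AddFileStub(path: String, defaultRowCommitVersion: Option[Long])

object DefaultRowCommitVersionSketch {
  // Stamp every newly added file with the version of the commit that adds it,
  // unless it already carries a default row commit version. This mirrors the
  // behavior gated by the check above when row tracking or type widening is
  // supported on the table.
  def assignIfMissing(files: Seq[AddFileStub], commitVersion: Long): Seq[AddFileStub] =
    files.map { f =>
      if (f.defaultRowCommitVersion.isEmpty) f.copy(defaultRowCommitVersion = Some(commitVersion))
      else f
    }

  def main(args: Array[String]): Unit = {
    // Files added at table version 7 are stamped with 7; existing stamps are kept.
    val stamped = assignIfMissing(
      Seq(AddFileStub("part-0.parquet", None), AddFileStub("part-1.parquet", Some(3L))),
      commitVersion = 7L)
    println(stamped)
  }
}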
@@ -18,10 +18,12 @@ package org.apache.spark.sql.delta
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand}
import org.apache.spark.sql.delta.commands.{AlterTableSetPropertiesDeltaCommand, AlterTableUnsetPropertiesDeltaCommand, DeltaReorgTableCommand, DeltaReorgTableMode, DeltaReorgTableSpec}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

import org.apache.spark.sql.catalyst.analysis.ResolvedTable

/**
* A base class for implementing a preparation command for removing table features.
* Must implement a run method. Note, the run method must be implemented in a way that when
@@ -126,3 +128,88 @@ case class V2CheckpointPreDowngradeCommand(table: DeltaTableV2)
true
}
}

case class TypeWideningPreDowngradeCommand(table: DeltaTableV2)
extends PreDowngradeTableFeatureCommand
with DeltaLogging {

/**
* Unset the type widening table property to prevent new type changes from being applied to the
* table, then remove traces of the feature:
* - Rewrite files that have columns or fields with a different type than in the current table
* schema. These are all files not added or modified after the last type change.
* - Remove the type widening metadata attached to fields in the current table schema.
*
* @return true if files were rewritten or metadata was removed, false otherwise.
*/
override def removeFeatureTracesIfNeeded(): Boolean = {
if (TypeWideningTableFeature.validateRemoval(table.initialSnapshot)) return false

val startTimeNs = System.nanoTime()
val properties = Seq(DeltaConfigs.ENABLE_TYPE_WIDENING.key)
AlterTableUnsetPropertiesDeltaCommand(table, properties, ifExists = true).run(table.spark)
val numFilesRewritten = rewriteFilesIfNeeded()
val metadataRemoved = removeMetadataIfNeeded()

recordDeltaEvent(
table.deltaLog,
opType = "delta.typeWideningFeatureRemovalMetrics",
data = Map(
"downgradeTimeMs" -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNs),
"numFilesRewritten" -> numFilesRewritten,
"metadataRemoved" -> metadataRemoved
)
)
numFilesRewritten > 0 || metadataRemoved
}

/**
* Rewrite files that have columns or fields with a different type than in the current table
* schema. These are all files not added or modified after the last type change.
* @return the number of files rewritten.
*/
private def rewriteFilesIfNeeded(): Long = {
val numFilesToRewrite = TypeWidening.numFilesRequiringRewrite(table.initialSnapshot)
if (numFilesToRewrite == 0L) return 0L

// Get the table Id and catalog from the delta table to build a ResolvedTable plan for the reorg
// command.
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val tableId = table.spark
.sessionState
.sqlParser
.parseTableIdentifier(table.name).nameParts.asIdentifier
val catalog = table.spark.sessionState.catalogManager.currentCatalog.asTableCatalog

val reorg = DeltaReorgTableCommand(
ResolvedTable.create(
catalog,
tableId,
table
),
DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None)
)(Nil)

reorg.run(table.spark)
numFilesToRewrite
}

/**
* Remove the type widening metadata attached to fields in the current table schema.
* @return true if any metadata was removed, false otherwise.
*/
private def removeMetadataIfNeeded(): Boolean = {
if (!TypeWideningMetadata.containsTypeWideningMetadata(table.initialSnapshot.schema)) {
return false
}

val txn = table.startTransaction()
val metadata = txn.metadata
val (cleanedSchema, changes) =
TypeWideningMetadata.removeTypeWideningMetadata(metadata.schema)
txn.commit(
metadata.copy(schemaString = cleanedSchema.json) :: Nil,
DeltaOperations.UpdateColumnMetadata("DROP FEATURE", changes))
true
}
}
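For reviewers trying this end to end: the command above is only expected to be reached through the existing ALTER TABLE ... DROP FEATURE path, not invoked directly. A hedged usage sketch for a spark-shell session with Delta Lake on the classpath; the table path is a placeholder, the feature is referenced by the dev name used in this PR, and the property key is assumed to be the one behind DeltaConfigs.ENABLE_TYPE_WIDENING (delta.enableTypeWidening):

// DROP FEATURE first runs TypeWideningPreDowngradeCommand: it unsets
// delta.enableTypeWidening, rewrites files older than the latest type change
// via an internal REORG, and strips the type change metadata from the schema.
// Only after those traces are gone is the protocol downgrade attempted.
spark.sql("ALTER TABLE delta.`/tmp/widened_table` DROP FEATURE `typeWidening-dev`")

// If the table metadata still requires the feature, validateRemoval returns
// false and the DROP FEATURE command is expected to surface an error instead
// of downgrading the protocol.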
@@ -629,7 +629,8 @@ object ManagedCommitTableFeature
}

object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening-dev")
with FeatureAutomaticallyEnabledByMetadata {
with FeatureAutomaticallyEnabledByMetadata
with RemovableFeature {
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

private def isTypeWideningSupportNeededByMetadata(metadata: Metadata): Boolean =
@@ -638,6 +639,19 @@ object TypeWideningTableFeature extends ReaderWriterFeature(name = "typeWidening
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = isTypeWideningSupportNeededByMetadata(metadata)

override def validateRemoval(snapshot: Snapshot): Boolean =
!isTypeWideningSupportNeededByMetadata(snapshot.metadata) &&
!TypeWideningMetadata.containsTypeWideningMetadata(snapshot.metadata.schema)

override def actionUsesFeature(action: Action): Boolean =
action match {
case m: Metadata => TypeWideningMetadata.containsTypeWideningMetadata(m.schema)
case _ => false
}

override def preDowngradeCommand(table: DeltaTableV2): PreDowngradeTableFeatureCommand =
TypeWideningPreDowngradeCommand(table)
}

/**
@@ -16,9 +16,10 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.actions.{Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, Protocol, TableFeatureProtocolUtils}

import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types._

object TypeWidening {
@@ -60,4 +61,33 @@ object TypeWidening {
case (ByteType | ShortType, IntegerType) => true
case _ => false
}

/**
* Filter the given list of files to only keep files that were written before the latest type
* change, if any. These older files contain a column or field with a type that is different than
* in the current table schema and must be rewritten when dropping the type widening table feature
* to make the table readable by readers that don't support the feature.
*/
def filterFilesRequiringRewrite(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWideningMetadata.getLatestTypeChangeVersion(snapshot.metadata.schema) match {
case Some(latestVersion) =>
files.filter(_.defaultRowCommitVersion match {
case Some(version) => version < latestVersion
// Files written before the type widening table feature was added to the table don't
// have a defaultRowCommitVersion, which means they were written before the latest
// type change.
case None => true
})
case None =>
Seq.empty
}


/**
* Return the number of files that were written before the latest type change and that therefore
* contain a column or field with a type that differs from the current table schema.
*/
def numFilesRequiringRewrite(snapshot: Snapshot): Long = {
filterFilesRequiringRewrite(snapshot, snapshot.allFiles.collect()).size
}
}
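To make the selection rule above concrete, a self-contained sketch with a trivial stand-in for AddFile. The Option.forall condensation is equivalent to the explicit match above: a missing default row commit version counts as older than any type change.

// Stand-in for AddFile; only the field relevant to the rule is modeled.
case class FileStub(path: String, defaultRowCommitVersion: Option[Long])

object RewriteSelectionSketch {
  // Keep files written strictly before the latest type change. Files without a
  // default row commit version predate the feature, so they always qualify.
  def requiresRewrite(latestTypeChangeVersion: Option[Long], files: Seq[FileStub]): Seq[FileStub] =
    latestTypeChangeVersion match {
      case Some(latest) => files.filter(_.defaultRowCommitVersion.forall(_ < latest))
      case None => Seq.empty // no type change was ever applied, nothing to rewrite
    }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      FileStub("old.parquet", None),      // predates the feature: rewrite
      FileStub("mid.parquet", Some(3L)),  // written before the change at v5: rewrite
      FileStub("new.parquet", Some(6L)))  // already uses the widened type: keep
    println(requiresRewrite(Some(5L), files).map(_.path)) // List(old.parquet, mid.parquet)
  }
}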
@@ -16,7 +16,9 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import scala.collection.mutable

import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.sql.types._
@@ -156,12 +158,12 @@ private[delta] object TypeWideningMetadata {
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
Seq(TypeChange(
version,
fromType,
toType,
fieldPath = Seq.empty
))
Seq(TypeChange(
version,
fromType,
toType,
fieldPath = Seq.empty
))
case (_: AtomicType, _: AtomicType) => Seq.empty
// Don't recurse inside structs, `collectTypeChanges` should be called directly on each struct
// field instead to only collect type changes inside these fields.
@@ -192,4 +194,49 @@ private[delta] object TypeWideningMetadata {
case None => field
}
}

/**
* Remove the type widening metadata from all the fields in the given schema.
* Return the cleaned schema and a list of fields with their path that had type widening metadata.
*/
def removeTypeWideningMetadata(schema: StructType)
: (StructType, Seq[(Seq[String], StructField)]) = {
if (!containsTypeWideningMetadata(schema)) return (schema, Seq.empty)

val changes = mutable.Buffer.empty[(Seq[String], StructField)]
val newSchema = SchemaMergingUtils.transformColumns(schema) {
case (fieldPath: Seq[String], field: StructField, _)
if field.metadata.contains(TYPE_CHANGES_METADATA_KEY) =>
changes.append((fieldPath, field))
val cleanMetadata = new MetadataBuilder()
.withMetadata(field.metadata)
.remove(TYPE_CHANGES_METADATA_KEY)
.build()
field.copy(metadata = cleanMetadata)
case (_, field: StructField, _) => field
}
newSchema -> changes.toSeq
}

/** Recursively checks whether any struct field in the schema contains type widening metadata. */
def containsTypeWideningMetadata(schema: StructType): Boolean =
schema.existsRecursively {
case s: StructType => s.exists(_.metadata.contains(TYPE_CHANGES_METADATA_KEY))
case _ => false
}

/** Return the version of the latest type change recorded in the schema metadata */
def getLatestTypeChangeVersion(schema: StructType): Option[Long] = {
val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) {
_ => true
}.map(_._2)

// Collect all type change versions from all struct fields.
val versions = allStructFields
.flatMap(TypeWideningMetadata.fromField)
.flatMap(_.typeChanges)
.map(_.version)

if (versions.nonEmpty) Some(versions.max) else None
}
}
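A runnable illustration of the schema metadata these helpers manipulate, using only Spark SQL types. The metadata key and the nested field names are assumptions for the sketch; the real constants (e.g. TYPE_CHANGES_METADATA_KEY) are defined in TypeWideningMetadata.

import org.apache.spark.sql.types._

object TypeWideningMetadataSketch {
  // Assumed key name, for illustration only.
  val TypeChangesKey = "delta.typeChanges"

  def main(args: Array[String]): Unit = {
    // A field widened from SHORT to INTEGER at table version 4 (nested names assumed).
    val typeChange = new MetadataBuilder()
      .putLong("tableVersion", 4L)
      .putString("fromType", "short")
      .putString("toType", "integer")
      .build()
    val widened = StructField("a", IntegerType, nullable = true,
      new MetadataBuilder().putMetadataArray(TypeChangesKey, Array(typeChange)).build())
    val schema = StructType(Seq(widened, StructField("b", StringType)))

    // containsTypeWideningMetadata-style check: any field still carrying the key
    // blocks the feature removal.
    println(schema.exists(_.metadata.contains(TypeChangesKey))) // true

    // removeTypeWideningMetadata-style cleanup of a single field.
    val cleaned = widened.copy(metadata =
      new MetadataBuilder().withMetadata(widened.metadata).remove(TypeChangesKey).build())
    println(cleaned.metadata.contains(TypeChangesKey)) // false

    // getLatestTypeChangeVersion-style lookup: the max "tableVersion" across fields.
    val latest = schema.fields.toSeq.flatMap { f =>
      if (f.metadata.contains(TypeChangesKey))
        f.metadata.getMetadataArray(TypeChangesKey).toSeq.map(_.getLong("tableVersion"))
      else Seq.empty[Long]
    }.reduceOption(_ max _)
    println(latest) // Some(4)
  }
}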
@@ -16,13 +16,14 @@

package org.apache.spark.sql.delta.commands

import org.apache.spark.sql.delta.{Snapshot, TypeWidening}
import org.apache.spark.sql.delta.actions.AddFile

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LeafCommand, LogicalPlan, UnaryCommand}

object DeltaReorgTableMode extends Enumeration {
val PURGE, UNIFORM_ICEBERG = Value
val PURGE, UNIFORM_ICEBERG, REWRITE_TYPE_WIDENING = Value
}

case class DeltaReorgTableSpec(
@@ -70,7 +71,8 @@ case class DeltaReorgTableCommand(
}

override def run(sparkSession: SparkSession): Seq[Row] = reorgTableSpec match {
case DeltaReorgTableSpec(DeltaReorgTableMode.PURGE, None) =>
case DeltaReorgTableSpec(
DeltaReorgTableMode.PURGE | DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) =>
optimizeByReorg(sparkSession)
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
val table = getDeltaTable(target, "REORG")
@@ -82,6 +84,8 @@ case class DeltaReorgTableCommand(
new DeltaPurgeOperation()
case DeltaReorgTableSpec(DeltaReorgTableMode.UNIFORM_ICEBERG, Some(icebergCompatVersion)) =>
new DeltaUpgradeUniformOperation(icebergCompatVersion)
case DeltaReorgTableSpec(DeltaReorgTableMode.REWRITE_TYPE_WIDENING, None) =>
new DeltaRewriteTypeWideningOperation()
}
}

@@ -93,14 +97,14 @@ sealed trait DeltaReorgOperation {
* Collects files that need to be processed by the reorg operation from the list of candidate
* files.
*/
def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile]
def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile]
}

/**
* Reorg operation to purge files with soft deleted rows.
*/
class DeltaPurgeOperation extends DeltaReorgOperation {
override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] =
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
files.filter { file =>
(file.deletionVector != null && file.numPhysicalRecords.isEmpty) ||
file.numDeletedRecords > 0L
@@ -111,7 +115,7 @@ class DeltaPurgeOperation extends DeltaReorgOperation {
* Reorg operation to upgrade the iceberg compatibility version of a table.
*/
class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorgOperation {
override def filterFilesToReorg(files: Seq[AddFile]): Seq[AddFile] = {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] = {
def shouldRewriteToBeIcebergCompatible(file: AddFile): Boolean = {
if (file.tags == null) return true
val icebergCompatVersion = file.tags.getOrElse(AddFile.Tags.ICEBERG_COMPAT_VERSION.name, "0")
@@ -120,3 +124,12 @@ class DeltaUpgradeUniformOperation(icebergCompatVersion: Int) extends DeltaReorg
files.filter(shouldRewriteToBeIcebergCompatible)
}
}

/**
* Internal reorg operation to rewrite files to conform to the current table schema when dropping
* the type widening table feature.
*/
class DeltaRewriteTypeWideningOperation extends DeltaReorgOperation {
override def filterFilesToReorg(snapshot: Snapshot, files: Seq[AddFile]): Seq[AddFile] =
TypeWidening.filterFilesRequiringRewrite(snapshot, files)
}
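The new snapshot parameter on filterFilesToReorg is what lets the type widening operation consult the latest type change recorded in the table schema, while the existing operations simply ignore it. A simplified stand-in sketch of the dispatch (not the delta-spark types):

// Simplified stand-ins; the real trait and operations live in delta-spark.
case class SnapshotStub(latestTypeChangeVersion: Option[Long])
case class FileStub(path: String, numDeletedRecords: Long, defaultRowCommitVersion: Option[Long])

sealed trait ReorgOperationSketch {
  // Each operation only decides which candidate files must be rewritten; the
  // rewrite itself is the shared optimize/reorg machinery.
  def filterFilesToReorg(snapshot: SnapshotStub, files: Seq[FileStub]): Seq[FileStub]
}

// PURGE: rewrite files that still carry soft-deleted rows (snapshot unused).
object PurgeSketch extends ReorgOperationSketch {
  def filterFilesToReorg(snapshot: SnapshotStub, files: Seq[FileStub]): Seq[FileStub] =
    files.filter(_.numDeletedRecords > 0L)
}

// REWRITE_TYPE_WIDENING: rewrite files older than the latest type change,
// which requires looking at the snapshot's schema metadata.
object RewriteTypeWideningSketch extends ReorgOperationSketch {
  def filterFilesToReorg(snapshot: SnapshotStub, files: Seq[FileStub]): Seq[FileStub] =
    snapshot.latestTypeChangeVersion match {
      case Some(latest) => files.filter(_.defaultRowCommitVersion.forall(_ < latest))
      case None => Seq.empty
    }
}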
@@ -264,7 +264,7 @@ class OptimizeExecutor(
val partitionSchema = txn.metadata.partitionSchema

val filesToProcess = optimizeContext.reorg match {
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(candidateFiles)
case Some(reorgOperation) => reorgOperation.filterFilesToReorg(txn.snapshot, candidateFiles)
case None => filterCandidateFileList(minFileSize, maxDeletedRowsRatio, candidateFiles)
}
val partitionsToCompact = filesToProcess.groupBy(_.partitionValues).toSeq