Skip to content

Commit

Permalink
[Spark] Deprecate tableVersion field in type widening metadata (#3334)
Browse files Browse the repository at this point in the history
## Description
The protocol specification for type widening dropped the requirement to
populate a `tableVersion` field as part of type change history to track
in which version of the table a given change was applied.
See protocol update: #3297
This field was used at some point during the preview but isn't needed
anymore.

It is now deprecated:
- The field is preserved in table metadata that already contains it.
- The field isn't set anymore when the stable table feature is active on
the table.
- The field is still set when only the preview table feature is active
on the table.

The last point is necessary to avoid breaking preview clients (Delta 3.2
& Delta 4.0 preview) that require the field to be set.

## How was this patch tested?
- Updated existing metadata tests to cover `tableVersion` not being set
by default.
- Added metadata tests to explicitly cover `tableVersion` being set.
- Added tests covering `tableVersion` when using the preview and stable
table features.

## Does this PR introduce _any_ user-facing changes?
Yes. As of this change, a table that supports the stable table feature
`typeWidening` won't have a `tableVersion` field in the type change
history stored in the table metadata.

Tables that only support the preview table feature
`typeWidening-preview` don't see any change.
  • Loading branch information
johanl-db authored Jul 9, 2024
1 parent 0e5b856 commit 70bfe82
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,16 @@ import org.apache.spark.sql.types._

/**
* Information corresponding to a single type change.
* @param version The version of the table where the type change was made.
* @param version (Deprecated) The version of the table where the type change was made. This is
* only populated by clients using the preview of type widening.
* @param fromType The original type before the type change.
* @param toType The new type after the type change.
* @param fieldPath The path inside nested maps and arrays to the field where the type change was
* made. Each path element is either `key`/`value` for maps or `element` for
* arrays. The path is empty if the type change was applied inside a map or array.
*/
private[delta] case class TypeChange(
version: Long,
version: Option[Long],
fromType: DataType,
toType: DataType,
fieldPath: Seq[String]) {
Expand All @@ -43,7 +44,8 @@ private[delta] case class TypeChange(
/** Serialize this type change to a [[Metadata]] object. */
def toMetadata: Metadata = {
val builder = new MetadataBuilder()
.putLong(TABLE_VERSION_METADATA_KEY, version)
version.foreach(builder.putLong(TABLE_VERSION_METADATA_KEY, _))
builder
.putString(FROM_TYPE_METADATA_KEY, fromType.typeName)
.putString(TO_TYPE_METADATA_KEY, toType.typeName)
if (fieldPath.nonEmpty) {
Expand All @@ -54,6 +56,9 @@ private[delta] case class TypeChange(
}

private[delta] object TypeChange {
// tableVersion was a field present during the preview and removed afterwards. We preserve it if
// it's already present in the type change metadata of the table to avoid breaking older clients
// that use it to decide which files must be rewritten when dropping the feature.
val TABLE_VERSION_METADATA_KEY: String = "tableVersion"
val FROM_TYPE_METADATA_KEY: String = "fromType"
val TO_TYPE_METADATA_KEY: String = "toType"
Expand All @@ -66,8 +71,13 @@ private[delta] object TypeChange {
} else {
Seq.empty
}
val version = if (metadata.contains(TABLE_VERSION_METADATA_KEY)) {
Some(metadata.getLong(TABLE_VERSION_METADATA_KEY))
} else {
None
}
TypeChange(
version = metadata.getLong(TABLE_VERSION_METADATA_KEY),
version,
fromType = DataType.fromDDL(metadata.getString(FROM_TYPE_METADATA_KEY)),
toType = DataType.fromDDL(metadata.getString(TO_TYPE_METADATA_KEY)),
fieldPath
Expand Down Expand Up @@ -130,11 +140,15 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {
val changesToRecord = mutable.Buffer.empty[TypeChange]
val schemaWithMetadata = SchemaMergingUtils.transformColumns(schema, oldSchema) {
case (_, newField, Some(oldField), _) =>
// Record the version the transaction will attempt to use in the type change metadata. If
// there's a conflict with another transaction, the version in the metadata will be updated
// during conflict resolution. See [[ConflictChecker.updateTypeWideningMetadata()]].
val typeChanges =
collectTypeChanges(oldField.dataType, newField.dataType, txn.getFirstAttemptVersion)
var typeChanges = collectTypeChanges(oldField.dataType, newField.dataType)
// The version field isn't used anymore but we need to populate it in case the table uses
// the preview feature, as preview clients may then rely on the field being present.
if (txn.protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {
typeChanges = typeChanges.map { change =>
change.copy(version = Some(txn.getFirstAttemptVersion))
}
}

changesToRecord ++= typeChanges
TypeWideningMetadata(typeChanges).appendToField(newField)
case (_, newField, None, _) =>
Expand All @@ -159,24 +173,24 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {

/**
* Recursively collect primitive type changes inside nested maps and arrays between `fromType` and
* `toType`. The `version` is the version of the table where the type change was made.
* `toType`.
*/
private def collectTypeChanges(fromType: DataType, toType: DataType, version: Long)
private def collectTypeChanges(fromType: DataType, toType: DataType)
: Seq[TypeChange] = (fromType, toType) match {
case (from: MapType, to: MapType) =>
collectTypeChanges(from.keyType, to.keyType, version).map { typeChange =>
collectTypeChanges(from.keyType, to.keyType).map { typeChange =>
typeChange.copy(fieldPath = "key" +: typeChange.fieldPath)
} ++
collectTypeChanges(from.valueType, to.valueType, version).map { typeChange =>
collectTypeChanges(from.valueType, to.valueType).map { typeChange =>
typeChange.copy(fieldPath = "value" +: typeChange.fieldPath)
}
case (from: ArrayType, to: ArrayType) =>
collectTypeChanges(from.elementType, to.elementType, version).map { typeChange =>
collectTypeChanges(from.elementType, to.elementType).map { typeChange =>
typeChange.copy(fieldPath = "element" +: typeChange.fieldPath)
}
case (fromType: AtomicType, toType: AtomicType) if fromType != toType =>
Seq(TypeChange(
version,
version = None,
fromType,
toType,
fieldPath = Seq.empty
Expand All @@ -190,15 +204,19 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {
/**
* Change the `tableVersion` value in the type change metadata present in `schema`. Used during
* conflict resolution to update the version associated with the transaction is incremented.
*
* Note: The `tableVersion` field is only populated for tables that use the preview of type
* widening, we could remove this if/when there are no more tables using the preview of the
* feature.
*/
def updateTypeChangeVersion(schema: StructType, fromVersion: Long, toVersion: Long): StructType =
SchemaMergingUtils.transformColumns(schema) {
case (_, field, _) =>
fromField(field) match {
case Some(typeWideningMetadata) =>
val updatedTypeChanges = typeWideningMetadata.typeChanges.map {
case typeChange if typeChange.version == fromVersion =>
typeChange.copy(version = toVersion)
case typeChange if typeChange.version.contains(fromVersion) =>
typeChange.copy(version = Some(toVersion))
case olderTypeChange => olderTypeChange
}
val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
Expand Down Expand Up @@ -262,19 +280,4 @@ private[delta] object TypeWideningMetadata extends DeltaLogging {
getTypeChanges(field).map((fieldPath :+ field.name, _))
}
}

/** Return the version of the latest type change recorded in the schema metadata */
def getLatestTypeChangeVersion(schema: StructType): Option[Long] = {
val allStructFields = SchemaUtils.filterRecursively(schema, checkComplexTypes = true) {
_ => true
}.map(_._2)

// Collect all type change versions from all struct fields.
val versions = allStructFields
.flatMap(TypeWideningMetadata.fromField)
.flatMap(_.typeChanges)
.map(_.version)

if (versions.nonEmpty) Some(versions.max) else None
}
}
Loading

0 comments on commit 70bfe82

Please sign in to comment.