[Spark] Column mapping removal basic rewrite operation (#2741)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
Implement a basic rewrite command that rewrites a table with column mapping
enabled, so the column mapping metadata can be removed from its schema and
table properties.


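For orientation, a hedged sketch of how this path is reached (the table name `my_table` is hypothetical; the conf constant and property key come from the code in this PR):

    // Minimal sketch, assuming an existing Delta table `my_table` with
    // column mapping enabled ('delta.columnMapping.mode' = 'name').
    import org.apache.spark.sql.delta.DeltaConfigs
    import org.apache.spark.sql.delta.sources.DeltaSQLConf

    // Removal is gated behind this feature flag (see the test suite below).
    spark.conf.set(DeltaSQLConf.ALLOW_COLUMN_MAPPING_REMOVAL.key, "true")

    // Setting the mode to 'none' routes AlterTableSetPropertiesDeltaCommand
    // into RemoveColumnMappingCommand, which rewrites the table.
    spark.sql(
      s"ALTER TABLE my_table SET TBLPROPERTIES " +
        s"('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')")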

## How was this patch tested?
New unit tests in `RemoveColumnMappingSuite` (see the diff below).

## Does this PR introduce _any_ user-facing changes?

sabir-akhadov authored Mar 12, 2024
1 parent 3ae99ae commit 60914cd
Showing 5 changed files with 313 additions and 7 deletions.
@@ -19,7 +19,6 @@ package org.apache.spark.sql.delta
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.DeltaOperationMetrics.MetricsTransformer
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
-import org.apache.spark.sql.delta.constraints.Constraint
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.JsonUtils

@@ -135,6 +134,14 @@ object DeltaOperations
     }
     override def changesData: Boolean = true
   }

+  case class RemoveColumnMapping(
+      override val userMetadata: Option[String] = None) extends Operation("REMOVE COLUMN MAPPING") {
+    override def parameters: Map[String, Any] = Map()
+
+    override val operationMetrics: Set[String] = DeltaOperationMetrics.REMOVE_COLUMN_MAPPING
+  }
+
   /** Recorded during streaming inserts. */
   case class StreamingUpdate(
       outputMode: OutputMode,
@@ -612,6 +619,14 @@ private[delta] object DeltaOperationMetrics {
     "numOutputRows" // number of rows written
   )

+  val REMOVE_COLUMN_MAPPING: Set[String] = Set(
+    "numRewrittenFiles", // number of files rewritten
+    "numOutputBytes", // size in bytes of the written files
+    "numRemovedBytes", // size in bytes of the removed files
+    "numCopiedRows", // number of rows copied into new files
+    "numDeletionVectorsRemoved" // number of deletion vectors removed
+  )
+
   val STREAMING_UPDATE = Set(
     "numAddedFiles", // number of files added
     "numRemovedFiles", // number of files removed
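The new operation name and metric set surface in the Delta commit history. A hedged way to check after a removal (hypothetical table name):

    // Sketch: inspect the last history entry after removing column mapping.
    // Expect operation = "REMOVE COLUMN MAPPING" and operationMetrics keys
    // such as numRewrittenFiles and numOutputBytes from the set above.
    spark.sql("DESCRIBE HISTORY my_table LIMIT 1")
      .select("operation", "operationMetrics")
      .show(truncate = false)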
@@ -128,6 +128,10 @@ case class AlterTableSetPropertiesDeltaCommand(
     if (disableColumnMapping && columnMappingRemovalAllowed) {
       RemoveColumnMappingCommand(deltaLog, table.catalogTable)
         .run(sparkSession, removeColumnMappingTableProperty = false)
+      // Not changing anything else, so we can return early.
+      if (configuration.size == 1) {
+        return Seq.empty[Row]
+      }
     }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.setProperties") {
       val txn = startTransaction()
@@ -187,6 +191,10 @@ case class AlterTableUnsetPropertiesDeltaCommand(
     if (disableColumnMapping && columnMappingRemovalAllowed) {
       RemoveColumnMappingCommand(deltaLog, table.catalogTable)
         .run(sparkSession, removeColumnMappingTableProperty = true)
+      if (propKeys.size == 1) {
+        // Not unsetting anything else, so we can return early.
+        return Seq.empty[Row]
+      }
     }
     recordDeltaOperation(deltaLog, "delta.ddl.alter.unsetProperties") {
       val txn = startTransaction()
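Both early returns above fire only when the column-mapping key is the sole property being changed. A hedged illustration (hypothetical table and property names):

    // Only the mapping key is unset: the rewrite runs and the command returns
    // early, skipping the generic delta.ddl.alter.unsetProperties transaction.
    spark.sql("ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.columnMapping.mode')")

    // Another key is unset in the same statement: after the rewrite, the
    // remaining property change still goes through the normal path.
    spark.sql(
      "ALTER TABLE my_table UNSET TBLPROPERTIES ('delta.columnMapping.mode', 'some.other.prop')")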
@@ -16,10 +16,11 @@

 package org.apache.spark.sql.delta.commands.columnmapping

-import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.delta._
+import org.apache.spark.sql.delta.actions.AddFile
 import org.apache.spark.sql.delta.schema.{ImplicitMetadataOperation, SchemaUtils}

-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.types.StructType

@@ -39,21 +40,89 @@ class RemoveColumnMappingCommand(
    * the table instead of setting it to 'none'
    */
   def run(spark: SparkSession, removeColumnMappingTableProperty: Boolean): Unit = {
-    val schema = deltaLog.update().schema
-    verifySchemaFieldNames(schema)
+    deltaLog.withNewTransaction(catalogOpt) { txn =>
+      val originalFiles = txn.filterFiles()
+      val originalData = buildDataFrame(txn, originalFiles)
+      val originalSchema = txn.snapshot.schema
+      val newSchema = DeltaColumnMapping.dropColumnMappingMetadata(originalSchema)
+      verifySchemaFieldNames(newSchema)
+
+      updateMetadata(removeColumnMappingTableProperty, txn, newSchema)
+
+      val deltaOptions = getDeltaOptionsForWrite(spark)
+      val addedFiles = writeData(txn, originalData, deltaOptions)
+      // Mark removals as dataChange = false: the rewrite only reorganizes data.
+      val removeFileActions = originalFiles.map(_.removeWithTimestamp(dataChange = false))
+
+      txn.commit(removeFileActions ++ addedFiles,
+        DeltaOperations.RemoveColumnMapping())
+    }
   }

   /**
    * Verify none of the schema fields contain invalid column names.
    */
-  protected def verifySchemaFieldNames(schema: StructType) = {
+  def verifySchemaFieldNames(schema: StructType): Unit = {
     val invalidColumnNames =
       SchemaUtils.findInvalidColumnNamesInSchema(schema)
     if (invalidColumnNames.nonEmpty) {
       throw DeltaErrors
         .foundInvalidColumnNamesWhenRemovingColumnMapping(invalidColumnNames)
     }
   }

+  /**
+   * Update the metadata to remove the column mapping table properties and
+   * update the schema to remove the column mapping metadata.
+   */
+  def updateMetadata(
+      removeColumnMappingTableProperty: Boolean,
+      txn: OptimisticTransaction,
+      newSchema: StructType): Unit = {
+    val newConfiguration =
+      getConfigurationWithoutColumnMapping(txn, removeColumnMappingTableProperty)
+    val newMetadata = txn.metadata.copy(
+      schemaString = newSchema.json,
+      configuration = newConfiguration)
+    txn.updateMetadata(newMetadata)
+  }
+
+  def getConfigurationWithoutColumnMapping(
+      txn: OptimisticTransaction,
+      removeColumnMappingTableProperty: Boolean): Map[String, String] = {
+    val columnMappingPropertyKey = DeltaConfigs.COLUMN_MAPPING_MODE.key
+    val columnMappingMaxIdPropertyKey = DeltaConfigs.COLUMN_MAPPING_MAX_ID.key
+    // Unset or overwrite the column mapping mode to 'none' and remove the
+    // max id property while keeping all other properties.
+    (if (removeColumnMappingTableProperty) {
+      txn.metadata.configuration - columnMappingPropertyKey
+    } else {
+      txn.metadata.configuration + (columnMappingPropertyKey -> "none")
+    }) - columnMappingMaxIdPropertyKey
+  }
+
+  def getDeltaOptionsForWrite(spark: SparkSession): DeltaOptions = {
+    new DeltaOptions(
+      // Prevent files from being split by writers.
+      Map(DeltaOptions.MAX_RECORDS_PER_FILE -> "0"),
+      spark.sessionState.conf)
+  }
+
+  def buildDataFrame(
+      txn: OptimisticTransaction,
+      originalFiles: Seq[AddFile]): DataFrame =
+    recordDeltaOperation(txn.deltaLog, "delta.removeColumnMapping.setupDataFrame") {
+      // Scanned schema does not include the column mapping metadata and can be
+      // reused as is.
+      txn.deltaLog.createDataFrame(txn.snapshot, originalFiles)
+    }
+
+  def writeData(
+      txn: OptimisticTransaction,
+      data: DataFrame,
+      deltaOptions: DeltaOptions): Seq[AddFile] = {
+    txn.writeFiles(data, Some(deltaOptions), isOptimize = true, additionalConstraints = Seq.empty)
+      .asInstanceOf[Seq[AddFile]]
+  }
 }

object RemoveColumnMappingCommand {
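A standalone sketch of the configuration rewrite in getConfigurationWithoutColumnMapping above (the literal keys are stand-ins for DeltaConfigs.COLUMN_MAPPING_MODE.key and DeltaConfigs.COLUMN_MAPPING_MAX_ID.key):

    val modeKey = "delta.columnMapping.mode"         // stand-in for the real constant
    val maxIdKey = "delta.columnMapping.maxColumnId" // stand-in for the real constant
    val config = Map(modeKey -> "name", maxIdKey -> "7", "other.prop" -> "x")

    // removeColumnMappingTableProperty = true: drop the mode key entirely.
    assert((config - modeKey) - maxIdKey == Map("other.prop" -> "x"))

    // removeColumnMappingTableProperty = false: overwrite the mode with "none".
    assert((config + (modeKey -> "none")) - maxIdKey ==
      Map(modeKey -> "none", "other.prop" -> "x"))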
@@ -24,7 +24,12 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf._

 import org.apache.spark.sql.catalyst.TableIdentifier

+/**
+ * Test removing column mapping from a table.
+ */
 class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {

   test("column mapping cannot be removed without the feature flag") {
     withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") {
@@ -119,4 +124,97 @@ class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils {
     assert(deltaLog.update().metadata.configuration.contains(propertyToKeep))
     assert(deltaLog.update().metadata.configuration.contains(propertyToUnset))
   }
+
+  test("remove column mapping from a table") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         |  FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("remove column mapping using unset") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         |  FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping(unsetTableProperty = true)
+  }
+
+  test("remove column mapping from a partitioned table") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |PARTITIONED BY (part)
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn, id % 2 as part
+         |  FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("remove column mapping from a partitioned table with two part columns") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |PARTITIONED BY (part1, part2)
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn, id % 2 as part1,
+         |  id % 3 as part2
+         |  FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("remove column mapping from a table with only logical names") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         |  FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    // Add column mapping without renaming any columns.
+    // That is, the column names in the table should be the same as the logical column names.
+    sql(
+      s"""ALTER TABLE $testTableName
+         |SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
+         |  'delta.minReaderVersion' = '2',
+         |  'delta.minWriterVersion' = '5'
+         |)""".stripMargin)
+    testRemovingColumnMapping()
+  }
+
+  test("dropped column is added back") {
+    sql(
+      s"""CREATE TABLE $testTableName
+         |USING delta
+         |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none')
+         |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn
+         |  FROM RANGE(0, $totalRows, 1, $numFiles)
+         |""".stripMargin)
+    // Add column mapping without renaming any columns.
+    // That is, the column names in the table should be the same as the logical column names.
+    sql(
+      s"""ALTER TABLE $testTableName
+         |SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name',
+         |  'delta.minReaderVersion' = '2',
+         |  'delta.minWriterVersion' = '5'
+         |)""".stripMargin)
+    // Drop the second column.
+    sql(s"ALTER TABLE $testTableName DROP COLUMN $secondColumn")
+    // Removing column mapping should rewrite the table and physically remove
+    // the dropped column.
+    testRemovingColumnMapping()
+    // Add the same column back.
+    sql(s"ALTER TABLE $testTableName ADD COLUMN $secondColumn BIGINT")
+    // Read from the table, ensuring none of the original values of secondColumn are present.
+    assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL")
+      .count() == 0)
+  }
 }
