From 72fad38a3b003122ccdd88144cfc4f4110d9e154 Mon Sep 17 00:00:00 2001 From: sabir-akhadov <52208605+sabir-akhadov@users.noreply.github.com> Date: Mon, 18 Mar 2024 21:22:27 +0100 Subject: [PATCH] [Spark] Column mapping removal: support tables with deletion vectors, column constraints and generated columns. (#2753) ## Description Add additional tests for tables with deletion vectors, generated columns and column constraints for column mapping removal. ## How was this patch tested? New unit tests ## Does this PR introduce _any_ user-facing changes? No --- .../spark/sql/delta/DeltaColumnMapping.scala | 2 +- .../RemoveColumnMappingSuite.scala | 112 ++++++++++++++++-- .../RemoveColumnMappingSuiteUtils.scala | 19 +-- 3 files changed, 115 insertions(+), 18 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala index 928b6500bfd..73cfac97721 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala @@ -96,7 +96,7 @@ trait DeltaColumnMappingBase extends DeltaLogging { // No change. (oldMode == newMode) || // Downgrade allowed with a flag. - (removalAllowed && (oldMode == NameMapping && newMode == NoMapping)) || + (removalAllowed && (oldMode != NoMapping && newMode == NoMapping)) || // Upgrade always allowed. (oldMode == NoMapping && newMode == NameMapping) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala index 3e5fd78421d..6aca88c004e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuite.scala @@ -16,20 +16,19 @@ package org.apache.spark.sql.delta.columnmapping -import org.apache.spark.sql.delta.DeltaAnalysisException -import org.apache.spark.sql.delta.DeltaColumnMappingUnsupportedException -import org.apache.spark.sql.delta.DeltaConfigs -import org.apache.spark.sql.delta.DeltaLog +import io.delta.tables.DeltaTable + +import org.apache.spark.sql.delta._ +import org.apache.spark.sql.delta.schema.DeltaInvariantViolationException import org.apache.spark.sql.delta.sources.DeltaSQLConf._ +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier /** * Test removing column mapping from a table. */ -class RemoveColumnMappingSuite - extends RemoveColumnMappingSuiteUtils - { +class RemoveColumnMappingSuite extends RemoveColumnMappingSuiteUtils { test("column mapping cannot be removed without the feature flag") { withSQLConf(ALLOW_COLUMN_MAPPING_REMOVAL.key -> "false") { @@ -48,6 +47,16 @@ class RemoveColumnMappingSuite } } + test("table without column mapping enabled") { + sql(s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none') + |AS SELECT 1 as a + |""".stripMargin) + + unsetColumnMappingProperty(useUnset = true) + } + test("invalid column names") { val invalidColName1 = colName("col1") val invalidColName2 = colName("col2") @@ -58,7 +67,7 @@ class RemoveColumnMappingSuite |""".stripMargin) val e = intercept[DeltaAnalysisException] { // Try to remove column mapping. - sql(s"ALTER TABLE $testTableName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'none')") + unsetColumnMappingProperty(useUnset = true) } assert(e.errorClass .contains("DELTA_INVALID_COLUMN_NAMES_WHEN_REMOVING_COLUMN_MAPPING")) @@ -217,4 +226,91 @@ class RemoveColumnMappingSuite assert(sql(s"SELECT $secondColumn FROM $testTableName WHERE $secondColumn IS NOT NULL").count() == 0) } + + test("remove column mapping from a table with deletion vectors") { + sql( + s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name', + | '${DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key}' = true) + |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn + | FROM RANGE(0, $totalRows, 1, $numFiles) + |""".stripMargin) + sql(s"DELETE FROM $testTableName WHERE $logicalColumnName % 2 = 0") + testRemovingColumnMapping() + } + + test("remove column mapping from a table with a generated column") { + // Note: generate expressions are using logical column names and renaming referenced columns + // is forbidden. + DeltaTable.create(spark) + .tableName(testTableName) + .addColumn(logicalColumnName, "LONG") + .addColumn( + DeltaTable.columnBuilder(secondColumn) + .dataType("LONG") + .generatedAlwaysAs(s"$logicalColumnName + 1") + .build()) + .property(DeltaConfigs.COLUMN_MAPPING_MODE.key, "name") + .execute() + // Insert data into the table. + spark.range(totalRows) + .selectExpr(s"id as $logicalColumnName") + .writeTo(testTableName) + .append() + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName)) + assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn) + testRemovingColumnMapping() + // Verify the generated column is still there. + assert(GeneratedColumn.getGeneratedColumns(deltaLog.update()).head.name == secondColumn) + // Insert more rows. + spark.range(totalRows) + .selectExpr(s"id + $totalRows as $logicalColumnName") + .writeTo(testTableName) + .append() + // Verify the generated column values are correct. + checkAnswer(sql(s"SELECT $logicalColumnName, $secondColumn FROM $testTableName"), + (0 until totalRows * 2).map(i => Row(i, i + 1))) + } + + test("column constraints are preserved") { + // Note: constraints are using logical column names and renaming is forbidden until + // constraint is dropped. + sql( + s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name') + |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn + | FROM RANGE(0, $totalRows, 1, $numFiles) + |""".stripMargin) + val constraintName = "secondcolumnaddone" + val constraintExpr = s"$secondColumn = $logicalColumnName + 1" + sql(s"ALTER TABLE $testTableName ADD CONSTRAINT " + + s"$constraintName CHECK ($constraintExpr)") + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = testTableName)) + assert(deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName") == + constraintExpr) + testRemovingColumnMapping() + // Verify the constraint is still there. + assert(deltaLog.update().metadata.configuration(s"delta.constraints.$constraintName") == + constraintExpr) + // Verify the constraint is still enforced. + intercept[DeltaInvariantViolationException] { + sql(s"INSERT INTO $testTableName VALUES (0, 0)") + } + } + + test("remove column mapping in id mode") { + sql( + s"""CREATE TABLE $testTableName + |USING delta + |TBLPROPERTIES ( + | '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'id') + |AS SELECT id as $logicalColumnName, id + 1 as $secondColumn + | FROM RANGE(0, $totalRows, 1, $numFiles) + |""".stripMargin) + testRemovingColumnMapping() + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala index ff8cc2f3f61..4081c3608ad 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/columnmapping/RemoveColumnMappingSuiteUtils.scala @@ -23,7 +23,9 @@ import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.DeltaSQLConf._ import com.fasterxml.jackson.databind.ObjectMapper +import org.apache.spark.sql.DataFrame import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.functions.col @@ -52,12 +54,9 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui import testImplicits._ - protected def testRemovingColumnMapping( - unsetTableProperty: Boolean = false): Any = { + protected def testRemovingColumnMapping(unsetTableProperty: Boolean = false): Any = { // Verify the input data is as expected. - checkAnswer( - spark.table(tableName = testTableName).select(logicalColumnName), - spark.range(totalRows).select(col("id").as(logicalColumnName))) + val originalData = spark.table(tableName = testTableName).select(logicalColumnName).collect() // Add a schema comment and verify it is preserved after the rewrite. val comment = "test comment" sql(s"ALTER TABLE $testTableName ALTER COLUMN $logicalColumnName COMMENT '$comment'") @@ -73,10 +72,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui unsetColumnMappingProperty(useUnset = unsetTableProperty) verifyRewrite( - unsetTableProperty, + unsetTableProperty = unsetTableProperty, deltaLog, originalFiles, - startingVersion) + startingVersion, + originalData = originalData) // Verify the schema comment is preserved after the rewrite. assert(deltaLog.update().schema.head.getComment().get == comment, "Should preserve the schema comment.") @@ -90,10 +90,11 @@ trait RemoveColumnMappingSuiteUtils extends QueryTest with DeltaColumnMappingSui unsetTableProperty: Boolean, deltaLog: DeltaLog, originalFiles: Array[AddFile], - startingVersion: Long): Unit = { + startingVersion: Long, + originalData: Array[Row]): Unit = { checkAnswer( spark.table(tableName = testTableName).select(logicalColumnName), - spark.range(totalRows).select(col("id").as(logicalColumnName))) + originalData) val newSnapshot = deltaLog.update() assert(newSnapshot.version - startingVersion == 1, "Should rewrite the table in one commit.")