From 3cf6cc6208bf4750e229f26a1cd853148bf61944 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Mon, 16 Dec 2024 10:51:41 -0800 Subject: [PATCH] [3.3][Kernel] Disable setting columnMapping/icebergCompatV2 for 3.3.0 release --- .../io/delta/kernel/internal/TableConfig.java | 14 +++++ .../kernel/internal/TableConfigSuite.scala | 7 +-- .../defaults/DeltaTableWritesSuite.scala | 51 +++++++++++++++---- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index 794c016d1ed..02eed32a050 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -254,6 +254,20 @@ public static Map validateProperties(Map configu String key = kv.getKey().toLowerCase(Locale.ROOT); String value = kv.getValue(); if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) { + // Disable `columnMapping` and `icebergCompatV2` for Delta 3.3 release + // There are a couple of pieces missing from fully supporting these features + // 1. Data path PR is not yet merged (delta-io/delta#3475), waiting on the + // expression fixup which is going to affect how column mapping is done. + // 2. Auto enabling of the protocol version based on the current table version and + // column mapping/iceberg compat v2 enabled. + if ((key.equalsIgnoreCase(COLUMN_MAPPING_MODE.getKey()) + || key.equalsIgnoreCase(ICEBERG_COMPAT_V2_ENABLED.getKey())) + && + // Ignore the check if tests are running + !Boolean.getBoolean("ENABLE_COLUMN_MAPPING_TESTS")) { + throw DeltaErrors.unknownConfigurationException(kv.getKey()); + } + TableConfig tableConfig = VALID_PROPERTIES.get(key); if (tableConfig.editable) { tableConfig.validate(value); diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala index 2dafb5b82a9..549b025b78e 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/TableConfigSuite.scala @@ -32,9 +32,10 @@ class TableConfigSuite extends AnyFunSuite { TableConfig.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.getKey -> "1", TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "{in-memory}", TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{\"1\": \"1\"}", - TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> "{\"1\": \"1\"}", - TableConfig.COLUMN_MAPPING_MODE.getKey -> "name", - TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true").asJava) + TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> "{\"1\": \"1\"}" + // TableConfig.COLUMN_MAPPING_MODE.getKey -> "name", + // TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true" + ).asJava) } test("check TableConfig.MAX_COLUMN_ID.editable is false") { diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala index 95f99af2836..aa3a5ac7131 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/DeltaTableWritesSuite.scala @@ -1005,7 +1005,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa txnBuilder.build(engine) } - test("create table with unsupported column mapping mode") { + withColumnMappingEnabled("create table with unsupported column mapping mode") { withTempDirAndEngine { (tablePath, engine) => val ex = intercept[InvalidConfigurationValueException] { createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty, @@ -1017,7 +1017,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("create table with column mapping mode = none") { + withColumnMappingEnabled("create table with column mapping mode = none") { withTempDirAndEngine { (tablePath, engine) => createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty, tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "none")) @@ -1028,7 +1028,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("cannot update table with unsupported column mapping mode") { + withColumnMappingEnabled("cannot update table with unsupported column mapping mode") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty) @@ -1046,7 +1046,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("cannot update table with unsupported column mapping mode change") { + withColumnMappingEnabled("cannot update table with unsupported column mapping mode change") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty, @@ -1065,7 +1065,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("cannot update column mapping mode from id to name on existing table") { + withColumnMappingEnabled("cannot update column mapping mode from id to name on existing table") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1093,7 +1093,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("cannot update column mapping mode from name to id on existing table") { + withColumnMappingEnabled("cannot update column mapping mode from name to id on existing table") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1121,7 +1121,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("cannot update column mapping mode from none to id on existing table") { + withColumnMappingEnabled("cannot update column mapping mode from none to id on existing table") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1156,7 +1156,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa // TODO } - test("new table with column mapping mode = name") { + withColumnMappingEnabled("new table with column mapping mode = name") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1173,7 +1173,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("new table with column mapping mode = id") { + withColumnMappingEnabled("new table with column mapping mode = id") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1190,7 +1190,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("can update existing table to column mapping mode = name") { + withColumnMappingEnabled("can update existing table to column mapping mode = name") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1216,7 +1216,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa } } - test("new table with column mapping mode = id and nested schema") { + withColumnMappingEnabled("new table with column mapping mode = id and nested schema") { withTempDirAndEngine { (tablePath, engine) => val table = Table.forPath(engine, tablePath) val schema = new StructType() @@ -1256,4 +1256,33 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa assert(meta.get(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY) == expPhyName) } } + + test("ensure column mapping is disabled - for Delta 3.3 only") { + Seq( + (TableConfig.COLUMN_MAPPING_MODE.getKey, "id"), + (TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey, "true") + ).foreach { case (key, value) => + withTempDirAndEngine { (tablePath, engine) => + val ex = intercept[UnknownConfigurationException] { + createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty, + tableProperties = Map(key -> value)) + .commit(engine, emptyIterable()) + } + assert(ex.getMessage.contains("Unknown configuration was specified:")) + } + } + } + + // helper method to run columnmapping tests + def withColumnMappingEnabled(testName: String)(fun: => Unit): Unit = { + test(testName) { + try { + // this property is needed to enable column mapping + System.setProperty("ENABLE_COLUMN_MAPPING_TESTS", "true") + fun + } finally { + System.clearProperty("ENABLE_COLUMN_MAPPING_TESTS") + } + } + } }