Skip to content

Commit

Permalink
[3.3][Kernel] Disable setting columnMapping/icebergCompatV2 for 3.3.0…
Browse files Browse the repository at this point in the history
… release
  • Loading branch information
vkorukanti committed Dec 16, 2024
1 parent 9c967b9 commit 3cf6cc6
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,20 @@ public static Map<String, String> validateProperties(Map<String, String> configu
String key = kv.getKey().toLowerCase(Locale.ROOT);
String value = kv.getValue();
if (key.startsWith("delta.") && VALID_PROPERTIES.containsKey(key)) {
// Disable `columnMapping` and `icebergCompatV2` for Delta 3.3 release
// There are a couple of pieces missing from fully supporting these features
// 1. Data path PR is not yet merged (delta-io/delta#3475), waiting on the
// expression fixup which is going to affect how column mapping is done.
// 2. Auto enabling of the protocol version based on the current table version and
// column mapping/iceberg compat v2 enabled.
if ((key.equalsIgnoreCase(COLUMN_MAPPING_MODE.getKey())
|| key.equalsIgnoreCase(ICEBERG_COMPAT_V2_ENABLED.getKey()))
&&
// Ignore the check if tests are running
!Boolean.getBoolean("ENABLE_COLUMN_MAPPING_TESTS")) {
throw DeltaErrors.unknownConfigurationException(kv.getKey());
}

TableConfig<?> tableConfig = VALID_PROPERTIES.get(key);
if (tableConfig.editable) {
tableConfig.validate(value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ class TableConfigSuite extends AnyFunSuite {
TableConfig.IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.getKey -> "1",
TableConfig.COORDINATED_COMMITS_COORDINATOR_NAME.getKey -> "{in-memory}",
TableConfig.COORDINATED_COMMITS_COORDINATOR_CONF.getKey -> "{\"1\": \"1\"}",
TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> "{\"1\": \"1\"}",
TableConfig.COLUMN_MAPPING_MODE.getKey -> "name",
TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true").asJava)
TableConfig.COORDINATED_COMMITS_TABLE_CONF.getKey -> "{\"1\": \"1\"}"
// TableConfig.COLUMN_MAPPING_MODE.getKey -> "name",
// TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey -> "true"
).asJava)
}

test("check TableConfig.MAX_COLUMN_ID.editable is false") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1005,7 +1005,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
txnBuilder.build(engine)
}

test("create table with unsupported column mapping mode") {
withColumnMappingEnabled("create table with unsupported column mapping mode") {
withTempDirAndEngine { (tablePath, engine) =>
val ex = intercept[InvalidConfigurationValueException] {
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
Expand All @@ -1017,7 +1017,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("create table with column mapping mode = none") {
withColumnMappingEnabled("create table with column mapping mode = none") {
withTempDirAndEngine { (tablePath, engine) =>
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
tableProperties = Map(TableConfig.COLUMN_MAPPING_MODE.getKey -> "none"))
Expand All @@ -1028,7 +1028,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update table with unsupported column mapping mode") {
withColumnMappingEnabled("cannot update table with unsupported column mapping mode") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
createTxn(engine, tablePath, isNewTable = true, testSchema, Seq.empty)
Expand All @@ -1046,7 +1046,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update table with unsupported column mapping mode change") {
withColumnMappingEnabled("cannot update table with unsupported column mapping mode change") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
Expand All @@ -1065,7 +1065,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update column mapping mode from id to name on existing table") {
withColumnMappingEnabled("cannot update column mapping mode from id to name on existing table") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1093,7 +1093,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update column mapping mode from name to id on existing table") {
withColumnMappingEnabled("cannot update column mapping mode from name to id on existing table") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1121,7 +1121,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("cannot update column mapping mode from none to id on existing table") {
withColumnMappingEnabled("cannot update column mapping mode from none to id on existing table") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1156,7 +1156,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
// TODO
}

test("new table with column mapping mode = name") {
withColumnMappingEnabled("new table with column mapping mode = name") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand All @@ -1173,7 +1173,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("new table with column mapping mode = id") {
withColumnMappingEnabled("new table with column mapping mode = id") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand All @@ -1190,7 +1190,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("can update existing table to column mapping mode = name") {
withColumnMappingEnabled("can update existing table to column mapping mode = name") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand All @@ -1216,7 +1216,7 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("new table with column mapping mode = id and nested schema") {
withColumnMappingEnabled("new table with column mapping mode = id and nested schema") {
withTempDirAndEngine { (tablePath, engine) =>
val table = Table.forPath(engine, tablePath)
val schema = new StructType()
Expand Down Expand Up @@ -1256,4 +1256,33 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
assert(meta.get(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY) == expPhyName)
}
}

test("ensure column mapping is disabled - for Delta 3.3 only") {
Seq(
(TableConfig.COLUMN_MAPPING_MODE.getKey, "id"),
(TableConfig.ICEBERG_COMPAT_V2_ENABLED.getKey, "true")
).foreach { case (key, value) =>
withTempDirAndEngine { (tablePath, engine) =>
val ex = intercept[UnknownConfigurationException] {
createTxn(engine, tablePath, isNewTable = true, testSchema, partCols = Seq.empty,
tableProperties = Map(key -> value))
.commit(engine, emptyIterable())
}
assert(ex.getMessage.contains("Unknown configuration was specified:"))
}
}
}

// helper method to run columnmapping tests
def withColumnMappingEnabled(testName: String)(fun: => Unit): Unit = {
test(testName) {
try {
// this property is needed to enable column mapping
System.setProperty("ENABLE_COLUMN_MAPPING_TESTS", "true")
fun
} finally {
System.clearProperty("ENABLE_COLUMN_MAPPING_TESTS")
}
}
}
}

0 comments on commit 3cf6cc6

Please sign in to comment.