From 268d45580779d439dcaeba9c0efb3664f7a49c2e Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 23 Oct 2025 01:16:07 -0700 Subject: [PATCH 1/7] Spark: Add schema conversion support for default values --- .../apache/iceberg/spark/TypeToSparkType.java | 18 +- .../iceberg/spark/TestSparkSchemaUtil.java | 159 ++++++++++++++++++ .../spark/sql/TestSparkDefaultValues.java | 132 +++++++++++++++ 3 files changed, 308 insertions(+), 1 deletion(-) create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index cbee10fbb069..5d0346d20f3f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -25,6 +25,7 @@ import org.apache.iceberg.types.Type; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.expressions.Literal$; import org.apache.spark.sql.types.ArrayType$; import org.apache.spark.sql.types.BinaryType$; import org.apache.spark.sql.types.BooleanType$; @@ -69,9 +70,24 @@ public DataType struct(Types.StructType struct, List fieldResults) { if (field.doc() != null) { sparkField = sparkField.withComment(field.doc()); } + if (field.writeDefault() != null) { + // Convert Iceberg default value to Spark SQL string representation. Spark stores default + // values as SQL strings in column metadata. The .sql() method formats literals correctly + // for each type + Object writeDefault = SparkUtil.internalToSpark(field.type(), field.writeDefault()); + sparkField = + sparkField.withCurrentDefaultValue(Literal$.MODULE$.create(writeDefault, type).sql()); + } + if (field.initialDefault() != null) { + // Same conversion for existence default values, used for existing rows when column is added + // to schema + Object initialDefault = SparkUtil.internalToSpark(field.type(), field.initialDefault()); + sparkField = + sparkField.withExistenceDefaultValue( + Literal$.MODULE$.create(initialDefault, type).sql()); + } sparkFields.add(sparkField); } - return StructType$.MODULE$.apply(sparkFields); } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index 4045847d5a4a..d24e5cd38912 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -21,17 +21,31 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.assertj.core.api.Assertions.assertThat; +import java.math.BigDecimal; +import java.nio.ByteBuffer; import java.util.List; +import java.util.stream.Stream; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.MetadataAttribute; import org.apache.spark.sql.catalyst.types.DataTypeUtils; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.junit.jupiter.api.Test; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; public class TestSparkSchemaUtil { + + private static final String CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"; + private static final String EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"; + private static final Schema TEST_SCHEMA = new Schema( optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); @@ -80,4 +94,149 @@ public void testSchemaConversionWithMetaDataColumnSchema() { } } } + + @Test + public void testSchemaConversionWithOnlyWriteDefault() { + Schema schema = + new Schema( + Types.NestedField.optional("field") + .withId(1) + .ofType(Types.StringType.get()) + .withWriteDefault(Literal.of("write_only")) + .build()); + + StructType sparkSchema = SparkSchemaUtil.convert(schema); + Metadata metadata = sparkSchema.fields()[0].metadata(); + + assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field with only write default should have CURRENT_DEFAULT metadata") + .isTrue(); + assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field with only write default should not have EXISTS_DEFAULT metadata") + .isFalse(); + assertThat(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as("Spark metadata CURRENT_DEFAULT should contain correctly formatted literal") + .isEqualTo("'write_only'"); + } + + @Test + public void testSchemaConversionWithOnlyInitialDefault() { + Schema schema = + new Schema( + Types.NestedField.optional("field") + .withId(1) + .ofType(Types.IntegerType.get()) + .withInitialDefault(Literal.of(42)) + .build()); + + StructType sparkSchema = SparkSchemaUtil.convert(schema); + Metadata metadata = sparkSchema.fields()[0].metadata(); + + assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field with only initial default should not have CURRENT_DEFAULT metadata") + .isFalse(); + assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field with only initial default should have EXISTS_DEFAULT metadata") + .isTrue(); + assertThat(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as("Spark metadata EXISTS_DEFAULT should contain correctly formatted literal") + .isEqualTo("42"); + } + + @ParameterizedTest(name = "{0} with writeDefault={1}, initialDefault={2}") + @MethodSource("schemaConversionWithDefaultsTestCases") + public void testSchemaConversionWithDefaultsForPrimitiveTypes( + Type type, + Literal writeDefault, + Literal initialDefault, + String expectedCurrentDefaultValue, + String expectedExistsDefaultValue) { + Schema schema = + new Schema( + Types.NestedField.optional("field") + .withId(1) + .ofType(type) + .withWriteDefault(writeDefault) + .withInitialDefault(initialDefault) + .build()); + + StructType sparkSchema = SparkSchemaUtil.convert(schema); + StructField defaultField = sparkSchema.fields()[0]; + Metadata metadata = defaultField.metadata(); + + assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field of type %s should have CURRENT_DEFAULT metadata", type) + .isTrue(); + assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field of type %s should have EXISTS_DEFAULT metadata", type) + .isTrue(); + assertThat(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as( + "Spark metadata CURRENT_DEFAULT for type %s should contain correctly formatted literal", + type) + .isEqualTo(expectedCurrentDefaultValue); + 
assertThat(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as( + "Spark metadata EXISTS_DEFAULT for type %s should contain correctly formatted literal", + type) + .isEqualTo(expectedExistsDefaultValue); + } + + private static Stream schemaConversionWithDefaultsTestCases() { + return Stream.of( + Arguments.of(Types.IntegerType.get(), Literal.of(1), Literal.of(2), "1", "2"), + Arguments.of( + Types.StringType.get(), + Literal.of("write_default"), + Literal.of("initial_default"), + "'write_default'", + "'initial_default'"), + Arguments.of( + Types.UUIDType.get(), + Literal.of("f79c3e09-677c-4bbd-a479-3f349cb785e7").to(Types.UUIDType.get()), + Literal.of("f79c3e09-677c-4bbd-a479-3f349cb685e7").to(Types.UUIDType.get()), + "'f79c3e09-677c-4bbd-a479-3f349cb785e7'", + "'f79c3e09-677c-4bbd-a479-3f349cb685e7'"), + Arguments.of(Types.BooleanType.get(), Literal.of(true), Literal.of(false), "true", "false"), + Arguments.of(Types.IntegerType.get(), Literal.of(42), Literal.of(10), "42", "10"), + Arguments.of(Types.LongType.get(), Literal.of(100L), Literal.of(50L), "100L", "50L"), + Arguments.of( + Types.FloatType.get(), + Literal.of(3.14f), + Literal.of(1.5f), + "CAST('3.14' AS FLOAT)", + "CAST('1.5' AS FLOAT)"), + Arguments.of( + Types.DoubleType.get(), Literal.of(2.718), Literal.of(1.414), "2.718D", "1.414D"), + Arguments.of( + Types.DecimalType.of(10, 2), + Literal.of(new BigDecimal("99.99")), + Literal.of(new BigDecimal("11.11")), + "99.99BD", + "11.11BD"), + Arguments.of( + Types.DateType.get(), + Literal.of("2024-01-01").to(Types.DateType.get()), + Literal.of("2023-01-01").to(Types.DateType.get()), + "DATE '2024-01-01'", + "DATE '2023-01-01'"), + Arguments.of( + Types.TimestampType.withZone(), + Literal.of("2017-11-30T11:30:07.123456+01:00").to(Types.TimestampType.withZone()), + Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()), + "TIMESTAMP '2017-11-30 02:30:07.123456'", + "TIMESTAMP '2017-11-29 02:30:07.123456'"), + Arguments.of( + Types.BinaryType.get(), + Literal.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b})), + Literal.of(ByteBuffer.wrap(new byte[] {0x01, 0x02})), + "X'0A0B'", + "X'0102'"), + Arguments.of( + Types.FixedType.ofLength(4), + Literal.of("test".getBytes()), + Literal.of("init".getBytes()), + "X'74657374'", + "X'696E6974'")); + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java new file mode 100644 index 000000000000..d0537ffb4df2 --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.sql; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +/** + * Tests for Spark SQL Default values integration with Iceberg default values. + * + *
<p>
Note: These tests use {@code validationCatalog.createTable()} to create tables with default + * values because the Iceberg Spark integration does not yet support default value clauses in Spark + * DDL. + * + *
<p>
Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are + * not supported for dsv2 in Spark 4.0 and will be added in Spark 4.1.0. + */ +@ExtendWith(ParameterizedTestExtension.class) +public class TestSparkDefaultValues extends CatalogTestBase { + + @AfterEach + public void dropTestTable() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @TestTemplate + public void testWriteDefaultWithExplicitDEFAULT() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withWriteDefault(Literal.of("default-data")) + .build()); + + validationCatalog.createTable( + tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); + + sql("INSERT INTO %s VALUES (1, DEFAULT)", commitTarget()); + + assertEquals( + "Should insert row with default values", + ImmutableList.of(row(1, "default-data")), + sql("SELECT * FROM %s", selectTarget())); + } + + @TestTemplate + public void testWriteDefaultForMultipleColumns() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional("bool_col") + .withId(2) + .ofType(Types.BooleanType.get()) + .withWriteDefault(Literal.of(true)) + .build(), + Types.NestedField.optional("int_col") + .withId(3) + .ofType(Types.IntegerType.get()) + .withWriteDefault(Literal.of(42)) + .build(), + Types.NestedField.optional("long_col") + .withId(4) + .ofType(Types.LongType.get()) + .withWriteDefault(Literal.of(100L)) + .build()); + + validationCatalog.createTable( + tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); + + sql("INSERT INTO %s VALUES (1, DEFAULT, DEFAULT, DEFAULT)", commitTarget()); + + assertEquals( + "Should have expected default values", + ImmutableList.of(row(1, true, 42, 100L)), + sql("SELECT * FROM %s", selectTarget())); + } + + @TestTemplate + public void testBulkInsertWithDefaults() { + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withWriteDefault(Literal.of("default_data")) + .build()); + + validationCatalog.createTable( + tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); + + sql("INSERT INTO %s VALUES (1, DEFAULT), (2, DEFAULT), (3, DEFAULT)", commitTarget()); + + assertEquals( + "Should insert multiple rows with default values", + ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")), + sql("SELECT * FROM %s ORDER BY id", selectTarget())); + } +} From 3f882519e77549f5f58509178050b1bbf0f4717a Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 23 Oct 2025 10:21:32 -0700 Subject: [PATCH 2/7] use utc timestamps --- .../java/org/apache/iceberg/spark/TestSparkSchemaUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index d24e5cd38912..f7d566ab1897 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -222,8 +222,8 @@ private static Stream 
schemaConversionWithDefaultsTestCases() { "DATE '2023-01-01'"), Arguments.of( Types.TimestampType.withZone(), - Literal.of("2017-11-30T11:30:07.123456+01:00").to(Types.TimestampType.withZone()), - Literal.of("2017-11-29T11:30:07.123456+01:00").to(Types.TimestampType.withZone()), + Literal.of("2017-11-30T10:30:07.123456+00:00").to(Types.TimestampType.withZone()), + Literal.of("2017-11-29T10:30:07.123456+00:00").to(Types.TimestampType.withZone()), "TIMESTAMP '2017-11-30 02:30:07.123456'", "TIMESTAMP '2017-11-29 02:30:07.123456'"), Arguments.of( From ccc427ad42059ea02e2eb7fc7ad176889f3f0f32 Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 23 Oct 2025 13:48:13 -0700 Subject: [PATCH 3/7] add tests for no tz and spark limitations --- .../iceberg/spark/TestSparkSchemaUtil.java | 75 +++++++++++-------- .../spark/sql/TestSparkDefaultValues.java | 72 +++++++++++++++++- 2 files changed, 114 insertions(+), 33 deletions(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index f7d566ab1897..e72292379f30 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.List; +import java.util.TimeZone; import java.util.stream.Stream; import org.apache.iceberg.MetadataColumns; import org.apache.iceberg.Schema; @@ -151,35 +152,41 @@ public void testSchemaConversionWithDefaultsForPrimitiveTypes( Literal initialDefault, String expectedCurrentDefaultValue, String expectedExistsDefaultValue) { - Schema schema = - new Schema( - Types.NestedField.optional("field") - .withId(1) - .ofType(type) - .withWriteDefault(writeDefault) - .withInitialDefault(initialDefault) - .build()); - - StructType sparkSchema = SparkSchemaUtil.convert(schema); - StructField defaultField = sparkSchema.fields()[0]; - Metadata metadata = defaultField.metadata(); - - assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) - .as("Field of type %s should have CURRENT_DEFAULT metadata", type) - .isTrue(); - assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) - .as("Field of type %s should have EXISTS_DEFAULT metadata", type) - .isTrue(); - assertThat(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) - .as( - "Spark metadata CURRENT_DEFAULT for type %s should contain correctly formatted literal", - type) - .isEqualTo(expectedCurrentDefaultValue); - assertThat(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) - .as( - "Spark metadata EXISTS_DEFAULT for type %s should contain correctly formatted literal", - type) - .isEqualTo(expectedExistsDefaultValue); + TimeZone systemTimeZone = TimeZone.getDefault(); + try { + TimeZone.setDefault(TimeZone.getTimeZone("UTC")); + Schema schema = + new Schema( + Types.NestedField.optional("field") + .withId(1) + .ofType(type) + .withWriteDefault(writeDefault) + .withInitialDefault(initialDefault) + .build()); + + StructType sparkSchema = SparkSchemaUtil.convert(schema); + StructField defaultField = sparkSchema.fields()[0]; + Metadata metadata = defaultField.metadata(); + + assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field of type %s should have CURRENT_DEFAULT metadata", type) + .isTrue(); + assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as("Field of type %s should have EXISTS_DEFAULT 
metadata", type) + .isTrue(); + assertThat(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + .as( + "Spark metadata CURRENT_DEFAULT for type %s should contain correctly formatted literal", + type) + .isEqualTo(expectedCurrentDefaultValue); + assertThat(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + .as( + "Spark metadata EXISTS_DEFAULT for type %s should contain correctly formatted literal", + type) + .isEqualTo(expectedExistsDefaultValue); + } finally { + TimeZone.setDefault(systemTimeZone); + } } private static Stream schemaConversionWithDefaultsTestCases() { @@ -224,8 +231,14 @@ private static Stream schemaConversionWithDefaultsTestCases() { Types.TimestampType.withZone(), Literal.of("2017-11-30T10:30:07.123456+00:00").to(Types.TimestampType.withZone()), Literal.of("2017-11-29T10:30:07.123456+00:00").to(Types.TimestampType.withZone()), - "TIMESTAMP '2017-11-30 02:30:07.123456'", - "TIMESTAMP '2017-11-29 02:30:07.123456'"), + "TIMESTAMP '2017-11-30 10:30:07.123456'", + "TIMESTAMP '2017-11-29 10:30:07.123456'"), + Arguments.of( + Types.TimestampType.withoutZone(), + Literal.of("2017-11-30T10:30:07.123456").to(Types.TimestampType.withoutZone()), + Literal.of("2017-11-29T10:30:07.123456").to(Types.TimestampType.withoutZone()), + "TIMESTAMP_NTZ '2017-11-30 10:30:07.123456'", + "TIMESTAMP_NTZ '2017-11-29 10:30:07.123456'"), Arguments.of( Types.BinaryType.get(), Literal.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b})), diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java index d0537ffb4df2..75a9b9bb384d 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.sql; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; @@ -28,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.spark.CatalogTestBase; import org.apache.iceberg.types.Types; +import org.apache.spark.sql.AnalysisException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -37,10 +39,13 @@ * *
<p>
Note: These tests use {@code validationCatalog.createTable()} to create tables with default * values because the Iceberg Spark integration does not yet support default value clauses in Spark - * DDL. + * DDL. See {@link #testCreateTableWithDefaultsNotYetSupported()} and {@link + * #testAlterTableAddColumnWithDefaultNotYetSupported()} for verification that DDL with defaults + * currently throws exceptions. * *
<p>
Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are - * not supported for dsv2 in Spark 4.0 and will be added in Spark 4.1.0. + * not supported for DSV2 in Spark 4.0 See {@link #testPartialInsertNotYetSupportedInSpark()} for + * verification. */ @ExtendWith(ParameterizedTestExtension.class) public class TestSparkDefaultValues extends CatalogTestBase { @@ -78,6 +83,10 @@ public void testWriteDefaultWithExplicitDEFAULT() { @TestTemplate public void testWriteDefaultForMultipleColumns() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), @@ -110,6 +119,10 @@ public void testWriteDefaultForMultipleColumns() { @TestTemplate public void testBulkInsertWithDefaults() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), @@ -129,4 +142,59 @@ public void testBulkInsertWithDefaults() { ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")), sql("SELECT * FROM %s ORDER BY id", selectTarget())); } + + @TestTemplate + public void testCreateTableWithDefaultsNotYetSupported() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + + assertThatThrownBy( + () -> + sql( + "CREATE TABLE %s (id INT, data STRING DEFAULT 'default-value') USING iceberg", + tableName)) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("does not support column default value"); + } + + @TestTemplate + public void testAlterTableAddColumnWithDefaultNotYetSupported() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + + Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + validationCatalog.createTable( + tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); + + assertThatThrownBy( + () -> sql("ALTER TABLE %s ADD COLUMN data STRING DEFAULT 'default-value'", tableName)) + .isInstanceOf(UnsupportedOperationException.class) + .hasMessageContaining("default values in Spark is currently unsupported"); + } + + @TestTemplate + public void testPartialInsertNotYetSupportedInSpark() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + + Schema schema = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withWriteDefault(Literal.of("default-data")) + .build()); + + validationCatalog.createTable( + tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); + + assertThatThrownBy(() -> sql("INSERT INTO %s (id) VALUES (1)", commitTarget())) + .isInstanceOf(AnalysisException.class) + .hasMessageContaining("Cannot find data for the output column"); + } } From 55885a5742e42ec22a2b9d66ad5d1dd0e67ea10b Mon Sep 17 00:00:00 2001 From: geruh Date: Thu, 23 Oct 2025 23:24:55 -0700 Subject: [PATCH 4/7] fix test capitalization --- .../org/apache/iceberg/spark/sql/TestSparkDefaultValues.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java index 75a9b9bb384d..83edacf256f9 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java @@ -56,7 +56,7 @@ public void dropTestTable() { } @TestTemplate - public void testWriteDefaultWithExplicitDEFAULT() { + public void testWriteDefaultWithExplicitDefault() { assertThat(validationCatalog.tableExists(tableIdent)) .as("Table should not already exist") .isFalse(); From b288b390f6b4b3113a07190008c7580c963c54d1 Mon Sep 17 00:00:00 2001 From: geruh Date: Tue, 28 Oct 2025 00:16:53 -0700 Subject: [PATCH 5/7] Address comments --- .../apache/iceberg/spark/TypeToSparkType.java | 8 +- .../iceberg/spark/TestSparkSchemaUtil.java | 44 ++++++--- .../spark/sql/TestSparkDefaultValues.java | 97 +++++++++++++------ 3 files changed, 101 insertions(+), 48 deletions(-) diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index 5d0346d20f3f..c20393c25c80 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -70,17 +70,14 @@ public DataType struct(Types.StructType struct, List fieldResults) { if (field.doc() != null) { sparkField = sparkField.withComment(field.doc()); } + // Convert both write and initial default values to Spark SQL string literal representations + // on the StructField metadata if (field.writeDefault() != null) { - // Convert Iceberg default value to Spark SQL string representation. Spark stores default - // values as SQL strings in column metadata. 
The .sql() method formats literals correctly - // for each type Object writeDefault = SparkUtil.internalToSpark(field.type(), field.writeDefault()); sparkField = sparkField.withCurrentDefaultValue(Literal$.MODULE$.create(writeDefault, type).sql()); } if (field.initialDefault() != null) { - // Same conversion for existence default values, used for existing rows when column is added - // to schema Object initialDefault = SparkUtil.internalToSpark(field.type(), field.initialDefault()); sparkField = sparkField.withExistenceDefaultValue( @@ -88,6 +85,7 @@ public DataType struct(Types.StructType struct, List fieldResults) { } sparkFields.add(sparkField); } + return StructType$.MODULE$.apply(sparkFields); } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index e72292379f30..1e36d51d5b43 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.MetadataAttribute; import org.apache.spark.sql.catalyst.types.DataTypeUtils; +import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils$; import org.apache.spark.sql.types.Metadata; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; @@ -44,9 +45,6 @@ public class TestSparkSchemaUtil { - private static final String CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT"; - private static final String EXISTS_DEFAULT_COLUMN_METADATA_KEY = "EXISTS_DEFAULT"; - private static final Schema TEST_SCHEMA = new Schema( optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get())); @@ -109,13 +107,19 @@ public void testSchemaConversionWithOnlyWriteDefault() { StructType sparkSchema = SparkSchemaUtil.convert(schema); Metadata metadata = sparkSchema.fields()[0].metadata(); - assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.contains( + ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY())) .as("Field with only write default should have CURRENT_DEFAULT metadata") .isTrue(); - assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.contains( + ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY())) .as("Field with only write default should not have EXISTS_DEFAULT metadata") .isFalse(); - assertThat(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.getString( + ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY())) .as("Spark metadata CURRENT_DEFAULT should contain correctly formatted literal") .isEqualTo("'write_only'"); } @@ -133,13 +137,19 @@ public void testSchemaConversionWithOnlyInitialDefault() { StructType sparkSchema = SparkSchemaUtil.convert(schema); Metadata metadata = sparkSchema.fields()[0].metadata(); - assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.contains( + ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY())) .as("Field with only initial default should not have CURRENT_DEFAULT metadata") .isFalse(); - assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.contains( + 
ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY())) .as("Field with only initial default should have EXISTS_DEFAULT metadata") .isTrue(); - assertThat(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.getString( + ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY())) .as("Spark metadata EXISTS_DEFAULT should contain correctly formatted literal") .isEqualTo("42"); } @@ -168,18 +178,26 @@ public void testSchemaConversionWithDefaultsForPrimitiveTypes( StructField defaultField = sparkSchema.fields()[0]; Metadata metadata = defaultField.metadata(); - assertThat(metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.contains( + ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY())) .as("Field of type %s should have CURRENT_DEFAULT metadata", type) .isTrue(); - assertThat(metadata.contains(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.contains( + ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY())) .as("Field of type %s should have EXISTS_DEFAULT metadata", type) .isTrue(); - assertThat(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.getString( + ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY())) .as( "Spark metadata CURRENT_DEFAULT for type %s should contain correctly formatted literal", type) .isEqualTo(expectedCurrentDefaultValue); - assertThat(metadata.getString(EXISTS_DEFAULT_COLUMN_METADATA_KEY)) + assertThat( + metadata.getString( + ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY())) .as( "Spark metadata EXISTS_DEFAULT for type %s should contain correctly formatted literal", type) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java index 83edacf256f9..742feaa6a80a 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java @@ -39,13 +39,10 @@ * *
<p>
Note: These tests use {@code validationCatalog.createTable()} to create tables with default * values because the Iceberg Spark integration does not yet support default value clauses in Spark - * DDL. See {@link #testCreateTableWithDefaultsNotYetSupported()} and {@link - * #testAlterTableAddColumnWithDefaultNotYetSupported()} for verification that DDL with defaults - * currently throws exceptions. + * DDL. * *
<p>
Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are - * not supported for DSV2 in Spark 4.0 See {@link #testPartialInsertNotYetSupportedInSpark()} for - * verification. + * not supported for DSV2 in Spark 4.0 */ @ExtendWith(ParameterizedTestExtension.class) public class TestSparkDefaultValues extends CatalogTestBase { @@ -56,7 +53,7 @@ public void dropTestTable() { } @TestTemplate - public void testWriteDefaultWithExplicitDefault() { + public void testWriteDefaultWithSparkDefaultKeyword() { assertThat(validationCatalog.tableExists(tableIdent)) .as("Table should not already exist") .isFalse(); @@ -64,25 +61,35 @@ public void testWriteDefaultWithExplicitDefault() { Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional("data") + Types.NestedField.optional("bool_col") .withId(2) - .ofType(Types.StringType.get()) - .withWriteDefault(Literal.of("default-data")) + .ofType(Types.BooleanType.get()) + .withWriteDefault(Literal.of(true)) + .build(), + Types.NestedField.optional("int_col") + .withId(3) + .ofType(Types.IntegerType.get()) + .withWriteDefault(Literal.of(42)) + .build(), + Types.NestedField.optional("long_col") + .withId(4) + .ofType(Types.LongType.get()) + .withWriteDefault(Literal.of(100L)) .build()); validationCatalog.createTable( tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); - sql("INSERT INTO %s VALUES (1, DEFAULT)", commitTarget()); + sql("INSERT INTO %s VALUES (1, DEFAULT, DEFAULT, DEFAULT)", commitTarget()); assertEquals( - "Should insert row with default values", - ImmutableList.of(row(1, "default-data")), + "Should have expected default values", + ImmutableList.of(row(1, true, 42, 100L)), sql("SELECT * FROM %s", selectTarget())); } @TestTemplate - public void testWriteDefaultForMultipleColumns() { + public void testWriteDefaultWithDefaultKeywordAndReorderedSchema() { assertThat(validationCatalog.tableExists(tableIdent)) .as("Table should not already exist") .isFalse(); @@ -90,30 +97,26 @@ public void testWriteDefaultForMultipleColumns() { Schema schema = new Schema( Types.NestedField.required(1, "id", Types.IntegerType.get()), - Types.NestedField.optional("bool_col") - .withId(2) - .ofType(Types.BooleanType.get()) - .withWriteDefault(Literal.of(true)) - .build(), Types.NestedField.optional("int_col") - .withId(3) + .withId(2) .ofType(Types.IntegerType.get()) - .withWriteDefault(Literal.of(42)) + .withWriteDefault(Literal.of(123)) .build(), - Types.NestedField.optional("long_col") - .withId(4) - .ofType(Types.LongType.get()) - .withWriteDefault(Literal.of(100L)) + Types.NestedField.optional("string_col") + .withId(3) + .ofType(Types.StringType.get()) + .withWriteDefault(Literal.of("doom")) .build()); validationCatalog.createTable( tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3")); - sql("INSERT INTO %s VALUES (1, DEFAULT, DEFAULT, DEFAULT)", commitTarget()); + // Insert with columns in different order than table schema + sql("INSERT INTO %s (int_col, id, string_col) VALUES (DEFAULT, 1, DEFAULT)", commitTarget()); assertEquals( - "Should have expected default values", - ImmutableList.of(row(1, true, 42, 100L)), + "Should apply correct defaults regardless of column order", + ImmutableList.of(row(1, 123, "doom")), sql("SELECT * FROM %s", selectTarget())); } @@ -144,7 +147,7 @@ public void testBulkInsertWithDefaults() { } @TestTemplate - public void testCreateTableWithDefaultsNotYetSupported() { 
+ public void testCreateTableWithDefaultsUnsupported() { assertThat(validationCatalog.tableExists(tableIdent)) .as("Table should not already exist") .isFalse(); @@ -159,7 +162,7 @@ public void testCreateTableWithDefaultsNotYetSupported() { } @TestTemplate - public void testAlterTableAddColumnWithDefaultNotYetSupported() { + public void testAlterTableAddColumnWithDefaultUnsupported() { assertThat(validationCatalog.tableExists(tableIdent)) .as("Table should not already exist") .isFalse(); @@ -176,7 +179,7 @@ public void testAlterTableAddColumnWithDefaultNotYetSupported() { } @TestTemplate - public void testPartialInsertNotYetSupportedInSpark() { + public void testPartialInsertUnsupported() { assertThat(validationCatalog.tableExists(tableIdent)) .as("Table should not already exist") .isFalse(); @@ -197,4 +200,38 @@ public void testPartialInsertNotYetSupportedInSpark() { .isInstanceOf(AnalysisException.class) .hasMessageContaining("Cannot find data for the output column"); } + + @TestTemplate + public void testSchemaEvolutionWithDefaultValueChanges() { + assertThat(validationCatalog.tableExists(tableIdent)) + .as("Table should not already exist") + .isFalse(); + + Schema initialSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())); + + validationCatalog.createTable( + tableIdent, + initialSchema, + PartitionSpec.unpartitioned(), + ImmutableMap.of("format-version", "3")); + + sql("INSERT INTO %s VALUES (1), (2)", commitTarget()); + + // Add a column with a default value + validationCatalog + .loadTable(tableIdent) + .updateSchema() + .addColumn("data", Types.StringType.get(), Literal.of("default_data")) + .commit(); + + // Refresh this when using SparkCatalog since otherwise the new column would not be caught. + sql("REFRESH TABLE %s", commitTarget()); + + sql("INSERT INTO %s VALUES (3, DEFAULT)", commitTarget()); + + assertEquals( + "Should have correct default values for existing and new rows", + ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")), + sql("SELECT * FROM %s ORDER BY id", selectTarget())); + } } From e1a05671989b80886ca7b6a5a9c8103d48562a1c Mon Sep 17 00:00:00 2001 From: geruh Date: Tue, 28 Oct 2025 20:42:34 -0700 Subject: [PATCH 6/7] Fix codestyle comments --- .../main/java/org/apache/iceberg/spark/TypeToSparkType.java | 3 +++ .../org/apache/iceberg/spark/sql/TestSparkDefaultValues.java | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java index c20393c25c80..09c89bbba813 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java @@ -70,6 +70,7 @@ public DataType struct(Types.StructType struct, List fieldResults) { if (field.doc() != null) { sparkField = sparkField.withComment(field.doc()); } + // Convert both write and initial default values to Spark SQL string literal representations // on the StructField metadata if (field.writeDefault() != null) { @@ -77,12 +78,14 @@ public DataType struct(Types.StructType struct, List fieldResults) { sparkField = sparkField.withCurrentDefaultValue(Literal$.MODULE$.create(writeDefault, type).sql()); } + if (field.initialDefault() != null) { Object initialDefault = SparkUtil.internalToSpark(field.type(), field.initialDefault()); sparkField = sparkField.withExistenceDefaultValue( 
Literal$.MODULE$.create(initialDefault, type).sql()); } + sparkFields.add(sparkField); } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java index 742feaa6a80a..ba856dc5383b 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java @@ -21,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import org.apache.iceberg.ParameterizedTestExtension; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.expressions.Literal; @@ -32,7 +31,6 @@ import org.apache.spark.sql.AnalysisException; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.TestTemplate; -import org.junit.jupiter.api.extension.ExtendWith; /** * Tests for Spark SQL Default values integration with Iceberg default values. @@ -44,7 +42,6 @@ *
<p>
Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are * not supported for DSV2 in Spark 4.0 */ -@ExtendWith(ParameterizedTestExtension.class) public class TestSparkDefaultValues extends CatalogTestBase { @AfterEach From b9444239f5fdd2e706323078f0c7aef6a54dcdbd Mon Sep 17 00:00:00 2001 From: geruh Date: Wed, 29 Oct 2025 10:34:38 -0700 Subject: [PATCH 7/7] remove new line --- .../test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java | 1 - 1 file changed, 1 deletion(-) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java index 1e36d51d5b43..d5f407a715ef 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java @@ -44,7 +44,6 @@ import org.junit.jupiter.params.provider.MethodSource; public class TestSparkSchemaUtil { - private static final Schema TEST_SCHEMA = new Schema( optional(1, "id", Types.IntegerType.get()), optional(2, "data", Types.StringType.get()));
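
Note for reviewers (not part of the patch): the following is a minimal standalone sketch of the conversion path these patches add and test. It builds an Iceberg schema with both a write default and an initial default, converts it with SparkSchemaUtil.convert, and reads back the CURRENT_DEFAULT / EXISTS_DEFAULT column metadata that TypeToSparkType now populates. It only uses APIs exercised in the patch series; the class name DefaultValueConversionExample and the "backfill-data" value are illustrative, and the metadata key strings match the constants used in patch 1 (the tests later switch to ResolveDefaultColumnsUtils$ for the same keys). Expected output, per the tests, is the Spark SQL literal forms 'default-data' and 'backfill-data'.

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class DefaultValueConversionExample {
  public static void main(String[] args) {
    // Iceberg schema with a write default (used for new rows that omit the column)
    // and an initial default (used for rows written before the column existed).
    Schema schema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional("data")
                .withId(2)
                .ofType(Types.StringType.get())
                .withWriteDefault(Literal.of("default-data"))
                .withInitialDefault(Literal.of("backfill-data"))
                .build());

    // TypeToSparkType (via SparkSchemaUtil.convert) renders each default as a
    // Spark SQL literal string and stores it in the StructField metadata.
    StructType sparkSchema = SparkSchemaUtil.convert(schema);
    StructField dataField = sparkSchema.fields()[1];
    Metadata metadata = dataField.metadata();

    // Write default -> CURRENT_DEFAULT, initial default -> EXISTS_DEFAULT.
    System.out.println(metadata.getString("CURRENT_DEFAULT"));
    System.out.println(metadata.getString("EXISTS_DEFAULT"));
  }
}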