diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
index cbee10fbb069..09c89bbba813 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java
@@ -25,6 +25,7 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.catalyst.expressions.Literal$;
 import org.apache.spark.sql.types.ArrayType$;
 import org.apache.spark.sql.types.BinaryType$;
 import org.apache.spark.sql.types.BooleanType$;
@@ -69,6 +70,25 @@ public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
       if (field.doc() != null) {
         sparkField = sparkField.withComment(field.doc());
       }
+
+      // Convert both write and initial default values to Spark SQL string literal representations
+      // on the StructField metadata
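+      // Spark applies CURRENT_DEFAULT when resolving the DEFAULT keyword for new writes, and
+      // EXISTS_DEFAULT when reading rows written before the column existed, which mirrors
+      // Iceberg's write default and initial default semantics.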
+      if (field.writeDefault() != null) {
+        Object writeDefault = SparkUtil.internalToSpark(field.type(), field.writeDefault());
+        sparkField =
+            sparkField.withCurrentDefaultValue(Literal$.MODULE$.create(writeDefault, type).sql());
+      }
+
+      if (field.initialDefault() != null) {
+        Object initialDefault = SparkUtil.internalToSpark(field.type(), field.initialDefault());
+        sparkField =
+            sparkField.withExistenceDefaultValue(
+                Literal$.MODULE$.create(initialDefault, type).sql());
+      }
+
       sparkFields.add(sparkField);
     }
 
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
index 4045847d5a4a..d5f407a715ef 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/TestSparkSchemaUtil.java
@@ -21,15 +21,27 @@
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.TimeZone;
+import java.util.stream.Stream;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.expressions.AttributeReference;
 import org.apache.spark.sql.catalyst.expressions.MetadataAttribute;
 import org.apache.spark.sql.catalyst.types.DataTypeUtils;
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class TestSparkSchemaUtil {
   private static final Schema TEST_SCHEMA =
@@ -80,4 +92,186 @@ public void testSchemaConversionWithMetaDataColumnSchema() {
       }
     }
   }
+
+  @Test
+  public void testSchemaConversionWithOnlyWriteDefault() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional("field")
+                .withId(1)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("write_only"))
+                .build());
+
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Metadata metadata = sparkSchema.fields()[0].metadata();
+
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only write default should have CURRENT_DEFAULT metadata")
+        .isTrue();
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only write default should not have EXISTS_DEFAULT metadata")
+        .isFalse();
+    assertThat(
+            metadata.getString(
+                ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Spark metadata CURRENT_DEFAULT should contain correctly formatted literal")
+        .isEqualTo("'write_only'");
+  }
+
+  @Test
+  public void testSchemaConversionWithOnlyInitialDefault() {
+    Schema schema =
+        new Schema(
+            Types.NestedField.optional("field")
+                .withId(1)
+                .ofType(Types.IntegerType.get())
+                .withInitialDefault(Literal.of(42))
+                .build());
+
+    StructType sparkSchema = SparkSchemaUtil.convert(schema);
+    Metadata metadata = sparkSchema.fields()[0].metadata();
+
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.CURRENT_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only initial default should not have CURRENT_DEFAULT metadata")
+        .isFalse();
+    assertThat(
+            metadata.contains(
+                ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Field with only initial default should have EXISTS_DEFAULT metadata")
+        .isTrue();
+    assertThat(
+            metadata.getString(
+                ResolveDefaultColumnsUtils$.MODULE$.EXISTS_DEFAULT_COLUMN_METADATA_KEY()))
+        .as("Spark metadata EXISTS_DEFAULT should contain correctly formatted literal")
+        .isEqualTo("42");
+  }
+
+  @ParameterizedTest(name = "{0} with writeDefault={1}, initialDefault={2}")
+  @MethodSource("schemaConversionWithDefaultsTestCases")
+  public void testSchemaConversionWithDefaultsForPrimitiveTypes(
+      Type type,
+      Literal<?> writeDefault,
+      Literal<?> initialDefault,
+      String expectedCurrentDefaultValue,
+      String expectedExistsDefaultValue) {
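+    // Literal.sql() renders timestamp literals using the current default time zone, so pin the
+    // JVM default to UTC to keep the expected strings below deterministic across environments.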
"2"), + Arguments.of( + Types.StringType.get(), + Literal.of("write_default"), + Literal.of("initial_default"), + "'write_default'", + "'initial_default'"), + Arguments.of( + Types.UUIDType.get(), + Literal.of("f79c3e09-677c-4bbd-a479-3f349cb785e7").to(Types.UUIDType.get()), + Literal.of("f79c3e09-677c-4bbd-a479-3f349cb685e7").to(Types.UUIDType.get()), + "'f79c3e09-677c-4bbd-a479-3f349cb785e7'", + "'f79c3e09-677c-4bbd-a479-3f349cb685e7'"), + Arguments.of(Types.BooleanType.get(), Literal.of(true), Literal.of(false), "true", "false"), + Arguments.of(Types.IntegerType.get(), Literal.of(42), Literal.of(10), "42", "10"), + Arguments.of(Types.LongType.get(), Literal.of(100L), Literal.of(50L), "100L", "50L"), + Arguments.of( + Types.FloatType.get(), + Literal.of(3.14f), + Literal.of(1.5f), + "CAST('3.14' AS FLOAT)", + "CAST('1.5' AS FLOAT)"), + Arguments.of( + Types.DoubleType.get(), Literal.of(2.718), Literal.of(1.414), "2.718D", "1.414D"), + Arguments.of( + Types.DecimalType.of(10, 2), + Literal.of(new BigDecimal("99.99")), + Literal.of(new BigDecimal("11.11")), + "99.99BD", + "11.11BD"), + Arguments.of( + Types.DateType.get(), + Literal.of("2024-01-01").to(Types.DateType.get()), + Literal.of("2023-01-01").to(Types.DateType.get()), + "DATE '2024-01-01'", + "DATE '2023-01-01'"), + Arguments.of( + Types.TimestampType.withZone(), + Literal.of("2017-11-30T10:30:07.123456+00:00").to(Types.TimestampType.withZone()), + Literal.of("2017-11-29T10:30:07.123456+00:00").to(Types.TimestampType.withZone()), + "TIMESTAMP '2017-11-30 10:30:07.123456'", + "TIMESTAMP '2017-11-29 10:30:07.123456'"), + Arguments.of( + Types.TimestampType.withoutZone(), + Literal.of("2017-11-30T10:30:07.123456").to(Types.TimestampType.withoutZone()), + Literal.of("2017-11-29T10:30:07.123456").to(Types.TimestampType.withoutZone()), + "TIMESTAMP_NTZ '2017-11-30 10:30:07.123456'", + "TIMESTAMP_NTZ '2017-11-29 10:30:07.123456'"), + Arguments.of( + Types.BinaryType.get(), + Literal.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b})), + Literal.of(ByteBuffer.wrap(new byte[] {0x01, 0x02})), + "X'0A0B'", + "X'0102'"), + Arguments.of( + Types.FixedType.ofLength(4), + Literal.of("test".getBytes()), + Literal.of("init".getBytes()), + "X'74657374'", + "X'696E6974'")); + } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java new file mode 100644 index 000000000000..ba856dc5383b --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+    return Stream.of(
+        Arguments.of(Types.IntegerType.get(), Literal.of(1), Literal.of(2), "1", "2"),
+        Arguments.of(
+            Types.StringType.get(),
+            Literal.of("write_default"),
+            Literal.of("initial_default"),
+            "'write_default'",
+            "'initial_default'"),
+        Arguments.of(
+            Types.UUIDType.get(),
+            Literal.of("f79c3e09-677c-4bbd-a479-3f349cb785e7").to(Types.UUIDType.get()),
+            Literal.of("f79c3e09-677c-4bbd-a479-3f349cb685e7").to(Types.UUIDType.get()),
+            "'f79c3e09-677c-4bbd-a479-3f349cb785e7'",
+            "'f79c3e09-677c-4bbd-a479-3f349cb685e7'"),
+        Arguments.of(Types.BooleanType.get(), Literal.of(true), Literal.of(false), "true", "false"),
+        Arguments.of(Types.IntegerType.get(), Literal.of(42), Literal.of(10), "42", "10"),
+        Arguments.of(Types.LongType.get(), Literal.of(100L), Literal.of(50L), "100L", "50L"),
+        Arguments.of(
+            Types.FloatType.get(),
+            Literal.of(3.14f),
+            Literal.of(1.5f),
+            "CAST('3.14' AS FLOAT)",
+            "CAST('1.5' AS FLOAT)"),
+        Arguments.of(
+            Types.DoubleType.get(), Literal.of(2.718), Literal.of(1.414), "2.718D", "1.414D"),
+        Arguments.of(
+            Types.DecimalType.of(10, 2),
+            Literal.of(new BigDecimal("99.99")),
+            Literal.of(new BigDecimal("11.11")),
+            "99.99BD",
+            "11.11BD"),
+        Arguments.of(
+            Types.DateType.get(),
+            Literal.of("2024-01-01").to(Types.DateType.get()),
+            Literal.of("2023-01-01").to(Types.DateType.get()),
+            "DATE '2024-01-01'",
+            "DATE '2023-01-01'"),
+        Arguments.of(
+            Types.TimestampType.withZone(),
+            Literal.of("2017-11-30T10:30:07.123456+00:00").to(Types.TimestampType.withZone()),
+            Literal.of("2017-11-29T10:30:07.123456+00:00").to(Types.TimestampType.withZone()),
+            "TIMESTAMP '2017-11-30 10:30:07.123456'",
+            "TIMESTAMP '2017-11-29 10:30:07.123456'"),
+        Arguments.of(
+            Types.TimestampType.withoutZone(),
+            Literal.of("2017-11-30T10:30:07.123456").to(Types.TimestampType.withoutZone()),
+            Literal.of("2017-11-29T10:30:07.123456").to(Types.TimestampType.withoutZone()),
+            "TIMESTAMP_NTZ '2017-11-30 10:30:07.123456'",
+            "TIMESTAMP_NTZ '2017-11-29 10:30:07.123456'"),
+        Arguments.of(
+            Types.BinaryType.get(),
+            Literal.of(ByteBuffer.wrap(new byte[] {0x0a, 0x0b})),
+            Literal.of(ByteBuffer.wrap(new byte[] {0x01, 0x02})),
+            "X'0A0B'",
+            "X'0102'"),
+        Arguments.of(
+            Types.FixedType.ofLength(4),
+            Literal.of("test".getBytes()),
+            Literal.of("init".getBytes()),
+            "X'74657374'",
+            "X'696E6974'"));
+  }
 }
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
new file mode 100644
index 000000000000..ba856dc5383b
--- /dev/null
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSparkDefaultValues.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.sql;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.AnalysisException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.TestTemplate;
+
+/**
+ * Tests for Spark SQL default values integration with Iceberg default values.
+ *
+ * <p>Note: These tests use {@code validationCatalog.createTable()} to create tables with default
+ * values because the Iceberg Spark integration does not yet support default value clauses in Spark
+ * DDL.
+ *
+ * <p>Partial column INSERT statements (e.g., {@code INSERT INTO table (col1) VALUES (val1)}) are
+ * not supported for DSV2 in Spark 4.0.
+ */
+public class TestSparkDefaultValues extends CatalogTestBase {
+
+  @AfterEach
+  public void dropTestTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testWriteDefaultWithSparkDefaultKeyword() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("bool_col")
+                .withId(2)
+                .ofType(Types.BooleanType.get())
+                .withWriteDefault(Literal.of(true))
+                .build(),
+            Types.NestedField.optional("int_col")
+                .withId(3)
+                .ofType(Types.IntegerType.get())
+                .withWriteDefault(Literal.of(42))
+                .build(),
+            Types.NestedField.optional("long_col")
+                .withId(4)
+                .ofType(Types.LongType.get())
+                .withWriteDefault(Literal.of(100L))
+                .build());
+
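+    // Iceberg default values are a format v3 feature, so the tables in these tests are created
+    // with an explicit format-version=3 property.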
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1, DEFAULT, DEFAULT, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should have expected default values",
+        ImmutableList.of(row(1, true, 42, 100L)),
+        sql("SELECT * FROM %s", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testWriteDefaultWithDefaultKeywordAndReorderedSchema() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("int_col")
+                .withId(2)
+                .ofType(Types.IntegerType.get())
+                .withWriteDefault(Literal.of(123))
+                .build(),
+            Types.NestedField.optional("string_col")
+                .withId(3)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("doom"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    // Insert with columns in different order than table schema
+    sql("INSERT INTO %s (int_col, id, string_col) VALUES (DEFAULT, 1, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should apply correct defaults regardless of column order",
+        ImmutableList.of(row(1, 123, "doom")),
+        sql("SELECT * FROM %s", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testBulkInsertWithDefaults() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("default_data"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1, DEFAULT), (2, DEFAULT), (3, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should insert multiple rows with default values",
+        ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")),
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+
+  @TestTemplate
+  public void testCreateTableWithDefaultsUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s (id INT, data STRING DEFAULT 'default-value') USING iceberg",
+                    tableName))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("does not support column default value");
+  }
+
+  @TestTemplate
+  public void testAlterTableAddColumnWithDefaultUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
+    assertThatThrownBy(
+            () -> sql("ALTER TABLE %s ADD COLUMN data STRING DEFAULT 'default-value'", tableName))
+        .isInstanceOf(UnsupportedOperationException.class)
+        .hasMessageContaining("default values in Spark is currently unsupported");
+  }
+
+  @TestTemplate
+  public void testPartialInsertUnsupported() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema schema =
+        new Schema(
+            Types.NestedField.required(1, "id", Types.IntegerType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withWriteDefault(Literal.of("default-data"))
+                .build());
+
+    validationCatalog.createTable(
+        tableIdent, schema, PartitionSpec.unpartitioned(), ImmutableMap.of("format-version", "3"));
+
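+    // As noted in the class Javadoc, DSv2 in Spark 4.0 rejects partial column INSERTs rather
+    // than filling in the write default for the omitted column.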
+    assertThatThrownBy(() -> sql("INSERT INTO %s (id) VALUES (1)", commitTarget()))
+        .isInstanceOf(AnalysisException.class)
+        .hasMessageContaining("Cannot find data for the output column");
+  }
+
+  @TestTemplate
+  public void testSchemaEvolutionWithDefaultValueChanges() {
+    assertThat(validationCatalog.tableExists(tableIdent))
+        .as("Table should not already exist")
+        .isFalse();
+
+    Schema initialSchema = new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()));
+
+    validationCatalog.createTable(
+        tableIdent,
+        initialSchema,
+        PartitionSpec.unpartitioned(),
+        ImmutableMap.of("format-version", "3"));
+
+    sql("INSERT INTO %s VALUES (1), (2)", commitTarget());
+
+    // Add a column with a default value
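+    // The new column's default serves as both the initial (existence) default read back for
+    // rows 1 and 2 and the write default that row 3's DEFAULT keyword resolves to.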
+    validationCatalog
+        .loadTable(tableIdent)
+        .updateSchema()
+        .addColumn("data", Types.StringType.get(), Literal.of("default_data"))
+        .commit();
+
+    // Refresh the table when using SparkCatalog; otherwise the new column would not be picked up.
+    sql("REFRESH TABLE %s", commitTarget());
+
+    sql("INSERT INTO %s VALUES (3, DEFAULT)", commitTarget());
+
+    assertEquals(
+        "Should have correct default values for existing and new rows",
+        ImmutableList.of(row(1, "default_data"), row(2, "default_data"), row(3, "default_data")),
+        sql("SELECT * FROM %s ORDER BY id", selectTarget()));
+  }
+}