From da0aaf01924f258856a0c0001eed451e6426de5a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 13 Dec 2024 13:54:47 -0800 Subject: [PATCH 1/2] Parquet: Implement defaults for generic data. --- .../apache/iceberg/data/DataTestHelpers.java | 17 +- .../iceberg/data/parquet/TestGenericData.java | 174 ++++++++++++++++-- .../data/parquet/BaseParquetReaders.java | 23 ++- 3 files changed, 178 insertions(+), 36 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java index 0573897dab72..72e3973382af 100644 --- a/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java +++ b/data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java @@ -29,14 +29,15 @@ public class DataTestHelpers { private DataTestHelpers() {} public static void assertEquals(Types.StructType struct, Record expected, Record actual) { - List fields = struct.fields(); - for (int i = 0; i < fields.size(); i += 1) { - Type fieldType = fields.get(i).type(); - - Object expectedValue = expected.get(i); - Object actualValue = actual.get(i); - - assertEquals(fieldType, expectedValue, actualValue); + Types.StructType expectedType = expected.struct(); + for (Types.NestedField field : struct.fields()) { + Types.NestedField expectedField = expectedType.field(field.fieldId()); + if (expectedField != null) { + assertEquals( + field.type(), expected.getField(expectedField.name()), actual.getField(field.name())); + } else { + assertThat(actual.getField(field.name())).isEqualTo(field.initialDefault()); + } } } diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index 6de56570589c..37ebf83e92a9 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -19,7 +19,9 @@ package org.apache.iceberg.data.parquet; import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.io.File; import java.io.IOException; @@ -34,15 +36,14 @@ import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.DataTest; import org.apache.iceberg.data.DataTestHelpers; -import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.RandomGenericData; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.CloseableIterable; -import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.NestedField; import org.apache.parquet.avro.AvroParquetWriter; import org.apache.parquet.hadoop.ParquetWriter; import org.junit.jupiter.api.Test; @@ -50,14 +51,18 @@ public class TestGenericData extends DataTest { @Override protected void writeAndValidate(Schema schema) throws IOException { - List expected = RandomGenericData.generate(schema, 100, 0L); + writeAndValidate(schema, schema); + } + + protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException { + List expected = RandomGenericData.generate(writeSchema, 100, 12228L); File testFile = File.createTempFile("junit", null, temp.toFile()); assertThat(testFile.delete()).isTrue(); try (FileAppender appender = Parquet.write(Files.localOutput(testFile)) - .schema(schema) + .schema(writeSchema) .createWriterFunc(GenericParquetWriter::buildWriter) .build()) { appender.addAll(expected); @@ -66,29 +71,29 @@ protected void writeAndValidate(Schema schema) throws IOException { List rows; try (CloseableIterable reader = Parquet.read(Files.localInput(testFile)) - .project(schema) - .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .project(expectedSchema) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema)) .build()) { rows = Lists.newArrayList(reader); } for (int i = 0; i < expected.size(); i += 1) { - DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i)); + DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i)); } // test reuseContainers try (CloseableIterable reader = Parquet.read(Files.localInput(testFile)) - .project(schema) + .project(expectedSchema) .reuseContainers() - .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) + .createReaderFunc( + fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema)) .build()) { - CloseableIterator it = reader.iterator(); - int idx = 0; - while (it.hasNext()) { - GenericRecord actualRecord = (GenericRecord) it.next(); - DataTestHelpers.assertEquals(schema.asStruct(), expected.get(idx), actualRecord); - idx++; + int index = 0; + for (Record actualRecord : reader) { + DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(index), actualRecord); + index += 1; } } } @@ -131,14 +136,143 @@ public void testTwoLevelList() throws IOException { .reuseContainers() .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema)) .build()) { - CloseableIterator it = reader.iterator(); - assertThat(it).hasNext(); - while (it.hasNext()) { - GenericRecord actualRecord = (GenericRecord) it.next(); + for (Record actualRecord : reader) { assertThat(actualRecord.get(0, ArrayList.class)).first().isEqualTo(expectedBinary); assertThat(actualRecord.get(1, ByteBuffer.class)).isEqualTo(expectedBinary); - assertThat(it).isExhausted(); } + + assertThat(Lists.newArrayList(reader).size()).isEqualTo(1); } } + + @Test + public void testMissingRequiredWithoutDefault() { + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + NestedField.required("missing_str") + .withId(6) + .ofType(Types.StringType.get()) + .withDoc("Missing required field with no default") + .build()); + + assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Missing required field: missing_str"); + } + + @Test + public void testDefaultValues() throws IOException { + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + NestedField.required("missing_str") + .withId(6) + .ofType(Types.StringType.get()) + .withInitialDefault("orange") + .build(), + NestedField.optional("missing_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testNullDefaultValue() throws IOException { + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + NestedField.optional("missing_date").withId(3).ofType(Types.DateType.get()).build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testNestedDefaultValue() throws IOException { + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + NestedField.optional("nested") + .withId(3) + .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get()))) + .withDoc("Used to test nested field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + NestedField.optional("nested") + .withId(3) + .ofType( + Types.StructType.of( + required(4, "inner", Types.StringType.get()), + NestedField.optional("missing_inner_float") + .withId(5) + .ofType(Types.FloatType.get()) + .withInitialDefault(-0.0F) + .build())) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } } diff --git a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java index 38fd69393023..b3a1948d0a35 100644 --- a/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/data/parquet/BaseParquetReaders.java @@ -263,6 +263,7 @@ public ParquetValueReader struct( int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath()); for (Types.NestedField field : expectedFields) { int id = field.fieldId(); + ParquetValueReader reader = readersById.get(id); if (idToConstant.containsKey(id)) { // containsKey is used because the constant may be null int fieldMaxDefinitionLevel = @@ -276,15 +277,21 @@ public ParquetValueReader struct( } else if (id == MetadataColumns.IS_DELETED.fieldId()) { reorderedFields.add(ParquetValueReaders.constant(false)); types.add(null); + } else if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else if (field.initialDefault() != null) { + reorderedFields.add( + ParquetValueReaders.constant( + field.initialDefault(), + maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel))); + types.add(typesById.get(id)); + } else if (field.isOptional()) { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); } else { - ParquetValueReader reader = readersById.get(id); - if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); - types.add(null); - } + throw new IllegalArgumentException( + String.format("Missing required field: %s", field.name())); } } From 720a48add1204c1877999429b56942609affe25a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 16 Dec 2024 09:45:41 -0800 Subject: [PATCH 2/2] Update tests for review comments. --- .../iceberg/data/parquet/TestGenericData.java | 110 +++++++++++++++--- 1 file changed, 96 insertions(+), 14 deletions(-) diff --git a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java index 37ebf83e92a9..5c7c11f1d231 100644 --- a/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java +++ b/data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java @@ -147,24 +147,11 @@ public void testTwoLevelList() throws IOException { @Test public void testMissingRequiredWithoutDefault() { - Schema writeSchema = - new Schema( - required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .withDoc("Should not produce default value") - .build()); + Schema writeSchema = new Schema(required(1, "id", Types.LongType.get())); Schema expectedSchema = new Schema( required(1, "id", Types.LongType.get()), - NestedField.optional("data") - .withId(2) - .ofType(Types.StringType.get()) - .withInitialDefault("wrong!") - .build(), NestedField.required("missing_str") .withId(6) .ofType(Types.StringType.get()) @@ -275,4 +262,99 @@ public void testNestedDefaultValue() throws IOException { writeAndValidate(writeSchema, expectedSchema); } + + @Test + public void testMapNestedDefaultValue() throws IOException { + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + NestedField.optional("nested_map") + .withId(3) + .ofType( + Types.MapType.ofOptional( + 4, + 5, + Types.StringType.get(), + Types.StructType.of(required(6, "value_str", Types.StringType.get())))) + .withDoc("Used to test nested map value field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + NestedField.optional("nested_map") + .withId(3) + .ofType( + Types.MapType.ofOptional( + 4, + 5, + Types.StringType.get(), + Types.StructType.of( + required(6, "value_str", Types.StringType.get()), + Types.NestedField.optional("value_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()))) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } + + @Test + public void testListNestedDefaultValue() throws IOException { + Schema writeSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .withDoc("Should not produce default value") + .build(), + NestedField.optional("nested_list") + .withId(3) + .ofType( + Types.ListType.ofOptional( + 4, Types.StructType.of(required(5, "element_str", Types.StringType.get())))) + .withDoc("Used to test nested field defaults") + .build()); + + Schema expectedSchema = + new Schema( + required(1, "id", Types.LongType.get()), + NestedField.optional("data") + .withId(2) + .ofType(Types.StringType.get()) + .withInitialDefault("wrong!") + .build(), + NestedField.optional("nested_list") + .withId(3) + .ofType( + Types.ListType.ofOptional( + 4, + Types.StructType.of( + required(5, "element_str", Types.StringType.get()), + Types.NestedField.optional("element_int") + .withId(7) + .ofType(Types.IntegerType.get()) + .withInitialDefault(34) + .build()))) + .withDoc("Used to test nested field defaults") + .build()); + + writeAndValidate(writeSchema, expectedSchema); + } }