Parquet: Implement defaults for generic data #11785

Merged (2 commits) on Dec 16, 2024
17 changes: 9 additions & 8 deletions data/src/test/java/org/apache/iceberg/data/DataTestHelpers.java
@@ -29,14 +29,15 @@ public class DataTestHelpers {
private DataTestHelpers() {}

public static void assertEquals(Types.StructType struct, Record expected, Record actual) {
-    List<Types.NestedField> fields = struct.fields();
-    for (int i = 0; i < fields.size(); i += 1) {
-      Type fieldType = fields.get(i).type();
-
-      Object expectedValue = expected.get(i);
-      Object actualValue = actual.get(i);
-
-      assertEquals(fieldType, expectedValue, actualValue);
+    Types.StructType expectedType = expected.struct();
+    for (Types.NestedField field : struct.fields()) {
+      Types.NestedField expectedField = expectedType.field(field.fieldId());
+      if (expectedField != null) {
+        assertEquals(
+            field.type(), expected.getField(expectedField.name()), actual.getField(field.name()));
+      } else {
+        assertThat(actual.getField(field.name())).isEqualTo(field.initialDefault());
+      }
}
}

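The updated helper matches fields by ID instead of by position: a field that exists in the expected record's struct is compared value-for-value, while a field that exists only in the read schema must come back as the field's initial default. A minimal usage sketch (the variable names are hypothetical, not from this diff):

// Hypothetical: "expected" was generated with the write schema; "actual" was
// read back with the evolved readSchema. Fields present only in readSchema
// must equal their initial defaults on the actual record.
DataTestHelpers.assertEquals(readSchema.asStruct(), expected, actual);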
256 changes: 236 additions & 20 deletions data/src/test/java/org/apache/iceberg/data/parquet/TestGenericData.java
@@ -19,7 +19,9 @@
package org.apache.iceberg.data.parquet;

import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.apache.iceberg.types.Types.NestedField.required;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.io.IOException;
@@ -34,30 +36,33 @@
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.data.DataTest;
import org.apache.iceberg.data.DataTestHelpers;
-import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.junit.jupiter.api.Test;

public class TestGenericData extends DataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
-    List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
+    writeAndValidate(schema, schema);
+  }
+
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 12228L);

File testFile = File.createTempFile("junit", null, temp.toFile());
assertThat(testFile.delete()).isTrue();

try (FileAppender<Record> appender =
Parquet.write(Files.localOutput(testFile))
-            .schema(schema)
+            .schema(writeSchema)
.createWriterFunc(GenericParquetWriter::buildWriter)
.build()) {
appender.addAll(expected);
@@ -66,29 +71,29 @@ protected void writeAndValidate(Schema schema) throws IOException {
List<Record> rows;
try (CloseableIterable<Record> reader =
Parquet.read(Files.localInput(testFile))
-            .project(schema)
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+            .project(expectedSchema)
+            .createReaderFunc(
+                fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema))
.build()) {
rows = Lists.newArrayList(reader);
}

for (int i = 0; i < expected.size(); i += 1) {
-      DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
+      DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i));
}

// test reuseContainers
try (CloseableIterable<Record> reader =
Parquet.read(Files.localInput(testFile))
-            .project(schema)
+            .project(expectedSchema)
            .reuseContainers()
-            .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
+            .createReaderFunc(
+                fileSchema -> GenericParquetReaders.buildReader(expectedSchema, fileSchema))
.build()) {
-      CloseableIterator it = reader.iterator();
-      int idx = 0;
-      while (it.hasNext()) {
-        GenericRecord actualRecord = (GenericRecord) it.next();
-        DataTestHelpers.assertEquals(schema.asStruct(), expected.get(idx), actualRecord);
-        idx++;
+      int index = 0;
+      for (Record actualRecord : reader) {
+        DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(index), actualRecord);
+        index += 1;
}
}
}
@@ -131,14 +136,225 @@ public void testTwoLevelList() throws IOException {
.reuseContainers()
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build()) {
-      CloseableIterator it = reader.iterator();
-      assertThat(it).hasNext();
-      while (it.hasNext()) {
-        GenericRecord actualRecord = (GenericRecord) it.next();
+      for (Record actualRecord : reader) {
        assertThat(actualRecord.get(0, ArrayList.class)).first().isEqualTo(expectedBinary);
        assertThat(actualRecord.get(1, ByteBuffer.class)).isEqualTo(expectedBinary);
-        assertThat(it).isExhausted();
      }
+
+      assertThat(Lists.newArrayList(reader).size()).isEqualTo(1);
}
}

@Test
public void testMissingRequiredWithoutDefault() {
Schema writeSchema = new Schema(required(1, "id", Types.LongType.get()));

Schema expectedSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.required("missing_str")
.withId(6)
.ofType(Types.StringType.get())
.withDoc("Missing required field with no default")
.build());

assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Missing required field: missing_str");
}

@Test
public void testDefaultValues() throws IOException {
Schema writeSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.withDoc("Should not produce default value")
.build());

Schema expectedSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.build(),
NestedField.required("missing_str")
.withId(6)
.ofType(Types.StringType.get())
.withInitialDefault("orange")
.build(),
NestedField.optional("missing_int")
.withId(7)
.ofType(Types.IntegerType.get())
.withInitialDefault(34)
.build());

writeAndValidate(writeSchema, expectedSchema);
}

@Test
public void testNullDefaultValue() throws IOException {
Schema writeSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.withDoc("Should not produce default value")
.build());

Schema expectedSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.build(),
NestedField.optional("missing_date").withId(3).ofType(Types.DateType.get()).build());

writeAndValidate(writeSchema, expectedSchema);
}

@Test
public void testNestedDefaultValue() throws IOException {
Schema writeSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.withDoc("Should not produce default value")
.build(),
NestedField.optional("nested")
.withId(3)
.ofType(Types.StructType.of(required(4, "inner", Types.StringType.get())))
.withDoc("Used to test nested field defaults")
.build());

Schema expectedSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.build(),
NestedField.optional("nested")
.withId(3)
.ofType(
Types.StructType.of(
required(4, "inner", Types.StringType.get()),
NestedField.optional("missing_inner_float")
.withId(5)
.ofType(Types.FloatType.get())
.withInitialDefault(-0.0F)

[Inline review thread on .withInitialDefault(-0.0F)]

Contributor: Should we add an example with another complex default type (map, list)?

@emkornfield (Contributor, Dec 16, 2024): In addition to this, I'm not sure if this is the right place to test it, but it would be good to make sure there is test coverage for the following spec requirements:

  • When a new field is added to a struct with a default value, updating the struct's default is optional.
  • If a field value is missing from a struct's initial-default, the field's initial-default must be used for the field.

Author: Yes, good call out. I'll add those cases.

Author: @emkornfield, right now we have only added support for primitive field defaults, so those cases aren't being addressed yet.

.build()))
.withDoc("Used to test nested field defaults")
.build());

writeAndValidate(writeSchema, expectedSchema);
}

@Test
public void testMapNestedDefaultValue() throws IOException {
Schema writeSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.withDoc("Should not produce default value")
.build(),
NestedField.optional("nested_map")
.withId(3)
.ofType(
Types.MapType.ofOptional(
4,
5,
Types.StringType.get(),
Types.StructType.of(required(6, "value_str", Types.StringType.get()))))
.withDoc("Used to test nested map value field defaults")
.build());

Schema expectedSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.build(),
NestedField.optional("nested_map")
.withId(3)
.ofType(
Types.MapType.ofOptional(
4,
5,
Types.StringType.get(),
Types.StructType.of(
required(6, "value_str", Types.StringType.get()),
Types.NestedField.optional("value_int")
.withId(7)
.ofType(Types.IntegerType.get())
.withInitialDefault(34)
.build())))
.withDoc("Used to test nested field defaults")
.build());

writeAndValidate(writeSchema, expectedSchema);
}

@Test
public void testListNestedDefaultValue() throws IOException {
Schema writeSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.withDoc("Should not produce default value")
.build(),
NestedField.optional("nested_list")
.withId(3)
.ofType(
Types.ListType.ofOptional(
4, Types.StructType.of(required(5, "element_str", Types.StringType.get()))))
.withDoc("Used to test nested field defaults")
.build());

Schema expectedSchema =
new Schema(
required(1, "id", Types.LongType.get()),
NestedField.optional("data")
.withId(2)
.ofType(Types.StringType.get())
.withInitialDefault("wrong!")
.build(),
NestedField.optional("nested_list")
.withId(3)
.ofType(
Types.ListType.ofOptional(
4,
Types.StructType.of(
required(5, "element_str", Types.StringType.get()),
Types.NestedField.optional("element_int")
.withId(7)
.ofType(Types.IntegerType.get())
.withInitialDefault(34)
.build())))
.withDoc("Used to test nested field defaults")
.build());

writeAndValidate(writeSchema, expectedSchema);
}
}
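
The new tests above all drive writeAndValidate(writeSchema, expectedSchema). As a standalone illustration of the behavior under test, here is a minimal sketch (illustrative only, not part of this PR's diff; it reuses the APIs exercised above, and the field names and IDs are made up):

// Write with an old schema, then read with an evolved schema whose new
// column carries an initial default. The reader fills in the missing column.
Schema writeSchema = new Schema(required(1, "id", Types.LongType.get()));

Schema readSchema =
    new Schema(
        required(1, "id", Types.LongType.get()),
        NestedField.optional("missing_int")
            .withId(2)
            .ofType(Types.IntegerType.get())
            .withInitialDefault(34)
            .build());

File testFile = File.createTempFile("defaults", ".parquet");
assertThat(testFile.delete()).isTrue();

GenericRecord record = GenericRecord.create(writeSchema);
record.setField("id", 1L);

try (FileAppender<Record> appender =
    Parquet.write(Files.localOutput(testFile))
        .schema(writeSchema)
        .createWriterFunc(GenericParquetWriter::buildWriter)
        .build()) {
  appender.add(record);
}

try (CloseableIterable<Record> reader =
    Parquet.read(Files.localInput(testFile))
        .project(readSchema)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(readSchema, fileSchema))
        .build()) {
  Record row = Lists.newArrayList(reader).get(0);
  // "missing_int" is absent from the file, so the default is supplied.
  assertThat(row.getField("missing_int")).isEqualTo(34);
}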
@@ -263,6 +263,7 @@ public ParquetValueReader<?> struct(
int defaultMaxDefinitionLevel = type.getMaxDefinitionLevel(currentPath());
for (Types.NestedField field : expectedFields) {
int id = field.fieldId();
+      ParquetValueReader<?> reader = readersById.get(id);
if (idToConstant.containsKey(id)) {
// containsKey is used because the constant may be null
int fieldMaxDefinitionLevel =
@@ -276,15 +277,21 @@
} else if (id == MetadataColumns.IS_DELETED.fieldId()) {
reorderedFields.add(ParquetValueReaders.constant(false));
types.add(null);
+      } else if (reader != null) {

[Inline review comment — Contributor: I really miss Python's := operator here :)]

+        reorderedFields.add(reader);
+        types.add(typesById.get(id));
+      } else if (field.initialDefault() != null) {
+        reorderedFields.add(
+            ParquetValueReaders.constant(
+                field.initialDefault(),
+                maxDefinitionLevelsById.getOrDefault(id, defaultMaxDefinitionLevel)));
+        types.add(typesById.get(id));
+      } else if (field.isOptional()) {
+        reorderedFields.add(ParquetValueReaders.nulls());
+        types.add(null);
      } else {
-        ParquetValueReader<?> reader = readersById.get(id);
-        if (reader != null) {
-          reorderedFields.add(reader);
-          types.add(typesById.get(id));
-        } else {
-          reorderedFields.add(ParquetValueReaders.nulls());
-          types.add(null);
-        }
+        throw new IllegalArgumentException(
+            String.format("Missing required field: %s", field.name()));
      }
}
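
To summarize the rewritten branch chain (a restatement of the code above, not new behavior), each expected field in struct() now resolves in this order:

// 1. a constant from idToConstant (containsKey, because the constant may be null)
// 2. metadata columns (e.g. MetadataColumns.IS_DELETED reads as the constant false)
// 3. the file's column reader, when the column is present in the file (reader != null)
// 4. ParquetValueReaders.constant(field.initialDefault(), ...) when the field has an initial default
// 5. ParquetValueReaders.nulls() when the missing field is optional
// 6. otherwise IllegalArgumentException("Missing required field: <name>")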
