diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index 1cc901d15bc1..dbb1df055035 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -36,6 +36,10 @@
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
+/**
+ * @deprecated will be removed in 2.0.0; use {@link PlannedDataReader} instead.
+ */
+@Deprecated
 public class DataReader<T> implements DatumReader<T>, SupportsRowPosition {
 
   public static <D> DataReader<D> create(
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index 91a728d53d38..b07ab5d18681 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -32,6 +32,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.Pair;
 
 class GenericReaders {
   private GenericReaders() {}
@@ -52,6 +53,11 @@ static ValueReader<OffsetDateTime> timestamptz() {
     return TimestamptzReader.INSTANCE;
   }
 
+  static ValueReader<Record> struct(
+      List<Pair<Integer, ValueReader<?>>> readPlan, StructType struct) {
+    return new PlannedRecordReader(readPlan, struct);
+  }
+
   static ValueReader<Record> struct(
       StructType struct, List<ValueReader<?>> readers, Map<Integer, ?> idToConstant) {
     return new GenericRecordReader(readers, struct, idToConstant);
@@ -101,6 +107,34 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
     }
   }
 
+  private static class PlannedRecordReader extends ValueReaders.PlannedStructReader<Record> {
+    private final StructType structType;
+
+    private PlannedRecordReader(List<Pair<Integer, ValueReader<?>>> readPlan, StructType struct) {
+      super(readPlan);
+      this.structType = struct;
+    }
+
+    @Override
+    protected Record reuseOrCreate(Object reuse) {
+      if (reuse instanceof Record) {
+        return (Record) reuse;
+      } else {
+        return GenericRecord.create(structType);
+      }
+    }
+
+    @Override
+    protected Object get(Record struct, int pos) {
+      return struct.get(pos);
+    }
+
+    @Override
+    protected void set(Record struct, int pos, Object value) {
+      struct.set(pos, value);
+    }
+  }
+
   private static class GenericRecordReader extends ValueReaders.StructReader<Record> {
     private final StructType structType;
 
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java b/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java
index 89513b7e0bed..f21bae037103 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/IcebergDecoder.java
@@ -103,9 +103,7 @@ public void addSchema(org.apache.iceberg.Schema writeSchema) {
 
   private void addSchema(Schema writeSchema) {
     long fp = SchemaNormalization.parsingFingerprint64(writeSchema);
-    RawDecoder<D> decoder =
-        new RawDecoder<>(
-            readSchema, avroSchema -> DataReader.create(readSchema, avroSchema), writeSchema);
+    RawDecoder<D> decoder = RawDecoder.create(readSchema, PlannedDataReader::create, writeSchema);
     decoders.put(fp, decoder);
   }
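Note: the IcebergDecoder change above is the template for every call-site migration in this patch. A minimal before/after sketch for reading generic records; `file` and `SCHEMA` are illustrative placeholders, not names from this patch:

    // Before: the deprecated path hands both the Iceberg and Avro schemas to DataReader.create.
    AvroIterable<Record> oldReader =
        Avro.read(file).project(SCHEMA).createReaderFunc(DataReader::create).build();

    // After: the builder passes only the projected Iceberg schema; PlannedDataReader
    // resolves the file's Avro schema itself when setSchema() is called on file open.
    AvroIterable<Record> newReader =
        Avro.read(file).project(SCHEMA).createResolvingReader(PlannedDataReader::create).build();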
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java
new file mode 100644
index 000000000000..c7ec2e6091cc
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.Decoder;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.avro.AvroWithPartnerVisitor;
+import org.apache.iceberg.avro.SupportsRowPosition;
+import org.apache.iceberg.avro.ValueReader;
+import org.apache.iceberg.avro.ValueReaders;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
+
+public class PlannedDataReader<T> implements DatumReader<T>, SupportsRowPosition {
+
+  public static <T> PlannedDataReader<T> create(org.apache.iceberg.Schema expectedSchema) {
+    return create(expectedSchema, ImmutableMap.of());
+  }
+
+  public static <T> PlannedDataReader<T> create(
+      org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> idToConstant) {
+    return new PlannedDataReader<>(expectedSchema, idToConstant);
+  }
+
+  private final org.apache.iceberg.Schema expectedSchema;
+  private final Map<Integer, ?> idToConstant;
+  private ValueReader<T> reader;
+
+  protected PlannedDataReader(
+      org.apache.iceberg.Schema expectedSchema, Map<Integer, ?> idToConstant) {
+    this.expectedSchema = expectedSchema;
+    this.idToConstant = idToConstant;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setSchema(Schema fileSchema) {
+    this.reader =
+        (ValueReader<T>)
+            AvroWithPartnerVisitor.visit(
+                expectedSchema.asStruct(),
+                fileSchema,
+                new ReadBuilder(idToConstant),
+                AvroWithPartnerVisitor.FieldIDAccessors.get());
+  }
+
+  @Override
+  public T read(T reuse, Decoder decoder) throws IOException {
+    return reader.read(decoder, reuse);
+  }
+
+  @Override
+  public void setRowPositionSupplier(Supplier<Long> posSupplier) {
+    if (reader instanceof SupportsRowPosition) {
+      ((SupportsRowPosition) reader).setRowPositionSupplier(posSupplier);
+    }
+  }
+
+  private static class ReadBuilder extends AvroWithPartnerVisitor<Type, ValueReader<?>> {
+    private final Map<Integer, ?> idToConstant;
+
+    private ReadBuilder(Map<Integer, ?> idToConstant) {
+      this.idToConstant = idToConstant;
+    }
+
+    @Override
+    public ValueReader<?> record(Type partner, Schema record, List<ValueReader<?>> fieldReaders) {
+      if (partner == null) {
+        return ValueReaders.skipStruct(fieldReaders);
+      }
+
+      Types.StructType expected = partner.asStructType();
+      List<Pair<Integer, ValueReader<?>>> readPlan =
+          ValueReaders.buildReadPlan(expected, record, fieldReaders, idToConstant);
+
+      return GenericReaders.struct(readPlan, expected);
+    }
+
+    @Override
+    public ValueReader<?> union(Type partner, Schema union, List<ValueReader<?>> options) {
+      return ValueReaders.union(options);
+    }
+
+    @Override
+    public ValueReader<?> array(Type ignored, Schema array, ValueReader<?> elementReader) {
+      return ValueReaders.array(elementReader);
+    }
+
+    @Override
+    public ValueReader<?> arrayMap(
+        Type ignored, Schema map, ValueReader<?> keyReader, ValueReader<?> valueReader) {
+      return ValueReaders.arrayMap(keyReader, valueReader);
+    }
+
+    @Override
+    public ValueReader<?> map(Type ignored, Schema map, ValueReader<?> valueReader) {
+      return ValueReaders.map(ValueReaders.strings(), valueReader);
+    }
+
+    @Override
+    public ValueReader<?> primitive(Type partner, Schema primitive) {
+      LogicalType logicalType = primitive.getLogicalType();
+      if (logicalType != null) {
+        switch (logicalType.getName()) {
+          case "date":
+            return GenericReaders.dates();
+
+          case "time-micros":
+            return GenericReaders.times();
+
+          case "timestamp-micros":
+            if (AvroSchemaUtil.isTimestamptz(primitive)) {
+              return GenericReaders.timestamptz();
+            }
+            return GenericReaders.timestamps();
+
+          case "decimal":
+            return ValueReaders.decimal(
+                ValueReaders.decimalBytesReader(primitive),
+                ((LogicalTypes.Decimal) logicalType).getScale());
+
+          case "uuid":
+            return ValueReaders.uuids();
+
+          default:
+            throw new IllegalArgumentException("Unknown logical type: " + logicalType);
+        }
+      }
+
+      switch (primitive.getType()) {
+        case NULL:
+          return ValueReaders.nulls();
+        case BOOLEAN:
+          return ValueReaders.booleans();
+        case INT:
+          if (partner != null && partner.typeId() == Type.TypeID.LONG) {
+            return ValueReaders.intsAsLongs();
+          }
+          return ValueReaders.ints();
+        case LONG:
+          return ValueReaders.longs();
+        case FLOAT:
+          if (partner != null && partner.typeId() == Type.TypeID.DOUBLE) {
+            return ValueReaders.floatsAsDoubles();
+          }
+          return ValueReaders.floats();
+        case DOUBLE:
+          return ValueReaders.doubles();
+        case STRING:
+          // might want to use a binary-backed container like Utf8
+          return ValueReaders.strings();
+        case FIXED:
+          return ValueReaders.fixed(primitive.getFixedSize());
+        case BYTES:
+          return ValueReaders.byteBuffers();
+        default:
+          throw new IllegalArgumentException("Unsupported type: " + primitive);
+      }
+    }
+  }
+}
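Note: PlannedDataReader matches the projected Iceberg schema against the file's Avro schema by field ID and turns the result into a positional read plan (ValueReaders.buildReadPlan), which is also where constants and default values get spliced in. A hedged sketch of the two-argument factory; the field id 1000 and `partitionValue` are invented for illustration:

    // Values in idToConstant are returned for the matching field ids instead of
    // being read from the file (e.g. identity partition columns).
    Map<Integer, ?> idToConstant = ImmutableMap.of(1000, partitionValue);
    PlannedDataReader<Record> reader = PlannedDataReader.create(expectedSchema, idToConstant);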
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java b/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
index c32ea707bfab..436cba05c73a 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/RawDecoder.java
@@ -33,6 +33,27 @@ public class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
   private static final ThreadLocal<BinaryDecoder> DECODER = new ThreadLocal<>();
 
+  /**
+   * Creates a new {@link MessageDecoder} that constructs datum instances described by the {@link
+   * Schema readSchema}.
+   *
+   * <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
+   * schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
+   * encode all buffers decoded by this class.
+   *
+   * @param readSchema an Iceberg schema to produce when reading
+   * @param readerFunction a function that produces a DatumReader from the read schema
+   * @param writeSchema an Avro schema that describes serialized data to be read
+   */
+  public static <D> RawDecoder<D> create(
+      org.apache.iceberg.Schema readSchema,
+      Function<org.apache.iceberg.Schema, DatumReader<D>> readerFunction,
+      Schema writeSchema) {
+    DatumReader<D> reader = readerFunction.apply(readSchema);
+    reader.setSchema(writeSchema);
+    return new RawDecoder<>(reader);
+  }
+
   private final DatumReader<D> reader;
 
   /**
@@ -42,7 +63,11 @@ public class RawDecoder<D> extends MessageDecoder.BaseDecoder<D> {
    * <p>The {@code readSchema} is used for the expected schema and the {@code writeSchema} is the
    * schema used to decode buffers. The {@code writeSchema} must be the schema that was used to
    * encode all buffers decoded by this class.
+   *
+   * @deprecated will be removed in 2.0.0; use {@link #create(org.apache.iceberg.Schema, Function,
+   *     Schema)} instead
    */
+  @Deprecated
   public RawDecoder(
       org.apache.iceberg.Schema readSchema,
       Function<org.apache.iceberg.Schema, DatumReader<D>> readerFunction,
@@ -51,6 +76,13 @@ public RawDecoder(
     this.reader.setSchema(writeSchema);
   }
 
+  /**
+   * Creates a new {@link MessageDecoder} that constructs datum instances using the {@code reader}.
+   */
+  private RawDecoder(DatumReader<D> reader) {
+    this.reader = reader;
+  }
+
   @Override
   public D decode(InputStream stream, D reuse) {
     BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(stream, DECODER.get());
diff --git a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java
index 7e57163d73ea..a09951728173 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/KeyMetadataDecoder.java
@@ -66,7 +66,7 @@ public StandardKeyMetadata decode(InputStream stream, StandardKeyMetadata reuse)
     RawDecoder<StandardKeyMetadata> decoder = decoders.get(writeSchemaVersion);
 
     if (decoder == null) {
-      decoder = new RawDecoder<>(readSchema, GenericAvroReader::create, writeSchema);
+      decoder = RawDecoder.create(readSchema, GenericAvroReader::create, writeSchema);
       decoders.put(writeSchemaVersion, decoder);
     }
 
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroDataWriter.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroDataWriter.java
index 62f736e2c517..a9b9e7a6a74b 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroDataWriter.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroDataWriter.java
@@ -32,7 +32,7 @@
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
@@ -103,7 +103,7 @@ public void testDataWriter() throws IOException {
     try (AvroIterable<Record> reader =
         Avro.read(file.toInputFile())
             .project(SCHEMA)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
             .build()) {
       writtenRecords = Lists.newArrayList(reader);
     }
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
index 504ef7aad5b3..86bb74c5a397 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroDeleteWriters.java
@@ -33,8 +33,8 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDelete;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
@@ -102,7 +102,10 @@ public void testEqualityDeleteWriter() throws IOException {
     List<Record> deletedRecords;
     try (AvroIterable<Record> reader =
-        Avro.read(out.toInputFile()).project(SCHEMA).createReaderFunc(DataReader::create).build()) {
+        Avro.read(out.toInputFile())
+            .project(SCHEMA)
+            .createResolvingReader(PlannedDataReader::create)
+            .build()) {
       deletedRecords = Lists.newArrayList(reader);
     }
 
@@ -158,7 +161,7 @@ public void testPositionDeleteWriter() throws IOException {
     try (AvroIterable<Record> reader =
         Avro.read(out.toInputFile())
             .project(deleteSchema)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
             .build()) {
       deletedRecords = Lists.newArrayList(reader);
     }
@@ -212,7 +215,7 @@ public void testPositionDeleteWriterWithEmptyRow() throws IOException {
     try (AvroIterable<Record> reader =
         Avro.read(out.toInputFile())
             .project(deleteSchema)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
             .build()) {
       deletedRecords = Lists.newArrayList(reader);
     }
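Note: RawDecoder.create makes the decoder's construction explicit: build the DatumReader once from the read schema, bind it to the write schema, then wrap it. A short usage sketch; `readSchema`, `writeSchema`, and `stream` are placeholders for the caller's values:

    // The reader function is applied once; setSchema() then binds the write schema.
    RawDecoder<Record> decoder =
        RawDecoder.create(readSchema, PlannedDataReader::create, writeSchema);
    Record datum = decoder.decode(stream, null); // stream: an InputStream positioned at a datum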
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java b/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
index 12b1326fc1be..fac537666854 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestAvroFileSplit.java
@@ -30,8 +30,8 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -186,7 +186,7 @@ public List<Record> readAvro(InputFile in, Schema projection, long start, long l
       throws IOException {
    try (AvroIterable<Record> reader =
        Avro.read(in)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
            .split(start, length)
            .project(projection)
            .build()) {
diff --git a/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java b/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
index 9020a1230271..efb5de3e96a7 100644
--- a/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
+++ b/core/src/test/java/org/apache/iceberg/avro/TestEncryptedAvroFileSplit.java
@@ -30,8 +30,8 @@
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedInputFile;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -199,7 +199,7 @@ public List<Record> readAvro(InputFile in, Schema projection, long start, long l
       throws IOException {
    try (AvroIterable<Record> reader =
        Avro.read(in)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
            .split(start, length)
            .project(projection)
            .build()) {
diff --git a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
index 1b7a92f0682b..d0c50a614620 100644
--- a/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
+++ b/data/src/main/java/org/apache/iceberg/data/BaseDeleteLoader.java
@@ -31,7 +31,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.Deletes;
@@ -235,7 +235,7 @@ private CloseableIterable<Record> openDeletes(
         return Avro.read(inputFile)
             .project(projection)
             .reuseContainers()
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
             .build();
 
       case PARQUET:
diff --git a/data/src/main/java/org/apache/iceberg/data/GenericReader.java b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
index 590b01b228ed..aaf4b76ca851 100644
--- a/data/src/main/java/org/apache/iceberg/data/GenericReader.java
+++ b/data/src/main/java/org/apache/iceberg/data/GenericReader.java
@@ -26,7 +26,7 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.expressions.Evaluator;
@@ -101,8 +101,7 @@ private CloseableIterable<Record> openFile(FileScanTask task, Schema fileProject
         Avro.ReadBuilder avro =
             Avro.read(input)
                 .project(fileProjection)
-                .createReaderFunc(
-                    avroSchema -> DataReader.create(fileProjection, avroSchema, partition))
+                .createResolvingReader(schema -> PlannedDataReader.create(schema, partition))
                 .split(task.start(), task.length());
 
         if (reuseContainers) {
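Note: GenericReader's lambda shrinks from two arguments to one because schema resolution moved inside the reader; the same shape recurs in IcebergInputFormat at the end of this patch. A sketch, with `builder` and `partition` standing in for the surrounding code's Avro.ReadBuilder and constant map:

    // The resolving reader's lambda sees only the projected Iceberg schema; the file's
    // Avro schema is supplied later through DatumReader#setSchema when the file is opened.
    builder.createResolvingReader(schema -> PlannedDataReader.create(schema, partition));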
diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java
index 83e8c09449e4..651df22cfc15 100644
--- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java
+++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericData.java
@@ -18,7 +18,9 @@
  */
 package org.apache.iceberg.data.avro;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.io.File;
 import java.io.IOException;
@@ -33,18 +35,24 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
 
 public class TestGenericData extends DataTest {
 
   @Override
   protected void writeAndValidate(Schema schema) throws IOException {
-    List<Record> expected = RandomGenericData.generate(schema, 100, 0L);
+    writeAndValidate(schema, schema);
+  }
+
+  protected void writeAndValidate(Schema writeSchema, Schema expectedSchema) throws IOException {
+    List<Record> expected = RandomGenericData.generate(writeSchema, 100, 0L);
 
     File testFile = File.createTempFile("junit", null, temp.toFile());
     assertThat(testFile.delete()).isTrue();
 
     try (FileAppender<Record> writer =
         Avro.write(Files.localOutput(testFile))
-            .schema(schema)
+            .schema(writeSchema)
             .createWriterFunc(DataWriter::create)
             .named("test")
             .build()) {
@@ -56,14 +64,230 @@ protected void writeAndValidate(Schema schema) throws IOException {
     List<Record> rows;
     try (AvroIterable<Record> reader =
         Avro.read(Files.localInput(testFile))
-            .project(schema)
-            .createReaderFunc(DataReader::create)
+            .project(expectedSchema)
+            .createResolvingReader(PlannedDataReader::create)
             .build()) {
       rows = Lists.newArrayList(reader);
     }
 
     for (int i = 0; i < expected.size(); i += 1) {
-      DataTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
+      DataTestHelpers.assertEquals(expectedSchema.asStruct(), expected.get(i), rows.get(i));
     }
   }
+
+  @Test
+  public void testMissingRequiredWithoutDefault() {
+    Schema writeSchema = new Schema(required(1, "id", Types.LongType.get()));
+
+    Schema expectedSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.required("missing_str")
+                .withId(6)
+                .ofType(Types.StringType.get())
+                .withDoc("Missing required field with no default")
+                .build());
+
+    assertThatThrownBy(() -> writeAndValidate(writeSchema, expectedSchema))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Missing required field: missing_str");
+  }
+
+  @Test
+  public void testDefaultValues() throws IOException {
+    Schema writeSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .withDoc("Should not produce default value")
+                .build());
+
+    Schema expectedSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .build(),
+            Types.NestedField.required("missing_str")
+                .withId(6)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("orange")
+                .build(),
+            Types.NestedField.optional("missing_int")
+                .withId(7)
+                .ofType(Types.IntegerType.get())
+                .withInitialDefault(34)
+                .build());
+
+    writeAndValidate(writeSchema, expectedSchema);
+  }
+
+  @Test
+  public void testNullDefaultValue() throws IOException {
+    Schema writeSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .withDoc("Should not produce default value")
+                .build());
+
+    Schema expectedSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .build(),
+            Types.NestedField.optional("missing_date")
+                .withId(3)
+                .ofType(Types.DateType.get())
+                .build());
+
+    writeAndValidate(writeSchema, expectedSchema);
+  }
+
+  @Test
+  public void testNestedDefaultValue() throws IOException {
+    Schema writeSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .withDoc("Should not produce default value")
+                .build(),
+            Types.NestedField.optional("nested")
+                .withId(3)
+                .ofType(Types.StructType.of(required(4, "inner", Types.StringType.get())))
+                .withDoc("Used to test nested field defaults")
+                .build());
+
+    Schema expectedSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .build(),
+            Types.NestedField.optional("nested")
+                .withId(3)
+                .ofType(
+                    Types.StructType.of(
+                        required(4, "inner", Types.StringType.get()),
+                        Types.NestedField.optional("missing_inner_float")
+                            .withId(5)
+                            .ofType(Types.FloatType.get())
+                            .withInitialDefault(-0.0F)
+                            .build()))
+                .withDoc("Used to test nested field defaults")
+                .build());
+
+    writeAndValidate(writeSchema, expectedSchema);
+  }
+
+  @Test
+  public void testMapNestedDefaultValue() throws IOException {
+    Schema writeSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .withDoc("Should not produce default value")
+                .build(),
+            Types.NestedField.optional("nested_map")
+                .withId(3)
+                .ofType(
+                    Types.MapType.ofOptional(
+                        4,
+                        5,
+                        Types.StringType.get(),
+                        Types.StructType.of(required(6, "value_str", Types.StringType.get()))))
+                .withDoc("Used to test nested map value field defaults")
+                .build());
+
+    Schema expectedSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .build(),
+            Types.NestedField.optional("nested_map")
+                .withId(3)
+                .ofType(
+                    Types.MapType.ofOptional(
+                        4,
+                        5,
+                        Types.StringType.get(),
+                        Types.StructType.of(
+                            required(6, "value_str", Types.StringType.get()),
+                            Types.NestedField.optional("value_int")
+                                .withId(7)
+                                .ofType(Types.IntegerType.get())
+                                .withInitialDefault(34)
+                                .build())))
+                .withDoc("Used to test nested field defaults")
+                .build());
+
+    writeAndValidate(writeSchema, expectedSchema);
+  }
+
+  @Test
+  public void testListNestedDefaultValue() throws IOException {
+    Schema writeSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .withDoc("Should not produce default value")
+                .build(),
+            Types.NestedField.optional("nested_list")
+                .withId(3)
+                .ofType(
+                    Types.ListType.ofOptional(
+                        4, Types.StructType.of(required(5, "element_str", Types.StringType.get()))))
+                .withDoc("Used to test nested field defaults")
+                .build());
+
+    Schema expectedSchema =
+        new Schema(
+            required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional("data")
+                .withId(2)
+                .ofType(Types.StringType.get())
+                .withInitialDefault("wrong!")
+                .build(),
+            Types.NestedField.optional("nested_list")
+                .withId(3)
+                .ofType(
+                    Types.ListType.ofOptional(
+                        4,
+                        Types.StructType.of(
+                            required(5, "element_str", Types.StringType.get()),
+                            Types.NestedField.optional("element_int")
+                                .withId(7)
+                                .ofType(Types.IntegerType.get())
+                                .withInitialDefault(34)
+                                .build())))
+                .withDoc("Used to test nested field defaults")
+                .build());
+
+    writeAndValidate(writeSchema, expectedSchema);
+  }
 }
diff --git a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericReadProjection.java b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericReadProjection.java
index b6083906c74b..776dd1466ab7 100644
--- a/data/src/test/java/org/apache/iceberg/data/avro/TestGenericReadProjection.java
+++ b/data/src/test/java/org/apache/iceberg/data/avro/TestGenericReadProjection.java
@@ -48,7 +48,7 @@ protected Record writeAndRead(String desc, Schema writeSchema, Schema readSchema
     Iterable<Record> records =
         Avro.read(Files.localInput(file))
             .project(readSchema)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
             .build();
 
     return Iterables.getOnlyElement(records);
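Note: the new TestGenericData cases pin down the planned reader's default-value semantics: a field present only in the expected schema is filled from its initial default (or null for an optional field without one), while a missing required field with no default fails with "Missing required field: ...". The builder pattern the tests rely on, for reference (the id and default value here are arbitrary):

    // An optional column added after the data was written; old rows read back as 34.
    Types.NestedField field =
        Types.NestedField.optional("missing_int")
            .withId(7)
            .ofType(Types.IntegerType.get())
            .withInitialDefault(34)
            .build();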
diff --git a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
index 83f1bf261063..4d3e6a8ac9c6 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestAppenderFactory.java
@@ -38,7 +38,7 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
@@ -337,7 +337,10 @@ private CloseableIterable<Record> createReader(Schema schema, InputFile inputFil
             .build();
 
       case AVRO:
-        return Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build();
+        return Avro.read(inputFile)
+            .project(schema)
+            .createResolvingReader(PlannedDataReader::create)
+            .build();
 
       case ORC:
         return ORC.read(inputFile)
diff --git a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
index 0acb173f0923..ab1d295125f2 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestFileWriterFactory.java
@@ -41,7 +41,7 @@
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
@@ -476,7 +476,10 @@ private List<Record> readFile(Schema schema, InputFile inputFile) throws IOExcep
 
       case AVRO:
        try (CloseableIterable<Record> records =
-            Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build()) {
+            Avro.read(inputFile)
+                .project(schema)
+                .createResolvingReader(PlannedDataReader::create)
+                .build()) {
          return ImmutableList.copyOf(records);
        }
 
diff --git a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
index 629df03e37cd..e7f9d90f0bb2 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestGenericSortedPosDeleteWriter.java
@@ -40,7 +40,7 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
@@ -314,7 +314,10 @@ private List<Record> readRecordsAsList(Schema schema, CharSequence path) throws
       case AVRO:
         iterable =
-            Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build();
+            Avro.read(inputFile)
+                .project(schema)
+                .createResolvingReader(PlannedDataReader::create)
+                .build();
         break;
 
       case ORC:
diff --git a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
index b1688e6653f2..71c112918e38 100644
--- a/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
+++ b/data/src/test/java/org/apache/iceberg/io/TestTaskEqualityDeltaWriter.java
@@ -45,7 +45,7 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.deletes.DeleteGranularity;
@@ -610,7 +610,10 @@ private List<Record> readRecordsAsList(Schema schema, CharSequence path) throws
       case AVRO:
         iterable =
-            Avro.read(inputFile).project(schema).createReaderFunc(DataReader::create).build();
+            Avro.read(inputFile)
+                .project(schema)
+                .createResolvingReader(PlannedDataReader::create)
+                .build();
         break;
 
       case ORC:
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
index cbf49ae6faa9..4a59dcfd1e09 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/data/AbstractTestFlinkAvroReaderWriter.java
@@ -36,8 +36,8 @@
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.RandomGenericData;
 import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataReader;
 import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.flink.FlinkSchemaUtil;
 import org.apache.iceberg.flink.TestHelpers;
 import org.apache.iceberg.io.CloseableIterable;
@@ -116,7 +116,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
     try (CloseableIterable<Record> reader =
         Avro.read(Files.localInput(rowDataFile))
             .project(schema)
-            .createReaderFunc(DataReader::create)
+            .createResolvingReader(PlannedDataReader::create)
             .build()) {
       Iterator<RowData> expected = expectedRows.iterator();
       Iterator<Record> records = reader.iterator();
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 492729d97338..bb39644e42c1 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -55,7 +55,7 @@
 import org.apache.iceberg.data.GenericDeleteFilter;
 import org.apache.iceberg.data.IdentityPartitionConverters;
 import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.avro.PlannedDataReader;
 import org.apache.iceberg.data.orc.GenericOrcReader;
 import org.apache.iceberg.data.parquet.GenericParquetReaders;
 import org.apache.iceberg.encryption.EncryptedFiles;
@@ -389,12 +389,10 @@ private CloseableIterable<T> newAvroIterable(
           throw new UnsupportedOperationException(
               "Avro support not yet supported for Pig and Hive");
         case GENERIC:
-          avroReadBuilder.createReaderFunc(
-              (expIcebergSchema, expAvroSchema) ->
-                  DataReader.create(
-                      expIcebergSchema,
-                      expAvroSchema,
-                      constantsMap(task, IdentityPartitionConverters::convertConstant)));
+          avroReadBuilder.createResolvingReader(
+              schema ->
+                  PlannedDataReader.create(
+                      schema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
       }
       return applyResidualFiltering(avroReadBuilder.build(), task.residual(), readSchema);
     }
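Note: PlannedDataReader also implements SupportsRowPosition and forwards the position supplier to its root reader, so position-aware readers (e.g. the ones backing the `_pos` metadata column and position deletes) should keep working after the swap. A hedged sketch; in practice the supplier is wired up by the Avro file reader, not by user code:

    PlannedDataReader<Record> reader = PlannedDataReader.create(expectedSchema);
    reader.setRowPositionSupplier(() -> 0L); // normally the file reader supplies block offsets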