Parquet: Add readers and writers for the internal object model #11904

Merged: 9 commits, Jan 24, 2025
15 changes: 9 additions & 6 deletions api/src/test/java/org/apache/iceberg/util/RandomUtil.java
Original file line number Diff line number Diff line change
@@ -237,7 +237,7 @@ private static BigInteger randomUnscaled(int precision, Random random) {
}

public static List<Object> generateList(
Random random, Types.ListType list, Supplier<Object> elementResult) {
Member Author:

Addressing the nit from #11919

Contributor:

How about elementSupplier instead? I think that's more accurate than elements for two reasons. First, the argument is not an element, it is a function that produces an element. Second, there is only one supplier so the name shouldn't be plural.
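For illustration, a minimal sketch of a call site (the integer literal is just a placeholder element): the supplier is invoked once per generated element, which is why the singular elementSupplier reads correctly.

Types.ListType listType = Types.ListType.ofOptional(1, Types.IntegerType.get());
List<Object> values = RandomUtil.generateList(new Random(42), listType, () -> 7);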

Random random, Types.ListType list, Supplier<Object> elementSupplier) {
int numElements = random.nextInt(20);

List<Object> result = Lists.newArrayListWithExpectedSize(numElements);
@@ -246,23 +246,26 @@ public static List<Object> generateList(
if (list.isElementOptional() && random.nextInt(20) == 1) {
result.add(null);
} else {
result.add(elementResult.get());
result.add(elementSupplier.get());
}
}

return result;
}

public static Map<Object, Object> generateMap(
Random random, Types.MapType map, Supplier<Object> keyResult, Supplier<Object> valueResult) {
Random random,
Types.MapType map,
Supplier<Object> keySupplier,
Supplier<Object> valueSupplier) {
int numEntries = random.nextInt(20);

Map<Object, Object> result = Maps.newLinkedHashMap();
Supplier<Object> keyFunc;
if (map.keyType() == Types.StringType.get()) {
keyFunc = () -> keyResult.get().toString();
keyFunc = () -> keySupplier.get().toString();
} else {
keyFunc = keyResult;
keyFunc = keySupplier;
}

Set<Object> keySet = Sets.newHashSet();
@@ -279,7 +282,7 @@ public static Map<Object, Object> generateMap(
if (map.isValueOptional() && random.nextInt(20) == 1) {
result.put(key, null);
} else {
result.put(key, valueResult.get());
result.put(key, valueSupplier.get());
}
}

Original file line number Diff line number Diff line change
@@ -583,7 +583,7 @@ private void checkColumnarBatch(
columnSet,
"uuid",
(records, i) -> records.get(i).getField("uuid"),
ColumnVector::getBinary);
(array, i) -> UUIDUtil.convert(array.getBinary(i)));

checkColumnarArrayValues(
expectedNumRows,
@@ -593,7 +593,7 @@ private void checkColumnarBatch(
columnSet,
"uuid_nullable",
(records, i) -> records.get(i).getField("uuid_nullable"),
ColumnVector::getBinary);
(array, i) -> UUIDUtil.convert(array.getBinary(i)));

checkColumnarArrayValues(
expectedNumRows,
@@ -820,8 +820,7 @@ private List<GenericRecord> createIncrementalRecordsForDate(
rec.setField("int_promotion", i);
rec.setField("time", LocalTime.of(11, i));
rec.setField("time_nullable", LocalTime.of(11, i));
ByteBuffer bb = UUIDUtil.convertToByteBuffer(UUID.randomUUID());
byte[] uuid = bb.array();
UUID uuid = UUID.randomUUID();
rec.setField("uuid", uuid);
rec.setField("uuid_nullable", uuid);
rec.setField("decimal", new BigDecimal("14.0" + i % 10));
@@ -858,9 +857,7 @@ private List<GenericRecord> createConstantRecordsForDate(Schema schema, LocalDat
rec.setField("int_promotion", 1);
rec.setField("time", LocalTime.of(11, 30));
rec.setField("time_nullable", LocalTime.of(11, 30));
ByteBuffer bb =
UUIDUtil.convertToByteBuffer(UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f"));
byte[] uuid = bb.array();
UUID uuid = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");
rec.setField("uuid", uuid);
rec.setField("uuid_nullable", uuid);
rec.setField("decimal", new BigDecimal("14.20"));
@@ -1140,7 +1137,7 @@ private void checkAllVectorValues(
columnSet,
"uuid",
(records, i) -> records.get(i).getField("uuid"),
(vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
(vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));

checkVectorValues(
expectedNumRows,
@@ -1149,7 +1146,7 @@ private void checkAllVectorValues(
columnSet,
"uuid_nullable",
(records, i) -> records.get(i).getField("uuid_nullable"),
(vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
(vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));

checkVectorValues(
expectedNumRows,
Original file line number Diff line number Diff line change
@@ -18,19 +18,9 @@
*/
package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
@@ -50,6 +40,10 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
* @deprecated since 1.8.0, will be made package-private in 1.9.0
*/
@Deprecated
public abstract class BaseParquetReaders<T> {
protected BaseParquetReaders() {}

@@ -76,6 +70,46 @@ protected ParquetValueReader<T> createReader(
protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new GenericParquetReaders.FixedReader(desc);
}

protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new GenericParquetReaders.DateReader(desc);
}

protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
switch (unit) {
case MICROS:
return new GenericParquetReaders.TimeReader(desc);
case MILLIS:
return new GenericParquetReaders.TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported unit for time: " + unit);
}
}

protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return new GenericParquetReaders.TimestampInt96Reader(desc);
}

switch (unit) {
case MICROS:
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzReader(desc)
: new GenericParquetReaders.TimestampReader(desc);
case MILLIS:
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzMillisReader(desc)
: new GenericParquetReaders.TimestampMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit);
}
}

protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return value;
}
@@ -164,37 +198,23 @@ public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decima
@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) {
return Optional.of(new DateReader(desc));
return Optional.of(dateReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) {
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
return Optional.of(new TimeReader(desc));
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return Optional.of(new TimeMillisReader(desc));
}

return Optional.empty();
return Optional.of(timeReader(desc, timeLogicalType.getUnit()));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) {
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
Types.TimestampType tsMicrosType = (Types.TimestampType) expected;
return tsMicrosType.shouldAdjustToUTC()
? Optional.of(new TimestamptzReader(desc))
: Optional.of(new TimestampReader(desc));
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) {
Types.TimestampType tsMillisType = (Types.TimestampType) expected;
return tsMillisType.shouldAdjustToUTC()
? Optional.of(new TimestamptzMillisReader(desc))
: Optional.of(new TimestampMillisReader(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType);
return Optional.of(
timestampReader(
desc,
timestampLogicalType.getUnit(),
((Types.TimestampType) expected).shouldAdjustToUTC()));
}

@Override
@@ -219,6 +239,12 @@ public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) {
return Optional.of(new ParquetValueReaders.BytesReader(desc));
}

@Override
public Optional<ParquetValueReader<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return Optional.of(ParquetValueReaders.uuids(desc));
}
}

private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> {
@@ -359,7 +385,7 @@ public ParquetValueReader<?> primitive(

ColumnDescriptor desc = type.getColumnDescription(currentPath());

if (primitive.getOriginalType() != null) {
if (primitive.getLogicalTypeAnnotation() != null) {
Contributor:

I agree with this change, but please point these kinds of changes out for reviewers.

The old version worked because all of the supported logical type annotations had an equivalent ConvertedType (which is what OriginalType is called in Parquet format and the logical type docs).
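As a hedged illustration of why this matters (assuming parquet-mr's schema builder API), UUID is one logical type with no ConvertedType equivalent, so the old getOriginalType() check would have bypassed the logical-type visitor for it:

PrimitiveType uuidCol =
    org.apache.parquet.schema.Types.required(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY)
        .length(16)
        .as(LogicalTypeAnnotation.uuidType())
        .named("uuid_col");

// UUID has a logical type annotation but no ConvertedType mapping.
assert uuidCol.getOriginalType() == null;
assert uuidCol.getLogicalTypeAnnotation() != null;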

return primitive
.getLogicalTypeAnnotation()
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive))
@@ -371,7 +397,7 @@ public ParquetValueReader<?> primitive(

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
return new FixedReader(desc);
return fixedReader(desc);
case BINARY:
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) {
return new ParquetValueReaders.StringReader(desc);
@@ -397,7 +423,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
return timestampReader(desc, LogicalTypeAnnotation.TimeUnit.NANOS, true);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -407,124 +433,4 @@ MessageType type() {
return type;
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}

private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L);
}
}

private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -18,13 +18,6 @@
*/
package org.apache.iceberg.data.parquet;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.parquet.ParquetTypeVisitor;
@@ -33,13 +26,16 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

/**
* @deprecated since 1.8.0, will be made package-private in 1.9.0
*/
@Deprecated
public abstract class BaseParquetWriter<T> {

@SuppressWarnings("unchecked")
@@ -50,6 +46,26 @@ protected ParquetValueWriter<T> createWriter(MessageType type) {
protected abstract ParquetValueWriters.StructWriter<T> createStructWriter(
List<ParquetValueWriter<?>> writers);

protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
return new GenericParquetWriter.FixedWriter(desc);
}

protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
return new GenericParquetWriter.DateWriter(desc);
}

protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
return new GenericParquetWriter.TimeWriter(desc);
}

protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
if (isAdjustedToUTC) {
return new GenericParquetWriter.TimestamptzWriter(desc);
} else {
return new GenericParquetWriter.TimestampWriter(desc);
}
}

private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
private final MessageType type;

@@ -119,7 +135,7 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
ColumnDescriptor desc = type.getColumnDescription(currentPath());
LogicalTypeAnnotation logicalType = primitive.getLogicalTypeAnnotation();
if (logicalType != null) {
Optional<ParquetValueWriters.PrimitiveWriter<?>> writer =
Optional<ParquetValueWriter<?>> writer =
logicalType.accept(new LogicalTypeWriterVisitor(desc));
if (writer.isPresent()) {
return writer.get();
@@ -128,7 +144,7 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {

switch (primitive.getPrimitiveTypeName()) {
case FIXED_LEN_BYTE_ARRAY:
return new FixedWriter(desc);
return fixedWriter(desc);
case BINARY:
return ParquetValueWriters.byteBuffers(desc);
case BOOLEAN:
@@ -147,29 +163,28 @@ public ParquetValueWriter<?> primitive(PrimitiveType primitive) {
}
}

private static class LogicalTypeWriterVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<
ParquetValueWriters.PrimitiveWriter<?>> {
private class LogicalTypeWriterVisitor
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueWriter<?>> {
private final ColumnDescriptor desc;

private LogicalTypeWriterVisitor(ColumnDescriptor desc) {
this.desc = desc;
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType) {
switch (desc.getPrimitiveType().getPrimitiveTypeName()) {
case INT32:
@@ -190,33 +205,33 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
return Optional.of(new DateWriter(desc));
return Optional.of(dateWriter(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeType) {
return Optional.of(new TimeWriter(desc));
Preconditions.checkArgument(
Member Author:

Added this check because, for timestamps, there was already an equivalent check that only MICROS is supported.

LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
"Cannot write time in %s, only MICROS is supported",
timeType.getUnit());
return Optional.of(timeWriter(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType) {
Preconditions.checkArgument(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported",
timestampType.getUnit());
if (timestampType.isAdjustedToUTC()) {
return Optional.of(new TimestamptzWriter(desc));
} else {
return Optional.of(new TimestampWriter(desc));
}
return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC()));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.IntLogicalTypeAnnotation intType) {
Preconditions.checkArgument(
intType.isSigned() || intType.getBitWidth() < 64,
@@ -229,75 +244,21 @@ public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) {
return Optional.of(ParquetValueWriters.strings(desc));
}

@Override
public Optional<ParquetValueWriters.PrimitiveWriter<?>> visit(
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonType) {
return Optional.of(ParquetValueWriters.byteBuffers(desc));
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
private DateWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalDate value) {
column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value));
}
}

private static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
private TimeWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalTime value) {
column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
}
}

private static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
private TimestampWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalDateTime value) {
column.writeLong(
repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
}
}

private static class TimestamptzWriter
extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
private TimestamptzWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, OffsetDateTime value) {
column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value));
}
}

private static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
private FixedWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, byte[] value) {
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
return Optional.of(ParquetValueWriters.uuids(desc));
}
}
}
Original file line number Diff line number Diff line change
@@ -18,15 +18,25 @@
*/
package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericDataUtil;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders.StructReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

@@ -49,47 +59,127 @@ public static ParquetValueReader<Record> buildReader(
@Override
protected ParquetValueReader<Record> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return new RecordReader(types, fieldReaders, structType);
return ParquetValueReaders.recordReader(types, fieldReaders, structType);
}

@Override
protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) {
return GenericDataUtil.internalToGeneric(type, value);
}

private static class RecordReader extends StructReader<Record, Record> {
private final GenericRecord template;
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
super(types, readers);
this.template = struct != null ? GenericRecord.create(struct) : null;
static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
protected Record newStructData(Record reuse) {
if (reuse != null) {
return reuse;
} else {
// GenericRecord.copy() is more performant then GenericRecord.create(StructType) since
// NAME_MAP_CACHE access
// is eliminated. Using copy here to gain performance.
return template.copy();
}
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}

static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
protected Object getField(Record intermediate, int pos) {
return intermediate.get(pos);
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
protected Record buildStruct(Record struct) {
return struct;
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
protected void set(Record struct, int pos, Object value) {
struct.set(pos, value);
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextInteger() * 1000000L);
}
}

static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -18,10 +18,21 @@
*/
package org.apache.iceberg.data.parquet;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;

public class GenericParquetWriter extends BaseParquetWriter<Record> {
@@ -35,17 +46,73 @@ public static ParquetValueWriter<Record> buildWriter(MessageType type) {

@Override
protected StructWriter<Record> createStructWriter(List<ParquetValueWriter<?>> writers) {
return new RecordWriter(writers);
return ParquetValueWriters.recordWriter(writers);
}

private static class RecordWriter extends StructWriter<Record> {
private RecordWriter(List<ParquetValueWriter<?>> writers) {
super(writers);
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
DateWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalDate value) {
column.writeInteger(repetitionLevel, (int) ChronoUnit.DAYS.between(EPOCH_DAY, value));
}
}

static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
TimeWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalTime value) {
column.writeLong(repetitionLevel, value.toNanoOfDay() / 1000);
}
}

static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
TimestampWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, LocalDateTime value) {
column.writeLong(
repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value.atOffset(ZoneOffset.UTC)));
}
}

static class TimestamptzWriter extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
TimestamptzWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, OffsetDateTime value) {
column.writeLong(repetitionLevel, ChronoUnit.MICROS.between(EPOCH, value));
}
}

static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
private final int length;

FixedWriter(ColumnDescriptor desc) {
super(desc);
this.length = desc.getPrimitiveType().getTypeLength();
}

@Override
protected Object get(Record struct, int index) {
return struct.get(index);
public void write(int repetitionLevel, byte[] value) {
Preconditions.checkArgument(
value.length == length,
"Cannot write byte buffer of length %s as fixed[%s]",
value.length,
length);
column.writeBinary(repetitionLevel, Binary.fromReusedByteArray(value));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.parquet;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {

private static final InternalReader<?> INSTANCE = new InternalReader<>();

private InternalReader() {}

@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueReader<T> create(
Schema expectedSchema, MessageType fileSchema) {
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema);
}

@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueReader<T> create(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
}

@Override
@SuppressWarnings("unchecked")
protected ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return (ParquetValueReader<T>)
ParquetValueReaders.recordReader(types, fieldReaders, structType);
}

@Override
protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new ParquetValueReaders.BytesReader(desc);
}

@Override
protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new ParquetValueReaders.UnboxedReader<>(desc);
Contributor:

This isn't correct. The unboxed reader will return a Binary for int96 columns. Instead, this needs to use the same logic as the Spark reader (which also uses the internal representation):

  private static class TimestampInt96Reader extends UnboxedReader<Long> {
    TimestampInt96Reader(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public Long read(Long ignored) {
      return readLong();
    }

    @Override
    public long readLong() {
      final ByteBuffer byteBuffer =
          column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
      return ParquetUtil.extractTimestampInt96(byteBuffer);
    }
  }

You can move that class into the parquet package to share it.
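For context, a rough sketch of the decoding that helper performs, assuming ParquetUtil.extractTimestampInt96 returns microseconds from the Unix epoch (mirroring the Julian-day math in the generic TimestampInt96Reader); the method name below is only illustrative:

  // An INT96 timestamp is 12 bytes (caller sets ByteOrder.LITTLE_ENDIAN):
  // an 8-byte nanos-of-day value followed by a 4-byte Julian day number.
  static long extractTimestampInt96Sketch(ByteBuffer buf) {
    long timeOfDayNanos = buf.getLong();
    int julianDay = buf.getInt();
    long unixDays = julianDay - 2_440_588L; // Julian day number of 1970-01-01
    return TimeUnit.DAYS.toMicros(unixDays) + timeOfDayNanos / 1_000L;
  }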

}

@Override
protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.millisAsTimes(desc);
}

return new ParquetValueReaders.UnboxedReader<>(desc);
}

@Override
protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return ParquetValueReaders.int96Timestamps(desc);
}

if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.millisAsTimestamps(desc);
}

return new ParquetValueReaders.UnboxedReader<>(desc);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.parquet;

import java.util.List;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.iceberg.parquet.ParquetValueWriters;
import org.apache.iceberg.parquet.ParquetValueWriters.StructWriter;
import org.apache.iceberg.types.Type;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;

/**
* A Writer that consumes Iceberg's internal in-memory object model.
*
* <p>Iceberg's internal in-memory object model produces the types defined in {@link
* Type.TypeID#javaClass()}.
*/
public class InternalWriter<T extends StructLike> extends BaseParquetWriter<T> {
private static final InternalWriter<?> INSTANCE = new InternalWriter<>();

private InternalWriter() {}

@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueWriter<T> create(MessageType type) {
return (ParquetValueWriter<T>) INSTANCE.createWriter(type);
}

@Override
protected StructWriter<T> createStructWriter(List<ParquetValueWriter<?>> writers) {
return ParquetValueWriters.recordWriter(writers);
}

@Override
protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {
return ParquetValueWriters.fixedBuffers(desc);
}

@Override
protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
return ParquetValueWriters.ints(desc);
}

@Override
protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
return ParquetValueWriters.longs(desc);
}

@Override
protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
return ParquetValueWriters.longs(desc);
}
}
Original file line number Diff line number Diff line change
@@ -24,12 +24,18 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.api.Binary;
@@ -63,6 +69,27 @@ public static ParquetValueReader<Long> position() {
return new PositionReader();
}

public static ParquetValueReader<UUID> uuids(ColumnDescriptor desc) {
return new ParquetValueReaders.UUIDReader(desc);
}

public static ParquetValueReader<Long> int96Timestamps(ColumnDescriptor desc) {
return new ParquetValueReaders.TimestampInt96Reader(desc);
}

public static ParquetValueReader<Long> millisAsTimes(ColumnDescriptor desc) {
return new ParquetValueReaders.TimeMillisReader(desc);
}

public static ParquetValueReader<Long> millisAsTimestamps(ColumnDescriptor desc) {
return new ParquetValueReaders.TimestampMillisReader(desc);
}

public static ParquetValueReader<Record> recordReader(
List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
return new RecordReader(types, readers, struct);
}

private static class NullReader<T> implements ParquetValueReader<T> {
private static final NullReader<Void> INSTANCE = new NullReader<>();
private static final ImmutableList<TripleIterator<?>> COLUMNS = ImmutableList.of();
@@ -401,6 +428,17 @@ public ByteBuffer read(ByteBuffer reuse) {
}
}

private static class UUIDReader extends PrimitiveReader<UUID> {
private UUIDReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public UUID read(UUID reuse) {
return UUIDUtil.convert(column.nextBinary().toByteBuffer());
Contributor:

This looks fine to me.

}
}

public static class ByteArrayReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
public ByteArrayReader(ColumnDescriptor desc) {
super(desc);
@@ -412,6 +450,57 @@ public byte[] read(byte[] ignored) {
}
}

private static class TimestampInt96Reader extends UnboxedReader<Long> {

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}

private static class TimeMillisReader extends UnboxedReader<Long> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
return 1000L * column.nextInteger();
}
}

private static class TimestampMillisReader extends UnboxedReader<Long> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
return 1000L * column.nextLong();
}
}

private static class OptionReader<T> implements ParquetValueReader<T> {
private final int definitionLevel;
private final ParquetValueReader<T> reader;
@@ -850,4 +939,39 @@ private TripleIterator<?> firstNonNullColumn(List<TripleIterator<?>> columns) {
return NullReader.NULL_COLUMN;
}
}

private static class RecordReader extends StructReader<Record, Record> {
private final GenericRecord template;

RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, Types.StructType struct) {
super(types, readers);
this.template = struct != null ? GenericRecord.create(struct) : null;
}

@Override
protected Record newStructData(Record reuse) {
if (reuse != null) {
return reuse;
} else {
// GenericRecord.copy() is more performant than GenericRecord.create(StructType) since
// NAME_MAP_CACHE access is eliminated. Using copy here to gain performance.
return template.copy();
}
}

@Override
protected Object getField(Record intermediate, int pos) {
return intermediate.get(pos);
}

@Override
protected Record buildStruct(Record struct) {
return struct;
}

@Override
protected void set(Record struct, int pos, Object value) {
struct.set(pos, value);
}
}
}
Original file line number Diff line number Diff line change
@@ -21,23 +21,27 @@
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.avro.util.Utf8;
import org.apache.iceberg.DoubleFieldMetrics;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FloatFieldMetrics;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.deletes.PositionDelete;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.DecimalUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.io.api.Binary;
@@ -87,6 +91,10 @@ public static PrimitiveWriter<CharSequence> strings(ColumnDescriptor desc) {
return new StringWriter(desc);
}

public static PrimitiveWriter<UUID> uuids(ColumnDescriptor desc) {
return new UUIDWriter(desc);
}

public static PrimitiveWriter<BigDecimal> decimalAsInteger(
ColumnDescriptor desc, int precision, int scale) {
return new IntegerDecimalWriter(desc, precision, scale);
@@ -106,6 +114,10 @@ public static PrimitiveWriter<ByteBuffer> byteBuffers(ColumnDescriptor desc) {
return new BytesWriter(desc);
}

public static PrimitiveWriter<ByteBuffer> fixedBuffers(ColumnDescriptor desc) {
return new FixedBufferWriter(desc);
}

public static <E> CollectionWriter<E> collections(int dl, int rl, ParquetValueWriter<E> writer) {
return new CollectionWriter<>(dl, rl, writer);
}
@@ -115,6 +127,11 @@ public static <K, V> MapWriter<K, V> maps(
return new MapWriter<>(dl, rl, keyWriter, valueWriter);
}

public static <T extends StructLike> StructWriter<T> recordWriter(
List<ParquetValueWriter<?>> writers) {
return new RecordWriter<>(writers);
}

public abstract static class PrimitiveWriter<T> implements ParquetValueWriter<T> {
@SuppressWarnings("checkstyle:VisibilityModifier")
protected final ColumnWriter<T> column;
@@ -313,6 +330,25 @@ public void write(int repetitionLevel, ByteBuffer buffer) {
}
}

private static class FixedBufferWriter extends PrimitiveWriter<ByteBuffer> {
private final int length;

private FixedBufferWriter(ColumnDescriptor desc) {
super(desc);
this.length = desc.getPrimitiveType().getTypeLength();
}

@Override
public void write(int repetitionLevel, ByteBuffer buffer) {
Preconditions.checkArgument(
buffer.remaining() == length,
"Cannot write byte buffer of length %s as fixed[%s]",
buffer.remaining(),
length);
column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
}
}

private static class StringWriter extends PrimitiveWriter<CharSequence> {
private StringWriter(ColumnDescriptor desc) {
super(desc);
@@ -330,6 +366,37 @@ public void write(int repetitionLevel, CharSequence value) {
}
}

private static class UUIDWriter extends PrimitiveWriter<UUID> {
private static final ThreadLocal<ByteBuffer> BUFFER =
ThreadLocal.withInitial(
() -> {
ByteBuffer buffer = ByteBuffer.allocate(16);
buffer.order(ByteOrder.BIG_ENDIAN);
return buffer;
});

private UUIDWriter(ColumnDescriptor desc) {
super(desc);
}

@Override
public void write(int repetitionLevel, UUID value) {
ByteBuffer buffer = UUIDUtil.convertToByteBuffer(value, BUFFER.get());
column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
}
}

private static class RecordWriter<T extends StructLike> extends StructWriter<T> {
private RecordWriter(List<ParquetValueWriter<?>> writers) {
super(writers);
}

@Override
protected Object get(T struct, int index) {
return struct.get(index, Object.class);
}
}

static class OptionWriter<T> implements ParquetValueWriter<T> {
private final int definitionLevel;
private final ParquetValueWriter<T> writer;
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;

import java.io.IOException;
import java.util.List;
import org.apache.iceberg.InternalTestHelpers;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RandomInternalData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.avro.AvroDataTest;
import org.apache.iceberg.data.parquet.InternalReader;
import org.apache.iceberg.data.parquet.InternalWriter;
import org.apache.iceberg.inmemory.InMemoryOutputFile;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class TestInternalParquet extends AvroDataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
List<StructLike> expected = RandomInternalData.generate(schema, 100, 1376L);

OutputFile outputFile = new InMemoryOutputFile();

try (DataWriter<StructLike> dataWriter =
Parquet.writeData(outputFile)
.schema(schema)
.createWriterFunc(InternalWriter::create)
.overwrite()
.withSpec(PartitionSpec.unpartitioned())
.build()) {
for (StructLike record : expected) {
dataWriter.write(record);
}
}

List<StructLike> rows;
try (CloseableIterable<StructLike> reader =
Parquet.read(outputFile.toInputFile())
.project(schema)
.createReaderFunc(fileSchema -> InternalReader.create(schema, fileSchema))
.build()) {
rows = Lists.newArrayList(reader);
}

for (int i = 0; i < expected.size(); i += 1) {
InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(i), rows.get(i));
}

// test reuseContainers
try (CloseableIterable<StructLike> reader =
Parquet.read(outputFile.toInputFile())
.project(schema)
.reuseContainers()
.createReaderFunc(fileSchema -> InternalReader.create(schema, fileSchema))
.build()) {
int index = 0;
for (StructLike actualRecord : reader) {
InternalTestHelpers.assertEquals(schema.asStruct(), expected.get(index), actualRecord);
index += 1;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -21,14 +21,12 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetValueReaders.FloatAsDoubleReader;
@@ -265,7 +263,7 @@ public ParquetValueReader<?> primitive(
case TIMESTAMP_MICROS:
return new UnboxedReader<>(desc);
case TIMESTAMP_MILLIS:
return new TimestampMillisReader(desc);
return ParquetValueReaders.millisAsTimestamps(desc);
case DECIMAL:
DecimalLogicalTypeAnnotation decimal =
(DecimalLogicalTypeAnnotation) primitive.getLogicalTypeAnnotation();
@@ -315,7 +313,7 @@ public ParquetValueReader<?> primitive(
case INT96:
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards
// compatibility we try to read INT96 as timestamps.
return new TimestampInt96Reader(desc);
return ParquetValueReaders.int96Timestamps(desc);
default:
throw new UnsupportedOperationException("Unsupported type: " + primitive);
}
@@ -373,41 +371,6 @@ public Decimal read(Decimal ignored) {
}
}

private static class TimestampMillisReader extends UnboxedReader<Long> {
TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
return 1000 * column.nextLong();
}
}

private static class TimestampInt96Reader extends UnboxedReader<Long> {

TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
return ParquetUtil.extractTimestampInt96(byteBuffer);
}
}

private static class StringReader extends PrimitiveReader<UTF8String> {
StringReader(ColumnDescriptor desc) {
super(desc);