-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parquet: Add readers and writers for the internal object model #11904
Changes from 1 commit
b0658f9
9dfca7d
868cc50
cb68af4
2f210d5
0951235
d5fb4ca
3b7ba50
20f7c26
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,19 +18,8 @@ | |
*/ | ||
package org.apache.iceberg.data.parquet; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.nio.ByteOrder; | ||
import java.time.Instant; | ||
import java.time.LocalDate; | ||
import java.time.LocalDateTime; | ||
import java.time.LocalTime; | ||
import java.time.OffsetDateTime; | ||
import java.time.ZoneOffset; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.iceberg.MetadataColumns; | ||
import org.apache.iceberg.Schema; | ||
import org.apache.iceberg.parquet.ParquetSchemaUtil; | ||
|
@@ -45,7 +34,6 @@ | |
import org.apache.parquet.column.ColumnDescriptor; | ||
import org.apache.parquet.schema.GroupType; | ||
import org.apache.parquet.schema.LogicalTypeAnnotation; | ||
import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; | ||
import org.apache.parquet.schema.MessageType; | ||
import org.apache.parquet.schema.PrimitiveType; | ||
import org.apache.parquet.schema.Type; | ||
|
@@ -76,6 +64,16 @@ protected ParquetValueReader<T> createReader( | |
protected abstract ParquetValueReader<T> createStructReader( | ||
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType); | ||
|
||
protected abstract LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> | ||
logicalTypeReaderVisitor( | ||
ColumnDescriptor desc, | ||
org.apache.iceberg.types.Type.PrimitiveType expected, | ||
PrimitiveType primitive); | ||
|
||
protected abstract ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc); | ||
|
||
protected abstract ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc); | ||
|
||
protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { | ||
return value; | ||
} | ||
|
@@ -114,113 +112,6 @@ public ParquetValueReader<?> struct( | |
} | ||
} | ||
|
||
private class LogicalTypeAnnotationParquetValueReaderVisitor | ||
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> { | ||
|
||
private final ColumnDescriptor desc; | ||
private final org.apache.iceberg.types.Type.PrimitiveType expected; | ||
private final PrimitiveType primitive; | ||
|
||
LogicalTypeAnnotationParquetValueReaderVisitor( | ||
ColumnDescriptor desc, | ||
org.apache.iceberg.types.Type.PrimitiveType expected, | ||
PrimitiveType primitive) { | ||
this.desc = desc; | ||
this.expected = expected; | ||
this.primitive = primitive; | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { | ||
return Optional.of(new ParquetValueReaders.StringReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { | ||
return Optional.of(new ParquetValueReaders.StringReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit(DecimalLogicalTypeAnnotation decimalLogicalType) { | ||
switch (primitive.getPrimitiveTypeName()) { | ||
case BINARY: | ||
case FIXED_LEN_BYTE_ARRAY: | ||
return Optional.of( | ||
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); | ||
case INT64: | ||
return Optional.of( | ||
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); | ||
case INT32: | ||
return Optional.of( | ||
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); | ||
default: | ||
throw new UnsupportedOperationException( | ||
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); | ||
} | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { | ||
return Optional.of(new DateReader(desc)); | ||
ajantha-bhat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { | ||
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { | ||
return Optional.of(new TimeReader(desc)); | ||
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { | ||
return Optional.of(new TimeMillisReader(desc)); | ||
} | ||
|
||
return Optional.empty(); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { | ||
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { | ||
Types.TimestampType tsMicrosType = (Types.TimestampType) expected; | ||
return tsMicrosType.shouldAdjustToUTC() | ||
? Optional.of(new TimestamptzReader(desc)) | ||
: Optional.of(new TimestampReader(desc)); | ||
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { | ||
Types.TimestampType tsMillisType = (Types.TimestampType) expected; | ||
return tsMillisType.shouldAdjustToUTC() | ||
? Optional.of(new TimestamptzMillisReader(desc)) | ||
: Optional.of(new TimestampMillisReader(desc)); | ||
} | ||
|
||
return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { | ||
if (intLogicalType.getBitWidth() == 64) { | ||
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); | ||
} | ||
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) | ||
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) | ||
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { | ||
return Optional.of(new ParquetValueReaders.StringReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { | ||
return Optional.of(new ParquetValueReaders.BytesReader(desc)); | ||
} | ||
} | ||
|
||
private class ReadBuilder extends TypeWithSchemaVisitor<ParquetValueReader<?>> { | ||
private final MessageType type; | ||
private final Map<Integer, ?> idToConstant; | ||
|
@@ -362,7 +253,7 @@ public ParquetValueReader<?> primitive( | |
if (primitive.getOriginalType() != null) { | ||
return primitive | ||
.getLogicalTypeAnnotation() | ||
.accept(new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive)) | ||
.accept(logicalTypeReaderVisitor(desc, expected, primitive)) | ||
.orElseThrow( | ||
() -> | ||
new UnsupportedOperationException( | ||
|
@@ -371,7 +262,7 @@ public ParquetValueReader<?> primitive( | |
|
||
switch (primitive.getPrimitiveTypeName()) { | ||
case FIXED_LEN_BYTE_ARRAY: | ||
return new FixedReader(desc); | ||
return fixedReader(desc); | ||
case BINARY: | ||
if (expected.typeId() == org.apache.iceberg.types.Type.TypeID.STRING) { | ||
return new ParquetValueReaders.StringReader(desc); | ||
|
@@ -397,7 +288,7 @@ public ParquetValueReader<?> primitive( | |
case INT96: | ||
// Impala & Spark used to write timestamps as INT96 without a logical type. For backwards | ||
// compatibility we try to read INT96 as timestamps. | ||
return new TimestampInt96Reader(desc); | ||
return int96Reader(desc); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can't this call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I should assume my understanding is if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes. Because the reader produces and |
||
default: | ||
throw new UnsupportedOperationException("Unsupported type: " + primitive); | ||
} | ||
|
@@ -407,124 +298,4 @@ MessageType type() { | |
return type; | ||
} | ||
} | ||
|
||
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); | ||
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); | ||
|
||
private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> { | ||
private DateReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalDate read(LocalDate reuse) { | ||
return EPOCH_DAY.plusDays(column.nextInteger()); | ||
} | ||
} | ||
|
||
private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> { | ||
private TimestampReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalDateTime read(LocalDateTime reuse) { | ||
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); | ||
} | ||
} | ||
|
||
private static class TimestampMillisReader | ||
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> { | ||
private TimestampMillisReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalDateTime read(LocalDateTime reuse) { | ||
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime(); | ||
} | ||
} | ||
|
||
private static class TimestampInt96Reader | ||
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> { | ||
private static final long UNIX_EPOCH_JULIAN = 2_440_588L; | ||
|
||
private TimestampInt96Reader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public OffsetDateTime read(OffsetDateTime reuse) { | ||
final ByteBuffer byteBuffer = | ||
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); | ||
final long timeOfDayNanos = byteBuffer.getLong(); | ||
final int julianDay = byteBuffer.getInt(); | ||
|
||
return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN)) | ||
.plusNanos(timeOfDayNanos) | ||
.atOffset(ZoneOffset.UTC); | ||
} | ||
} | ||
|
||
private static class TimestamptzReader | ||
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> { | ||
private TimestamptzReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public OffsetDateTime read(OffsetDateTime reuse) { | ||
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); | ||
} | ||
} | ||
|
||
private static class TimestamptzMillisReader | ||
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> { | ||
private TimestamptzMillisReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public OffsetDateTime read(OffsetDateTime reuse) { | ||
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS); | ||
} | ||
} | ||
|
||
private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> { | ||
private TimeMillisReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalTime read(LocalTime reuse) { | ||
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); | ||
} | ||
} | ||
|
||
private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> { | ||
private TimeReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalTime read(LocalTime reuse) { | ||
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L); | ||
} | ||
} | ||
|
||
private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> { | ||
private FixedReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public byte[] read(byte[] reuse) { | ||
if (reuse != null) { | ||
column.nextBinary().toByteBuffer().duplicate().get(reuse); | ||
return reuse; | ||
} else { | ||
return column.nextBinary().getBytes(); | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,16 +18,32 @@ | |
*/ | ||
package org.apache.iceberg.data.parquet; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.nio.ByteOrder; | ||
import java.time.Instant; | ||
import java.time.LocalDate; | ||
import java.time.LocalDateTime; | ||
import java.time.LocalTime; | ||
import java.time.OffsetDateTime; | ||
import java.time.ZoneOffset; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.TimeUnit; | ||
import org.apache.iceberg.Schema; | ||
import org.apache.iceberg.data.GenericDataUtil; | ||
import org.apache.iceberg.data.GenericRecord; | ||
import org.apache.iceberg.data.Record; | ||
import org.apache.iceberg.parquet.ParquetValueReader; | ||
import org.apache.iceberg.parquet.ParquetValueReaders; | ||
import org.apache.iceberg.parquet.ParquetValueReaders.StructReader; | ||
import org.apache.iceberg.types.Types; | ||
import org.apache.iceberg.types.Types.StructType; | ||
import org.apache.parquet.column.ColumnDescriptor; | ||
import org.apache.parquet.schema.LogicalTypeAnnotation; | ||
import org.apache.parquet.schema.MessageType; | ||
import org.apache.parquet.schema.PrimitiveType; | ||
import org.apache.parquet.schema.Type; | ||
|
||
public class GenericParquetReaders extends BaseParquetReaders<Record> { | ||
|
@@ -52,6 +68,25 @@ protected ParquetValueReader<Record> createStructReader( | |
return new RecordReader(types, fieldReaders, structType); | ||
} | ||
|
||
@Override | ||
protected LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> | ||
logicalTypeReaderVisitor( | ||
ColumnDescriptor desc, | ||
org.apache.iceberg.types.Type.PrimitiveType expected, | ||
PrimitiveType primitive) { | ||
return new LogicalTypeAnnotationParquetValueReaderVisitor(desc, expected, primitive); | ||
} | ||
|
||
@Override | ||
protected ParquetValueReaders.PrimitiveReader<?> fixedReader(ColumnDescriptor desc) { | ||
return new FixedReader(desc); | ||
} | ||
|
||
@Override | ||
protected ParquetValueReaders.PrimitiveReader<?> int96Reader(ColumnDescriptor desc) { | ||
return new TimestampInt96Reader(desc); | ||
} | ||
|
||
@Override | ||
protected Object convertConstant(org.apache.iceberg.types.Type type, Object value) { | ||
return GenericDataUtil.internalToGeneric(type, value); | ||
|
@@ -92,4 +127,232 @@ protected void set(Record struct, int pos, Object value) { | |
struct.set(pos, value); | ||
} | ||
} | ||
|
||
private class LogicalTypeAnnotationParquetValueReaderVisitor | ||
implements LogicalTypeAnnotation.LogicalTypeAnnotationVisitor<ParquetValueReader<?>> { | ||
|
||
private final ColumnDescriptor desc; | ||
private final org.apache.iceberg.types.Type.PrimitiveType expected; | ||
private final PrimitiveType primitive; | ||
|
||
LogicalTypeAnnotationParquetValueReaderVisitor( | ||
ColumnDescriptor desc, | ||
org.apache.iceberg.types.Type.PrimitiveType expected, | ||
PrimitiveType primitive) { | ||
this.desc = desc; | ||
this.expected = expected; | ||
this.primitive = primitive; | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.StringLogicalTypeAnnotation stringLogicalType) { | ||
return Optional.of(new ParquetValueReaders.StringReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) { | ||
return Optional.of(new ParquetValueReaders.StringReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalLogicalType) { | ||
switch (primitive.getPrimitiveTypeName()) { | ||
case BINARY: | ||
case FIXED_LEN_BYTE_ARRAY: | ||
return Optional.of( | ||
new ParquetValueReaders.BinaryAsDecimalReader(desc, decimalLogicalType.getScale())); | ||
case INT64: | ||
return Optional.of( | ||
new ParquetValueReaders.LongAsDecimalReader(desc, decimalLogicalType.getScale())); | ||
case INT32: | ||
return Optional.of( | ||
new ParquetValueReaders.IntegerAsDecimalReader(desc, decimalLogicalType.getScale())); | ||
default: | ||
throw new UnsupportedOperationException( | ||
"Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); | ||
} | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateLogicalType) { | ||
return Optional.of(new DateReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.TimeLogicalTypeAnnotation timeLogicalType) { | ||
if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { | ||
return Optional.of(new TimeReader(desc)); | ||
} else if (timeLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { | ||
return Optional.of(new TimeMillisReader(desc)); | ||
} | ||
|
||
return Optional.empty(); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampLogicalType) { | ||
if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { | ||
Types.TimestampType tsMicrosType = (Types.TimestampType) expected; | ||
return tsMicrosType.shouldAdjustToUTC() | ||
? Optional.of(new TimestamptzReader(desc)) | ||
: Optional.of(new TimestampReader(desc)); | ||
} else if (timestampLogicalType.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { | ||
Types.TimestampType tsMillisType = (Types.TimestampType) expected; | ||
return tsMillisType.shouldAdjustToUTC() | ||
? Optional.of(new TimestamptzMillisReader(desc)) | ||
: Optional.of(new TimestampMillisReader(desc)); | ||
} | ||
|
||
return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampLogicalType); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.IntLogicalTypeAnnotation intLogicalType) { | ||
if (intLogicalType.getBitWidth() == 64) { | ||
return Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); | ||
} | ||
return (expected.typeId() == org.apache.iceberg.types.Type.TypeID.LONG) | ||
? Optional.of(new ParquetValueReaders.IntAsLongReader(desc)) | ||
: Optional.of(new ParquetValueReaders.UnboxedReader<>(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.JsonLogicalTypeAnnotation jsonLogicalType) { | ||
return Optional.of(new ParquetValueReaders.StringReader(desc)); | ||
} | ||
|
||
@Override | ||
public Optional<ParquetValueReader<?>> visit( | ||
LogicalTypeAnnotation.BsonLogicalTypeAnnotation bsonLogicalType) { | ||
return Optional.of(new ParquetValueReaders.BytesReader(desc)); | ||
} | ||
} | ||
|
||
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); | ||
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); | ||
|
||
private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with moving the date/time reader classes here. |
||
private DateReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalDate read(LocalDate reuse) { | ||
return EPOCH_DAY.plusDays(column.nextInteger()); | ||
} | ||
} | ||
|
||
private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> { | ||
private TimestampReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalDateTime read(LocalDateTime reuse) { | ||
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime(); | ||
} | ||
} | ||
|
||
private static class TimestampMillisReader | ||
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> { | ||
private TimestampMillisReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalDateTime read(LocalDateTime reuse) { | ||
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime(); | ||
} | ||
} | ||
|
||
private static class TimestampInt96Reader | ||
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> { | ||
private static final long UNIX_EPOCH_JULIAN = 2_440_588L; | ||
|
||
private TimestampInt96Reader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public OffsetDateTime read(OffsetDateTime reuse) { | ||
final ByteBuffer byteBuffer = | ||
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN); | ||
final long timeOfDayNanos = byteBuffer.getLong(); | ||
final int julianDay = byteBuffer.getInt(); | ||
|
||
return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN)) | ||
.plusNanos(timeOfDayNanos) | ||
.atOffset(ZoneOffset.UTC); | ||
} | ||
} | ||
|
||
private static class TimestamptzReader | ||
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> { | ||
private TimestamptzReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public OffsetDateTime read(OffsetDateTime reuse) { | ||
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS); | ||
} | ||
} | ||
|
||
private static class TimestamptzMillisReader | ||
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> { | ||
private TimestamptzMillisReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public OffsetDateTime read(OffsetDateTime reuse) { | ||
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS); | ||
} | ||
} | ||
|
||
private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> { | ||
private TimeMillisReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalTime read(LocalTime reuse) { | ||
return LocalTime.ofNanoOfDay(column.nextLong() * 1000000L); | ||
ajantha-bhat marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> { | ||
private TimeReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public LocalTime read(LocalTime reuse) { | ||
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L); | ||
} | ||
} | ||
|
||
private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> { | ||
private FixedReader(ColumnDescriptor desc) { | ||
super(desc); | ||
} | ||
|
||
@Override | ||
public byte[] read(byte[] reuse) { | ||
if (reuse != null) { | ||
column.nextBinary().toByteBuffer().duplicate().get(reuse); | ||
return reuse; | ||
} else { | ||
return column.nextBinary().getBytes(); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it makes sense to have the subclasses provide this visitor.