diff --git a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 8acff6de0563..5e08a3a3085d 100644
--- a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -74,6 +74,10 @@ public static LocalDateTime timestampFromNanos(long nanosFromEpoch) {
     return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch).toLocalDateTime();
   }
 
+  public static LocalDateTime timestampFromMillis(long millisFromEpoch) {
+    return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch).toLocalDateTime();
+  }
+
   public static long microsFromInstant(Instant instant) {
     return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC));
   }
@@ -86,6 +90,10 @@ public static long nanosFromTimestamp(LocalDateTime dateTime) {
     return ChronoUnit.NANOS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
   }
 
+  public static long millisFromTimestamp(LocalDateTime dateTime) {
+    return ChronoUnit.MILLIS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
+  }
+
   public static long microsToMillis(long micros) {
     // When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion.
     // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
@@ -109,6 +117,10 @@ public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) {
     return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch);
   }
 
+  public static OffsetDateTime timestamptzFromMillis(long millisFromEpoch) {
+    return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch);
+  }
+
   public static long microsFromTimestamptz(OffsetDateTime dateTime) {
     return ChronoUnit.MICROS.between(EPOCH, dateTime);
   }
@@ -117,6 +129,10 @@ public static long nanosFromTimestamptz(OffsetDateTime dateTime) {
     return ChronoUnit.NANOS.between(EPOCH, dateTime);
   }
 
+  public static long millisFromTimestamptz(OffsetDateTime dateTime) {
+    return ChronoUnit.MILLIS.between(EPOCH, dateTime);
+  }
+
   public static String formatTimestampMillis(long millis) {
     return Instant.ofEpochMilli(millis).toString().replace("Z", "+00:00");
   }
diff --git a/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java b/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java
index 5799158a8734..02ee8252d5f0 100644
--- a/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java
@@ -20,6 +20,7 @@
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.time.LocalDateTime;
 import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.junit.jupiter.api.Test;
@@ -96,4 +97,24 @@ public void hourToDaysPositive() {
     assertThat(DateTimeUtil.hoursToDays(DateTimeUtil.microsToHours(micros)))
         .isEqualTo(expectedDays);
   }
+
+  @Test
+  public void timestampFromMillis() {
+    assertThat(DateTimeUtil.timestampFromMillis(1510871468000L))
+        .isEqualTo(LocalDateTime.parse("2017-11-16T22:31:08"));
+    assertThat(DateTimeUtil.timestampFromMillis(-1510871468000L))
+        .isEqualTo(LocalDateTime.parse("1922-02-15T01:28:52"));
+    assertThat(DateTimeUtil.timestampFromMillis(0L))
+        .isEqualTo(LocalDateTime.parse("1970-01-01T00:00"));
+  }
+
+  @Test
+  public void millisFromTimestamp() {
+    assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("2017-11-16T22:31:08")))
+        .isEqualTo(1510871468000L);
+    assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("1922-02-15T01:28:52")))
+        .isEqualTo(-1510871468000L);
+    assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("1970-01-01T00:00")))
+        .isEqualTo(0L);
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
index 75929dfde4c4..eefb67cf3584 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java
@@ -146,6 +146,12 @@ public ValueReader<?> primitive(Type.PrimitiveType ignored, Schema primitive) {
         }
         return GenericReaders.timestampNanos();
 
+      case "timestamp-millis":
+        if (AvroSchemaUtil.isTimestamptz(primitive)) {
+          return GenericReaders.timestamptzMillis();
+        }
+        return GenericReaders.timestampMillis();
+
       case "decimal":
         return ValueReaders.decimal(
             ValueReaders.decimalBytesReader(primitive),
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
index d072a26820ab..b31fb96fbcb6 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java
@@ -61,6 +61,14 @@ static ValueReader<OffsetDateTime> timestamptzNanos() {
     return TimestamptzNanoReader.INSTANCE;
   }
 
+  static ValueReader<LocalDateTime> timestampMillis() {
+    return TimestampMillisReader.INSTANCE;
+  }
+
+  static ValueReader<OffsetDateTime> timestamptzMillis() {
+    return TimestamptzMillisReader.INSTANCE;
+  }
+
   static ValueReader<Record> struct(
       List<Pair<Integer, ValueReader<?>>> readPlan, StructType struct) {
     return new PlannedRecordReader(readPlan, struct);
@@ -137,6 +145,28 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
     }
   }
 
+  private static class TimestampMillisReader implements ValueReader<LocalDateTime> {
+    private static final TimestampMillisReader INSTANCE = new TimestampMillisReader();
+
+    private TimestampMillisReader() {}
+
+    @Override
+    public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
+      return DateTimeUtil.timestampFromMillis(decoder.readLong());
+    }
+  }
+
+  private static class TimestamptzMillisReader implements ValueReader<OffsetDateTime> {
+    private static final TimestamptzMillisReader INSTANCE = new TimestamptzMillisReader();
+
+    private TimestamptzMillisReader() {}
+
+    @Override
+    public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
+      return DateTimeUtil.timestamptzFromMillis(decoder.readLong());
+    }
+  }
+
   private static class PlannedRecordReader extends ValueReaders.PlannedStructReader<Record> {
     private final StructType structType;
diff --git a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java
index 7e42c5be19bb..747907a2fb97 100644
--- a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java
+++ b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java
@@ -154,6 +154,12 @@ public ValueReader<?> primitive(Type partner, Schema primitive) {
         }
         return GenericReaders.timestampNanos();
 
+      case "timestamp-millis":
+        if (AvroSchemaUtil.isTimestamptz(primitive)) {
+          return GenericReaders.timestamptzMillis();
+        }
+        return GenericReaders.timestampMillis();
+
       case "decimal":
         return ValueReaders.decimal(
             ValueReaders.decimalBytesReader(primitive),
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java
new file mode 100644
index 000000000000..966f01267d67
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.avro;
+
+import static org.apache.iceberg.avro.AvroSchemaUtil.ADJUST_TO_UTC_PROP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.jupiter.api.Test;
+
+class TestDataReader {
+
+  @Test
+  public void timestampDataReader() throws IOException {
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()),
+            Types.NestedField.required(2, "timestamp_micros", Types.TimestampType.withoutZone()),
+            Types.NestedField.required(3, "timestamp_millis", Types.TimestampType.withoutZone()));
+
+    Schema avroSchema =
+        SchemaBuilder.record("test_programmatic")
+            .fields()
+            .name("timestamp_nanos")
+            .type(LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG)))
+            .noDefault()
+            .name("timestamp_micros")
+            .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
+            .noDefault()
+            .name("timestamp_millis")
+            .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)))
+            .noDefault()
+            .endRecord();
+
+    avroSchema.getField("timestamp_nanos").addProp("field-id", 1);
+    avroSchema.getField("timestamp_micros").addProp("field-id", 2);
+    avroSchema.getField("timestamp_millis").addProp("field-id", 3);
+
+    DataReader<Record> reader = DataReader.create(icebergSchema, avroSchema);
+    reader.setSchema(avroSchema);
+
+    // post-epoch timestamps
+    GenericRecord avroRecord = new GenericData.Record(avroSchema);
+    LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789);
+    LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000);
+    LocalDateTime timestampMillis = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123000000);
+
+    avroRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(timestampNanos));
+    avroRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(timestampMicros));
+    avroRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(timestampMillis));
+
+    Record result = readRecord(reader, avroSchema, avroRecord);
+
+    assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos);
+    assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros);
+    assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis);
+
+    // pre-epoch timestamps
+    GenericRecord preEpochRecord = new GenericData.Record(avroSchema);
+    LocalDateTime preEpochNanos = LocalDateTime.of(1969, 1, 1, 10, 11, 12, 123456789);
+    LocalDateTime preEpochMicros = LocalDateTime.of(1968, 1, 1, 10, 11, 12, 123456000);
+    LocalDateTime preEpochMillis = LocalDateTime.of(1967, 1, 1, 10, 11, 12, 123000000);
+
+    preEpochRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(preEpochNanos));
+    preEpochRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(preEpochMicros));
+    preEpochRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(preEpochMillis));
+
+    Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord);
+
+    assertThat(preEpochResult.getField("timestamp_nanos")).isEqualTo(preEpochNanos);
+    assertThat(preEpochResult.getField("timestamp_micros")).isEqualTo(preEpochMicros);
+    assertThat(preEpochResult.getField("timestamp_millis")).isEqualTo(preEpochMillis);
+  }
+
+  @Test
+  public void timestampTzDataReader() throws IOException {
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()),
+            Types.NestedField.required(2, "timestamp_micros_tz", Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "timestamp_millis_tz", Types.TimestampType.withZone()));
+
+    Schema avroSchema =
+        SchemaBuilder.record("test_tz")
+            .fields()
+            .name("timestamp_nanos_tz")
+            .type(LogicalTypes.timestampNanos().addToSchema(utcAdjustedLongSchema()))
+            .noDefault()
+            .name("timestamp_micros_tz")
+            .type(LogicalTypes.timestampMicros().addToSchema(utcAdjustedLongSchema()))
+            .noDefault()
+            .name("timestamp_millis_tz")
+            .type(LogicalTypes.timestampMillis().addToSchema(utcAdjustedLongSchema()))
+            .noDefault()
+            .endRecord();
+
+    avroSchema.getField("timestamp_nanos_tz").addProp("field-id", 1);
+    avroSchema.getField("timestamp_micros_tz").addProp("field-id", 2);
+    avroSchema.getField("timestamp_millis_tz").addProp("field-id", 3);
+
+    DataReader<Record> reader = DataReader.create(icebergSchema, avroSchema);
+    reader.setSchema(avroSchema);
+
+    // post-epoch timestamps
+    GenericRecord avroRecord = new GenericData.Record(avroSchema);
+
+    OffsetDateTime offsetTimestampNanos =
+        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456789, ZoneOffset.ofHours(-8));
+    OffsetDateTime offsetTimestampMicros =
+        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456000, ZoneOffset.ofHours(5));
+    OffsetDateTime offsetTimestampMillis =
+        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123000000, ZoneOffset.ofHours(-3));
+
+    avroRecord.put("timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(offsetTimestampNanos));
+    avroRecord.put(
+        "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(offsetTimestampMicros));
+    avroRecord.put(
+        "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(offsetTimestampMillis));
+
+    Record result = readRecord(reader, avroSchema, avroRecord);
+
+    assertThat(result.getField("timestamp_nanos_tz"))
+        .isEqualTo(offsetTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(result.getField("timestamp_micros_tz"))
+        .isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(result.getField("timestamp_millis_tz"))
+        .isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
+
+    // pre-epoch timestamps
+    GenericRecord preEpochRecord = new GenericData.Record(avroSchema);
+
+    OffsetDateTime preEpochTimestampNanos =
+        OffsetDateTime.of(1969, 1, 1, 10, 11, 12, 123456789, ZoneOffset.ofHours(-8));
+    OffsetDateTime preEpochTimestampMicros =
+        OffsetDateTime.of(1968, 1, 1, 10, 11, 12, 123456000, ZoneOffset.ofHours(5));
+    OffsetDateTime preEpochTimestampMillis =
+        OffsetDateTime.of(1967, 1, 1, 10, 11, 12, 123000000, ZoneOffset.ofHours(-3));
+
+    preEpochRecord.put(
+        "timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(preEpochTimestampNanos));
+    preEpochRecord.put(
+        "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(preEpochTimestampMicros));
+    preEpochRecord.put(
+        "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(preEpochTimestampMillis));
+
+    Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord);
+
+    assertThat(preEpochResult.getField("timestamp_nanos_tz"))
+        .isEqualTo(preEpochTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(preEpochResult.getField("timestamp_micros_tz"))
+        .isEqualTo(preEpochTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(preEpochResult.getField("timestamp_millis_tz"))
+        .isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
+  }
+
+  private Record readRecord(DataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
+      throws IOException {
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
+      writer.write(avroRecord, encoder);
+      encoder.flush();
+
+      try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
+        return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
+      }
+    }
+  }
+
+  private Schema utcAdjustedLongSchema() {
+    Schema schema = Schema.create(Schema.Type.LONG);
+    schema.addProp(ADJUST_TO_UTC_PROP, "true");
+    return schema;
+  }
+}
diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
new file mode 100644
index 000000000000..438a9202cc6d
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.data.avro;
+
+import static org.apache.iceberg.avro.AvroSchemaUtil.ADJUST_TO_UTC_PROP;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.junit.jupiter.api.Test;
+
+public class TestPlannedDataReader {
+
+  @Test
+  public void timestampDataReader() throws IOException {
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()),
+            Types.NestedField.required(2, "timestamp_micros", Types.TimestampType.withoutZone()),
+            Types.NestedField.required(3, "timestamp_millis", Types.TimestampType.withoutZone()));
+
+    Schema avroSchema =
+        SchemaBuilder.record("test_programmatic")
+            .fields()
+            .name("timestamp_nanos")
+            .type(LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG)))
+            .noDefault()
+            .name("timestamp_micros")
+            .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
+            .noDefault()
+            .name("timestamp_millis")
+            .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)))
+            .noDefault()
+            .endRecord();
+
+    avroSchema.getField("timestamp_nanos").addProp("field-id", 1);
+    avroSchema.getField("timestamp_micros").addProp("field-id", 2);
+    avroSchema.getField("timestamp_millis").addProp("field-id", 3);
+
+    PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema);
+    reader.setSchema(avroSchema);
+
+    // post-epoch timestamps
+    GenericRecord avroRecord = new GenericData.Record(avroSchema);
+    LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789);
+    LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000);
+    LocalDateTime timestampMillis = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123000000);
+
+    avroRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(timestampNanos));
+    avroRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(timestampMicros));
+    avroRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(timestampMillis));
+
+    Record result = readRecord(reader, avroSchema, avroRecord);
+
+    assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos);
+    assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros);
+    assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis);
+
+    // pre-epoch timestamps
+    GenericRecord preEpochRecord = new GenericData.Record(avroSchema);
+    LocalDateTime preEpochNanos = LocalDateTime.of(1969, 1, 1, 10, 11, 12, 123456789);
+    LocalDateTime preEpochMicros = LocalDateTime.of(1968, 1, 1, 10, 11, 12, 123456000);
+    LocalDateTime preEpochMillis = LocalDateTime.of(1967, 1, 1, 10, 11, 12, 123000000);
+
+    preEpochRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(preEpochNanos));
+    preEpochRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(preEpochMicros));
+    preEpochRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(preEpochMillis));
+
+    Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord);
+
+    assertThat(preEpochResult.getField("timestamp_nanos")).isEqualTo(preEpochNanos);
+    assertThat(preEpochResult.getField("timestamp_micros")).isEqualTo(preEpochMicros);
+    assertThat(preEpochResult.getField("timestamp_millis")).isEqualTo(preEpochMillis);
+  }
+
+  @Test
+  public void timestampTzDataReader() throws IOException {
+    org.apache.iceberg.Schema icebergSchema =
+        new org.apache.iceberg.Schema(
+            Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()),
+            Types.NestedField.required(2, "timestamp_micros_tz", Types.TimestampType.withZone()),
+            Types.NestedField.required(3, "timestamp_millis_tz", Types.TimestampType.withZone()));
+
+    Schema avroSchema =
+        SchemaBuilder.record("test_tz")
+            .fields()
+            .name("timestamp_nanos_tz")
+            .type(LogicalTypes.timestampNanos().addToSchema(utcAdjustedLongSchema()))
+            .noDefault()
+            .name("timestamp_micros_tz")
+            .type(LogicalTypes.timestampMicros().addToSchema(utcAdjustedLongSchema()))
+            .noDefault()
+            .name("timestamp_millis_tz")
+            .type(LogicalTypes.timestampMillis().addToSchema(utcAdjustedLongSchema()))
+            .noDefault()
+            .endRecord();
+
+    avroSchema.getField("timestamp_nanos_tz").addProp("field-id", 1);
+    avroSchema.getField("timestamp_micros_tz").addProp("field-id", 2);
+    avroSchema.getField("timestamp_millis_tz").addProp("field-id", 3);
+
+    PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema);
+    reader.setSchema(avroSchema);
+
+    // post-epoch timestamps
+    GenericRecord avroRecord = new GenericData.Record(avroSchema);
+
+    OffsetDateTime offsetTimestampNanos =
+        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456789, ZoneOffset.ofHours(-8));
+    OffsetDateTime offsetTimestampMicros =
+        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456000, ZoneOffset.ofHours(5));
+    OffsetDateTime offsetTimestampMillis =
+        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123000000, ZoneOffset.ofHours(-3));
+
+    avroRecord.put("timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(offsetTimestampNanos));
+    avroRecord.put(
+        "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(offsetTimestampMicros));
+    avroRecord.put(
+        "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(offsetTimestampMillis));
+
+    Record result = readRecord(reader, avroSchema, avroRecord);
+
+    assertThat(result.getField("timestamp_nanos_tz"))
+        .isEqualTo(offsetTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(result.getField("timestamp_micros_tz"))
+        .isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(result.getField("timestamp_millis_tz"))
+        .isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
+
+    // pre-epoch timestamps
+    GenericRecord preEpochRecord = new GenericData.Record(avroSchema);
+
+    OffsetDateTime preEpochTimestampNanos =
+        OffsetDateTime.of(1969, 1, 1, 10, 11, 12, 123456789, ZoneOffset.ofHours(-8));
+    OffsetDateTime preEpochTimestampMicros =
+        OffsetDateTime.of(1968, 1, 1, 10, 11, 12, 123456000, ZoneOffset.ofHours(5));
+    OffsetDateTime preEpochTimestampMillis =
+        OffsetDateTime.of(1967, 1, 1, 10, 11, 12, 123000000, ZoneOffset.ofHours(-3));
+
+    preEpochRecord.put(
+        "timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(preEpochTimestampNanos));
+    preEpochRecord.put(
+        "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(preEpochTimestampMicros));
+    preEpochRecord.put(
+        "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(preEpochTimestampMillis));
+
+    Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord);
+
+    assertThat(preEpochResult.getField("timestamp_nanos_tz"))
+        .isEqualTo(preEpochTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(preEpochResult.getField("timestamp_micros_tz"))
+        .isEqualTo(preEpochTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC));
+    assertThat(preEpochResult.getField("timestamp_millis_tz"))
+        .isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
+  }
+
+  private Record readRecord(
+      PlannedDataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
+      throws IOException {
+    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
+      GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
+      writer.write(avroRecord, encoder);
+      encoder.flush();
+
+      try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
+        return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
+      }
+    }
+  }
+
+  private Schema utcAdjustedLongSchema() {
+    Schema schema = Schema.create(Schema.Type.LONG);
+    schema.addProp(ADJUST_TO_UTC_PROP, "true");
+    return schema;
+  }
+}
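
For reference, a minimal usage sketch (illustrative only, not part of the patch; the class name is hypothetical) of the round trip that the new `DateTimeUtil` helpers and the `TestDateTimeUtil` cases above establish:

```java
import java.time.LocalDateTime;
import org.apache.iceberg.util.DateTimeUtil;

public class MillisRoundTripSketch {
  public static void main(String[] args) {
    // Epoch milliseconds to LocalDateTime and back; the round trip is exact
    // because millisecond precision fits losslessly in LocalDateTime.
    long millis = 1510871468000L;
    LocalDateTime ts = DateTimeUtil.timestampFromMillis(millis);
    System.out.println(ts); // 2017-11-16T22:31:08
    System.out.println(DateTimeUtil.millisFromTimestamp(ts)); // 1510871468000

    // Pre-epoch values need no special casing: ChronoUnit.MILLIS does signed
    // arithmetic directly, unlike the floor adjustment in microsToMillis.
    System.out.println(DateTimeUtil.timestampFromMillis(-1510871468000L)); // 1922-02-15T01:28:52
  }
}
```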
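The timestamptz variants behave the same way, except that `timestamptzFromMillis` returns an `OffsetDateTime` at UTC, so any input offset is normalized on the round trip; this is what the tz reader tests assert via `withOffsetSameInstant(ZoneOffset.UTC)`. A minimal sketch (illustrative only, not part of the patch):

```java
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.iceberg.util.DateTimeUtil;

public class MillisTzSketch {
  public static void main(String[] args) {
    OffsetDateTime local =
        OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123_000_000, ZoneOffset.ofHours(-3));

    // Converting to epoch millis keeps the instant but drops the offset...
    long millis = DateTimeUtil.millisFromTimestamptz(local);

    // ...and converting back yields the same instant expressed at UTC.
    OffsetDateTime roundTripped = DateTimeUtil.timestamptzFromMillis(millis);
    System.out.println(roundTripped); // 2023-10-15T17:30:45.123Z
    System.out.println(roundTripped.equals(local.withOffsetSameInstant(ZoneOffset.UTC))); // true
  }
}
```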