From 12e97a54490761e943931b0c5f85f410d4647106 Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Wed, 22 Oct 2025 15:10:22 +0100 Subject: [PATCH 1/6] Core: Add support for Avro logical type timestamp-millis --- .../org/apache/iceberg/util/DateTimeUtil.java | 16 ++++++++++ .../apache/iceberg/avro/BaseWriteBuilder.java | 3 ++ .../apache/iceberg/data/avro/DataReader.java | 6 ++++ .../apache/iceberg/data/avro/DataWriter.java | 6 ++++ .../iceberg/data/avro/GenericReaders.java | 30 +++++++++++++++++++ .../iceberg/data/avro/GenericWriters.java | 30 +++++++++++++++++++ .../iceberg/data/avro/PlannedDataReader.java | 6 ++++ 7 files changed, 97 insertions(+) diff --git a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java index 8acff6de0563..5e08a3a3085d 100644 --- a/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java +++ b/api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java @@ -74,6 +74,10 @@ public static LocalDateTime timestampFromNanos(long nanosFromEpoch) { return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch).toLocalDateTime(); } + public static LocalDateTime timestampFromMillis(long millisFromEpoch) { + return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch).toLocalDateTime(); + } + public static long microsFromInstant(Instant instant) { return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC)); } @@ -86,6 +90,10 @@ public static long nanosFromTimestamp(LocalDateTime dateTime) { return ChronoUnit.NANOS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC)); } + public static long millisFromTimestamp(LocalDateTime dateTime) { + return ChronoUnit.MILLIS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC)); + } + public static long microsToMillis(long micros) { // When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion. // Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision. @@ -109,6 +117,10 @@ public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) { return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch); } + public static OffsetDateTime timestamptzFromMillis(long millisFromEpoch) { + return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch); + } + public static long microsFromTimestamptz(OffsetDateTime dateTime) { return ChronoUnit.MICROS.between(EPOCH, dateTime); } @@ -117,6 +129,10 @@ public static long nanosFromTimestamptz(OffsetDateTime dateTime) { return ChronoUnit.NANOS.between(EPOCH, dateTime); } + public static long millisFromTimestamptz(OffsetDateTime dateTime) { + return ChronoUnit.MILLIS.between(EPOCH, dateTime); + } + public static String formatTimestampMillis(long millis) { return Instant.ofEpochMilli(millis).toString().replace("Z", "+00:00"); } diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java index 51e02eaade82..3c8fca7bb709 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -87,6 +87,9 @@ public ValueWriter primitive(Schema primitive) { case "timestamp-nanos": return ValueWriters.longs(); + case "timestamp-millis": + return ValueWriters.longs(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java index 75929dfde4c4..eefb67cf3584 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataReader.java @@ -146,6 +146,12 @@ public ValueReader primitive(Type.PrimitiveType ignored, Schema primitive) { } return GenericReaders.timestampNanos(); + case "timestamp-millis": + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return GenericReaders.timestamptzMillis(); + } + return GenericReaders.timestampMillis(); + case "decimal": return ValueReaders.decimal( ValueReaders.decimalBytesReader(primitive), diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java index 397b38643b15..00cfe76b24c5 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java @@ -130,6 +130,12 @@ public ValueWriter primitive(Schema primitive) { } return GenericWriters.timestampNanos(); + case "timestamp-millis": + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return GenericWriters.timestamptzMillis(); + } + return GenericWriters.timestampMillis(); + case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index d072a26820ab..e26aab94673c 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -61,6 +61,14 @@ static ValueReader timestamptzNanos() { return TimestamptzNanoReader.INSTANCE; } + static ValueReader timestampMillis() { + return TimestampMilliReader.INSTANCE; + } + + static ValueReader timestamptzMillis() { + return TimestamptzMilliReader.INSTANCE; + } + static ValueReader struct( List>> readPlan, StructType struct) { return new PlannedRecordReader(readPlan, struct); @@ -137,6 +145,28 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException { } } + private static class TimestampMilliReader implements ValueReader { + private static final TimestampMilliReader INSTANCE = new TimestampMilliReader(); + + private TimestampMilliReader() {} + + @Override + public LocalDateTime read(Decoder decoder, Object reuse) throws IOException { + return DateTimeUtil.timestampFromMillis(decoder.readLong()); + } + } + + private static class TimestamptzMilliReader implements ValueReader { + private static final TimestamptzMilliReader INSTANCE = new TimestamptzMilliReader(); + + private TimestamptzMilliReader() {} + + @Override + public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException { + return DateTimeUtil.timestamptzFromMillis(decoder.readLong()); + } + } + private static class PlannedRecordReader extends ValueReaders.PlannedStructReader { private final StructType structType; diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java index 6ba5e7ded44b..951a81c209e6 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java @@ -60,6 +60,14 @@ static ValueWriter timestamptzNanos() { return TimestamptzNanoWriter.INSTANCE; } + static ValueWriter timestampMillis() { + return TimestampMillisWriter.INSTANCE; + } + + static ValueWriter timestamptzMillis() { + return TimestamptzMillisWriter.INSTANCE; + } + static ValueWriter struct(List> writers) { return new GenericRecordWriter(writers); } @@ -133,6 +141,28 @@ public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOExceptio } } + private static class TimestampMillisWriter implements ValueWriter { + private static final TimestampMillisWriter INSTANCE = new TimestampMillisWriter(); + + private TimestampMillisWriter() {} + + @Override + public void write(LocalDateTime timestamp, Encoder encoder) throws IOException { + encoder.writeLong(DateTimeUtil.millisFromTimestamp(timestamp)); + } + } + + private static class TimestamptzMillisWriter implements ValueWriter { + private static final TimestamptzMillisWriter INSTANCE = new TimestamptzMillisWriter(); + + private TimestamptzMillisWriter() {} + + @Override + public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOException { + encoder.writeLong(DateTimeUtil.millisFromTimestamptz(timestamptz)); + } + } + private static class GenericRecordWriter extends ValueWriters.StructWriter { private GenericRecordWriter(List> writers) { super(writers); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java index 7e42c5be19bb..747907a2fb97 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/PlannedDataReader.java @@ -154,6 +154,12 @@ public ValueReader primitive(Type partner, Schema primitive) { } return GenericReaders.timestampNanos(); + case "timestamp-millis": + if (AvroSchemaUtil.isTimestamptz(primitive)) { + return GenericReaders.timestamptzMillis(); + } + return GenericReaders.timestampMillis(); + case "decimal": return ValueReaders.decimal( ValueReaders.decimalBytesReader(primitive), From 8cb017c140358bba10a01606d701a1709a06a5d2 Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Wed, 22 Oct 2025 17:25:18 +0100 Subject: [PATCH 2/6] Core: Added test for different logical types --- .../data/avro/TestPlannedDataReader.java | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) create mode 100644 core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java new file mode 100644 index 000000000000..05b30472bd5b --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.avro; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.LocalDateTime; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.jupiter.api.Test; + +public class TestPlannedDataReader { + + @Test + public void testTimestampDataReader() throws IOException { + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()), + Types.NestedField.required(2, "timestamp_micros", Types.TimestampType.withoutZone()), + Types.NestedField.required(3, "timestamp_millis", Types.TimestampType.withoutZone())); + + Schema avroSchema = + SchemaBuilder.record("test_programmatic") + .fields() + .name("timestamp_nanos") + .type(LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestamp_micros") + .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestamp_millis") + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .endRecord(); + + avroSchema.getField("timestamp_nanos").addProp("field-id", 1); + avroSchema.getField("timestamp_micros").addProp("field-id", 2); + avroSchema.getField("timestamp_millis").addProp("field-id", 3); + + PlannedDataReader reader = PlannedDataReader.create(icebergSchema); + reader.setSchema(avroSchema); + + GenericRecord avroRecord = new GenericData.Record(avroSchema); + LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789); + LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000); + LocalDateTime timestampMillis = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123000000); + + avroRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(timestampNanos)); + avroRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(timestampMicros)); + avroRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(timestampMillis)); + + Record result = readRecord(reader, avroSchema, avroRecord); + + assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos); + assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros); + assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis); + } + + private Record readRecord( + PlannedDataReader reader, Schema avroSchema, GenericRecord avroRecord) + throws IOException { + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); + writer.write(avroRecord, encoder); + encoder.flush(); + + try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) { + return reader.read(null, DecoderFactory.get().binaryDecoder(in, null)); + } + } + } +} From 13d7c02035d5b689725b909986385340ff3c7c8a Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Mon, 3 Nov 2025 14:21:59 +0000 Subject: [PATCH 3/6] Core: PR Comments: Added tests for DateTimeUtils --- .../apache/iceberg/util/TestDateTimeUtil.java | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java b/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java index 5799158a8734..02ee8252d5f0 100644 --- a/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java +++ b/api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java @@ -20,6 +20,7 @@ import static org.assertj.core.api.Assertions.assertThat; +import java.time.LocalDateTime; import org.apache.iceberg.transforms.Transforms; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; @@ -96,4 +97,24 @@ public void hourToDaysPositive() { assertThat(DateTimeUtil.hoursToDays(DateTimeUtil.microsToHours(micros))) .isEqualTo(expectedDays); } + + @Test + public void timestampFromMillis() { + assertThat(DateTimeUtil.timestampFromMillis(1510871468000L)) + .isEqualTo(LocalDateTime.parse("2017-11-16T22:31:08")); + assertThat(DateTimeUtil.timestampFromMillis(-1510871468000L)) + .isEqualTo(LocalDateTime.parse("1922-02-15T01:28:52")); + assertThat(DateTimeUtil.timestampFromMillis(0L)) + .isEqualTo(LocalDateTime.parse("1970-01-01T00:00")); + } + + @Test + public void millisFromTimestamp() { + assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("2017-11-16T22:31:08"))) + .isEqualTo(1510871468000L); + assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("1922-02-15T01:28:52"))) + .isEqualTo(-1510871468000L); + assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("1970-01-01T00:00"))) + .isEqualTo(0L); + } } From 4bcc0823302f1800e810ae7c2923d2515dab012b Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Wed, 5 Nov 2025 15:53:19 +0000 Subject: [PATCH 4/6] Core: PR Comments: - Removed timestamp-millis from Writers - Added tests for DataWriter and tz tests --- .../apache/iceberg/avro/BaseWriteBuilder.java | 3 - .../apache/iceberg/data/avro/DataWriter.java | 6 - .../iceberg/data/avro/GenericReaders.java | 16 +- .../iceberg/data/avro/TestDataReader.java | 164 ++++++++++++++++++ .../data/avro/TestPlannedDataReader.java | 63 +++++++ 5 files changed, 235 insertions(+), 17 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java diff --git a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java index 3c8fca7bb709..51e02eaade82 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java +++ b/core/src/main/java/org/apache/iceberg/avro/BaseWriteBuilder.java @@ -87,9 +87,6 @@ public ValueWriter primitive(Schema primitive) { case "timestamp-nanos": return ValueWriters.longs(); - case "timestamp-millis": - return ValueWriters.longs(); - case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java index 00cfe76b24c5..397b38643b15 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java @@ -130,12 +130,6 @@ public ValueWriter primitive(Schema primitive) { } return GenericWriters.timestampNanos(); - case "timestamp-millis": - if (AvroSchemaUtil.isTimestamptz(primitive)) { - return GenericWriters.timestamptzMillis(); - } - return GenericWriters.timestampMillis(); - case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java index e26aab94673c..b31fb96fbcb6 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericReaders.java @@ -62,11 +62,11 @@ static ValueReader timestamptzNanos() { } static ValueReader timestampMillis() { - return TimestampMilliReader.INSTANCE; + return TimestampMillisReader.INSTANCE; } static ValueReader timestamptzMillis() { - return TimestamptzMilliReader.INSTANCE; + return TimestamptzMillisReader.INSTANCE; } static ValueReader struct( @@ -145,10 +145,10 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException { } } - private static class TimestampMilliReader implements ValueReader { - private static final TimestampMilliReader INSTANCE = new TimestampMilliReader(); + private static class TimestampMillisReader implements ValueReader { + private static final TimestampMillisReader INSTANCE = new TimestampMillisReader(); - private TimestampMilliReader() {} + private TimestampMillisReader() {} @Override public LocalDateTime read(Decoder decoder, Object reuse) throws IOException { @@ -156,10 +156,10 @@ public LocalDateTime read(Decoder decoder, Object reuse) throws IOException { } } - private static class TimestamptzMilliReader implements ValueReader { - private static final TimestamptzMilliReader INSTANCE = new TimestamptzMilliReader(); + private static class TimestamptzMillisReader implements ValueReader { + private static final TimestamptzMillisReader INSTANCE = new TimestamptzMillisReader(); - private TimestamptzMilliReader() {} + private TimestamptzMillisReader() {} @Override public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException { diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java new file mode 100644 index 000000000000..a66e6e16ac2c --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.data.avro; + +import static org.apache.iceberg.avro.AvroSchemaUtil.ADJUST_TO_UTC_PROP; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.jupiter.api.Test; + +class TestDataReader { + + @Test + public void testTimestampDataReader() throws IOException { + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()), + Types.NestedField.required(2, "timestamp_micros", Types.TimestampType.withoutZone()), + Types.NestedField.required(3, "timestamp_millis", Types.TimestampType.withoutZone())); + + Schema avroSchema = + SchemaBuilder.record("test_programmatic") + .fields() + .name("timestamp_nanos") + .type(LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestamp_micros") + .type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .name("timestamp_millis") + .type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG))) + .noDefault() + .endRecord(); + + avroSchema.getField("timestamp_nanos").addProp("field-id", 1); + avroSchema.getField("timestamp_micros").addProp("field-id", 2); + avroSchema.getField("timestamp_millis").addProp("field-id", 3); + + DataReader reader = DataReader.create(icebergSchema, avroSchema); + reader.setSchema(avroSchema); + + GenericRecord avroRecord = new GenericData.Record(avroSchema); + LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789); + LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000); + LocalDateTime timestampMillis = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123000000); + + avroRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(timestampNanos)); + avroRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(timestampMicros)); + avroRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(timestampMillis)); + + Record result = readRecord(reader, avroSchema, avroRecord); + + assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos); + assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros); + assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis); + } + + @Test + public void testTimestampTzDataReader() throws IOException { + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()), + Types.NestedField.required(2, "timestamp_micros_tz", Types.TimestampType.withZone()), + Types.NestedField.required(3, "timestamp_millis_tz", Types.TimestampType.withZone())); + + Schema avroSchema = + SchemaBuilder.record("test_tz") + .fields() + .name("timestamp_nanos_tz") + .type(LogicalTypes.timestampNanos().addToSchema(utcAdjustedLongSchema())) + .noDefault() + .name("timestamp_micros_tz") + .type(LogicalTypes.timestampMicros().addToSchema(utcAdjustedLongSchema())) + .noDefault() + .name("timestamp_millis_tz") + .type(LogicalTypes.timestampMillis().addToSchema(utcAdjustedLongSchema())) + .noDefault() + .endRecord(); + + avroSchema.getField("timestamp_nanos_tz").addProp("field-id", 1); + avroSchema.getField("timestamp_micros_tz").addProp("field-id", 2); + avroSchema.getField("timestamp_millis_tz").addProp("field-id", 3); + + DataReader reader = DataReader.create(icebergSchema, avroSchema); + reader.setSchema(avroSchema); + + GenericRecord avroRecord = new GenericData.Record(avroSchema); + + OffsetDateTime offsetTimestampNanos = + OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456789, ZoneOffset.ofHours(-8)); + OffsetDateTime offsetTimestampMicros = + OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456000, ZoneOffset.ofHours(5)); + OffsetDateTime offsetTimestampMillis = + OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123000000, ZoneOffset.ofHours(-3)); + + avroRecord.put("timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(offsetTimestampNanos)); + avroRecord.put( + "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(offsetTimestampMicros)); + avroRecord.put( + "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(offsetTimestampMillis)); + + Record result = readRecord(reader, avroSchema, avroRecord); + + assertThat(result.getField("timestamp_nanos_tz")) + .isEqualTo(offsetTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(result.getField("timestamp_micros_tz")) + .isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(result.getField("timestamp_millis_tz")) + .isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC)); + } + + private Record readRecord(DataReader reader, Schema avroSchema, GenericRecord avroRecord) + throws IOException { + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + GenericDatumWriter writer = new GenericDatumWriter<>(avroSchema); + writer.write(avroRecord, encoder); + encoder.flush(); + + try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) { + return reader.read(null, DecoderFactory.get().binaryDecoder(in, null)); + } + } + } + + private Schema utcAdjustedLongSchema() { + Schema schema = Schema.create(Schema.Type.LONG); + schema.addProp(ADJUST_TO_UTC_PROP, "true"); + return schema; + } +} diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java index 05b30472bd5b..7631294371a8 100644 --- a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java +++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java @@ -18,12 +18,15 @@ */ package org.apache.iceberg.data.avro; +import static org.apache.iceberg.avro.AvroSchemaUtil.ADJUST_TO_UTC_PROP; import static org.assertj.core.api.Assertions.assertThat; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; @@ -85,6 +88,60 @@ public void testTimestampDataReader() throws IOException { assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis); } + @Test + public void testTimestampTzDataReader() throws IOException { + org.apache.iceberg.Schema icebergSchema = + new org.apache.iceberg.Schema( + Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()), + Types.NestedField.required(2, "timestamp_micros_tz", Types.TimestampType.withZone()), + Types.NestedField.required(3, "timestamp_millis_tz", Types.TimestampType.withZone())); + + Schema avroSchema = + SchemaBuilder.record("test_tz") + .fields() + .name("timestamp_nanos_tz") + .type(LogicalTypes.timestampNanos().addToSchema(utcAdjustedLongSchema())) + .noDefault() + .name("timestamp_micros_tz") + .type(LogicalTypes.timestampMicros().addToSchema(utcAdjustedLongSchema())) + .noDefault() + .name("timestamp_millis_tz") + .type(LogicalTypes.timestampMillis().addToSchema(utcAdjustedLongSchema())) + .noDefault() + .endRecord(); + + avroSchema.getField("timestamp_nanos_tz").addProp("field-id", 1); + avroSchema.getField("timestamp_micros_tz").addProp("field-id", 2); + avroSchema.getField("timestamp_millis_tz").addProp("field-id", 3); + + PlannedDataReader reader = PlannedDataReader.create(icebergSchema); + reader.setSchema(avroSchema); + + GenericRecord avroRecord = new GenericData.Record(avroSchema); + + OffsetDateTime offsetTimestampNanos = + OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456789, ZoneOffset.ofHours(-8)); + OffsetDateTime offsetTimestampMicros = + OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456000, ZoneOffset.ofHours(5)); + OffsetDateTime offsetTimestampMillis = + OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123000000, ZoneOffset.ofHours(-3)); + + avroRecord.put("timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(offsetTimestampNanos)); + avroRecord.put( + "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(offsetTimestampMicros)); + avroRecord.put( + "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(offsetTimestampMillis)); + + Record result = readRecord(reader, avroSchema, avroRecord); + + assertThat(result.getField("timestamp_nanos_tz")) + .isEqualTo(offsetTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(result.getField("timestamp_micros_tz")) + .isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(result.getField("timestamp_millis_tz")) + .isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC)); + } + private Record readRecord( PlannedDataReader reader, Schema avroSchema, GenericRecord avroRecord) throws IOException { @@ -99,4 +156,10 @@ private Record readRecord( } } } + + private Schema utcAdjustedLongSchema() { + Schema schema = Schema.create(Schema.Type.LONG); + schema.addProp(ADJUST_TO_UTC_PROP, "true"); + return schema; + } } From 1553b1e33e533503f818d9cdf4cf0cec2f43e403 Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Thu, 6 Nov 2025 16:46:22 +0000 Subject: [PATCH 5/6] Core: PR Comments: Remove changes to `GenericWriters` --- .../iceberg/data/avro/GenericWriters.java | 30 ------------------- 1 file changed, 30 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java index 951a81c209e6..6ba5e7ded44b 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java @@ -60,14 +60,6 @@ static ValueWriter timestamptzNanos() { return TimestamptzNanoWriter.INSTANCE; } - static ValueWriter timestampMillis() { - return TimestampMillisWriter.INSTANCE; - } - - static ValueWriter timestamptzMillis() { - return TimestamptzMillisWriter.INSTANCE; - } - static ValueWriter struct(List> writers) { return new GenericRecordWriter(writers); } @@ -141,28 +133,6 @@ public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOExceptio } } - private static class TimestampMillisWriter implements ValueWriter { - private static final TimestampMillisWriter INSTANCE = new TimestampMillisWriter(); - - private TimestampMillisWriter() {} - - @Override - public void write(LocalDateTime timestamp, Encoder encoder) throws IOException { - encoder.writeLong(DateTimeUtil.millisFromTimestamp(timestamp)); - } - } - - private static class TimestamptzMillisWriter implements ValueWriter { - private static final TimestamptzMillisWriter INSTANCE = new TimestamptzMillisWriter(); - - private TimestamptzMillisWriter() {} - - @Override - public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOException { - encoder.writeLong(DateTimeUtil.millisFromTimestamptz(timestamptz)); - } - } - private static class GenericRecordWriter extends ValueWriters.StructWriter { private GenericRecordWriter(List> writers) { super(writers); From 64c9cd02e099a91417cf414db0a2b19ef95be710 Mon Sep 17 00:00:00 2001 From: James Faulkner Date: Thu, 6 Nov 2025 16:47:30 +0000 Subject: [PATCH 6/6] Core: PR Comments: - Tests for pre epoch - remove the `test` prefix on the test methods --- .../iceberg/data/avro/TestDataReader.java | 48 ++++++++++++++++++- .../data/avro/TestPlannedDataReader.java | 48 ++++++++++++++++++- 2 files changed, 92 insertions(+), 4 deletions(-) diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java index a66e6e16ac2c..966f01267d67 100644 --- a/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java +++ b/core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java @@ -44,7 +44,7 @@ class TestDataReader { @Test - public void testTimestampDataReader() throws IOException { + public void timestampDataReader() throws IOException { org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema( Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()), @@ -72,6 +72,7 @@ public void testTimestampDataReader() throws IOException { DataReader reader = DataReader.create(icebergSchema, avroSchema); reader.setSchema(avroSchema); + // post-epoch timestamps GenericRecord avroRecord = new GenericData.Record(avroSchema); LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789); LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000); @@ -86,10 +87,26 @@ public void testTimestampDataReader() throws IOException { assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos); assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros); assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis); + + // pre-epoch timestamps + GenericRecord preEpochRecord = new GenericData.Record(avroSchema); + LocalDateTime preEpochNanos = LocalDateTime.of(1969, 1, 1, 10, 11, 12, 123456789); + LocalDateTime preEpochMicros = LocalDateTime.of(1968, 1, 1, 10, 11, 12, 123456000); + LocalDateTime preEpochMillis = LocalDateTime.of(1967, 1, 1, 10, 11, 12, 123000000); + + preEpochRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(preEpochNanos)); + preEpochRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(preEpochMicros)); + preEpochRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(preEpochMillis)); + + Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord); + + assertThat(preEpochResult.getField("timestamp_nanos")).isEqualTo(preEpochNanos); + assertThat(preEpochResult.getField("timestamp_micros")).isEqualTo(preEpochMicros); + assertThat(preEpochResult.getField("timestamp_millis")).isEqualTo(preEpochMillis); } @Test - public void testTimestampTzDataReader() throws IOException { + public void timestampTzDataReader() throws IOException { org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema( Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()), @@ -117,6 +134,7 @@ public void testTimestampTzDataReader() throws IOException { DataReader reader = DataReader.create(icebergSchema, avroSchema); reader.setSchema(avroSchema); + // post-epoch timestamps GenericRecord avroRecord = new GenericData.Record(avroSchema); OffsetDateTime offsetTimestampNanos = @@ -140,6 +158,32 @@ public void testTimestampTzDataReader() throws IOException { .isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC)); assertThat(result.getField("timestamp_millis_tz")) .isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC)); + + // pre-epoch timestamps + GenericRecord preEpochRecord = new GenericData.Record(avroSchema); + + OffsetDateTime preEpochTimestampNanos = + OffsetDateTime.of(1969, 1, 1, 10, 11, 12, 123456789, ZoneOffset.ofHours(-8)); + OffsetDateTime preEpochTimestampMicros = + OffsetDateTime.of(1968, 1, 1, 10, 11, 12, 123456000, ZoneOffset.ofHours(5)); + OffsetDateTime preEpochTimestampMillis = + OffsetDateTime.of(1967, 1, 1, 10, 11, 12, 123000000, ZoneOffset.ofHours(-3)); + + preEpochRecord.put( + "timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(preEpochTimestampNanos)); + preEpochRecord.put( + "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(preEpochTimestampMicros)); + preEpochRecord.put( + "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(preEpochTimestampMillis)); + + Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord); + + assertThat(preEpochResult.getField("timestamp_nanos_tz")) + .isEqualTo(preEpochTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(preEpochResult.getField("timestamp_micros_tz")) + .isEqualTo(preEpochTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(preEpochResult.getField("timestamp_millis_tz")) + .isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC)); } private Record readRecord(DataReader reader, Schema avroSchema, GenericRecord avroRecord) diff --git a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java index 7631294371a8..438a9202cc6d 100644 --- a/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java +++ b/core/src/test/java/org/apache/iceberg/data/avro/TestPlannedDataReader.java @@ -44,7 +44,7 @@ public class TestPlannedDataReader { @Test - public void testTimestampDataReader() throws IOException { + public void timestampDataReader() throws IOException { org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema( Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()), @@ -72,6 +72,7 @@ public void testTimestampDataReader() throws IOException { PlannedDataReader reader = PlannedDataReader.create(icebergSchema); reader.setSchema(avroSchema); + // post-epoch timestamps GenericRecord avroRecord = new GenericData.Record(avroSchema); LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789); LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000); @@ -86,10 +87,26 @@ public void testTimestampDataReader() throws IOException { assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos); assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros); assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis); + + // pre-epoch timestamps + GenericRecord preEpochRecord = new GenericData.Record(avroSchema); + LocalDateTime preEpochNanos = LocalDateTime.of(1969, 1, 1, 10, 11, 12, 123456789); + LocalDateTime preEpochMicros = LocalDateTime.of(1968, 1, 1, 10, 11, 12, 123456000); + LocalDateTime preEpochMillis = LocalDateTime.of(1967, 1, 1, 10, 11, 12, 123000000); + + preEpochRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(preEpochNanos)); + preEpochRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(preEpochMicros)); + preEpochRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(preEpochMillis)); + + Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord); + + assertThat(preEpochResult.getField("timestamp_nanos")).isEqualTo(preEpochNanos); + assertThat(preEpochResult.getField("timestamp_micros")).isEqualTo(preEpochMicros); + assertThat(preEpochResult.getField("timestamp_millis")).isEqualTo(preEpochMillis); } @Test - public void testTimestampTzDataReader() throws IOException { + public void timestampTzDataReader() throws IOException { org.apache.iceberg.Schema icebergSchema = new org.apache.iceberg.Schema( Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()), @@ -117,6 +134,7 @@ public void testTimestampTzDataReader() throws IOException { PlannedDataReader reader = PlannedDataReader.create(icebergSchema); reader.setSchema(avroSchema); + // post-epoch timestamps GenericRecord avroRecord = new GenericData.Record(avroSchema); OffsetDateTime offsetTimestampNanos = @@ -140,6 +158,32 @@ public void testTimestampTzDataReader() throws IOException { .isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC)); assertThat(result.getField("timestamp_millis_tz")) .isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC)); + + // pre-epoch timestamps + GenericRecord preEpochRecord = new GenericData.Record(avroSchema); + + OffsetDateTime preEpochTimestampNanos = + OffsetDateTime.of(1969, 1, 1, 10, 11, 12, 123456789, ZoneOffset.ofHours(-8)); + OffsetDateTime preEpochTimestampMicros = + OffsetDateTime.of(1968, 1, 1, 10, 11, 12, 123456000, ZoneOffset.ofHours(5)); + OffsetDateTime preEpochTimestampMillis = + OffsetDateTime.of(1967, 1, 1, 10, 11, 12, 123000000, ZoneOffset.ofHours(-3)); + + preEpochRecord.put( + "timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(preEpochTimestampNanos)); + preEpochRecord.put( + "timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(preEpochTimestampMicros)); + preEpochRecord.put( + "timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(preEpochTimestampMillis)); + + Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord); + + assertThat(preEpochResult.getField("timestamp_nanos_tz")) + .isEqualTo(preEpochTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(preEpochResult.getField("timestamp_micros_tz")) + .isEqualTo(preEpochTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC)); + assertThat(preEpochResult.getField("timestamp_millis_tz")) + .isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC)); } private Record readRecord(