Skip to content
16 changes: 16 additions & 0 deletions api/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ public static LocalDateTime timestampFromNanos(long nanosFromEpoch) {
return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch).toLocalDateTime();
}

public static LocalDateTime timestampFromMillis(long millisFromEpoch) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add tests for anything that's new to TestDateTimeUtil

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch).toLocalDateTime();
}

public static long microsFromInstant(Instant instant) {
return ChronoUnit.MICROS.between(EPOCH, instant.atOffset(ZoneOffset.UTC));
}
Expand All @@ -86,6 +90,10 @@ public static long nanosFromTimestamp(LocalDateTime dateTime) {
return ChronoUnit.NANOS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
}

public static long millisFromTimestamp(LocalDateTime dateTime) {
return ChronoUnit.MILLIS.between(EPOCH, dateTime.atOffset(ZoneOffset.UTC));
}

public static long microsToMillis(long micros) {
// When the timestamp is negative, i.e before 1970, we need to adjust the milliseconds portion.
// Example - 1965-01-01 10:11:12.123456 is represented as (-157700927876544) in micro precision.
Expand All @@ -109,6 +117,10 @@ public static OffsetDateTime timestamptzFromNanos(long nanosFromEpoch) {
return ChronoUnit.NANOS.addTo(EPOCH, nanosFromEpoch);
}

public static OffsetDateTime timestamptzFromMillis(long millisFromEpoch) {
return ChronoUnit.MILLIS.addTo(EPOCH, millisFromEpoch);
}

public static long microsFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime);
}
Expand All @@ -117,6 +129,10 @@ public static long nanosFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.NANOS.between(EPOCH, dateTime);
}

public static long millisFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.MILLIS.between(EPOCH, dateTime);
}

public static String formatTimestampMillis(long millis) {
return Instant.ofEpochMilli(millis).toString().replace("Z", "+00:00");
}
Expand Down
21 changes: 21 additions & 0 deletions api/src/test/java/org/apache/iceberg/util/TestDateTimeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.time.LocalDateTime;
import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -96,4 +97,24 @@ public void hourToDaysPositive() {
assertThat(DateTimeUtil.hoursToDays(DateTimeUtil.microsToHours(micros)))
.isEqualTo(expectedDays);
}

@Test
public void timestampFromMillis() {
assertThat(DateTimeUtil.timestampFromMillis(1510871468000L))
.isEqualTo(LocalDateTime.parse("2017-11-16T22:31:08"));
assertThat(DateTimeUtil.timestampFromMillis(-1510871468000L))
.isEqualTo(LocalDateTime.parse("1922-02-15T01:28:52"));
assertThat(DateTimeUtil.timestampFromMillis(0L))
.isEqualTo(LocalDateTime.parse("1970-01-01T00:00"));
}

@Test
public void millisFromTimestamp() {
assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("2017-11-16T22:31:08")))
.isEqualTo(1510871468000L);
assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("1922-02-15T01:28:52")))
.isEqualTo(-1510871468000L);
assertThat(DateTimeUtil.millisFromTimestamp(LocalDateTime.parse("1970-01-01T00:00")))
.isEqualTo(0L);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ public ValueReader<?> primitive(Type.PrimitiveType ignored, Schema primitive) {
}
return GenericReaders.timestampNanos();

case "timestamp-millis":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we also need a test for this code path

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the same tests we had for PlannedDataReader. Also added tests for the TZ code path too.

if (AvroSchemaUtil.isTimestamptz(primitive)) {
return GenericReaders.timestamptzMillis();
}
return GenericReaders.timestampMillis();

case "decimal":
return ValueReaders.decimal(
ValueReaders.decimalBytesReader(primitive),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ static ValueReader<OffsetDateTime> timestamptzNanos() {
return TimestamptzNanoReader.INSTANCE;
}

static ValueReader<LocalDateTime> timestampMillis() {
return TimestampMillisReader.INSTANCE;
}

static ValueReader<OffsetDateTime> timestamptzMillis() {
return TimestamptzMillisReader.INSTANCE;
}

static ValueReader<Record> struct(
List<Pair<Integer, ValueReader<?>>> readPlan, StructType struct) {
return new PlannedRecordReader(readPlan, struct);
Expand Down Expand Up @@ -137,6 +145,28 @@ public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
}
}

private static class TimestampMillisReader implements ValueReader<LocalDateTime> {
private static final TimestampMillisReader INSTANCE = new TimestampMillisReader();

private TimestampMillisReader() {}

@Override
public LocalDateTime read(Decoder decoder, Object reuse) throws IOException {
return DateTimeUtil.timestampFromMillis(decoder.readLong());
}
}

private static class TimestamptzMillisReader implements ValueReader<OffsetDateTime> {
private static final TimestamptzMillisReader INSTANCE = new TimestamptzMillisReader();

private TimestamptzMillisReader() {}

@Override
public OffsetDateTime read(Decoder decoder, Object reuse) throws IOException {
return DateTimeUtil.timestamptzFromMillis(decoder.readLong());
}
}

private static class PlannedRecordReader extends ValueReaders.PlannedStructReader<Record> {
private final StructType structType;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ public ValueReader<?> primitive(Type partner, Schema primitive) {
}
return GenericReaders.timestampNanos();

case "timestamp-millis":
if (AvroSchemaUtil.isTimestamptz(primitive)) {
return GenericReaders.timestamptzMillis();
}
return GenericReaders.timestampMillis();

case "decimal":
return ValueReaders.decimal(
ValueReaders.decimalBytesReader(primitive),
Expand Down
208 changes: 208 additions & 0 deletions core/src/test/java/org/apache/iceberg/data/avro/TestDataReader.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.data.avro;

import static org.apache.iceberg.avro.AvroSchemaUtil.ADJUST_TO_UTC_PROP;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.junit.jupiter.api.Test;

class TestDataReader {

@Test
public void timestampDataReader() throws IOException {
org.apache.iceberg.Schema icebergSchema =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "timestamp_nanos", Types.TimestampType.withoutZone()),
Types.NestedField.required(2, "timestamp_micros", Types.TimestampType.withoutZone()),
Types.NestedField.required(3, "timestamp_millis", Types.TimestampType.withoutZone()));

Schema avroSchema =
SchemaBuilder.record("test_programmatic")
.fields()
.name("timestamp_nanos")
.type(LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG)))
.noDefault()
.name("timestamp_micros")
.type(LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)))
.noDefault()
.name("timestamp_millis")
.type(LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)))
.noDefault()
.endRecord();

avroSchema.getField("timestamp_nanos").addProp("field-id", 1);
avroSchema.getField("timestamp_micros").addProp("field-id", 2);
avroSchema.getField("timestamp_millis").addProp("field-id", 3);

DataReader<Record> reader = DataReader.create(icebergSchema, avroSchema);
reader.setSchema(avroSchema);

// post-epoch timestamps
GenericRecord avroRecord = new GenericData.Record(avroSchema);
LocalDateTime timestampNanos = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456789);
LocalDateTime timestampMicros = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123456000);
LocalDateTime timestampMillis = LocalDateTime.of(2023, 10, 15, 14, 30, 45, 123000000);

avroRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(timestampNanos));
avroRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(timestampMicros));
avroRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(timestampMillis));

Record result = readRecord(reader, avroSchema, avroRecord);

assertThat(result.getField("timestamp_nanos")).isEqualTo(timestampNanos);
assertThat(result.getField("timestamp_micros")).isEqualTo(timestampMicros);
assertThat(result.getField("timestamp_millis")).isEqualTo(timestampMillis);

// pre-epoch timestamps
GenericRecord preEpochRecord = new GenericData.Record(avroSchema);
LocalDateTime preEpochNanos = LocalDateTime.of(1969, 1, 1, 10, 11, 12, 123456789);
LocalDateTime preEpochMicros = LocalDateTime.of(1968, 1, 1, 10, 11, 12, 123456000);
LocalDateTime preEpochMillis = LocalDateTime.of(1967, 1, 1, 10, 11, 12, 123000000);

preEpochRecord.put("timestamp_nanos", DateTimeUtil.nanosFromTimestamp(preEpochNanos));
preEpochRecord.put("timestamp_micros", DateTimeUtil.microsFromTimestamp(preEpochMicros));
preEpochRecord.put("timestamp_millis", DateTimeUtil.millisFromTimestamp(preEpochMillis));

Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord);

assertThat(preEpochResult.getField("timestamp_nanos")).isEqualTo(preEpochNanos);
assertThat(preEpochResult.getField("timestamp_micros")).isEqualTo(preEpochMicros);
assertThat(preEpochResult.getField("timestamp_millis")).isEqualTo(preEpochMillis);
}

@Test
public void timestampTzDataReader() throws IOException {
org.apache.iceberg.Schema icebergSchema =
new org.apache.iceberg.Schema(
Types.NestedField.required(1, "timestamp_nanos_tz", Types.TimestampType.withZone()),
Types.NestedField.required(2, "timestamp_micros_tz", Types.TimestampType.withZone()),
Types.NestedField.required(3, "timestamp_millis_tz", Types.TimestampType.withZone()));

Schema avroSchema =
SchemaBuilder.record("test_tz")
.fields()
.name("timestamp_nanos_tz")
.type(LogicalTypes.timestampNanos().addToSchema(utcAdjustedLongSchema()))
.noDefault()
.name("timestamp_micros_tz")
.type(LogicalTypes.timestampMicros().addToSchema(utcAdjustedLongSchema()))
.noDefault()
.name("timestamp_millis_tz")
.type(LogicalTypes.timestampMillis().addToSchema(utcAdjustedLongSchema()))
.noDefault()
.endRecord();

avroSchema.getField("timestamp_nanos_tz").addProp("field-id", 1);
avroSchema.getField("timestamp_micros_tz").addProp("field-id", 2);
avroSchema.getField("timestamp_millis_tz").addProp("field-id", 3);

DataReader<Record> reader = DataReader.create(icebergSchema, avroSchema);
reader.setSchema(avroSchema);

// post-epoch timestamps
GenericRecord avroRecord = new GenericData.Record(avroSchema);

OffsetDateTime offsetTimestampNanos =
OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456789, ZoneOffset.ofHours(-8));
OffsetDateTime offsetTimestampMicros =
OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123456000, ZoneOffset.ofHours(5));
OffsetDateTime offsetTimestampMillis =
OffsetDateTime.of(2023, 10, 15, 14, 30, 45, 123000000, ZoneOffset.ofHours(-3));

avroRecord.put("timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(offsetTimestampNanos));
avroRecord.put(
"timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(offsetTimestampMicros));
avroRecord.put(
"timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(offsetTimestampMillis));

Record result = readRecord(reader, avroSchema, avroRecord);

assertThat(result.getField("timestamp_nanos_tz"))
.isEqualTo(offsetTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC));
assertThat(result.getField("timestamp_micros_tz"))
.isEqualTo(offsetTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC));
assertThat(result.getField("timestamp_millis_tz"))
.isEqualTo(offsetTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));

// pre-epoch timestamps
GenericRecord preEpochRecord = new GenericData.Record(avroSchema);

OffsetDateTime preEpochTimestampNanos =
OffsetDateTime.of(1969, 1, 1, 10, 11, 12, 123456789, ZoneOffset.ofHours(-8));
OffsetDateTime preEpochTimestampMicros =
OffsetDateTime.of(1968, 1, 1, 10, 11, 12, 123456000, ZoneOffset.ofHours(5));
OffsetDateTime preEpochTimestampMillis =
OffsetDateTime.of(1967, 1, 1, 10, 11, 12, 123000000, ZoneOffset.ofHours(-3));

preEpochRecord.put(
"timestamp_nanos_tz", DateTimeUtil.nanosFromTimestamptz(preEpochTimestampNanos));
preEpochRecord.put(
"timestamp_micros_tz", DateTimeUtil.microsFromTimestamptz(preEpochTimestampMicros));
preEpochRecord.put(
"timestamp_millis_tz", DateTimeUtil.millisFromTimestamptz(preEpochTimestampMillis));

Record preEpochResult = readRecord(reader, avroSchema, preEpochRecord);

assertThat(preEpochResult.getField("timestamp_nanos_tz"))
.isEqualTo(preEpochTimestampNanos.withOffsetSameInstant(ZoneOffset.UTC));
assertThat(preEpochResult.getField("timestamp_micros_tz"))
.isEqualTo(preEpochTimestampMicros.withOffsetSameInstant(ZoneOffset.UTC));
assertThat(preEpochResult.getField("timestamp_millis_tz"))
.isEqualTo(preEpochTimestampMillis.withOffsetSameInstant(ZoneOffset.UTC));
}

private Record readRecord(DataReader<Record> reader, Schema avroSchema, GenericRecord avroRecord)
throws IOException {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(avroSchema);
writer.write(avroRecord, encoder);
encoder.flush();

try (ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray())) {
return reader.read(null, DecoderFactory.get().binaryDecoder(in, null));
}
}
}

private Schema utcAdjustedLongSchema() {
Schema schema = Schema.create(Schema.Type.LONG);
schema.addProp(ADJUST_TO_UTC_PROP, "true");
return schema;
}
}
Loading