[Kernel][Defaults] Support reading timestamp_ntz stored as INT96 (#3301)
## Description
Resolves #2908 

## How was this patch tested?
Added a unit test that reads an INT96 column, written by Spark, as TIMESTAMP_NTZ.
raveeram-db authored Jun 26, 2024
1 parent 7bb9792 commit 8e3647a
Showing 4 changed files with 39 additions and 51 deletions.
ParquetColumnReaders.java
@@ -31,7 +31,6 @@
 
 import io.delta.kernel.defaults.internal.data.vector.*;
 import static io.delta.kernel.defaults.internal.parquet.TimestampConverters.createTimestampConverter;
-import static io.delta.kernel.defaults.internal.parquet.TimestampConverters.createTimestampNtzConverter;
 
 /**
  * Parquet column readers for materializing the column values from Parquet files into Kernels
@@ -79,9 +78,11 @@ public static Converter createConverter(
             return DecimalColumnReader.createDecimalConverter(
                 initialBatchSize, (DecimalType) typeFromClient, typeFromFile);
         } else if (typeFromClient instanceof TimestampType) {
-            return createTimestampConverter(initialBatchSize, typeFromFile);
+            return createTimestampConverter(initialBatchSize, typeFromFile,
+                TimestampType.TIMESTAMP);
         } else if (typeFromClient instanceof TimestampNTZType) {
-            return createTimestampNtzConverter(initialBatchSize, typeFromFile);
+            return createTimestampConverter(initialBatchSize, typeFromFile,
+                TimestampNTZType.TIMESTAMP_NTZ);
         }
 
         throw new UnsupportedOperationException(typeFromClient + " is not supported");
TimestampConverters.java
@@ -26,8 +26,6 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT96;
 
 import io.delta.kernel.types.*;
-import static io.delta.kernel.types.TimestampNTZType.TIMESTAMP_NTZ;
-import static io.delta.kernel.types.TimestampType.TIMESTAMP;
 
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 
@@ -44,31 +42,40 @@ public class TimestampConverters {
      *
      * @param initialBatchSize Initial batch size of the generated column vector
      * @param typeFromFile Column type metadata from Parquet file
+     * @param typeFromClient Column type from client
      * @return instance of {@link Converter}
      */
-    public static Converter createTimestampConverter(int initialBatchSize, Type typeFromFile) {
+    public static Converter createTimestampConverter(int initialBatchSize,
+                                                     Type typeFromFile,
+                                                     DataType typeFromClient) {
         PrimitiveType primType = typeFromFile.asPrimitiveType();
+        LogicalTypeAnnotation typeAnnotation = primType.getLogicalTypeAnnotation();
+        boolean isTimestampTz = (typeFromClient instanceof TimestampType);
 
         if (primType.getPrimitiveTypeName() == INT96) {
-            return new TimestampBinaryConverter(TIMESTAMP, initialBatchSize);
-        } else if (primType.getPrimitiveTypeName() == INT64) {
-            LogicalTypeAnnotation typeAnnotation = primType.getLogicalTypeAnnotation();
-            if (!(typeAnnotation instanceof TimestampLogicalTypeAnnotation)) {
-                throw new RuntimeException(String.format(
-                    "Unsupported timestamp column with Parquet type %s.",
-                    typeFromFile));
-            }
+            // INT96 carries no logical type annotation for either TIMESTAMP or TIMESTAMP_NTZ.
+            // Also, TIMESTAMP_NTZ does not require rebasing
+            // because it has no time zone context.
+            return new TimestampBinaryConverter(typeFromClient, initialBatchSize);
+        } else if (primType.getPrimitiveTypeName() == INT64 &&
+                typeAnnotation instanceof TimestampLogicalTypeAnnotation) {
             TimestampLogicalTypeAnnotation timestamp =
                 (TimestampLogicalTypeAnnotation) typeAnnotation;
 
-            checkArgument(timestamp.isAdjustedToUTC(),
-                "TimestampType must have Parquet TimestampType(isAdjustedToUTC=true)");
+            boolean isAdjustedUtc = timestamp.isAdjustedToUTC();
+            if (!((isTimestampTz && isAdjustedUtc) || (!isTimestampTz && !isAdjustedUtc))) {
+                throw new RuntimeException(String.format(
+                    "Incompatible UTC adjustment for timestamp column. " +
+                        "Client type: %s, File type: %s, isAdjustedUtc: %s",
+                    typeFromClient, typeFromFile, isAdjustedUtc));
+            }
 
             switch (timestamp.getUnit()) {
                 case MICROS:
-                    return new ParquetColumnReaders.LongColumnReader(TIMESTAMP, initialBatchSize);
+                    return new ParquetColumnReaders.LongColumnReader(typeFromClient,
+                        initialBatchSize);
                 case MILLIS:
-                    return new TimestampMillisConverter(TIMESTAMP, initialBatchSize);
+                    return new TimestampMillisConverter(typeFromClient, initialBatchSize);
                 default:
                     throw new UnsupportedOperationException(String.format(
                         "Unsupported Parquet TimeType unit=%s", timestamp.getUnit()));
@@ -80,39 +87,6 @@ public static Converter createTimestampConverter(int initialBatchSize, Type typeFromFile) {
         }
     }
 
-    /**
-     * Create a {@code timestamp_ntz} column type reader
-     *
-     * @param initialBatchSize Initial batch size of the generated column vector
-     * @param typeFromFile Column type metadata from Parquet file
-     * @return instance of {@link Converter}
-     */
-    public static Converter createTimestampNtzConverter(int initialBatchSize, Type typeFromFile) {
-        PrimitiveType primType = typeFromFile.asPrimitiveType();
-
-        LogicalTypeAnnotation logicalType = primType.getLogicalTypeAnnotation();
-        checkArgument(logicalType instanceof TimestampLogicalTypeAnnotation,
-            "Invalid logical type annotation for timestamp_ntz type columns: " + logicalType);
-
-        checkArgument(primType.getPrimitiveTypeName() == INT64,
-            "Invalid storage type for timestamp_ntz columns: "
-                + primType.getPrimitiveTypeName());
-
-        TimestampLogicalTypeAnnotation timestamp = (TimestampLogicalTypeAnnotation) logicalType;
-        checkArgument(!timestamp.isAdjustedToUTC(),
-            TIMESTAMP_NTZ + " must have Parquet TimestampType(isAdjustedToUTC=false)");
-
-        switch (timestamp.getUnit()) {
-            case MICROS:
-                return new ParquetColumnReaders.LongColumnReader(TIMESTAMP_NTZ, initialBatchSize);
-            case MILLIS:
-                return new TimestampMillisConverter(TIMESTAMP_NTZ, initialBatchSize);
-            default:
-                throw new UnsupportedOperationException(String.format(
-                    "Unsupported Parquet TimeType unit=%s", timestamp.getUnit()));
-        }
-    }
-
     public static class TimestampMillisConverter extends ParquetColumnReaders.LongColumnReader {
 
         TimestampMillisConverter(DataType dataType, int initialBatchSize) {
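For background on why a single TimestampBinaryConverter can serve both client types: a Parquet INT96 timestamp is 12 bytes with no logical type annotation, conventionally an 8-byte little-endian nanoseconds-of-day followed by a 4-byte little-endian Julian day. A minimal decoding sketch (assuming the Impala/Spark layout; `Int96Decode` and its names are hypothetical, not Kernel API):

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public final class Int96Decode {
    // Julian day number of the Unix epoch (1970-01-01).
    private static final long JULIAN_DAY_OF_EPOCH = 2440588L;
    private static final long MICROS_PER_DAY = 86_400L * 1_000_000L;

    // Decode a 12-byte INT96 value into microseconds since the Unix epoch.
    static long int96ToEpochMicros(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        long nanosOfDay = buf.getLong();   // first 8 bytes: nanos within the day
        int julianDay = buf.getInt();      // last 4 bytes: Julian day number
        long epochDays = julianDay - JULIAN_DAY_OF_EPOCH;
        return epochDays * MICROS_PER_DAY + nanosOfDay / 1_000L;
    }

    public static void main(String[] args) {
        // 1999-01-01T09:00:00 -> Julian day 2451180, 9 hours in nanoseconds.
        ByteBuffer buf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN);
        buf.putLong(9L * 3600L * 1_000_000_000L).putInt(2451180);
        System.out.println(int96ToEpochMicros(buf.array())); // 915181200000000
    }
}
```

The resulting value matches the expected row in the new unit test below.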
Binary file not shown (the new test resource, parquet/parquet-timestamp_ntz_int96.parquet).
ParquetFileReaderSuite.scala
@@ -122,6 +122,19 @@ class ParquetFileReaderSuite extends AnyFunSuite
       readParquetFilesUsingSpark(tablePath, readSchema) /* expected */)
   }
 
+  test("read columns with int96 timestamp_ntz") {
+    // Spark doesn't support writing timestamp_ntz as INT96 (although reads are supported),
+    // so we reuse a pre-written file directly.
+    val filePath = getTestResourceFilePath("parquet/parquet-timestamp_ntz_int96.parquet")
+    val readSchema = new StructType()
+      .add("id", IntegerType.INTEGER)
+      .add("time", TimestampNTZType.TIMESTAMP_NTZ)
+    checkAnswer(
+      readParquetFilesUsingKernel(filePath, readSchema), /* actual */
+      Seq(TestRow(1, 915181200000000L) /* expected */)
+    )
+  }
+
   test("request row indices") {
     val readSchema = new StructType()
       .add("id", LongType.LONG)
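As a quick sanity check on the test's expected value (assuming the microseconds-since-epoch representation Kernel uses for TIMESTAMP_NTZ vectors), 915181200000000L decodes to the wall-clock time 1999-01-01T09:00:

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DecodeExpectedValue {
    public static void main(String[] args) {
        long micros = 915181200000000L;
        // TIMESTAMP_NTZ carries no zone; UTC is used here only to
        // compute the wall-clock fields from the epoch offset.
        LocalDateTime ts = LocalDateTime.ofEpochSecond(
            micros / 1_000_000L,                  // whole seconds since epoch
            (int) (micros % 1_000_000L) * 1_000,  // leftover micros as nanos
            ZoneOffset.UTC);
        System.out.println(ts); // prints 1999-01-01T09:00
    }
}
```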
