From 707e7a64be7c07b83381fa2c8f59366254f6a55b Mon Sep 17 00:00:00 2001 From: Renan Tomazoni Pinzon Date: Thu, 7 Mar 2024 14:02:34 -0300 Subject: [PATCH] [Kernel] Rename Parquet readers to maintain consistency with Parquet writers Rename Parquet readers to maintain consistency with Parquet writers. Resolves #2636 --- .../client/DefaultParquetHandler.java | 4 +- ...yConverter.java => ArrayColumnReader.java} | 10 ++- ...nverters.java => DecimalColumnReader.java} | 38 +++++---- ...MapConverter.java => MapColumnReader.java} | 10 ++- ...verters.java => ParquetColumnReaders.java} | 84 ++++++++++--------- ...atchReader.java => ParquetFileReader.java} | 13 +-- .../parquet/RepeatedValueConverter.java | 12 +-- ...RowConverter.java => RowColumnReader.java} | 27 +++--- .../internal/parquet/TimestampConverters.java | 6 +- .../internal/parquet/ParquetSuiteBase.scala | 3 +- 10 files changed, 114 insertions(+), 93 deletions(-) rename kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/{ArrayConverter.java => ArrayColumnReader.java} (86%) rename kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/{DecimalConverters.java => DecimalColumnReader.java} (85%) rename kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/{MapConverter.java => MapColumnReader.java} (88%) rename kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/{ParquetConverters.java => ParquetColumnReaders.java} (86%) rename kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/{ParquetBatchReader.java => ParquetFileReader.java} (95%) rename kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/{RowConverter.java => RowColumnReader.java} (86%) diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultParquetHandler.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultParquetHandler.java index db455cb5cba..62b53c804cc 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultParquetHandler.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultParquetHandler.java @@ -31,7 +31,7 @@ import io.delta.kernel.internal.util.Utils; -import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader; +import io.delta.kernel.defaults.internal.parquet.ParquetFileReader; import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter; /** @@ -55,7 +55,7 @@ public CloseableIterator readParquetFiles( StructType physicalSchema, Optional predicate) throws IOException { return new CloseableIterator() { - private final ParquetBatchReader batchReader = new ParquetBatchReader(hadoopConf); + private final ParquetFileReader batchReader = new ParquetFileReader(hadoopConf); private FileStatus currentFile; private CloseableIterator currentFileReader; diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayConverter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java similarity index 86% rename from kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayConverter.java rename to kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java index 15422bcac65..1335badc312 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayConverter.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ArrayColumnReader.java @@ -24,12 +24,16 @@ import io.delta.kernel.types.ArrayType; import io.delta.kernel.defaults.internal.data.vector.DefaultArrayVector; -import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.createConverter; +import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter; -class ArrayConverter extends RepeatedValueConverter { +/** + * Array column reader for materializing the column values from Parquet files into Kernels + * {@link ColumnVector}. + */ +class ArrayColumnReader extends RepeatedValueConverter { private final ArrayType typeFromClient; - ArrayConverter(int initialBatchSize, ArrayType typeFromClient, GroupType typeFromFile) { + ArrayColumnReader(int initialBatchSize, ArrayType typeFromClient, GroupType typeFromFile) { super( initialBatchSize, createElementConverter(initialBatchSize, typeFromClient, typeFromFile)); diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalConverters.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalColumnReader.java similarity index 85% rename from kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalConverters.java rename to kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalColumnReader.java index e175296a56c..64a17728145 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalConverters.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/DecimalColumnReader.java @@ -36,9 +36,13 @@ import static io.delta.kernel.internal.util.Preconditions.checkArgument; import io.delta.kernel.defaults.internal.data.vector.DefaultDecimalVector; -import io.delta.kernel.defaults.internal.parquet.ParquetConverters.BasePrimitiveColumnConverter; +import io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.BasePrimitiveColumnReader; -public class DecimalConverters { +/** + * Decimal column readers for materializing the column values from Parquet files into Kernels + * {@link ColumnVector}. + */ +public class DecimalColumnReader { public static Converter createDecimalConverter( int initialBatchSize, @@ -54,12 +58,12 @@ public static Converter createDecimalConverter( if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation; - return new IntDictionaryAwareDecimalConverter(typeFromClient, + return new IntDictionaryAwareDecimalColumnReader(typeFromClient, decimalType.getPrecision(), decimalType.getScale(), initialBatchSize); } else { // If the column is a plain INT32, we should pick the precision that can host // the largest INT32 value. - return new IntDictionaryAwareDecimalConverter(typeFromClient, + return new IntDictionaryAwareDecimalColumnReader(typeFromClient, 10, 0, initialBatchSize); } } else if (primType.getPrimitiveTypeName() == INT64) { @@ -67,12 +71,12 @@ public static Converter createDecimalConverter( if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation; - return new LongDictionaryAwareDecimalConverter(typeFromClient, + return new LongDictionaryAwareDecimalColumnReader(typeFromClient, decimalType.getPrecision(), decimalType.getScale(), initialBatchSize); } else { // If the column is a plain INT64, we should pick the precision that can host // the largest INT64 value. - return new LongDictionaryAwareDecimalConverter(typeFromClient, + return new LongDictionaryAwareDecimalColumnReader(typeFromClient, 20, 0, initialBatchSize); } } else if (primType.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY || @@ -81,7 +85,7 @@ public static Converter createDecimalConverter( if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation; - return new BinaryDictionaryAwareDecimalConverter(typeFromClient, + return new BinaryDictionaryAwareDecimalColumnReader(typeFromClient, decimalType.getPrecision(), decimalType.getScale(), initialBatchSize); } else { throw new RuntimeException(String.format( @@ -98,7 +102,7 @@ public static Converter createDecimalConverter( } } - public abstract static class BaseDecimalConverter extends BasePrimitiveColumnConverter { + public abstract static class BaseDecimalColumnReader extends BasePrimitiveColumnReader { // working state private BigDecimal[] values; @@ -106,7 +110,7 @@ public abstract static class BaseDecimalConverter extends BasePrimitiveColumnCon private final int scale; protected BigDecimal[] expandedDictionary; - BaseDecimalConverter(DataType dataType, int precision, int scale, int initialBatchSize) { + BaseDecimalColumnReader(DataType dataType, int precision, int scale, int initialBatchSize) { super(initialBatchSize); DecimalType decimalType = (DecimalType) dataType; checkArgument( @@ -144,7 +148,7 @@ public void addValueFromDictionary(int dictionaryId) { public ColumnVector getDataColumnVector(int batchSize) { ColumnVector vector = new DefaultDecimalVector(dataType, batchSize, values); // re-initialize the working space - this.nullability = ParquetConverters.initNullabilityVector(nullability.length); + this.nullability = ParquetColumnReaders.initNullabilityVector(nullability.length); this.values = new BigDecimal[values.length]; this.currentRowIndex = 0; return vector; @@ -156,7 +160,7 @@ public void resizeIfNeeded() { int newSize = values.length * 2; this.values = Arrays.copyOf(this.values, newSize); this.nullability = Arrays.copyOf(this.nullability, newSize); - ParquetConverters.setNullabilityToTrue(this.nullability, newSize / 2, newSize); + ParquetColumnReaders.setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } @@ -169,8 +173,8 @@ protected BigDecimal decimalFromBinary(Binary value) { } } - public static class IntDictionaryAwareDecimalConverter extends BaseDecimalConverter { - IntDictionaryAwareDecimalConverter( + public static class IntDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader { + IntDictionaryAwareDecimalColumnReader( DataType dataType, int precision, int scale, int initialBatchSize) { super(dataType, precision, scale, initialBatchSize); } @@ -190,8 +194,8 @@ public void addInt(int value) { } } - public static class LongDictionaryAwareDecimalConverter extends BaseDecimalConverter { - LongDictionaryAwareDecimalConverter( + public static class LongDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader { + LongDictionaryAwareDecimalColumnReader( DataType dataType, int precision, int scale, int initialBatchSize) { super(dataType, precision, scale, initialBatchSize); } @@ -211,8 +215,8 @@ public void addLong(long value) { } } - public static class BinaryDictionaryAwareDecimalConverter extends BaseDecimalConverter { - BinaryDictionaryAwareDecimalConverter( + public static class BinaryDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader { + BinaryDictionaryAwareDecimalColumnReader( DataType dataType, int precision, int scale, int initialBatchSize) { super(dataType, precision, scale, initialBatchSize); } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapConverter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java similarity index 88% rename from kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapConverter.java rename to kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java index 61238bc21b7..b2b3a64ed29 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapConverter.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/MapColumnReader.java @@ -25,12 +25,16 @@ import io.delta.kernel.types.MapType; import io.delta.kernel.defaults.internal.data.vector.DefaultMapVector; -import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.createConverter; +import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter; -class MapConverter extends RepeatedValueConverter { +/** + * Map column readers for materializing the column values from Parquet files into Kernels + * {@link ColumnVector}. + */ +class MapColumnReader extends RepeatedValueConverter { private final MapType typeFromClient; - MapConverter(int initialBatchSize, MapType typeFromClient, GroupType typeFromFile) { + MapColumnReader(int initialBatchSize, MapType typeFromClient, GroupType typeFromFile) { super( initialBatchSize, createElementConverters(initialBatchSize, typeFromClient, typeFromFile)); diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetConverters.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java similarity index 86% rename from kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetConverters.java rename to kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java index 03b93b2f701..b06da114600 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetConverters.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetColumnReaders.java @@ -31,46 +31,50 @@ import io.delta.kernel.defaults.internal.data.vector.*; -class ParquetConverters { +/** + * Parquet column readers for materializing the column values from Parquet files into Kernels + * {@link ColumnVector}. + */ +class ParquetColumnReaders { public static Converter createConverter( int initialBatchSize, DataType typeFromClient, Type typeFromFile ) { if (typeFromClient instanceof StructType) { - return new RowConverter( + return new RowColumnReader( initialBatchSize, (StructType) typeFromClient, (GroupType) typeFromFile); } else if (typeFromClient instanceof ArrayType) { - return new ArrayConverter( + return new ArrayColumnReader( initialBatchSize, (ArrayType) typeFromClient, (GroupType) typeFromFile ); } else if (typeFromClient instanceof MapType) { - return new MapConverter( + return new MapColumnReader( initialBatchSize, (MapType) typeFromClient, (GroupType) typeFromFile); } else if (typeFromClient instanceof StringType || typeFromClient instanceof BinaryType) { - return new BinaryColumnConverter(typeFromClient, initialBatchSize); + return new BinaryColumnReader(typeFromClient, initialBatchSize); } else if (typeFromClient instanceof BooleanType) { - return new BooleanColumnConverter(initialBatchSize); + return new BooleanColumnReader(initialBatchSize); } else if (typeFromClient instanceof IntegerType || typeFromClient instanceof DateType) { - return new IntColumnConverter(typeFromClient, initialBatchSize); + return new IntColumnReader(typeFromClient, initialBatchSize); } else if (typeFromClient instanceof ByteType) { - return new ByteColumnConverter(initialBatchSize); + return new ByteColumnReader(initialBatchSize); } else if (typeFromClient instanceof ShortType) { - return new ShortColumnConverter(initialBatchSize); + return new ShortColumnReader(initialBatchSize); } else if (typeFromClient instanceof LongType) { - return new LongColumnConverter(typeFromClient, initialBatchSize); + return new LongColumnReader(typeFromClient, initialBatchSize); } else if (typeFromClient instanceof FloatType) { - return new FloatColumnConverter(initialBatchSize); + return new FloatColumnReader(initialBatchSize); } else if (typeFromClient instanceof DoubleType) { - return new DoubleColumnConverter(initialBatchSize); + return new DoubleColumnReader(initialBatchSize); } else if (typeFromClient instanceof DecimalType) { - return DecimalConverters.createDecimalConverter( + return DecimalColumnReader.createDecimalConverter( initialBatchSize, (DecimalType) typeFromClient, typeFromFile); } else if (typeFromClient instanceof TimestampType) { return TimestampConverters.createTimestampConverter(initialBatchSize, typeFromFile); @@ -95,14 +99,14 @@ static void setNullabilityToTrue(boolean[] nullability, int start, int end) { } /** - * Base converter for all implementations of Parquet {@link Converter} to return data in + * Base column reader for all implementations of Parquet {@link Converter} to return data in * columnar batch. General operation flow is: - * - each converter implementation allocates state to receive a fixed number of column values + * - each reader implementation allocates state to receive a fixed number of column values * - before accepting a new value the state is resized if it is not of sufficient size * - after each row, {@link #finalizeCurrentRow(long)} is called to finalize the state of * the last read row column value. */ - public interface BaseConverter { + public interface BaseColumnReader { ColumnVector getDataColumnVector(int batchSize); /** @@ -122,12 +126,12 @@ default void resizeIfNeeded() {} default void resetWorkingState() {} } - public static class NonExistentColumnConverter + public static class NonExistentColumnReader extends PrimitiveConverter - implements BaseConverter { + implements BaseColumnReader { private final DataType dataType; - NonExistentColumnConverter(DataType dataType) { + NonExistentColumnReader(DataType dataType) { this.dataType = Objects.requireNonNull(dataType, "dataType is null"); } @@ -140,14 +144,14 @@ public ColumnVector getDataColumnVector(int batchSize) { public void finalizeCurrentRow(long currentRowIndex) {} } - public abstract static class BasePrimitiveColumnConverter + public abstract static class BasePrimitiveColumnReader extends PrimitiveConverter - implements BaseConverter { + implements BaseColumnReader { // working state protected int currentRowIndex; protected boolean[] nullability; - BasePrimitiveColumnConverter(int initialBatchSize) { + BasePrimitiveColumnReader(int initialBatchSize) { checkArgument(initialBatchSize > 0, "invalid initialBatchSize: %s", initialBatchSize); // Initialize the working state this.nullability = initNullabilityVector(initialBatchSize); @@ -160,11 +164,11 @@ public void finalizeCurrentRow(long currentRowIndex) { } } - public static class BooleanColumnConverter extends BasePrimitiveColumnConverter { + public static class BooleanColumnReader extends BasePrimitiveColumnReader { // working state private boolean[] values; - BooleanColumnConverter(int initialBatchSize) { + BooleanColumnReader(int initialBatchSize) { super(initialBatchSize); this.values = new boolean[initialBatchSize]; } @@ -197,11 +201,11 @@ public void resizeIfNeeded() { } } - public static class ByteColumnConverter extends BasePrimitiveColumnConverter { + public static class ByteColumnReader extends BasePrimitiveColumnReader { // working state private byte[] values; - ByteColumnConverter(int initialBatchSize) { + ByteColumnReader(int initialBatchSize) { super(initialBatchSize); this.values = new byte[initialBatchSize]; } @@ -234,11 +238,11 @@ public void resizeIfNeeded() { } } - public static class ShortColumnConverter extends BasePrimitiveColumnConverter { + public static class ShortColumnReader extends BasePrimitiveColumnReader { // working state private short[] values; - ShortColumnConverter(int initialBatchSize) { + ShortColumnReader(int initialBatchSize) { super(initialBatchSize); this.values = new short[initialBatchSize]; } @@ -271,12 +275,12 @@ public void resizeIfNeeded() { } } - public static class IntColumnConverter extends BasePrimitiveColumnConverter { + public static class IntColumnReader extends BasePrimitiveColumnReader { private final DataType dataType; // working state private int[] values; - IntColumnConverter(DataType dataType, int initialBatchSize) { + IntColumnReader(DataType dataType, int initialBatchSize) { super(initialBatchSize); checkArgument(dataType instanceof IntegerType || dataType instanceof DataType); this.dataType = dataType; @@ -311,12 +315,12 @@ public void resizeIfNeeded() { } } - public static class LongColumnConverter extends BasePrimitiveColumnConverter { + public static class LongColumnReader extends BasePrimitiveColumnReader { private final DataType dataType; // working state private long[] values; - LongColumnConverter(DataType dataType, int initialBatchSize) { + LongColumnReader(DataType dataType, int initialBatchSize) { super(initialBatchSize); checkArgument(dataType instanceof LongType || dataType instanceof TimestampType); this.dataType = dataType; @@ -351,11 +355,11 @@ public void resizeIfNeeded() { } } - public static class FloatColumnConverter extends BasePrimitiveColumnConverter { + public static class FloatColumnReader extends BasePrimitiveColumnReader { // working state private float[] values; - FloatColumnConverter(int initialBatchSize) { + FloatColumnReader(int initialBatchSize) { super(initialBatchSize); this.values = new float[initialBatchSize]; } @@ -388,11 +392,11 @@ public void resizeIfNeeded() { } } - public static class DoubleColumnConverter extends BasePrimitiveColumnConverter { + public static class DoubleColumnReader extends BasePrimitiveColumnReader { // working state private double[] values; - DoubleColumnConverter(int initialBatchSize) { + DoubleColumnReader(int initialBatchSize) { super(initialBatchSize); this.values = new double[initialBatchSize]; } @@ -426,13 +430,13 @@ public void resizeIfNeeded() { } } - public static class BinaryColumnConverter extends BasePrimitiveColumnConverter { + public static class BinaryColumnReader extends BasePrimitiveColumnReader { private final DataType dataType; // working state private byte[][] values; - BinaryColumnConverter(DataType dataType, int initialBatchSize) { + BinaryColumnReader(DataType dataType, int initialBatchSize) { super(initialBatchSize); this.dataType = dataType; this.values = new byte[initialBatchSize][]; @@ -466,8 +470,8 @@ public void resizeIfNeeded() { } } - public static class FileRowIndexColumnConverter extends LongColumnConverter { - FileRowIndexColumnConverter(int initialBatchSize) { + public static class FileRowIndexColumnReader extends LongColumnReader { + FileRowIndexColumnReader(int initialBatchSize) { super(LongType.LONG, initialBatchSize); } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetBatchReader.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java similarity index 95% rename from kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetBatchReader.java rename to kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java index 86298dc9e6d..7d717745259 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetBatchReader.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/ParquetFileReader.java @@ -40,11 +40,11 @@ import io.delta.kernel.utils.CloseableIterator; import static io.delta.kernel.internal.util.Preconditions.checkArgument; -public class ParquetBatchReader { +public class ParquetFileReader { private final Configuration configuration; private final int maxBatchSize; - public ParquetBatchReader(Configuration configuration) { + public ParquetFileReader(Configuration configuration) { this.configuration = requireNonNull(configuration, "configuration is null"); this.maxBatchSize = configuration.getInt("delta.kernel.default.parquet.reader.batch-size", 1024); @@ -163,7 +163,7 @@ public void finalizeCurrentRow(long fileRowIndex) { /** * Collects the records given by the Parquet reader as columnar data. Parquet reader allows - * reading data row by row, but {@link ParquetBatchReader} wants to expose the data as a + * reading data row by row, but {@link ParquetFileReader} wants to expose the data as a * columnar batch. Parquet reader takes an implementation of {@link RecordMaterializer} * to which it gives data for each column one row a time. This {@link RecordMaterializer} * implementation collects the column values for multiple rows and returns a @@ -172,10 +172,11 @@ public void finalizeCurrentRow(long fileRowIndex) { public static class RowRecordCollector extends RecordMaterializer { private static final Object FAKE_ROW_RECORD = new Object(); - private final RowConverter rowRecordGroupConverter; + private final RowColumnReader rowRecordGroupConverter; public RowRecordCollector(int maxBatchSize, StructType readSchema, MessageType fileSchema) { - this.rowRecordGroupConverter = new RowConverter(maxBatchSize, readSchema, fileSchema); + this.rowRecordGroupConverter = + new RowColumnReader(maxBatchSize, readSchema, fileSchema); } @Override @@ -184,7 +185,7 @@ public void skipCurrentRecord() { } /** - * Return a fake object. This is not used by {@link ParquetBatchReader}, instead + * Return a fake object. This is not used by {@link ParquetFileReader}, instead * {@link #getDataAsColumnarBatch}} once a sufficient number of rows are collected. */ @Override diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RepeatedValueConverter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RepeatedValueConverter.java index 9ad641f94f9..ada3ad96e12 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RepeatedValueConverter.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RepeatedValueConverter.java @@ -23,15 +23,15 @@ import io.delta.kernel.data.ColumnVector; import static io.delta.kernel.internal.util.Preconditions.checkArgument; -import io.delta.kernel.defaults.internal.parquet.ParquetConverters.BaseConverter; -import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.initNullabilityVector; -import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.setNullabilityToTrue; +import io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.BaseColumnReader; +import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.initNullabilityVector; +import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.setNullabilityToTrue; /** * Abstract implementation of Parquet converters for capturing the repeated types such as * list or map. */ -abstract class RepeatedValueConverter extends GroupConverter implements BaseConverter { +abstract class RepeatedValueConverter extends GroupConverter implements BaseColumnReader { private final Collector collector; // working state @@ -148,7 +148,7 @@ public void start() {} public void end() { for (Converter converter : elementConverters) { long prevRowIndex = -1; // Row indexes are not needed for nested columns - ((ParquetConverters.BaseConverter) converter).finalizeCurrentRow(prevRowIndex); + ((BaseColumnReader) converter).finalizeCurrentRow(prevRowIndex); } currentEntryIndex++; } @@ -156,7 +156,7 @@ public void end() { ColumnVector[] getDataVectors() { ColumnVector[] dataVectors = new ColumnVector[elementConverters.length]; for (int i = 0; i < elementConverters.length; i++) { - dataVectors[i] = ((ParquetConverters.BaseConverter) elementConverters[i]) + dataVectors[i] = ((BaseColumnReader) elementConverters[i]) .getDataColumnVector(currentEntryIndex); } currentEntryIndex = 0; diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RowConverter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RowColumnReader.java similarity index 86% rename from kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RowConverter.java rename to kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RowColumnReader.java index d131311cc11..638c4d74a4f 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RowConverter.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/RowColumnReader.java @@ -36,9 +36,13 @@ import static io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.findSubFieldType; import static io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.getParquetFieldToTypeMap; -class RowConverter +/** + * Row column readers for materializing the column values from Parquet files into Kernels + * {@link ColumnVector}. + */ +class RowColumnReader extends GroupConverter - implements ParquetConverters.BaseConverter { + implements ParquetColumnReaders.BaseColumnReader { private final StructType readSchema; private final Converter[] converters; // The delta may request columns that don't exists in Parquet @@ -63,7 +67,7 @@ class RowConverter * fields in fileSchema are in the same order as the corresponding * fields in readSchema. */ - RowConverter(int initialBatchSize, StructType readSchema, GroupType fileSchema) { + RowColumnReader(int initialBatchSize, StructType readSchema, GroupType fileSchema) { checkArgument(initialBatchSize > 0, "invalid initialBatchSize: %s", initialBatchSize); this.readSchema = requireNonNull(readSchema, "readSchema is not null"); List fields = readSchema.fields(); @@ -71,7 +75,7 @@ class RowConverter this.parquetOrdinalToConverterOrdinal = new HashMap<>(); // Initialize the working state - this.nullability = ParquetConverters.initNullabilityVector(initialBatchSize); + this.nullability = ParquetColumnReaders.initNullabilityVector(initialBatchSize); int parquetOrdinal = 0; for (int i = 0; i < converters.length; i++) { @@ -86,13 +90,13 @@ class RowConverter checkArgument(field.getDataType() instanceof LongType, "row index metadata column must be type long"); converters[i] = - new ParquetConverters.FileRowIndexColumnConverter(initialBatchSize); + new ParquetColumnReaders.FileRowIndexColumnReader(initialBatchSize); } else { - converters[i] = new ParquetConverters.NonExistentColumnConverter( + converters[i] = new ParquetColumnReaders.NonExistentColumnReader( typeFromClient); } } else { - converters[i] = ParquetConverters.createConverter( + converters[i] = ParquetColumnReaders.createConverter( initialBatchSize, typeFromClient, typeFromFile); parquetOrdinalToConverterOrdinal.put(parquetOrdinal, i); parquetOrdinal++; @@ -148,7 +152,7 @@ public void resizeIfNeeded() { if (nullability.length == currentRowIndex) { int newSize = nullability.length * 2; this.nullability = Arrays.copyOf(this.nullability, newSize); - ParquetConverters.setNullabilityToTrue(this.nullability, newSize / 2, newSize); + ParquetColumnReaders.setNullabilityToTrue(this.nullability, newSize / 2, newSize); } } @@ -156,12 +160,13 @@ public void resizeIfNeeded() { public void resetWorkingState() { this.currentRowIndex = 0; this.isCurrentValueNull = true; - this.nullability = ParquetConverters.initNullabilityVector(this.nullability.length); + this.nullability = ParquetColumnReaders.initNullabilityVector(this.nullability.length); } private void finalizeLastRowInConverters(long prevRowIndex) { for (int i = 0; i < converters.length; i++) { - ((ParquetConverters.BaseConverter) converters[i]).finalizeCurrentRow(prevRowIndex); + ((ParquetColumnReaders.BaseColumnReader) converters[i]) + .finalizeCurrentRow(prevRowIndex); } } @@ -169,7 +174,7 @@ private ColumnVector[] collectMemberVectors(int batchSize) { final ColumnVector[] output = new ColumnVector[converters.length]; for (int i = 0; i < converters.length; i++) { - output[i] = ((ParquetConverters.BaseConverter) converters[i]) + output[i] = ((ParquetColumnReaders.BaseColumnReader) converters[i]) .getDataColumnVector(batchSize); } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/TimestampConverters.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/TimestampConverters.java index 286e7d7e4c1..57e7f1e0359 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/TimestampConverters.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/parquet/TimestampConverters.java @@ -52,7 +52,7 @@ public static Converter createTimestampConverter(int initialBatchSize, Type type "TimestampType must have parquet TimeType(isAdjustedToUTC=true)"); if (timestamp.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) { - return new ParquetConverters.LongColumnConverter( + return new ParquetColumnReaders.LongColumnReader( TimestampType.TIMESTAMP, initialBatchSize); } else if (timestamp.getUnit() == LogicalTypeAnnotation.TimeUnit.MILLIS) { return new TimestampMillisConverter(initialBatchSize); @@ -68,7 +68,7 @@ public static Converter createTimestampConverter(int initialBatchSize, Type type } } - public static class TimestampMillisConverter extends ParquetConverters.LongColumnConverter { + public static class TimestampMillisConverter extends ParquetColumnReaders.LongColumnReader { TimestampMillisConverter( int initialBatchSize) { super(TimestampType.TIMESTAMP, initialBatchSize); @@ -80,7 +80,7 @@ public void addLong(long value) { } } - public static class TimestampBinaryConverter extends ParquetConverters.LongColumnConverter { + public static class TimestampBinaryConverter extends ParquetColumnReaders.LongColumnReader { TimestampBinaryConverter( int initialBatchSize) { super(TimestampType.TIMESTAMP, initialBatchSize); diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala index f221aeeb4d7..f94988035a5 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/parquet/ParquetSuiteBase.scala @@ -29,7 +29,6 @@ import io.delta.kernel.internal.util.Utils.toCloseableIterator import io.delta.kernel.types.{ArrayType, DataType, MapType, StructType} import io.delta.kernel.utils.{DataFileStatus, FileStatus} import org.apache.hadoop.fs.Path -import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.hadoop.metadata.ParquetMetadata trait ParquetSuiteBase extends TestUtils { @@ -243,7 +242,7 @@ trait ParquetSuiteBase extends TestUtils { def footer(path: String): ParquetMetadata = { try { - ParquetFileReader.readFooter(configuration, new Path(path)) + org.apache.parquet.hadoop.ParquetFileReader.readFooter(configuration, new Path(path)) } catch { case NonFatal(e) => fail(s"Failed to read footer for file: $path", e) }