[Kernel] Rename Parquet readers to maintain consistency with Parquet writers

Rename Parquet readers to maintain consistency with Parquet writers.

Resolves #2636
rpinzon authored Mar 7, 2024
1 parent 2fddb8b commit 707e7a6
Showing 10 changed files with 114 additions and 93 deletions.
@@ -31,7 +31,7 @@
 
 import io.delta.kernel.internal.util.Utils;
 
-import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader;
+import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter;
 
 /**
@@ -55,7 +55,7 @@ public CloseableIterator<ColumnarBatch> readParquetFiles(
         StructType physicalSchema,
         Optional<Predicate> predicate) throws IOException {
         return new CloseableIterator<ColumnarBatch>() {
-            private final ParquetBatchReader batchReader = new ParquetBatchReader(hadoopConf);
+            private final ParquetFileReader batchReader = new ParquetFileReader(hadoopConf);
             private FileStatus currentFile;
             private CloseableIterator<ColumnarBatch> currentFileReader;
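For context on the hunk above: readParquetFiles hands back a single ColumnarBatch iterator that opens one reader per file and advances to the next file only when the current one is exhausted, hence the currentFile/currentFileReader state. Below is a minimal, self-contained sketch of that lazy-chaining pattern, using plain java.util types in place of Kernel's FileStatus, ColumnarBatch, and CloseableIterator; the openFile function stands in for whatever read method ParquetFileReader exposes, whose exact signature is not shown in this diff.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

// Sketch: chain one per-file reader after another, opening each lazily.
final class LazyChainedIterator<F, B> implements Iterator<B> {
    private final Iterator<F> files;                 // files still to be read
    private final Function<F, Iterator<B>> openFile; // opens a reader over one file
    private Iterator<B> current;                     // reader over the current file

    LazyChainedIterator(List<F> files, Function<F, Iterator<B>> openFile) {
        this.files = files.iterator();
        this.openFile = openFile;
    }

    @Override
    public boolean hasNext() {
        // Move to the next file whenever the current reader is exhausted.
        while ((current == null || !current.hasNext()) && files.hasNext()) {
            current = openFile.apply(files.next());
        }
        return current != null && current.hasNext();
    }

    @Override
    public B next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return current.next();
    }

    public static void main(String[] args) {
        // Each inner list plays the role of one Parquet file's batches.
        List<List<Integer>> files = List.of(List.of(1, 2), List.of(), List.of(3));
        new LazyChainedIterator<>(files, List::iterator)
            .forEachRemaining(System.out::println); // prints 1, 2, 3
    }
}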
@@ -24,12 +24,16 @@
 import io.delta.kernel.types.ArrayType;
 
 import io.delta.kernel.defaults.internal.data.vector.DefaultArrayVector;
-import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.createConverter;
+import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;
 
-class ArrayConverter extends RepeatedValueConverter {
+/**
+ * Array column reader for materializing the column values from Parquet files into Kernel's
+ * {@link ColumnVector}.
+ */
+class ArrayColumnReader extends RepeatedValueConverter {
     private final ArrayType typeFromClient;
 
-    ArrayConverter(int initialBatchSize, ArrayType typeFromClient, GroupType typeFromFile) {
+    ArrayColumnReader(int initialBatchSize, ArrayType typeFromClient, GroupType typeFromFile) {
         super(
             initialBatchSize,
             createElementConverter(initialBatchSize, typeFromClient, typeFromFile));
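The GroupType that ArrayColumnReader receives as typeFromFile follows Parquet's standard three-level LIST encoding. As a reference point, here is a sketch that parses such a schema with parquet-mr's MessageTypeParser (parquet-mr is already on the classpath of the defaults module, as the GroupType import shows); the message and field names are invented for illustration.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Sketch: the three-level structure behind a Parquet LIST column.
public class ListSchemaSketch {
    public static void main(String[] args) {
        MessageType schema = MessageTypeParser.parseMessageType(
            "message example {\n"
            + "  optional group scores (LIST) {\n" // outer, nullable list field
            + "    repeated group list {\n"        // one repetition per element
            + "      optional int32 element;\n"    // the (nullable) element value
            + "    }\n"
            + "  }\n"
            + "}");
        System.out.println(schema);
    }
}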
@@ -36,9 +36,13 @@
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;
 
 import io.delta.kernel.defaults.internal.data.vector.DefaultDecimalVector;
-import io.delta.kernel.defaults.internal.parquet.ParquetConverters.BasePrimitiveColumnConverter;
+import io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.BasePrimitiveColumnReader;
 
-public class DecimalConverters {
+/**
+ * Decimal column readers for materializing the column values from Parquet files into Kernel's
+ * {@link ColumnVector}.
+ */
+public class DecimalColumnReader {
 
     public static Converter createDecimalConverter(
         int initialBatchSize,
@@ -54,25 +58,25 @@ public static Converter createDecimalConverter(
             if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                 LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                     (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation;
-                return new IntDictionaryAwareDecimalConverter(typeFromClient,
+                return new IntDictionaryAwareDecimalColumnReader(typeFromClient,
                     decimalType.getPrecision(), decimalType.getScale(), initialBatchSize);
             } else {
                 // If the column is a plain INT32, we should pick the precision that can host
                 // the largest INT32 value.
-                return new IntDictionaryAwareDecimalConverter(typeFromClient,
+                return new IntDictionaryAwareDecimalColumnReader(typeFromClient,
                     10, 0, initialBatchSize);
             }
         } else if (primType.getPrimitiveTypeName() == INT64) {
             // For INT64 backed decimals
             if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                 LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                     (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation;
-                return new LongDictionaryAwareDecimalConverter(typeFromClient,
+                return new LongDictionaryAwareDecimalColumnReader(typeFromClient,
                     decimalType.getPrecision(), decimalType.getScale(), initialBatchSize);
             } else {
                 // If the column is a plain INT64, we should pick the precision that can host
                 // the largest INT64 value.
-                return new LongDictionaryAwareDecimalConverter(typeFromClient,
+                return new LongDictionaryAwareDecimalColumnReader(typeFromClient,
                     20, 0, initialBatchSize);
             }
         } else if (primType.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY ||
@@ -81,7 +85,7 @@ public static Converter createDecimalConverter(
             if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                 LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                     (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation;
-                return new BinaryDictionaryAwareDecimalConverter(typeFromClient,
+                return new BinaryDictionaryAwareDecimalColumnReader(typeFromClient,
                     decimalType.getPrecision(), decimalType.getScale(), initialBatchSize);
             } else {
                 throw new RuntimeException(String.format(
@@ -98,15 +102,15 @@ public static Converter createDecimalConverter(
         }
     }
 
-    public abstract static class BaseDecimalConverter extends BasePrimitiveColumnConverter {
+    public abstract static class BaseDecimalColumnReader extends BasePrimitiveColumnReader {
         // working state
         private BigDecimal[] values;
 
         private final DataType dataType;
         private final int scale;
         protected BigDecimal[] expandedDictionary;
 
-        BaseDecimalConverter(DataType dataType, int precision, int scale, int initialBatchSize) {
+        BaseDecimalColumnReader(DataType dataType, int precision, int scale, int initialBatchSize) {
             super(initialBatchSize);
             DecimalType decimalType = (DecimalType) dataType;
             checkArgument(
@@ -144,7 +148,7 @@ public void addValueFromDictionary(int dictionaryId) {
         public ColumnVector getDataColumnVector(int batchSize) {
             ColumnVector vector = new DefaultDecimalVector(dataType, batchSize, values);
             // re-initialize the working space
-            this.nullability = ParquetConverters.initNullabilityVector(nullability.length);
+            this.nullability = ParquetColumnReaders.initNullabilityVector(nullability.length);
             this.values = new BigDecimal[values.length];
             this.currentRowIndex = 0;
             return vector;
@@ -156,7 +160,7 @@ public void resizeIfNeeded() {
             int newSize = values.length * 2;
             this.values = Arrays.copyOf(this.values, newSize);
             this.nullability = Arrays.copyOf(this.nullability, newSize);
-            ParquetConverters.setNullabilityToTrue(this.nullability, newSize / 2, newSize);
+            ParquetColumnReaders.setNullabilityToTrue(this.nullability, newSize / 2, newSize);
         }
     }
 
@@ -169,8 +173,8 @@ protected BigDecimal decimalFromBinary(Binary value) {
         }
     }
 
-    public static class IntDictionaryAwareDecimalConverter extends BaseDecimalConverter {
-        IntDictionaryAwareDecimalConverter(
+    public static class IntDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader {
+        IntDictionaryAwareDecimalColumnReader(
             DataType dataType, int precision, int scale, int initialBatchSize) {
             super(dataType, precision, scale, initialBatchSize);
         }
@@ -190,8 +194,8 @@ public void addInt(int value) {
         }
     }
 
-    public static class LongDictionaryAwareDecimalConverter extends BaseDecimalConverter {
-        LongDictionaryAwareDecimalConverter(
+    public static class LongDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader {
+        LongDictionaryAwareDecimalColumnReader(
             DataType dataType, int precision, int scale, int initialBatchSize) {
             super(dataType, precision, scale, initialBatchSize);
         }
@@ -211,8 +215,8 @@ public void addLong(long value) {
         }
     }
 
-    public static class BinaryDictionaryAwareDecimalConverter extends BaseDecimalConverter {
-        BinaryDictionaryAwareDecimalConverter(
+    public static class BinaryDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader {
+        BinaryDictionaryAwareDecimalColumnReader(
             DataType dataType, int precision, int scale, int initialBatchSize) {
             super(dataType, precision, scale, initialBatchSize);
         }
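The three reader variants above differ only in the physical encoding they decode: INT32- and INT64-backed decimals store the unscaled value directly (precision 10 covers every 32-bit value and precision 20 every 64-bit value, which is why the plain-integer fallbacks pick those numbers), while FIXED_LEN_BYTE_ARRAY and BINARY columns store the unscaled value as big-endian two's-complement bytes, which is what decimalFromBinary handles. The dictionary-aware part is the expandedDictionary field: the page dictionary is converted to BigDecimal once, and addValueFromDictionary then just indexes into it. Below is a self-contained sketch of the two conversions using only java.math; the helper names are illustrative, not Kernel's API.

import java.math.BigDecimal;
import java.math.BigInteger;

public class DecimalDecodingSketch {
    // INT32/INT64-backed decimals: the stored integer is the unscaled value,
    // so the logical value is unscaled * 10^(-scale).
    static BigDecimal fromLong(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
    }

    // BINARY/FIXED_LEN_BYTE_ARRAY-backed decimals: the unscaled value arrives
    // as big-endian two's-complement bytes (cf. decimalFromBinary above).
    static BigDecimal fromBytes(byte[] bigEndianTwosComplement, int scale) {
        return new BigDecimal(new BigInteger(bigEndianTwosComplement), scale);
    }

    public static void main(String[] args) {
        System.out.println(fromLong(12345, 2)); // 123.45
        System.out.println(fromBytes(BigInteger.valueOf(12345).toByteArray(), 2)); // 123.45
    }
}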
@@ -25,12 +25,16 @@
 import io.delta.kernel.types.MapType;
 
 import io.delta.kernel.defaults.internal.data.vector.DefaultMapVector;
-import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.createConverter;
+import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;
 
-class MapConverter extends RepeatedValueConverter {
+/**
+ * Map column reader for materializing the column values from Parquet files into Kernel's
+ * {@link ColumnVector}.
+ */
+class MapColumnReader extends RepeatedValueConverter {
     private final MapType typeFromClient;
 
-    MapConverter(int initialBatchSize, MapType typeFromClient, GroupType typeFromFile) {
+    MapColumnReader(int initialBatchSize, MapType typeFromClient, GroupType typeFromFile) {
         super(
             initialBatchSize,
             createElementConverters(initialBatchSize, typeFromClient, typeFromFile));
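Both ArrayColumnReader and MapColumnReader extend RepeatedValueConverter, which accumulates a flat stream of element values together with per-row boundaries. The usual columnar representation of that result, and the general idea behind vectors like DefaultArrayVector and DefaultMapVector, is an offsets array over a flat values vector; Kernel's exact internal layout is not shown in this diff, so treat the sketch below as an illustration of the technique rather than its implementation.

import java.util.ArrayList;
import java.util.List;

public class OffsetsSketch {
    public static void main(String[] args) {
        // Three logical rows, [1, 2], [], and [3], stored as one flat values array.
        int[] values = {1, 2, 3};
        int[] offsets = {0, 2, 2, 3}; // row i spans values[offsets[i] .. offsets[i+1])

        List<List<Integer>> rows = new ArrayList<>();
        for (int i = 0; i + 1 < offsets.length; i++) {
            List<Integer> row = new ArrayList<>();
            for (int j = offsets[i]; j < offsets[i + 1]; j++) {
                row.add(values[j]);
            }
            rows.add(row);
        }
        System.out.println(rows); // [[1, 2], [], [3]]
    }
}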
(Diffs for the remaining six changed files are not shown.)
