[Kernel] Rename Parquet readers to maintain consistency with Parquet writers #2722

Merged · 5 commits · Mar 7, 2024
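
This PR renames the Parquet read-path classes in the Kernel defaults module so they mirror the existing writer names (e.g. `ParquetFileWriter`). The renames visible in the diff below:

- `ParquetBatchReader` → `ParquetFileReader`
- `ParquetConverters` → `ParquetColumnReaders` (static helpers such as `createConverter`, `initNullabilityVector`, `setNullabilityToTrue`)
- `ArrayConverter` → `ArrayColumnReader`
- `DecimalConverters` → `DecimalColumnReader`, including the nested `BaseDecimalConverter` → `BaseDecimalColumnReader` and the `Int`/`Long`/`BinaryDictionaryAwareDecimalConverter` → `*DecimalColumnReader` variants
- `MapConverter` → `MapColumnReader`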
First changed file (the default `readParquetFiles` implementation):

```diff
@@ -31,7 +31,7 @@

 import io.delta.kernel.internal.util.Utils;

-import io.delta.kernel.defaults.internal.parquet.ParquetBatchReader;
+import io.delta.kernel.defaults.internal.parquet.ParquetFileReader;
 import io.delta.kernel.defaults.internal.parquet.ParquetFileWriter;

 /**
@@ -55,7 +55,7 @@ public CloseableIterator<ColumnarBatch> readParquetFiles(
             StructType physicalSchema,
             Optional<Predicate> predicate) throws IOException {
         return new CloseableIterator<ColumnarBatch>() {
-            private final ParquetBatchReader batchReader = new ParquetBatchReader(hadoopConf);
+            private final ParquetFileReader batchReader = new ParquetFileReader(hadoopConf);
             private FileStatus currentFile;
             private CloseableIterator<ColumnarBatch> currentFileReader;

```
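Caller-side, the rename does not change the contract: `readParquetFiles` still returns a `CloseableIterator<ColumnarBatch>`, and the `currentFile`/`currentFileReader` fields above suggest it opens one file reader at a time, so the iterator must be closed to release it. A minimal sketch of driving that iterator; the helper below is ours for illustration, not part of the PR:

```java
import java.io.IOException;

import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.utils.CloseableIterator;

public class ReadParquetSketch {
    // Drains the iterator returned by readParquetFiles(...) and counts rows.
    // try-with-resources closes the underlying per-file Parquet reader.
    static long countRows(CloseableIterator<ColumnarBatch> batches) throws IOException {
        long rows = 0;
        try (CloseableIterator<ColumnarBatch> it = batches) {
            while (it.hasNext()) {
                rows += it.next().getSize(); // getSize() = number of rows in the batch
            }
        }
        return rows;
    }
}
```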
Second changed file (the array column reader):

```diff
@@ -24,12 +24,16 @@
 import io.delta.kernel.types.ArrayType;

 import io.delta.kernel.defaults.internal.data.vector.DefaultArrayVector;
-import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.createConverter;
+import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;

-class ArrayConverter extends RepeatedValueConverter {
+/**
+ * Array column reader for materializing the column values from Parquet files into Kernel's
+ * {@link ColumnVector}.
+ */
+class ArrayColumnReader extends RepeatedValueConverter {
     private final ArrayType typeFromClient;

-    ArrayConverter(int initialBatchSize, ArrayType typeFromClient, GroupType typeFromFile) {
+    ArrayColumnReader(int initialBatchSize, ArrayType typeFromClient, GroupType typeFromFile) {
         super(
             initialBatchSize,
             createElementConverter(initialBatchSize, typeFromClient, typeFromFile));
```
Third changed file (the decimal column readers); first the imports, class javadoc, and the `createDecimalConverter` factory:

```diff
@@ -36,9 +36,13 @@
 import static io.delta.kernel.internal.util.Preconditions.checkArgument;

 import io.delta.kernel.defaults.internal.data.vector.DefaultDecimalVector;
-import io.delta.kernel.defaults.internal.parquet.ParquetConverters.BasePrimitiveColumnConverter;
+import io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.BasePrimitiveColumnReader;

-public class DecimalConverters {
+/**
+ * Decimal column readers for materializing the column values from Parquet files into Kernel's
+ * {@link ColumnVector}.
+ */
+public class DecimalColumnReader {

     public static Converter createDecimalConverter(
         int initialBatchSize,
@@ -54,25 +58,25 @@ public static Converter createDecimalConverter(
             if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                 LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                     (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation;
-                return new IntDictionaryAwareDecimalConverter(typeFromClient,
+                return new IntDictionaryAwareDecimalColumnReader(typeFromClient,
                     decimalType.getPrecision(), decimalType.getScale(), initialBatchSize);
             } else {
                 // If the column is a plain INT32, we should pick the precision that can host
                 // the largest INT32 value.
-                return new IntDictionaryAwareDecimalConverter(typeFromClient,
+                return new IntDictionaryAwareDecimalColumnReader(typeFromClient,
                     10, 0, initialBatchSize);
             }
         } else if (primType.getPrimitiveTypeName() == INT64) {
             // For INT64 backed decimals
             if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                 LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                     (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation;
-                return new LongDictionaryAwareDecimalConverter(typeFromClient,
+                return new LongDictionaryAwareDecimalColumnReader(typeFromClient,
                     decimalType.getPrecision(), decimalType.getScale(), initialBatchSize);
             } else {
                 // If the column is a plain INT64, we should pick the precision that can host
                 // the largest INT64 value.
-                return new LongDictionaryAwareDecimalConverter(typeFromClient,
+                return new LongDictionaryAwareDecimalColumnReader(typeFromClient,
                     20, 0, initialBatchSize);
             }
         } else if (primType.getPrimitiveTypeName() == FIXED_LEN_BYTE_ARRAY ||
@@ -81,7 +85,7 @@ public static Converter createDecimalConverter(
             if (typeAnnotation instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
                 LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
                     (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) typeAnnotation;
-                return new BinaryDictionaryAwareDecimalConverter(typeFromClient,
+                return new BinaryDictionaryAwareDecimalColumnReader(typeFromClient,
                     decimalType.getPrecision(), decimalType.getScale(), initialBatchSize);
             } else {
                 throw new RuntimeException(String.format(
```
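A note on the hard-coded precisions in `createDecimalConverter` above: a decimal column stored as a plain INT32 or INT64 without a decimal annotation gets precision 10 or 20 with scale 0, because any 32-bit value has at most 10 decimal digits and any 64-bit value at most 19 (20 adds a digit of headroom). A quick check of those digit counts, illustrative and not part of the PR:

```java
public class DecimalPrecisionCheck {
    public static void main(String[] args) {
        // 2147483647 -> 10 digits, so precision 10 holds any INT32 value
        System.out.println(String.valueOf(Integer.MAX_VALUE).length()); // 10
        // 9223372036854775807 -> 19 digits, so precision 20 holds any INT64 value
        System.out.println(String.valueOf(Long.MAX_VALUE).length());    // 19
        // BigDecimal agrees on the precision of the largest INT32
        System.out.println(java.math.BigDecimal.valueOf(Integer.MAX_VALUE).precision()); // 10
    }
}
```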
Continuing in the same file: the base reader and its batch-materialization and resize logic:

```diff
@@ -98,15 +102,15 @@ public static Converter createDecimalConverter(
         }
     }

-    public abstract static class BaseDecimalConverter extends BasePrimitiveColumnConverter {
+    public abstract static class BaseDecimalColumnReader extends BasePrimitiveColumnReader {
         // working state
         private BigDecimal[] values;

         private final DataType dataType;
         private final int scale;
         protected BigDecimal[] expandedDictionary;

-        BaseDecimalConverter(DataType dataType, int precision, int scale, int initialBatchSize) {
+        BaseDecimalColumnReader(DataType dataType, int precision, int scale, int initialBatchSize) {
             super(initialBatchSize);
             DecimalType decimalType = (DecimalType) dataType;
             checkArgument(
@@ -144,7 +148,7 @@ public void addValueFromDictionary(int dictionaryId) {
         public ColumnVector getDataColumnVector(int batchSize) {
             ColumnVector vector = new DefaultDecimalVector(dataType, batchSize, values);
             // re-initialize the working space
-            this.nullability = ParquetConverters.initNullabilityVector(nullability.length);
+            this.nullability = ParquetColumnReaders.initNullabilityVector(nullability.length);
             this.values = new BigDecimal[values.length];
             this.currentRowIndex = 0;
             return vector;
@@ -156,7 +160,7 @@ public void resizeIfNeeded() {
             int newSize = values.length * 2;
             this.values = Arrays.copyOf(this.values, newSize);
             this.nullability = Arrays.copyOf(this.nullability, newSize);
-            ParquetConverters.setNullabilityToTrue(this.nullability, newSize / 2, newSize);
+            ParquetColumnReaders.setNullabilityToTrue(this.nullability, newSize / 2, newSize);
         }
     }
```
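The `resizeIfNeeded` hunk shows the growth strategy for the working arrays: double the capacity, copy the old contents, and reset only the new half of the nullability vector. Reading `setNullabilityToTrue` as "mark the slot null until a value is written", which the names suggest but this diff does not show, a self-contained illustration:

```java
import java.math.BigDecimal;
import java.util.Arrays;

class ResizeSketch {
    private BigDecimal[] values = new BigDecimal[16];
    private boolean[] nullability = new boolean[16]; // assumed: true == no value written yet

    ResizeSketch() {
        Arrays.fill(nullability, true); // every slot starts out null
    }

    void resizeIfNeeded(int currentRowIndex) {
        if (currentRowIndex >= values.length) {
            int newSize = values.length * 2;
            values = Arrays.copyOf(values, newSize);
            nullability = Arrays.copyOf(nullability, newSize);
            // copyOf zero-fills (false) the new slots, so the new half must be
            // reset to "null" explicitly; the old half keeps its real flags.
            Arrays.fill(nullability, newSize / 2, newSize, true);
        }
    }
}
```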

And the three dictionary-aware subclasses:

```diff
@@ -169,8 +173,8 @@ protected BigDecimal decimalFromBinary(Binary value) {
         }
     }

-    public static class IntDictionaryAwareDecimalConverter extends BaseDecimalConverter {
-        IntDictionaryAwareDecimalConverter(
+    public static class IntDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader {
+        IntDictionaryAwareDecimalColumnReader(
             DataType dataType, int precision, int scale, int initialBatchSize) {
             super(dataType, precision, scale, initialBatchSize);
         }
@@ -190,8 +194,8 @@ public void addInt(int value) {
         }
     }

-    public static class LongDictionaryAwareDecimalConverter extends BaseDecimalConverter {
-        LongDictionaryAwareDecimalConverter(
+    public static class LongDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader {
+        LongDictionaryAwareDecimalColumnReader(
             DataType dataType, int precision, int scale, int initialBatchSize) {
             super(dataType, precision, scale, initialBatchSize);
         }
@@ -211,8 +215,8 @@ public void addLong(long value) {
         }
     }

-    public static class BinaryDictionaryAwareDecimalConverter extends BaseDecimalConverter {
-        BinaryDictionaryAwareDecimalConverter(
+    public static class BinaryDictionaryAwareDecimalColumnReader extends BaseDecimalColumnReader {
+        BinaryDictionaryAwareDecimalColumnReader(
             DataType dataType, int precision, int scale, int initialBatchSize) {
             super(dataType, precision, scale, initialBatchSize);
         }
```
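The three `*DictionaryAwareDecimalColumnReader` classes exist because Parquet pages are often dictionary-encoded: rather than converting the raw INT32/INT64/binary value to `BigDecimal` for every row, the reader expands the dictionary once (the `expandedDictionary` field in `BaseDecimalColumnReader`) and then resolves each row by index. A condensed sketch of that pattern, with simplified names that are not the Kernel classes themselves:

```java
import java.math.BigDecimal;

class DictionaryAwareDecimalSketch {
    private BigDecimal[] expandedDictionary; // one pre-converted value per dictionary id
    private final BigDecimal[] values = new BigDecimal[1024];
    private int currentRowIndex;

    // Called once per dictionary page: pay the long -> BigDecimal conversion
    // cost once per distinct value instead of once per row.
    void setDictionary(long[] rawUnscaledEntries, int scale) {
        expandedDictionary = new BigDecimal[rawUnscaledEntries.length];
        for (int id = 0; id < rawUnscaledEntries.length; id++) {
            expandedDictionary[id] = BigDecimal.valueOf(rawUnscaledEntries[id], scale);
        }
    }

    // Called per row: a plain array lookup.
    void addValueFromDictionary(int dictionaryId) {
        values[currentRowIndex++] = expandedDictionary[dictionaryId];
    }
}
```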
Fourth changed file (the map column reader):

```diff
@@ -25,12 +25,16 @@
 import io.delta.kernel.types.MapType;

 import io.delta.kernel.defaults.internal.data.vector.DefaultMapVector;
-import static io.delta.kernel.defaults.internal.parquet.ParquetConverters.createConverter;
+import static io.delta.kernel.defaults.internal.parquet.ParquetColumnReaders.createConverter;

-class MapConverter extends RepeatedValueConverter {
+/**
+ * Map column reader for materializing the column values from Parquet files into Kernel's
+ * {@link ColumnVector}.
+ */
+class MapColumnReader extends RepeatedValueConverter {
     private final MapType typeFromClient;

-    MapConverter(int initialBatchSize, MapType typeFromClient, GroupType typeFromFile) {
+    MapColumnReader(int initialBatchSize, MapType typeFromClient, GroupType typeFromFile) {
         super(
             initialBatchSize,
             createElementConverters(initialBatchSize, typeFromClient, typeFromFile));
```
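Both `ArrayColumnReader` and `MapColumnReader` extend `RepeatedValueConverter`, so the rename leaves the shared mechanics untouched: child converters collect a flat stream of element values while the parent records how many elements each top-level row owns, and materialization wraps the flat vectors plus offsets into a `DefaultArrayVector` or `DefaultMapVector`. A rough, purely illustrative sketch of that offsets bookkeeping, not the Kernel implementation:

```java
import java.util.ArrayList;
import java.util.List;

class RepeatedValueSketch {
    // Flat element values, appended by child converters as Parquet is decoded.
    private final List<Object> flatElements = new ArrayList<>();
    // offsets.get(i) .. offsets.get(i + 1) delimit the elements of row i.
    private final List<Integer> offsets = new ArrayList<>(List.of(0));

    void addElement(Object value) {
        flatElements.add(value);
    }

    void endRow() {
        offsets.add(flatElements.size());
    }

    int elementCountForRow(int rowId) {
        return offsets.get(rowId + 1) - offsets.get(rowId);
    }
}
```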