Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Optimization of Parquet Predicate Pushdown Capability #4608

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields);
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
}

private void setReadOptions(ParquetReadOptions.Builder builder) {
Expand Down Expand Up @@ -406,7 +406,7 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
}

private void readNextRowGroup() throws IOException {
PageReadStore rowGroup = reader.readNextRowGroup();
PageReadStore rowGroup = reader.readNextFilteredRowGroup();
if (rowGroup == null) {
throw new IOException(
"expecting more rows but reached last block. Read "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.parquet.column.page.DataPageV1;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.ParquetDecodingException;
Expand Down Expand Up @@ -65,20 +66,16 @@ public abstract class AbstractColumnReader<VECTOR extends WritableColumnVector>

protected final ColumnDescriptor descriptor;

/** Total number of values read. */
private long valuesRead;

/**
* value that indicates the end of the current page. That is, if valuesRead ==
* endOfPageValueCount, we are at the end of the page.
*/
private long endOfPageValueCount;

/** If true, the current page is dictionary encoded. */
private boolean isCurrentPageDictionaryEncoded;

/** Total values in the current page. */
private int pageValueCount;
// private int pageValueCount;

/**
* Helper struct to track intermediate states while reading Parquet pages in the column chunk.
*/
private final ParquetReadState readState;

/*
* Input streams:
Expand All @@ -101,12 +98,14 @@ public abstract class AbstractColumnReader<VECTOR extends WritableColumnVector>
/** Dictionary decoder to wrap dictionary ids input stream. */
private RunLengthDecoder dictionaryIdsDecoder;

public AbstractColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
public AbstractColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
this.pageReader = pageReadStore.getPageReader(descriptor);
this.maxDefLevel = descriptor.getMaxDefinitionLevel();

this.readState = new ParquetReadState(pageReadStore.getRowIndexes().orElse(null));

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
try {
Expand Down Expand Up @@ -147,56 +146,136 @@ public final void readToVector(int readNumber, VECTOR vector) throws IOException
if (dictionary != null) {
dictionaryIds = vector.reserveDictionaryIds(readNumber);
}
while (readNumber > 0) {

readState.resetForNewBatch(readNumber);

while (readState.rowsToReadInBatch > 0) {
// Compute the number of values we want to read in this page.
int leftInPage = (int) (endOfPageValueCount - valuesRead);
if (leftInPage == 0) {
DataPage page = pageReader.readPage();
if (page instanceof DataPageV1) {
readPageV1((DataPageV1) page);
} else if (page instanceof DataPageV2) {
readPageV2((DataPageV2) page);
} else {
throw new RuntimeException("Unsupported page type: " + page.getClass());
if (readState.valuesToReadInPage == 0) {
int pageValueCount = readPage();
if (pageValueCount < 0) {
// we've read all the pages; this could happen when we're reading a repeated
// list and we
// don't know where the list will end until we've seen all the pages.
break;
}
leftInPage = (int) (endOfPageValueCount - valuesRead);
}
int num = Math.min(readNumber, leftInPage);
if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
runLenDecoder.readDictionaryIds(
num, dictionaryIds, vector, rowId, maxDefLevel, this.dictionaryIdsDecoder);

if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
// Column vector supports lazy decoding of dictionary values so just set the
// dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e.
// some
// non-dictionary encoded values have already been added).
vector.setDictionary(new ParquetDictionary(dictionary));

if (readState.isFinished()) {
break;
}

long pageRowId = readState.rowId;
int leftInBatch = readState.rowsToReadInBatch;
int leftInPage = readState.valuesToReadInPage;

int readBatch = Math.min(leftInBatch, leftInPage);

long rangeStart = readState.currentRangeStart();
long rangeEnd = readState.currentRangeEnd();

if (pageRowId < rangeStart) {
int toSkip = (int) (rangeStart - pageRowId);
if (toSkip >= leftInPage) { // drop page
pageRowId += leftInPage;
leftInPage = 0;
} else {
readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
if (isCurrentPageDictionaryEncoded) {
runLenDecoder.skipDictionaryIds(
toSkip, maxDefLevel, this.dictionaryIdsDecoder);
pageRowId += toSkip;
leftInPage -= toSkip;
} else {
skipBatch(toSkip);
pageRowId += toSkip;
leftInPage -= toSkip;
}
}
} else if (pageRowId > rangeEnd) {
readState.nextRange();
} else {
if (vector.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is not.
// The batch
// does not support a mix of dictionary and not so we will decode the
// dictionary.
readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
long start = pageRowId;
long end = Math.min(rangeEnd, pageRowId + readBatch - 1);
int num = (int) (end - start + 1);

if (isCurrentPageDictionaryEncoded) {
// Read and decode dictionary ids.
runLenDecoder.readDictionaryIds(
num,
dictionaryIds,
vector,
rowId,
maxDefLevel,
this.dictionaryIdsDecoder);

if (vector.hasDictionary() || (rowId == 0 && supportLazyDecode())) {
// Column vector supports lazy decoding of dictionary values so just set the
// dictionary.
// We can't do this if rowId != 0 AND the column doesn't have a dictionary
// (i.e.
// some
// non-dictionary encoded values have already been added).
vector.setDictionary(new ParquetDictionary(dictionary));
} else {
readBatchFromDictionaryIds(rowId, num, vector, dictionaryIds);
}
} else {
if (vector.hasDictionary() && rowId != 0) {
// This batch already has dictionary encoded values but this new page is
// not.
// The batch
// does not support a mix of dictionary and not so we will decode the
// dictionary.
readBatchFromDictionaryIds(0, rowId, vector, vector.getDictionaryIds());
}
vector.setDictionary(null);
readBatch(rowId, num, vector);
}
vector.setDictionary(null);
readBatch(rowId, num, vector);
leftInBatch -= num;
pageRowId += num;
leftInPage -= num;
rowId += num;
}
readState.rowsToReadInBatch = leftInBatch;
readState.valuesToReadInPage = leftInPage;
readState.rowId = pageRowId;
}
}

valuesRead += num;
rowId += num;
readNumber -= num;
private int readPage() {
DataPage page = pageReader.readPage();
if (page == null) {
return -1;
}
long pageFirstRowIndex = page.getFirstRowIndex().orElse(0L);

int pageValueCount =
page.accept(
new DataPage.Visitor<Integer>() {
@Override
public Integer visit(DataPageV1 dataPageV1) {
try {
return readPageV1(dataPageV1);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

@Override
public Integer visit(DataPageV2 dataPageV2) {
try {
return readPageV2(dataPageV2);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
readState.resetForNewPage(pageValueCount, pageFirstRowIndex);
return pageValueCount;
}

private void readPageV1(DataPageV1 page) throws IOException {
this.pageValueCount = page.getValueCount();
private int readPageV1(DataPageV1 page) throws IOException {
int pageValueCount = page.getValueCount();
ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);

// Initialize the decoders.
Expand All @@ -211,30 +290,31 @@ private void readPageV1(DataPageV1 page) throws IOException {
ByteBufferInputStream in = bytes.toInputStream();
rlReader.initFromPage(pageValueCount, in);
this.runLenDecoder.initFromStream(pageValueCount, in);
prepareNewPage(page.getValueEncoding(), in);
prepareNewPage(page.getValueEncoding(), in, pageValueCount);
return pageValueCount;
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
}

private void readPageV2(DataPageV2 page) throws IOException {
this.pageValueCount = page.getValueCount();
private int readPageV2(DataPageV2 page) throws IOException {
int pageValueCount = page.getValueCount();

int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
// do not read the length from the stream. v2 pages handle dividing the page bytes.
this.runLenDecoder = new RunLengthDecoder(bitWidth, false);
this.runLenDecoder.initFromStream(
this.pageValueCount, page.getDefinitionLevels().toInputStream());
pageValueCount, page.getDefinitionLevels().toInputStream());
try {
prepareNewPage(page.getDataEncoding(), page.getData().toInputStream());
prepareNewPage(page.getDataEncoding(), page.getData().toInputStream(), pageValueCount);
return pageValueCount;
} catch (IOException e) {
throw new IOException("could not read page " + page + " in col " + descriptor, e);
}
}

private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in)
private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in, int pageValueCount)
throws IOException {
this.endOfPageValueCount = valuesRead + pageValueCount;
if (dataEncoding.usesDictionary()) {
if (dictionary == null) {
throw new IOException(
Expand Down Expand Up @@ -269,6 +349,14 @@ private void prepareNewPage(Encoding dataEncoding, ByteBufferInputStream in)
afterReadPage();
}

final void skipDataBuffer(int length) {
try {
dataInputStream.skipFully(length);
} catch (IOException e) {
throw new ParquetDecodingException("Failed to skip " + length + " bytes", e);
}
}

final ByteBuffer readDataBuffer(int length) {
try {
return dataInputStream.slice(length).order(ByteOrder.LITTLE_ENDIAN);
Expand All @@ -291,6 +379,8 @@ protected boolean supportLazyDecode() {
/** Read batch from {@link #runLenDecoder} and {@link #dataInputStream}. */
protected abstract void readBatch(int rowId, int num, VECTOR column);

protected abstract void skipBatch(int num);

/**
* Decode dictionary ids to data. From {@link #runLenDecoder} and {@link #dictionaryIdsDecoder}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.paimon.data.columnar.writable.WritableIntVector;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.schema.PrimitiveType;

Expand All @@ -36,9 +36,9 @@ public class BooleanColumnReader extends AbstractColumnReader<WritableBooleanVec

private byte currentByte = 0;

public BooleanColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
public BooleanColumnReader(ColumnDescriptor descriptor, PageReadStore pageReadStore)
throws IOException {
super(descriptor, pageReader);
super(descriptor, pageReadStore);
checkTypeName(PrimitiveType.PrimitiveTypeName.BOOLEAN);
}

Expand Down Expand Up @@ -94,6 +94,36 @@ protected void readBatch(int rowId, int num, WritableBooleanVector column) {
}
}

@Override
protected void skipBatch(int num) {
int left = num;
while (left > 0) {
if (runLenDecoder.currentCount == 0) {
runLenDecoder.readNextGroup();
}
int n = Math.min(left, runLenDecoder.currentCount);
switch (runLenDecoder.mode) {
case RLE:
if (runLenDecoder.currentValue == maxDefLevel) {
for (int i = 0; i < n; i++) {
readBoolean();
}
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (runLenDecoder.currentBuffer[runLenDecoder.currentBufferIdx++]
== maxDefLevel) {
readBoolean();
}
}
break;
}
left -= n;
runLenDecoder.currentCount -= n;
}
}

private boolean readBoolean() {
if (bitOffset == 0) {
try {
Expand Down
Loading
Loading