[core] Optimization of Parquet Predicate Pushdown Capability (#4608)
Aiden-Dong authored Dec 3, 2024
1 parent 300cc67 commit d33b871
Showing 18 changed files with 898 additions and 171 deletions.
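
In short, this change moves the Parquet reader from the plain row-group APIs (readNextRowGroup() / getRecordCount()) to parquet-mr's filtered variants (readNextFilteredRowGroup() / getFilteredRecordCount()), and introduces a ParquetReadState that tracks which row-index ranges of a row group survive row-group and page pruning, so that batch sizes and row positions stay aligned with the pushed-down predicate. The code that actually installs the record filter on the reader is not part of this excerpt; the block below is only a minimal standalone sketch of the parquet-mr calls involved, assuming an integer equality predicate (the helper name openWithPushdown and its parameters are illustrative, not Paimon API):

import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.io.InputFile;

import java.io.IOException;

public class ParquetPushdownSketch {

    // Illustrative helper, not Paimon API: open a ParquetFileReader with a record
    // filter so that row groups and pages whose statistics cannot match
    // "column == value" are skipped.
    public static ParquetFileReader openWithPushdown(InputFile file, String column, int value)
            throws IOException {
        FilterPredicate predicate = FilterApi.eq(FilterApi.intColumn(column), value);
        ParquetReadOptions options =
                ParquetReadOptions.builder()
                        .withRecordFilter(FilterCompat.get(predicate)) // row-group level pruning
                        .useColumnIndexFilter(true) // page-level pruning via column indexes
                        .build();
        ParquetFileReader reader = ParquetFileReader.open(file, options);
        // With a filter installed, the filtered variants only account for rows that
        // survive pruning; the reader changes below rely on exactly this behaviour.
        long filteredRows = reader.getFilteredRecordCount();
        System.out.println("rows after row-group/page pruning: " + filteredRows);
        return reader;
    }
}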
@@ -84,6 +84,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -809,6 +810,68 @@ public void testDeletionVectorsWithFileIndexInFile() throws Exception {
"1|4|500|binary|varbinary|mapKey:mapVal|multiset"));
}

@Test
public void testDeletionVectorsWithParquetFilter() throws Exception {
FileStoreTable table =
createFileStoreTable(
conf -> {
conf.set(BUCKET, 1);
conf.set(DELETION_VECTORS_ENABLED, true);
conf.set(FILE_FORMAT, "parquet");
conf.set("parquet.block.size", "1048576");
conf.set("parquet.page.size", "1024");
});

BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();

BatchTableWrite write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));

for (int i = 0; i < 200000; i++) {
write.write(rowData(1, i, i * 100L));
}

List<CommitMessage> messages = write.prepareCommit();
BatchTableCommit commit = writeBuilder.newCommit();
commit.commit(messages);
write =
(BatchTableWrite)
writeBuilder
.newWrite()
.withIOManager(new IOManagerImpl(tempDir.toString()));
for (int i = 180000; i < 200000; i++) {
write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L));
}

messages = write.prepareCommit();
commit = writeBuilder.newCommit();
commit.commit(messages);

PredicateBuilder builder = new PredicateBuilder(ROW_TYPE);
List<Split> splits = toSplits(table.newSnapshotReader().read().dataSplits());
Random random = new Random();

for (int i = 0; i < 10; i++) {
int value = random.nextInt(180000);
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter();
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING))
.isEqualTo(
Arrays.asList(
String.format(
"%d|%d|%d|binary|varbinary|mapKey:mapVal|multiset",
1, value, value * 100L)));
}

for (int i = 0; i < 10; i++) {
int value = 180000 + random.nextInt(20000);
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter();
assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty();
}
}

@Test
public void testDeletionVectorsWithFileIndexInMeta() throws Exception {
FileStoreTable table =
(The hunks below are from a second changed file, the Parquet format reader.)
@@ -28,6 +28,7 @@
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
import org.apache.paimon.format.parquet.reader.ParquetReadState;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.format.parquet.type.ParquetField;
import org.apache.paimon.fs.Path;
@@ -130,7 +131,7 @@ public FileRecordReader<InternalRow> createReader(FormatReaderFactory.Context co
buildFieldsList(projectedType.getFields(), projectedType.getFieldNames(), columnIO);

return new ParquetReader(
reader, requestedSchema, reader.getRecordCount(), poolOfBatches, fields);
reader, requestedSchema, reader.getFilteredRecordCount(), poolOfBatches, fields);
}

private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -336,6 +337,10 @@ private class ParquetReader implements FileRecordReader<InternalRow> {

private long nextRowPosition;

private ParquetReadState currentRowGroupReadState;

private long currentRowGroupFirstRowIndex;

/**
* For each request column, the reader to read this column. This is NULL if this column is
* missing from the file, in which case we populate the attribute with NULL.
@@ -359,6 +364,7 @@ private ParquetReader(
this.totalCountLoadedSoFar = 0;
this.currentRowPosition = 0;
this.nextRowPosition = 0;
this.currentRowGroupFirstRowIndex = 0;
this.fields = fields;
}

@@ -390,7 +396,8 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
currentRowPosition = nextRowPosition;
}

int num = (int) Math.min(batchSize, totalCountLoadedSoFar - rowsReturned);
int num = getBachSize();

for (int i = 0; i < columnReaders.length; ++i) {
if (columnReaders[i] == null) {
batch.writableVectors[i].fillWithNulls();
@@ -400,13 +407,13 @@ private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
}
}
rowsReturned += num;
nextRowPosition = currentRowPosition + num;
nextRowPosition = getNextRowPosition(num);
batch.columnarBatch.setNumRows(num);
return true;
}

private void readNextRowGroup() throws IOException {
PageReadStore rowGroup = reader.readNextRowGroup();
PageReadStore rowGroup = reader.readNextFilteredRowGroup();
if (rowGroup == null) {
throw new IOException(
"expecting more rows but reached last block. Read "
@@ -415,6 +422,9 @@ private void readNextRowGroup() throws IOException {
+ totalRowCount);
}

this.currentRowGroupReadState =
new ParquetReadState(rowGroup.getRowIndexes().orElse(null));

List<Type> types = requestedSchema.getFields();
columnReaders = new ColumnReader[types.size()];
for (int i = 0; i < types.size(); ++i) {
@@ -429,18 +439,62 @@ private void readNextRowGroup() throws IOException {
0);
}
}

totalCountLoadedSoFar += rowGroup.getRowCount();
if (rowGroup.getRowIndexOffset().isPresent()) {
currentRowPosition = rowGroup.getRowIndexOffset().get();

if (rowGroup.getRowIndexOffset().isPresent()) { // filter
currentRowGroupFirstRowIndex = rowGroup.getRowIndexOffset().get();
long pageIndex = 0;
if (!this.currentRowGroupReadState.isMaxRange()) {
pageIndex = this.currentRowGroupReadState.currentRangeStart();
}
currentRowPosition = currentRowGroupFirstRowIndex + pageIndex;
} else {
if (reader.rowGroupsFiltered()) {
throw new RuntimeException(
"There is a bug, rowIndexOffset must be present when row groups are filtered.");
}
currentRowGroupFirstRowIndex = nextRowPosition;
currentRowPosition = nextRowPosition;
}
}

private int getBachSize() throws IOException {

long rangeBatchSize = Long.MAX_VALUE;
if (this.currentRowGroupReadState.isFinished()) {
throw new IOException(
"expecting more rows but reached last page block. Read "
+ rowsReturned
+ " out of "
+ totalRowCount);
} else if (!this.currentRowGroupReadState.isMaxRange()) {
long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
rangeBatchSize = this.currentRowGroupReadState.currentRangeEnd() - pageIndex + 1;
}

return (int)
Math.min(
batchSize,
Math.min(rangeBatchSize, totalCountLoadedSoFar - rowsReturned));
}

private long getNextRowPosition(int num) {
if (this.currentRowGroupReadState.isMaxRange()) {
return this.currentRowPosition + num;
} else {
long pageIndex = this.currentRowPosition - this.currentRowGroupFirstRowIndex;
long nextIndex = pageIndex + num;

if (this.currentRowGroupReadState.currentRangeEnd() < nextIndex) {
this.currentRowGroupReadState.nextRange();
nextIndex = this.currentRowGroupReadState.currentRangeStart();
}

return nextIndex;
}
}

private ParquetReaderBatch getCachedEntry() throws IOException {
try {
return pool.pollEntry();
(Diffs for the remaining changed files are not rendered here.)
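
ParquetReadState is one of those unrendered files. Judging from the calls made in the reader above (isMaxRange(), isFinished(), currentRangeStart(), currentRangeEnd(), nextRange(), and construction from rowGroup.getRowIndexes().orElse(null)), it collapses the row indexes that survive page filtering into contiguous ranges within the current row group. A minimal sketch with that behaviour, purely illustrative and not the actual Paimon class, might look like this:

import java.util.ArrayList;
import java.util.List;
import java.util.PrimitiveIterator;

// Illustrative stand-in for org.apache.paimon.format.parquet.reader.ParquetReadState;
// the real implementation may differ.
final class ReadStateSketch {

    // Contiguous [start, end] row-index ranges (relative to the row group)
    // that survived page-level filtering.
    private final List<long[]> ranges = new ArrayList<>();
    private int current = 0;
    private final boolean maxRange; // no row-index list: read the whole row group

    ReadStateSketch(PrimitiveIterator.OfLong rowIndexes) {
        this.maxRange = (rowIndexes == null);
        if (rowIndexes != null) {
            long start = -1;
            long prev = -1;
            while (rowIndexes.hasNext()) {
                long idx = rowIndexes.nextLong();
                if (start < 0) {
                    start = idx;
                } else if (idx != prev + 1) {
                    // Gap in the surviving row indexes: close the current range.
                    ranges.add(new long[] {start, prev});
                    start = idx;
                }
                prev = idx;
            }
            if (start >= 0) {
                ranges.add(new long[] {start, prev});
            }
        }
    }

    boolean isMaxRange() {
        return maxRange;
    }

    boolean isFinished() {
        return !maxRange && current >= ranges.size();
    }

    long currentRangeStart() {
        return ranges.get(current)[0];
    }

    long currentRangeEnd() {
        return ranges.get(current)[1];
    }

    void nextRange() {
        current++;
    }
}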
