Skip to content

Commit

Permalink
PARQUET-34: Implement page-level Size filter
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty committed Jan 20, 2025
1 parent 2e8ba22 commit 9586427
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,69 @@ public <T extends Comparable<T>> PrimitiveIterator.OfInt visit(Contains<T> conta

@Override
public PrimitiveIterator.OfInt visit(Size size) {
  // Without both level histograms we cannot reason about per-page list
  // sizes, so no page can be ruled out.
  if (repLevelHistogram == null || defLevelHistogram == null) {
    return IndexIterator.all(getPageCount());
  }

  // Starting offset of each page's slice inside the concatenated
  // repetition/definition level histograms.
  final int[] repLevelOffsets = calculateOffsetsForHistogram(repLevelHistogram, nullPages);
  final int[] defLevelOffsets = calculateOffsetsForHistogram(defLevelHistogram, nullPages);

  return IndexIterator.filter(getPageCount(), pageIndex -> {
    final boolean isFinalPage = pageIndex + 1 == nullPages.length;
    // Slice out this page's portion of each histogram; the final page runs
    // to the end of the concatenated histogram.
    final List<Long> pageRepLevelHistogram = getRepetitionLevelHistogram()
        .subList(
            repLevelOffsets[pageIndex],
            isFinalPage ? repLevelHistogram.length : repLevelOffsets[pageIndex + 1]);
    final List<Long> pageDefLevelHistogram = getDefinitionLevelHistogram()
        .subList(
            defLevelOffsets[pageIndex],
            isFinalPage ? defLevelHistogram.length : defLevelOffsets[pageIndex + 1]);

    if (pageRepLevelHistogram.isEmpty() || pageDefLevelHistogram.isEmpty()) {
      // Page might match; cannot be filtered out
      return true;
    }

    final int defLevelCount = pageDefLevelHistogram.size();

    // If all values have repetition level 0, then no array has more than 1 element
    if (pageRepLevelHistogram.size() == 1
        || (pageRepLevelHistogram.get(0) > 0
            && pageRepLevelHistogram.subList(1, pageRepLevelHistogram.size()).stream()
                .allMatch(l -> l == 0))) {

      // All lists are null or empty (no values above definition level 0):
      // only a size of 0 can possibly match.
      if (pageDefLevelHistogram.subList(1, defLevelCount).stream().allMatch(l -> l == 0)) {
        return size.filter(
            (eq) -> eq <= 0, (lt) -> true, (lte) -> true, (gt) -> gt < 0, (gte) -> gte <= 0);
      }

      final int maxDefinitionLevel = defLevelCount - 1;

      // If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all
      // lists are of size 1
      if (pageDefLevelHistogram.subList(0, maxDefinitionLevel - 1).stream()
          .allMatch(l -> l == 0)) {
        return size.filter(
            (eq) -> eq == 1, (lt) -> lt > 1, (lte) -> lte >= 1, (gt) -> gt < 1, (gte) -> gte <= 1);
      }
    }

    // Count of non-null leaf elements, and of records whose list field is
    // itself non-null, in this page.
    final long nonNullElementCount =
        pageRepLevelHistogram.stream().mapToLong(l -> l).sum() - pageDefLevelHistogram.get(0);
    final long numNonNullRecords = pageRepLevelHistogram.get(0) - pageDefLevelHistogram.get(0);

    // Given the total number of elements and non-null fields, we can compute the max size of any array
    // field; only predicates requiring more than that can be ruled out.
    final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
    return size.filter(
        (eq) -> eq <= maxArrayElementCount,
        (lt) -> true,
        (lte) -> true,
        (gt) -> gt < maxArrayElementCount,
        (gte) -> gte <= maxArrayElementCount);
  });
}

@Override
Expand Down Expand Up @@ -444,6 +506,23 @@ public boolean test(int pageIndex) {
}
});
}

// Calculates each page's starting offset in a concatenated histogram.
// A null page contributes exactly one histogram entry; every non-null page
// contributes the same fixed number of entries, recovered by dividing the
// remaining histogram length evenly among the non-null pages.
private static int[] calculateOffsetsForHistogram(long[] histogram, boolean[] nullPages) {
  // Plain loop instead of BooleanList.of(...).stream(): no boxing, no
  // fastutil round-trip just to count the null pages.
  int numNullPages = 0;
  for (boolean isNullPage : nullPages) {
    if (isNullPage) {
      numNullPages++;
    }
  }
  final int numNonNullPages = nullPages.length - numNullPages;
  // Guard the all-null-pages case: the original 0/0 division threw
  // ArithmeticException; with no non-null pages the per-page width is unused.
  final int numLevelsPerNonNullPage =
      numNonNullPages == 0 ? 0 : (histogram.length - numNullPages) / numNonNullPages;

  final int[] offsets = new int[nullPages.length];
  int currOffset = 0;
  for (int i = 0; i < nullPages.length; ++i) {
    offsets[i] = currOffset;
    currOffset += (nullPages[i] ? 1 : numLevelsPerNonNullPage);
  }

  return offsets;
}
}

private static final ColumnIndexBuilder NO_OP_BUILDER = new ColumnIndexBuilder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static org.apache.parquet.filter2.predicate.FilterApi.notEq;
import static org.apache.parquet.filter2.predicate.FilterApi.notIn;
import static org.apache.parquet.filter2.predicate.FilterApi.or;
import static org.apache.parquet.filter2.predicate.FilterApi.size;
import static org.apache.parquet.filter2.predicate.FilterApi.userDefined;
import static org.apache.parquet.filter2.predicate.LogicalInverter.invert;
import static org.apache.parquet.schema.OriginalType.DECIMAL;
Expand All @@ -56,6 +57,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.collect.ImmutableList;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -64,9 +66,11 @@
import java.util.List;
import java.util.Set;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.statistics.SizeStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.filter2.predicate.ContainsRewriter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.Operators;
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
Expand Down Expand Up @@ -1627,6 +1631,99 @@ public void testNoOpBuilder() {
assertNull(builder.build());
}

@Test
public void testSizeRequiredElements() {
  final PrimitiveType elementType = Types.required(DOUBLE).named("element");
  final DoubleColumn column = doubleColumn(elementType.getName());

  // Five pages: two multi-element arrays, a single-element array, an empty
  // array, and a null array.
  final List<List<Double>> pages = new ArrayList<>();
  pages.add(ImmutableList.of(1.0, 2.0, 3.0));
  pages.add(ImmutableList.of(1.0, 2.0, 3.0, 4.0, 5.0));
  pages.add(ImmutableList.of(-1.0));
  pages.add(ImmutableList.of());
  pages.add(null);

  final ColumnIndex index = createArrayColumnIndex(elementType, pages);

  // Sanity-check the index itself before exercising the Size filter.
  assertEquals(BoundaryOrder.UNORDERED, index.getBoundaryOrder());
  assertCorrectNullCounts(index, 0, 0, 0, 0, 0);
  assertCorrectNullPages(index, false, false, false, true, true);
  assertCorrectValues(index.getMaxValues(), 3.0, 5.0, -1.0, null, null);
  assertCorrectValues(index.getMinValues(), 1.0, 1.0, -1.0, null, null);

  // we know max array size is 5; all elements of page 2 have size 1; and page 3 and 4 are null or empty
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.EQ, 0), 0, 1, 3, 4);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.EQ, 4), 1);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.EQ, 3), 0, 1);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.LT, 2), 0, 1, 2, 3, 4);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.LTE, 1), 0, 1, 2, 3, 4);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.GT, 0), 0, 1, 2);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.GTE, 0), 0, 1, 2, 3, 4);
}

@Test
public void testSizeOptionalElements() {
  final PrimitiveType elementType = Types.optional(DOUBLE).named("element");
  final DoubleColumn column = doubleColumn(elementType.getName());

  // A single page whose one array mixes null and non-null elements.
  final List<Double> valuesWithNulls = new ArrayList<>();
  valuesWithNulls.add(null);
  valuesWithNulls.add(3.0);
  valuesWithNulls.add(null);

  final List<List<Double>> pages = new ArrayList<>();
  pages.add(valuesWithNulls);

  final ColumnIndex index = createArrayColumnIndex(elementType, pages);

  // Sanity-check the index: two null elements, one real value.
  assertCorrectNullCounts(index, 2);
  assertCorrectNullPages(index, false);
  assertCorrectValues(index.getMaxValues(), 3.0);
  assertCorrectValues(index.getMinValues(), 3.0);

  // We know that the array values for the page have min size 0 and max size 3
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.EQ, 0), 0);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.EQ, 5));
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.LT, 4), 0);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.LTE, 3), 0);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.GT, 0), 0);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.GT, 3));
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.GTE, 3), 0);
  assertCorrectFiltering(index, size(column, Operators.Size.Operator.GTE, 4));
}

// Builds a ColumnIndex for a repeated field where each inner list is one
// page's array value; a null entry models a page whose array is null.
private static ColumnIndex createArrayColumnIndex(PrimitiveType type, List<List<Double>> pageValueList) {
  final ColumnIndexBuilder builder = ColumnIndexBuilder.getBuilder(type, Integer.MAX_VALUE);

  for (List<Double> pageValues : pageValueList) {
    final StatsBuilder statsBuilder = new StatsBuilder();
    final boolean isNullOrEmpty = pageValues == null || pageValues.isEmpty();

    final SizeStatistics.Builder sizeStatsBuilder =
        SizeStatistics.newBuilder(type, isNullOrEmpty ? 0 : 1, isNullOrEmpty ? 0 : 1);

    if (isNullOrEmpty) {
      // A null/empty array contributes a single (rep=0, def=0) entry.
      sizeStatsBuilder.add(0, 0);
    } else {
      // The first element opens a new record (rep level 0); every
      // subsequent element continues it (rep level 1).
      sizeStatsBuilder.add(0, 1);
      for (int i = 1; i < pageValues.size(); i++) {
        sizeStatsBuilder.add(1, 1);
      }
    }

    if (pageValues == null) {
      builder.add(statsBuilder.stats(type), sizeStatsBuilder.build());
    } else {
      builder.add(statsBuilder.stats(type, pageValues.toArray(new Double[0])), sizeStatsBuilder.build());
    }
  }

  return builder.build();
}

private static List<ByteBuffer> toBBList(Binary... values) {
List<ByteBuffer> buffers = new ArrayList<>(values.length);
for (Binary value : values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ public <T extends Comparable<T>> Boolean visit(Contains<T> contains) {
return contains.filter(this, (l, r) -> l || r, (l, r) -> l && r, v -> BLOCK_MIGHT_MATCH);
}

/**
* Logically equivalent to {@link org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder},
* but for block granularity
**/
@Override
public Boolean visit(Size size) {
final ColumnChunkMetaData metadata = getColumnChunk(size.getColumn().getColumnPath());
Expand All @@ -239,40 +243,37 @@ public Boolean visit(Size size) {

// If all values have repetition level 0, then no array has more than 1 element
if (repetitionLevelHistogram.size() == 1
|| repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream()
.allMatch(l -> l == 0)) {

// Null list fields are treated as having size 0
if (( // all lists are nulls
definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream()
.allMatch(l -> l == 0))
|| // all lists are size 0
(definitionLevelHistogram.get(0) == 0
&& definitionLevelHistogram.subList(2, definitionLevelHistogram.size()).stream()
.allMatch(l -> l == 0))) {
|| (repetitionLevelHistogram.get(0) > 0
&& repetitionLevelHistogram.subList(1, repetitionLevelHistogram.size()).stream()
.allMatch(l -> l == 0))) {

// All lists are null or empty
if ((definitionLevelHistogram.subList(1, definitionLevelHistogram.size()).stream()
.allMatch(l -> l == 0))) {

final boolean blockCannotMatch =
size.filter((eq) -> eq > 0, (lt) -> false, (lte) -> false, (gt) -> gt >= 0, (gte) -> gte > 0);
return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
}

long maxDefinitionLevel = definitionLevelHistogram.get(definitionLevelHistogram.size() - 1);
final int maxDefinitionLevel = definitionLevelHistogram.size() - 1;

// If all repetition levels are zero and all definitions level are > MAX_DEFINITION_LEVEL - 1, all lists
// If all repetition levels are zero and all definition levels are > MAX_DEFINITION_LEVEL - 1, all lists
// are of size 1
if (definitionLevelHistogram.stream().allMatch(l -> l > maxDefinitionLevel - 1)) {
if (definitionLevelHistogram.subList(0, maxDefinitionLevel - 1).stream()
.allMatch(l -> l == 0)) {
final boolean blockCannotMatch = size.filter(
(eq) -> eq != 1, (lt) -> lt <= 1, (lte) -> lte < 1, (gt) -> gt >= 1, (gte) -> gte > 1);

return blockCannotMatch ? BLOCK_CANNOT_MATCH : BLOCK_MIGHT_MATCH;
}
}
long nonNullElementCount =
final long nonNullElementCount =
repetitionLevelHistogram.stream().mapToLong(l -> l).sum() - definitionLevelHistogram.get(0);
long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);
final long numNonNullRecords = repetitionLevelHistogram.get(0) - definitionLevelHistogram.get(0);

// Given the total number of elements and non-null fields, we can compute the max size of any array field
long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
final long maxArrayElementCount = 1 + (nonNullElementCount - numNonNullRecords);
final boolean blockCannotMatch = size.filter(
(eq) -> eq > maxArrayElementCount,
(lt) -> false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,23 @@ public void testSizeFilterRequiredGroupRequiredElements() throws Exception {
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));

// Case 3: all lists have size 1
columnMeta = Collections.singletonList(getIntColumnMeta(
nestedListColumn.getColumnPath(),
minMaxStats,
createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(1), ImmutableList.of(1))),
2));

// We know that records have max array size 1
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 2), columnMeta));
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 1), columnMeta));
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 2), columnMeta));

// These predicates should not be able to filter out the page
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 2), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 1), columnMeta));
}

@Test
Expand Down Expand Up @@ -563,25 +580,47 @@ public void testSizeFilterOptionalGroup() throws Exception {
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 6), columnMeta));

// These predicates should not be able to filter out the page
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 3), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 2), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 5), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 3), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));

// Case 2: List is null
// Case 2: Lists are null
final List<List<Integer>> listWithNull = new ArrayList<>();
listWithNull.add(null);
listWithNull.add(null);
columnMeta = Collections.singletonList(getIntColumnMeta(
nestedListColumn.getColumnPath(),
minMaxStats,
createSizeStatisticsForRepeatedField(true, listWithNull),
5));
2));

// These predicates should be able to filter out the page
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta));
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta));
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));

// These predicates should not be able to filter out the page
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta));

// Case 3: lists are empty
columnMeta = Collections.singletonList(getIntColumnMeta(
nestedListColumn.getColumnPath(),
minMaxStats,
createSizeStatisticsForRepeatedField(true, ImmutableList.of(ImmutableList.of(), ImmutableList.of())),
2));

// These predicates should be able to filter out the page
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GT, 0), columnMeta));
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 1), columnMeta));
assertTrue(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 5), columnMeta));

// These predicates should not be able to filter out the page
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.GTE, 0), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LTE, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.LT, 1), columnMeta));
assertFalse(canDrop(size(nestedListColumn, Operators.Size.Operator.EQ, 0), columnMeta));
Expand Down Expand Up @@ -615,7 +654,6 @@ private static SizeStatistics createSizeStatisticsForRepeatedField(
for (List<Integer> arrayValue : arrayValues) {
final SimpleGroup record = new SimpleGroup(messageSchema);
final Group nestedGroup = record.addGroup("nestedGroup");

if (arrayValue != null) {
Group listField = nestedGroup.addGroup("listField");
for (Integer value : arrayValue) {
Expand Down

0 comments on commit 9586427

Please sign in to comment.