Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-1452: implement Size() filter for repeated columns #3098

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.Size;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;

/**
Expand Down Expand Up @@ -97,6 +98,11 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
return contains;
}

/**
 * Visits a {@link Size} predicate and returns it unchanged; this visitor applies
 * no rewriting to size predicates.
 */
@Override
public FilterPredicate visit(Size size) {
return size;
}

@Override
public FilterPredicate visit(And and) {
final FilterPredicate left;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.SingleColumnFilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.Size;
import org.apache.parquet.filter2.predicate.Operators.SupportsEqNotEq;
import org.apache.parquet.filter2.predicate.Operators.SupportsLtGt;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;
Expand Down Expand Up @@ -263,6 +264,10 @@ public static <T extends Comparable<T>, P extends SingleColumnFilterPredicate<T>
return Contains.of(pred);
}

/**
 * Creates a {@link Size} predicate that compares the number of values in a column
 * against {@code value} using the given operator.
 *
 * @param column the column whose size is compared
 * @param operator the comparison to apply (EQ, LT, LTE, GT or GTE)
 * @param value the size to compare against
 * @return a new {@link Size} predicate
 * @throws IllegalArgumentException if {@code value} is negative, or if {@code operator}
 *     is LT and {@code value} is 0 (raised by the {@link Size} constructor)
 */
public static Size size(Column<?> column, Size.Operator operator, int value) {
return new Size(column, operator, value);
}

/**
* Keeps records that pass the provided {@link UserDefinedPredicate}
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.Size;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;

/**
Expand Down Expand Up @@ -89,6 +90,10 @@ default <T extends Comparable<T>> R visit(Contains<T> contains) {
throw new UnsupportedOperationException("visit Contains is not supported.");
}

/**
 * Visits a {@link Size} predicate. The default implementation rejects the predicate,
 * so visitors that handle size predicates must override this method.
 *
 * @throws UnsupportedOperationException always, unless overridden
 */
default R visit(Size size) {
throw new UnsupportedOperationException("visit Size is not supported.");
}

R visit(And and);

R visit(Or or);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.Size;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;

/**
Expand Down Expand Up @@ -104,6 +105,11 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
return contains;
}

/**
 * Visits a {@link Size} predicate and returns it as-is; nothing in a size predicate
 * is rewritten by this visitor.
 */
@Override
public FilterPredicate visit(Size size) {
return size;
}

@Override
public FilterPredicate visit(And and) {
return and(and.getLeft().accept(this), and.getRight().accept(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.Size;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;

/**
Expand Down Expand Up @@ -98,6 +99,19 @@ public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
return contains.not();
}

/**
 * Inverts a {@link Size} predicate by replacing its operator with the complementary
 * comparison on the same column and value:
 * <ul>
 *   <li>{@code EQ v}  becomes {@code LT v OR GT v} (or just {@code GT 0} when {@code v == 0})</li>
 *   <li>{@code LT v}  becomes {@code GTE v}</li>
 *   <li>{@code LTE v} becomes {@code GT v}</li>
 *   <li>{@code GT v}  becomes {@code LTE v}</li>
 *   <li>{@code GTE v} becomes {@code LT v}</li>
 * </ul>
 *
 * @param size the predicate to invert
 * @return a predicate that matches exactly when {@code size} does not
 * @throws IllegalArgumentException when inverting {@code GTE 0}: its negation is
 *     {@code LT 0}, which the {@link Size} constructor rejects as an invalid predicate
 */
@Override
public FilterPredicate visit(Size size) {
  final int value = size.getValue();
  final Operators.Column<?> column = size.getColumn();

  return size.filter(
      (eq) -> {
        // A size is never negative, so "not equal to 0" is simply "greater than 0".
        // Building the LT half here would throw, because Size rejects "LT 0".
        if (value == 0) {
          return new Size(column, Size.Operator.GT, value);
        }
        return new Or(new Size(column, Size.Operator.LT, value), new Size(column, Size.Operator.GT, value));
      },
      (lt) -> new Size(column, Size.Operator.GTE, value),
      (ltEq) -> new Size(column, Size.Operator.GT, value),
      (gt) -> new Size(column, Size.Operator.LTE, value),
      // For value == 0 this yields "LT 0", which Size rejects; see @throws above.
      (gtEq) -> new Size(column, Size.Operator.LT, value));
}

@Override
public FilterPredicate visit(And and) {
return new Or(and.getLeft().accept(this), and.getRight().accept(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,82 @@ public <R> R filter(
}
}

public static final class Size implements FilterPredicate, Serializable {
public enum Operator {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support notEqual for completeness, though not that useful?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking the same for the LogicalInverter above

EQ,
LT,
LTE,
GT,
GTE
Comment on lines +512 to +514
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LTE,
GT,
GTE
LE,
GT,
GE

IIRC, these are commonly used abbreviations?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

private final Column<?> column;
private final Operator operator;
private final int value;

/**
 * Creates a size predicate on {@code column}.
 *
 * @param column the column whose size is compared; must not be null
 * @param operator the comparison operator; must not be null
 * @param value the size to compare against; must be non-negative, and must be
 *     positive when {@code operator} is {@link Operator#LT}
 * @throws NullPointerException if {@code column} or {@code operator} is null
 * @throws IllegalArgumentException if {@code value} is negative, or if the predicate
 *     is {@code LT 0}
 */
Size(Column<?> column, Operator operator, int value) {
  this.column = Objects.requireNonNull(column, "column cannot be null");
  this.operator = Objects.requireNonNull(operator, "operator cannot be null");
  // Assign before validating: the error message below renders this object through
  // toString(), which reads this.value. Validating first would always print 0.
  this.value = value;
  if (value < 0 || (operator == Operator.LT && value == 0)) {
    throw new IllegalArgumentException("Invalid predicate " + this + ": array size can never be negative");
  }
}

/**
 * Dispatches this predicate to {@code visitor} by calling its {@code visit(Size)} overload.
 *
 * @param visitor the visitor to dispatch to
 * @return whatever the visitor returns for this predicate
 */
@Override
public <R> R accept(Visitor<R> visitor) {
return visitor.visit(this);
}

/**
 * @return the size this predicate compares against
 */
public int getValue() {
return value;
}

/**
 * @return the column this predicate applies to
 */
public Column<?> getColumn() {
return column;
}

/**
 * Applies the callback that corresponds to this predicate's operator, passing it the
 * predicate's value, and returns the callback's result.
 *
 * @param onEq invoked when the operator is EQ
 * @param onLt invoked when the operator is LT
 * @param onLtEq invoked when the operator is LTE
 * @param onGt invoked when the operator is GT
 * @param onGtEq invoked when the operator is GTE
 * @param <R> the result type produced by the callbacks
 * @return the result of the selected callback
 * @throws UnsupportedOperationException if the operator matches none of the above
 */
public <R> R filter(
    Function<Integer, R> onEq,
    Function<Integer, R> onLt,
    Function<Integer, R> onLtEq,
    Function<Integer, R> onGt,
    Function<Integer, R> onGtEq) {
  // Guard against null so an unset operator falls through to the exception below
  // instead of failing inside the switch.
  if (operator != null) {
    switch (operator) {
      case EQ:
        return onEq.apply(value);
      case LT:
        return onLt.apply(value);
      case LTE:
        return onLtEq.apply(value);
      case GT:
        return onGt.apply(value);
      case GTE:
        return onGtEq.apply(value);
      default:
        break;
    }
  }
  throw new UnsupportedOperationException("Operator " + operator + " cannot be used with size() filter");
}

/**
 * Two size predicates are equal when they are of the same class and have equal
 * column, operator and value.
 */
@Override
public boolean equals(Object o) {
  if (this == o) {
    return true;
  }
  if (o == null || getClass() != o.getClass()) {
    return false;
  }

  final Size other = (Size) o;
  return column.equals(other.column) && operator == other.operator && value == other.value;
}

/**
 * Hash code derived from the same three fields that {@code equals} compares:
 * column, operator and value.
 */
@Override
public int hashCode() {
return Objects.hash(column, operator, value);
}

/**
 * Renders the predicate as {@code size(<dotted column path> <operator> <value>)},
 * with the operator name in lower case.
 */
@Override
public String toString() {
  // Lower-case with Locale.ROOT so the rendered operator does not depend on the
  // JVM's default locale.
  return "size(" + column.getColumnPath().toDotString() + " "
      + operator.toString().toLowerCase(java.util.Locale.ROOT) + " " + value + ")";
}
}

public static final class NotIn<T extends Comparable<T>> extends SetColumnFilterPredicate<T> {

NotIn(Column<T> column, Set<T> values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.SetColumnFilterPredicate;
import org.apache.parquet.filter2.predicate.Operators.Size;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

/**
* Inspects the column types found in the provided {@link FilterPredicate} and compares them
Expand Down Expand Up @@ -135,6 +137,12 @@ public <T extends Comparable<T>> Void visit(Contains<T> pred) {
return null;
}

/**
 * Validates the column referenced by a {@link Size} predicate. Both flags passed to
 * {@code validateColumn} are true: the column is expected to be repeated, and it
 * must be required.
 */
@Override
public Void visit(Size size) {
validateColumn(size.getColumn(), true, true);
return null;
}

@Override
public Void visit(And and) {
and.getLeft().accept(this);
Expand Down Expand Up @@ -175,14 +183,15 @@ private <T extends Comparable<T>> void validateColumnFilterPredicate(SetColumnFi
}

private <T extends Comparable<T>> void validateColumnFilterPredicate(Contains<T> pred) {
validateColumn(pred.getColumn(), true);
validateColumn(pred.getColumn(), true, false);
}

private <T extends Comparable<T>> void validateColumn(Column<T> column) {
validateColumn(column, false);
validateColumn(column, false, false);
}

private <T extends Comparable<T>> void validateColumn(Column<T> column, boolean shouldBeRepeated) {
private <T extends Comparable<T>> void validateColumn(
Column<T> column, boolean isRepeatedColumn, boolean mustBeRequired) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be more precise to rename these parameters as below?

  • isRepeatedColumn -> expectRepeated
  • mustBeRequired -> expectRequired

ColumnPath path = column.getColumnPath();

Class<?> alreadySeen = columnTypesEncountered.get(path);
Expand All @@ -204,15 +213,21 @@ private <T extends Comparable<T>> void validateColumn(Column<T> column, boolean
return;
}

if (shouldBeRepeated && descriptor.getMaxRepetitionLevel() == 0) {
if (isRepeatedColumn && descriptor.getMaxRepetitionLevel() == 0) {
throw new IllegalArgumentException(
"FilterPredicate for column " + path.toDotString() + " requires a repeated "
+ "schema, but found max repetition level " + descriptor.getMaxRepetitionLevel());
} else if (!shouldBeRepeated && descriptor.getMaxRepetitionLevel() > 0) {
} else if (!isRepeatedColumn && descriptor.getMaxRepetitionLevel() > 0) {
throw new IllegalArgumentException("FilterPredicates do not currently support repeated columns. "
+ "Column " + path.toDotString() + " is repeated.");
}

if (mustBeRequired && descriptor.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL)) {
throw new IllegalArgumentException("FilterPredicate for column " + path.toDotString()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the column is List<List<Int32>> and elements are required, we can only support the size operator on the inner list?

+ " requires schema to have repetition REQUIRED, but found "
+ descriptor.getPrimitiveType().getRepetition() + ".");
}

ValidTypeMap.assertTypeValid(column, descriptor.getType());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Arrays;
import java.util.Objects;
import java.util.function.Function;
import org.apache.parquet.io.api.Binary;

/**
Expand Down Expand Up @@ -223,6 +224,83 @@ public void reset() {
}
}

class CountingValueInspector extends ValueInspector {
private int observedValueCount;
private final ValueInspector delegate;

/**
* Triggering function to update the underlying delegate. We want to be careful not to trigger before
* all relevant column values have been considered.
*
* For example, given the predicate `size(col, LT, 3)` and a record with 4 array values, we don't want the
* underlying `lt(3)` predicate to be evaluated on the first or second elements of the array, since it would
* return a premature True value.
*/
private final Function<Integer, Boolean> shouldUpdateDelegate;

/**
 * Creates an inspector that counts the values it observes and forwards that count
 * to {@code delegate}.
 *
 * @param delegate the inspector that receives the observed count
 * @param shouldUpdateDelegate decides, given the current count, whether the delegate
 *     should be updated yet
 */
public CountingValueInspector(ValueInspector delegate, Function<Integer, Boolean> shouldUpdateDelegate) {
  this.delegate = delegate;
  this.shouldUpdateDelegate = shouldUpdateDelegate;
  // Counting starts from zero for every new inspector.
  this.observedValueCount = 0;
}

/**
 * Handles a null by handing the count observed so far to the delegate and adopting
 * the delegate's result as this inspector's result.
 */
@Override
public void updateNull() {
// Push the number of values counted so far into the delegate as an int value.
// NOTE(review): this runs even if the delegate's result was already settled by an
// earlier incrementCount() call; confirm the delegate tolerates a further update.
delegate.update(observedValueCount);
// If the count alone did not settle the delegate, let it observe the null too.
if (!delegate.isKnown()) {
delegate.updateNull();
}
// Mirror the delegate's conclusion as this inspector's own result.
setResult(delegate.getResult());
}

// Every typed update(...) overload below ignores the value it is given and only
// records that one more value was observed, by calling incrementCount().
@Override
public void update(int value) {
incrementCount();
}

@Override
public void update(long value) {
incrementCount();
}

@Override
public void update(double value) {
incrementCount();
}

@Override
public void update(float value) {
incrementCount();
}

@Override
public void update(boolean value) {
incrementCount();
}

@Override
public void update(Binary value) {
incrementCount();
}

/**
 * Returns this inspector to its initial state: the observed-value count goes back to
 * zero, and both this inspector and its delegate are reset.
 */
@Override
public void reset() {
  observedValueCount = 0;
  super.reset();
  delegate.reset();
}

private void incrementCount() {
observedValueCount++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it better to check whether the result is known before doing anything?

if (!delegate.isKnown() && shouldUpdateDelegate.apply(observedValueCount)) {
delegate.update(observedValueCount);
if (delegate.isKnown()) {
setResult(delegate.getResult());
}
}
}
}

// base class for and / or
abstract static class BinaryLogical implements IncrementallyUpdatedFilterPredicate {
private final IncrementallyUpdatedFilterPredicate left;
Expand Down
Loading