Skip to content

Commit

Permalink
PARQUET-34: Add #contains FilterPredicate for Array columns (apache#1328)
Browse files Browse the repository at this point in the history
  • Loading branch information
clairemcginty authored Jun 4, 2024
1 parent 8b91d6c commit dab5aae
Show file tree
Hide file tree
Showing 28 changed files with 1,411 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.util.Objects;
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.predicate.ContainsRewriter;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
import org.slf4j.Logger;
Expand Down Expand Up @@ -82,7 +83,12 @@ public static Filter get(FilterPredicate filterPredicate) {
LOG.info("Predicate has been collapsed to: {}", collapsedPredicate);
}

return new FilterPredicateCompat(collapsedPredicate);
FilterPredicate rewrittenContainsPredicate = ContainsRewriter.rewrite(collapsedPredicate);
if (!collapsedPredicate.equals(rewrittenContainsPredicate)) {
LOG.info("Contains() Predicate has been rewritten to: {}", rewrittenContainsPredicate);
}

return new FilterPredicateCompat(rewrittenContainsPredicate);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.filter2.predicate;

import java.util.Objects;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
import org.apache.parquet.filter2.predicate.Operators.In;
import org.apache.parquet.filter2.predicate.Operators.LogicalNotUserDefined;
import org.apache.parquet.filter2.predicate.Operators.Lt;
import org.apache.parquet.filter2.predicate.Operators.LtEq;
import org.apache.parquet.filter2.predicate.Operators.Not;
import org.apache.parquet.filter2.predicate.Operators.NotEq;
import org.apache.parquet.filter2.predicate.Operators.NotIn;
import org.apache.parquet.filter2.predicate.Operators.Or;
import org.apache.parquet.filter2.predicate.Operators.UserDefined;

/**
 * Recursively rewrites Contains predicates composed using And or Or into a single Contains predicate
 * containing all predicate assertions.
 *
 * <p>This is a performance optimization, as all composed Contains sub-predicates must share the same column, and
 * can therefore be applied efficiently as a single predicate pass.
 *
 * <p>Only And/Or nodes are descended into. Every other predicate (including a Contains that is combined
 * with a non-Contains sibling) is left exactly as it was, and a node whose children did not change is
 * returned as the same instance.
 *
 * <p>NOTE(review): this class does not itself verify that two merged Contains predicates reference the
 * same column; presumably {@code Contains#and} / {@code Contains#or} enforce that - confirm.
 */
public final class ContainsRewriter implements Visitor<FilterPredicate> {
  private static final ContainsRewriter INSTANCE = new ContainsRewriter();

  /**
   * Rewrites the given predicate tree, merging And/Or compositions of Contains predicates.
   *
   * @param pred the predicate to rewrite; must not be null
   * @return the rewritten predicate, or {@code pred} itself when nothing could be merged
   * @throws NullPointerException if {@code pred} is null
   * @throws IllegalStateException if {@code pred} is a Not predicate
   */
  public static FilterPredicate rewrite(FilterPredicate pred) {
    Objects.requireNonNull(pred, "pred cannot be null");
    return pred.accept(INSTANCE);
  }

  private ContainsRewriter() {}

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(Eq<T> eq) {
    return eq;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(NotEq<T> notEq) {
    return notEq;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(Lt<T> lt) {
    return lt;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(LtEq<T> ltEq) {
    return ltEq;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(Gt<T> gt) {
    return gt;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(GtEq<T> gtEq) {
    return gtEq;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(In<T> in) {
    return in;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(NotIn<T> notIn) {
    return notIn;
  }

  @Override
  public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
    return contains;
  }

  /**
   * Rewrites both operands, then merges them into one Contains predicate when both are Contains.
   * Otherwise the And is kept, rebuilt only if one of its operands was rewritten.
   */
  @Override
  public FilterPredicate visit(And and) {
    final FilterPredicate left = rewriteChild(and.getLeft());
    final FilterPredicate right = rewriteChild(and.getRight());

    if (left instanceof Contains && right instanceof Contains) {
      return ((Contains) left).and(right);
    }
    // Identity comparison is intentional: an untouched subtree comes back as the same instance,
    // so a new node is only allocated when a nested rewrite actually happened.
    if (left != and.getLeft() || right != and.getRight()) {
      return new And(left, right);
    }
    return and;
  }

  /**
   * Rewrites both operands, then merges them into one Contains predicate when both are Contains.
   * Otherwise the Or is kept, rebuilt only if one of its operands was rewritten.
   */
  @Override
  public FilterPredicate visit(Or or) {
    final FilterPredicate left = rewriteChild(or.getLeft());
    final FilterPredicate right = rewriteChild(or.getRight());

    if (left instanceof Contains && right instanceof Contains) {
      return ((Contains) left).or(right);
    }
    // See visit(And): only rebuild when a child was replaced by a nested rewrite.
    if (left != or.getLeft() || right != or.getRight()) {
      return new Or(left, right);
    }
    return or;
  }

  /**
   * Always fails: a top-level Not is expected to have been eliminated before this rewriter runs.
   *
   * @throws IllegalStateException always
   */
  @Override
  public FilterPredicate visit(Not not) {
    throw new IllegalStateException("Not predicate should be rewritten before being evaluated by ContainsRewriter");
  }

  @Override
  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(UserDefined<T, U> udp) {
    return udp;
  }

  @Override
  public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> FilterPredicate visit(
      LogicalNotUserDefined<T, U> udp) {
    return udp;
  }

  /**
   * Rewrites one operand of an And/Or: nested And/Or nodes are rewritten recursively, anything else
   * (leaf predicates, Contains, and nested Not) is returned untouched.
   */
  private FilterPredicate rewriteChild(FilterPredicate child) {
    if (child instanceof And) {
      return visit((And) child);
    }
    if (child instanceof Or) {
      return visit((Or) child);
    }
    return child;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.parquet.filter2.predicate.Operators.BinaryColumn;
import org.apache.parquet.filter2.predicate.Operators.BooleanColumn;
import org.apache.parquet.filter2.predicate.Operators.Column;
import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.DoubleColumn;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.FloatColumn;
Expand Down Expand Up @@ -257,6 +258,10 @@ public static <T extends Comparable<T>, C extends Column<T> & SupportsEqNotEq> N
return new NotIn<>(column, values);
}

/**
 * Builds a {@link Contains} predicate from an equality predicate.
 *
 * @param pred the equality predicate to wrap; handed unchanged to {@code Contains.of}
 * @param <T> the value type of the wrapped predicate
 * @return the predicate produced by {@code Contains.of(pred)}
 */
public static <T extends Comparable<T>> Contains<T> contains(Eq<T> pred) {
  final Contains<T> containsPredicate = Contains.of(pred);
  return containsPredicate;
}

/**
* Keeps records that pass the provided {@link UserDefinedPredicate}
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.parquet.filter2.predicate;

import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
Expand Down Expand Up @@ -84,6 +85,10 @@ default <T extends Comparable<T>> R visit(NotIn<T> notIn) {
throw new UnsupportedOperationException("visit NotIn is not supported.");
}

/**
 * Default handling for {@link Contains} predicates: visitors that do not override this method
 * reject them.
 *
 * @throws UnsupportedOperationException always, unless overridden
 */
default <T extends Comparable<T>> R visit(Contains<T> contains) {
  final String message = "visit Contains is not supported.";
  throw new UnsupportedOperationException(message);
}

/** Visits an {@link And} predicate; has no default, so every visitor must implement it. */
R visit(And and);

/** Visits an {@link Or} predicate; has no default, so every visitor must implement it. */
R visit(Or or);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Objects;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
Expand Down Expand Up @@ -98,6 +99,11 @@ public <T extends Comparable<T>> FilterPredicate visit(NotIn<T> notIn) {
return notIn;
}

/** Leaves a {@link Contains} predicate as it is: the same instance is handed back. */
@Override
public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
  final FilterPredicate unchanged = contains;
  return unchanged;
}

@Override
public FilterPredicate visit(And and) {
return and(and.getLeft().accept(this), and.getRight().accept(this));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Objects;
import org.apache.parquet.filter2.predicate.FilterPredicate.Visitor;
import org.apache.parquet.filter2.predicate.Operators.And;
import org.apache.parquet.filter2.predicate.Operators.Contains;
import org.apache.parquet.filter2.predicate.Operators.Eq;
import org.apache.parquet.filter2.predicate.Operators.Gt;
import org.apache.parquet.filter2.predicate.Operators.GtEq;
Expand Down Expand Up @@ -92,6 +93,11 @@ public <T extends Comparable<T>> FilterPredicate visit(NotIn<T> notIn) {
return new In<>(notIn.getColumn(), notIn.getValues());
}

/**
 * Rejects {@link Contains} predicates; this visitor has no handling for them yet.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public <T extends Comparable<T>> FilterPredicate visit(Contains<T> contains) {
  final String message = "Contains not supported yet";
  throw new UnsupportedOperationException(message);
}

@Override
public FilterPredicate visit(And and) {
return new Or(and.getLeft().accept(this), and.getRight().accept(this));
Expand Down
Loading

0 comments on commit dab5aae

Please sign in to comment.