diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicate.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicate.java index 8c5b49b021..99e1d6837f 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicate.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicate.java @@ -36,6 +36,7 @@ public class FilterByPredicate implements PlanNode { private final Set filterOnPredicates; final PlanNode parent; private final On on; + private final ConnectionsGroup connectionsGroup; private boolean printed = false; private ValidationExecutionLogger validationExecutionLogger; private final Resource[] dataGraph; @@ -53,6 +54,7 @@ public FilterByPredicate(SailConnection connection, Set filterOnPredicates, assert this.connection != null; this.filterOnPredicates = filterOnPredicates; this.on = on; + this.connectionsGroup = connectionsGroup; } @Override @@ -77,18 +79,10 @@ void calculateNext() { return; } - filterOnPredicates = FilterByPredicate.this.filterOnPredicates.stream() - .map(predicate -> { - try (var stream = connection - .getStatements(null, predicate, null, true, dataGraph) - .stream()) { - return stream.map(Statement::getPredicate) - .findAny() - .orElse(null); - } - } - ) - .filter(Objects::nonNull) + filterOnPredicates = FilterByPredicate.this.filterOnPredicates + .stream() + .map(iri -> connectionsGroup.intern(connection, iri, + ConnectionsGroup.StatementPosition.predicate)) .collect(Collectors.toList()); } diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicateObject.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicateObject.java index 9a44baba19..e4bbb745f1 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicateObject.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/ast/planNodes/FilterByPredicateObject.java @@ -20,8 +20,8 @@ import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.model.IRI; import org.eclipse.rdf4j.model.Resource; -import org.eclipse.rdf4j.model.Statement; import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.vocabulary.RDF; import org.eclipse.rdf4j.sail.SailConnection; import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.memory.MemoryStoreConnection; @@ -42,10 +42,12 @@ public class FilterByPredicateObject implements PlanNode { private final FilterOn filterOn; private final PlanNode parent; private final boolean returnMatching; + private final ConnectionsGroup connectionsGroup; private StackTraceElement[] stackTrace; private boolean printed = false; private ValidationExecutionLogger validationExecutionLogger; private final Resource[] dataGraph; + boolean typeFilter; private final Cache cache; @@ -68,6 +70,10 @@ public FilterByPredicateObject(SailConnection connection, Resource[] dataGraph, cache = CacheBuilder.newBuilder().maximumSize(10000).build(); } + this.connectionsGroup = connectionsGroup; + if (RDF.TYPE.equals(filterOnPredicate)) { + typeFilter = true; + } // this.stackTrace = Thread.currentThread().getStackTrace(); } @@ -148,31 +154,29 @@ void calculateNext() { private void internResources() { if (filterOnObject == null) { - - try (var stream = connection - .getStatements(null, FilterByPredicateObject.this.filterOnPredicate, null, includeInferred, - dataGraph) - .stream()) { - filterOnPredicate = stream.map(Statement::getPredicate).findAny().orElse(null); - } - + filterOnPredicate = connectionsGroup.intern(connection, + FilterByPredicateObject.this.filterOnPredicate, + ConnectionsGroup.StatementPosition.predicate); if (filterOnPredicate == null) { filterOnObject = new Resource[0]; } else { - filterOnObject = FilterByPredicateObject.this.filterOnObject.stream() - .map(object -> { - try (var stream = connection - .getStatements(null, filterOnPredicate, object, includeInferred, dataGraph) - .stream()) { - return stream.map(Statement::getObject) - .map(o -> ((Resource) o)) - .findAny() - .orElse(null); - } - } - ) - .filter(Objects::nonNull) - .toArray(Resource[]::new); + if (typeFilter && includeInferred) { + filterOnObject = FilterByPredicateObject.this.filterOnObject.stream() + .flatMap(type -> connectionsGroup.getRdfsSubClassOfReasoner() + .backwardsChain(type) + .stream()) + .distinct() + .map(object -> connectionsGroup.intern(connection, object, + ConnectionsGroup.StatementPosition.object)) + .filter(Objects::nonNull) + .toArray(Resource[]::new); + } else { + filterOnObject = FilterByPredicateObject.this.filterOnObject.stream() + .map(object -> connectionsGroup.intern(connection, object, + ConnectionsGroup.StatementPosition.object)) + .filter(Objects::nonNull) + .toArray(Resource[]::new); + } } } @@ -237,8 +241,8 @@ private Boolean matchesCached(Resource subject, IRI filterOnPredicate, Resource[ private boolean matchesUnCached(Resource subject, IRI filterOnPredicate, Resource[] filterOnObject) { for (Resource object : filterOnObject) { - if (connection.hasStatement(subject, filterOnPredicate, object, includeInferred, - dataGraph)) { + if (connection.hasStatement(subject, filterOnPredicate, object, + includeInferred && !typeFilter, dataGraph)) { return true; } } diff --git a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/wrapper/data/ConnectionsGroup.java b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/wrapper/data/ConnectionsGroup.java index 38461cb1ed..5074df6ac1 100644 --- a/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/wrapper/data/ConnectionsGroup.java +++ b/core/sail/shacl/src/main/java/org/eclipse/rdf4j/sail/shacl/wrapper/data/ConnectionsGroup.java @@ -13,11 +13,18 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import org.eclipse.rdf4j.common.annotation.InternalUseOnly; import org.eclipse.rdf4j.common.transaction.IsolationLevels; +import org.eclipse.rdf4j.model.IRI; +import org.eclipse.rdf4j.model.Resource; +import org.eclipse.rdf4j.model.Statement; +import org.eclipse.rdf4j.model.Value; +import org.eclipse.rdf4j.model.util.Values; import org.eclipse.rdf4j.sail.Sail; import org.eclipse.rdf4j.sail.SailConnection; +import org.eclipse.rdf4j.sail.SailException; import org.eclipse.rdf4j.sail.shacl.ShaclSailConnection; import org.eclipse.rdf4j.sail.shacl.Stats; import org.eclipse.rdf4j.sail.shacl.ast.planNodes.BufferedSplitter; @@ -27,6 +34,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + /** * @apiNote since 3.0. This feature is for internal use only: its existence, signature or behavior may change without * warning from one release to the next. @@ -52,6 +62,14 @@ public class ConnectionsGroup implements AutoCloseable { // used to cache Select plan nodes so that we don't query a store for the same data during the same validation step. private final Map nodeCache = new ConcurrentHashMap<>(); + private final static Value NULL_VALUE = Values.bnode(); + + private final Cache INTERNED_VALUE_CACHE = CacheBuilder.newBuilder() + .concurrencyLevel(Runtime.getRuntime().availableProcessors() * 2) + .maximumSize(10000) + + .build(); + public ConnectionsGroup(SailConnection baseConnection, SailConnection previousStateConnection, Sail addedStatements, Sail removedStatements, Stats stats, RdfsSubClassOfReasonerProvider rdfsSubClassOfReasonerProvider, @@ -95,6 +113,55 @@ public SailConnection getRemovedStatements() { return removedStatements; } + public enum StatementPosition { + subject, + predicate, + object + } + + public T intern(SailConnection connection, T value, StatementPosition statementPosition) { + try { + + Value t = INTERNED_VALUE_CACHE.get(value, () -> { + + switch (statementPosition) { + case subject: + try (var statements = connection.getStatements(((Resource) value), null, null, false).stream()) { + Resource ret = statements.map(Statement::getSubject).findAny().orElse(null); + if (ret == null) { + return value; + } + return ret; + } + case predicate: + try (var statements = connection.getStatements(null, ((IRI) value), null, false).stream()) { + IRI ret = statements.map(Statement::getPredicate).findAny().orElse(null); + if (ret == null) { + return value; + } + return ret; + } + case object: + try (var statements = connection.getStatements(null, null, value, false).stream()) { + Value ret = statements.map(Statement::getObject).findAny().orElse(null); + if (ret == null) { + return value; + } + return ret; + } + } + + throw new IllegalStateException("Unknown statement position: " + statementPosition); + + }); + if (t == NULL_VALUE) + return null; + return ((T) t); + } catch (ExecutionException e) { + throw new SailException(e); + } + } + @Override public void close() { if (addedStatements != null) {