From 4eb14799a856807d3fdacc41e8d6b53be901d090 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Fri, 6 Sep 2024 09:47:24 +0200 Subject: [PATCH 1/5] GH-5121: refactor the bind join logic into a reusable base class Refactor the existing logic for executing bind joins into a reusable base class. This change mostly moves the implementation logic from the existing ControlledWorkerBindJoin class to a new intermediate implementation (with the goal to make it reusable in a second step for left joins). Note that the new bind join implementation no longer uses the ControlledWorkerJoin as base class, i.e. the decision of which join implementation to use is moved to the strategy. For backwards code compatibility the "ControlledWorkerBoundJoin" is kept, but no longer used. Instead the new code is in ControlledWorkerBindJoin. --- .../federated/algebra/BoundJoinTupleExpr.java | 6 +- .../evaluation/FederationEvalStrategy.java | 17 +- .../SparqlFederationEvalStrategy.java | 29 +++- .../concurrent/ControlledWorkerScheduler.java | 4 +- .../join/ControlledWorkerBindJoin.java | 114 ++++++++++++ .../join/ControlledWorkerBindJoinBase.java | 163 ++++++++++++++++++ .../join/ControlledWorkerBoundJoin.java | 2 + 7 files changed, 319 insertions(+), 16 deletions(-) create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java index 68b9ca6e4b0..24290ed89cd 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/algebra/BoundJoinTupleExpr.java @@ -10,14 +10,14 @@ 
*******************************************************************************/ package org.eclipse.rdf4j.federated.algebra; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; /** * Marker interface indicating that instances are applicable for bound join processing (see - * {@link ControlledWorkerBoundJoin} + * {@link ControlledWorkerBindJoin} * * @author Andreas Schwarte - * @see ControlledWorkerBoundJoin + * @see ControlledWorkerBindJoin */ public interface BoundJoinTupleExpr { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index a3e6237dac3..197e358b791 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -15,10 +15,8 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import java.util.stream.Collectors; -import org.eclipse.rdf4j.collection.factory.api.CollectionFactory; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.common.iteration.SingletonIteration; @@ -56,6 +54,7 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; import 
org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; @@ -108,12 +107,10 @@ import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException; import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService; import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator; -import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics; import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext; import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy; import org.eclipse.rdf4j.query.algebra.evaluation.iterator.BadlyDesignedLeftJoinIterator; -import org.eclipse.rdf4j.query.algebra.evaluation.iterator.DescribeIteration; import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration; import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ConstantOptimizer; import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.DisjunctiveConstraintOptimizer; @@ -815,8 +812,14 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext /** * Execute the join in a separate thread using some join executor. * - * Join executors are for instance: - {@link SynchronousJoin} - {@link SynchronousBoundJoin} - - * {@link ControlledWorkerJoin} - {@link ControlledWorkerBoundJoin} + * Join executors are for instance: + * + * * * For endpoint federation use controlled worker bound join, for local federation use controlled worker join. The * other operators are there for completeness. @@ -923,7 +926,7 @@ public abstract CloseableIteration evaluateGroupedCheck( /** * Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input. 
* - * See {@link ControlledWorkerBoundJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()} + * See {@link ControlledWorkerBindJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()} * * @param service * @param bindings diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java index 0be9600d7c1..a65465ee4d1 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java @@ -17,8 +17,10 @@ import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.common.iteration.EmptyIteration; import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr; import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern; import org.eclipse.rdf4j.federated.algebra.ExclusiveGroup; +import org.eclipse.rdf4j.federated.algebra.FedXService; import org.eclipse.rdf4j.federated.algebra.FilterTuple; import org.eclipse.rdf4j.federated.algebra.FilterValueExpr; import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; @@ -29,7 +31,9 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; +import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import 
org.eclipse.rdf4j.federated.exception.IllegalQueryException; import org.eclipse.rdf4j.federated.structures.QueryInfo; @@ -45,7 +49,7 @@ * Implementation of a federation evaluation strategy which provides some special optimizations for SPARQL (remote) * endpoints. The most important optimization is to used prepared SPARQL Queries that are already created using Strings. *

- * Joins are executed using {@link ControlledWorkerBoundJoin}. + * Joins are executed using {@link ControlledWorkerBindJoin}. *

*

* This implementation uses the SPARQL 1.1 VALUES operator for the bound-join evaluation @@ -173,8 +177,25 @@ public CloseableIteration executeJoin( TupleExpr rightArg, Set joinVars, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException { - ControlledWorkerBoundJoin join = new ControlledWorkerBoundJoin(joinScheduler, this, leftIter, rightArg, - bindings, queryInfo); + // determine if we can execute the expr as bind join + boolean executeAsBindJoin = false; + if (rightArg instanceof BoundJoinTupleExpr) { + if (rightArg instanceof FedXService) { + executeAsBindJoin = queryInfo.getFederationContext().getConfig().getEnableServiceAsBoundJoin(); + } else { + executeAsBindJoin = true; + } + } + + JoinExecutorBase join; + if (executeAsBindJoin) { + join = new ControlledWorkerBindJoin(joinScheduler, this, leftIter, rightArg, + bindings, queryInfo); + } else { + join = new ControlledWorkerJoin(joinScheduler, this, leftIter, rightArg, bindings, + queryInfo); + } + join.setJoinVars(joinVars); executor.execute(join); return join; diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index f677ca46ea4..1060e86a2de 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.rdf4j.common.iteration.CloseableIteration; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; import 
org.eclipse.rdf4j.federated.exception.ExceptionUtil; @@ -34,7 +34,7 @@ * @author Andreas Schwarte * @see ControlledWorkerUnion * @see ControlledWorkerJoin - * @see ControlledWorkerBoundJoin + * @see ControlledWorkerBindJoin */ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAware { diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java new file mode 100644 index 00000000000..17010b1e3c5 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoin.java @@ -0,0 +1,114 @@ +/******************************************************************************* + * Copyright (c) 2019 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.CheckStatementPattern; +import org.eclipse.rdf4j.federated.algebra.FedXService; +import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; +import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; + +/** + * Execution of a regular join as bind join. 
+ * + * @author Andreas Schwarte + * @see ControlledWorkerBindJoinBase + */ +public class ControlledWorkerBindJoin extends ControlledWorkerBindJoinBase { + + public ControlledWorkerBindJoin(ControlledWorkerScheduler scheduler, FederationEvalStrategy strategy, + CloseableIteration leftIter, + TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo); + } + + @Override + protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) { + final TaskCreator taskCreator; + if (expr instanceof StatementTupleExpr) { + StatementTupleExpr stmt = (StatementTupleExpr) expr; + if (stmt.hasFreeVarsFor(bs)) { + taskCreator = new BoundJoinTaskCreator(strategy, stmt); + } else { + expr = new CheckStatementPattern(stmt, queryInfo); + taskCreator = new CheckJoinTaskCreator(strategy, (CheckStatementPattern) expr); + } + } else if (expr instanceof FedXService) { + taskCreator = new FedXServiceJoinTaskCreator(strategy, (FedXService) expr); + } else { + throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName() + + ". 
Please report this problem."); + } + return taskCreator; + } + + protected class BoundJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final StatementTupleExpr _expr; + + public BoundJoinTaskCreator( + FederationEvalStrategy strategy, StatementTupleExpr expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelBoundJoinTask(control, _strategy, _expr, bindings); + } + } + + protected class CheckJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final CheckStatementPattern _expr; + + public CheckJoinTaskCreator( + FederationEvalStrategy strategy, CheckStatementPattern expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelCheckJoinTask(control, _strategy, _expr, bindings); + } + } + + protected class FedXServiceJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final FedXService _expr; + + public FedXServiceJoinTaskCreator( + FederationEvalStrategy strategy, FedXService expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelServiceJoinTask(control, _strategy, _expr, bindings); + } + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java new file mode 100644 index 00000000000..d00477a2adf --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindJoinBase.java @@ -0,0 +1,163 @@ 
+/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.BoundJoinTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; +import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Base class for executing joins as bind joins (i.e., the bindings of a block are injected in the SPARQL query as + * VALUES clause). + * + * The number of concurrent threads is controlled by a {@link ControlledWorkerScheduler} which works according to the + * FIFO principle and uses worker threads. + * + * This join cursor blocks until all scheduled tasks are finished, however the result iteration can be accessed from + * different threads to allow for pipelining. 
+ * + * @author Andreas Schwarte + * + */ +public abstract class ControlledWorkerBindJoinBase extends JoinExecutorBase { + + private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBindJoinBase.class); + + protected final ControlledWorkerScheduler scheduler; + + protected final Phaser phaser = new Phaser(1); + + public ControlledWorkerBindJoinBase(ControlledWorkerScheduler scheduler, + FederationEvalStrategy strategy, + CloseableIteration leftIter, + TupleExpr rightArg, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + super(strategy, leftIter, rightArg, bindings, queryInfo); + this.scheduler = scheduler; + } + + @Override + protected void handleBindings() throws Exception { + if (!(rightArg instanceof BoundJoinTupleExpr)) { + String msg = "Right argument is not an applicable expression for bind joins. Was: " + + rightArg.getClass().getCanonicalName(); + log.debug(msg); + throw new QueryEvaluationException(msg); + } + + int nBindingsCfg = this.queryInfo.getFederationContext().getConfig().getBoundJoinBlockSize(); + int totalBindings = 0; // the total number of bindings + TupleExpr expr = rightArg; + + TaskCreator taskCreator = null; + Phaser currentPhaser = phaser; + + int nBindings; + List bindings; + while (!isClosed() && leftIter.hasNext()) { + + // create a new phaser if there are more than 10000 parties + // note: a phaser supports only up to 65535 registered parties + if (currentPhaser.getRegisteredParties() >= 10000) { + currentPhaser = new Phaser(currentPhaser); + } + + // determine the bind join block size + nBindings = getNextBindJoinSize(nBindingsCfg, totalBindings); + + bindings = new ArrayList<>(nBindings); + + int count = 0; + while (!isClosed() && count < nBindings && leftIter.hasNext()) { + var bs = leftIter.next(); + if (taskCreator == null) { + taskCreator = determineTaskCreator(expr, bs); + } + bindings.add(bs); + count++; + } + + totalBindings += count; + + currentPhaser.register(); + 
scheduler.schedule(taskCreator.getTask(new PhaserHandlingParallelExecutor(this, currentPhaser), bindings)); + } + + leftIter.close(); + + scheduler.informFinish(this); + + if (log.isDebugEnabled()) { + log.debug("JoinStats: left iter of " + getDisplayId() + " had " + totalBindings + " results."); + } + + phaser.awaitAdvanceInterruptibly(phaser.arrive(), queryInfo.getMaxRemainingTimeMS(), TimeUnit.MILLISECONDS); + } + + @Override + public void handleClose() throws QueryEvaluationException { + try { + super.handleClose(); + } finally { + // signal the phaser to close (if currently being blocked) + phaser.forceTermination(); + } + } + + /** + * Return the {@link TaskCreator} for executing the bind join + * + * @param expr + * @param bs + * @return + */ + protected abstract TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs); + + /** + * Return the size of the next bind join block. + * + * @param configuredBindJoinSize the configured bind join size + * @param totalBindings the current process bindings from the intermediate result set + * @return + */ + protected int getNextBindJoinSize(int configuredBindJoinSize, int totalBindings) { + + /* + * XXX idea: + * + * make nBindings dependent on the number of intermediate results of the left argument. + * + * If many intermediate results, increase the number of bindings. This will result in less remote SPARQL + * requests. 
+ * + */ + + return configuredBindJoinSize; + } + + protected interface TaskCreator { + ParallelTask getTask(ParallelExecutor control, List bindings); + } +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java index db939175e6f..062be55ab75 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBoundJoin.java @@ -44,7 +44,9 @@ * * @author Andreas Schwarte * + * @deprecated replaced with {@link ControlledWorkerBindJoin} */ +@Deprecated(forRemoval = true) public class ControlledWorkerBoundJoin extends ControlledWorkerJoin { private static final Logger log = LoggerFactory.getLogger(ControlledWorkerBoundJoin.class); From 2d86cd3f170b5d225c5d1d832d013958c335caef Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Fri, 6 Sep 2024 10:53:26 +0200 Subject: [PATCH 2/5] GH-5121: prepare execution of left joins in the federation strategy Prepare the federation strategy for executing a specific left join implementation.
--- .../evaluation/FederationEvalStrategy.java | 22 ++++++++++++++----- .../SailFederationEvalStrategy.java | 12 ++++++++++ .../SparqlFederationEvalStrategy.java | 12 ++++++++++ 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index 197e358b791..56693c02776 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -57,7 +57,6 @@ import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; -import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.SynchronousBoundJoin; import org.eclipse.rdf4j.federated.evaluation.join.SynchronousJoin; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; @@ -96,6 +95,7 @@ import org.eclipse.rdf4j.query.QueryEvaluationException; import org.eclipse.rdf4j.query.algebra.DescribeOperator; import org.eclipse.rdf4j.query.algebra.Join; +import org.eclipse.rdf4j.query.algebra.LeftJoin; import org.eclipse.rdf4j.query.algebra.QueryRoot; import org.eclipse.rdf4j.query.algebra.Service; import org.eclipse.rdf4j.query.algebra.StatementPattern; @@ -745,10 +745,7 @@ public CloseableIteration evaluate(BindingSet bindings) { if (problemVars.containsAll(bindings.getBindingNames())) { var leftIter = leftPrepared.evaluate(bindings); - ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(scheduler, FederationEvalStrategy.this, - leftIter, leftJoin, bindings, leftJoin.getQueryInfo()); - executor.execute(join); - return join; + 
return executeLeftJoin(scheduler, leftIter, leftJoin, bindings, leftJoin.getQueryInfo()); } else { Set problemVarsClone = new HashSet<>(problemVars); problemVarsClone.retainAll(bindings.getBindingNames()); @@ -839,6 +836,21 @@ protected abstract CloseableIteration executeJoin( CloseableIteration leftIter, TupleExpr rightArg, Set joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException; + /** + * Execute the left join in a separate thread using some join executor. + * + * @param joinScheduler + * @param leftIter + * @param leftJoin + * @param bindings + * @return the result + * @throws QueryEvaluationException + */ + protected abstract CloseableIteration executeLeftJoin( + ControlledWorkerScheduler joinScheduler, + CloseableIteration leftIter, LeftJoin leftJoin, + BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException; + public abstract CloseableIteration evaluateExclusiveGroup( ExclusiveGroup group, BindingSet bindings) throws RepositoryException, MalformedQueryException, QueryEvaluationException; diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java index dbe6cf8bfcc..9315a4b8feb 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SailFederationEvalStrategy.java @@ -27,11 +27,13 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.structures.QueryInfo; import org.eclipse.rdf4j.federated.util.QueryAlgebraUtil; import 
org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.MalformedQueryException; import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.LeftJoin; import org.eclipse.rdf4j.query.algebra.StatementPattern; import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.repository.RepositoryException; @@ -119,6 +121,16 @@ public CloseableIteration executeJoin( return join; } + @Override + protected CloseableIteration executeLeftJoin(ControlledWorkerScheduler joinScheduler, + CloseableIteration leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this, + leftIter, leftJoin, bindings, queryInfo); + executor.execute(join); + return join; + } + @Override public CloseableIteration evaluateExclusiveGroup( ExclusiveGroup group, BindingSet bindings) diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java index a65465ee4d1..234614402a3 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java @@ -33,6 +33,7 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.IllegalQueryException; @@ -41,6 +42,7 @@ import 
org.eclipse.rdf4j.query.BindingSet; import org.eclipse.rdf4j.query.MalformedQueryException; import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.LeftJoin; import org.eclipse.rdf4j.query.algebra.StatementPattern; import org.eclipse.rdf4j.query.algebra.TupleExpr; import org.eclipse.rdf4j.repository.RepositoryException; @@ -201,6 +203,16 @@ public CloseableIteration executeJoin( return join; } + @Override + protected CloseableIteration executeLeftJoin(ControlledWorkerScheduler joinScheduler, + CloseableIteration leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo) + throws QueryEvaluationException { + ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this, + leftIter, leftJoin, bindings, queryInfo); + executor.execute(join); + return join; + } + @Override public CloseableIteration evaluateExclusiveGroup( ExclusiveGroup group, BindingSet bindings) throws RepositoryException, From 558a5952b9161ce97701914d1a6daebc6291f5e2 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Fri, 6 Sep 2024 14:11:17 +0200 Subject: [PATCH 3/5] GH-5121: implementation of left bind join operator This change provides the implementation and activation for the left bind join operator. The algorithm is as follows: - execute the left bind join using a regular bound join query - process the result iteration similar to BoundJoinVALUESConversionIteration - remember the set of seen bindings (using their index) and add the original bindings to those, i.e. put them into the result - return all non-seen bindings directly from the input Note that the terminology in literature has changed to "bind joins". Hence, for new classes and methods I try to follow that.
Change is covered with some unit tests --- .../evaluation/FederationEvalStrategy.java | 57 +++++ .../SparqlFederationEvalStrategy.java | 25 +- .../iterator/BindLeftJoinIteration.java | 99 ++++++++ .../join/ControlledWorkerBindLeftJoin.java | 70 ++++++ .../join/ParallelBindLeftJoinTask.java | 53 +++++ .../rdf4j/federated/BindLeftJoinTests.java | 225 ++++++++++++++++++ 6 files changed, 527 insertions(+), 2 deletions(-) create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java create mode 100644 tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index 56693c02776..561a00bacbc 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -37,6 +37,7 @@ import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath; import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator; import org.eclipse.rdf4j.federated.algebra.FilterExpr; +import org.eclipse.rdf4j.federated.algebra.FilterTuple; import org.eclipse.rdf4j.federated.algebra.FilterValueExpr; import org.eclipse.rdf4j.federated.algebra.HolderNode; import org.eclipse.rdf4j.federated.algebra.NJoin; @@ -51,8 +52,10 @@ import org.eclipse.rdf4j.federated.endpoint.Endpoint; import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import 
org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor; +import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration; +import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; @@ -66,6 +69,7 @@ import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask; import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion; import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase; +import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.FedXRuntimeException; import org.eclipse.rdf4j.federated.exception.IllegalQueryException; import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel; @@ -935,6 +939,59 @@ public abstract CloseableIteration evaluateBoundJoinStatementPattern public abstract CloseableIteration evaluateGroupedCheck( CheckStatementPattern stmt, final List bindings) throws QueryEvaluationException; + /** + * Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.
+ * + * @param stmt the statement to evaluate (cast to {@link StatementPattern} when building the query string) + * @param bindings the input bindings; a list with a single binding is evaluated directly via {@code evaluate} + * @return the result iteration, converted by a {@link BindLeftJoinIteration} unless a single binding is given + * @throws QueryEvaluationException if the evaluation fails + * @see BindLeftJoinIteration + */ + public CloseableIteration evaluateLeftBoundJoinStatementPattern( + StatementTupleExpr stmt, final List bindings) throws QueryEvaluationException { + // we can omit the bound join handling; NOTE(review): confirm the caller retains a non-matching binding + if (bindings.size() == 1) { + return evaluate(stmt, bindings.get(0)); + } + + FilterValueExpr filterExpr = null; + if (stmt instanceof FilterTuple) { + filterExpr = ((FilterTuple) stmt).getFilterExpr(); + } + + AtomicBoolean isEvaluated = new AtomicBoolean(false); + String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings, + filterExpr, isEvaluated, stmt.getQueryInfo().getDataset()); + + CloseableIteration result = null; + try { + result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo()); + + // apply filter and/or convert to original bindings + if (filterExpr != null && !isEvaluated.get()) { + result = new BindLeftJoinIteration(result, bindings); // apply conversion + result = new FilteringIteration(filterExpr, result, this); // apply filter + if (!result.hasNext()) { + result.close(); + return new EmptyIteration<>(); + } + } else { + result = new BindLeftJoinIteration(result, bindings); + } + + return result; + } catch (Throwable t) { + if (result != null) { + result.close(); + } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw ExceptionUtil.toQueryEvaluationException(t); + } + } + /** * Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
* diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java index 234614402a3..2018c762185 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java @@ -32,6 +32,7 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase; @@ -207,8 +208,28 @@ public CloseableIteration executeJoin( protected CloseableIteration executeLeftJoin(ControlledWorkerScheduler joinScheduler, CloseableIteration leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException { - ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this, - leftIter, leftJoin, bindings, queryInfo); + + var rightArg = leftJoin.getRightArg(); + + // use a bind join only if the right arg is a BoundJoinTupleExpr and not a FedXService + boolean executeAsBindJoin = false; + if (rightArg instanceof BoundJoinTupleExpr) { + if (rightArg instanceof FedXService) { + executeAsBindJoin = false; + } else { + executeAsBindJoin = true; + } + } + + JoinExecutorBase join; + if (executeAsBindJoin) { + join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg, + bindings, queryInfo); + } else { + join = new ControlledWorkerLeftJoin(joinScheduler, this, + leftIter, leftJoin,
bindings, queryInfo); + } + executor.execute(join); return join; } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java new file mode 100644 index 00000000000..4b220db24b7 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java @@ -0,0 +1,99 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.iterator; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.iteration.LookAheadIteration; +import org.eclipse.rdf4j.query.Binding; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet; + +/** + * A {@link LookAheadIteration} for processing bind left join results (i.e., result of joining OPTIONAL clauses) + * + * Algorithm: + * + *