Skip to content

Commit

Permalink
Gh 5121 - bind left join support in FedX
Browse files Browse the repository at this point in the history
  • Loading branch information
aschwarte10 authored Oct 23, 2024
2 parents 5d25e15 + 01bc075 commit eefa338
Show file tree
Hide file tree
Showing 19 changed files with 1,002 additions and 39 deletions.
3 changes: 2 additions & 1 deletion site/content/documentation/programming/federation.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ FedX provides various means for configuration. Configuration settings can be def
|leftJoinWorkerThreads | The number of left join worker threads for parallelization, default _10_ |
|boundJoinBlockSize | Block size for bound joins, default _25_ |
|enforceMaxQueryTime | Max query time in seconds, 0 to disable, default _30_ |
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. For today's endpoints it is more efficient to disable vectored evaluation of SERVICE |
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. |
|enableOptionalAsBindJoin | Flag for evaluating an OPTIONAL expression using bind join, default _true_. |
|includeInferredDefault | Whether inferred statements should be included, default _true_ |
|consumingIterationMax | the max number of results to be consumed by `ConsumingIteration`, default _1000_ |
|debugQueryPlan | Print the optimized query execution plan to stdout, default _false_ |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Optional;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;
Expand Down Expand Up @@ -48,6 +47,8 @@ public class FedXConfig {

private boolean enableServiceAsBoundJoin = true;

private boolean enableOptionalAsBindJoin = true;

private boolean enableMonitoring = false;

private boolean isLogQueryPlan = false;
Expand All @@ -68,7 +69,6 @@ public class FedXConfig {

private int consumingIterationMax = 1000;

private CollectionFactory cf = new DefaultCollectionFactory();
/* factory like setters */

/**
Expand Down Expand Up @@ -244,6 +244,17 @@ public FedXConfig withEnableServiceAsBoundJoin(boolean flag) {
return this;
}

/**
 * Configure whether OPTIONAL clauses are evaluated as bind joins, i.e. by sending the input bindings to the
 * endpoint in a VALUES clause. The default setting is <i>true</i>.
 *
 * @param flag {@code true} to evaluate OPTIONAL clauses using bind join, {@code false} to turn this off
 * @return the current config, to allow chaining of further settings
 */
public FedXConfig withEnableOptionalAsBindJoin(boolean flag) {
	enableOptionalAsBindJoin = flag;
	return this;
}

/**
* The cache specification for the {@link SourceSelectionMemoryCache}. If not set explicitly, the
* {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used.
Expand Down Expand Up @@ -326,16 +337,26 @@ public int getBoundJoinBlockSize() {
* Returns a flag indicating whether vectored evaluation using the VALUES clause shall be applied for SERVICE
* expressions.
*
* Default: false
* Default: true
*
* Note: for today's endpoints it is more efficient to disable vectored evaluation of SERVICE.
*
* @return whether SERVICE expressions are evaluated using bound joins
* @return whether SERVICE expressions are evaluated using bind joins
*/
public boolean getEnableServiceAsBoundJoin() {
return enableServiceAsBoundJoin;
}

/**
 * Tells whether OPTIONAL expressions are evaluated as bind joins, i.e. with the input bindings passed in a VALUES
 * clause.
 *
 * Default: true
 *
 * @return {@code true} if OPTIONAL expressions are evaluated using bind joins
 */
public boolean isEnableOptionalAsBindJoin() {
	return this.enableOptionalAsBindJoin;
}

/**
* Get the maximum query time in seconds used for query evaluation. Applied if {@link QueryManager} is used to
* create queries.
Expand Down Expand Up @@ -485,9 +506,10 @@ public int getConsumingIterationMax() {
*
* @param cf
* @return the current config
* @deprecated unused
*/
@Deprecated(forRemoval = true)
public FedXConfig withCollectionFactory(CollectionFactory cf) {
this.cf = cf;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
*******************************************************************************/
package org.eclipse.rdf4j.federated.algebra;

import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;

/**
* Marker interface indicating that instances are applicable for bound join processing (see
* {@link ControlledWorkerBoundJoin}
* {@link ControlledWorkerBindJoin})
*
* @author Andreas Schwarte
* @see ControlledWorkerBoundJoin
* @see ControlledWorkerBindJoin
*/
public interface BoundJoinTupleExpr {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.EmptyIteration;
import org.eclipse.rdf4j.common.iteration.SingletonIteration;
Expand All @@ -39,6 +37,7 @@
import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath;
import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator;
import org.eclipse.rdf4j.federated.algebra.FilterExpr;
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
import org.eclipse.rdf4j.federated.algebra.HolderNode;
import org.eclipse.rdf4j.federated.algebra.NJoin;
Expand All @@ -53,12 +52,14 @@
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor;
import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousBoundJoin;
import org.eclipse.rdf4j.federated.evaluation.join.SynchronousJoin;
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
Expand All @@ -68,6 +69,7 @@
import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel;
Expand Down Expand Up @@ -97,6 +99,7 @@
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.DescribeOperator;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.LeftJoin;
import org.eclipse.rdf4j.query.algebra.QueryRoot;
import org.eclipse.rdf4j.query.algebra.Service;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
Expand All @@ -108,12 +111,10 @@
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedService;
import org.eclipse.rdf4j.query.algebra.evaluation.federation.ServiceJoinIterator;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.DefaultEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.EvaluationStatistics;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.QueryEvaluationContext;
import org.eclipse.rdf4j.query.algebra.evaluation.impl.StrictEvaluationStrategy;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.BadlyDesignedLeftJoinIterator;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.DescribeIteration;
import org.eclipse.rdf4j.query.algebra.evaluation.iterator.HashJoinIteration;
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.ConstantOptimizer;
import org.eclipse.rdf4j.query.algebra.evaluation.optimizer.DisjunctiveConstraintOptimizer;
Expand Down Expand Up @@ -748,10 +749,7 @@ public CloseableIteration<BindingSet> evaluate(BindingSet bindings) {

if (problemVars.containsAll(bindings.getBindingNames())) {
var leftIter = leftPrepared.evaluate(bindings);
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(scheduler, FederationEvalStrategy.this,
leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
executor.execute(join);
return join;
return executeLeftJoin(scheduler, leftIter, leftJoin, bindings, leftJoin.getQueryInfo());
} else {
Set<String> problemVarsClone = new HashSet<>(problemVars);
problemVarsClone.retainAll(bindings.getBindingNames());
Expand Down Expand Up @@ -815,8 +813,14 @@ public QueryEvaluationStep prepareNaryUnion(NUnion union, QueryEvaluationContext
/**
* Execute the join in a separate thread using some join executor.
*
* Join executors are for instance: - {@link SynchronousJoin} - {@link SynchronousBoundJoin} -
* {@link ControlledWorkerJoin} - {@link ControlledWorkerBoundJoin}
* Join executors are for instance:
*
* <ul>
* <li>{@link SynchronousJoin}</li>
* <li>{@link SynchronousBoundJoin}</li>
* <li>{@link ControlledWorkerJoin}</li>
* <li>{@link ControlledWorkerBindJoin}</li>
* </ul>
*
* For endpoint federation use controlled worker bound join, for local federation use controlled worker join. The
* other operators are there for completeness.
Expand All @@ -836,6 +840,21 @@ protected abstract CloseableIteration<BindingSet> executeJoin(
CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
Set<String> joinVariables, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;

/**
* Execute the left join in a separate thread using some join executor.
*
* Implementations decide which join executor is used and how it is scheduled.
*
* @param joinScheduler the scheduler made available to the join executor
* @param leftIter      the iteration over the results of the left argument
* @param leftJoin      the {@link LeftJoin} expression to evaluate
* @param bindings      the input bindings the left join is evaluated with
* @param queryInfo     the {@link QueryInfo} of the query this left join belongs to
* @return the iteration over the results of the left join
* @throws QueryEvaluationException if the left join cannot be executed
*/
protected abstract CloseableIteration<BindingSet> executeLeftJoin(
ControlledWorkerScheduler<BindingSet> joinScheduler,
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin,
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException;

public abstract CloseableIteration<BindingSet> evaluateExclusiveGroup(
ExclusiveGroup group, BindingSet bindings)
throws RepositoryException, MalformedQueryException, QueryEvaluationException;
Expand Down Expand Up @@ -920,10 +939,63 @@ public abstract CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern
public abstract CloseableIteration<BindingSet> evaluateGroupedCheck(
CheckStatementPattern stmt, final List<BindingSet> bindings) throws QueryEvaluationException;

/**
 * Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.
 * <p>
 * If exactly one binding set is supplied, the statement is evaluated directly with that binding set and no bind
 * join handling is applied. Otherwise a single query using the VALUES clause is generated for all binding sets,
 * evaluated at the statement sources, and the result is wrapped in a {@link BindLeftJoinIteration} to convert it
 * back to the original bindings. If the statement carries a filter that was not already evaluated as part of the
 * generated query, a {@link FilteringIteration} is applied on top.
 * </p>
 *
 * @param stmt     the statement to evaluate; it is cast to {@link StatementPattern} when more than one binding
 *                 set is supplied
 * @param bindings the input binding sets
 * @return the result iteration; an {@link EmptyIteration} if a filter had to be applied and left no results
 * @throws QueryEvaluationException if the evaluation fails; any error raised while evaluating the generated
 *                                  query is converted using {@link ExceptionUtil}
 * @see BindLeftJoinIteration
 */
public CloseableIteration<BindingSet> evaluateLeftBoundJoinStatementPattern(
		StatementTupleExpr stmt, final List<BindingSet> bindings) throws QueryEvaluationException {
	// we can omit the bound join handling
	if (bindings.size() == 1) {
		return evaluate(stmt, bindings.get(0));
	}

	FilterValueExpr filterExpr = null;
	if (stmt instanceof FilterTuple) {
		filterExpr = ((FilterTuple) stmt).getFilterExpr();
	}

	// set by the query string generation if the filter could be evaluated as part of the query
	AtomicBoolean isEvaluated = new AtomicBoolean(false);
	String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings,
			filterExpr, isEvaluated, stmt.getQueryInfo().getDataset());

	CloseableIteration<BindingSet> result = null;
	try {
		result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo());

		// apply filter and/or convert to original bindings
		if (filterExpr != null && !isEvaluated.get()) {
			result = new BindLeftJoinIteration(result, bindings); // apply conversion
			result = new FilteringIteration(filterExpr, result, this); // apply filter
			if (!result.hasNext()) {
				result.close();
				return new EmptyIteration<>();
			}
		} else {
			result = new BindLeftJoinIteration(result, bindings);
		}

		return result;
	} catch (Throwable t) {
		// restore the interrupt flag first, so that it is not lost if the cleanup below fails
		if (t instanceof InterruptedException) {
			Thread.currentThread().interrupt();
		}
		if (result != null) {
			try {
				result.close();
			} catch (Exception closeFailure) {
				// keep the original failure as the primary error instead of masking it with the close failure
				if (closeFailure != t) {
					t.addSuppressed(closeFailure);
				}
			}
		}
		throw ExceptionUtil.toQueryEvaluationException(t);
	}
}

/**
* Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
*
* See {@link ControlledWorkerBoundJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()}
* See {@link ControlledWorkerBindJoin} and {@link FedXConfig#getEnableServiceAsBoundJoin()}
*
* @param service
* @param bindings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.GroupedCheckConversionIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.federated.util.QueryAlgebraUtil;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.LeftJoin;
import org.eclipse.rdf4j.query.algebra.StatementPattern;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.repository.RepositoryException;
Expand Down Expand Up @@ -119,6 +121,16 @@ public CloseableIteration<BindingSet> executeJoin(
return join;
}

/*
 * Creates a ControlledWorkerLeftJoin for the given arguments, submits it to the executor and returns the join
 * object itself as the result iteration.
 */
@Override
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
		CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
		throws QueryEvaluationException {
	final ControlledWorkerLeftJoin leftJoinTask = new ControlledWorkerLeftJoin(joinScheduler, this, leftIter,
			leftJoin, bindings, queryInfo);
	// hand the task over to the executor before returning it to the caller
	executor.execute(leftJoinTask);
	return leftJoinTask;
}

@Override
public CloseableIteration<BindingSet> evaluateExclusiveGroup(
ExclusiveGroup group, BindingSet bindings)
Expand Down
Loading

0 comments on commit eefa338

Please sign in to comment.