Skip to content

Commit

Permalink
GH-5121: implementation of left bind join operator
Browse files Browse the repository at this point in the history
This change provides the implementation and activation for the left bind
join operator.

The algorithm is as follows:

- execute left bind join using regular bound join query
- process result iteration similar to BoundJoinVALUESConversionIteration
- remember seen set of bindings (using index) and add original bindings
to those, i.e. put to result return all non-seen bindings directly from
the input

Note that the terminology in literature has changed to "bind joins".
Hence, for new classes and methods I try to follow that.

Change is covered with some unit tests
  • Loading branch information
aschwarte10 committed Sep 29, 2024
1 parent 2d86cd3 commit 558a595
Show file tree
Hide file tree
Showing 6 changed files with 527 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath;
import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator;
import org.eclipse.rdf4j.federated.algebra.FilterExpr;
import org.eclipse.rdf4j.federated.algebra.FilterTuple;
import org.eclipse.rdf4j.federated.algebra.FilterValueExpr;
import org.eclipse.rdf4j.federated.algebra.HolderNode;
import org.eclipse.rdf4j.federated.algebra.NJoin;
Expand All @@ -51,8 +52,10 @@
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor;
import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin;
Expand All @@ -66,6 +69,7 @@
import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
import org.eclipse.rdf4j.federated.exception.IllegalQueryException;
import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel;
Expand Down Expand Up @@ -935,6 +939,59 @@ public abstract CloseableIteration<BindingSet> evaluateBoundJoinStatementPattern
public abstract CloseableIteration<BindingSet> evaluateGroupedCheck(
CheckStatementPattern stmt, final List<BindingSet> bindings) throws QueryEvaluationException;

/**
* Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints.
*
* @param stmt
* @param bindings
* @return the result iteration
* @throws QueryEvaluationException
* @see {@link BindLeftJoinIteration}
*/
public CloseableIteration<BindingSet> evaluateLeftBoundJoinStatementPattern(
StatementTupleExpr stmt, final List<BindingSet> bindings) throws QueryEvaluationException {
// we can omit the bound join handling
if (bindings.size() == 1) {
return evaluate(stmt, bindings.get(0));
}

FilterValueExpr filterExpr = null;
if (stmt instanceof FilterTuple) {
filterExpr = ((FilterTuple) stmt).getFilterExpr();
}

AtomicBoolean isEvaluated = new AtomicBoolean(false);
String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings,
filterExpr, isEvaluated, stmt.getQueryInfo().getDataset());

CloseableIteration<BindingSet> result = null;
try {
result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo());

// apply filter and/or convert to original bindings
if (filterExpr != null && !isEvaluated.get()) {
result = new BindLeftJoinIteration(result, bindings); // apply conversion
result = new FilteringIteration(filterExpr, result, this); // apply filter
if (!result.hasNext()) {
result.close();
return new EmptyIteration<>();
}
} else {
result = new BindLeftJoinIteration(result, bindings);
}

return result;
} catch (Throwable t) {
if (result != null) {
result.close();
}
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw ExceptionUtil.toQueryEvaluationException(t);
}
}

/**
* Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration;
import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase;
Expand Down Expand Up @@ -207,8 +208,28 @@ public CloseableIteration<BindingSet> executeJoin(
protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerScheduler<BindingSet> joinScheduler,
CloseableIteration<BindingSet> leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo)
throws QueryEvaluationException {
ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this,
leftIter, leftJoin, bindings, queryInfo);

var rightArg = leftJoin.getRightArg();

// determine if we can execute the expr as bind join
boolean executeAsBindJoin = false;
if (rightArg instanceof BoundJoinTupleExpr) {
if (rightArg instanceof FedXService) {
executeAsBindJoin = false;
} else {
executeAsBindJoin = true;
}
}

JoinExecutorBase<BindingSet> join;
if (executeAsBindJoin) {
join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg,
bindings, queryInfo);
} else {
join = new ControlledWorkerLeftJoin(joinScheduler, this,
leftIter, leftJoin, bindings, queryInfo);
}

executor.execute(join);
return join;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.iterator;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.Set;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.common.iteration.LookAheadIteration;
import org.eclipse.rdf4j.query.Binding;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;

/**
* A {@link LookAheadIteration} for processing bind left join results (i.e., result of joining OPTIONAL clauses)
*
* Algorithm:
*
* <ul>
* <li>execute left bind join using regular bound join query</li>
* <li>process result iteration similar to {@link BoundJoinVALUESConversionIteration}</li>
* <li>remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all
* non-seen bindings directly from the input</li>
*
*
* @author Andreas Schwarte
*/
public class BindLeftJoinIteration extends LookAheadIteration<BindingSet> {

protected final CloseableIteration<BindingSet> iter;
protected final List<BindingSet> bindings;

protected Set<Integer> seenBindingIndexes = new HashSet<>();
protected final ListIterator<BindingSet> bindingsIterator;

public BindLeftJoinIteration(CloseableIteration<BindingSet> iter,
List<BindingSet> bindings) {
this.iter = iter;
this.bindings = bindings;
this.bindingsIterator = bindings.listIterator();
}

@Override
protected BindingSet getNextElement() {

if (iter.hasNext()) {
var bIn = iter.next();
int bIndex = Integer.parseInt(
bIn.getBinding(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME).getValue().stringValue());
seenBindingIndexes.add(bIndex);
return convert(bIn, bIndex);
}

while (bindingsIterator.hasNext()) {
if (seenBindingIndexes.contains(bindingsIterator.nextIndex())) {
// the binding was already processed as part of the optional
bindingsIterator.next();
continue;
}
return bindingsIterator.next();
}

return null;
}

@Override
protected void handleClose() {
iter.close();
}

protected BindingSet convert(BindingSet bIn, int bIndex) throws QueryEvaluationException {
QueryBindingSet res = new QueryBindingSet();
Iterator<Binding> bIter = bIn.iterator();
while (bIter.hasNext()) {
Binding b = bIter.next();
if (b.getName().equals(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)) {
continue;
}
res.addBinding(b);
}
for (Binding bs : bindings.get(bIndex)) {
res.setBinding(bs);
}
return res;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.join;

import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask;
import org.eclipse.rdf4j.federated.structures.QueryInfo;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.TupleExpr;

/**
* Bind join implementation for left joins (i.e., OPTIOAL clauses)
*
* @author Andreas Schwarte
*/
public class ControlledWorkerBindLeftJoin extends ControlledWorkerBindJoinBase {

public ControlledWorkerBindLeftJoin(ControlledWorkerScheduler<BindingSet> scheduler,
FederationEvalStrategy strategy, CloseableIteration<BindingSet> leftIter, TupleExpr rightArg,
BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException {
super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo);
}

@Override
protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) {
final TaskCreator taskCreator;
if (expr instanceof StatementTupleExpr) {
StatementTupleExpr stmt = (StatementTupleExpr) expr;
taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt);

} else {
throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName()
+ ". Please report this problem.");
}
return taskCreator;
}

static protected class LeftBoundJoinTaskCreator implements TaskCreator {
protected final FederationEvalStrategy _strategy;
protected final StatementTupleExpr _expr;

public LeftBoundJoinTaskCreator(
FederationEvalStrategy strategy, StatementTupleExpr expr) {
super();
_strategy = strategy;
_expr = expr;
}

@Override
public ParallelTask<BindingSet> getTask(ParallelExecutor<BindingSet> control, List<BindingSet> bindings) {
return new ParallelBindLeftJoinTask(control, _strategy, _expr, bindings);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.join;

import java.util.List;

import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr;
import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor;
import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase;
import org.eclipse.rdf4j.query.BindingSet;

/**
* A {@link ParallelTaskBase} for executing bind left joins.
*
* @author Andreas Schwarte
* @see FederationEvalStrategy#evaluateLeftBoundJoinStatementPattern(StatementTupleExpr, List)
*/
public class ParallelBindLeftJoinTask extends ParallelTaskBase<BindingSet> {

protected final FederationEvalStrategy strategy;
protected final StatementTupleExpr rightArg;
protected final List<BindingSet> bindings;
protected final ParallelExecutor<BindingSet> joinControl;

public ParallelBindLeftJoinTask(ParallelExecutor<BindingSet> joinControl, FederationEvalStrategy strategy,
StatementTupleExpr expr, List<BindingSet> bindings) {
this.strategy = strategy;
this.rightArg = expr;
this.bindings = bindings;
this.joinControl = joinControl;
}

@Override
public ParallelExecutor<BindingSet> getControl() {
return joinControl;
}

@Override
protected CloseableIteration<BindingSet> performTaskInternal() throws Exception {
return strategy.evaluateLeftBoundJoinStatementPattern(rightArg, bindings);
}

}
Loading

0 comments on commit 558a595

Please sign in to comment.