From 558a5952b9161ce97701914d1a6daebc6291f5e2 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Fri, 6 Sep 2024 14:11:17 +0200 Subject: [PATCH] GH-5121: implementation of left bind join operator This change provides the implementation and activation for the left bind join operator. The algorithm is as follows: - execute left bind join using regular bound join query - process result iteration similar to BoundJoinVALUESConversionIteration - remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all non-seen bindings directly from the input Note that the terminology in literature has changed to "bind joins". Hence, for new classes and methods I try to follow that. Change is covered with some unit tests --- .../evaluation/FederationEvalStrategy.java | 57 +++++ .../SparqlFederationEvalStrategy.java | 25 +- .../iterator/BindLeftJoinIteration.java | 99 ++++++++ .../join/ControlledWorkerBindLeftJoin.java | 70 ++++++ .../join/ParallelBindLeftJoinTask.java | 53 +++++ .../rdf4j/federated/BindLeftJoinTests.java | 225 ++++++++++++++++++ 6 files changed, 527 insertions(+), 2 deletions(-) create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java create mode 100644 tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java index 56693c0277..561a00bacb 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/FederationEvalStrategy.java @@ -37,6 +37,7 @@ import org.eclipse.rdf4j.federated.algebra.FedXZeroLengthPath; import org.eclipse.rdf4j.federated.algebra.FederatedDescribeOperator; import org.eclipse.rdf4j.federated.algebra.FilterExpr; +import org.eclipse.rdf4j.federated.algebra.FilterTuple; import org.eclipse.rdf4j.federated.algebra.FilterValueExpr; import org.eclipse.rdf4j.federated.algebra.HolderNode; import org.eclipse.rdf4j.federated.algebra.NJoin; @@ -51,8 +52,10 @@ import org.eclipse.rdf4j.federated.endpoint.Endpoint; import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelServiceExecutor; +import org.eclipse.rdf4j.federated.evaluation.iterator.BindLeftJoinIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FedXPathIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.FederatedDescribeIteration; +import org.eclipse.rdf4j.federated.evaluation.iterator.FilteringIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBoundJoin; @@ -66,6 +69,7 @@ import org.eclipse.rdf4j.federated.evaluation.union.ParallelUnionOperatorTask; import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion; import org.eclipse.rdf4j.federated.evaluation.union.WorkerUnionBase; +import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.FedXRuntimeException; import org.eclipse.rdf4j.federated.exception.IllegalQueryException; import org.eclipse.rdf4j.federated.optimizer.DefaultFedXCostModel; @@ -935,6 +939,59 @@ public abstract CloseableIteration evaluateBoundJoinStatementPattern public abstract CloseableIteration evaluateGroupedCheck( CheckStatementPattern stmt, final List bindings) throws QueryEvaluationException; + /** + * Evaluate the left bind join for the given {@link StatementTupleExpr} and bindings at the relevant endpoints. + * + * @param stmt + * @param bindings + * @return the result iteration + * @throws QueryEvaluationException + * @see {@link BindLeftJoinIteration} + */ + public CloseableIteration evaluateLeftBoundJoinStatementPattern( + StatementTupleExpr stmt, final List bindings) throws QueryEvaluationException { + // we can omit the bound join handling + if (bindings.size() == 1) { + return evaluate(stmt, bindings.get(0)); + } + + FilterValueExpr filterExpr = null; + if (stmt instanceof FilterTuple) { + filterExpr = ((FilterTuple) stmt).getFilterExpr(); + } + + AtomicBoolean isEvaluated = new AtomicBoolean(false); + String preparedQuery = QueryStringUtil.selectQueryStringBoundJoinVALUES((StatementPattern) stmt, bindings, + filterExpr, isEvaluated, stmt.getQueryInfo().getDataset()); + + CloseableIteration result = null; + try { + result = evaluateAtStatementSources(preparedQuery, stmt.getStatementSources(), stmt.getQueryInfo()); + + // apply filter and/or convert to original bindings + if (filterExpr != null && !isEvaluated.get()) { + result = new BindLeftJoinIteration(result, bindings); // apply conversion + result = new FilteringIteration(filterExpr, result, this); // apply filter + if (!result.hasNext()) { + result.close(); + return new EmptyIteration<>(); + } + } else { + result = new BindLeftJoinIteration(result, bindings); + } + + return result; + } catch (Throwable t) { + if (result != null) { + result.close(); + } + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + throw ExceptionUtil.toQueryEvaluationException(t); + } + } + /** * Evaluate a SERVICE using vectored evaluation, taking the provided bindings as input. * diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java index 234614402a..2018c76218 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/SparqlFederationEvalStrategy.java @@ -32,6 +32,7 @@ import org.eclipse.rdf4j.federated.evaluation.iterator.InsertBindingsIteration; import org.eclipse.rdf4j.federated.evaluation.iterator.SingleBindingSetIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerLeftJoin; import org.eclipse.rdf4j.federated.evaluation.join.JoinExecutorBase; @@ -207,8 +208,28 @@ public CloseableIteration executeJoin( protected CloseableIteration executeLeftJoin(ControlledWorkerScheduler joinScheduler, CloseableIteration leftIter, LeftJoin leftJoin, BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException { - ControlledWorkerLeftJoin join = new ControlledWorkerLeftJoin(joinScheduler, this, - leftIter, leftJoin, bindings, queryInfo); + + var rightArg = leftJoin.getRightArg(); + + // determine if we can execute the expr as bind join + boolean executeAsBindJoin = false; + if (rightArg instanceof BoundJoinTupleExpr) { + if (rightArg instanceof FedXService) { + executeAsBindJoin = false; + } else { + executeAsBindJoin = true; + } + } + + JoinExecutorBase join; + if (executeAsBindJoin) { + join = new ControlledWorkerBindLeftJoin(joinScheduler, this, leftIter, rightArg, + bindings, queryInfo); + } else { + join = new ControlledWorkerLeftJoin(joinScheduler, this, + leftIter, leftJoin, bindings, queryInfo); + } + executor.execute(join); return join; } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java new file mode 100644 index 0000000000..4b220db24b --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/iterator/BindLeftJoinIteration.java @@ -0,0 +1,99 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.iterator; + +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.ListIterator; +import java.util.Set; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.common.iteration.LookAheadIteration; +import org.eclipse.rdf4j.query.Binding; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet; + +/** + * A {@link LookAheadIteration} for processing bind left join results (i.e., result of joining OPTIONAL clauses) + * + * Algorithm: + * + *
    + *
  • execute left bind join using regular bound join query
  • + *
  • process result iteration similar to {@link BoundJoinVALUESConversionIteration}
  • + *
  • remember seen set of bindings (using index) and add original bindings to those, i.e. put to result return all + * non-seen bindings directly from the input
  • + * + * + * @author Andreas Schwarte + */ +public class BindLeftJoinIteration extends LookAheadIteration { + + protected final CloseableIteration iter; + protected final List bindings; + + protected Set seenBindingIndexes = new HashSet<>(); + protected final ListIterator bindingsIterator; + + public BindLeftJoinIteration(CloseableIteration iter, + List bindings) { + this.iter = iter; + this.bindings = bindings; + this.bindingsIterator = bindings.listIterator(); + } + + @Override + protected BindingSet getNextElement() { + + if (iter.hasNext()) { + var bIn = iter.next(); + int bIndex = Integer.parseInt( + bIn.getBinding(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME).getValue().stringValue()); + seenBindingIndexes.add(bIndex); + return convert(bIn, bIndex); + } + + while (bindingsIterator.hasNext()) { + if (seenBindingIndexes.contains(bindingsIterator.nextIndex())) { + // the binding was already processed as part of the optional + bindingsIterator.next(); + continue; + } + return bindingsIterator.next(); + } + + return null; + } + + @Override + protected void handleClose() { + iter.close(); + } + + protected BindingSet convert(BindingSet bIn, int bIndex) throws QueryEvaluationException { + QueryBindingSet res = new QueryBindingSet(); + Iterator bIter = bIn.iterator(); + while (bIter.hasNext()) { + Binding b = bIter.next(); + if (b.getName().equals(BoundJoinVALUESConversionIteration.INDEX_BINDING_NAME)) { + continue; + } + res.addBinding(b); + } + for (Binding bs : bindings.get(bIndex)) { + res.setBinding(bs); + } + return res; + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java new file mode 100644 index 0000000000..a30ed66cf7 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ControlledWorkerBindLeftJoin.java @@ -0,0 +1,70 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTask; +import org.eclipse.rdf4j.federated.structures.QueryInfo; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.QueryEvaluationException; +import org.eclipse.rdf4j.query.algebra.TupleExpr; + +/** + * Bind join implementation for left joins (i.e., OPTIOAL clauses) + * + * @author Andreas Schwarte + */ +public class ControlledWorkerBindLeftJoin extends ControlledWorkerBindJoinBase { + + public ControlledWorkerBindLeftJoin(ControlledWorkerScheduler scheduler, + FederationEvalStrategy strategy, CloseableIteration leftIter, TupleExpr rightArg, + BindingSet bindings, QueryInfo queryInfo) throws QueryEvaluationException { + super(scheduler, strategy, leftIter, rightArg, bindings, queryInfo); + } + + @Override + protected TaskCreator determineTaskCreator(TupleExpr expr, BindingSet bs) { + final TaskCreator taskCreator; + if (expr instanceof StatementTupleExpr) { + StatementTupleExpr stmt = (StatementTupleExpr) expr; + taskCreator = new LeftBoundJoinTaskCreator(strategy, stmt); + + } else { + throw new RuntimeException("Expr is of unexpected type: " + expr.getClass().getCanonicalName() + + ". Please report this problem."); + } + return taskCreator; + } + + static protected class LeftBoundJoinTaskCreator implements TaskCreator { + protected final FederationEvalStrategy _strategy; + protected final StatementTupleExpr _expr; + + public LeftBoundJoinTaskCreator( + FederationEvalStrategy strategy, StatementTupleExpr expr) { + super(); + _strategy = strategy; + _expr = expr; + } + + @Override + public ParallelTask getTask(ParallelExecutor control, List bindings) { + return new ParallelBindLeftJoinTask(control, _strategy, _expr, bindings); + } + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java new file mode 100644 index 0000000000..bfabcb87c4 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/join/ParallelBindLeftJoinTask.java @@ -0,0 +1,53 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.join; + +import java.util.List; + +import org.eclipse.rdf4j.common.iteration.CloseableIteration; +import org.eclipse.rdf4j.federated.algebra.StatementTupleExpr; +import org.eclipse.rdf4j.federated.evaluation.FederationEvalStrategy; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelExecutor; +import org.eclipse.rdf4j.federated.evaluation.concurrent.ParallelTaskBase; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * A {@link ParallelTaskBase} for executing bind left joins. + * + * @author Andreas Schwarte + * @see FederationEvalStrategy#evaluateLeftBoundJoinStatementPattern(StatementTupleExpr, List) + */ +public class ParallelBindLeftJoinTask extends ParallelTaskBase { + + protected final FederationEvalStrategy strategy; + protected final StatementTupleExpr rightArg; + protected final List bindings; + protected final ParallelExecutor joinControl; + + public ParallelBindLeftJoinTask(ParallelExecutor joinControl, FederationEvalStrategy strategy, + StatementTupleExpr expr, List bindings) { + this.strategy = strategy; + this.rightArg = expr; + this.bindings = bindings; + this.joinControl = joinControl; + } + + @Override + public ParallelExecutor getControl() { + return joinControl; + } + + @Override + protected CloseableIteration performTaskInternal() throws Exception { + return strategy.evaluateLeftBoundJoinStatementPattern(rightArg, bindings); + } + +} diff --git a/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java new file mode 100644 index 0000000000..8bc3f09c66 --- /dev/null +++ b/tools/federation/src/test/java/org/eclipse/rdf4j/federated/BindLeftJoinTests.java @@ -0,0 +1,225 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated; + +import java.util.Arrays; +import java.util.Set; + +import org.eclipse.rdf4j.common.iteration.Iterations; +import org.eclipse.rdf4j.federated.monitoring.MonitoringUtil; +import org.eclipse.rdf4j.model.util.Values; +import org.eclipse.rdf4j.model.vocabulary.FOAF; +import org.eclipse.rdf4j.model.vocabulary.OWL; +import org.eclipse.rdf4j.query.BindingSet; +import org.eclipse.rdf4j.query.TupleQuery; +import org.eclipse.rdf4j.query.TupleQueryResult; +import org.eclipse.rdf4j.repository.Repository; +import org.eclipse.rdf4j.repository.RepositoryConnection; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class BindLeftJoinTests extends SPARQLBaseTest { + + @Override + protected void initFedXConfig() { + + fedxRule.withConfiguration(config -> { + config.withEnableMonitoring(true); + }); + } + + @Test + public void test_leftBindJoin_basic() throws Exception { + + prepareTest( + Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl", + "/tests/basic/data_emptyStore.ttl")); + + Repository repo1 = getRepository(1); + Repository repo2 = getRepository(2); + Repository repo3 = getRepository(3); + + Repository fedxRepo = fedxRule.getRepository(); + + fedxRule.setConfig(config -> { + config.withBoundJoinBlockSize(10); + }); + + // add some persons + try (RepositoryConnection conn = repo1.getConnection()) { + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + conn.add(p, OWL.SAMEAS, otherP); + } + } + + // add names for person 1, 4, 7, ... + try (RepositoryConnection conn = repo2.getConnection()) { + + for (int i = 1; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.NAME, Values.literal("Person " + i)); + } + } + + // add names for person 2, 5, 8, ... + try (RepositoryConnection conn = repo3.getConnection()) { + + for (int i = 2; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.NAME, Values.literal("Person " + i)); + } + } + + try { + // run query which joins results from multiple repos + // for a subset of persons there exist names + try (RepositoryConnection conn = fedxRepo.getConnection()) { + String query = "PREFIX foaf: " + + "SELECT * WHERE { " + + " ?person owl:sameAs ?otherPerson . " + + " OPTIONAL { ?otherPerson foaf:name ?name . } " // # @repo2 and @repo3 + + "}"; + + TupleQuery tupleQuery = conn.prepareTupleQuery(query); + try (TupleQueryResult tqr = tupleQuery.evaluate()) { + var bindings = Iterations.asList(tqr); + + MonitoringUtil.printMonitoringInformation(federationContext()); + + Assertions.assertEquals(30, bindings.size()); + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + + // find the bindingset for the person in the unordered result + BindingSet bs = bindings.stream() + .filter(b -> b.getValue("person").equals(p)) + .findFirst() + .orElseThrow(); + + Assertions.assertEquals(otherP, bs.getValue("otherPerson")); + if (i % 3 == 1 || i % 3 == 2) { + // names from repo 2 or 3 + Assertions.assertEquals("Person " + i, bs.getValue("name").stringValue()); + } else { + // no name for others + Assertions.assertFalse(bs.hasBinding("name")); + } + } + } + + } + + } finally { + fedxRepo.shutDown(); + } + + } + + @Test + public void testBoundLeftJoin_stmt_nonExclusive_boundCheck() + throws Exception { + + prepareTest( + Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl", + "/tests/basic/data_emptyStore.ttl")); + + // test scenario: + // 3 repositories, 30 persons, bind join size 10, names distributed in repo 2 + // and repo 3 + Repository repo1 = getRepository(1); + Repository repo2 = getRepository(2); + Repository repo3 = getRepository(3); + + Repository fedxRepo = fedxRule.getRepository(); + + fedxRule.setConfig(config -> { + config.withBoundJoinBlockSize(10); + }); + + // add some persons + try (RepositoryConnection conn = repo1.getConnection()) { + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + conn.add(p, OWL.SAMEAS, otherP); + } + } + + // add "male" for person 1, 4, 7, ... + try (RepositoryConnection conn = repo2.getConnection()) { + + for (int i = 1; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.GENDER, Values.literal("male")); + } + } + + // add "female" for person 2, 5, 8, ... + // add "male" for person 30 + try (RepositoryConnection conn = repo3.getConnection()) { + + for (int i = 2; i <= 30; i += 3) { + var otherP = Values.iri("http://other.com/p" + i); + conn.add(otherP, FOAF.GENDER, Values.literal("female")); + } + + conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male")); + } + + try { + // run query which joins results from multiple repos + // for a subset of persons there exist names + try (RepositoryConnection conn = fedxRepo.getConnection()) { + String query = "PREFIX foaf: " + + "SELECT * WHERE { " + + " ?person owl:sameAs ?otherPerson . " + + " OPTIONAL { " + + " ?otherPerson foaf:gender \"male\" . " // # @repo2 and @repo3 + + " } " + + "}"; + + TupleQuery tupleQuery = conn.prepareTupleQuery(query); + try (TupleQueryResult tqr = tupleQuery.evaluate()) { + var bindings = Iterations.asList(tqr); + + Assertions.assertEquals(30, bindings.size()); + + MonitoringUtil.printMonitoringInformation(federationContext()); + + for (int i = 1; i <= 30; i++) { + var p = Values.iri("http://ex.com/p" + i); + var otherP = Values.iri("http://other.com/p" + i); + + // find the bindingset for the person in the unordered result + BindingSet bs = bindings.stream() + .filter(b -> b.getValue("person").equals(p)) + .findFirst() + .orElseThrow(); + + Assertions.assertEquals(otherP, bs.getValue("otherPerson")); + Assertions.assertEquals(Set.of("person", "otherPerson"), bs.getBindingNames()); + } + } + + } + + } finally { + fedxRepo.shutDown(); + } + } + +}