Skip to content

Commit

Permalink
GH-5121: configurability of bind left joins
Browse files Browse the repository at this point in the history
Bind left joins for OPTIONAL can be disabled using the
"enableOptionalAsBindJoin" flag in the federation config

Integrate the switch between implementations in the unit test as
parameterized test
  • Loading branch information
aschwarte10 committed Sep 29, 2024
1 parent 558a595 commit 2a7075a
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 21 deletions.
3 changes: 2 additions & 1 deletion site/content/documentation/programming/federation.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ FedX provides various means for configuration. Configuration settings can be def
|leftJoinWorkerThreads | The number of left join worker threads for parallelization, default _10_ |
|boundJoinBlockSize | Block size for bound joins, default _25_ |
|enforceMaxQueryTime | Max query time in seconds, 0 to disable, default _30_ |
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. For today's endpoints it is more efficient to disable vectored evaluation of SERVICE |
|enableServiceAsBoundJoin | Flag for evaluating a SERVICE expression (contacting non-federation members) using vectored evaluation, default _true_. |
|enableOptionalAsBindJoin | Flag for evaluating an OPTIONAL expression using bind join, default _true_. |
|includeInferredDefault | whether include inferred statements should be considered, default _true_ |
|consumingIterationMax | the max number of results to be consumed by `ConsumingIteration`, default _1000_ |
|debugQueryPlan | Print the optimized query execution plan to stdout, default _false_ |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import java.util.Optional;

import org.eclipse.rdf4j.collection.factory.api.CollectionFactory;
import org.eclipse.rdf4j.collection.factory.impl.DefaultCollectionFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCache;
import org.eclipse.rdf4j.federated.cache.SourceSelectionCacheFactory;
import org.eclipse.rdf4j.federated.cache.SourceSelectionMemoryCache;
Expand Down Expand Up @@ -48,6 +47,8 @@ public class FedXConfig {

private boolean enableServiceAsBoundJoin = true;

private boolean enableOptionalAsBindJoin = true;

private boolean enableMonitoring = false;

private boolean isLogQueryPlan = false;
Expand All @@ -68,7 +69,6 @@ public class FedXConfig {

private int consumingIterationMax = 1000;

private CollectionFactory cf = new DefaultCollectionFactory();
/* factory like setters */

/**
Expand Down Expand Up @@ -244,6 +244,17 @@ public FedXConfig withEnableServiceAsBoundJoin(boolean flag) {
return this;
}

/**
* Whether OPTIONAL clauses are evaluated using bind join (i.e. with the VALUES clause). Default <i>true</i>
*
* @param flag
* @return the current config.
*/
public FedXConfig withEnableOptionalAsBindJoin(boolean flag) {
this.enableOptionalAsBindJoin = flag;
return this;
}

/**
* The cache specification for the {@link SourceSelectionMemoryCache}. If not set explicitly, the
* {@link SourceSelectionMemoryCache#DEFAULT_CACHE_SPEC} is used.
Expand Down Expand Up @@ -326,16 +337,26 @@ public int getBoundJoinBlockSize() {
* Returns a flag indicating whether vectored evaluation using the VALUES clause shall be applied for SERVICE
* expressions.
*
* Default: false
* Default: true
*
* Note: for todays endpoints it is more efficient to disable vectored evaluation of SERVICE.
*
* @return whether SERVICE expressions are evaluated using bound joins
* @return whether SERVICE expressions are evaluated using bind joins
*/
public boolean getEnableServiceAsBoundJoin() {
return enableServiceAsBoundJoin;
}

/**
* Returns a flag indicating whether bind join evaluation using the VALUES clause shall be applied for OPTIONAL
* expressions.
*
* Default: true
*
* @return whether OPTIONAL expressions are evaluated using bind joins
*/
public boolean isEnableOptionalAsBindJoin() {
return enableOptionalAsBindJoin;
}

/**
* Get the maximum query time in seconds used for query evaluation. Applied if {@link QueryManager} is used to
* create queries.
Expand Down Expand Up @@ -485,9 +506,10 @@ public int getConsumingIterationMax() {
*
* @param cf
* @return the current config
* @deprecated unusedO
*/
@Deprecated(forRemoval = true)
public FedXConfig withCollectionFactory(CollectionFactory cf) {
this.cf = cf;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,10 +210,11 @@ protected CloseableIteration<BindingSet> executeLeftJoin(ControlledWorkerSchedul
throws QueryEvaluationException {

var rightArg = leftJoin.getRightArg();
var fedxConfig = queryInfo.getFederationContext().getConfig();

// determine if we can execute the expr as bind join
boolean executeAsBindJoin = false;
if (rightArg instanceof BoundJoinTupleExpr) {
if (fedxConfig.isEnableOptionalAsBindJoin() && rightArg instanceof BoundJoinTupleExpr) {
if (rightArg instanceof FedXService) {
executeAsBindJoin = false;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ public class FedXRepositoryConfig extends AbstractRepositoryImplConfig {
*/
public static final IRI CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN = vf.createIRI(NAMESPACE, "enableServiceAsBoundJoin");

/**
* IRI of the property populating {@link FedXConfig#isEnableOptionalAsBindJoin()}
*/
public static final IRI CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN = vf.createIRI(NAMESPACE, "enableOptionalAsBindJoin");

/**
* IRI of the property populating {@link FedXConfig#isEnableMonitoring()}
*/
Expand Down Expand Up @@ -331,6 +336,9 @@ private void parseFedXConfigInternal(Model m, Resource confNode) throws Reposito
Models.objectLiteral(m.getStatements(confNode, CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN, null))
.ifPresent(value -> config.withEnableServiceAsBoundJoin(value.booleanValue()));

Models.objectLiteral(m.getStatements(confNode, CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN, null))
.ifPresent(value -> config.withEnableOptionalAsBindJoin(value.booleanValue()));

Models.objectLiteral(m.getStatements(confNode, CONFIG_ENABLE_MONITORING, null))
.ifPresent(value -> config.withEnableMonitoring(value.booleanValue()));

Expand Down Expand Up @@ -384,6 +392,9 @@ protected void exportFedXConfig(Model model, Resource implNode) {
model.add(confNode, CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN,
vf.createLiteral(config.getEnableServiceAsBoundJoin()));

model.add(confNode, CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN,
vf.createLiteral(config.isEnableOptionalAsBindJoin()));

model.add(confNode, CONFIG_ENABLE_MONITORING, vf.createLiteral(config.isEnableMonitoring()));

model.add(confNode, CONFIG_LOG_QUERY_PLAN, vf.createLiteral(config.isLogQueryPlan()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class BindLeftJoinTests extends SPARQLBaseTest {

Expand All @@ -36,8 +37,9 @@ protected void initFedXConfig() {
});
}

@Test
public void test_leftBindJoin_basic() throws Exception {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void test_leftBindJoin_basic(boolean bindLeftJoinOptimizationEnabled) throws Exception {

prepareTest(
Arrays.asList("/tests/basic/data_emptyStore.ttl", "/tests/basic/data_emptyStore.ttl",
Expand All @@ -51,6 +53,7 @@ public void test_leftBindJoin_basic() throws Exception {

fedxRule.setConfig(config -> {
config.withBoundJoinBlockSize(10);
config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
});

// add some persons
Expand Down Expand Up @@ -95,8 +98,6 @@ public void test_leftBindJoin_basic() throws Exception {
try (TupleQueryResult tqr = tupleQuery.evaluate()) {
var bindings = Iterations.asList(tqr);

MonitoringUtil.printMonitoringInformation(federationContext());

Assertions.assertEquals(30, bindings.size());

for (int i = 1; i <= 30; i++) {
Expand All @@ -122,14 +123,25 @@ public void test_leftBindJoin_basic() throws Exception {

}

if (bindLeftJoinOptimizationEnabled) {
assertNumberOfRequests("endpoint1", 3);
assertNumberOfRequests("endpoint2", 5);
assertNumberOfRequests("endpoint3", 5);
} else {
assertNumberOfRequests("endpoint1", 3);
assertNumberOfRequests("endpoint2", 32);
assertNumberOfRequests("endpoint3", 32);
}

} finally {
fedxRepo.shutDown();
}

}

@Test
public void testBoundLeftJoin_stmt_nonExclusive_boundCheck()
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testBoundLeftJoin_stmt_nonExclusive_boundCheck(boolean bindLeftJoinOptimizationEnabled)
throws Exception {

prepareTest(
Expand All @@ -147,6 +159,7 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck()

fedxRule.setConfig(config -> {
config.withBoundJoinBlockSize(10);
config.withEnableOptionalAsBindJoin(bindLeftJoinOptimizationEnabled);
});

// add some persons
Expand Down Expand Up @@ -180,6 +193,8 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck()
conn.add(Values.iri("http://other.com/p30"), FOAF.GENDER, Values.literal("male"));
}

fedxRule.enableDebug();

try {
// run query which joins results from multiple repos
// for a subset of persons there exist names
Expand Down Expand Up @@ -217,6 +232,18 @@ public void testBoundLeftJoin_stmt_nonExclusive_boundCheck()

}

if (bindLeftJoinOptimizationEnabled) {
assertNumberOfRequests("endpoint1", 3);
assertNumberOfRequests("endpoint2", 5);
assertNumberOfRequests("endpoint3", 5);
} else {
assertNumberOfRequests("endpoint1", 3);
// Note: with the current implementation we cannot
// make exact assertions for endpoint 2 and 3
// this is because due to the check statement
// not all requests are required
}

} finally {
fedxRepo.shutDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;

import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.monitoring.MonitoringService;
import org.eclipse.rdf4j.federated.repository.RepositorySettings;
import org.eclipse.rdf4j.federated.server.NativeStoreServer;
import org.eclipse.rdf4j.federated.server.SPARQLEmbeddedServer;
Expand All @@ -28,6 +29,7 @@
import org.eclipse.rdf4j.rio.RDFParseException;
import org.eclipse.rdf4j.rio.Rio;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -237,4 +239,39 @@ protected RepositorySettings repoSettings(int endpoint) {
return server.getRepository(endpoint);
}

/**
* Helper method to check the number of requests sent to respective endpoint
*
* @param memberName the memberName, typically "endpointN", where N >= 1
* @param expectedRequests
*/
protected void assertNumberOfRequests(String memberName, int expectedRequests) {
if (!isSPARQLServer()) {
return; // ignore for non SPARQL server environment where requests are not counted
}
var fedxContext = federationContext();
if (!fedxContext.getConfig().isEnableMonitoring()) {
Assertions.fail("monitoring is not enabled in the current federation.");
}
MonitoringService monitoringService = (MonitoringService) fedxContext.getMonitoringService();

// obtain the monitoring information
// Note: this method has some simplifications for the name
var monitoringInformation = monitoringService.getAllMonitoringInformation()
.stream()
.filter(m -> {
var endpoint = m.getE();
return endpoint.getId().equals(memberName)
|| endpoint.getId().equals("http://" + memberName)
|| endpoint.getName().equals(memberName)
|| endpoint.getName().equals("http://" + memberName);
})
.findFirst()
.orElse(null);

Assertions.assertEquals(expectedRequests,
(monitoringInformation != null ? monitoringInformation.getNumberOfRequests() : 0));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.rdf4j.model.util.Models.subject;
import static org.junit.Assert.assertThat;

import java.io.InputStream;
import java.util.Optional;

import org.eclipse.rdf4j.federated.FedXConfig;
import org.eclipse.rdf4j.federated.util.Vocabulary.FEDX;
Expand Down Expand Up @@ -161,7 +159,7 @@ public void testExport() throws Exception {
.orElse(null);
assertThat(configNode).isNotNull();

assertThat(export.filter(configNode, null, null)).hasSize(14);
assertThat(export.filter(configNode, null, null)).hasSize(15);

assertThat(
Models.objectLiteral(
Expand Down Expand Up @@ -189,6 +187,11 @@ public void testExport() throws Exception {
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN,
null)))
.hasValueSatisfying(v -> assertThat(v.booleanValue()).isFalse());
assertThat(
Models.objectLiteral(
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN,
null)))
.hasValueSatisfying(v -> assertThat(v.booleanValue()).isFalse());
assertThat(
Models.objectLiteral(
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_MONITORING, null)))
Expand Down Expand Up @@ -242,9 +245,9 @@ public void testExportWithEmptyConfig() throws Exception {
.orElse(null);
assertThat(configNode).isNotNull();

// Note: 14 instead of 12 since CONFIG_SOURCE_SELECTION_CACHE_SPEC & CONFIG_PREFIX_DECLARATIONS are null
// Note: 13 instead of 15 since CONFIG_SOURCE_SELECTION_CACHE_SPEC & CONFIG_PREFIX_DECLARATIONS are null
// and thus should not be populated
assertThat(export.filter(configNode, null, null)).hasSize(12);
assertThat(export.filter(configNode, null, null)).hasSize(13);

assertThat(
Models.objectLiteral(
Expand Down Expand Up @@ -272,6 +275,11 @@ public void testExportWithEmptyConfig() throws Exception {
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_SERVICE_AS_BOUND_JOIN,
null)))
.hasValueSatisfying(v -> assertThat(v.booleanValue()).isTrue());
assertThat(
Models.objectLiteral(
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_OPTIONAL_AS_BIND_JOIN,
null)))
.hasValueSatisfying(v -> assertThat(v.booleanValue()).isTrue());
assertThat(
Models.objectLiteral(
export.getStatements(configNode, FedXRepositoryConfig.CONFIG_ENABLE_MONITORING, null)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
fedx:boundJoinBlockSize 104 ;
fedx:enforceMaxQueryTime 105 ;
fedx:enableServiceAsBoundJoin false ;
fedx:enableOptionalAsBindJoin false ;
fedx:enableMonitoring true ;
fedx:logQueryPlan true ;
fedx:logQueries true ;
Expand Down

0 comments on commit 2a7075a

Please sign in to comment.