diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java index 568ae96d56..7edb14df9e 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java @@ -22,6 +22,8 @@ import org.eclipse.rdf4j.federated.endpoint.Endpoint; import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint; import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory; +import org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory; +import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.FedXException; import org.eclipse.rdf4j.federated.exception.FedXRuntimeException; @@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient { private FederationEvaluationStrategyFactory strategyFactory; + private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE; + private WriteStrategyFactory writeStrategyFactory; private File dataDir; @@ -96,6 +100,19 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory this.strategyFactory = strategyFactory; } + /* package */ SchedulerFactory getSchedulerFactory() { + return schedulerFactory; + } + + /** + * Set the {@link SchedulerFactory}. Can only be done before initialization of the federation + * + * @param schedulerFactory the {@link SchedulerFactory} + */ + public void setSchedulerFactory(SchedulerFactory schedulerFactory) { + this.schedulerFactory = schedulerFactory; + } + /** * * @param writeStrategyFactory the {@link WriteStrategyFactory} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java index 263ace78bf..e2b34c8b70 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java @@ -26,6 +26,7 @@ import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory; import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler; +import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory; import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion; @@ -118,26 +119,28 @@ public void reset() { log.debug("Scheduler for join and union are reset."); } + SchedulerFactory schedulerFactory = federation.getSchedulerFactory(); + Optional taskWrapper = federationContext.getConfig().getTaskWrapper(); if (joinScheduler != null) { joinScheduler.abort(); } - joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(), - "Join Scheduler"); + joinScheduler = schedulerFactory.createJoinScheduler(federationContext, + federationContext.getConfig().getJoinWorkerThreads()); taskWrapper.ifPresent(joinScheduler::setTaskWrapper); if (unionScheduler != null) { unionScheduler.abort(); } - unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(), - "Union Scheduler"); + unionScheduler = schedulerFactory.createUnionScheduler(federationContext, + federationContext.getConfig().getUnionWorkerThreads()); taskWrapper.ifPresent(unionScheduler::setTaskWrapper); if (leftJoinScheduler != null) { leftJoinScheduler.abort(); } - leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(), - "Left Join Scheduler"); + leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext, + federationContext.getConfig().getLeftJoinWorkerThreads()); taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index 1060e86a2d..ea57e5192d 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -11,12 +11,14 @@ package org.eclipse.rdf4j.federated.evaluation.concurrent; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.eclipse.rdf4j.common.annotation.Experimental; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; @@ -42,7 +44,9 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw private final ExecutorService executor; - private final LinkedBlockingQueue _taskQueue = new LinkedBlockingQueue<>(); + // TODO: in the next major version of RDF4J this final field should be removed. + // Initialization of the executor service should managed the details + private final BlockingQueue _taskQueue; private final int nWorkers; private final String name; @@ -57,7 +61,8 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw public ControlledWorkerScheduler(int nWorkers, String name) { this.nWorkers = nWorkers; this.name = name; - this.executor = createExecutorService(); + this._taskQueue = createBlockingQueue(); + this.executor = createExecutorService(nWorkers, name); } /** @@ -112,13 +117,36 @@ public int getTotalNumberOfWorkers() { return nWorkers; } + @Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal public int getNumberOfTasks() { return _taskQueue.size(); } - private ExecutorService createExecutorService() { + /** + * Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a + * {@link LinkedBlockingQueue}. + * + * @return + */ + @Experimental + protected BlockingQueue createBlockingQueue() { + return new LinkedBlockingQueue<>(); + } + + /** + * Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The + * default implementation creates a thread pool with a {@link LinkedBlockingQueue}. + * + * The thread pool should be configured to terminate idle threads after a period of time (default: 60s) + * + * @param nWorkers the number of workers in the thread pool + * @param name the base name for threads in the pool + * @return + */ + @Experimental + protected ExecutorService createExecutorService(int nWorkers, String name) { - ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue, + ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue, new NamingThreadFactory(name)); executor.allowCoreThreadTimeOut(true); return executor; diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java new file mode 100644 index 0000000000..dd063b6191 --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.concurrent; + +import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * The default {@link SchedulerFactory} + */ +public class DefaultSchedulerFactory implements SchedulerFactory { + + public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory(); + + @Override + public ControlledWorkerScheduler createJoinScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Join Scheduler"); + } + + @Override + public ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Union Scheduler"); + } + + @Override + public ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Left Join Scheduler"); + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java new file mode 100644 index 0000000000..50c004b35c --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. + * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.concurrent; + +import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask; +import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background + * + * @see DefaultSchedulerFactory + * @author Andreas Schwarte + */ +public interface SchedulerFactory { + + /** + * Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind + * joins) + * + * @param federationContext + * @param nWorkers + * @return + * @see ControlledWorkerBindJoin + * @see ParallelBoundJoinTask + */ + ControlledWorkerScheduler createJoinScheduler(FederationContext federationContext, int nWorkers); + + /** + * Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel) + * + * @param federationContext + * @param nWorkers + * @return + */ + ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, int nWorkers); + + /** + * Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind + * joins, i.e. OPTIONAL) + * + * @param federationContext + * @param nWorkers + * @return + * @see ControlledWorkerBindLeftJoin + * @see ParallelBindLeftJoinTask + */ + ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, int nWorkers); +}