From d74e6be2e9c4bc17dc739b8c445c965cd9b5c4d4 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Mon, 11 Nov 2024 14:34:56 +0100 Subject: [PATCH 1/3] GH-5197: preparation for supporting fair sub-query execution in FedX This change adds preparatory infrastructure for having different implementations of schedulers. Configuration is here prepared by means of defining a "SchedulerFactory" interface with a default implementation aside (which essentially mimics the current behavior). Note that for ease of development some aspects of ControlledWorkerScheduler are made accessible to sub-classes. The idea is that in the final version there is an abstract scheduler class providing shared functionality and different implementations (e.g. the current FIFO one and a fair implementation) --- .../org/eclipse/rdf4j/federated/FedX.java | 20 ++++++++ .../rdf4j/federated/FederationManager.java | 15 +++--- .../concurrent/ControlledWorkerScheduler.java | 22 ++++++-- .../concurrent/DefaultSchedulerFactory.java | 44 ++++++++++++++++ .../concurrent/SchedulerFactory.java | 50 +++++++++++++++++++ 5 files changed, 141 insertions(+), 10 deletions(-) create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java create mode 100644 tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java index 568ae96d56a..f8f7d226607 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java @@ -22,6 +22,8 @@ import org.eclipse.rdf4j.federated.endpoint.Endpoint; import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint; import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory; +import
org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory; +import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory; import org.eclipse.rdf4j.federated.exception.ExceptionUtil; import org.eclipse.rdf4j.federated.exception.FedXException; import org.eclipse.rdf4j.federated.exception.FedXRuntimeException; @@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient { private FederationEvaluationStrategyFactory strategyFactory; + private SchedulerFactory schedulerFactory; + private WriteStrategyFactory writeStrategyFactory; private File dataDir; @@ -96,6 +100,22 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory this.strategyFactory = strategyFactory; } + /* package */ SchedulerFactory getSchedulerFactory() { + if (schedulerFactory == null) { + schedulerFactory = DefaultSchedulerFactory.INSTANCE; + } + return schedulerFactory; + } + + /** + * Set the {@link SchedulerFactory}. Can only be done before initialization of the federation + * + * @param schedulerFactory the {@link SchedulerFactory} + */ + public void setSchedulerFactory(SchedulerFactory schedulerFactory) { + this.schedulerFactory = schedulerFactory; + } + /** * * @param writeStrategyFactory the {@link WriteStrategyFactory} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java index 263ace78bf0..e2b34c8b708 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FederationManager.java @@ -26,6 +26,7 @@ import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler; import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory; import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler; +import 
org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory; import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper; import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion; import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion; @@ -118,26 +119,28 @@ public void reset() { log.debug("Scheduler for join and union are reset."); } + SchedulerFactory schedulerFactory = federation.getSchedulerFactory(); + Optional taskWrapper = federationContext.getConfig().getTaskWrapper(); if (joinScheduler != null) { joinScheduler.abort(); } - joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(), - "Join Scheduler"); + joinScheduler = schedulerFactory.createJoinScheduler(federationContext, + federationContext.getConfig().getJoinWorkerThreads()); taskWrapper.ifPresent(joinScheduler::setTaskWrapper); if (unionScheduler != null) { unionScheduler.abort(); } - unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(), - "Union Scheduler"); + unionScheduler = schedulerFactory.createUnionScheduler(federationContext, + federationContext.getConfig().getUnionWorkerThreads()); taskWrapper.ifPresent(unionScheduler::setTaskWrapper); if (leftJoinScheduler != null) { leftJoinScheduler.abort(); } - leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(), - "Left Join Scheduler"); + leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext, + federationContext.getConfig().getLeftJoinWorkerThreads()); taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper); } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index 1060e86a2de..da2ef0d334a 100644 --- 
a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -11,6 +11,7 @@ package org.eclipse.rdf4j.federated.evaluation.concurrent; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -42,7 +43,8 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw private final ExecutorService executor; - private final LinkedBlockingQueue _taskQueue = new LinkedBlockingQueue<>(); + // Note: initialized in #createExecutorService + protected BlockingQueue _taskQueue; private final int nWorkers; private final String name; @@ -57,7 +59,7 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw public ControlledWorkerScheduler(int nWorkers, String name) { this.nWorkers = nWorkers; this.name = name; - this.executor = createExecutorService(); + this.executor = createExecutorService(nWorkers, name); } /** @@ -112,13 +114,25 @@ public int getTotalNumberOfWorkers() { return nWorkers; } + @Deprecated(forRemoval = true) // currently unused and this class is internal public int getNumberOfTasks() { return _taskQueue.size(); } - private ExecutorService createExecutorService() { + /** + * Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The + * default implementation creates a thread pool with a {@link LinkedBlockingQueue}. 
+ * + * @param nWorkers the number of workers in the thread pool + * @param name the base name for threads in the pool + * @return + */ + protected ExecutorService createExecutorService(int nWorkers, String name) { + + // use a LinkedBlockingQueue by default + this._taskQueue = new LinkedBlockingQueue<>(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue, + ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue, new NamingThreadFactory(name)); executor.allowCoreThreadTimeOut(true); return executor; diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java new file mode 100644 index 00000000000..dd063b6191a --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/DefaultSchedulerFactory.java @@ -0,0 +1,44 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.concurrent; + +import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * The default {@link SchedulerFactory} + */ +public class DefaultSchedulerFactory implements SchedulerFactory { + + public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory(); + + @Override + public ControlledWorkerScheduler createJoinScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Join Scheduler"); + } + + @Override + public ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Union Scheduler"); + } + + @Override + public ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, + int nWorkers) { + return new ControlledWorkerScheduler<>(nWorkers, + "Left Join Scheduler"); + } + +} diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java new file mode 100644 index 00000000000..0e53570d4ab --- /dev/null +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java @@ -0,0 +1,50 @@ +/******************************************************************************* + * Copyright (c) 2024 Eclipse RDF4J contributors. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Distribution License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/org/documents/edl-v10.php. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + *******************************************************************************/ +package org.eclipse.rdf4j.federated.evaluation.concurrent; + +import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.query.BindingSet; + +/** + * Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background + * + * @see DefaultSchedulerFactory + * @author Andreas Schwarte + */ +public interface SchedulerFactory { + + /** + * Create a {@link ControlledWorkerScheduler} for joins + * + * @param federationContext + * @param nWorkers + * @return + */ + ControlledWorkerScheduler createJoinScheduler(FederationContext federationContext, int nWorkers); + + /** + * Create a {@link ControlledWorkerScheduler} for unions + * + * @param federationContext + * @param nWorkers + * @return + */ + ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, int nWorkers); + + /** + * Create a {@link ControlledWorkerScheduler} for left joins + * + * @param federationContext + * @param nWorkers + * @return + */ + ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, int nWorkers); +} From ddcadc0356c7eb4968f19b9b03b8e1adc92baa11 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Tue, 12 Nov 2024 09:16:18 +0100 Subject: [PATCH 2/3] GH-5197: javadoc refinements + smaller initialization changes - for minor version compatibility the type of the "_taskQueue" field in the scheduler cannot be changed (to non-final). Hence, for now we use a dedicated protected initialization method. In the future (next major release) the idea is to leave the queue entirely managed by the executor service. 
- refinements and clarifications to the javadoc --- .../org/eclipse/rdf4j/federated/FedX.java | 5 +--- .../concurrent/ControlledWorkerScheduler.java | 23 ++++++++++++++----- .../concurrent/SchedulerFactory.java | 16 ++++++++++--- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java index f8f7d226607..7edb14df9e5 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/FedX.java @@ -66,7 +66,7 @@ public class FedX extends AbstractSail implements RepositoryResolverClient { private FederationEvaluationStrategyFactory strategyFactory; - private SchedulerFactory schedulerFactory; + private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE; private WriteStrategyFactory writeStrategyFactory; @@ -101,9 +101,6 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory } /* package */ SchedulerFactory getSchedulerFactory() { - if (schedulerFactory == null) { - schedulerFactory = DefaultSchedulerFactory.INSTANCE; - } return schedulerFactory; } diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index da2ef0d334a..28917c13218 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -43,8 +43,9 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw private final ExecutorService executor; - // Note: initialized in #createExecutorService - protected BlockingQueue _taskQueue; + // TODO: in the next major 
version of RDF4J this final field should be removed. + // Initialization of the executor service should manage the details + private final BlockingQueue _taskQueue; private final int nWorkers; private final String name; @@ -59,6 +60,7 @@ public class ControlledWorkerScheduler implements Scheduler, TaskWrapperAw public ControlledWorkerScheduler(int nWorkers, String name) { this.nWorkers = nWorkers; this.name = name; + this._taskQueue = createBlockingQueue(); this.executor = createExecutorService(nWorkers, name); } @@ -114,24 +116,33 @@ public int getTotalNumberOfWorkers() { return nWorkers; } - @Deprecated(forRemoval = true) // currently unused and this class is internal + @Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal public int getNumberOfTasks() { return _taskQueue.size(); } + /** + * Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a + * {@link LinkedBlockingQueue}. + * + * @return + */ + protected BlockingQueue createBlockingQueue() { + return new LinkedBlockingQueue<>(); + } + /** * Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The * default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
* + * The thread pool should be configured to terminate idle threads after a period of time (default: 60s) + * * @param nWorkers the number of workers in the thread pool * @param name the base name for threads in the pool * @return */ protected ExecutorService createExecutorService(int nWorkers, String name) { - // use a LinkedBlockingQueue by default - this._taskQueue = new LinkedBlockingQueue<>(); - ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue, new NamingThreadFactory(name)); executor.allowCoreThreadTimeOut(true); diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java index 0e53570d4ab..50c004b35cc 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/SchedulerFactory.java @@ -11,6 +11,10 @@ package org.eclipse.rdf4j.federated.evaluation.concurrent; import org.eclipse.rdf4j.federated.FederationContext; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin; +import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask; +import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask; import org.eclipse.rdf4j.query.BindingSet; /** @@ -22,16 +26,19 @@ public interface SchedulerFactory { /** - * Create a {@link ControlledWorkerScheduler} for joins + * Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind + * joins) * * @param federationContext * @param nWorkers * @return + * @see ControlledWorkerBindJoin + * @see ParallelBoundJoinTask */ ControlledWorkerScheduler createJoinScheduler(FederationContext 
federationContext, int nWorkers); /** - * Create a {@link ControlledWorkerScheduler} for unions + * Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel) * * @param federationContext * @param nWorkers @@ -40,11 +47,14 @@ public interface SchedulerFactory { ControlledWorkerScheduler createUnionScheduler(FederationContext federationContext, int nWorkers); /** - * Create a {@link ControlledWorkerScheduler} for left joins + * Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind + * joins, i.e. OPTIONAL) * * @param federationContext * @param nWorkers * @return + * @see ControlledWorkerBindLeftJoin + * @see ParallelBindLeftJoinTask */ ControlledWorkerScheduler createLeftJoinScheduler(FederationContext federationContext, int nWorkers); } From 721b3b6aba7c97b4153d973772611cd11bb15f66 Mon Sep 17 00:00:00 2001 From: Andreas Schwarte Date: Wed, 13 Nov 2024 08:41:23 +0100 Subject: [PATCH 3/3] GH-5197: mark extension points as experimental --- .../evaluation/concurrent/ControlledWorkerScheduler.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java index 28917c13218..ea57e5192df 100644 --- a/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java +++ b/tools/federation/src/main/java/org/eclipse/rdf4j/federated/evaluation/concurrent/ControlledWorkerScheduler.java @@ -18,6 +18,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.eclipse.rdf4j.common.annotation.Experimental; import org.eclipse.rdf4j.common.iteration.CloseableIteration; import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin; import 
org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin; @@ -127,6 +128,7 @@ public int getNumberOfTasks() { * * @return */ + @Experimental protected BlockingQueue createBlockingQueue() { return new LinkedBlockingQueue<>(); } @@ -141,6 +143,7 @@ protected BlockingQueue createBlockingQueue() { * @param name the base name for threads in the pool * @return */ + @Experimental protected ExecutorService createExecutorService(int nWorkers, String name) { ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,