Skip to content

Commit

Permalink
GH-5197: preparation for supporting fair sub-query execution in FedX (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad authored Nov 16, 2024
2 parents 1339109 + 721b3b6 commit f687e85
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.rdf4j.federated.endpoint.Endpoint;
import org.eclipse.rdf4j.federated.endpoint.ResolvableEndpoint;
import org.eclipse.rdf4j.federated.evaluation.FederationEvaluationStrategyFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.DefaultSchedulerFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
import org.eclipse.rdf4j.federated.exception.ExceptionUtil;
import org.eclipse.rdf4j.federated.exception.FedXException;
import org.eclipse.rdf4j.federated.exception.FedXRuntimeException;
Expand Down Expand Up @@ -64,6 +66,8 @@ public class FedX extends AbstractSail implements RepositoryResolverClient {

private FederationEvaluationStrategyFactory strategyFactory;

private SchedulerFactory schedulerFactory = DefaultSchedulerFactory.INSTANCE;

private WriteStrategyFactory writeStrategyFactory;

private File dataDir;
Expand Down Expand Up @@ -96,6 +100,19 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory
this.strategyFactory = strategyFactory;
}

/* package */ SchedulerFactory getSchedulerFactory() {
return schedulerFactory;
}

/**
* Set the {@link SchedulerFactory}. Can only be done before initialization of the federation
*
* @param schedulerFactory the {@link SchedulerFactory}
*/
public void setSchedulerFactory(SchedulerFactory schedulerFactory) {
this.schedulerFactory = schedulerFactory;
}

/**
*
* @param writeStrategyFactory the {@link WriteStrategyFactory}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.NamingThreadFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.Scheduler;
import org.eclipse.rdf4j.federated.evaluation.concurrent.SchedulerFactory;
import org.eclipse.rdf4j.federated.evaluation.concurrent.TaskWrapper;
import org.eclipse.rdf4j.federated.evaluation.union.ControlledWorkerUnion;
import org.eclipse.rdf4j.federated.evaluation.union.SynchronousWorkerUnion;
Expand Down Expand Up @@ -118,26 +119,28 @@ public void reset() {
log.debug("Scheduler for join and union are reset.");
}

SchedulerFactory schedulerFactory = federation.getSchedulerFactory();

Optional<TaskWrapper> taskWrapper = federationContext.getConfig().getTaskWrapper();
if (joinScheduler != null) {
joinScheduler.abort();
}
joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(),
"Join Scheduler");
joinScheduler = schedulerFactory.createJoinScheduler(federationContext,
federationContext.getConfig().getJoinWorkerThreads());
taskWrapper.ifPresent(joinScheduler::setTaskWrapper);

if (unionScheduler != null) {
unionScheduler.abort();
}
unionScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getUnionWorkerThreads(),
"Union Scheduler");
unionScheduler = schedulerFactory.createUnionScheduler(federationContext,
federationContext.getConfig().getUnionWorkerThreads());
taskWrapper.ifPresent(unionScheduler::setTaskWrapper);

if (leftJoinScheduler != null) {
leftJoinScheduler.abort();
}
leftJoinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getLeftJoinWorkerThreads(),
"Left Join Scheduler");
leftJoinScheduler = schedulerFactory.createLeftJoinScheduler(federationContext,
federationContext.getConfig().getLeftJoinWorkerThreads());
taskWrapper.ifPresent(leftJoinScheduler::setTaskWrapper);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.eclipse.rdf4j.common.annotation.Experimental;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerJoin;
Expand All @@ -42,7 +44,9 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw

private final ExecutorService executor;

private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>();
// TODO: in the next major version of RDF4J this final field should be removed.
// Initialization of the executor service should managed the details
private final BlockingQueue<Runnable> _taskQueue;

private final int nWorkers;
private final String name;
Expand All @@ -57,7 +61,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw
public ControlledWorkerScheduler(int nWorkers, String name) {
this.nWorkers = nWorkers;
this.name = name;
this.executor = createExecutorService();
this._taskQueue = createBlockingQueue();
this.executor = createExecutorService(nWorkers, name);
}

/**
Expand Down Expand Up @@ -112,13 +117,36 @@ public int getTotalNumberOfWorkers() {
return nWorkers;
}

@Deprecated(forRemoval = true, since = "5.1") // currently unused and this class is internal
public int getNumberOfTasks() {
return _taskQueue.size();
}

private ExecutorService createExecutorService() {
/**
* Create the {@link BlockingQueue} used for the thread pool. The default implementation creates a
* {@link LinkedBlockingQueue}.
*
* @return
*/
@Experimental
protected BlockingQueue<Runnable> createBlockingQueue() {
return new LinkedBlockingQueue<>();
}

/**
* Create the {@link ExecutorService} which is managing the individual {@link ParallelTask}s in a thread pool. The
* default implementation creates a thread pool with a {@link LinkedBlockingQueue}.
*
* The thread pool should be configured to terminate idle threads after a period of time (default: 60s)
*
* @param nWorkers the number of workers in the thread pool
* @param name the base name for threads in the pool
* @return
*/
@Experimental
protected ExecutorService createExecutorService(int nWorkers, String name) {

ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, _taskQueue,
ThreadPoolExecutor executor = new ThreadPoolExecutor(nWorkers, nWorkers, 60L, TimeUnit.SECONDS, this._taskQueue,
new NamingThreadFactory(name));
executor.allowCoreThreadTimeOut(true);
return executor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.query.BindingSet;

/**
* The default {@link SchedulerFactory}
*/
public class DefaultSchedulerFactory implements SchedulerFactory {

public static final DefaultSchedulerFactory INSTANCE = new DefaultSchedulerFactory();

@Override
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Join Scheduler");
}

@Override
public ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Union Scheduler");
}

@Override
public ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext,
int nWorkers) {
return new ControlledWorkerScheduler<>(nWorkers,
"Left Join Scheduler");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*******************************************************************************
* Copyright (c) 2024 Eclipse RDF4J contributors.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Distribution License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: BSD-3-Clause
*******************************************************************************/
package org.eclipse.rdf4j.federated.evaluation.concurrent;

import org.eclipse.rdf4j.federated.FederationContext;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ControlledWorkerBindLeftJoin;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBindLeftJoinTask;
import org.eclipse.rdf4j.federated.evaluation.join.ParallelBoundJoinTask;
import org.eclipse.rdf4j.query.BindingSet;

/**
* Factory for creating {@link ControlledWorkerScheduler} for executing subqueries (e.g. joins) in the background
*
* @see DefaultSchedulerFactory
* @author Andreas Schwarte
*/
public interface SchedulerFactory {

/**
* Create a {@link ControlledWorkerScheduler} for regular joins (e.g., the sub-queries generated as part of bind
* joins)
*
* @param federationContext
* @param nWorkers
* @return
* @see ControlledWorkerBindJoin
* @see ParallelBoundJoinTask
*/
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for unions (e.g., for executing UNION operands in parallel)
*
* @param federationContext
* @param nWorkers
* @return
*/
ControlledWorkerScheduler<BindingSet> createUnionScheduler(FederationContext federationContext, int nWorkers);

/**
* Create a {@link ControlledWorkerScheduler} for left joins (e.g., the sub-queries generated as part of left bind
* joins, i.e. OPTIONAL)
*
* @param federationContext
* @param nWorkers
* @return
* @see ControlledWorkerBindLeftJoin
* @see ParallelBindLeftJoinTask
*/
ControlledWorkerScheduler<BindingSet> createLeftJoinScheduler(FederationContext federationContext, int nWorkers);
}

0 comments on commit f687e85

Please sign in to comment.