-
Notifications
You must be signed in to change notification settings - Fork 165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-5197: preparation for supporting fair sub-query execution in FedX #5198
Conversation
This change adds preparational infrastructure for having different implementations of schedulers. Configuration is here prepared by means of defining a "SchedulerFactory" interface with a default implementation aside (which essentially mimics the current behavior). Note that for ease of development some aspects of ControlledWorkerScheduler are made accessible to sub-classes. The idea is that in the end version there is an abstract scheduler class providing shared functionality and different implementation (e.g. the current FIFO one and a fair implementation)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Nice and concise, allows some manual control over creation (and hence operation) of thread pools for performing various tasks.
@@ -42,7 +43,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw | |||
|
|||
private final ExecutorService executor; | |||
|
|||
private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>(); | |||
// Note: initialized in #createExecutorService | |||
protected BlockingQueue<Runnable> _taskQueue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, this is detected as API incompatibility.
Nov 11, 2024 1:43:39 PM japicmp.output.incompatible.IncompatibleErrorOutput warn
WARNING: Incompatibility detected: Requires semantic version level MAJOR: JApiField [changeStatus=MODIFIED, jApiClass=JApiClass [fullyQualifiedName=org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler, changeStatus=MODIFIED, compatibilityChanges=[]], oldFieldOptional=org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler._taskQueue:Ljava/util/concurrent/LinkedBlockingQueue;, newFieldOptional=org.eclipse.rdf4j.federated.evaluation.concurrent.ControlledWorkerScheduler._taskQueue:Ljava/util/concurrent/BlockingQueue;, annotations=[], accessModifier=japicmp.model.JApiModifier@7f5a7e9c, staticModifier=japicmp.model.JApiModifier@193c2d13, finalModifier=japicmp.model.JApiModifier@47c5d598, transientModifier=japicmp.model.JApiModifier@6dbebdeb, syntheticModifier=japicmp.model.JApiModifier@67c7088e, syntheticAttribute=japicmp.model.JApiAttribute@755aaf09, compatibilityChanges=[FIELD_TYPE_CHANGED], type=japicmp.model.JApiType@5ae9b65a]
I will rewrite it tomorrow morning with a getter in the constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compatibilityChanges=[FIELD_TYPE_CHANGED]
It was a private field, right? JAPICMP seems a bit aggressive here. We don't guarantee binary compatibility for minor releases, so JAPICMP may be configured incorrectly. You can take a look at the config if you want and see if you can make it work for minor releases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, it is possible to add this override rule to the main pom.xml
I would not do so however in this change, I am not sure what else is included in FIELD_TYPE_CHANGED other than removing the "final" modifier
<overrideCompatibilityChangeParameter>
<compatibilityChange>FIELD_TYPE_CHANGED</compatibilityChange>
<binaryCompatible>true</binaryCompatible>
<sourceCompatible>true</sourceCompatible>
<semanticVersionLevel>MINOR</semanticVersionLevel>
</overrideCompatibilityChangeParameter>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's changed from LinkedBlockingQueue to BlockingQueue. Which I assume isn't binary compatible, but should be source compatible since the field was private.
Do you get the same issue if you leave it as private?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea, I think that would work, but I haven't tested. I think the new solution is also OK and the implementation will be refactored anyways once there are different implementation for schedulers. Would leave it as in the current PR form now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice improvement. I've added some comments, but this is your code so I would think that you know better than me :)
I have a more general question that isn't really about this PR. How do we make sure that all the various thread pools (executors) and shutdown in a timely manner? Are they just for the duration of the query, or are they for the duration of the federation sail?
@Override | ||
public ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, | ||
int nWorkers) { | ||
return new ControlledWorkerScheduler<>(nWorkers, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this where you're going to return other schedulers in the future?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes exactly, either in this default factory or in a different factory
* @param name the base name for threads in the pool | ||
* @return | ||
*/ | ||
protected ExecutorService createExecutorService(int nWorkers, String name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you intend to override this with your own implementation in your code base, or is it something that you're going to override somewhere in RDF4J?
I could see that it would be useful to specify a different queue, like the PriorityBlockingQueue, to optimise the order of the operations. Could it make sense to have the BlockingQueue be a parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the end we want to have the priority / fair based implementation directly in RDF4J.
This partial "extension point" is mostly to allow us to work asychronously of the RDF4J release, i.e. we do implementation and testing as extension in our code-base according to our sprint planning, and once ready contribute the implementation back to RDF4J (tentatively for RDF4J 5.2)
This approach makes it much easier for testing. We also developed the left bind join implementation using this approach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can mark it as experimental so that you're able to modify it later on if needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion 👍 , this gives us a bit more freedom. Have added the annotation.
@@ -112,13 +114,25 @@ public int getTotalNumberOfWorkers() { | |||
return nWorkers; | |||
} | |||
|
|||
@Deprecated(forRemoval = true) // currently unused and this class is internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add a version number to your deprecation annotations so that we know when they were added? When we are deciding if we should remove something in a major release, it’s nice to know how long it's been since something was deprecated so we only remove stuff that users have had a time to migrate away from.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -42,7 +43,8 @@ public class ControlledWorkerScheduler<T> implements Scheduler<T>, TaskWrapperAw | |||
|
|||
private final ExecutorService executor; | |||
|
|||
private final LinkedBlockingQueue<Runnable> _taskQueue = new LinkedBlockingQueue<>(); | |||
// Note: initialized in #createExecutorService | |||
protected BlockingQueue<Runnable> _taskQueue; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
compatibilityChanges=[FIELD_TYPE_CHANGED]
It was a private field, right? JAPICMP seems a bit aggressive here. We don't guarantee binary compatibility for minor releases, so JAPICMP may be configured incorrectly. You can take a look at the config if you want and see if you can make it work for minor releases.
* @param nWorkers | ||
* @return | ||
*/ | ||
ControlledWorkerScheduler<BindingSet> createJoinScheduler(FederationContext federationContext, int nWorkers); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume that this is for inner join. Could you add something to the javadocs explaining what it's used for? Eg. Mostly for joining statement patterns in the query.
Similarly for the left join that it's typically for handling OPTIONAL clauses in the query.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestions, added some details
Optional<TaskWrapper> taskWrapper = federationContext.getConfig().getTaskWrapper(); | ||
if (joinScheduler != null) { | ||
joinScheduler.abort(); | ||
} | ||
joinScheduler = new ControlledWorkerScheduler<>(federationContext.getConfig().getJoinWorkerThreads(), | ||
"Join Scheduler"); | ||
joinScheduler = schedulerFactory.createJoinScheduler(federationContext, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks like it's in the reset method. Could we have an assert to check that the existing joinScheduler is either null or shutdown?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The line above (see 126, joinScheduler.abort();
makes sure to terminate the thread pool, if it is active. Shouldn't this avoid the assertion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry. Must have overlooked that. Will the abort always wait until all threads are stopped?
@@ -96,6 +100,22 @@ public void setFederationEvaluationStrategy(FederationEvaluationStrategyFactory | |||
this.strategyFactory = strategyFactory; | |||
} | |||
|
|||
/* package */ SchedulerFactory getSchedulerFactory() { | |||
if (schedulerFactory == null) { | |||
schedulerFactory = DefaultSchedulerFactory.INSTANCE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it need to be a null check, or could we just set the scheduler factory to the default one where we declare the field?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also fine to set to the default directly. Will change it
The thread pools are alive for the life-time of the repository / sail (i.e. #init / #shutdown). Please note that the thread pools themselves are configured to terminate idle threads after 60s. So if there is no load, there are also no threads dangling around. |
- for minor version compatibility the type of the "_taskQueue" field in the scheduler cannot be changed (to non-final). Hence, for now we use a dedicated protected initialization method. In the future (next major release) the idea is to leave the queue entirely managed by the executor service. - refinements and clarifications to the javadoc
I think this PR should be good to go now. All comments above are addressed and we have a good basis for the next development steps |
@hmottestad any further comments (or time for review) from your side? Otherwise I suggest to merge this one |
GitHub issue resolved: #5197
This change adds preparational infrastructure for having different
implementations of schedulers. Configuration is here prepared by means
of defining a "SchedulerFactory" interface with a default implementation
aside (which essentially mimics the current behavior).
Note that for ease of development some aspects of
ControlledWorkerScheduler are made accessible to sub-classes. The idea
is that in the end version there is an abstract scheduler class
providing shared functionality and different implementation (e.g. the
current FIFO one and a fair implementation)
Note: I would like to see this preparation merged in RDF4J 5.1 such that we can prepare (and test) an implementation of a fair scheduler asynchronously already. The final target version for the implementation is RDF4J 5.2
PR Author Checklist (see the contributor guidelines for more details):
mvn process-resources
to format from the command line)