
Improves documentation for VirtualThreads boundedElastic behavior #3635

Merged · 12 commits · Nov 14, 2023
20 changes: 15 additions & 5 deletions docs/asciidoc/coreFeatures.adoc
@@ -209,13 +209,23 @@ dedicated thread, use `Schedulers.newSingle()` for each call.
* An unbounded elastic thread pool (`Schedulers.elastic()`). This one is no longer preferred
with the introduction of `Schedulers.boundedElastic()`, as it has a tendency to hide backpressure
problems and lead to too many threads (see below).
* A bounded elastic thread pool (`Schedulers.boundedElastic()`). This is a handy way to
give a blocking process its own thread so that it does not tie up other resources, making it
a better choice for I/O blocking work (see <<faq.wrap-blocking>>) without pressuring the system
too much with new threads.
Starting with 3.6.0, it offers two different implementations depending on the setup:
- `ExecutorService`-based, which reuses platform `Thread`s between tasks. Like its
predecessor `elastic()`, this implementation creates new worker pools as needed
and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are
also disposed. Unlike its `elastic()` predecessor, it has a cap on the number of backing threads it can create (default is number of CPU cores x 10).
Up to 100 000 tasks submitted after the cap has been reached are enqueued and will be re-scheduled when a thread becomes available
(when scheduling with a delay, the delay starts when the thread becomes available).
- Thread-per-task-based, designed to run on `VirtualThread` instances.
To use this implementation, the application must run on a Java 21+ runtime and set the `reactor.schedulers.defaultBoundedElasticOnVirtualThreads` system property to `true`.
Once the above is set, the shared `Schedulers.boundedElastic()` returns a specific implementation
of `BoundedElasticScheduler` tailored to run every task on a new instance of the
`VirtualThread` class. This implementation is similar in behavior to the
`ExecutorService`-based one but does not maintain an idle pool and creates a new `VirtualThread`
for each task.
* A fixed pool of workers that is tuned for parallel work (`Schedulers.parallel()`). It
creates as many workers as you have CPU cores.
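The opt-in described above is just a JVM system property, normally passed on the command line before Reactor initializes. A minimal sketch (assuming only that the property name matches the docs; no Reactor classes are used here) of setting and reading it:

```java
public class VirtualBoundedElasticOptIn {
    public static void main(String[] args) {
        // Must be in effect before Reactor's Schedulers class is first
        // initialized; the reliable way is
        // -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
        // on the command line. Setting it programmatically works only if done early.
        System.setProperty(
                "reactor.schedulers.defaultBoundedElasticOnVirtualThreads", "true");

        boolean onVirtualThreads = Boolean.getBoolean(
                "reactor.schedulers.defaultBoundedElasticOnVirtualThreads");
        System.out.println("virtual-thread boundedElastic requested: " + onVirtualThreads);
    }
}
```

On a Java 21+ runtime with this flag set, the shared `Schedulers.boundedElastic()` picks the thread-per-task implementation; on older runtimes the flag has no supported effect.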

2 changes: 1 addition & 1 deletion docs/asciidoc/faq.adoc
@@ -28,7 +28,7 @@ blockingWrapper = blockingWrapper.subscribeOn(Schedulers.boundedElastic()); <3>
----
<1> Create a new `Mono` by using `fromCallable`.
<2> Return the asynchronous, blocking resource.
<3> Ensure each subscription happens on a dedicated worker
from `Schedulers.boundedElastic()`.
====
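The idea the snippet above documents — give the blocking call its own worker instead of the caller's thread — can be sketched with plain `java.util.concurrent` for readers without Reactor on the classpath. The executor here is a hypothetical stand-in for `Schedulers.boundedElastic()`, not Reactor's implementation:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingOffloadSketch {
    public static void main(String[] args) throws Exception {
        // Bounded pool standing in for boundedElastic(): blocking work is
        // confined here so it cannot starve the caller's threads.
        ExecutorService blockingPool = Executors.newFixedThreadPool(4);

        Future<String> result = blockingPool.submit(() -> {
            Thread.sleep(100); // stand-in for a blocking I/O call
            return "value from blocking call";
        });

        // The caller blocks only at this explicit, visible point.
        System.out.println(result.get());
        blockingPool.shutdown();
    }
}
```

The reactive version keeps the same separation but stays non-blocking at the call site: `Mono.fromCallable(...)` defers the work and `subscribeOn(Schedulers.boundedElastic())` moves it off the subscriber's thread.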

@@ -28,8 +28,9 @@
import static reactor.core.scheduler.Schedulers.newBoundedElastic;

/**
 * JDK 8 specific implementation of the BoundedElasticScheduler supplier, which warns when
 * one enables virtual thread support. An alternative variant is available for use on JDK 21+
 * where virtual threads are supported.
*/
class BoundedElasticSchedulerSupplier implements Supplier<Scheduler> {

@@ -22,10 +22,16 @@
import reactor.core.Disposable;
import reactor.core.Scannable;

/**
 * This {@link BoundedElasticThreadPerTaskScheduler} variant is included when Reactor is
 * used with JDK versions lower than 21, and all methods raise an
 * {@link UnsupportedOperationException}. An alternative variant is available for use on
 * JDK 21+ where virtual threads are supported.
 */
final class BoundedElasticThreadPerTaskScheduler
		implements Scheduler, SchedulerState.DisposeAwaiter<BoundedElasticThreadPerTaskScheduler.BoundedServices>, Scannable {

BoundedElasticThreadPerTaskScheduler(int maxThreads, int maxTaskQueuedPerThread, ThreadFactory factory) {
	throw new UnsupportedOperationException("Unsupported in JDK lower than 21");
}

@@ -55,6 +61,6 @@ private BoundedServices() {

}

BoundedServices(BoundedElasticThreadPerTaskScheduler parent) {}
}
}
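The stub above exists because Reactor ships this class in a multi-release jar: on JDK versions below 21 the placeholder is loaded and throws immediately, while JDK 21+ loads the real implementation. The gating idea can be illustrated with a plain runtime version check (class and method names here are illustrative, not Reactor's actual mechanism):

```java
public class JdkGatedFeature {
    // "java.specification.version" is "1.8" on Java 8 and "21" on Java 21.
    static int javaMajorVersion() {
        String v = System.getProperty("java.specification.version");
        return v.startsWith("1.") ? Integer.parseInt(v.substring(2))
                                  : Integer.parseInt(v);
    }

    public static void main(String[] args) {
        if (javaMajorVersion() >= 21) {
            System.out.println("virtual threads available");
        } else {
            System.out.println("virtual threads unsupported; falling back to platform threads");
        }
    }
}
```

A multi-release jar moves this decision from runtime `if` checks to class loading: the JDK picks the `META-INF/versions/21` variant of a class automatically, which is why the JDK 8 copy can afford to throw unconditionally.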
149 changes: 124 additions & 25 deletions reactor-core/src/main/java/reactor/core/scheduler/Schedulers.java
@@ -63,6 +63,10 @@
 * Factories prefixed with {@code new} (e.g. {@link #newBoundedElastic(int, int, String)}) return a new instance of their flavor of {@link Scheduler},
* while other factories like {@link #boundedElastic()} return a shared instance - which is the one used by operators requiring that flavor as their default Scheduler.
* All instances are returned in a {@link Scheduler#init() initialized} state.
* <p>
* Since 3.6.0 {@link #boundedElastic()} can run tasks on {@link VirtualThread}s if the application
* runs on a Java 21 runtime and the {@link #DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS}
[Review comment, Member] Please revisit all the places where we mention JDK 21 and consider if 21+ is more appropriate
* system property is set to {@code true}.
*
* @author Stephane Maldini
*/
@@ -187,26 +191,69 @@ public static Scheduler fromExecutorService(ExecutorService executorService, Str
}

/**
	 * The common <em>boundedElastic</em> instance, a {@link Scheduler} that
	 * dynamically creates a bounded number of workers.
* <p>
	 * Depending on the environment and the specified configuration, there are two
	 * implementations of this shared scheduler:
* <ul>
*
	 * <li> ExecutorService-based implementation tailored to run on platform {@link Thread}
	 * instances. Every Worker is {@link ExecutorService}-based, and {@link Thread}s are
	 * reused once the Workers have been shut down. The underlying daemon threads can be
	 * evicted if idle for more than
	 * {@link BoundedElasticScheduler#DEFAULT_TTL_SECONDS 60} seconds.
* </li>
*
	 * <li> As of 3.6.0 there is a thread-per-task implementation tailored for use
	 * with virtual threads. This implementation is enabled if the
	 * application runs on a JDK 21+ runtime and the system property
	 * {@link #DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS} is set to
	 * {@code true}. Every Worker is based on a custom execution mechanism that
	 * ensures every submitted task runs on a new {@link VirtualThread} instance.
	 * This implementation has a shared {@link ScheduledExecutorService} used to
	 * schedule delayed and periodic tasks, which, when triggered, are offloaded to a
	 * dedicated new {@link VirtualThread} instance.
* </li>
*
* </ul>
*
* <p>
	 * Both implementations share the same configuration:
* <ul>
* <li>
* The maximum number of concurrent
* threads is bounded by a {@code cap} (by default
* ten times the number of available CPU cores, see {@link #DEFAULT_BOUNDED_ELASTIC_SIZE}).
* <p>
* <b> Note: </b> Consider increasing {@link #DEFAULT_BOUNDED_ELASTIC_SIZE} with the
* thread-per-task implementation to run more concurrent {@link VirtualThread}
* instances underneath.
* </li>
* <li>
* The maximum number of task submissions that can be enqueued and deferred on each of these
* backing threads is bounded (by default 100K additional tasks, see
* {@link #DEFAULT_BOUNDED_ELASTIC_QUEUESIZE}). Past that point, a {@link RejectedExecutionException}
* is thrown.
* </li>
* </ul>
*
* <p>
	 * Threads backing a new {@link reactor.core.scheduler.Scheduler.Worker} are
	 * picked from a pool or created when needed. In the ExecutorService-based
	 * implementation, the pool comprises idle and busy threads; when all
	 * threads are busy, a best-effort attempt is made to pick the thread backing
	 * the least number of workers. The thread-per-task implementation, in contrast,
	 * always creates new threads, up to the specified limit.
* <p>
	 * Note that if a scheduling mechanism is backing a low number of workers, but these
	 * workers submit a lot of pending tasks, a second worker could end up being
	 * backed by the same mechanism and see tasks rejected.
	 * The backing mechanism is also picked once and for all at worker creation, so
	 * tasks could be delayed because two workers share the same backing mechanism and
	 * submit long-running tasks, despite another backing mechanism becoming idle in
	 * the meantime.
* <p>
* Only one instance of this common scheduler will be created on the first call and is cached. The same instance
* is returned on subsequent calls until it is disposed.
@@ -215,9 +262,12 @@
* between callers. They can however be all {@link #shutdownNow() shut down} together, or replaced by a
* {@link #setFactory(Factory) change in Factory}.
*
	 *
	 * @return the ExecutorService- or thread-per-task-based <em>boundedElastic</em>
	 * instance: a {@link Scheduler} that dynamically creates workers with an upper
	 * bound on the number of backing threads and, past that, on the number of
	 * enqueued tasks.
*/
public static Scheduler boundedElastic() {
return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
@@ -279,6 +329,12 @@ public static Scheduler immediate() {
* from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole
* scheduler has been {@link Scheduler#dispose() disposed}.
*
* <p>
	 * Please note that this implementation is not designed to run tasks on
	 * {@link VirtualThread}s. See
	 * {@link Factory#newBoundedElasticThreadPerTask(int, int, ThreadFactory)} if you need a
	 * {@link VirtualThread}-compatible scheduler implementation.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param name Thread prefix
@@ -314,6 +370,12 @@ public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, Stri
* from exiting until their worker has been disposed AND they've been evicted by TTL, or the whole
* scheduler has been {@link Scheduler#dispose() disposed}.
*
* <p>
	 * Please note that this implementation is not designed to run tasks on
	 * {@link VirtualThread}s. See
	 * {@link Factory#newBoundedElasticThreadPerTask(int, int, ThreadFactory)} if you need a
	 * {@link VirtualThread}-compatible scheduler implementation.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param name Thread prefix
@@ -351,6 +413,12 @@ public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, Stri
* worker has been disposed AND they've been evicted by TTL, or the whole scheduler has been
* {@link Scheduler#dispose() disposed}.
*
* <p>
	 * Please note that this implementation is not designed to run tasks on
	 * {@link VirtualThread}s. See
	 * {@link Factory#newBoundedElasticThreadPerTask(int, int, ThreadFactory)} if you need a
	 * {@link VirtualThread}-compatible scheduler implementation.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param name Thread prefix
@@ -392,6 +460,12 @@ public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, Stri
* will prevent the JVM from exiting until their worker has been disposed AND they've been evicted by TTL,
* or the whole scheduler has been {@link Scheduler#dispose() disposed}.
*
* <p>
	 * Please note that this implementation is not designed to run tasks on
	 * {@link VirtualThread}s. See
	 * {@link Factory#newBoundedElasticThreadPerTask(int, int, ThreadFactory)} if you need a
	 * {@link VirtualThread}-compatible scheduler implementation.
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param threadFactory a {@link ThreadFactory} to use each thread initialization
Expand All @@ -409,14 +483,6 @@ public static Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, Thre
return fromFactory;
}

static Scheduler newThreadPerTaskBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory) {
Scheduler fromFactory = factory.newThreadPerTaskBoundedElastic(threadCap,
queuedTaskCap,
threadFactory);
fromFactory.init();
return fromFactory;
}

/**
* {@link Scheduler} that hosts a fixed pool of single-threaded ExecutorService-based
* workers and is suited for parallel work. This type of {@link Scheduler} detects and
@@ -1014,7 +1080,7 @@ default Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFact
* The maximum number of created thread pools is bounded by the provided {@code threadCap}.
* <p>
* The main difference between {@link BoundedElasticScheduler} and
	 * {@link BoundedElasticThreadPerTaskScheduler} is that the underlying machinery
	 * allocates a new thread for every new task, which is one of the requirements
	 * for usage with {@link VirtualThread}s
* <p>
@@ -1024,11 +1090,44 @@ default Scheduler newBoundedElastic(int threadCap, int queuedTaskCap, ThreadFact
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param threadFactory a {@link ThreadFactory} to use each thread initialization
*
* @since 3.6.0
*
* @deprecated in favor of
* {@link #newBoundedElasticThreadPerTask(int, int, ThreadFactory)}.
	 * To be removed in 3.8.0.
*
* @return a new {@link Scheduler} that dynamically creates workers with an upper bound to
* the number of backing threads
*/
default Scheduler newThreadPerTaskBoundedElastic(int threadCap, int queuedTaskCap, ThreadFactory threadFactory) {
[Review comment, @chemicL (Member), Nov 14, 2023] Now we have both threadPerTaskBoundedElastic and boundedElasticThreadPerTask... Even though the latter is more favourable, with the fact that we already released threadPerTaskBoundedElastic, I think it's just better to leave one variant.
[Reply, Contributor Author] done
return newBoundedElasticThreadPerTask(threadCap, queuedTaskCap, threadFactory);
}



/**
* {@link Scheduler} that dynamically creates a bounded number of Workers.
* <p>
* The maximum number of created thread pools is bounded by the provided {@code threadCap}.
* <p>
* The main difference between {@link BoundedElasticScheduler} and
	 * {@link BoundedElasticThreadPerTaskScheduler} is that the underlying machinery
	 * allocates a new thread for every new task, which is one of the requirements
	 * for usage with {@link VirtualThread}s
* <p>
	 * <b>Note:</b> for now, this scheduler is only available on a Java 21+ runtime
*
* @param threadCap maximum number of underlying threads to create
* @param queuedTaskCap maximum number of tasks to enqueue when no more threads can be created. Can be {@link Integer#MAX_VALUE} for unbounded enqueueing.
* @param threadFactory a {@link ThreadFactory} to use each thread initialization
*
* @since 3.6.1
*
* @return a new {@link Scheduler} that dynamically creates workers with an upper bound to
* the number of backing threads
*/
default Scheduler newBoundedElasticThreadPerTask(int threadCap, int queuedTaskCap, ThreadFactory threadFactory) {
return new BoundedElasticThreadPerTaskScheduler(threadCap, queuedTaskCap, threadFactory);
}
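The defining behavior of the thread-per-task variant documented above — never reuse a thread between tasks — can be sketched with a bare `ThreadFactory`. This uses a platform-thread factory as a stand-in; on JDK 21+, `Thread.ofVirtual().factory()` would slot into the same place:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadFactory;

public class ThreadPerTaskSketch {
    public static void main(String[] args) throws InterruptedException {
        // Unlike a pooled ExecutorService, every task gets a brand-new thread
        // from the factory -- the property virtual threads rely on.
        ThreadFactory factory = Thread::new;

        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            factory.newThread(() -> {
                threadsUsed.add(Thread.currentThread().getName());
                done.countDown();
            }).start();
        }
        done.await();
        // Three tasks, three distinct threads: no reuse.
        System.out.println("distinct threads used: " + threadsUsed.size());
    }
}
```

The scheduler's `threadCap` bounds how many such threads may run concurrently, which is why the docs suggest raising `DEFAULT_BOUNDED_ELASTIC_SIZE` when many cheap virtual threads are wanted.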

/**
@@ -17,18 +17,19 @@
package reactor.core.scheduler;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Supplier;

import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

/**
* The noop {@link VirtualThread} Reactor {@link ThreadFactory} to be
 * used with {@link BoundedElasticThreadPerTaskScheduler}.
 * This {@link VirtualThreadFactory} variant is included when Reactor is used with
 * JDK versions lower than 21, and all methods raise an
 * {@link UnsupportedOperationException}. An alternative variant is available for use
 * on JDK 21+ where virtual threads are supported.
*
* @author Oleh Dokuka
*/
@@ -24,10 +24,10 @@
import static reactor.core.scheduler.Schedulers.DEFAULT_BOUNDED_ELASTIC_SIZE;
import static reactor.core.scheduler.Schedulers.LOOM_BOUNDED_ELASTIC;
import static reactor.core.scheduler.Schedulers.newBoundedElastic;
import static reactor.core.scheduler.Schedulers.factory;

/**
 * JDK 21 specific implementation of BoundedElasticScheduler supplier which uses
* {@link java.lang.ThreadBuilders.VirtualThreadFactory} instead of the default
* {@link ReactorThreadFactory} when one enables virtual thread support
*/
@@ -36,7 +36,7 @@ class BoundedElasticSchedulerSupplier implements Supplier<Scheduler> {
@Override
public Scheduler get() {
if (DEFAULT_BOUNDED_ELASTIC_ON_VIRTUAL_THREADS) {
			return factory.newBoundedElasticThreadPerTask(DEFAULT_BOUNDED_ELASTIC_SIZE,
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
Thread.ofVirtual()
.name(LOOM_BOUNDED_ELASTIC + "-", 1)