From b78e373ee897a97126a98710e95cd7613b768c96 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Fri, 17 Jan 2025 20:42:44 +0800 Subject: [PATCH] feat: Add support for switching scheduler --- .../ForkJoinPoolVirtualThreadSpec.scala | 67 ++++++++++++++ .../ThreadPoolVirtualThreadSpec.scala | 64 +++++++++++++ .../VirtualThreadPoolDispatcherSpec.scala | 10 +- actor/src/main/resources/reference.conf | 12 +++ .../pekko/dispatch/AbstractDispatcher.scala | 6 +- .../apache/pekko/dispatch/Dispatchers.scala | 2 +- .../ForkJoinExecutorConfigurator.scala | 40 ++++++-- .../pekko/dispatch/ThreadPoolBuilder.scala | 70 ++++++++++++-- .../pekko/dispatch/VirtualThreadSupport.scala | 8 +- .../VirtualThreadSupportReflect.scala | 66 +++++++++++++ .../dispatch/VirtualizedExecutorService.scala | 92 +++++++++++++++++++ docs/src/main/paradox/dispatchers.md | 12 +++ docs/src/main/paradox/typed/dispatchers.md | 12 +++ project/JdkOptions.scala | 5 + project/PekkoBuild.scala | 1 + 15 files changed, 439 insertions(+), 28 deletions(-) create mode 100644 actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala create mode 100644 actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala create mode 100644 actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupportReflect.scala create mode 100644 actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala new file mode 100644 index 00000000000..c1cd553bb6b --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ForkJoinPoolVirtualThreadSpec.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import com.typesafe.config.ConfigFactory + +import org.apache.pekko +import pekko.actor.{ Actor, Props } +import pekko.testkit.{ ImplicitSender, PekkoSpec } +import pekko.util.JavaVersion + +object ForkJoinPoolVirtualThreadSpec { + val config = ConfigFactory.parseString(""" + |virtual { + | task-dispatcher { + | mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + | throughput = 5 + | fork-join-executor { + | parallelism-factor = 2 + | parallelism-max = 2 + | parallelism-min = 2 + | virtualize = on + | } + | } + |} + """.stripMargin) + + class ThreadNameActor extends Actor { + + override def receive = { + case "ping" => + sender() ! Thread.currentThread().getName + } + } + +} + +class ForkJoinPoolVirtualThreadSpec extends PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender { + import ForkJoinPoolVirtualThreadSpec._ + + "PekkoForkJoinPool" must { + + "support virtualization with Virtual Thread" in { + val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher")) + for (_ <- 1 to 1000) { + actor ! "ping" + expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") } + } + } + + } +} diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala new file mode 100644 index 00000000000..70f39e0aa13 --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/ThreadPoolVirtualThreadSpec.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import com.typesafe.config.ConfigFactory +import org.apache.pekko +import pekko.actor.{ Actor, Props } +import pekko.testkit.{ ImplicitSender, PekkoSpec } + +object ThreadPoolVirtualThreadSpec { + val config = ConfigFactory.parseString(""" + |pekko.actor.default-dispatcher.executor = "thread-pool-executor" + |virtual { + | task-dispatcher { + | mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox" + | throughput = 1 + | thread-pool-executor { + | fixed-pool-size = 2 + | virtualize = on + | } + | } + |} + """.stripMargin) + + class ThreadNameActor extends Actor { + + override def receive = { + case "ping" => + sender() ! Thread.currentThread().getName + } + } + +} + +class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender { + import ThreadPoolVirtualThreadSpec._ + + "PekkoThreadPoolExecutor" must { + + "support virtualization with Virtual Thread" in { + val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher")) + for (_ <- 1 to 1000) { + actor ! "ping" + expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") } + } + } + + } +} diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala index 739bf26a5e3..d39531853ca 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/dispatch/VirtualThreadPoolDispatcherSpec.scala @@ -34,7 +34,7 @@ object VirtualThreadPoolDispatcherSpec { override def receive = { case "ping" => - sender() ! "All fine" + sender() ! Thread.currentThread().getName } } @@ -43,14 +43,14 @@ object VirtualThreadPoolDispatcherSpec { class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender { import VirtualThreadPoolDispatcherSpec._ - val Iterations = 1000 - "VirtualThreadPool support" must { "handle simple dispatch" in { val innocentActor = system.actorOf(Props(new InnocentActor).withDispatcher("virtual-thread-dispatcher")) - innocentActor ! "ping" - expectMsg("All fine") + for (_ <- 1 to 1000) { + innocentActor ! "ping" + expectMsgPF() { case name: String => name should include("virtual-thread") } + } } } diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 6831cf056db..75d28df3e91 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -487,6 +487,12 @@ pekko { # This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above. # Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff. maximum-pool-size = 32767 + + # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, + # When set to `on` but underlying runtime does not support virtual threads, an Exception will throw. + # Virtualize this dispatcher as a virtual-thread-executor + # Valid values are: `on`, `off` + virtualize = off } # This will be used if you have set "executor = "thread-pool-executor"" @@ -538,6 +544,12 @@ pekko { # Allow core threads to time out allow-core-timeout = on + + # This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above, + # When set to `on` but underlying runtime does not support virtual threads, an Exception will throw. + # Virtualize this dispatcher as a virtual-thread-executor + # Valid values are: `on`, `off` + virtualize = off } # This will be used if you have set "executor = "virtual-thread-executor" diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala index f2d6aa8d56e..06c2a231303 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/AbstractDispatcher.scala @@ -444,7 +444,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr @unused prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = { import org.apache.pekko.util.Helpers.ConfigOps val builder = - ThreadPoolConfigBuilder(ThreadPoolConfig()) + ThreadPoolConfigBuilder(ThreadPoolConfig(virtualize = false)) .setKeepAliveTime(config.getMillisDuration("keep-alive-time")) .setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout")) .configure(Some(config.getInt("task-queue-size")).flatMap { @@ -474,6 +474,10 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr config.getInt("max-pool-size-max")) else builder.setFixedPoolSize(config.getInt("fixed-pool-size")) + + if (config.getBoolean("virtualize")) { + builder.setVirtualize(true) + } else builder } def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala index 06d7c2f5106..2f17b1574b4 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Dispatchers.scala @@ -390,7 +390,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer this.getClass, "PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format( config.getString("id")))) - ThreadPoolConfig() + ThreadPoolConfig(virtualize = false) } /** diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala index 661dfb8dd70..99d9294c6c5 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ForkJoinExecutorConfigurator.scala @@ -86,15 +86,28 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer } class ForkJoinExecutorServiceFactory( + val id: String, val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, val parallelism: Int, val asyncMode: Boolean, - val maxPoolSize: Int) + val maxPoolSize: Int, + val virtualize: Boolean) extends ExecutorServiceFactory { + def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + parallelism: Int, + asyncMode: Boolean, + maxPoolSize: Int, + virtualize: Boolean) = + this(null, threadFactory, parallelism, asyncMode, maxPoolSize, virtualize) + + def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, + parallelism: Int, + asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false) def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int, - asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap) + asyncMode: Boolean, + maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false) private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] = Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption @@ -116,12 +129,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true) - def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match { - case Some(handle) => - handle.invoke(parallelism, threadFactory, maxPoolSize, - MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService] - case _ => - new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) + def createExecutorService: ExecutorService = { + val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match { + case Some(handle) => + handle.invoke(parallelism, threadFactory, maxPoolSize, + MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService] + case _ => + new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode) + } + if (virtualize) { + new VirtualizedExecutorService(id, forkJoinPool) + } else { + forkJoinPool + } } } @@ -143,12 +163,14 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer } new ForkJoinExecutorServiceFactory( + id, validate(tf), ThreadPoolConfig.scaledPoolSize( config.getInt("parallelism-min"), config.getDouble("parallelism-factor"), config.getInt("parallelism-max")), asyncMode, - config.getInt("maximum-pool-size")) + config.getInt("maximum-pool-size"), + config.getBoolean("virtualize")) } } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala index 205c2e4ac77..55ed0a0f33d 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/ThreadPoolBuilder.scala @@ -58,12 +58,32 @@ object ThreadPoolConfig { def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = () => queue def reusableQueue(queueFactory: QueueFactory): QueueFactory = reusableQueue(queueFactory()) + + // TODO remove in Pekko 2.0 after change to normal class + def unapply(config: ThreadPoolConfig) + : Option[(Boolean, Int, Int, Duration, ThreadPoolConfig.QueueFactory, RejectedExecutionHandler)] = + Some((config.allowCorePoolTimeout, config.corePoolSize, config.maxPoolSize, config.threadTimeout, + config.queueFactory, config.rejectionPolicy)) + + // TODO remove in Pekko 2.0 after change to normal class + def apply(allowCorePoolTimeout: Boolean, + corePoolSize: Int, + maxPoolSize: Int, + threadTimeout: Duration, + queueFactory: ThreadPoolConfig.QueueFactory, + rejectionPolicy: RejectedExecutionHandler): ThreadPoolConfig = + ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy, + virtualize = false) } /** * Function0 without the fun stuff (mostly for the sake of the Java API side of things) */ trait ExecutorServiceFactory { + + /** + * Create a new ExecutorService + */ def createExecutorService: ExecutorService } @@ -77,14 +97,26 @@ trait ExecutorServiceFactoryProvider { /** * A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher */ +//TODO don't use case class for this in 2.0, it's not a good fit final case class ThreadPoolConfig( allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, threadTimeout: Duration = ThreadPoolConfig.defaultTimeout, queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(), - rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy) + rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy, + virtualize: Boolean) extends ExecutorServiceFactoryProvider { + // TODO remove in Pekko 2.0 after change to normal class + def this(allowCorePoolTimeout: Boolean, + corePoolSize: Int, + maxPoolSize: Int, + threadTimeout: Duration, + queueFactory: ThreadPoolConfig.QueueFactory, + rejectionPolicy: RejectedExecutionHandler) = + this(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy, + virtualize = false) + // Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra // context information on the config @noinline @@ -94,13 +126,31 @@ final case class ThreadPoolConfig( maxPoolSize: Int = maxPoolSize, threadTimeout: Duration = threadTimeout, queueFactory: ThreadPoolConfig.QueueFactory = queueFactory, - rejectionPolicy: RejectedExecutionHandler = rejectionPolicy + rejectionPolicy: RejectedExecutionHandler = rejectionPolicy, + virtualize: Boolean = virtualize + ): ThreadPoolConfig = + ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy, + virtualize) + + // TODO remove in Pekko 2.0 after change to normal class + @noinline + def copy(allowCorePoolTimeout: Boolean, + corePoolSize: Int, + maxPoolSize: Int, + threadTimeout: Duration, + queueFactory: ThreadPoolConfig.QueueFactory, + rejectionPolicy: RejectedExecutionHandler ): ThreadPoolConfig = - ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy) + ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy, + virtualize) + + class ThreadPoolExecutorServiceFactory(val id: String, + val threadFactory: ThreadFactory) extends ExecutorServiceFactory { + + def this(threadFactory: ThreadFactory) = this(null, threadFactory) - class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory { def createExecutorService: ExecutorService = { - val service: ThreadPoolExecutor = new ThreadPoolExecutor( + val executor: ThreadPoolExecutor = new ThreadPoolExecutor( corePoolSize, maxPoolSize, threadTimeout.length, @@ -110,10 +160,11 @@ final case class ThreadPoolConfig( rejectionPolicy) with LoadMetrics { def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize } - service.allowCoreThreadTimeOut(allowCorePoolTimeout) - service + executor.allowCoreThreadTimeOut(allowCorePoolTimeout) + if (virtualize) new VirtualizedExecutorService(id, executor) else executor } } + def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = { val tf = threadFactory match { case m: MonitorableThreadFactory => @@ -121,7 +172,7 @@ final case class ThreadPoolConfig( m.withName(m.name + "-" + id) case other => other } - new ThreadPoolExecutorServiceFactory(tf) + new ThreadPoolExecutorServiceFactory(id, tf) } } @@ -178,6 +229,9 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) { def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder = this.copy(config = config.copy(queueFactory = newQueueFactory)) + def setVirtualize(virtualize: Boolean): ThreadPoolConfigBuilder = + this.copy(config = config.copy(virtualize = virtualize)) + def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder = fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c)) } diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala index 3a777891559..3be78c7a733 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupport.scala @@ -17,8 +17,9 @@ package org.apache.pekko.dispatch -import org.apache.pekko.annotation.InternalApi -import org.apache.pekko.util.JavaVersion +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.util.JavaVersion import java.lang.invoke.{ MethodHandles, MethodType } import java.util.concurrent.{ ExecutorService, ThreadFactory } @@ -34,8 +35,7 @@ private[dispatch] object VirtualThreadSupport { val isSupported: Boolean = JavaVersion.majorVersion >= 21 /** - * Create a virtual thread factory with a executor, the executor will be used as the scheduler of - * virtual thread. + * Create a virtual thread factory with the default Virtual Thread executor. */ def newVirtualThreadFactory(prefix: String): ThreadFactory = { require(isSupported, "Virtual thread is not supported.") diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupportReflect.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupportReflect.scala new file mode 100644 index 00000000000..ab0d2fc4b7c --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualThreadSupportReflect.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko.annotation.InternalApi + +import java.util.concurrent.{ ExecutorService, ThreadFactory } +import scala.util.control.NonFatal + +/** + * TODO remove this class once we drop Java 8 support + */ +@InternalApi +private[dispatch] object VirtualThreadSupportReflect { + + /** + * Create a virtual thread factory with given executor, the executor will be used as the scheduler of + * virtual thread. + * + * The executor should run task on platform threads. + * + * returns null if not supported. + */ + def newThreadPerTaskExecutor(prefix: String, executor: ExecutorService): ExecutorService = { + require(VirtualThreadSupport.isSupported, "Virtual thread is not supported.") + val factory = virtualThreadFactory(prefix, executor) + VirtualThreadSupport.newThreadPerTaskExecutor(factory) + } + + private def virtualThreadFactory(prefix: String, executor: ExecutorService): ThreadFactory = + try { + val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder") + val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual") + val ofVirtualMethod = classOf[Thread].getDeclaredMethod("ofVirtual") + var builder = ofVirtualMethod.invoke(null) + if (executor != null) { + val clazz = builder.getClass + val field = clazz.getDeclaredField("scheduler") + field.setAccessible(true) + field.set(builder, executor) + } + val nameMethod = ofVirtualClass.getDeclaredMethod("name", classOf[String], classOf[Long]) + val factoryMethod = builderClass.getDeclaredMethod("factory") + val zero = java.lang.Long.valueOf(0L) + builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", zero) + factoryMethod.invoke(builder).asInstanceOf[ThreadFactory] + } catch { + case NonFatal(e) => + throw new UnsupportedOperationException("Failed to create virtual thread factory", e) + } +} diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala new file mode 100644 index 00000000000..96ec87d120d --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/dispatch/VirtualizedExecutorService.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.dispatch + +import org.apache.pekko.annotation.InternalApi + +import java.util +import java.util.concurrent.{ Callable, ExecutorService, Future, TimeUnit } + +/** + * A virtualized executor service that creates a new virtual thread for each task. + * Will shut down the underlying executor service when this executor is being shutdown. + * + * INTERNAL API + */ +@InternalApi +final class VirtualizedExecutorService(prefix: String, underlying: ExecutorService) extends ExecutorService { + require(prefix ne null, "Parameter prefix must not be null or empty") + require(underlying ne null, "Parameter underlying must not be null") + + private val executor = VirtualThreadSupportReflect.newThreadPerTaskExecutor(prefix, underlying) + + override def shutdown(): Unit = { + executor.shutdown() + underlying.shutdown() + } + + override def shutdownNow(): util.List[Runnable] = { + executor.shutdownNow() + underlying.shutdownNow() + } + + override def isShutdown: Boolean = { + executor.isShutdown || underlying.isShutdown + } + + override def isTerminated: Boolean = { + executor.isTerminated && underlying.isTerminated + } + + override def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = { + executor.awaitTermination(timeout, unit) && underlying.awaitTermination(timeout, unit) + } + + override def submit[T](task: Callable[T]): Future[T] = { + executor.submit(task) + } + + override def submit[T](task: Runnable, result: T): Future[T] = { + executor.submit(task, result) + } + + override def submit(task: Runnable): Future[_] = { + executor.submit(task) + } + + override def invokeAll[T](tasks: util.Collection[_ <: Callable[T]]): util.List[Future[T]] = { + executor.invokeAll(tasks) + } + + override def invokeAll[T]( + tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): util.List[Future[T]] = { + executor.invokeAll(tasks, timeout, unit) + } + + override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]]): T = { + executor.invokeAny(tasks) + } + + override def invokeAny[T](tasks: util.Collection[_ <: Callable[T]], timeout: Long, unit: TimeUnit): T = { + executor.invokeAny(tasks, timeout, unit) + } + + override def execute(command: Runnable): Unit = { + executor.execute(command) + } +} diff --git a/docs/src/main/paradox/dispatchers.md b/docs/src/main/paradox/dispatchers.md index 5e804803967..87fe2b39587 100644 --- a/docs/src/main/paradox/dispatchers.md +++ b/docs/src/main/paradox/dispatchers.md @@ -42,6 +42,16 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu threads the pool keep running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. + +Experimental: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. +When using virtual threads, all virtual threads will use the same `unparker`, so you may want to +increase the number of `jdk.unparker.maxPoolSize`. + +Requirements for running virtual threads: +1. Running cask with Java 21 or later +2. add `--add-opens java.base/java.lang=ALL-UNNAMED` to your JVM options, which is needed to name the virtual threads. + @@@ Another example that uses the "thread-pool-executor": @@ -54,6 +64,8 @@ Another example that uses the "thread-pool-executor": The thread pool executor dispatcher is implemented using a @javadoc[java.util.concurrent.ThreadPoolExecutor](java.util.concurrent.ThreadPoolExecutor). You can read more about it in the JDK's [ThreadPoolExecutor documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html). +Experimental: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. + @@@ For more options, see @ref[Dispatchers](typed/dispatchers.md) and the `default-dispatcher` section of the @ref:[configuration](general/configuration.md). diff --git a/docs/src/main/paradox/typed/dispatchers.md b/docs/src/main/paradox/typed/dispatchers.md index 2347dc73001..fbf47359ed1 100644 --- a/docs/src/main/paradox/typed/dispatchers.md +++ b/docs/src/main/paradox/typed/dispatchers.md @@ -127,6 +127,16 @@ allocated by the ForkJoinPool. It is a setting specifically talking about the nu threads the pool will keep running in order to reduce the latency of handling a new incoming task. You can read more about parallelism in the JDK's [ForkJoinPool documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html). +When Running on Java 9+, you can use `maximum-pool-size` to set the upper bound on the total number of threads allocated by the ForkJoinPool. + +Experimental: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. +When using virtual threads, all virtual threads will use the same `unparker`, so you may want to +increase the number of `jdk.unparker.maxPoolSize`. + +Requirements for running virtual threads: +1. Running cask with Java 21 or later +2. add `--add-opens java.base/java.lang=ALL-UNNAMED` to your JVM options, which is needed to name the virtual threads. + @@@ @@@ note @@ -134,6 +144,8 @@ You can read more about parallelism in the JDK's [ForkJoinPool documentation](ht The `thread-pool-executor` dispatcher is implemented using by a `java.util.concurrent.ThreadPoolExecutor`. You can read more about it in the JDK's [ThreadPoolExecutor documentation](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html). +Experimental: When Running on Java 21+, you can use `virtualize=on` to enable the virtual threads feature. + @@@ ## Dispatcher aliases diff --git a/project/JdkOptions.scala b/project/JdkOptions.scala index 852a2bb82da..ed898e6e0c6 100644 --- a/project/JdkOptions.scala +++ b/project/JdkOptions.scala @@ -49,6 +49,11 @@ object JdkOptions extends AutoPlugin { lazy val versionSpecificJavaOptions = if (isJdk17orHigher) { + // for virtual threads + "--add-opens=java.base/sun.misc=ALL-UNNAMED" :: + "--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED" :: + "--add-opens=java.base/java.lang=ALL-UNNAMED" :: + "--add-opens=java.base/java.util=ALL-UNNAMED" :: // for aeron "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" :: // for LevelDB diff --git a/project/PekkoBuild.scala b/project/PekkoBuild.scala index 03052f9c6f5..b18682d6df0 100644 --- a/project/PekkoBuild.scala +++ b/project/PekkoBuild.scala @@ -290,6 +290,7 @@ object PekkoBuild { UsefulTask("testQuick", "Runs all the tests. When run multiple times will only run previously failing tests (shell mode only)"), UsefulTask("testOnly *.AnySpec", "Only run a selected test"), + UsefulTask("TestJdk9 / testOnly *.AnySpec", "Only run a Jdk9+ selected test"), UsefulTask("testQuick *.AnySpec", "Only run a selected test. When run multiple times will only run previously failing tests (shell mode only)"), UsefulTask("testQuickUntilPassed", "Runs all tests in a continuous loop until all tests pass"),