Skip to content

Commit

Permalink
feat: Add support for switching scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 17, 2025
1 parent b20ec82 commit 262b627
Show file tree
Hide file tree
Showing 15 changed files with 418 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/

/*
* Copyright (C) 2018-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package org.apache.pekko.dispatch

import com.typesafe.config.ConfigFactory

import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }
import pekko.util.JavaVersion

object ForkJoinPoolVirtualThreadSpec {
val config = ConfigFactory.parseString("""
|virtual {
| task-dispatcher {
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 5
| fork-join-executor {
| parallelism-factor = 2
| parallelism-max = 2
| parallelism-min = 2
| virtualize = on
| }
| }
|}
""".stripMargin)

class ThreadNameActor extends Actor {

override def receive = {
case "ping" =>
sender() ! Thread.currentThread().getName
}
}

}

class ForkJoinPoolVirtualThreadSpec extends PekkoSpec(ForkJoinPoolVirtualThreadSpec.config) with ImplicitSender {
import ForkJoinPoolVirtualThreadSpec._

"PekkoForkJoinPool" must {

"support virtualization with Virtual Thread" in {
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
for (_ <- 1 to 1000) {
actor ! "ping"
expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") }
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.dispatch

import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.actor.{ Actor, Props }
import pekko.testkit.{ ImplicitSender, PekkoSpec }

object ThreadPoolVirtualThreadSpec {
val config = ConfigFactory.parseString("""
|pekko.actor.default-dispatcher.executor = "thread-pool-executor"
|virtual {
| task-dispatcher {
| mailbox-type = "org.apache.pekko.dispatch.SingleConsumerOnlyUnboundedMailbox"
| throughput = 1
| thread-pool-executor {
| fixed-pool-size = 2
| virtualize = on
| }
| }
|}
""".stripMargin)

class ThreadNameActor extends Actor {

override def receive = {
case "ping" =>
sender() ! Thread.currentThread().getName
}
}

}

class ThreadPoolVirtualThreadSpec extends PekkoSpec(ThreadPoolVirtualThreadSpec.config) with ImplicitSender {
import ThreadPoolVirtualThreadSpec._

"PekkoThreadPoolExecutor" must {

"support virtualization with Virtual Thread" in {
val actor = system.actorOf(Props(new ThreadNameActor).withDispatcher("virtual.task-dispatcher"))
for (_ <- 1 to 1000) {
actor ! "ping"
expectMsgPF() { case name: String => name should include("virtual.task-dispatcher-virtual-thread") }
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ object VirtualThreadPoolDispatcherSpec {
class VirtualThreadPoolDispatcherSpec extends PekkoSpec(VirtualThreadPoolDispatcherSpec.config) with ImplicitSender {
import VirtualThreadPoolDispatcherSpec._

val Iterations = 1000

"VirtualThreadPool support" must {

"handle simple dispatch" in {
Expand Down
12 changes: 12 additions & 0 deletions actor/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,12 @@ pekko {
# This config is new in Pekko v1.1.0 and only has an effect if you are running with JDK 9 and above.
# Read the documentation on `java.util.concurrent.ForkJoinPool` to find out more. Default in hex is 0x7fff.
maximum-pool-size = 32767

# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
# Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off`
virtualize = off
}

# This will be used if you have set "executor = "thread-pool-executor""
Expand Down Expand Up @@ -538,6 +544,12 @@ pekko {

# Allow core threads to time out
allow-core-timeout = on

# This config is new in Pekko v1.2.0 and only has an effect if you are running with JDK 21 and above,
# When set to `on` but underlying runtime does not support virtual threads, an Exception will throw.
# Virtualize this dispatcher as a virtual-thread-executor
# Valid values are: `on`, `off`
virtualize = off
}

# This will be used if you have set "executor = "virtual-thread-executor"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
@unused prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
import org.apache.pekko.util.Helpers.ConfigOps
val builder =
ThreadPoolConfigBuilder(ThreadPoolConfig())
ThreadPoolConfigBuilder(ThreadPoolConfig(virtualize = false))
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
.setAllowCoreThreadTimeout(config.getBoolean("allow-core-timeout"))
.configure(Some(config.getInt("task-queue-size")).flatMap {
Expand Down Expand Up @@ -474,6 +474,10 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
config.getInt("max-pool-size-max"))
else
builder.setFixedPoolSize(config.getInt("fixed-pool-size"))

if (config.getBoolean("virtualize")) {
builder.setVirtualize(true)
} else builder
}

def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
this.getClass,
"PinnedDispatcher [%s] not configured to use ThreadPoolExecutor, falling back to default config.".format(
config.getString("id"))))
ThreadPoolConfig()
ThreadPoolConfig(virtualize = false)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,28 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
}

class ForkJoinExecutorServiceFactory(
val id: String,
val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
val parallelism: Int,
val asyncMode: Boolean,
val maxPoolSize: Int)
val maxPoolSize: Int,
val virtualize: Boolean)
extends ExecutorServiceFactory {
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean,
maxPoolSize: Int,
virtualize: Boolean) =
this(null, threadFactory, parallelism, asyncMode, maxPoolSize, virtualize)

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap, false)

def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
parallelism: Int,
asyncMode: Boolean) = this(threadFactory, parallelism, asyncMode, ForkJoinPoolConstants.MaxCap)
asyncMode: Boolean,
maxPoolSize: Int) = this(threadFactory, parallelism, asyncMode, maxPoolSize, false)

private def pekkoJdk9ForkJoinPoolClassOpt: Option[Class[_]] =
Try(Class.forName("org.apache.pekko.dispatch.PekkoJdk9ForkJoinPool")).toOption
Expand All @@ -116,12 +129,19 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) =
this(threadFactory, parallelism, asyncMode = true)

def createExecutorService: ExecutorService = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
handle.invoke(parallelism, threadFactory, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
case _ =>
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
def createExecutorService: ExecutorService = {
val forkJoinPool = pekkoJdk9ForkJoinPoolHandleOpt match {
case Some(handle) =>
handle.invoke(parallelism, threadFactory, maxPoolSize,
MonitorableThreadFactory.doNothing, asyncMode).asInstanceOf[ExecutorService]
case _ =>
new PekkoForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
}
if (virtualize) {
new VirtualizedExecutorService(id, forkJoinPool)
} else {
forkJoinPool
}
}
}

Expand All @@ -143,12 +163,14 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
}

new ForkJoinExecutorServiceFactory(
id,
validate(tf),
ThreadPoolConfig.scaledPoolSize(
config.getInt("parallelism-min"),
config.getDouble("parallelism-factor"),
config.getInt("parallelism-max")),
asyncMode,
config.getInt("maximum-pool-size"))
config.getInt("maximum-pool-size"),
config.getBoolean("virtualize"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,32 @@ object ThreadPoolConfig {
def reusableQueue(queue: BlockingQueue[Runnable]): QueueFactory = () => queue

def reusableQueue(queueFactory: QueueFactory): QueueFactory = reusableQueue(queueFactory())

// TODO remove in Pekko 2.0 after change to normal class
def unapply(config: ThreadPoolConfig)
: Option[(Boolean, Int, Int, Duration, ThreadPoolConfig.QueueFactory, RejectedExecutionHandler)] =
Some((config.allowCorePoolTimeout, config.corePoolSize, config.maxPoolSize, config.threadTimeout,
config.queueFactory, config.rejectionPolicy))

// TODO remove in Pekko 2.0 after change to normal class
def apply(allowCorePoolTimeout: Boolean,
corePoolSize: Int,
maxPoolSize: Int,
threadTimeout: Duration,
queueFactory: ThreadPoolConfig.QueueFactory,
rejectionPolicy: RejectedExecutionHandler): ThreadPoolConfig =
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
virtualize = false)
}

/**
* Function0 without the fun stuff (mostly for the sake of the Java API side of things)
*/
trait ExecutorServiceFactory {

/**
* Create a new ExecutorService
*/
def createExecutorService: ExecutorService
}

Expand All @@ -77,14 +97,26 @@ trait ExecutorServiceFactoryProvider {
/**
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
*/
//TODO don't use case class for this in 2.0, it's not a good fit
final case class ThreadPoolConfig(
allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy,
virtualize: Boolean)
extends ExecutorServiceFactoryProvider {
// TODO remove in Pekko 2.0 after change to normal class
def this(allowCorePoolTimeout: Boolean,
corePoolSize: Int,
maxPoolSize: Int,
threadTimeout: Duration,
queueFactory: ThreadPoolConfig.QueueFactory,
rejectionPolicy: RejectedExecutionHandler) =
this(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
virtualize = false)

// Written explicitly to permit non-inlined defn; this is necessary for downstream instrumentation that stores extra
// context information on the config
@noinline
Expand All @@ -94,13 +126,31 @@ final case class ThreadPoolConfig(
maxPoolSize: Int = maxPoolSize,
threadTimeout: Duration = threadTimeout,
queueFactory: ThreadPoolConfig.QueueFactory = queueFactory,
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy
rejectionPolicy: RejectedExecutionHandler = rejectionPolicy,
virtualize: Boolean = virtualize
): ThreadPoolConfig =
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
virtualize)

// TODO remove in Pekko 2.0 after change to normal class
@noinline
def copy(allowCorePoolTimeout: Boolean,
corePoolSize: Int,
maxPoolSize: Int,
threadTimeout: Duration,
queueFactory: ThreadPoolConfig.QueueFactory,
rejectionPolicy: RejectedExecutionHandler
): ThreadPoolConfig =
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy)
ThreadPoolConfig(allowCorePoolTimeout, corePoolSize, maxPoolSize, threadTimeout, queueFactory, rejectionPolicy,
virtualize)

class ThreadPoolExecutorServiceFactory(val id: String,
val threadFactory: ThreadFactory) extends ExecutorServiceFactory {

def this(threadFactory: ThreadFactory) = this(null, threadFactory)

class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
def createExecutorService: ExecutorService = {
val service: ThreadPoolExecutor = new ThreadPoolExecutor(
val executor: ThreadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
threadTimeout.length,
Expand All @@ -110,18 +160,19 @@ final case class ThreadPoolConfig(
rejectionPolicy) with LoadMetrics {
def atFullThrottle(): Boolean = this.getActiveCount >= this.getPoolSize
}
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
service
executor.allowCoreThreadTimeOut(allowCorePoolTimeout)
if (virtualize) new VirtualizedExecutorService(id, executor) else executor
}
}

def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
val tf = threadFactory match {
case m: MonitorableThreadFactory =>
// add the dispatcher id to the thread names
m.withName(m.name + "-" + id)
case other => other
}
new ThreadPoolExecutorServiceFactory(tf)
new ThreadPoolExecutorServiceFactory(id, tf)
}
}

Expand Down Expand Up @@ -178,6 +229,9 @@ final case class ThreadPoolConfigBuilder(config: ThreadPoolConfig) {
def setQueueFactory(newQueueFactory: QueueFactory): ThreadPoolConfigBuilder =
this.copy(config = config.copy(queueFactory = newQueueFactory))

def setVirtualize(virtualize: Boolean): ThreadPoolConfigBuilder =
this.copy(config = config.copy(virtualize = virtualize))

def configure(fs: Option[Function[ThreadPoolConfigBuilder, ThreadPoolConfigBuilder]]*): ThreadPoolConfigBuilder =
fs.foldLeft(this)((c, f) => f.map(_(c)).getOrElse(c))
}
Expand Down
Loading

0 comments on commit 262b627

Please sign in to comment.