[CELEBORN-1058] Support specifying the number of dispatcher threads for each role

### What changes were proposed in this pull request?
Support specifying the number of dispatcher threads for each role, especially on the shuffle client side. The shuffle client registers only the RpcEndpointVerifier endpoint, which handles few requests, so one thread is enough. The rpc env of every other role has at most two endpoints, so a shared event loop is reasonable. Since it is unclear whether the shuffle client will need more rpc endpoints later, this PR adds dedicated parameters to specify the dispatcher threads for each role.

It also changes the dispatcher thread pool name to distinguish it from Spark's.

### Why are the changes needed?
Ditto

### Does this PR introduce _any_ user-facing change?
Yes, this adds the parameter `celeborn.<role>.rpc.dispatcher.threads`.
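
For example, to pin the shuffle client's dispatcher to a single thread (a minimal usage sketch mirroring the new UT below; `<role>` is the RPC system name lowercased, i.e. `master`, `worker`, `lifecyclemanager`, or `shuffleclient`):

```scala
import org.apache.celeborn.common.CelebornConf

// One dispatcher thread for the shuffle client; other roles keep the shared
// default from celeborn.rpc.dispatcher.threads (0 means availableCores).
val conf = new CelebornConf()
conf.set("celeborn.shuffleclient.rpc.dispatcher.threads", "1")
```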

### How was this patch tested?
Manual test and UT

Closes apache#2003 from onebox-li/my_dev.

Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
onebox-li authored and waitinfuture committed Nov 3, 2023
1 parent 4e8e8c2 commit 7b185a2
Showing 8 changed files with 79 additions and 44 deletions.

ShuffleClientImpl.java
@@ -172,8 +172,8 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
       pushDataTimeout = conf.pushDataTimeoutMs();
     }

-    // init rpc env and master endpointRef
-    rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(conf), 0, conf);
+    // init rpc env
+    rpcEnv = RpcEnv.create(RpcNameConstants.SHUFFLE_CLIENT_SYS, Utils.localHostName(conf), 0, conf);

     String module = TransportModuleConstants.DATA_MODULE;
     TransportConf dataTransportConf =

RpcNameConstants.java
@@ -19,18 +19,19 @@

 public class RpcNameConstants {
   // For Master
-  public static String MASTER_SYS = "MasterSys";
-
+  public static String MASTER_SYS = "Master";
   // Master Endpoint Name
   public static String MASTER_EP = "MasterEndpoint";

   // For Worker
-  public static String WORKER_SYS = "WorkerSys";
-
+  public static String WORKER_SYS = "Worker";
   // Worker Endpoint Name
   public static String WORKER_EP = "WorkerEndpoint";

-  // For Driver(SparkShuffleManager)
-  public static String LIFECYCLE_MANAGER_SYS = "LifecycleManagerSys";
+  // For LifecycleManager
+  public static String LIFECYCLE_MANAGER_SYS = "LifecycleManager";
   public static String LIFECYCLE_MANAGER_EP = "LifecycleManagerEndpoint";
+
+  // For Shuffle Client
+  public static String SHUFFLE_CLIENT_SYS = "ShuffleClient";
 }

CelebornConf.scala
@@ -381,8 +381,17 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable {
     new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key)
   def rpcAskTimeout: RpcTimeout =
     new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key)
-  def rpcDispatcherNumThreads(availableCores: Int): Int =
-    get(RPC_DISPATCHER_THREADS).getOrElse(availableCores)
+  def rpcDispatcherNumThreads(availableCores: Int): Int = {
+    val num = get(RPC_DISPATCHER_THREADS)
+    if (num != 0) num else availableCores
+  }
+  def rpcDispatcherNumThreads(availableCores: Int, role: String): Int = {
+    val num = getInt(
+      RPC_ROLE_DISPATHER_THREADS.key.replace("<role>", role),
+      rpcDispatcherNumThreads(availableCores))
+    if (num != 0) num else availableCores
+  }

   def networkIoMode(module: String): String = {
     val key = NETWORK_IO_MODE.key.replace("<module>", module)
     get(key, NETWORK_IO_MODE.defaultValue.get)

@@ -1346,14 +1355,20 @@
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("60s")

-  val RPC_DISPATCHER_THREADS: OptionalConfigEntry[Int] =
+  val RPC_DISPATCHER_THREADS: ConfigEntry[Int] =
     buildConf("celeborn.rpc.dispatcher.threads")
       .withAlternative("celeborn.rpc.dispatcher.numThreads")
       .categories("network")
-      .doc("Threads number of message dispatcher event loop")
+      .doc("Threads number of message dispatcher event loop. Default to 0, which is availableCore.")
       .version("0.3.0")
       .intConf
-      .createOptional
+      .createWithDefault(0)
+
+  val RPC_ROLE_DISPATHER_THREADS: ConfigEntry[Int] =
+    buildConf("celeborn.<role>.rpc.dispatcher.threads")
+      .categories("network")
+      .doc("Threads number of message dispatcher event loop for roles")
+      .fallbackConf(RPC_DISPATCHER_THREADS)

   val NETWORK_IO_MODE: ConfigEntry[String] =
     buildConf("celeborn.<module>.io.mode")
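
The resolution order implemented above, in plain form (a standalone sketch with a hypothetical helper over a plain Map, not Celeborn's actual config machinery):

```scala
// Lookup chain, per the diff above:
//   celeborn.<role>.rpc.dispatcher.threads
//     -> celeborn.rpc.dispatcher.threads
//     -> celeborn.rpc.dispatcher.numThreads (alternative key)
//     -> default 0, where 0 means "use availableCores"
def resolveDispatcherThreads(
    settings: Map[String, String],
    role: String,
    availableCores: Int): Int = {
  val generic = settings
    .get("celeborn.rpc.dispatcher.threads")
    .orElse(settings.get("celeborn.rpc.dispatcher.numThreads"))
    .map(_.toInt)
    .getOrElse(0)
  val n = settings
    .get(s"celeborn.$role.rpc.dispatcher.threads")
    .map(_.toInt)
    .getOrElse(generic)
  if (n != 0) n else availableCores
}
```

With `availableCores = 5`, this sketch reproduces the expectations of the new CelebornConfSuite tests further down.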

RpcEnv.scala
@@ -59,9 +59,9 @@ object RpcEnv {
  *
  * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
  */
-abstract class RpcEnv(conf: CelebornConf) {
+abstract class RpcEnv(config: RpcEnvConfig) {

-  private[celeborn] val defaultLookupTimeout = conf.rpcLookupTimeout
+  private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout

   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
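
`RpcEnvConfig` itself is not part of this diff; inferred from the fields the changed code reads (`config.conf`, `config.name`, `config.bindAddress`, `config.advertiseAddress`, `config.port`, `config.numUsableCores`), its shape is roughly the following sketch (the real definition may carry more):

```scala
import org.apache.celeborn.common.CelebornConf

// Inferred shape only; see the usages in RpcEnv, Dispatcher and NettyRpcEnv.
case class RpcEnvConfig(
    conf: CelebornConf,
    name: String, // RPC system name, e.g. RpcNameConstants.SHUFFLE_CLIENT_SYS
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    numUsableCores: Int)
```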

Dispatcher.scala
@@ -32,11 +32,8 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils}

 /**
  * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
- *
- * @param numUsableCores Number of CPU cores allocated to the process, for sizing the thread pool.
- *                       If 0, will consider the available CPUs on the host.
  */
-private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
+private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {

   private class EndpointData(
       val name: String,

@@ -62,7 +59,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

   def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
     val addr = RpcEndpointAddress(nettyEnv.address, name)
-    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
+    val endpointRef = new NettyRpcEndpointRef(nettyEnv.celebornConf, addr, nettyEnv)
     synchronized {
       if (stopped) {
         throw new IllegalStateException("RpcEnv has been stopped")

@@ -200,11 +197,14 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

   /** Thread pool used for dispatching messages. */
   private val threadpool: ThreadPoolExecutor = {
+    val numUsableCores = nettyEnv.config.numUsableCores
     val availableCores =
       if (numUsableCores > 0) numUsableCores
       else Math.max(16, Runtime.getRuntime.availableProcessors())
-    val numThreads = nettyEnv.conf.rpcDispatcherNumThreads(availableCores)
-    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
+    val role = nettyEnv.config.name.toLowerCase()
+    val numThreads = nettyEnv.celebornConf.rpcDispatcherNumThreads(availableCores, role)
+
+    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "celeborn-dispatcher")
     logInfo(s"Dispatcher numThreads: $numThreads")
     for (i <- 0 until numThreads) {
       pool.execute(new MessageLoop)
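
The renamed pool is the second user-visible change from the PR description: dispatcher threads are now named with the `celeborn-dispatcher` prefix rather than the Spark-style `dispatcher-event-loop`. A sketch of what a daemon fixed pool with such naming amounts to (assumed behavior of `ThreadUtils.newDaemonFixedThreadPool`, whose implementation is not shown in this diff):

```scala
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import java.util.concurrent.atomic.AtomicInteger

// Fixed-size pool of daemon threads named "<prefix>-<n>"; the dispatcher
// passes "celeborn-dispatcher" as the prefix.
def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
  val count = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${count.incrementAndGet()}")
      t.setDaemon(true)
      t
    }
  }
  Executors.newFixedThreadPool(nThreads, factory).asInstanceOf[ThreadPoolExecutor]
}
```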

NettyRpcEnv.scala
@@ -44,17 +44,17 @@ import org.apache.celeborn.common.serializer.{JavaSerializer, JavaSerializerInstance}
 import org.apache.celeborn.common.util.{ByteBufferInputStream, ByteBufferOutputStream, JavaUtils, ThreadUtils, Utils}

 class NettyRpcEnv(
-    val conf: CelebornConf,
-    javaSerializerInstance: JavaSerializerInstance,
-    host: String,
-    numUsableCores: Int) extends RpcEnv(conf) with Logging {
+    val config: RpcEnvConfig,
+    javaSerializerInstance: JavaSerializerInstance) extends RpcEnv(config) with Logging {
+
+  val celebornConf = config.conf

   private[celeborn] val transportConf = Utils.fromCelebornConf(
-    conf.clone,
+    celebornConf.clone,
     TransportModuleConstants.RPC_MODULE,
-    conf.rpcIoThreads.getOrElse(numUsableCores))
+    celebornConf.rpcIoThreads.getOrElse(config.numUsableCores))

-  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
+  private val dispatcher: Dispatcher = new Dispatcher(this)

   private var worker: RpcEndpoint = null


@@ -70,7 +70,7 @@ class NettyRpcEnv(
   // to implement non-blocking send/ask.
   private[celeborn] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
     "netty-rpc-connection",
-    conf.rpcConnectThreads)
+    celebornConf.rpcConnectThreads)

   @volatile private var server: TransportServer = _


@@ -101,7 +101,7 @@ class NettyRpcEnv(

   @Nullable
   override lazy val address: RpcAddress = {
-    if (server != null) RpcAddress(host, server.getPort()) else null
+    if (server != null) RpcAddress(config.advertiseAddress, server.getPort()) else null
   }

   override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {

@@ -121,12 +121,12 @@ class NettyRpcEnv(

   def asyncSetupEndpointRefByAddr(addr: RpcEndpointAddress): Future[RpcEndpointRef] = {
     val verifier = new NettyRpcEndpointRef(
-      conf,
+      celebornConf,
       RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME),
       this)
     verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(addr.name)).flatMap { find =>
       if (find) {
-        Future.successful(new NettyRpcEndpointRef(conf, addr, this))
+        Future.successful(new NettyRpcEndpointRef(celebornConf, addr, this))
       } else {
         Future.failed(new RpcEndpointNotFoundException(addr.toString))
       }

@@ -341,25 +341,20 @@ private[celeborn] object NettyRpcEnv extends Logging {
 private[celeborn] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

   def create(config: RpcEnvConfig): RpcEnv = {
-    val conf = config.conf
+    val celebornConf = config.conf
     // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
     // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
     val javaSerializerInstance =
-      new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
-    val nettyEnv =
-      new NettyRpcEnv(
-        conf,
-        javaSerializerInstance,
-        config.advertiseAddress,
-        config.numUsableCores)
+      new JavaSerializer(celebornConf).newInstance().asInstanceOf[JavaSerializerInstance]
+    val nettyEnv = new NettyRpcEnv(config, javaSerializerInstance)
     val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
       logInfo(s"Starting RPC Server [${config.name}] on ${config.bindAddress}:$actualPort " +
         s"with advisor endpoint ${config.advertiseAddress}:$actualPort")
       nettyEnv.startServer(config.bindAddress, actualPort)
       (nettyEnv, nettyEnv.address.port)
     }
     try {
-      Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1
+      Utils.startServiceOnPort(config.port, startNettyRpcEnv, celebornConf, config.name)._1
     } catch {
       case NonFatal(e) =>
         nettyEnv.shutdown()

@@ -495,7 +490,7 @@ private[celeborn] object RequestMessage {
     try {
       val senderAddress = readRpcAddress(in)
       val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF())
-      val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
+      val ref = new NettyRpcEndpointRef(nettyEnv.config.conf, endpointAddress, nettyEnv)
       ref.client = client
       new RequestMessage(
         senderAddress,

CelebornConfSuite.scala
@@ -226,4 +226,27 @@ class CelebornConfSuite extends CelebornFunSuite {
conf.set("celeborn.storage.activeTypes", "SDD,HDD")
assert(conf.workerCommitThreads === 32)
}

test("Test role rpcDispatcherNumThreads") {
val availableCores = 5
val conf = new CelebornConf()
assert(conf.rpcDispatcherNumThreads(availableCores, "shuffleclient") === 5)

conf.set("celeborn.shuffleclient.rpc.dispatcher.threads", "1")
assert(conf.rpcDispatcherNumThreads(availableCores, "shuffleclient") === 1)
assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager") === 5)

conf.set("celeborn.rpc.dispatcher.threads", "2")
assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager") === 2)

conf.unset("celeborn.rpc.dispatcher.threads")
conf.set("celeborn.rpc.dispatcher.numThreads", "3")
assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager") === 3)
}

test("Test rpcDispatcherNumThreads") {
val availableCores = 5
val conf = new CelebornConf()
assert(conf.rpcDispatcherNumThreads(availableCores) === 5)
}
}

docs/configuration/network.md (2 additions, 1 deletion)
@@ -38,6 +38,7 @@ license: |
 | celeborn.&lt;module&gt;.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. | |
 | celeborn.&lt;module&gt;.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
 | celeborn.&lt;module&gt;.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting <module> to `data`, it works for shuffle client push data and should be configured on client side. If setting <module> to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 |
+| celeborn.&lt;role&gt;.rpc.dispatcher.threads | &lt;value of celeborn.rpc.dispatcher.threads&gt; | Threads number of message dispatcher event loop for roles | |
 | celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 |
 | celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 0.2.0 |
 | celeborn.network.memory.allocator.numArenas | &lt;undefined&gt; | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 |

@@ -46,7 +47,7 @@ license: |
 | celeborn.port.maxRetries | 1 | When port is occupied, we will retry for max retry times. | 0.2.0 |
 | celeborn.rpc.askTimeout | 60s | Timeout for RPC ask operations. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.2.0 |
 | celeborn.rpc.connect.threads | 64 | | 0.2.0 |
-| celeborn.rpc.dispatcher.threads | &lt;undefined&gt; | Threads number of message dispatcher event loop | 0.3.0 |
+| celeborn.rpc.dispatcher.threads | 0 | Threads number of message dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 |
 | celeborn.rpc.io.threads | &lt;undefined&gt; | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 |
 | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 |
 | celeborn.shuffle.io.maxChunksBeingTransferred | &lt;undefined&gt; | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 |