diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index d913a9e068b..39d382d1064 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -172,8 +172,8 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u pushDataTimeout = conf.pushDataTimeoutMs(); } - // init rpc env and master endpointRef - rpcEnv = RpcEnv.create("ShuffleClient", Utils.localHostName(conf), 0, conf); + // init rpc env + rpcEnv = RpcEnv.create(RpcNameConstants.SHUFFLE_CLIENT_SYS, Utils.localHostName(conf), 0, conf); String module = TransportModuleConstants.DATA_MODULE; TransportConf dataTransportConf = diff --git a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java index 17d4fd2c876..c47f04f98ab 100644 --- a/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java +++ b/common/src/main/java/org/apache/celeborn/common/protocol/RpcNameConstants.java @@ -19,18 +19,19 @@ public class RpcNameConstants { // For Master - public static String MASTER_SYS = "MasterSys"; - + public static String MASTER_SYS = "Master"; // Master Endpoint Name public static String MASTER_EP = "MasterEndpoint"; // For Worker - public static String WORKER_SYS = "WorkerSys"; - + public static String WORKER_SYS = "Worker"; // Worker Endpoint Name public static String WORKER_EP = "WorkerEndpoint"; - // For Driver(SparkShuffleManager) + // For LifecycleManager + public static String LIFECYCLE_MANAGER_SYS = "LifecycleManager"; public static String LIFECYCLE_MANAGER_EP = "LifecycleManagerEndpoint"; - public static String LIFECYCLE_MANAGER_SYS = "LifecycleManagerSys"; + + // For Shuffle Client + public static String SHUFFLE_CLIENT_SYS = "ShuffleClient"; } diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 24b65b0229f..0879a610725 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -381,8 +381,17 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se new RpcTimeout(get(RPC_LOOKUP_TIMEOUT).milli, RPC_LOOKUP_TIMEOUT.key) def rpcAskTimeout: RpcTimeout = new RpcTimeout(get(RPC_ASK_TIMEOUT).milli, RPC_ASK_TIMEOUT.key) - def rpcDispatcherNumThreads(availableCores: Int): Int = - get(RPC_DISPATCHER_THREADS).getOrElse(availableCores) + def rpcDispatcherNumThreads(availableCores: Int): Int = { + val num = get(RPC_DISPATCHER_THREADS) + if (num != 0) num else availableCores + } + def rpcDispatcherNumThreads(availableCores: Int, role: String): Int = { + val num = getInt( + RPC_ROLE_DISPATHER_THREADS.key.replace("<role>", role), + rpcDispatcherNumThreads(availableCores)) + if (num != 0) num else availableCores + } + def networkIoMode(module: String): String = { val key = NETWORK_IO_MODE.key.replace("<module>", module) get(key, NETWORK_IO_MODE.defaultValue.get) @@ -1346,14 +1355,20 @@ object CelebornConf extends Logging { .timeConf(TimeUnit.MILLISECONDS) .createWithDefaultString("60s") - val RPC_DISPATCHER_THREADS: OptionalConfigEntry[Int] = + val RPC_DISPATCHER_THREADS: ConfigEntry[Int] = buildConf("celeborn.rpc.dispatcher.threads") .withAlternative("celeborn.rpc.dispatcher.numThreads") .categories("network") - .doc("Threads number of message dispatcher event loop") + .doc("Threads number of message dispatcher event loop. 
Default to 0, which is availableCore.") .version("0.3.0") .intConf - .createOptional + .createWithDefault(0) + + val RPC_ROLE_DISPATHER_THREADS: ConfigEntry[Int] = + buildConf("celeborn.<role>.rpc.dispatcher.threads") + .categories("network") + .doc("Threads number of message dispatcher event loop for roles") + .fallbackConf(RPC_DISPATCHER_THREADS) val NETWORK_IO_MODE: ConfigEntry[String] = buildConf("celeborn.<module>.io.mode") diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala index 4d3d6d0d2e6..f0d0f04528a 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala @@ -59,9 +59,9 @@ object RpcEnv { * * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri. */ -abstract class RpcEnv(conf: CelebornConf) { +abstract class RpcEnv(config: RpcEnvConfig) { - private[celeborn] val defaultLookupTimeout = conf.rpcLookupTimeout + private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout /** * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala index 9ea65aef349..b137ae2f548 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/Dispatcher.scala @@ -32,11 +32,8 @@ import org.apache.celeborn.common.util.{JavaUtils, ThreadUtils} /** * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s). - * - * @param numUsableCores Number of CPU cores allocated to the process, for sizing the thread pool. - * If 0, will consider the available CPUs on the host. 
*/ -private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging { +private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging { private class EndpointData( val name: String, @@ -62,7 +59,7 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) e def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = { val addr = RpcEndpointAddress(nettyEnv.address, name) - val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv) + val endpointRef = new NettyRpcEndpointRef(nettyEnv.celebornConf, addr, nettyEnv) synchronized { if (stopped) { throw new IllegalStateException("RpcEnv has been stopped") @@ -200,11 +197,14 @@ private[celeborn] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) e /** Thread pool used for dispatching messages. */ private val threadpool: ThreadPoolExecutor = { + val numUsableCores = nettyEnv.config.numUsableCores val availableCores = if (numUsableCores > 0) numUsableCores else Math.max(16, Runtime.getRuntime.availableProcessors()) - val numThreads = nettyEnv.conf.rpcDispatcherNumThreads(availableCores) - val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop") + val role = nettyEnv.config.name.toLowerCase() + val numThreads = nettyEnv.celebornConf.rpcDispatcherNumThreads(availableCores, role) + + val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "celeborn-dispatcher") logInfo(s"Dispatcher numThreads: $numThreads") for (i <- 0 until numThreads) { pool.execute(new MessageLoop) diff --git a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala index 31809e36622..2d467cc9b87 100644 --- a/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala +++ b/common/src/main/scala/org/apache/celeborn/common/rpc/netty/NettyRpcEnv.scala @@ -44,17 +44,17 @@ import 
org.apache.celeborn.common.serializer.{JavaSerializer, JavaSerializerInst import org.apache.celeborn.common.util.{ByteBufferInputStream, ByteBufferOutputStream, JavaUtils, ThreadUtils, Utils} class NettyRpcEnv( - val conf: CelebornConf, - javaSerializerInstance: JavaSerializerInstance, - host: String, - numUsableCores: Int) extends RpcEnv(conf) with Logging { + val config: RpcEnvConfig, + javaSerializerInstance: JavaSerializerInstance) extends RpcEnv(config) with Logging { + + val celebornConf = config.conf private[celeborn] val transportConf = Utils.fromCelebornConf( - conf.clone, + celebornConf.clone, TransportModuleConstants.RPC_MODULE, - conf.rpcIoThreads.getOrElse(numUsableCores)) + celebornConf.rpcIoThreads.getOrElse(config.numUsableCores)) - private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores) + private val dispatcher: Dispatcher = new Dispatcher(this) private var worker: RpcEndpoint = null @@ -70,7 +70,7 @@ class NettyRpcEnv( // to implement non-blocking send/ask. 
private[celeborn] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool( "netty-rpc-connection", - conf.rpcConnectThreads) + celebornConf.rpcConnectThreads) @volatile private var server: TransportServer = _ @@ -101,7 +101,7 @@ class NettyRpcEnv( @Nullable override lazy val address: RpcAddress = { - if (server != null) RpcAddress(host, server.getPort()) else null + if (server != null) RpcAddress(config.advertiseAddress, server.getPort()) else null } override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = { @@ -121,12 +121,12 @@ class NettyRpcEnv( def asyncSetupEndpointRefByAddr(addr: RpcEndpointAddress): Future[RpcEndpointRef] = { val verifier = new NettyRpcEndpointRef( - conf, + celebornConf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this) verifier.ask[Boolean](RpcEndpointVerifier.CheckExistence(addr.name)).flatMap { find => if (find) { - Future.successful(new NettyRpcEndpointRef(conf, addr, this)) + Future.successful(new NettyRpcEndpointRef(celebornConf, addr, this)) } else { Future.failed(new RpcEndpointNotFoundException(addr.toString)) } @@ -341,17 +341,12 @@ private[celeborn] object NettyRpcEnv extends Logging { private[celeborn] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { def create(config: RpcEnvConfig): RpcEnv = { - val conf = config.conf + val celebornConf = config.conf // Use JavaSerializerInstance in multiple threads is safe. 
However, if we plan to support // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance val javaSerializerInstance = - new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance] - val nettyEnv = - new NettyRpcEnv( - conf, - javaSerializerInstance, - config.advertiseAddress, - config.numUsableCores) + new JavaSerializer(celebornConf).newInstance().asInstanceOf[JavaSerializerInstance] + val nettyEnv = new NettyRpcEnv(config, javaSerializerInstance) val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort => logInfo(s"Starting RPC Server [${config.name}] on ${config.bindAddress}:$actualPort " + s"with advisor endpoint ${config.advertiseAddress}:$actualPort") @@ -359,7 +354,7 @@ private[celeborn] class NettyRpcEnvFactory extends RpcEnvFactory with Logging { (nettyEnv, nettyEnv.address.port) } try { - Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1 + Utils.startServiceOnPort(config.port, startNettyRpcEnv, celebornConf, config.name)._1 } catch { case NonFatal(e) => nettyEnv.shutdown() @@ -495,7 +490,7 @@ private[celeborn] object RequestMessage { try { val senderAddress = readRpcAddress(in) val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF()) - val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv) + val ref = new NettyRpcEndpointRef(nettyEnv.config.conf, endpointAddress, nettyEnv) ref.client = client new RequestMessage( senderAddress, diff --git a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala index f57ce904012..f3e01c5ab43 100644 --- a/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala +++ b/common/src/test/scala/org/apache/celeborn/common/CelebornConfSuite.scala @@ -226,4 +226,27 @@ class CelebornConfSuite extends CelebornFunSuite { conf.set("celeborn.storage.activeTypes", "SDD,HDD") assert(conf.workerCommitThreads === 32) } + 
+ test("Test role rpcDispatcherNumThreads") { + val availableCores = 5 + val conf = new CelebornConf() + assert(conf.rpcDispatcherNumThreads(availableCores, "shuffleclient") === 5) + + conf.set("celeborn.shuffleclient.rpc.dispatcher.threads", "1") + assert(conf.rpcDispatcherNumThreads(availableCores, "shuffleclient") === 1) + assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager") === 5) + + conf.set("celeborn.rpc.dispatcher.threads", "2") + assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager") === 2) + + conf.unset("celeborn.rpc.dispatcher.threads") + conf.set("celeborn.rpc.dispatcher.numThreads", "3") + assert(conf.rpcDispatcherNumThreads(availableCores, "lifecyclemanager") === 3) + } + + test("Test rpcDispatcherNumThreads") { + val availableCores = 5 + val conf = new CelebornConf() + assert(conf.rpcDispatcherNumThreads(availableCores) === 5) + } } diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 225c9de7b02..f63fc8bf7d8 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -38,6 +38,7 @@ license: | | celeborn.<module>.io.serverThreads | 0 | Number of threads used in the server thread pool. Default to 0, which is 2x#cores. | | | celeborn.<module>.push.timeoutCheck.interval | 5s | Interval for checking push data timeout. If setting to `data`, it works for shuffle client push data and should be configured on client side. If setting to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. | 0.3.0 | | celeborn.<module>.push.timeoutCheck.threads | 4 | Threads num for checking push data timeout. If setting to `data`, it works for shuffle client push data and should be configured on client side. If setting to `replicate`, it works for worker replicate data to peer worker and should be configured on worker side. 
| 0.3.0 | +| celeborn.<role>.rpc.dispatcher.threads | <value of celeborn.rpc.dispatcher.threads> | Threads number of message dispatcher event loop for roles | | | celeborn.network.bind.preferIpAddress | true | When `ture`, prefer to use IP address, otherwise FQDN. This configuration only takes effects when the bind hostname is not set explicitly, in such case, Celeborn will find the first non-loopback address to bind. | 0.3.0 | | celeborn.network.connect.timeout | 10s | Default socket connect timeout. | 0.2.0 | | celeborn.network.memory.allocator.numArenas | <undefined> | Number of arenas for pooled memory allocator. Default value is Runtime.getRuntime.availableProcessors, min value is 2. | 0.3.0 | @@ -46,7 +47,7 @@ license: | | celeborn.port.maxRetries | 1 | When port is occupied, we will retry for max retry times. | 0.2.0 | | celeborn.rpc.askTimeout | 60s | Timeout for RPC ask operations. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.activeTypes` | 0.2.0 | | celeborn.rpc.connect.threads | 64 | | 0.2.0 | -| celeborn.rpc.dispatcher.threads | <undefined> | Threads number of message dispatcher event loop | 0.3.0 | +| celeborn.rpc.dispatcher.threads | 0 | Threads number of message dispatcher event loop. Default to 0, which is availableCore. | 0.3.0 | | celeborn.rpc.io.threads | <undefined> | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 | | celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn.<module>.io.maxRetries` and `celeborn.<module>.io.retryWait`), if those limits are reached the task will fail with fetch failure. 
| 0.2.0 |