diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 87a51328ad8..74b04359845 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -151,6 +151,7 @@ message PbHeartbeatFromWorker { string requestId = 8; map userResourceConsumption = 9; map estimatedAppDiskUsage = 10; + bool highWorkload = 11; } message PbHeartbeatFromWorkerResponse { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index b11d475cbd3..916d0806bad 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -654,6 +654,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def workerPushMaxComponents: Int = get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS) def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED) def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED) + def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX) // ////////////////////////////////////////////////////// // Metrics System // @@ -2693,6 +2694,16 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val WORKER_ACTIVE_CONNECTION_MAX: OptionalConfigEntry[Long] = + buildConf("celeborn.worker.activeConnection.max") + .categories("worker") + .doc("If the number of active connections on a worker exceeds this configuration value, " + + "the worker will be marked as high-load in the heartbeat report, " + + "and the master will not include that node in the response of RequestSlots.") + .version("0.3.1") + .longConf + .createOptional + val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] = buildConf("celeborn.client.application.heartbeatInterval") 
.withAlternative("celeborn.application.heartbeatInterval") diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 89ef41a9317..18dc2675d00 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -113,6 +113,7 @@ object ControlMessages extends Logging { userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption], activeShuffleKeys: util.Set[String], estimatedAppDiskUsage: util.HashMap[String, java.lang.Long], + highWorkload: Boolean, override var requestId: String = ZERO_UUID) extends MasterRequestMessage case class HeartbeatFromWorkerResponse( @@ -445,6 +446,7 @@ object ControlMessages extends Logging { userResourceConsumption, activeShuffleKeys, estimatedAppDiskUsage, + highWorkload, requestId) => val pbDisks = disks.map(PbSerDeUtils.toPbDiskInfo).asJava val pbUserResourceConsumption = @@ -459,6 +461,7 @@ object ControlMessages extends Logging { .setReplicatePort(replicatePort) .addAllActiveShuffleKeys(activeShuffleKeys) .putAllEstimatedAppDiskUsage(estimatedAppDiskUsage) + .setHighWorkload(highWorkload) .setRequestId(requestId) .build().toByteArray new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload) @@ -821,6 +824,7 @@ object ControlMessages extends Logging { userResourceConsumption, activeShuffleKeys, estimatedAppDiskUsage, + pbHeartbeatFromWorker.getHighWorkload, pbHeartbeatFromWorker.getRequestId) case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE => diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index 994838b3fef..8b05d5ad784 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -24,6 +24,7 @@ license: | | celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. 
For example, if a reducer's shuffle data is 128M and the data will need 16 fetch chunk requests to fetch. | 0.2.0 | | celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 | | celeborn.storage.hdfs.dir | <undefined> | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 | +| celeborn.worker.activeConnection.max | <undefined> | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | | celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 | | celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 | | celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. 
| 0.3.0 | diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java index 8df57405fa5..8d7b9376a7b 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/AbstractMetaManager.java @@ -143,7 +143,8 @@ public void updateWorkerHeartbeatMeta( Map disks, Map userResourceConsumption, Map estimatedAppDiskUsage, - long time) { + long time, + boolean highWorkload) { WorkerInfo worker = new WorkerInfo( host, rpcPort, pushPort, fetchPort, replicatePort, disks, userResourceConsumption); @@ -161,10 +162,11 @@ public void updateWorkerHeartbeatMeta( } appDiskUsageMetric.update(estimatedAppDiskUsage); // If using HDFSONLY mode, workers with empty disks should not be put into excluded worker list. - if (!excludedWorkers.contains(worker) && (disks.isEmpty() && !conf.hasHDFSStorage())) { - LOG.debug("Worker: {} num total slots is 0, add to excluded list", worker); + if (!excludedWorkers.contains(worker) + && ((disks.isEmpty() && !conf.hasHDFSStorage()) || highWorkload)) { + LOG.debug("Worker: {} has no disks or is in high workload, add to excluded list", worker); excludedWorkers.add(worker); - } else if (availableSlots.get() > 0) { + } else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && !highWorkload) { // only unblack if numSlots larger than 0 excludedWorkers.remove(worker); } diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java index 6c4c65a73db..a34cb445d53 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/IMetadataHandler.java @@ -55,6 +55,7 @@ void handleWorkerHeartbeat( Map 
userResourceConsumption, Map estimatedAppDiskUsage, long time, + boolean highWorkload, String requestId); void handleRegisterWorker( diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java index 3d12db8b405..15c0c6d6d7b 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/SingleMasterMetaManager.java @@ -92,6 +92,7 @@ public void handleWorkerHeartbeat( Map userResourceConsumption, Map estimatedAppDiskUsage, long time, + boolean highWorkload, String requestId) { updateWorkerHeartbeatMeta( host, @@ -102,7 +103,8 @@ public void handleWorkerHeartbeat( disks, userResourceConsumption, estimatedAppDiskUsage, - time); + time, + highWorkload); } @Override diff --git a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java index f7a10013c03..181c6e4874b 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/HAMasterMetaManager.java @@ -197,6 +197,7 @@ public void handleWorkerHeartbeat( Map userResourceConsumption, Map estimatedAppDiskUsage, long time, + boolean highWorkload, String requestId) { try { ratisServer.submitRequest( @@ -215,6 +216,7 @@ public void handleWorkerHeartbeat( MetaUtil.toPbUserResourceConsumption(userResourceConsumption)) .putAllEstimatedAppDiskUsage(estimatedAppDiskUsage) .setTime(time) + .setHighWorkload(highWorkload) .build()) .build()); } catch (CelebornRuntimeException e) { diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java index d6ab8309cba..27ba6d8828b 100644 --- a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java +++ b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MetaHandler.java @@ -163,6 +163,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques estimatedAppDiskUsage.putAll( request.getWorkerHeartbeatRequest().getEstimatedAppDiskUsageMap()); replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort(); + boolean highWorkload = request.getWorkerHeartbeatRequest().getHighWorkload(); LOG.debug( "Handle worker heartbeat for {} {} {} {} {} {} {}", host, @@ -182,7 +183,8 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques diskInfos, userResourceConsumption, estimatedAppDiskUsage, - time); + time, + highWorkload); break; case RegisterWorker: diff --git a/master/src/main/proto/Resource.proto b/master/src/main/proto/Resource.proto index 8f6f62f258b..a6fb5d17327 100644 --- a/master/src/main/proto/Resource.proto +++ b/master/src/main/proto/Resource.proto @@ -121,6 +121,8 @@ message WorkerHeartbeatRequest { required int64 time = 7; map userResourceConsumption = 8; map estimatedAppDiskUsage = 9; + // Optional, not required: requests serialized without this field must still parse. + optional bool highWorkload = 10; } message RegisterWorkerRequest { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index 466e088d165..c3dd2712555 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -334,6 +334,7 @@ private[celeborn] class Master( userResourceConsumption, activeShuffleKey, estimatedAppDiskUsage, + 
highWorkload, requestId) => logDebug(s"Received heartbeat from" + s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with $disks.") @@ -350,6 +351,7 @@ private[celeborn] class Master( userResourceConsumption, activeShuffleKey, estimatedAppDiskUsage, + highWorkload, requestId)) case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) => @@ -432,6 +434,7 @@ private[celeborn] class Master( userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption], activeShuffleKeys: util.Set[String], estimatedAppDiskUsage: util.HashMap[String, java.lang.Long], + highWorkload: Boolean, requestId: String): Unit = { val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort) val registered = workersSnapShot.asScala.contains(targetWorker) @@ -449,6 +452,7 @@ private[celeborn] class Master( userResourceConsumption, estimatedAppDiskUsage, System.currentTimeMillis(), + highWorkload, requestId) } diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java index aaae7861a87..2962ebafdb5 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/DefaultMetaSystemSuiteJ.java @@ -506,6 +506,7 @@ public void testHandleWorkerHeartbeat() { userResourceConsumption1, new HashMap<>(), 1, + false, getNewReqeustId()); Assert.assertEquals(statusSystem.excludedWorkers.size(), 1); @@ -520,23 +521,40 @@ public void testHandleWorkerHeartbeat() { userResourceConsumption2, new HashMap<>(), 1, + false, getNewReqeustId()); Assert.assertEquals(statusSystem.excludedWorkers.size(), 2); statusSystem.handleWorkerHeartbeat( - HOSTNAME1, - RPCPORT1, - PUSHPORT1, - FETCHPORT1, + HOSTNAME3, + RPCPORT3, + PUSHPORT3, + FETCHPORT3, REPLICATEPORT3, - 
disks1, - userResourceConsumption1, + disks3, + userResourceConsumption3, new HashMap<>(), 1, + false, getNewReqeustId()); Assert.assertEquals(statusSystem.excludedWorkers.size(), 2); + + statusSystem.handleWorkerHeartbeat( + HOSTNAME3, + RPCPORT3, + PUSHPORT3, + FETCHPORT3, + REPLICATEPORT3, + disks3, + userResourceConsumption3, + new HashMap<>(), + 1, + true, + getNewReqeustId()); + + Assert.assertEquals(statusSystem.excludedWorkers.size(), 3); } @Test diff --git a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java index 1d0e2c17b76..6ae55d1620e 100644 --- a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java +++ b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/RatisMasterStatusSystemSuiteJ.java @@ -735,6 +735,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { userResourceConsumption1, new HashMap<>(), 1, + false, getNewReqeustId()); Thread.sleep(3000L); @@ -752,6 +753,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { userResourceConsumption2, new HashMap<>(), 1, + false, getNewReqeustId()); Thread.sleep(3000L); @@ -770,6 +772,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { userResourceConsumption1, new HashMap<>(), 1, + false, getNewReqeustId()); Thread.sleep(3000L); @@ -777,6 +780,24 @@ public void testHandleWorkerHeartbeat() throws InterruptedException { Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size()); Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size()); Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size()); + + statusSystem.handleWorkerHeartbeat( + HOSTNAME1, + RPCPORT1, + PUSHPORT1, + FETCHPORT1, + REPLICATEPORT1, + disks1, + userResourceConsumption1, + new HashMap<>(), + 1, + true, + 
getNewReqeustId()); + Thread.sleep(3000L); + Assert.assertEquals(2, statusSystem.excludedWorkers.size()); + Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size()); + Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size()); + Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size()); } @Before diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index 8ef2ee19d1d..cb248dd9b8d 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -429,7 +429,6 @@ public interface ReadBufferTargetChangeListener { void onChange(long newMemoryTarget); } - @VisibleForTesting public enum ServingState { NONE_PAUSED, PUSH_AND_REPLICATE_PAUSED, diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala index e5267a75fe5..c263f2222e8 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala @@ -46,8 +46,10 @@ import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils, ShutdownHoo // Can Remove this if celeborn don't support scala211 in future import org.apache.celeborn.common.util.FunctionConverter._ import org.apache.celeborn.server.common.{HttpService, Service} +import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_COUNT import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager} +import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState import 
org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager} private[celeborn] class Worker( @@ -296,6 +298,16 @@ private[celeborn] class Worker( memoryManager.getAllocatedReadBuffers } + private def highWorkload: Boolean = { + (memoryManager.currentServingState, conf.workerActiveConnectionMax) match { + case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true + case (ServingState.PUSH_PAUSED, _) => true + case (_, Some(activeConnectionMax)) => + workerSource.getCounterCount(ACTIVE_CONNECTION_COUNT) >= activeConnectionMax + case _ => false + } + } + private def heartbeatToMaster(): Unit = { val activeShuffleKeys = new JHashSet[String]() val estimatedAppDiskUsage = new JHashMap[String, JLong]() @@ -322,7 +334,8 @@ private[celeborn] class Worker( diskInfos, resourceConsumption, activeShuffleKeys, - estimatedAppDiskUsage), + estimatedAppDiskUsage, + highWorkload), classOf[HeartbeatFromWorkerResponse]) response.expiredShuffleKeys.asScala.foreach(shuffleKey => workerInfo.releaseSlots(shuffleKey)) cleanTaskQueue.put(response.expiredShuffleKeys) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala index edcebbb2e08..e1f247a0833 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/WorkerSource.scala @@ -57,6 +57,11 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste addTimer(TAKE_BUFFER_TIME) addTimer(SORT_TIME) + // Returns 0 when no counter is registered under this name, instead of throwing an NPE. + def getCounterCount(metricsName: String): Long = { + val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty) + Option(namedCounters.get(metricNameWithLabel)).fold(0L)(_.counter.getCount) + } // start cleaner thread startCleaner() }