# [CELEBORN-920] Worker sends its load to Master through heartbeat
### What changes were proposed in this pull request?

Add a flag to the worker's heartbeat that indicates high load, so that the master can make better scheduling decisions when assigning slots to workers.
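
A minimal, self-contained sketch of the idea (the names below are illustrative, not the actual Celeborn API): each worker attaches a `highWorkload` boolean to its heartbeat, and the master filters such workers out when offering slots.

```scala
// Illustrative sketch only; WorkerId and SlotScheduler are hypothetical classes.
case class WorkerId(host: String, rpcPort: Int)

final class SlotScheduler {
  private val highLoadWorkers = scala.collection.mutable.Set.empty[WorkerId]

  // Invoked on every worker heartbeat: remember or clear the high-load mark.
  def onHeartbeat(worker: WorkerId, highWorkload: Boolean): Unit =
    if (highWorkload) highLoadWorkers += worker else highLoadWorkers -= worker

  // Invoked when answering RequestSlots: overloaded workers are skipped.
  def offerSlots(candidates: Seq[WorkerId]): Seq[WorkerId] =
    candidates.filterNot(highLoadWorkers.contains)
}
```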

### Why are the changes needed?

In our production environment there was a node with abnormally high load, but the master was not aware of it. The master kept assigning a large number of jobs to that node, and as a result the stability of those jobs was affected.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Unit tests.

Closes #1840 from JQ-Cao/920.

Lead-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: caojiaqing <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
waitinfuture and JQ-Cao committed Aug 26, 2023
1 parent ae39a97 commit 1d04a23
Showing 16 changed files with 99 additions and 13 deletions.
1 change: 1 addition & 0 deletions common/src/main/proto/TransportMessages.proto
@@ -151,6 +151,7 @@ message PbHeartbeatFromWorker {
string requestId = 8;
map<string, PbResourceConsumption> userResourceConsumption = 9;
map<string, int64> estimatedAppDiskUsage = 10;
bool highWorkload = 11;
}

message PbHeartbeatFromWorkerResponse {
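The message in `TransportMessages.proto` carries no field labels, so it is proto3-style and the addition is wire compatible: a heartbeat serialized without field 11 (e.g. by a pre-0.3.1 worker) parses with the default value `false`. A hedged sketch of that behavior, assuming the protoc-generated `PbHeartbeatFromWorker` class lives in `org.apache.celeborn.common.protocol` like the other generated messages:

```scala
import org.apache.celeborn.common.protocol.PbHeartbeatFromWorker

// The flag survives a serialize/parse round trip.
val loaded = PbHeartbeatFromWorker.newBuilder()
  .setHost("worker-1")
  .setRpcPort(9095)
  .setHighWorkload(true)
  .build()
assert(PbHeartbeatFromWorker.parseFrom(loaded.toByteArray).getHighWorkload)

// A payload that never set the field (an older worker) falls back to the proto3
// default, so the master treats that worker as not high-load.
val legacy = PbHeartbeatFromWorker.newBuilder().setHost("worker-2").build()
assert(!PbHeartbeatFromWorker.parseFrom(legacy.toByteArray).getHighWorkload)
```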
@@ -654,6 +654,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def workerPushMaxComponents: Int = get(WORKER_PUSH_COMPOSITEBUFFER_MAXCOMPONENTS)
def workerFetchHeartbeatEnabled: Boolean = get(WORKER_FETCH_HEARTBEAT_ENABLED)
def workerPartitionSplitEnabled: Boolean = get(WORKER_PARTITION_SPLIT_ENABLED)
def workerActiveConnectionMax: Option[Long] = get(WORKER_ACTIVE_CONNECTION_MAX)

// //////////////////////////////////////////////////////
// Metrics System //
@@ -2693,6 +2694,16 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

val WORKER_ACTIVE_CONNECTION_MAX: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.activeConnection.max")
.categories("worker")
.doc("If the number of active connections on a worker exceeds this configuration value, " +
"the worker will be marked as high-load in the heartbeat report, " +
"and the master will not include that node in the response of RequestSlots.")
.version("0.3.1")
.longConf
.createOptional

val APPLICATION_HEARTBEAT_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.client.application.heartbeatInterval")
.withAlternative("celeborn.application.heartbeatInterval")
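The new entry is created with `createOptional`, so `workerActiveConnectionMax` yields `None` until the key is set explicitly. A small sketch of the expected behavior, assuming the usual `CelebornConf#set(key, value)` setter:

```scala
import org.apache.celeborn.common.CelebornConf

val conf = new CelebornConf(false) // skip loading defaults; constructor signature shown in the hunk above

// Unset: the optional entry has no default value.
assert(conf.workerActiveConnectionMax.isEmpty)

// Once set, the value is parsed as a Long by the longConf builder.
conf.set("celeborn.worker.activeConnection.max", "3000")
assert(conf.workerActiveConnectionMax.contains(3000L))
```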
@@ -113,6 +113,7 @@ object ControlMessages extends Logging {
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
activeShuffleKeys: util.Set[String],
estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
highWorkload: Boolean,
override var requestId: String = ZERO_UUID) extends MasterRequestMessage

case class HeartbeatFromWorkerResponse(
@@ -445,6 +446,7 @@
userResourceConsumption,
activeShuffleKeys,
estimatedAppDiskUsage,
highWorkload,
requestId) =>
val pbDisks = disks.map(PbSerDeUtils.toPbDiskInfo).asJava
val pbUserResourceConsumption =
@@ -459,6 +461,7 @@
.setReplicatePort(replicatePort)
.addAllActiveShuffleKeys(activeShuffleKeys)
.putAllEstimatedAppDiskUsage(estimatedAppDiskUsage)
.setHighWorkload(highWorkload)
.setRequestId(requestId)
.build().toByteArray
new TransportMessage(MessageType.HEARTBEAT_FROM_WORKER, payload)
@@ -821,6 +824,7 @@
userResourceConsumption,
activeShuffleKeys,
estimatedAppDiskUsage,
pbHeartbeatFromWorker.getHighWorkload,
pbHeartbeatFromWorker.getRequestId)

case HEARTBEAT_FROM_WORKER_RESPONSE_VALUE =>
1 change: 1 addition & 0 deletions docs/configuration/worker.md
@@ -24,6 +24,7 @@ license: |
| celeborn.shuffle.chunk.size | 8m | Max chunk size of reducer's merged shuffle data. For example, if a reducer's shuffle data is 128M, the data will need 16 fetch chunk requests to fetch. | 0.2.0 |
| celeborn.storage.activeTypes | HDD,SSD | Enabled storage levels. Available options: HDD,SSD,HDFS. | 0.3.0 |
| celeborn.storage.hdfs.dir | &lt;undefined&gt; | HDFS base directory for Celeborn to store shuffle data. | 0.2.0 |
| celeborn.worker.activeConnection.max | &lt;undefined&gt; | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | Threads count for read buffer per mount point. | 0.3.0 |
| celeborn.worker.closeIdleConnections | false | Whether worker will close idle connections. | 0.2.0 |
| celeborn.worker.commitFiles.threads | 32 | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.activeTypes`. | 0.3.0 |
@@ -143,7 +143,8 @@ public void updateWorkerHeartbeatMeta(
Map<String, DiskInfo> disks,
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time) {
long time,
boolean highWorkload) {
WorkerInfo worker =
new WorkerInfo(
host, rpcPort, pushPort, fetchPort, replicatePort, disks, userResourceConsumption);
@@ -161,10 +162,11 @@ }
}
appDiskUsageMetric.update(estimatedAppDiskUsage);
// If using HDFSONLY mode, workers with empty disks should not be put into excluded worker list.
if (!excludedWorkers.contains(worker) && (disks.isEmpty() && !conf.hasHDFSStorage())) {
if (!excludedWorkers.contains(worker)
&& ((disks.isEmpty() && !conf.hasHDFSStorage()) || highWorkload)) {
LOG.debug("Worker: {} num total slots is 0, add to excluded list", worker);
excludedWorkers.add(worker);
} else if (availableSlots.get() > 0) {
} else if ((availableSlots.get() > 0 || conf.hasHDFSStorage()) && !highWorkload) {
// only remove the worker from the excluded list if it has available slots (or HDFS storage is enabled) and is not under high workload
excludedWorkers.remove(worker);
}
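With this change, the rule in `updateWorkerHeartbeatMeta` reads: exclude a worker if it reports no disks while HDFS storage is disabled, or if it reports high workload; only un-exclude it while it has available slots (or HDFS storage is enabled) and it is not under high workload. The same rule restated as pure functions for clarity (hypothetical helper names, not the actual API):

```scala
// Sketch of the decision above; helper names are illustrative only.
def shouldExclude(disksEmpty: Boolean, hasHdfsStorage: Boolean, highWorkload: Boolean): Boolean =
  (disksEmpty && !hasHdfsStorage) || highWorkload

def mayUnexclude(availableSlots: Long, hasHdfsStorage: Boolean, highWorkload: Boolean): Boolean =
  (availableSlots > 0 || hasHdfsStorage) && !highWorkload
```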
@@ -55,6 +55,7 @@ void handleWorkerHeartbeat(
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time,
boolean highWorkload,
String requestId);

void handleRegisterWorker(
@@ -92,6 +92,7 @@ public void handleWorkerHeartbeat(
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time,
boolean highWorkload,
String requestId) {
updateWorkerHeartbeatMeta(
host,
@@ -102,7 +103,8 @@
disks,
userResourceConsumption,
estimatedAppDiskUsage,
time);
time,
highWorkload);
}

@Override
@@ -197,6 +197,7 @@ public void handleWorkerHeartbeat(
Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
Map<String, Long> estimatedAppDiskUsage,
long time,
boolean highWorkload,
String requestId) {
try {
ratisServer.submitRequest(
@@ -215,6 +216,7 @@
MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
.putAllEstimatedAppDiskUsage(estimatedAppDiskUsage)
.setTime(time)
.setHighWorkload(highWorkload)
.build())
.build());
} catch (CelebornRuntimeException e) {
@@ -163,6 +163,7 @@ public ResourceResponse handleWriteRequest(ResourceProtos.ResourceRequest reques
estimatedAppDiskUsage.putAll(
request.getWorkerHeartbeatRequest().getEstimatedAppDiskUsageMap());
replicatePort = request.getWorkerHeartbeatRequest().getReplicatePort();
boolean highWorkload = request.getWorkerHeartbeatRequest().getHighWorkload();
LOG.debug(
"Handle worker heartbeat for {} {} {} {} {} {} {}",
host,
@@ -182,7 +183,8 @@
diskInfos,
userResourceConsumption,
estimatedAppDiskUsage,
time);
time,
highWorkload);
break;

case RegisterWorker:
1 change: 1 addition & 0 deletions master/src/main/proto/Resource.proto
@@ -121,6 +121,7 @@ message WorkerHeartbeatRequest {
required int64 time = 7;
map<string, ResourceConsumption> userResourceConsumption = 8;
map<string, int64> estimatedAppDiskUsage = 9;
required bool highWorkload = 10;
}

message RegisterWorkerRequest {
@@ -334,6 +334,7 @@ private[celeborn] class Master(
userResourceConsumption,
activeShuffleKey,
estimatedAppDiskUsage,
highWorkload,
requestId) =>
logDebug(s"Received heartbeat from" +
s" worker $host:$rpcPort:$pushPort:$fetchPort:$replicatePort with $disks.")
@@ -350,6 +351,7 @@
userResourceConsumption,
activeShuffleKey,
estimatedAppDiskUsage,
highWorkload,
requestId))

case ReportWorkerUnavailable(failedWorkers: util.List[WorkerInfo], requestId: String) =>
@@ -432,6 +434,7 @@
userResourceConsumption: util.Map[UserIdentifier, ResourceConsumption],
activeShuffleKeys: util.Set[String],
estimatedAppDiskUsage: util.HashMap[String, java.lang.Long],
highWorkload: Boolean,
requestId: String): Unit = {
val targetWorker = new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort)
val registered = workersSnapShot.asScala.contains(targetWorker)
@@ -449,6 +452,7 @@
userResourceConsumption,
estimatedAppDiskUsage,
System.currentTimeMillis(),
highWorkload,
requestId)
}

@@ -506,6 +506,7 @@ public void testHandleWorkerHeartbeat() {
userResourceConsumption1,
new HashMap<>(),
1,
false,
getNewReqeustId());

Assert.assertEquals(statusSystem.excludedWorkers.size(), 1);
@@ -520,23 +521,40 @@
userResourceConsumption2,
new HashMap<>(),
1,
false,
getNewReqeustId());

Assert.assertEquals(statusSystem.excludedWorkers.size(), 2);

statusSystem.handleWorkerHeartbeat(
HOSTNAME1,
RPCPORT1,
PUSHPORT1,
FETCHPORT1,
HOSTNAME3,
RPCPORT3,
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
disks1,
userResourceConsumption1,
disks3,
userResourceConsumption3,
new HashMap<>(),
1,
false,
getNewReqeustId());

Assert.assertEquals(statusSystem.excludedWorkers.size(), 2);

statusSystem.handleWorkerHeartbeat(
HOSTNAME3,
RPCPORT3,
PUSHPORT3,
FETCHPORT3,
REPLICATEPORT3,
disks3,
userResourceConsumption3,
new HashMap<>(),
1,
true,
getNewReqeustId());

Assert.assertEquals(statusSystem.excludedWorkers.size(), 3);
}

@Test
@@ -735,6 +735,7 @@ public void testHandleWorkerHeartbeat() throws InterruptedException {
userResourceConsumption1,
new HashMap<>(),
1,
false,
getNewReqeustId());
Thread.sleep(3000L);

@@ -752,6 +753,7 @@
userResourceConsumption2,
new HashMap<>(),
1,
false,
getNewReqeustId());
Thread.sleep(3000L);

@@ -770,13 +772,32 @@
userResourceConsumption1,
new HashMap<>(),
1,
false,
getNewReqeustId());
Thread.sleep(3000L);

Assert.assertEquals(1, statusSystem.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM1.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(1, STATUSSYSTEM3.excludedWorkers.size());

statusSystem.handleWorkerHeartbeat(
HOSTNAME1,
RPCPORT1,
PUSHPORT1,
FETCHPORT1,
REPLICATEPORT1,
disks1,
userResourceConsumption1,
new HashMap<>(),
1,
true,
getNewReqeustId());
Thread.sleep(3000L);
Assert.assertEquals(2, statusSystem.excludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM1.excludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM2.excludedWorkers.size());
Assert.assertEquals(2, STATUSSYSTEM3.excludedWorkers.size());
}

@Before
@@ -429,7 +429,6 @@ public interface ReadBufferTargetChangeListener {
void onChange(long newMemoryTarget);
}

@VisibleForTesting
public enum ServingState {
NONE_PAUSED,
PUSH_AND_REPLICATE_PAUSED,
@@ -46,8 +46,10 @@ import org.apache.celeborn.common.util.{CelebornExitKind, JavaUtils, ShutdownHoo
// Can remove this if Celeborn drops Scala 2.11 support in the future
import org.apache.celeborn.common.util.FunctionConverter._
import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.worker.WorkerSource.ACTIVE_CONNECTION_COUNT
import org.apache.celeborn.service.deploy.worker.congestcontrol.CongestionController
import org.apache.celeborn.service.deploy.worker.memory.{ChannelsLimiter, MemoryManager}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState
import org.apache.celeborn.service.deploy.worker.storage.{PartitionFilesSorter, StorageManager}

private[celeborn] class Worker(
@@ -296,6 +298,16 @@ private[celeborn] class Worker(
memoryManager.getAllocatedReadBuffers
}

private def highWorkload: Boolean = {
(memoryManager.currentServingState, conf.workerActiveConnectionMax) match {
case (ServingState.PUSH_AND_REPLICATE_PAUSED, _) => true
case (ServingState.PUSH_PAUSED, _) => true
case (_, Some(activeConnectionMax)) =>
workerSource.getCounterCount(ACTIVE_CONNECTION_COUNT) >= activeConnectionMax
case _ => false
}
}

private def heartbeatToMaster(): Unit = {
val activeShuffleKeys = new JHashSet[String]()
val estimatedAppDiskUsage = new JHashMap[String, JLong]()
@@ -322,7 +334,8 @@
diskInfos,
resourceConsumption,
activeShuffleKeys,
estimatedAppDiskUsage),
estimatedAppDiskUsage,
highWorkload),
classOf[HeartbeatFromWorkerResponse])
response.expiredShuffleKeys.asScala.foreach(shuffleKey => workerInfo.releaseSlots(shuffleKey))
cleanTaskQueue.put(response.expiredShuffleKeys)
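The worker reports high workload when the memory manager has paused pushes (`PUSH_PAUSED` or `PUSH_AND_REPLICATE_PAUSED`) or, if `celeborn.worker.activeConnection.max` is configured, when the active connection counter reaches that threshold. The same decision restated as a pure function (illustrative name; the real check is `Worker#highWorkload` above):

```scala
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager.ServingState

// Sketch of the worker-side high-load decision; isHighWorkload is a hypothetical helper.
def isHighWorkload(
    state: ServingState,
    activeConnections: Long,
    maxConnections: Option[Long]): Boolean =
  state match {
    case ServingState.PUSH_AND_REPLICATE_PAUSED | ServingState.PUSH_PAUSED => true
    case _ => maxConnections.exists(activeConnections >= _)
  }
```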
@@ -57,6 +57,10 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
addTimer(TAKE_BUFFER_TIME)
addTimer(SORT_TIME)

def getCounterCount(metricsName: String): Long = {
val metricNameWithLabel = metricNameWithCustomizedLabels(metricsName, Map.empty)
namedCounters.get(metricNameWithLabel).counter.getCount
}
// start cleaner thread
startCleaner()
}
