Skip to content

Commit

Permalink
[CELEBORN-1487][PHASE1] CongestionController support control traffic …
Browse files Browse the repository at this point in the history
…by user/worker traffic speed

### What changes were proposed in this pull request?
Introduce support control traffic by user/worker traffic speed.

### Why are the changes needed?
Currently, Celeborn only supports quota management based on disk file bytes/count, and this quota management cannot cope with sudden increases in traffic, which will cause corrupt to the cluster.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
UTs.

Closes #2797 from leixm/issue_1487_1.

Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
  • Loading branch information
leixm authored and RexXiong committed Oct 12, 2024
1 parent f871c98 commit 1d44e5f
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 79 deletions.
73 changes: 63 additions & 10 deletions common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1269,10 +1269,19 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
// TODO related to `WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE`,
// `WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE`and `WORKER_DIRECT_MEMORY_RATIO_RESUME`,
// we'd better refine the logic among them
def workerCongestionControlLowWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_LOW_WATERMARK)
def workerCongestionControlHighWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_HIGH_WATERMARK)
def workerCongestionControlDiskBufferLowWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK)
def workerCongestionControlDiskBufferHighWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK)
def workerCongestionControlUserProduceSpeedLowWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK)
def workerCongestionControlUserProduceSpeedHighWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK)
def workerCongestionControlWorkerProduceSpeedLowWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK)
def workerCongestionControlWorkerProduceSpeedHighWatermark: Option[Long] =
get(WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK)

def workerCongestionControlUserInactiveIntervalMs: Long =
get(WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL)
def workerCongestionControlCheckIntervalMs: Long = get(WORKER_CONGESTION_CONTROL_CHECK_INTERVAL)
Expand Down Expand Up @@ -1540,7 +1549,15 @@ object CelebornConf extends Logging {
DeprecatedConfig(
"celeborn.shuffle.forceFallback.enabled",
"0.5.0",
"Please use celeborn.client.spark.shuffle.fallback.policy"))
"Please use celeborn.client.spark.shuffle.fallback.policy"),
DeprecatedConfig(
"celeborn.worker.congestionControl.low.watermark",
"0.6.0",
"Please use celeborn.worker.congestionControl.diskBuffer.low.watermark"),
DeprecatedConfig(
"celeborn.worker.congestionControl.high.watermark",
"0.6.0",
"Please use celeborn.worker.congestionControl.diskBuffer.high.watermark"))

Map(configs.map { cfg => (cfg.key -> cfg) }: _*)
}
Expand Down Expand Up @@ -3775,27 +3792,63 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.SECONDS)
.createWithDefaultString("10s")

val WORKER_CONGESTION_CONTROL_LOW_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.low.watermark")
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.diskBuffer.low.watermark")
.withAlternative("celeborn.worker.congestionControl.low.watermark")
.categories("worker")
.doc("Will stop congest users if the total pending bytes of disk buffer is lower than " +
"this configuration")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createOptional

val WORKER_CONGESTION_CONTROL_HIGH_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.high.watermark")
val WORKER_CONGESTION_CONTROL_DISK_BUFFER_HIGH_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.diskBuffer.high.watermark")
.withAlternative("celeborn.worker.congestionControl.high.watermark")
.categories("worker")
.doc("If the total bytes in disk buffer exceeds this configure, will start to congest" +
"users whose produce rate is higher than the potential average consume rate. " +
"The congestion will stop if the produce rate is lower or equal to the " +
"average consume rate, or the total pending bytes lower than " +
s"${WORKER_CONGESTION_CONTROL_LOW_WATERMARK.key}")
s"${WORKER_CONGESTION_CONTROL_DISK_BUFFER_LOW_WATERMARK.key}")
.version("0.3.0")
.bytesConf(ByteUnit.BYTE)
.createOptional

val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.userProduceSpeed.low.watermark")
.categories("worker")
.doc("For those users that produce byte speeds less than this configuration, " +
"stop congestion for these users")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional

val WORKER_CONGESTION_CONTROL_USER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.userProduceSpeed.high.watermark")
.categories("worker")
.doc("For those users that produce byte speeds greater than this configuration, " +
"start congestion for these users")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional

val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_LOW_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.low.watermark")
.categories("worker")
.doc("Stop congestion If worker total produce speed less than this configuration")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional

val WORKER_CONGESTION_CONTROL_WORKER_PRODUCE_SPEED_HIGH_WATERMARK: OptionalConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.workerProduceSpeed.high.watermark")
.categories("worker")
.doc("Start congestion If worker total produce speed greater than this configuration")
.version("0.6.0")
.bytesConf(ByteUnit.BYTE)
.createOptional

val WORKER_CONGESTION_CONTROL_USER_INACTIVE_INTERVAL: ConfigEntry[Long] =
buildConf("celeborn.worker.congestionControl.user.inactive.interval")
.categories("worker")
Expand Down
8 changes: 6 additions & 2 deletions docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ license: |
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout |
| celeborn.worker.commitFiles.wait.threads | 32 | false | Thread number of worker to wait for commit shuffle data files to finish. | 0.5.0 | |
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | |
| celeborn.worker.congestionControl.diskBuffer.high.watermark | &lt;undefined&gt; | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.diskBuffer.low.watermark | 0.3.0 | celeborn.worker.congestionControl.high.watermark |
| celeborn.worker.congestionControl.diskBuffer.low.watermark | &lt;undefined&gt; | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | celeborn.worker.congestionControl.low.watermark |
| celeborn.worker.congestionControl.enabled | false | false | Whether to enable congestion control or not. | 0.3.0 | |
| celeborn.worker.congestionControl.high.watermark | &lt;undefined&gt; | false | If the total bytes in disk buffer exceeds this configure, will start to congestusers whose produce rate is higher than the potential average consume rate. The congestion will stop if the produce rate is lower or equal to the average consume rate, or the total pending bytes lower than celeborn.worker.congestionControl.low.watermark | 0.3.0 | |
| celeborn.worker.congestionControl.low.watermark | &lt;undefined&gt; | false | Will stop congest users if the total pending bytes of disk buffer is lower than this configuration | 0.3.0 | |
| celeborn.worker.congestionControl.sample.time.window | 10s | false | The worker holds a time sliding list to calculate users' produce/consume rate | 0.3.0 | |
| celeborn.worker.congestionControl.user.inactive.interval | 10min | false | How long will consider this user is inactive if it doesn't send data | 0.3.0 | |
| celeborn.worker.congestionControl.userProduceSpeed.high.watermark | &lt;undefined&gt; | false | For those users that produce byte speeds greater than this configuration, start congestion for these users | 0.6.0 | |
| celeborn.worker.congestionControl.userProduceSpeed.low.watermark | &lt;undefined&gt; | false | For those users that produce byte speeds less than this configuration, stop congestion for these users | 0.6.0 | |
| celeborn.worker.congestionControl.workerProduceSpeed.high.watermark | &lt;undefined&gt; | false | Start congestion If worker total produce speed greater than this configuration | 0.6.0 | |
| celeborn.worker.congestionControl.workerProduceSpeed.low.watermark | &lt;undefined&gt; | false | Stop congestion If worker total produce speed less than this configuration | 0.6.0 | |
| celeborn.worker.decommission.checkInterval | 30s | false | The wait interval of checking whether all the shuffle expired during worker decommission | 0.4.0 | |
| celeborn.worker.decommission.forceExitTimeout | 6h | false | The wait time of waiting for all the shuffle expire during worker decommission. | 0.4.0 | |
| celeborn.worker.directMemoryRatioForMemoryFileStorage | 0.0 | false | Max ratio of direct memory to store shuffle data. This feature is experimental and disabled by default. | 0.5.0 | |
Expand Down
4 changes: 4 additions & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ license: |
# Migration Guide

# Upgrading from 0.5 to 0.6
- Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.

- Since 0.6.0, Celeborn deprecate `celeborn.worker.congestionControl.high.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.


- Since 0.6.0, Celeborn has introduced a new RESTful API namespace: /api/v1, which uses the application/json media type for requests and responses.
The `celeborn-openapi-client` SDK is also available to help users interact with the new RESTful APIs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,36 +40,57 @@ public class CongestionController {
private final WorkerSource workerSource;

private final int sampleTimeWindowSeconds;
private final long highWatermark;
private final long lowWatermark;
private final long diskBufferHighWatermark;
private final long diskBufferLowWatermark;
private final long userProduceSpeedHighWatermark;
private final long userProduceSpeedLowWatermark;
private final long workerProduceSpeedHighWatermark;
private final long workerProduceSpeedLowWatermark;
private final long userInactiveTimeMills;

private final AtomicBoolean overHighWatermark = new AtomicBoolean(false);

private final BufferStatusHub consumedBufferStatusHub;

private final BufferStatusHub producedBufferStatusHub;

private final ConcurrentHashMap<UserIdentifier, UserBufferInfo> userBufferStatuses;

private final ScheduledExecutorService removeUserExecutorService;

private final ScheduledExecutorService checkService;

private final ConcurrentHashMap<UserIdentifier, UserCongestionControlContext>
userCongestionContextMap;

protected CongestionController(
WorkerSource workerSource,
int sampleTimeWindowSeconds,
long highWatermark,
long lowWatermark,
long diskBufferHighWatermark,
long diskBufferLowWatermark,
long userProduceSpeedHighWatermark,
long userProduceSpeedLowWatermark,
long workerProduceSpeedHighWatermark,
long workerProduceSpeedLowWatermark,
long userInactiveTimeMills,
long checkIntervalTimeMills) {
assert (highWatermark > lowWatermark);
assert (diskBufferHighWatermark > diskBufferLowWatermark);
assert (userProduceSpeedHighWatermark > userProduceSpeedLowWatermark);
assert (workerProduceSpeedHighWatermark > workerProduceSpeedLowWatermark);

this.workerSource = workerSource;
this.sampleTimeWindowSeconds = sampleTimeWindowSeconds;
this.highWatermark = highWatermark;
this.lowWatermark = lowWatermark;
this.diskBufferHighWatermark = diskBufferHighWatermark;
this.diskBufferLowWatermark = diskBufferLowWatermark;
this.userProduceSpeedHighWatermark = userProduceSpeedHighWatermark;
this.userProduceSpeedLowWatermark = userProduceSpeedLowWatermark;
this.workerProduceSpeedHighWatermark = workerProduceSpeedHighWatermark;
this.workerProduceSpeedLowWatermark = workerProduceSpeedLowWatermark;
this.userInactiveTimeMills = userInactiveTimeMills;
this.consumedBufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds);
this.producedBufferStatusHub = new BufferStatusHub(sampleTimeWindowSeconds);
this.userBufferStatuses = JavaUtils.newConcurrentHashMap();
this.userCongestionContextMap = JavaUtils.newConcurrentHashMap();

this.removeUserExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
Expand All @@ -94,16 +115,24 @@ protected CongestionController(
public static synchronized CongestionController initialize(
WorkerSource workSource,
int sampleTimeWindowSeconds,
long highWatermark,
long lowWatermark,
long highWatermarkDiskBuffer,
long lowWatermarkDiskBuffer,
long highWatermarkUserProduceSpeed,
long lowWatermarkUserProduceSpeed,
long highWatermarkWorkerProduceSpeed,
long lowWatermarkWorkerProduceSpeed,
long userInactiveTimeMills,
long checkIntervalTimeMills) {
_INSTANCE =
new CongestionController(
workSource,
sampleTimeWindowSeconds,
highWatermark,
lowWatermark,
highWatermarkDiskBuffer,
lowWatermarkDiskBuffer,
highWatermarkUserProduceSpeed,
lowWatermarkUserProduceSpeed,
highWatermarkWorkerProduceSpeed,
lowWatermarkWorkerProduceSpeed,
userInactiveTimeMills,
checkIntervalTimeMills);
return _INSTANCE;
Expand All @@ -122,25 +151,42 @@ public static CongestionController instance() {
* <p>3. If the pending bytes doesn't exceed the high watermark, will allow all users to try to
* get max throughout capacity.
*/
public boolean isUserCongested(UserIdentifier userIdentifier) {
if (userBufferStatuses.size() == 0) {
public boolean isUserCongested(UserCongestionControlContext userCongestionControlContext) {
if (userBufferStatuses.isEmpty()) {
return false;
}

UserIdentifier userIdentifier = userCongestionControlContext.getUserIdentifier();
long userProduceSpeed = getUserProduceSpeed(userCongestionControlContext.getUserBufferInfo());
// If the user produce speed is higher that the avg consume speed, will congest it
if (overHighWatermark.get()) {
// If the user produce speed is higher that the avg consume speed, will congest it
long userProduceSpeed = getUserProduceSpeed(userBufferStatuses.get(userIdentifier));
long avgConsumeSpeed = getPotentialConsumeSpeed();
long avgConsumeSpeed = getPotentialProduceSpeed();
if (userProduceSpeed > avgConsumeSpeed) {
if (logger.isDebugEnabled()) {
logger.debug(
"The user {}, produceSpeed is {}, while consumeSpeed is {}, need to congest it.",
userIdentifier,
userProduceSpeed,
avgConsumeSpeed);
}
return true;
}
}

if (userProduceSpeed > userProduceSpeedHighWatermark) {
userCongestionControlContext.onCongestionControl();
if (logger.isDebugEnabled()) {
logger.debug(
"The user {}, produceSpeed is {}, while consumeSpeed is {}, need to congest it: {}",
"The user {}, produceSpeed is {}, while userProduceSpeedHighWatermark is {}, need to congest it.",
userIdentifier,
userProduceSpeed,
avgConsumeSpeed,
userProduceSpeed > avgConsumeSpeed);
userProduceSpeedHighWatermark);
}
return userProduceSpeed > avgConsumeSpeed;
} else if (userCongestionControlContext.inCongestionControl()
&& userProduceSpeed < userProduceSpeedLowWatermark) {
userCongestionControlContext.offCongestionControl();
}
return false;
return userCongestionControlContext.inCongestionControl();
}

public UserBufferInfo getUserBuffer(UserIdentifier userIdentifier) {
Expand Down Expand Up @@ -184,6 +230,14 @@ public long getPotentialConsumeSpeed() {
return consumedBufferStatusHub.avgBytesPerSec() / userBufferStatuses.size();
}

public long getPotentialProduceSpeed() {
if (userBufferStatuses.size() == 0) {
return 0;
}

return producedBufferStatusHub.avgBytesPerSec() / userBufferStatuses.size();
}

/** Get the avg user produce speed, the unit is bytes/sec. */
private long getUserProduceSpeed(UserBufferInfo userBufferInfo) {
if (userBufferInfo != null) {
Expand All @@ -205,6 +259,7 @@ private void removeInactiveUsers() {
UserBufferInfo userBufferInfo = next.getValue();
if (currentTimeMillis - userBufferInfo.getTimestamp() >= userInactiveTimeMills) {
userBufferStatuses.remove(userIdentifier);
userCongestionContextMap.remove(userIdentifier);
workerSource.removeGauge(WorkerSource.USER_PRODUCE_SPEED(), userIdentifier.toMap());
logger.info("User {} has been expired, remove from rate limit list", userIdentifier);
}
Expand All @@ -217,13 +272,19 @@ private void removeInactiveUsers() {
protected void checkCongestion() {
try {
long pendingConsume = getTotalPendingBytes();
if (pendingConsume < lowWatermark) {
long workerProduceSpeed = producedBufferStatusHub.avgBytesPerSec();
if (pendingConsume < diskBufferLowWatermark
&& workerProduceSpeed < workerProduceSpeedLowWatermark) {
if (overHighWatermark.compareAndSet(true, false)) {
logger.info("Pending consume is lower than low watermark, exit congestion control");
logger.info(
"Pending consume and produce speed is lower than low watermark, exit congestion control");
}
return;
} else if (pendingConsume > highWatermark && overHighWatermark.compareAndSet(false, true)) {
logger.info("Pending consume is higher than high watermark, need congestion control");
} else if ((pendingConsume > diskBufferHighWatermark
|| workerProduceSpeed > workerProduceSpeedHighWatermark)
&& overHighWatermark.compareAndSet(false, true)) {
logger.info(
"Pending consume or produce speed is higher than high watermark, need congestion control");
}
if (overHighWatermark.get()) {
trimMemoryUsage();
Expand All @@ -239,6 +300,7 @@ public void close() {
this.checkService.shutdownNow();
this.userBufferStatuses.clear();
this.consumedBufferStatusHub.clear();
this.producedBufferStatusHub.clear();
}

public static synchronized void destroy() {
Expand All @@ -247,4 +309,27 @@ public static synchronized void destroy() {
_INSTANCE = null;
}
}

public BufferStatusHub getProducedBufferStatusHub() {
return producedBufferStatusHub;
}

public UserCongestionControlContext getUserCongestionContext(UserIdentifier userIdentifier) {
return userCongestionContextMap.computeIfAbsent(
userIdentifier,
user -> {
UserBufferInfo userBufferInfo = getUserBuffer(userIdentifier);
return new UserCongestionControlContext(
producedBufferStatusHub, userBufferInfo, workerSource, userIdentifier);
});
}

public ConcurrentHashMap<UserIdentifier, UserCongestionControlContext>
getUserCongestionContextMap() {
return userCongestionContextMap;
}

public BufferStatusHub getConsumedBufferStatusHub() {
return consumedBufferStatusHub;
}
}
Loading

0 comments on commit 1d44e5f

Please sign in to comment.