From fa79b263a0069408e9cdabb1145c023017f54246 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 24 Jul 2023 15:31:57 +0800 Subject: [PATCH 1/5] [CELEBORN-827] Eliminate unnecessary chunksBeingTransferred calculation ### What changes were proposed in this pull request? Eliminate `chunksBeingTransferred` calculation when `celeborn.shuffle.io.maxChunksBeingTransferred` is not configured ### Why are the changes needed? I observed high CPU usage on `ChunkStreamManager#chunksBeingTransferred` calculation. We can eliminate the method call if no threshold is configured, and investigate how to improve the method itself in the future. image ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? CI and Review. Closes #1749 from pan3793/CELEBORN-827. Authored-by: Cheng Pan Signed-off-by: Cheng Pan --- .../apache/celeborn/common/CelebornConf.scala | 6 +- docs/configuration/network.md | 2 +- .../service/deploy/worker/FetchHandler.scala | 87 ++++++++++--------- 3 files changed, 51 insertions(+), 44 deletions(-) diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index a3aaf228104..5480c7de054 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -466,7 +466,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def networkAllocatorVerboseMetric: Boolean = get(NETWORK_MEMORY_ALLOCATOR_VERBOSE_METRIC) - def shuffleIoMaxChunksBeingTransferred: Long = { + def shuffleIoMaxChunksBeingTransferred: Option[Long] = { get(MAX_CHUNKS_BEING_TRANSFERRED) } @@ -1418,7 +1418,7 @@ object CelebornConf extends Logging { .bytesConf(ByteUnit.BYTE) .createWithDefaultString("2m") - val MAX_CHUNKS_BEING_TRANSFERRED: ConfigEntry[Long] = + val MAX_CHUNKS_BEING_TRANSFERRED: OptionalConfigEntry[Long] = buildConf("celeborn.shuffle.io.maxChunksBeingTransferred") .categories("network") .doc("The max number of chunks allowed to be transferred at the same time on shuffle service. Note " + @@ -1427,7 +1427,7 @@ object CelebornConf extends Logging { "`celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure.") .version("0.2.0") .longConf - .createWithDefault(Long.MaxValue) + .createOptional val PUSH_TIMEOUT_CHECK_INTERVAL: ConfigEntry[Long] = buildConf("celeborn..push.timeoutCheck.interval") diff --git a/docs/configuration/network.md b/docs/configuration/network.md index 52b39b483ea..e1001ee9b9a 100644 --- a/docs/configuration/network.md +++ b/docs/configuration/network.md @@ -49,5 +49,5 @@ license: | | celeborn.rpc.dispatcher.threads | <undefined> | Threads number of message dispatcher event loop | 0.3.0 | | celeborn.rpc.io.threads | <undefined> | Netty IO thread number of NettyRpcEnv to handle RPC request. The default threads number is the number of runtime available processors. | 0.2.0 | | celeborn.rpc.lookupTimeout | 30s | Timeout for RPC lookup operations. | 0.2.0 | -| celeborn.shuffle.io.maxChunksBeingTransferred | 9223372036854775807 | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | +| celeborn.shuffle.io.maxChunksBeingTransferred | <undefined> | The max number of chunks allowed to be transferred at the same time on shuffle service. Note that new incoming connections will be closed when the max number is hit. The client will retry according to the shuffle retry configs (see `celeborn..io.maxRetries` and `celeborn..io.retryWait`), if those limits are reached the task will fail with fetch failure. | 0.2.0 | diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 950b5208ae7..9b4d08f8c25 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -27,6 +27,7 @@ import com.google.common.base.Throwables import io.netty.util.concurrent.{Future, GenericFutureListener} import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.CelebornConf.MAX_CHUNKS_BEING_TRANSFERRED import org.apache.celeborn.common.internal.Logging import org.apache.celeborn.common.meta.{FileInfo, FileManagedBuffers} import org.apache.celeborn.common.network.buffer.NioManagedBuffer @@ -36,12 +37,15 @@ import org.apache.celeborn.common.network.protocol.Message.Type import org.apache.celeborn.common.network.server.BaseMessageHandler import org.apache.celeborn.common.network.util.{NettyUtils, TransportConf} import org.apache.celeborn.common.protocol.PartitionType -import org.apache.celeborn.common.util.ExceptionUtils +import org.apache.celeborn.common.util.{ExceptionUtils, Utils} import org.apache.celeborn.service.deploy.worker.storage.{ChunkStreamManager, CreditStreamManager, PartitionFilesSorter, StorageManager} class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf) extends BaseMessageHandler with Logging { - var chunkStreamManager = new ChunkStreamManager() + + val chunkStreamManager = new ChunkStreamManager() + val maxChunkBeingTransferred: Option[Long] = conf.shuffleIoMaxChunksBeingTransferred + val creditStreamManager = new CreditStreamManager( conf.partitionReadBuffersMin, conf.partitionReadBuffersMax, @@ -214,46 +218,49 @@ class FetchHandler(val conf: CelebornConf, val transportConf: TransportConf) logTrace(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" + s" to fetch block ${req.streamChunkSlice}") - val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred - if (chunksBeingTransferred > conf.shuffleIoMaxChunksBeingTransferred) { - val message = "Worker is too busy. The number of chunks being transferred " + - s"$chunksBeingTransferred exceeds celeborn.shuffle.maxChunksBeingTransferred " + - s"${conf.shuffleIoMaxChunksBeingTransferred}." - logError(message) - client.getChannel.writeAndFlush(new ChunkFetchFailure(req.streamChunkSlice, message)) - } else { - workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString) - val fetchTimeMetric = chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId) - val fetchBeginTime = System.nanoTime() - try { - val buf = chunkStreamManager.getChunk( - req.streamChunkSlice.streamId, - req.streamChunkSlice.chunkIndex, - req.streamChunkSlice.offset, - req.streamChunkSlice.len) - chunkStreamManager.chunkBeingSent(req.streamChunkSlice.streamId) - client.getChannel.writeAndFlush(new ChunkFetchSuccess(req.streamChunkSlice, buf)) - .addListener(new GenericFutureListener[Future[_ >: Void]] { - override def operationComplete(future: Future[_ >: Void]): Unit = { - chunkStreamManager.chunkSent(req.streamChunkSlice.streamId) - if (fetchTimeMetric != null) { - fetchTimeMetric.update(System.nanoTime() - fetchBeginTime) - } - workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString) - } - }) - } catch { - case e: Exception => - logError( - s"Error opening block ${req.streamChunkSlice} for request from " + - NettyUtils.getRemoteAddress(client.getChannel), - e) - client.getChannel.writeAndFlush(new ChunkFetchFailure( - req.streamChunkSlice, - Throwables.getStackTraceAsString(e))) - workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString) + maxChunkBeingTransferred.foreach { threshold => + val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred // take high cpu usage + if (chunksBeingTransferred > threshold) { + val message = "Worker is too busy. The number of chunks being transferred " + + s"$chunksBeingTransferred exceeds ${MAX_CHUNKS_BEING_TRANSFERRED.key} " + + s"${Utils.bytesToString(threshold)}." + logError(message) + client.getChannel.writeAndFlush(new ChunkFetchFailure(req.streamChunkSlice, message)) + return } } + + workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString) + val fetchTimeMetric = chunkStreamManager.getFetchTimeMetric(req.streamChunkSlice.streamId) + val fetchBeginTime = System.nanoTime() + try { + val buf = chunkStreamManager.getChunk( + req.streamChunkSlice.streamId, + req.streamChunkSlice.chunkIndex, + req.streamChunkSlice.offset, + req.streamChunkSlice.len) + chunkStreamManager.chunkBeingSent(req.streamChunkSlice.streamId) + client.getChannel.writeAndFlush(new ChunkFetchSuccess(req.streamChunkSlice, buf)) + .addListener(new GenericFutureListener[Future[_ >: Void]] { + override def operationComplete(future: Future[_ >: Void]): Unit = { + chunkStreamManager.chunkSent(req.streamChunkSlice.streamId) + if (fetchTimeMetric != null) { + fetchTimeMetric.update(System.nanoTime() - fetchBeginTime) + } + workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString) + } + }) + } catch { + case e: Exception => + logError( + s"Error opening block ${req.streamChunkSlice} for request from " + + NettyUtils.getRemoteAddress(client.getChannel), + e) + client.getChannel.writeAndFlush(new ChunkFetchFailure( + req.streamChunkSlice, + Throwables.getStackTraceAsString(e))) + workerSource.stopTimer(WorkerSource.FETCH_CHUNK_TIME, req.toString) + } } override def checkRegistered: Boolean = registered.get From 00c36fda999660bd40c295d3f15b1a64a7a43535 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 24 Jul 2023 15:37:32 +0800 Subject: [PATCH 2/5] [CELEBORN-828] Merge Monitoring to Development doc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? As title 截屏2023-07-24 上午11 34 43 ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1751 from AngersZhuuuu/CELEBORN-828. Authored-by: Angerszhuuuu Signed-off-by: zky.zhoukeyong --- docs/monitoring.md | 3 --- docs/{upgrade.md => upgrading.md} | 2 +- mkdocs.yml | 4 ++-- 3 files changed, 3 insertions(+), 6 deletions(-) rename docs/{upgrade.md => upgrading.md} (99%) diff --git a/docs/monitoring.md b/docs/monitoring.md index d38fb9945b2..b84d2ec8b07 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1,7 +1,4 @@ --- -hide: - - navigation - license: | Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with diff --git a/docs/upgrade.md b/docs/upgrading.md similarity index 99% rename from docs/upgrade.md rename to docs/upgrading.md index edec9af087f..7504476c21f 100644 --- a/docs/upgrade.md +++ b/docs/upgrading.md @@ -15,7 +15,7 @@ license: | --- -Upgrade +Upgrading === ## Rolling upgrade diff --git a/mkdocs.yml b/mkdocs.yml index 007cecb9cce..b39e13cea41 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -76,10 +76,10 @@ nav: - Deployment: - Overview: deploy.md - Kubernetes: deploy_on_k8s.md - - Upgrade: upgrade.md + - Monitoring: monitoring.md + - Upgrading: upgrading.md - Ratis Shell: celeborn_ratis_shell.md - Configuration: configuration/index.md - - Monitoring: monitoring.md - Migration Guide: migration.md - Developers Doc: - Overview: developers/overview.md From 070d8bc0f8572cfa4c8b16e6a47e705eb9c756f4 Mon Sep 17 00:00:00 2001 From: "zky.zhoukeyong" Date: Mon, 24 Jul 2023 16:12:42 +0800 Subject: [PATCH 3/5] [CELEBORN-826][DOC] Add storage document ### What changes were proposed in this pull request? As title. ### Why are the changes needed? As title. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? No. Closes #1752 from waitinfuture/826. Authored-by: zky.zhoukeyong Signed-off-by: zky.zhoukeyong --- docs/assets/img/mappartition.svg | 4 + docs/assets/img/multilayer.svg | 4 + docs/assets/img/reducepartition.svg | 4 + docs/developers/pushdata.md | 1 + docs/developers/storage.md | 128 ++++++++++++++++++++++++++++ docs/developers/worker.md | 31 +++++++ mkdocs.yml | 12 +-- 7 files changed, 179 insertions(+), 5 deletions(-) create mode 100644 docs/assets/img/mappartition.svg create mode 100644 docs/assets/img/multilayer.svg create mode 100644 docs/assets/img/reducepartition.svg create mode 100644 docs/developers/storage.md diff --git a/docs/assets/img/mappartition.svg b/docs/assets/img/mappartition.svg new file mode 100644 index 00000000000..e70da9c49e3 --- /dev/null +++ b/docs/assets/img/mappartition.svg @@ -0,0 +1,4 @@ + + + +
P1
P1
P2
P2
...
...
Pn
Pn
P1
P1
P2
P2
...
...
Pn
Pn
...
...
Region1
Region1
Region2
Region2
Region1
Index
Region1...
Region2
Index
Region2...
Text is not SVG - cannot display
\ No newline at end of file diff --git a/docs/assets/img/multilayer.svg b/docs/assets/img/multilayer.svg new file mode 100644 index 00000000000..af2c086714d --- /dev/null +++ b/docs/assets/img/multilayer.svg @@ -0,0 +1,4 @@ + + + +
Mapper1
Mapper1
P1
P1
P2
P2
P4
P4
Push Region
Push Region
P2
P2
P4
P4
P5
P5
Cache Region
Cache Region
Memory
Memory
Local Disk
Local Disk
P5
P5
P6
P6
DFS/OSS
DFS/OSS
P6
P6
P1
P1
2
2
6
6
5
5
7
7
1
1
3
3
P7
P7
P7
P7
4
4
Viewer does not support full SVG 1.1
\ No newline at end of file diff --git a/docs/assets/img/reducepartition.svg b/docs/assets/img/reducepartition.svg new file mode 100644 index 00000000000..c9dcf13c4ce --- /dev/null +++ b/docs/assets/img/reducepartition.svg @@ -0,0 +1,4 @@ + + + +
Chunk1
Chunk1
Chunk2
Chunk2
...
...
ChunkN
ChunkN
Data File
Data File
Index
Index
Text is not SVG - cannot display
\ No newline at end of file diff --git a/docs/developers/pushdata.md b/docs/developers/pushdata.md index 07bdae1c308..694dbe361af 100644 --- a/docs/developers/pushdata.md +++ b/docs/developers/pushdata.md @@ -100,6 +100,7 @@ For the first three cases, `Worker` informs `ShuffleClient` that it should trigg `ShuffleClient` triggers split itself. There are two kinds of Split: + - `HARD_SPLIT`, meaning old `PartitionLocation` epoch refuses to accept any data, and future data of the `PartitionLocation` will only be pushed after new `PartitionLocation` epoch is ready - `SOFT_SPLIT`, meaning old `PartitionLocation` epoch continues to accept data, when new epoch is ready, `ShuffleClient` diff --git a/docs/developers/storage.md b/docs/developers/storage.md new file mode 100644 index 00000000000..61adc34f220 --- /dev/null +++ b/docs/developers/storage.md @@ -0,0 +1,128 @@ +--- +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +# Storage +This article describes the detailed design of Celeborn `Worker`'s storage management. + +## `PartitionLocation` Physical Storage +Logically, `PartitionLocation` contains all data with the same partition id. Physically, Celeborn stores +`PartitionLocation` in multiple files, each file corresponds to one `PartitionLocation` object with a unique epoch +for the partition. All `PartitionLocation`s with the same partition id but different epochs aggregate to the complete +data for the partition. The file can be in memory, local disks, or DFS/OSS, see `Multi-layered Storage` below. + +A `PartitionLocation` file can be read only after it is committed, trigger by `CommitFiles` RPC. + +## File Layout +Celeborn supports two kinds of partitions: + +- `ReducePartition`, where each `PartitionLocation` file stores a portion of data with the same partition id, + currently used for Apache Spark. +- `MapPartition`, where each `PartitionLocation` file stores a portion of data from the same map id, currently + used for Apache Flink. + +#### ReducePartition +The layout of `ReducePartition` is as follows: + +![ReducePartition](../../assets/img/reducepartition.svg) + +`ReducePartition` data file consists of several chunks (defaults to 8 MiB). Each data file has an in-memory index +which points to start positions of each chunk. Upon requesting data from some partition, `Worker` first returns the +index, then sequentially reads and returns a chunk upon each `ChunkFetchRequest`, which is very efficient. + +#### MapPartition +The layout of `MapPartition` is as follows: + +![MapPartition](../../assets/img/mappartition.svg) + +`MapPartition` data file consists of several regions (defaults to 64MiB), each region is sorted by partition id. +Each region has an in-memory index which points to start positions of each partition. Upon requesting data from +some partition, `Worker` reads the partition data from every region. + +For more details about reading data, please refer to [ReadData](../../developers/readdata). + +## Local Disk and Memory Buffer +To the time this article is written, the most common case is local disk only. Users specify directories and +capacity that Celeborn can use to store data. It is recommended to specify one directory per disk. If users +specify more directories on one disk, Celeborn will try to figure it out and manage in the disk-level +granularity. + +`Worker` periodically checks disk health, isolates unhealthy or spaceless disks, and reports to `Master` +through heartbeat. + +Upon receiving `ReserveSlots`, `Worker` will first try to create a `FileWriter` on the hinted disk. If that disk is +unavailable, `Worker` will choose a healthy one. + +Upon receiving `PushData` or `PushMergedData`, `Worker` unpacks the data (for `PushMergedData`) and logically appends +to the buffered data for each `PartitionLocation` (no physical memory copy). If the buffer exceeds the threshold +(defaults to 256KiB), data will be flushed to the file asynchronously. + +If data replication is turned on, `Worker` will send the data to replica asynchronously. Only after `Worker` +receives ACK from replica will it return ACK to `ShuffleClient`. Notice that it's not required that data is flushed +to file before sending ACK. + +Upon receiving `CommitFiles`, `Worker` will flush all buffered data for `PartitionLocation`s specified in +the RPC and close files, then responds the succeeded and failed `PartitionLocation` lists. + +## Trigger Split +Upon receiving `PushData` (note: currently receiving `PushMergedData` does not trigger Split, it's future work), +`Worker` will check whether disk usage exceeds disk reservation (defaults to 5GiB). If so, `Worker` will respond +Split to `ShuffleClient`. + +Celeborn supports two configurable kinds of split: + +- `HARD_SPLIT`, meaning old `PartitionLocation` epoch refuses to accept any data, and future data of the + `PartitionLocation` will only be pushed after new `PartitionLocation` epoch is ready +- `SOFT_SPLIT`, meaning old `PartitionLocation` epoch continues to accept data, when new epoch is ready, `ShuffleClient` + switches to the new location transparently + +The detailed design of split can be found [Here](../../developers/pushdata#split). + +## Self Check +In additional to health and space check on each disk, `Worker` also collects perf statistics to feed Master for +better [slots allocation](../../developers/slotsallocation): + +- Average flush time of the last time window +- Average fetch time of the last time window + +## Multi-layered Storage +Celeborn aims to store data in multiple layers, i.e. memory, local disks and distributed file systems(or object store +like S3, OSS). To the time this article is written, Celeborn supports local disks and HDFS. + +The principles of data placement are: + +- Try to cache small data in memory +- Always prefer faster storage +- Trade off between faster storage's space and cost of data movement + +The high-level design of multi-layered storage is: + +![storage](../../assets/img/multilayer.svg) + +`Worker`'s memory is divided into two logical regions: `Push Region` and `Cache Region`. `ShuffleClient` pushes data +into `Push Region`, as ① indicates. Whenever the buffered data in `PushRegion` for a `PartitionLocation` exceeds the +threshold (defaults to 256KiB), `Worker` flushes it to some storage layer. The policy of data movement is as follows: + +- If the `PartitionLocation` is not in `Cache Region` and `Cache Region` has enough space, logically move the data + to `Cache Region`. Notice this just counts the data in `Cache Region` and does not physically do memory copy. As ② + indicates. +- If the `PartitionLocation` is in `Cache Region`, logically append the current data, as ③ indicates. +- If the `PartitionLocation` is not in `Cache Region` and `Cache Region` does not have enough memory, + flush the data into local disk, as ④ indicates. +- If the `PartitionLocation` is not in `Cache Region` and both `Cache Region` and local disk do not have enough memory, + flush the data into DFS/OSS, as ⑤ indicates. +- If the `Cache Region` exceeds the threshold, choose the largest `PartitionLocation` and flush it to local disk, as ⑥ + indicates. +- Optionally, if local disk does not have enough memory, choose a `PartitionLocation` split and evict to HDFS/OSS. \ No newline at end of file diff --git a/docs/developers/worker.md b/docs/developers/worker.md index e69de29bb2d..05028069d85 100644 --- a/docs/developers/worker.md +++ b/docs/developers/worker.md @@ -0,0 +1,31 @@ +--- +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +# Worker +The main functions of Celeborn `Worker` are: + +- Store, serve, and manage `PartitionLocation` data. See [Storage](../../developers/storage) +- Traffic control through `Back Pressure` and `Congestion Control` +- Support rolling upgrade through `Graceful Shutdown` +- Support elasticity through `Decommission Shutdown` +- Self health check + +Celeborn `Worker` has four dedicated servers: + +- `Controller` handles control messages, i.e. `ReserveSlots`, `CommitFiles`, and `DestroyWorkerSlots` +- `Push Server` handles primary input data, i.e. `PushData` and `PushMergedData`, and push related control messages +- `Replicate Server` handles replica input data, it has the same logic with `Push Server` +- `Fetch Server` handles fetch requests, i.e. `ChunkFetchRequest`, and fetch related control messages \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index b39e13cea41..f684d708a0f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -83,9 +83,11 @@ nav: - Migration Guide: migration.md - Developers Doc: - Overview: developers/overview.md - - Master: developers/master.md - - Worker: developers/worker.md - - Client: developers/client.md +# - Master: developers/master.md + - Worker: + - Overview: developers/worker.md + - Storage: developers/storage.md +# - Client: developers/client.md - PushData: developers/pushdata.md - - ReadData: developers/readdata.md - - Slots Allocation: developers/slotsallocation.md +# - ReadData: developers/readdata.md +# - Slots Allocation: developers/slotsallocation.md From b8cdf36b40f77f1cd697e60024e6fdb8b432b9e5 Mon Sep 17 00:00:00 2001 From: "zky.zhoukeyong" Date: Mon, 24 Jul 2023 19:51:02 +0800 Subject: [PATCH 4/5] [CELEBORN-831][DOC] Add traffic control document ### What changes were proposed in this pull request? As title. ### Why are the changes needed? As title. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual test. Closes #1754 from waitinfuture/831. Authored-by: zky.zhoukeyong Signed-off-by: zky.zhoukeyong --- docs/assets/img/backpressure.svg | 4 ++ docs/developers/trafficcontrol.md | 80 +++++++++++++++++++++++++++++++ docs/developers/worker.md | 3 +- mkdocs.yml | 1 + 4 files changed, 87 insertions(+), 1 deletion(-) create mode 100644 docs/assets/img/backpressure.svg create mode 100644 docs/developers/trafficcontrol.md diff --git a/docs/assets/img/backpressure.svg b/docs/assets/img/backpressure.svg new file mode 100644 index 00000000000..67596a081eb --- /dev/null +++ b/docs/assets/img/backpressure.svg @@ -0,0 +1,4 @@ + + + +
Check
Memory
Check...
Y
Y
N
N
> 0.85?
> 0.85?
Y
Y
N
N
> 0.95?
> 0.95?
Pause Receive &&
Pause Replicate &&
Force Flush
Pause Receive &&...
Y
Y
N
N
Under
presure?
Under...
N
N
Y
Y
< 0.5?
< 0.5?
Pause Receive &&
Force Flush
Pause Receive &&...
Resume
Resume
Text is not SVG - cannot display
\ No newline at end of file diff --git a/docs/developers/trafficcontrol.md b/docs/developers/trafficcontrol.md new file mode 100644 index 00000000000..d824fee5ae9 --- /dev/null +++ b/docs/developers/trafficcontrol.md @@ -0,0 +1,80 @@ +--- +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +# Traffic Control +This article describes the detailed design of Celeborn `Worker`'s traffic control. + +## Design Goal +The design goal of Traffic Control is to prevent `Worker` OOM without harming performance. At the +same time, Celeborn tries to achieve fairness without harming performance. + +Celeborn reaches the goal through `Back Pressure` and `Congestion Control`. + +## Data Flow +From the `Worker`'s perspective, the income data flow comes from two sources: + +- `ShuffleClient` that pushes primary data to the primary `Worker` +- Primary `Worker` that sends data replication to the replica `Worker` + +The buffered memory can be released when the following conditions are satisfied: + +- Data is flushed to file +- If replication is on, after primary data is written to wire + +The basic idea is that, when `Worker` is under high memory pressure, slow down or stop income data, and at same +time force flush to release memory. + +## Back Pressure +`Back Pressure` defines three watermarks: + +- `Pause Receive` watermark (defaults to 0.85). If used direct memory ratio exceeds this, `Worker` will pause + receiving data from `ShuffleClient`, and force flush buffered data into file. +- `Pause Replicate` watermark (defaults to 0.95). If used direct memory ratio exceeds this, `Worker` will pause + receiving both data from `ShuffleClient` and replica data from primary `Worker`, and force flush buffered + data into file. +- `Resume` watermark (defaults to 0.5). When either `Pause Receive` or `Pause Replicate` is triggered, to resume + receiving data from `ShuffleClient`, the used direct memory ratio should decrease under this watermark. + +`Worker` high-frequently checks used direct memory ratio, and triggers `Pause Receive`, `Pause Replicate` and `Resume` +accordingly. The state machine is as follows: + +![backpressure](../../assets/img/backpressure.svg) + +`Back Pressure` is the basic traffic control and can't be disabled. Users can tune the three watermarks through the +following configuration. + +- `celeborn.worker.directMemoryRatio*` + +## Congestion Control +`Congestion Control` is an optional mechanism for traffic control, the purpose is to slow down the push rate +from `ShuffleClient` when memory is under pressure, and suppress those who occupied the most resources in the +last time window. It defines two watermarks: + +- `Low Watermark`, under which everything goes OK +- `High Watermark`, when exceeds this, top users will be Congestion Controlled + +Celeborn uses `UserIdentifier` to identify users. `Worker` collects bytes pushed from each user in the last time +window. When used direct memory exceeds `High Watermark`, users who occupied more resources than the average +occupation will receive `Congestion Control` message. + +`ShuffleClient` controls the push ratio in a fashion that is very like `TCP Congestion Control`. Initially, it's in +`Slow Start` phase, with a low push rate but increases very fast. When threshold is reached, it transfers to +`Congestion Avoidance` phase, which slowly increases push rate. Upon receiving `Congestion Control`, it goes back +to `Slow Start` phase. + +`Congestion Control` can be enabled and tuned by the following configurations: + +- `celeborn.worker.congestionControl.*` \ No newline at end of file diff --git a/docs/developers/worker.md b/docs/developers/worker.md index 05028069d85..154e0920522 100644 --- a/docs/developers/worker.md +++ b/docs/developers/worker.md @@ -18,7 +18,8 @@ license: | The main functions of Celeborn `Worker` are: - Store, serve, and manage `PartitionLocation` data. See [Storage](../../developers/storage) -- Traffic control through `Back Pressure` and `Congestion Control` +- Traffic control through `Back Pressure` and `Congestion Control`. See + [Traffic Control](../../developers/trafficcontrol) - Support rolling upgrade through `Graceful Shutdown` - Support elasticity through `Decommission Shutdown` - Self health check diff --git a/mkdocs.yml b/mkdocs.yml index f684d708a0f..da132deeff0 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -87,6 +87,7 @@ nav: - Worker: - Overview: developers/worker.md - Storage: developers/storage.md + - Traffic Control: developers/trafficcontrol.md # - Client: developers/client.md - PushData: developers/pushdata.md # - ReadData: developers/readdata.md From 67c18e6607efececd2757c80689c9451528a4812 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Mon, 24 Jul 2023 20:20:22 +0800 Subject: [PATCH 5/5] [CELEBORN-656][FOLLOWUP] Fix wrong message call when revive return STAGE_END ### What changes were proposed in this pull request? As title ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Closes #1755 from AngersZhuuuu/CELEBORN-656-FOLLOWUP. Authored-by: Angerszhuuuu Signed-off-by: zky.zhoukeyong --- .../java/org/apache/celeborn/client/ShuffleClientImpl.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java index 19e03c63fd6..202eb62d3ad 100644 --- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java +++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java @@ -729,7 +729,7 @@ Map reviveBatch( pushExcludedWorkers.remove(loc.getPeer().hostAndPushPort()); } } else if (StatusCode.STAGE_ENDED.getValue() == statusCode) { - stageEnded(shuffleId); + stageEndShuffleSet.add(shuffleId); return results; } else if (StatusCode.SHUFFLE_NOT_REGISTERED.getValue() == statusCode) { logger.error("SHUFFLE_NOT_REGISTERED!"); @@ -1650,10 +1650,10 @@ public void setupLifecycleManagerRef(RpcEndpointRef endpointRef) { boolean mapperEnded(int shuffleId, int mapId) { return (mapperEndMap.containsKey(shuffleId) && mapperEndMap.get(shuffleId).contains(mapId)) - || stageEnded(shuffleId); + || isStageEnded(shuffleId); } - protected boolean stageEnded(int shuffleId) { + protected boolean isStageEnded(int shuffleId) { return stageEndShuffleSet.contains(shuffleId); }