Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zwangsheng committed Aug 10, 2023
1 parent 05e5a76 commit ad099d9
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2450,7 +2450,7 @@ object CelebornConf extends Logging {
val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] =
buildConf("celeborn.worker.directMemoryRatioToPauseReplicate")
.categories("worker")
.doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers." +
.doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. " +
s"This value should be higher than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.")
.version("0.2.0")
.doubleConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,11 @@ private MemoryManager(CelebornConf conf) {
Preconditions.checkArgument(
pauseReplicateRatio > pausePushDataRatio,
String.format(
"Invalid config, {} should be greater than {}",
"Invalid config, %s(%s) should be greater than %s(%s)",
CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(),
CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key()));
pauseReplicateRatio,
CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(),
pausePushDataRatio));
Preconditions.checkArgument(pausePushDataRatio > resumeRatio);
Preconditions.checkArgument(resumeRatio > (readBufferRatio + shuffleStorageRatio));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package org.apache.celeborn.service.deploy.memory

import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.CelebornConf.{WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE}
import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
class MemoryManagerSuite extends AnyFunSuite with BeforeAndAfterEach {

// reset the memory manager before each test
override protected def beforeEach(): Unit = {
super.beforeEach()
MemoryManager.reset()
}

test("Init MemoryManager with invalid configuration") {
val conf = new CelebornConf().set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, 0.95)
.set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85)
val caught =
intercept[IllegalArgumentException] {
MemoryManager.initialize(conf);
}
assert(
caught.getMessage == s"Invalid config, ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(${0.85}) " +
s"should be greater than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}(${0.95})")
}
}

0 comments on commit ad099d9

Please sign in to comment.