From ad099d99f92dfa1cf788449c381c443c363709fa Mon Sep 17 00:00:00 2001 From: zwangsheng <2213335496@qq.com> Date: Thu, 10 Aug 2023 12:02:38 +0800 Subject: [PATCH] fix comments --- .../apache/celeborn/common/CelebornConf.scala | 2 +- .../deploy/worker/memory/MemoryManager.java | 6 ++-- .../deploy/memory/MemoryManagerSuite.scala | 28 +++++++++++++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) create mode 100644 worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 258356757b5..fdc4988b104 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2450,7 +2450,7 @@ object CelebornConf extends Logging { val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] = buildConf("celeborn.worker.directMemoryRatioToPauseReplicate") .categories("worker") - .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers." + + .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. " + s"This value should be higher than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.") .version("0.2.0") .doubleConf diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index 0e52a5c085d..758ddbb8ca6 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -123,9 +123,11 @@ private MemoryManager(CelebornConf conf) { Preconditions.checkArgument( pauseReplicateRatio > pausePushDataRatio, String.format( - "Invalid config, {} should be greater than {}", + "Invalid config, %s(%s) should be greater than %s(%s)", CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(), - CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key())); + pauseReplicateRatio, + CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), + pausePushDataRatio)); Preconditions.checkArgument(pausePushDataRatio > resumeRatio); Preconditions.checkArgument(resumeRatio > (readBufferRatio + shuffleStorageRatio)); diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala new file mode 100644 index 00000000000..e3e69c3416c --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala @@ -0,0 +1,28 @@ +package org.apache.celeborn.service.deploy.memory + +import org.scalatest.BeforeAndAfterEach +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.celeborn.common.CelebornConf +import org.apache.celeborn.common.CelebornConf.{WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE} +import org.apache.celeborn.service.deploy.worker.memory.MemoryManager +class MemoryManagerSuite extends AnyFunSuite with BeforeAndAfterEach { + + // reset the memory manager before each test + override protected def beforeEach(): Unit = { + super.beforeEach() + MemoryManager.reset() + } + + test("Init MemoryManager with invalid configuration") { + val conf = new CelebornConf().set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, 0.95) + .set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85) + val caught = + intercept[IllegalArgumentException] { + MemoryManager.initialize(conf); + } + assert( + caught.getMessage == s"Invalid config, ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(${0.85}) " + + s"should be greater than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}(${0.95})") + } +}