diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index afa8276b0a2..fdc4988b104 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -2450,7 +2450,8 @@ object CelebornConf extends Logging { val WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE: ConfigEntry[Double] = buildConf("celeborn.worker.directMemoryRatioToPauseReplicate") .categories("worker") - .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers.") + .doc("If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. " + + s"This value should be higher than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}.") .version("0.2.0") .doubleConf .createWithDefault(0.95) diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md index d5759148576..e724fa2aad7 100644 --- a/docs/configuration/worker.md +++ b/docs/configuration/worker.md @@ -38,7 +38,7 @@ license: | | celeborn.worker.directMemoryRatioForMemoryShuffleStorage | 0.0 | Max ratio of direct memory to store shuffle data | 0.2.0 | | celeborn.worker.directMemoryRatioForReadBuffer | 0.1 | Max ratio of direct memory for read buffer | 0.2.0 | | celeborn.worker.directMemoryRatioToPauseReceive | 0.85 | If direct memory usage reaches this limit, the worker will stop to receive data from Celeborn shuffle clients. | 0.2.0 | -| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. | 0.2.0 | +| celeborn.worker.directMemoryRatioToPauseReplicate | 0.95 | If direct memory usage reaches this limit, the worker will stop to receive replication data from other workers. 
This value should be higher than celeborn.worker.directMemoryRatioToPauseReceive. | 0.2.0 | | celeborn.worker.directMemoryRatioToResume | 0.5 | If direct memory usage is less than this limit, worker will resume. | 0.2.0 | | celeborn.worker.fetch.heartbeat.enabled | false | enable the heartbeat from worker to client when fetching data | 0.3.0 | | celeborn.worker.fetch.io.threads | <undefined> | Netty IO thread number of worker to handle client fetch data. The default threads number is the number of flush thread. | 0.2.0 | diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java index c0524fa3a6a..758ddbb8ca6 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/MemoryManager.java @@ -120,7 +120,14 @@ private MemoryManager(CelebornConf conf) { .invoke(); Preconditions.checkArgument(maxDirectorMemory > 0); - Preconditions.checkArgument(pauseReplicateRatio > pausePushDataRatio); + Preconditions.checkArgument( + pauseReplicateRatio > pausePushDataRatio, + String.format( + "Invalid config, %s(%s) should be greater than %s(%s)", + CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE().key(), + pauseReplicateRatio, + CelebornConf.WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE().key(), + pausePushDataRatio)); Preconditions.checkArgument(pausePushDataRatio > resumeRatio); Preconditions.checkArgument(resumeRatio > (readBufferRatio + shuffleStorageRatio)); diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala new file mode 100644 index 00000000000..0e0d627bf15 --- /dev/null +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/MemoryManagerSuite.scala @@ -0,0 +1,45 @@ +/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.celeborn.service.deploy.memory
+
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.celeborn.common.CelebornConf
+import org.apache.celeborn.common.CelebornConf.{WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE}
+import org.apache.celeborn.service.deploy.worker.memory.MemoryManager
+
+/** Checks that [[MemoryManager]] rejects inconsistent direct-memory ratio settings. */
+class MemoryManagerSuite extends AnyFunSuite with BeforeAndAfterEach {
+
+  // Start every test from a clean slate: reset whatever MemoryManager state an earlier test left.
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+    MemoryManager.reset()
+  }
+
+  test("Init MemoryManager with invalid configuration") {
+    // pauseReplicate (0.85) is set below pauseReceive (0.95), so initialization must fail.
+    val conf = new CelebornConf().set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE, 0.95)
+      .set(WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE, 0.85)
+    val caught = intercept[IllegalArgumentException](MemoryManager.initialize(conf))
+    assert(
+      caught.getMessage == s"Invalid config, ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_REPLICATE.key}(0.85) " +
+        s"should be greater than ${WORKER_DIRECT_MEMORY_RATIO_PAUSE_RECEIVE.key}(0.95)")
+  }
+}