Skip to content

Commit

Permalink
[CELEBORN-152] Add config to limit max workers when offering slots
Browse files Browse the repository at this point in the history
  • Loading branch information
waitinfuture committed Aug 3, 2023
1 parent 39ab731 commit 8b865e1
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def masterSlotAssignLoadAwareFetchTimeWeight: Double =
get(MASTER_SLOT_ASSIGN_LOADAWARE_FETCHTIME_WEIGHT)
def masterSlotAssignExtraSlots: Int = get(MASTER_SLOT_ASSIGN_EXTRA_SLOTS)
// Max number of workers that the slots of one shuffle can be allocated on
// (value of the MASTER_SLOT_ASSIGN_MAX_WORKERS config entry).
def masterSlotAssignMaxWorkers: Int = get(MASTER_SLOT_ASSIGN_MAX_WORKERS)
def initialEstimatedPartitionSize: Long = get(ESTIMATED_PARTITION_SIZE_INITIAL_SIZE)
def estimatedPartitionSizeUpdaterInitialDelay: Long =
get(ESTIMATED_PARTITION_SIZE_UPDATE_INITIAL_DELAY)
Expand Down Expand Up @@ -1870,6 +1871,14 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(2)

// Master-side cap on how many workers a single shuffle's slots may be spread across.
// Integer config in the "master" category, introduced in 0.3.1, defaulting to 1024.
val MASTER_SLOT_ASSIGN_MAX_WORKERS: ConfigEntry[Int] =
buildConf("celeborn.master.slot.assign.maxWorkers")
.categories("master")
.version("0.3.1")
.doc("Max workers that slots of one shuffle can be allocated on.")
.intConf
.createWithDefault(1024)

val ESTIMATED_PARTITION_SIZE_INITIAL_SIZE: ConfigEntry[Long] =
buildConf("celeborn.master.estimatedPartitionSize.initialSize")
.withAlternative("celeborn.shuffle.initialEstimatedPartitionSize")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy.master
import java.io.IOException
import java.net.BindException
import java.util
import java.util.Collections
import java.util.concurrent.{ConcurrentHashMap, ScheduledFuture, TimeUnit}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -117,6 +118,7 @@ private[celeborn] class Master(

private def diskReserveSize = conf.workerDiskReserveSize

// Cap on the number of workers offered for one shuffle; captured once from the conf
// when this Master instance is constructed (it is a val, not a def).
private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
private val slotsAssignLoadAwareDiskGroupNum = conf.masterSlotAssignLoadAwareDiskGroupNum
private val slotsAssignLoadAwareDiskGroupGradient =
conf.masterSlotAssignLoadAwareDiskGroupGradient
Expand Down Expand Up @@ -554,7 +556,13 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

val availableWorkers = workersAvailable()
var availableWorkers = workersAvailable()
Collections.shuffle(availableWorkers)
val numWorkers = Math.max(if (requestSlots.shouldReplicate) 2 else 1, slotsAssignMaxWorkers)
availableWorkers =
if (availableWorkers.size() > numWorkers)
availableWorkers.subList(0, numWorkers)
else availableWorkers
// offer slots
val slots =
masterSource.sample(MasterSource.OFFER_SLOTS_TIME, s"offerSlots-${Random.nextInt()}") {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.tests.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode

/**
 * Spark integration test that runs a hash-mode shuffle against a Celeborn mini cluster whose
 * master is started with `celeborn.master.slot.assign.maxWorkers` set to 10.
 *
 * NOTE(review): 10 is presumably larger than the number of workers in the mini cluster, so the
 * cap should not truncate the candidate worker list -- confirm against `setUpMiniCluster`.
 */
class SlotsAssignMaxWorkersLargeTest extends AnyFunSuite
  with SparkTestBase
  with BeforeAndAfterEach {

  /** Starts the mini cluster with the "large" max-workers setting applied to the master. */
  override def beforeAll(): Unit = {
    logInfo("test initialized, setup Celeborn mini cluster")
    val masterConf = Map(CelebornConf.MASTER_SLOT_ASSIGN_MAX_WORKERS.key -> "10")
    setUpMiniCluster(masterConf = masterConf, workerConf = null)
  }

  /** Resets the ShuffleClient before every test case. */
  override def beforeEach(): Unit = {
    ShuffleClient.reset()
  }

  /** Requests a JVM garbage collection after every test case. */
  override def afterEach(): Unit = {
    System.gc()
  }

  test("celeborn spark integration test - slots assign maxWorkers large") {
    // Replication is disabled for this suite.
    val sparkConf = new SparkConf()
      .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "false")
      .setAppName("celeborn-demo")
      .setMaster("local[2]")
    val ss = SparkSession.builder()
      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
      .getOrCreate()
    try {
      // 1000 records with a long string payload each, shuffled into 16 reduce partitions.
      ss.sparkContext.parallelize(1 to 1000, 2)
        .map { i => (i, Range(1, 1000).mkString(",")) }
        .groupByKey(16)
        .collect()
    } finally {
      // Always stop the session so a failing job does not leave it active for later suites.
      ss.stop()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.celeborn.tests.spark

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.BeforeAndAfterEach
import org.scalatest.funsuite.AnyFunSuite

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode

/**
 * Spark integration test that runs a hash-mode shuffle with replication enabled against a
 * Celeborn mini cluster whose master is started with
 * `celeborn.master.slot.assign.maxWorkers` set to 1.
 *
 * NOTE(review): with replication on, the configured cap of 1 is below the two workers a
 * replicated partition presumably needs, so this looks like a check that the master still
 * offers enough workers -- confirm against the master's slot-offering logic.
 */
class SlotsAssignMaxWorkersSmallTest extends AnyFunSuite
  with SparkTestBase
  with BeforeAndAfterEach {

  /** Starts the mini cluster with the "small" max-workers setting applied to the master. */
  override def beforeAll(): Unit = {
    logInfo("test initialized, setup Celeborn mini cluster")
    val masterConf = Map(CelebornConf.MASTER_SLOT_ASSIGN_MAX_WORKERS.key -> "1")
    setUpMiniCluster(masterConf = masterConf, workerConf = null)
  }

  /** Resets the ShuffleClient before every test case. */
  override def beforeEach(): Unit = {
    ShuffleClient.reset()
  }

  /** Requests a JVM garbage collection after every test case. */
  override def afterEach(): Unit = {
    System.gc()
  }

  test("celeborn spark integration test - slots assign maxWorkers small") {
    // Replication is enabled for this suite.
    val sparkConf = new SparkConf()
      .set(s"spark.${CelebornConf.CLIENT_PUSH_REPLICATE_ENABLED.key}", "true")
      .setAppName("celeborn-demo")
      .setMaster("local[2]")
    val ss = SparkSession.builder()
      .config(updateSparkConf(sparkConf, ShuffleMode.HASH))
      .getOrCreate()
    try {
      // 1000 records with a long string payload each, shuffled into 16 reduce partitions.
      ss.sparkContext.parallelize(1 to 1000, 2)
        .map { i => (i, Range(1, 1000).mkString(",")) }
        .groupByKey(16)
        .collect()
    } finally {
      // Always stop the session so a failing job does not leave it active for later suites.
      ss.stop()
    }
  }
}

0 comments on commit 8b865e1

Please sign in to comment.