Skip to content

Commit

Permalink
[CELEBORN-1620][CIP-11] Support passing worker tags via RequestSlots …
Browse files Browse the repository at this point in the history
…message

### What changes were proposed in this pull request?

Supporting passing tag expression in RequestSlots request. Clients can pass the tags using CelebornConf. Default tag configs for system/tenant/user will be suppoted in follow up PRs.

### Why are the changes needed?

https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs passed, will add more UTs while integrating TagsManager with ConfigService.

Closes #2770 from s0nskar/request-slots.

Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
s0nskar authored and FMX committed Oct 12, 2024
1 parent 1d44e5f commit d14e9bb
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
.maximumSize(rpcCacheSize)
.build().asInstanceOf[Cache[Int, ByteBuffer]]

private val clientTagsExpr = conf.clientTagsExpr
private val mockDestroyFailure = conf.testMockDestroySlotsFailure
private val authEnabled = conf.authEnabledOnClient
private var applicationMeta: ApplicationMeta = _
Expand Down Expand Up @@ -1642,7 +1643,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends
slotsAssignMaxWorkers,
availableStorageTypes,
excludedWorkerSet,
true)
true,
clientTagsExpr)
val res = requestMasterRequestSlots(req)
if (res.status != StatusCode.SUCCESS) {
requestMasterRequestSlots(req)
Expand Down
1 change: 1 addition & 0 deletions common/src/main/proto/TransportMessages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ message PbRequestSlots {
int32 availableStorageTypes = 11;
repeated PbWorkerInfo excludedWorkerSet = 12;
bool packed = 13;
string tagsExpr = 14;
}

message PbSlotInfo {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1320,6 +1320,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED)
def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW)

def clientTagsExpr: String = get(CLIENT_TAGS_EXPR)

// //////////////////////////////////////////////////////
// kerberos //
// //////////////////////////////////////////////////////
Expand Down Expand Up @@ -5819,4 +5821,13 @@ object CelebornConf extends Logging {
.booleanConf
.createWithDefault(false)

val CLIENT_TAGS_EXPR: ConfigEntry[String] =
buildConf("celeborn.client.tagsExpr")
.categories("client")
.version("0.6.0")
.doc("Expression to filter workers by tags. The expression is a comma-separated list of " +
"tags. The expression is evaluated as a logical AND of all tags. For example, " +
"`prod,high-io` filters workers that have both the `prod` and `high-io` tags.")
.stringConf
.createWithDefault("")
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ object ControlMessages extends Logging {
availableStorageTypes: Int,
excludedWorkerSet: Set[WorkerInfo] = Set.empty,
packed: Boolean = false,
tagsExpr: String = "",
override var requestId: String = ZERO_UUID)
extends MasterRequestMessage

Expand Down Expand Up @@ -633,6 +634,7 @@ object ControlMessages extends Logging {
availableStorageTypes,
excludedWorkerSet,
packed,
tagsExpr,
requestId) =>
val payload = PbRequestSlots.newBuilder()
.setApplicationId(applicationId)
Expand All @@ -648,6 +650,7 @@ object ControlMessages extends Logging {
.addAllExcludedWorkerSet(excludedWorkerSet.map(
PbSerDeUtils.toPbWorkerInfo(_, true, true)).asJava)
.setPacked(packed)
.setTagsExpr(tagsExpr)
.build().toByteArray
new TransportMessage(MessageType.REQUEST_SLOTS, payload)

Expand Down Expand Up @@ -1073,6 +1076,7 @@ object ControlMessages extends Logging {
pbRequestSlots.getAvailableStorageTypes,
excludedWorkerInfoSet,
pbRequestSlots.getPacked,
pbRequestSlots.getTagsExpr,
pbRequestSlots.getRequestId)

case REQUEST_SLOTS_RESPONSE_VALUE =>
Expand Down
1 change: 1 addition & 0 deletions docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ license: |
| celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use spark built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use spark built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota, shuffle partition number; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.5.0 | |
| celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always use spark built-in shuffle implementation. This configuration is deprecated, consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 0.3.0 | celeborn.shuffle.forceFallback.enabled |
| celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer |
| celeborn.client.tagsExpr | | false | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | |
| celeborn.master.endpoints | &lt;localhost&gt;:9097 | false | Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided byceleborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take static master endpoints as input. Allowed pattern: `<host1>:<port1>[,<host2>:<port2>]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver. | 0.2.0 | |
| celeborn.master.endpoints.resolver | org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that provided resolver class should be present in the classpath. | 0.6.0 | |
| celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.apache.celeborn.server.common.{HttpService, Service}
import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager
import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler}
import org.apache.celeborn.service.deploy.master.quota.QuotaManager
import org.apache.celeborn.service.deploy.master.tags.TagsManager

private[celeborn] class Master(
override val conf: CelebornConf,
Expand Down Expand Up @@ -185,6 +186,7 @@ private[celeborn] class Master(
private val hasS3Storage = conf.hasS3Storage

private val quotaManager = new QuotaManager(conf, configService)
private val tagsManager = new TagsManager()
private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
private val userResourceConsumptions =
JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()
Expand Down Expand Up @@ -455,7 +457,7 @@ private[celeborn] class Master(
// keep it for compatible reason
context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS))

case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _) =>
case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _, _) =>
logTrace(s"Received RequestSlots request $requestSlots.")
checkAuth(context, applicationId)
executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots))
Expand Down Expand Up @@ -846,7 +848,13 @@ private[celeborn] class Master(
val numReducers = requestSlots.partitionIdList.size()
val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId)

val availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet)
if (requestSlots.tagsExpr.nonEmpty) {
availableWorkers = tagsManager.getTaggedWorkers(
requestSlots.tagsExpr,
availableWorkers)
}

val numAvailableWorkers = availableWorkers.size()

if (numAvailableWorkers == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ package org.apache.celeborn.service.deploy.master.tags
import java.util
import java.util.{Set => JSet}
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Predicate
import java.util.stream.Collectors

import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsScalaConcurrentMapConverter}

Expand All @@ -36,13 +38,26 @@ class TagsManager extends Logging {
ConcurrentHashMap.newKeySet[String]()
}

def getTaggedWorkers(tag: String, workers: List[WorkerInfo]): List[WorkerInfo] = {
def getTaggedWorkers(tagExpr: String, workers: util.List[WorkerInfo]): util.List[WorkerInfo] = {
val tags = tagExpr.split(",").map(_.trim)

if (tags.isEmpty) {
logWarning("No tags provided")
return new util.ArrayList[WorkerInfo]()
}

// TODO: Support multiple tags (CELEBORN-1642)
val tag = tags(0)
val workersForTag = tagStore.get(tag)
if (workersForTag == null) {
logWarning(s"Tag $tag not found in cluster")
return List.empty
return new util.ArrayList[WorkerInfo]()
}

val workerTagsPredicate = new Predicate[WorkerInfo] {
override def test(w: WorkerInfo): Boolean = workersForTag.contains(w.toUniqueId())
}
workers.filter(worker => workersForTag.contains(worker.toUniqueId()))
workers.stream().filter(workerTagsPredicate).collect(Collectors.toList())
}

def addTagToWorker(tag: String, workerId: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.celeborn.service.deploy.master.tags

import scala.collection.JavaConverters.seqAsJavaListConverter

import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.scalatest.funsuite.AnyFunSuite

Expand All @@ -33,7 +35,7 @@ class TagsManagerSuite extends AnyFunSuite
val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214, 215)
val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314, 315)

val workers = List(WORKER1, WORKER2, WORKER3)
val workers = List(WORKER1, WORKER2, WORKER3).asJava

test("test tags manager") {
tagsManager = new TagsManager()
Expand Down

0 comments on commit d14e9bb

Please sign in to comment.