From d14e9bb3d7744bc1ea628d1c3d46c9714b34fba5 Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Sat, 12 Oct 2024 15:57:01 +0800 Subject: [PATCH] [CELEBORN-1620][CIP-11] Support passing worker tags via RequestSlots message ### What changes were proposed in this pull request? Supporting passing tag expression in RequestSlots request. Clients can pass the tags using CelebornConf. Default tag configs for system/tenant/user will be suppoted in follow up PRs. ### Why are the changes needed? https://cwiki.apache.org/confluence/display/CELEBORN/CIP-11+Supporting+Tags+in+Celeborn ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing UTs passed, will add more UTs while integrating TagsManager with ConfigService. Closes #2770 from s0nskar/request-slots. Authored-by: Sanskar Modi Signed-off-by: mingji --- .../celeborn/client/LifecycleManager.scala | 4 +++- common/src/main/proto/TransportMessages.proto | 1 + .../apache/celeborn/common/CelebornConf.scala | 11 ++++++++++ .../protocol/message/ControlMessages.scala | 4 ++++ docs/configuration/client.md | 1 + .../service/deploy/master/Master.scala | 12 +++++++++-- .../deploy/master/tags/TagsManager.scala | 21 ++++++++++++++++--- .../deploy/master/tags/TagsManagerSuite.scala | 4 +++- 8 files changed, 51 insertions(+), 7 deletions(-) diff --git a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala index 60721c160a..c4755fbf14 100644 --- a/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala +++ b/client/src/main/scala/org/apache/celeborn/client/LifecycleManager.scala @@ -114,6 +114,7 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends .maximumSize(rpcCacheSize) .build().asInstanceOf[Cache[Int, ByteBuffer]] + private val clientTagsExpr = conf.clientTagsExpr private val mockDestroyFailure = conf.testMockDestroySlotsFailure private val authEnabled = conf.authEnabledOnClient private var applicationMeta: ApplicationMeta = _ @@ -1642,7 +1643,8 @@ class LifecycleManager(val appUniqueId: String, val conf: CelebornConf) extends slotsAssignMaxWorkers, availableStorageTypes, excludedWorkerSet, - true) + true, + clientTagsExpr) val res = requestMasterRequestSlots(req) if (res.status != StatusCode.SUCCESS) { requestMasterRequestSlots(req) diff --git a/common/src/main/proto/TransportMessages.proto b/common/src/main/proto/TransportMessages.proto index 4dc4f14891..7d5bb2c5e1 100644 --- a/common/src/main/proto/TransportMessages.proto +++ b/common/src/main/proto/TransportMessages.proto @@ -285,6 +285,7 @@ message PbRequestSlots { int32 availableStorageTypes = 11; repeated PbWorkerInfo excludedWorkerSet = 12; bool packed = 13; + string tagsExpr = 14; } message PbSlotInfo { diff --git a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala index 33ebb3c07f..069f937a1e 100644 --- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala +++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala @@ -1320,6 +1320,8 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se def clientChunkPrefetchEnabled = get(CLIENT_CHUNK_PREFETCH_ENABLED) def clientInputStreamCreationWindow = get(CLIENT_INPUTSTREAM_CREATION_WINDOW) + def clientTagsExpr: String = get(CLIENT_TAGS_EXPR) + // ////////////////////////////////////////////////////// // kerberos // // ////////////////////////////////////////////////////// @@ -5819,4 +5821,13 @@ object CelebornConf extends Logging { .booleanConf .createWithDefault(false) + val CLIENT_TAGS_EXPR: ConfigEntry[String] = + buildConf("celeborn.client.tagsExpr") + .categories("client") + .version("0.6.0") + .doc("Expression to filter workers by tags. The expression is a comma-separated list of " + + "tags. The expression is evaluated as a logical AND of all tags. For example, " + + "`prod,high-io` filters workers that have both the `prod` and `high-io` tags.") + .stringConf + .createWithDefault("") } diff --git a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala index 5c345237c9..bf4e7f2696 100644 --- a/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala +++ b/common/src/main/scala/org/apache/celeborn/common/protocol/message/ControlMessages.scala @@ -183,6 +183,7 @@ object ControlMessages extends Logging { availableStorageTypes: Int, excludedWorkerSet: Set[WorkerInfo] = Set.empty, packed: Boolean = false, + tagsExpr: String = "", override var requestId: String = ZERO_UUID) extends MasterRequestMessage @@ -633,6 +634,7 @@ object ControlMessages extends Logging { availableStorageTypes, excludedWorkerSet, packed, + tagsExpr, requestId) => val payload = PbRequestSlots.newBuilder() .setApplicationId(applicationId) @@ -648,6 +650,7 @@ object ControlMessages extends Logging { .addAllExcludedWorkerSet(excludedWorkerSet.map( PbSerDeUtils.toPbWorkerInfo(_, true, true)).asJava) .setPacked(packed) + .setTagsExpr(tagsExpr) .build().toByteArray new TransportMessage(MessageType.REQUEST_SLOTS, payload) @@ -1073,6 +1076,7 @@ object ControlMessages extends Logging { pbRequestSlots.getAvailableStorageTypes, excludedWorkerInfoSet, pbRequestSlots.getPacked, + pbRequestSlots.getTagsExpr, pbRequestSlots.getRequestId) case REQUEST_SLOTS_RESPONSE_VALUE => diff --git a/docs/configuration/client.md b/docs/configuration/client.md index 7e7068e17f..80cd1bef3e 100644 --- a/docs/configuration/client.md +++ b/docs/configuration/client.md @@ -116,6 +116,7 @@ license: | | celeborn.client.spark.shuffle.fallback.policy | AUTO | false | Celeborn supports the following kind of fallback policies. 1. ALWAYS: always use spark built-in shuffle implementation; 2. AUTO: prefer to use celeborn shuffle implementation, and fallback to use spark built-in shuffle implementation based on certain factors, e.g. availability of enough workers and quota, shuffle partition number; 3. NEVER: always use celeborn shuffle implementation, and fail fast when it it is concluded that fallback is required based on factors above. | 0.5.0 | | | celeborn.client.spark.shuffle.forceFallback.enabled | false | false | Always use spark built-in shuffle implementation. This configuration is deprecated, consider configuring `celeborn.client.spark.shuffle.fallback.policy` instead. | 0.3.0 | celeborn.shuffle.forceFallback.enabled | | celeborn.client.spark.shuffle.writer | HASH | false | Celeborn supports the following kind of shuffle writers. 1. hash: hash-based shuffle writer works fine when shuffle partition count is normal; 2. sort: sort-based shuffle writer works fine when memory pressure is high or shuffle partition count is huge. This configuration only takes effect when celeborn.client.spark.push.dynamicWriteMode.enabled is false. | 0.3.0 | celeborn.shuffle.writer | +| celeborn.client.tagsExpr | | false | Expression to filter workers by tags. The expression is a comma-separated list of tags. The expression is evaluated as a logical AND of all tags. For example, `prod,high-io` filters workers that have both the `prod` and `high-io` tags. | 0.6.0 | | | celeborn.master.endpoints | <localhost>:9097 | false | Endpoints of master nodes for celeborn clients to connect. Client uses resolver provided byceleborn.master.endpoints.resolver to resolve the master endpoints. By default Celeborn uses `org.apache.celeborn.common.client.StaticMasterEndpointResolver` which take static master endpoints as input. Allowed pattern: `:[,:]*`, e.g. `clb1:9097,clb2:9098,clb3:9099`. If the port is omitted, 9097 will be used. If the master endpoints are not static then users can pass custom resolver implementation to discover master endpoints actively using celeborn.master.endpoints.resolver. | 0.2.0 | | | celeborn.master.endpoints.resolver | org.apache.celeborn.common.client.StaticMasterEndpointResolver | false | Resolver class that can be used for discovering and updating the master endpoints. This allows users to provide a custom master endpoint resolver implementation. This is useful in environments where the master nodes might change due to scaling operations or infrastructure updates. Clients need to ensure that provided resolver class should be present in the classpath. | 0.6.0 | | | celeborn.quota.enabled | true | false | When Master side sets to true, the master will enable to check the quota via QuotaManager. When Client side sets to true, LifecycleManager will request Master side to check whether the current user has enough quota before registration of shuffle. Fallback to the default shuffle service of Spark when Master side checks that there is no enough quota for current user. | 0.2.0 | | diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala index d0623a20b3..ec6e550bb4 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/Master.scala @@ -53,6 +53,7 @@ import org.apache.celeborn.server.common.{HttpService, Service} import org.apache.celeborn.service.deploy.master.clustermeta.SingleMasterMetaManager import org.apache.celeborn.service.deploy.master.clustermeta.ha.{HAHelper, HAMasterMetaManager, MetaHandler} import org.apache.celeborn.service.deploy.master.quota.QuotaManager +import org.apache.celeborn.service.deploy.master.tags.TagsManager private[celeborn] class Master( override val conf: CelebornConf, @@ -185,6 +186,7 @@ private[celeborn] class Master( private val hasS3Storage = conf.hasS3Storage private val quotaManager = new QuotaManager(conf, configService) + private val tagsManager = new TagsManager() private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval private val userResourceConsumptions = JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]() @@ -455,7 +457,7 @@ private[celeborn] class Master( // keep it for compatible reason context.reply(ReleaseSlotsResponse(StatusCode.SUCCESS)) - case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _) => + case requestSlots @ RequestSlots(applicationId, _, _, _, _, _, _, _, _, _, _, _, _) => logTrace(s"Received RequestSlots request $requestSlots.") checkAuth(context, applicationId) executeWithLeaderChecker(context, handleRequestSlots(context, requestSlots)) @@ -846,7 +848,13 @@ private[celeborn] class Master( val numReducers = requestSlots.partitionIdList.size() val shuffleKey = Utils.makeShuffleKey(requestSlots.applicationId, requestSlots.shuffleId) - val availableWorkers = workersAvailable(requestSlots.excludedWorkerSet) + var availableWorkers = workersAvailable(requestSlots.excludedWorkerSet) + if (requestSlots.tagsExpr.nonEmpty) { + availableWorkers = tagsManager.getTaggedWorkers( + requestSlots.tagsExpr, + availableWorkers) + } + val numAvailableWorkers = availableWorkers.size() if (numAvailableWorkers == 0) { diff --git a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala index 52c3f48faa..e80534d48a 100644 --- a/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala +++ b/master/src/main/scala/org/apache/celeborn/service/deploy/master/tags/TagsManager.scala @@ -20,6 +20,8 @@ package org.apache.celeborn.service.deploy.master.tags import java.util import java.util.{Set => JSet} import java.util.concurrent.ConcurrentHashMap +import java.util.function.Predicate +import java.util.stream.Collectors import scala.collection.JavaConverters.{asScalaIteratorConverter, mapAsScalaConcurrentMapConverter} @@ -36,13 +38,26 @@ class TagsManager extends Logging { ConcurrentHashMap.newKeySet[String]() } - def getTaggedWorkers(tag: String, workers: List[WorkerInfo]): List[WorkerInfo] = { + def getTaggedWorkers(tagExpr: String, workers: util.List[WorkerInfo]): util.List[WorkerInfo] = { + val tags = tagExpr.split(",").map(_.trim) + + if (tags.isEmpty) { + logWarning("No tags provided") + return new util.ArrayList[WorkerInfo]() + } + + // TODO: Support multiple tags (CELEBORN-1642) + val tag = tags(0) val workersForTag = tagStore.get(tag) if (workersForTag == null) { logWarning(s"Tag $tag not found in cluster") - return List.empty + return new util.ArrayList[WorkerInfo]() + } + + val workerTagsPredicate = new Predicate[WorkerInfo] { + override def test(w: WorkerInfo): Boolean = workersForTag.contains(w.toUniqueId()) } - workers.filter(worker => workersForTag.contains(worker.toUniqueId())) + workers.stream().filter(workerTagsPredicate).collect(Collectors.toList()) } def addTagToWorker(tag: String, workerId: String): Unit = { diff --git a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala index 5f444a8169..544152ea7d 100644 --- a/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala +++ b/master/src/test/scala/org/apache/celeborn/service/deploy/master/tags/TagsManagerSuite.scala @@ -17,6 +17,8 @@ package org.apache.celeborn.service.deploy.master.tags +import scala.collection.JavaConverters.seqAsJavaListConverter + import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} import org.scalatest.funsuite.AnyFunSuite @@ -33,7 +35,7 @@ class TagsManagerSuite extends AnyFunSuite val WORKER2 = new WorkerInfo("host2", 211, 212, 213, 214, 215) val WORKER3 = new WorkerInfo("host3", 311, 312, 313, 314, 315) - val workers = List(WORKER1, WORKER2, WORKER3) + val workers = List(WORKER1, WORKER2, WORKER3).asJava test("test tags manager") { tagsManager = new TagsManager()