Skip to content

Commit

Permalink
[KYUUBI #5306] YarnApplicationOperation supports proxy user
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

For the secured YARN cluster, the Kyuubi Server's user typically has no permission to kill the application. Proxy user or admin should be used instead.

https://docs.cloudera.com/documentation/enterprise/latest/topics/cm_mc_yarn_acl.html#concept_yarn_app_acls__section_killing_an_app

> For YARN, the following three groups of users are allowed to kill a running application:
> - The application owner
> - A cluster administrator defined in yarn.admin.acl
> - A queue administrator defined in aclAdministerApps for the queue in which the application is running

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

Verified ADMIN mode in internal deployment. (output message is formatted for readable)
```
Error: Batch e351185f-1ed8-437a-91bf-da2174e611e2 failed:
{
    "id":"e351185f-1ed8-437a-91bf-da2174e611e2",
    "user":"da_music",
    "batchType":"SPARK",
    "name":"SparkPi",
    "appStartTime":0,
    "appId":"application_1694730881181_58306",
    "appUrl":"http://xxxx-rm-2.xxxx:8088/cluster/app/application_1694730881181_58306",
    "appState":"KILLED",
    "appDiagnostic":"Application application_1694730881181_58306 was killed by user yarn at 10.49.59.149",
    "kyuubiInstance":"kyuubi-1.kyuubi-headless.spark.svc.cluster.local:10099",
    "state":"CANCELED",
    "createTime":1695102138188,
    "endTime":1695102163341,
    "batchInfo":{}
}
```

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5306 from pan3793/kill-proxy-user.

Closes #5306

2b2e543 [Cheng Pan] address comments
e7e9a9c [Cheng Pan] nit
9cf2afc [Cheng Pan] polish
ff82d12 [Cheng Pan] polish
bf0057b [Cheng Pan] ApplicationManager supports proxy user

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
pan3793 committed Sep 20, 2023
1 parent 18d043f commit cd325b4
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 91 deletions.
7 changes: 7 additions & 0 deletions docs/configuration/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,13 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.spnego.keytab | &lt;undefined&gt; | Keytab file for SPNego principal | string | 1.6.0 |
| kyuubi.spnego.principal | &lt;undefined&gt; | SPNego service principal, typical value would look like HTTP/_[email protected]. SPNego service principal would be used when restful Kerberos security is enabled. This needs to be set only if SPNEGO is to be used in authentication. | string | 1.6.0 |

### Yarn

| Key | Default | Meaning | Type | Since |
|---------------------------|---------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------|-------|
| kyuubi.yarn.user.admin | yarn | When kyuubi.yarn.user.strategy is set to ADMIN, use this admin user to construct YARN client for application management, e.g. kill application. | string | 1.8.0 |
| kyuubi.yarn.user.strategy | NONE | Determine which user to use to construct YARN client for application management, e.g. kill application. Options: <ul><li>NONE: use Kyuubi server user.</li><li>ADMIN: use admin user configured in `kyuubi.yarn.user.admin`.</li><li>OWNER: use session user, typically is application owner.</li></ul> | string | 1.8.0 |

### Zookeeper

| Key | Default | Meaning | Type | Since |
Expand Down
9 changes: 9 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io._
import java.net.{Inet4Address, InetAddress, NetworkInterface}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.security.PrivilegedAction
import java.text.SimpleDateFormat
import java.util.{Date, Properties, TimeZone, UUID}
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -203,6 +204,14 @@ object Utils extends Logging {

def currentUser: String = UserGroupInformation.getCurrentUser.getShortUserName

def doAs[T](
proxyUser: String,
realUser: UserGroupInformation = UserGroupInformation.getCurrentUser)(f: () => T): T = {
UserGroupInformation.createProxyUser(proxyUser, realUser).doAs(new PrivilegedAction[T] {
override def run(): T = f()
})
}

private val shortVersionRegex = """^(\d+\.\d+\.\d+)(.*)?$""".r

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2826,6 +2826,32 @@ object KyuubiConf {
.version("1.7.2")
.fallbackConf(ENGINE_SUBMIT_TIMEOUT)

object YarnUserStrategy extends Enumeration {
type YarnUserStrategy = Value
val NONE, ADMIN, OWNER = Value
}

val YARN_USER_STRATEGY: ConfigEntry[String] =
buildConf("kyuubi.yarn.user.strategy")
.doc("Determine which user to use to construct YARN client for application management, " +
"e.g. kill application. Options: <ul>" +
"<li>NONE: use Kyuubi server user.</li>" +
"<li>ADMIN: use admin user configured in `kyuubi.yarn.user.admin`.</li>" +
"<li>OWNER: use session user, typically is application owner.</li>" +
"</ul>")
.version("1.8.0")
.stringConf
.checkValues(YarnUserStrategy)
.createWithDefault("NONE")

val YARN_USER_ADMIN: ConfigEntry[String] =
buildConf("kyuubi.yarn.user.admin")
.doc(s"When ${YARN_USER_STRATEGY.key} is set to ADMIN, use this admin user to " +
"construct YARN client for application management, e.g. kill application.")
.version("1.8.0")
.stringConf
.createWithDefault("yarn")

/**
* Holds information about keys that have been deprecated.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,35 @@ trait ApplicationOperation {
* For example,
* if the Hadoop Yarn is used, for spark applications,
* the tag will be preset via spark.yarn.tags
* @param proxyUser the proxy user to use for executing kill commands.
* For secured YARN cluster, the Kyuubi Server's user typically
* has no permission to kill the application. Admin user or
* application owner should be used instead.
* @return a message contains response describing how the kill process.
*
* @note For implementations, please suppress exceptions and always return KillResponse
*/
def killApplicationByTag(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse
def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse

/**
* Get the engine/application status by the unique application tag
*
* @param appMgrInfo the application manager information
* @param tag the unique application tag for engine instance.
* @param submitTime engine submit to resourceManager time
* @param proxyUser the proxy user to use for creating YARN client
* For secured YARN cluster, the Kyuubi Server's user may have no permission
* to operate the application. Admin user or application owner could be used
* instead.
* @return [[ApplicationInfo]]
*/
def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ private[kyuubi] class EngineRef(
}

if (started + timeout <= System.currentTimeMillis()) {
val killMessage = engineManager.killApplication(builder.appMgrInfo(), engineRefId)
val killMessage =
engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser))
builder.close(true)
MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
throw KyuubiSQLException(
Expand All @@ -254,6 +255,7 @@ private[kyuubi] class EngineRef(
val applicationInfo = engineMgr.getApplicationInfo(
builder.appMgrInfo(),
engineRefId,
Some(appUser),
Some(started))

applicationInfo.foreach { appInfo =>
Expand Down Expand Up @@ -310,7 +312,7 @@ private[kyuubi] class EngineRef(
try {
val appMgrInfo = builder.appMgrInfo()
builder.close(true)
engineManager.killApplication(appMgrInfo, engineRefId)
engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser))
} catch {
case e: Exception =>
warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,16 @@ class JpsApplicationOperation extends ApplicationOperation {

override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String): KillResponse = {
tag: String,
proxyUser: Option[String] = None): KillResponse = {
killJpsApplicationByTag(tag, true)
}

override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long]): ApplicationInfo = {
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo = {
val commandOption = getEngine(tag)
if (commandOption.nonEmpty) {
val idAndCmd = commandOption.get
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {

override def killApplicationByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String): KillResponse = {
tag: String,
proxyUser: Option[String] = None): KillResponse = {
if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
}
Expand Down Expand Up @@ -157,7 +158,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging {
override def getApplicationInfoByTag(
appMgrInfo: ApplicationManagerInfo,
tag: String,
submitTime: Option[Long]): ApplicationInfo = {
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): ApplicationInfo = {
if (kyuubiConf == null) {
throw new IllegalStateException("Methods initialize and isSupported must be called ahead")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,14 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
super.stop()
}

def killApplication(appMgrInfo: ApplicationManagerInfo, tag: String): KillResponse = {
def killApplication(
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None): KillResponse = {
var (killed, lastMessage): KillResponse = (false, null)
for (operation <- operations if !killed) {
if (operation.isSupported(appMgrInfo)) {
val (k, m) = operation.killApplicationByTag(appMgrInfo, tag)
val (k, m) = operation.killApplicationByTag(appMgrInfo, tag, proxyUser)
killed = k
lastMessage = m
}
Expand All @@ -83,10 +86,11 @@ class KyuubiApplicationManager extends AbstractService("KyuubiApplicationManager
def getApplicationInfo(
appMgrInfo: ApplicationManagerInfo,
tag: String,
proxyUser: Option[String] = None,
submitTime: Option[Long] = None): Option[ApplicationInfo] = {
val operation = operations.find(_.isSupported(appMgrInfo))
operation match {
case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, submitTime))
case Some(op) => Some(op.getApplicationInfoByTag(appMgrInfo, tag, proxyUser, submitTime))
case None => None
}
}
Expand Down
Loading

0 comments on commit cd325b4

Please sign in to comment.