RESTful API supports killing engine forcibly #6008

Closed
Changes from 26 commits
32 commits
5faa5b5
kill engine forcibly
zhaohehuhu Jan 16, 2024
f3ab9c5
new parameter added to decide whether to kill forcefully
zhaohehuhu Jan 25, 2024
d6f82ff
refactor code
zhaohehuhu Jan 30, 2024
8a65cf1
refactor code
zhaohehuhu Jan 31, 2024
f11e765
add an UT
zhaohehuhu Feb 2, 2024
51827ec
fix UT
zhaohehuhu Feb 5, 2024
070aad0
refactor
zhaohehuhu Feb 19, 2024
fb9b251
add default value for forceKill param
zhaohehuhu Feb 22, 2024
bd7bb45
refactor
zhaohehuhu Feb 23, 2024
632c56b
fix unused import
zhaohehuhu Mar 14, 2024
3ad9577
rename params to support multiple engines
zhaohehuhu Mar 15, 2024
5062206
get refId by user, sharelevel and subdomain
zhaohehuhu Mar 15, 2024
513bcdc
fix UT
zhaohehuhu Mar 15, 2024
ab31382
reformat
zhaohehuhu Mar 18, 2024
634ceb6
reformat
zhaohehuhu Mar 19, 2024
ba57c2c
refactor code to delete the node and then kill application
zhaohehuhu Mar 22, 2024
11106d7
comments
turboFei Mar 26, 2024
9bacc2c
fix UTs
zhaohehuhu Apr 17, 2024
936a54e
register app mgr info
turboFei May 5, 2024
ae24ea7
refactor UT
zhaohehuhu Jun 24, 2024
3a2f597
refactor
zhaohehuhu Jun 25, 2024
a13466e
update doc and log string encoded
zhaohehuhu Jun 25, 2024
f826d05
reformat
zhaohehuhu Jun 25, 2024
0cdeede
restore ENGINE_SPARK_REGISTER_ATTRIBUTES
zhaohehuhu Jun 25, 2024
b013194
move the position of log
zhaohehuhu Jun 25, 2024
6d5d087
Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/Applicat…
pan3793 Jun 25, 2024
72d7df3
Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/Applicat…
pan3793 Jun 25, 2024
5e1b6a1
Update kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/A…
pan3793 Jun 25, 2024
cd5129d
fix ut
pan3793 Jun 26, 2024
efc7587
client
pan3793 Jun 27, 2024
8721a2d
log
pan3793 Jun 27, 2024
00c208a
fix
pan3793 Jun 28, 2024
15 changes: 8 additions & 7 deletions docs/client/rest/rest_api.md
@@ -457,13 +457,14 @@ Delete the specified engine.

#### Request Parameters

| Name | Description | Type |
|:------------------------|:------------------------------|:-----------------|
| type | the engine type | String(optional) |
| sharelevel | the engine share level | String(optional) |
| subdomain | the engine subdomain | String(optional) |
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
| Name | Description | Type |
|:------------------------|:-------------------------------------------------------------|:------------------|
| type | the engine type | String(optional) |
| sharelevel | the engine share level | String(optional) |
| subdomain | the engine subdomain | String(optional) |
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
| kill | whether to kill the engine forcibly. Default value is false. | Boolean(optional) |

`proxyUser` is an alternative to `hive.server2.proxy.user`, and the current behavior is consistent with
`hive.server2.proxy.user`. When both parameters are set, `proxyUser` takes precedence.
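For illustration, a JVM client could exercise the new `kill` query parameter roughly as follows. This is a minimal sketch, not the project's client: the base URL `http://localhost:10099` and the `DeleteEngineRequest` helper are assumptions, authentication headers are omitted, and the request is only built, never sent. The `api/v1/admin/engine` path and the parameter names come from the test and docs in this PR.

```scala
import java.net.URI
import java.net.URLEncoder
import java.net.http.HttpRequest
import java.nio.charset.StandardCharsets

object DeleteEngineRequest {
  // Assumed address of a local Kyuubi REST frontend; adjust for a real deployment.
  val baseUrl = "http://localhost:10099"

  /** Builds (but does not send) a DELETE request carrying the given query parameters. */
  def build(params: Seq[(String, String)]): HttpRequest = {
    val query = params.map { case (k, v) =>
      val key = URLEncoder.encode(k, StandardCharsets.UTF_8)
      val value = URLEncoder.encode(v, StandardCharsets.UTF_8)
      s"$key=$value"
    }.mkString("&")
    HttpRequest.newBuilder(URI.create(s"$baseUrl/api/v1/admin/engine?$query"))
      .DELETE()
      .build()
  }
}
```

Passing `"kill" -> "true"` alongside `type` and `sharelevel` requests the forcible kill; leaving it out keeps the previous delete-only behavior, since the parameter defaults to false.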

@@ -100,7 +100,8 @@ class SparkTBinaryFrontendService(
val extraAttributes = conf.get(KyuubiConf.ENGINE_SPARK_REGISTER_ATTRIBUTES).map { attr =>
attr -> KyuubiSparkUtil.globalSparkContext.getConf.get(attr, "")
}.toMap
val attributes = extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId)
val attributes =
super.attributes ++ extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId)
// TODO Support Spark Web UI Enabled SSL
sc.uiWebUrl match {
case Some(url) => attributes ++ Map(KYUUBI_ENGINE_URL -> url.split("//").last)

@@ -36,6 +36,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_ENGINE_APP_MGR_INFO = "kyuubi.engine.appMgrInfo"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =

@@ -17,7 +17,7 @@

package org.apache.kyuubi.service

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.ServiceState.LATENT

/**
@@ -40,4 +40,8 @@ abstract class AbstractFrontendService(name: String)
discoveryService.foreach(addService)
super.initialize(conf)
}

override def attributes: Map[String, String] = {
conf.getAll.filter(_._1 == KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO)
}
}
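
The hand-off this change relies on can be sketched with plain maps: the server writes the encoded application manager info into the engine conf (the `ProcBuilder` change in this PR), and the engine frontend exposes only that reserved key as a registration attribute. The `AttributeHandOff` object and its method names are hypothetical; only the key string is taken from `KyuubiReservedKeys` in this diff.

```scala
object AttributeHandOff {
  // Key string copied from KyuubiReservedKeys in this diff.
  val AppMgrInfoKey = "kyuubi.engine.appMgrInfo"

  // Server side: add the encoded info to the conf handed to the engine.
  def withAppMgrInfo(conf: Map[String, String], encoded: String): Map[String, String] =
    conf + (AppMgrInfoKey -> encoded)

  // Engine side: expose only the reserved key as a registration attribute.
  def attributes(conf: Map[String, String]): Map[String, String] =
    conf.filter { case (key, _) => key == AppMgrInfoKey }
}
```

An engine launched without the key simply registers no such attribute, which is why the admin endpoint needs a fallback when reading it.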

@@ -17,6 +17,13 @@

package org.apache.kyuubi.engine

import java.nio.charset.StandardCharsets
import java.util.Base64

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState

@@ -132,8 +139,11 @@ case class ApplicationManagerInfo(
resourceManager: Option[String],
kubernetesInfo: KubernetesInfo = KubernetesInfo())

object ApplicationManagerInfo {
object ApplicationManagerInfo extends Logging {
final val DEFAULT_KUBERNETES_NAMESPACE = "default"
val mapper: ObjectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.registerModule(DefaultScalaModule)

def apply(
resourceManager: Option[String],
@@ -143,4 +153,22 @@ object ApplicationManagerInfo {
resourceManager,
KubernetesInfo(kubernetesContext, kubernetesNamespace))
}

def serialize(appMgrInfo: ApplicationManagerInfo): String = {
Base64.getEncoder.encodeToString(
mapper.writeValueAsString(appMgrInfo).getBytes(StandardCharsets.UTF_8))
}

def deserialize(encodedStr: String): ApplicationManagerInfo = {
try {
val json = new String(
Base64.getDecoder.decode(encodedStr.getBytes),
StandardCharsets.UTF_8)
mapper.readValue(json, classOf[ApplicationManagerInfo])
} catch {
case _: Throwable =>
error(s"Failed to deserialize the encoded string: $encodedStr")
ApplicationManagerInfo(None)
}
}
}
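
The encode/decode round trip above can be tried in isolation with only the standard library. The sketch below is an assumption-laden simplification: it skips Jackson and treats the JSON as an opaque string, `EncodedInfo` and its `fallback` value are hypothetical, and it uses `scala.util.Try` (which catches only non-fatal errors) instead of the `case _: Throwable` handler in the diff.

```scala
import java.nio.charset.StandardCharsets
import java.util.Base64
import scala.util.Try

object EncodedInfo {
  // Hypothetical stand-in for the default ApplicationManagerInfo(None).
  val fallback = "{}"

  // Base64-encode the UTF-8 bytes of a JSON string.
  def encode(json: String): String =
    Base64.getEncoder.encodeToString(json.getBytes(StandardCharsets.UTF_8))

  // Return the decoded JSON, or `fallback` when the input is not valid Base64.
  def decodeOrFallback(encoded: String): String =
    Try(new String(Base64.getDecoder.decode(encoded), StandardCharsets.UTF_8))
      .getOrElse(fallback)
}
```

Falling back to a default rather than throwing mirrors the behavior in this diff, where a malformed attribute degrades to `ApplicationManagerInfo(None)` instead of failing the delete request.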

@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.StringUtils.containsIgnoreCase

import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
@@ -169,6 +169,11 @@ trait ProcBuilder {
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false

// Set engine application manager info conf
conf.set(
KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO,
ApplicationManagerInfo.serialize(appMgrInfo()))

private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
val currentTime = System.currentTimeMillis()

@@ -31,15 +31,16 @@ import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils}
import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionManager, SessionHandle}

@Tag(name = "Admin")
@Produces(Array(MediaType.APPLICATION_JSON))
@@ -277,7 +278,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
@QueryParam("sharelevel") shareLevel: String,
@QueryParam("subdomain") subdomain: String,
@QueryParam("proxyUser") kyuubiProxyUser: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = {
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String,
@QueryParam("kill") @DefaultValue("false") kill: Boolean): Response = {
val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser)
val userName = if (fe.isAdministrator(fe.getRealUser())) {
Option(activeProxyUser).getOrElse(fe.getRealUser())
@@ -286,24 +288,38 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
}
val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "default")
val engineSpace = calculateEngineSpace(engine)
val responseMsgBuilder = new StringBuilder()

withDiscoveryClient(fe.getConf) { discoveryClient =>
val engineNodes = discoveryClient.getChildren(engineSpace)
engineNodes.foreach { node =>
val nodePath = s"$engineSpace/$node"
val engineNodes = discoveryClient.getServiceNodesInfo(engineSpace, silent = true)
engineNodes.foreach { engineNode =>
val nodePath = s"$engineSpace/${engineNode.nodeName}"
val engineRefId = engineNode.engineRefId.orNull
info(s"Deleting engine node:$nodePath")
Contributor

Sorry for the late comment.
The engineSpace itself is not cleaned up, right? Why don't we delete it?

Member

Except for CONNECTION-level engines, we cannot guarantee that no other engines are using the same engineSpace.

Contributor

Do you mean there can be multiple engines under the same subdomain?

Contributor

I don't think so. Even at the GROUP level, we can only create one engine per subdomain, right? And the subdomain is already included in the engineSpace, right?

Member

There are some corner cases. For example, if an engine stops responding due to a full GC or overload, Kyuubi will create a new one in the same engine space. I have also sometimes seen the distributed lock fail to work properly (it should not fail, so this may be a bug), which also leaves multiple engines alive in the same subdomain. Finally, in theory, there are race conditions between the Kyuubi server deleting a node and an engine registering under the same subdomain.

Contributor

Thanks for the explanation!

try {
discoveryClient.delete(nodePath)
responseMsgBuilder
.append(s"Engine $engineSpace refId=$engineRefId is deleted successfully.")
} catch {
case e: Exception =>
error(s"Failed to delete engine node:$nodePath", e)
throw new NotFoundException(s"Failed to delete engine node:$nodePath," +
s"${e.getMessage}")
}

if (kill && engineRefId != null) {
val appMgrInfo =
engineNode.attributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO)
.map(ApplicationManagerInfo.deserialize).getOrElse(ApplicationManagerInfo(None))
val killResponse = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
.applicationManager.killApplication(appMgrInfo, engineRefId)
responseMsgBuilder
.append(s"\nKilled engine with $appMgrInfo/$engineRefId: $killResponse")
}
}
}

Response.ok(s"Engine $engineSpace is deleted successfully.").build()
Response.ok(responseMsgBuilder.toString()).build()
}

@ApiResponse(
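
The delete-then-kill flow in this hunk can be summarized with an in-memory sketch. Everything here is a simplification under stated assumptions: `EngineNode`, the mutable map standing in for the discovery store, and the `killApplication` function parameter are hypothetical, and error handling for a failed delete is left out. Only the ordering (delete the node, then kill if requested and a refId is present) and the message texts follow the diff.

```scala
import scala.collection.mutable

object DeleteThenKill {
  // Hypothetical, simplified view of a registered engine node.
  final case class EngineNode(nodeName: String, engineRefId: Option[String])

  def deleteEngines(
      engineSpace: String,
      nodes: mutable.Map[String, EngineNode], // stand-in for the discovery store
      kill: Boolean,
      killApplication: String => String): String = {
    val msg = new StringBuilder
    // Snapshot the nodes first so removal does not disturb iteration.
    nodes.values.toList.foreach { node =>
      nodes.remove(node.nodeName)
      val refId = node.engineRefId.orNull
      msg.append(s"Engine $engineSpace refId=$refId is deleted successfully.")
      // Kill only on request, and only when the node carries a refId.
      if (kill) node.engineRefId.foreach { id =>
        msg.append(s"\nKilled engine with $id: ${killApplication(id)}")
      }
    }
    msg.toString
  }
}
```

Only the individual nodes are removed, never the engine space itself, which matches the review discussion above: other engines may still be registered under the same space.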

@@ -33,7 +33,7 @@ import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KubernetesInfo, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
@@ -301,6 +301,57 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(!operations.map(op => op.getIdentifier).contains(operation.identifier.toString))
}

test("force to kill engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")

val engine =
new EngineRef(
conf.clone,
Utils.currentUser,
true,
PluginLoader.loadGroupProvider(conf),
id,
null)

val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
Utils.currentUser,
"default")

withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)

val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
.queryParam("kill", "true")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.delete()

assert(response.getStatus === 200)
eventually(timeout(5.seconds), interval(100.milliseconds)) {
assert(client.getChildren(engineSpace).isEmpty, s"refId same with $id?")
}

eventually(timeout(30.seconds), interval(100.milliseconds)) {
assert(engineMgr.getApplicationInfo(
ApplicationManagerInfo(
None,
KubernetesInfo(None, None)),
id)
.exists(_.state == ApplicationState.NOT_FOUND))
}
}
}

test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)

@@ -93,7 +93,7 @@ class AdminCtlSuite extends RestClientTestHelper with TestPrematureExit {
ldapUserPasswd)
testPrematureExitForAdminControlCli(
args,
s"Engine ${engineSpace} is deleted successfully.")
s"Engine ${engineSpace} refId=${id} is deleted successfully.")

args = Array(
"list",

@@ -86,7 +86,7 @@ class AdminRestApiSuite extends RestClientTestHelper {
assert(engines(0).getAttributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_ID).startsWith("local-"))

val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "")
assert(result == s"Engine ${engineSpace} is deleted successfully.")
assert(result == s"Engine ${engineSpace} refId=${id} is deleted successfully.")

engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala
assert(engines.isEmpty)