Skip to content

Commit

Permalink
[KYUUBI #6008] RESTful API supports killing engine forcibly
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

## Describe Your Solution 🔧

I'd like to introduce the feature that allows users to forcibly kill an engine through API.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6008 from zhaohehuhu/dev-0123.

Closes #6008

00c208a [Cheng Pan] fix
8721a2d [Cheng Pan] log
efc7587 [Cheng Pan] client
cd5129d [Cheng Pan] fix ut
5e1b6a1 [Cheng Pan] Update kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
72d7df3 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
6d5d087 [Cheng Pan] Update kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b013194 [zhaohehuhu] move the position of log
0cdeede [zhaohehuhu] restore ENGINE_SPARK_REGISTER_ATTRIBUTES
f826d05 [zhaohehuhu] reformat
a13466e [zhaohehuhu] update doc and log string encoded
3a2f597 [zhaohehuhu] refactor
ae24ea7 [zhaohehuhu] refactor UT
936a54e [Wang, Fei] register app mgr info
9bacc2c [hezhao2] fix UTs
11106d7 [Wang, Fei] comments
ba57c2c [hezhao2] refactor code to delete the node and then kill application
634ceb6 [hezhao2] reformat
ab31382 [hezhao2] reformat
513bcdc [hezhao2] fix UT
5062206 [hezhao2] get refId by user, sharelevel and subdomain
3ad9577 [hezhao2] rename params to support multiple engines
632c56b [hezhao2] fix unused import
bd7bb45 [hezhao2] refactor
fb9b251 [hezhao2] add default value for forceKill param
070aad0 [hezhao2] refactor
51827ec [hezhao2] fix UT
f11e765 [hezhao2] add an UT
8a65cf1 [hezhao2] refactor code
d6f82ff [hezhao2] refactor code
f3ab9c5 [hezhao2] new parameter added to decide whether to kill forcefully handle the result of killApplication
5faa5b5 [hezhao2] kill engine forcibly

Lead-authored-by: hezhao2 <[email protected]>
Co-authored-by: zhaohehuhu <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Co-authored-by: Wang, Fei <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
5 people committed Jul 1, 2024
1 parent 693d8a2 commit ab273c8
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 23 deletions.
15 changes: 8 additions & 7 deletions docs/client/rest/rest_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,14 @@ Delete the specified engine.

#### Request Parameters

| Name | Description | Type |
|:------------------------|:------------------------------|:-----------------|
| type | the engine type | String(optional) |
| sharelevel | the engine share level | String(optional) |
| subdomain | the engine subdomain | String(optional) |
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
| Name | Description | Type |
|:------------------------|:-------------------------------------------------------------|:------------------|
| type | the engine type | String(optional) |
| sharelevel | the engine share level | String(optional) |
| subdomain | the engine subdomain | String(optional) |
| proxyUser | the proxy user to impersonate | String(optional) |
| hive.server2.proxy.user | the proxy user to impersonate | String(optional) |
| kill | whether to kill the engine forcibly. Default value is false. | Boolean(optional) |

`proxyUser` is an alternative to `hive.server2.proxy.user`, and the current behavior is consistent with
`hive.server2.proxy.user`. When both parameters are set, `proxyUser` takes precedence.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class SparkTBinaryFrontendService(
val extraAttributes = conf.get(KyuubiConf.ENGINE_SPARK_REGISTER_ATTRIBUTES).map { attr =>
attr -> KyuubiSparkUtil.globalSparkContext.getConf.get(attr, "")
}.toMap
val attributes = extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId)
val attributes =
super.attributes ++ extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId)
// TODO Support Spark Web UI Enabled SSL
sc.uiWebUrl match {
case Some(url) => attributes ++ Map(KYUUBI_ENGINE_URL -> url.split("//").last)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ object KyuubiReservedKeys {
final val KYUUBI_ENGINE_URL = "kyuubi.engine.url"
final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials"
final val KYUUBI_ENGINE_APP_MGR_INFO_KEY = "kyuubi.engine.appMgrInfo"
final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle"
final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe"
final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.service

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.service.ServiceState.LATENT

/**
Expand All @@ -40,4 +40,8 @@ abstract class AbstractFrontendService(name: String)
discoveryService.foreach(addService)
super.initialize(conf)
}

override def attributes: Map[String, String] = {
conf.getAll.filter(_._1 == KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
import org.apache.kyuubi.client.api.v1.dto.OperationData;
import org.apache.kyuubi.client.api.v1.dto.ServerData;
import org.apache.kyuubi.client.api.v1.dto.SessionData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdminRestApi {
private static final Logger LOG = LoggerFactory.getLogger(AdminRestApi.class);

private KyuubiRestClient client;

private static final String API_BASE_PATH = "admin";
Expand Down Expand Up @@ -65,13 +69,25 @@ public String refreshDenyIps() {
return this.getClient().post(path, null, client.getAuthHeader());
}

/** This method is deprecated since 1.10 */
@Deprecated
public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser) {
LOG.warn(
"The method `deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser)` "
+ "is deprecated since 1.10.0, using "
+ "`deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser, kill)` instead.");
return this.deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser, false);
}

public String deleteEngine(
String engineType, String shareLevel, String subdomain, String hs2ProxyUser, boolean kill) {
Map<String, Object> params = new HashMap<>();
params.put("type", engineType);
params.put("sharelevel", shareLevel);
params.put("subdomain", subdomain);
params.put("hive.server2.proxy.user", hs2ProxyUser);
params.put("kill", kill);
return this.getClient().delete(API_BASE_PATH + "/engine", params, client.getAuthHeader());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.kyuubi.engine

import java.nio.charset.StandardCharsets
import java.util.Base64

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.kyuubi.Logging
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState

Expand Down Expand Up @@ -132,8 +139,11 @@ case class ApplicationManagerInfo(
resourceManager: Option[String],
kubernetesInfo: KubernetesInfo = KubernetesInfo())

object ApplicationManagerInfo {
object ApplicationManagerInfo extends Logging {
final val DEFAULT_KUBERNETES_NAMESPACE = "default"
val mapper: ObjectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.registerModule(DefaultScalaModule)

def apply(
resourceManager: Option[String],
Expand All @@ -143,4 +153,22 @@ object ApplicationManagerInfo {
resourceManager,
KubernetesInfo(kubernetesContext, kubernetesNamespace))
}

def serialize(appMgrInfo: ApplicationManagerInfo): String = {
Base64.getEncoder.encodeToString(
mapper.writeValueAsString(appMgrInfo).getBytes(StandardCharsets.UTF_8))
}

def deserialize(encodedStr: String): ApplicationManagerInfo = {
try {
val json = new String(
Base64.getDecoder.decode(encodedStr.getBytes),
StandardCharsets.UTF_8)
mapper.readValue(json, classOf[ApplicationManagerInfo])
} catch {
case _: Throwable =>
error(s"Fail to deserialize the encoded string: $encodedStr")
ApplicationManagerInfo(None)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.StringUtils.containsIgnoreCase

import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory}
Expand Down Expand Up @@ -169,6 +169,11 @@ trait ProcBuilder {
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = false

// Set engine application manger info conf
conf.set(
KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY,
ApplicationManagerInfo.serialize(appMgrInfo()))

private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT)
val currentTime = System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ import org.apache.commons.lang3.StringUtils

import org.apache.kyuubi.{KYUUBI_VERSION, Logging}
import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE
import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo}
import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient
import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle}
import org.apache.kyuubi.server.KyuubiServer
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils}
import org.apache.kyuubi.session.{KyuubiSession, SessionHandle}
import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionManager, SessionHandle}

@Tag(name = "Admin")
@Produces(Array(MediaType.APPLICATION_JSON))
Expand Down Expand Up @@ -277,7 +278,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
@QueryParam("sharelevel") shareLevel: String,
@QueryParam("subdomain") subdomain: String,
@QueryParam("proxyUser") kyuubiProxyUser: String,
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = {
@QueryParam("hive.server2.proxy.user") hs2ProxyUser: String,
@QueryParam("kill") @DefaultValue("false") kill: Boolean): Response = {
val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser)
val userName = if (fe.isAdministrator(fe.getRealUser())) {
Option(activeProxyUser).getOrElse(fe.getRealUser())
Expand All @@ -286,24 +288,38 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
}
val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "default")
val engineSpace = calculateEngineSpace(engine)
val responseMsgBuilder = new StringBuilder()

withDiscoveryClient(fe.getConf) { discoveryClient =>
val engineNodes = discoveryClient.getChildren(engineSpace)
engineNodes.foreach { node =>
val nodePath = s"$engineSpace/$node"
val engineNodes = discoveryClient.getServiceNodesInfo(engineSpace, silent = true)
engineNodes.foreach { engineNode =>
val nodePath = s"$engineSpace/${engineNode.nodeName}"
val engineRefId = engineNode.engineRefId.orNull
info(s"Deleting engine node:$nodePath")
try {
discoveryClient.delete(nodePath)
responseMsgBuilder
.append(s"Engine $engineSpace refId=$engineRefId is deleted successfully.")
} catch {
case e: Exception =>
error(s"Failed to delete engine node:$nodePath", e)
throw new NotFoundException(s"Failed to delete engine node:$nodePath," +
s"${e.getMessage}")
}

if (kill && engineRefId != null) {
val appMgrInfo =
engineNode.attributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY)
.map(ApplicationManagerInfo.deserialize).getOrElse(ApplicationManagerInfo(None))
val killResponse = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
.applicationManager.killApplication(appMgrInfo, engineRefId)
responseMsgBuilder
.append(s"\nKilled engine with $appMgrInfo/$engineRefId: $killResponse")
}
}
}

Response.ok(s"Engine $engineSpace is deleted successfully.").build()
Response.ok(responseMsgBuilder.toString()).build()
}

@ApiResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import scala.util.matching.Regex
import org.apache.kyuubi.KyuubiFunSuite
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY}
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_APP_MGR_INFO_KEY, KYUUBI_ENGINE_CREDENTIALS_KEY}
import org.apache.kyuubi.engine.ApplicationManagerInfo
import org.apache.kyuubi.engine.ApplicationManagerInfo.serialize
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._

class FlinkProcessBuilderSuite extends KyuubiFunSuite {
Expand All @@ -39,13 +41,15 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
ENGINE_FLINK_JAVA_OPTIONS,
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
.set(KYUUBI_ENGINE_APP_MGR_INFO_KEY, serialize(ApplicationManagerInfo(None)))

private def applicationModeConf = KyuubiConf()
.set("flink.execution.target", "yarn-application")
.set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString)
.set(APP_KEY, "kyuubi_connection_flink_paul")
.set("kyuubi.on", "off")
.set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used")
.set(KYUUBI_ENGINE_APP_MGR_INFO_KEY, serialize(ApplicationManagerInfo(None)))

private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile
private val tempOpt =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KyuubiApplicationManager}
import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KubernetesInfo, KyuubiApplicationManager}
import org.apache.kyuubi.engine.EngineType.SPARK_SQL
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER}
import org.apache.kyuubi.ha.HighAvailabilityConf
Expand Down Expand Up @@ -301,6 +301,54 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(!operations.map(op => op.getIdentifier).contains(operation.identifier.toString))
}

test("force to kill engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString)
conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test")
conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop")

val engine =
new EngineRef(
conf.clone,
Utils.currentUser,
true,
PluginLoader.loadGroupProvider(conf),
id,
null)

val engineSpace = DiscoveryPaths.makePath(
s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL",
Utils.currentUser,
"default")

withDiscoveryClient(conf) { client =>
engine.getOrCreate(client)
assert(client.pathExists(engineSpace))
assert(client.getChildren(engineSpace).size == 1)

val response = webTarget.path("api/v1/admin/engine")
.queryParam("sharelevel", "USER")
.queryParam("type", "spark_sql")
.queryParam("kill", "true")
.request(MediaType.APPLICATION_JSON_TYPE)
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.delete()

assert(response.getStatus === 200)
eventually(timeout(5.seconds), interval(100.milliseconds)) {
assert(client.getChildren(engineSpace).isEmpty, s"refId same with $id?")
}

eventually(timeout(30.seconds), interval(100.milliseconds)) {
val appMgrInfo = ApplicationManagerInfo(None, KubernetesInfo(None, None))
assert(engineMgr.getApplicationInfo(appMgrInfo, id)
.exists(_.state == ApplicationState.NOT_FOUND))
}
}
}

test("delete engine - user share level") {
val id = UUID.randomUUID().toString
conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class AdminCtlSuite extends RestClientTestHelper with TestPrematureExit {
ldapUserPasswd)
testPrematureExitForAdminControlCli(
args,
s"Engine ${engineSpace} is deleted successfully.")
s"Engine ${engineSpace} refId=${id} is deleted successfully.")

args = Array(
"list",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ class AdminRestApiSuite extends RestClientTestHelper {
assert(engines(0).getNamespace == engineSpace)
assert(engines(0).getAttributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_ID).startsWith("local-"))

val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "")
assert(result == s"Engine ${engineSpace} is deleted successfully.")
// kill engine to release memory quickly
val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "", true)
assert(result startsWith s"Engine ${engineSpace} refId=${id} is deleted successfully.")

engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala
assert(engines.isEmpty)
Expand Down

0 comments on commit ab273c8

Please sign in to comment.