diff --git a/docs/client/rest/rest_api.md b/docs/client/rest/rest_api.md index 4f28dec05ac..9600655a824 100644 --- a/docs/client/rest/rest_api.md +++ b/docs/client/rest/rest_api.md @@ -457,13 +457,14 @@ Delete the specified engine. #### Request Parameters -| Name | Description | Type | -|:------------------------|:------------------------------|:-----------------| -| type | the engine type | String(optional) | -| sharelevel | the engine share level | String(optional) | -| subdomain | the engine subdomain | String(optional) | -| proxyUser | the proxy user to impersonate | String(optional) | -| hive.server2.proxy.user | the proxy user to impersonate | String(optional) | +| Name | Description | Type | +|:------------------------|:-------------------------------------------------------------|:------------------| +| type | the engine type | String(optional) | +| sharelevel | the engine share level | String(optional) | +| subdomain | the engine subdomain | String(optional) | +| proxyUser | the proxy user to impersonate | String(optional) | +| hive.server2.proxy.user | the proxy user to impersonate | String(optional) | +| kill | whether to kill the engine forcibly. Default value is false. | Boolean(optional) | `proxyUser` is an alternative to `hive.server2.proxy.user`, and the current behavior is consistent with `hive.server2.proxy.user`. When both parameters are set, `proxyUser` takes precedence. diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala index 2eed5253dd8..57f5cb14dff 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkTBinaryFrontendService.scala @@ -100,7 +100,8 @@ class SparkTBinaryFrontendService( val extraAttributes = conf.get(KyuubiConf.ENGINE_SPARK_REGISTER_ATTRIBUTES).map { attr => attr -> KyuubiSparkUtil.globalSparkContext.getConf.get(attr, "") }.toMap - val attributes = extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId) + val attributes = + super.attributes ++ extraAttributes ++ Map(KYUUBI_ENGINE_ID -> KyuubiSparkUtil.engineId) // TODO Support Spark Web UI Enabled SSL sc.uiWebUrl match { case Some(url) => attributes ++ Map(KYUUBI_ENGINE_URL -> url.split("//").last) diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala index 9f22dd1f883..fb1385a3ac2 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiReservedKeys.scala @@ -36,6 +36,7 @@ object KyuubiReservedKeys { final val KYUUBI_ENGINE_URL = "kyuubi.engine.url" final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time" final val KYUUBI_ENGINE_CREDENTIALS_KEY = "kyuubi.engine.credentials" + final val KYUUBI_ENGINE_APP_MGR_INFO_KEY = "kyuubi.engine.appMgrInfo" final val KYUUBI_SESSION_HANDLE_KEY = "kyuubi.session.handle" final val KYUUBI_SESSION_ALIVE_PROBE = "kyuubi.session.alive.probe" final val KYUUBI_SESSION_ENGINE_LAUNCH_HANDLE_GUID = diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala index e2ba0678938..bdabaaa5d42 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractFrontendService.scala @@ -17,7 +17,7 @@ package org.apache.kyuubi.service -import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.service.ServiceState.LATENT /** @@ -40,4 +40,8 @@ abstract class AbstractFrontendService(name: String) discoveryService.foreach(addService) super.initialize(conf) } + + override def attributes: Map[String, String] = { + conf.getAll.filter(_._1 == KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY) + } } diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java index 0f6fbbc472e..c616352142f 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java @@ -23,8 +23,12 @@ import org.apache.kyuubi.client.api.v1.dto.OperationData; import org.apache.kyuubi.client.api.v1.dto.ServerData; import org.apache.kyuubi.client.api.v1.dto.SessionData; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AdminRestApi { + private static final Logger LOG = LoggerFactory.getLogger(AdminRestApi.class); + private KyuubiRestClient client; private static final String API_BASE_PATH = "admin"; @@ -65,13 +69,25 @@ public String refreshDenyIps() { return this.getClient().post(path, null, client.getAuthHeader()); } + /** This method is deprecated since 1.10 */ + @Deprecated public String deleteEngine( String engineType, String shareLevel, String subdomain, String hs2ProxyUser) { + LOG.warn( + "The method `deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser)` " + + "is deprecated since 1.10.0, using " + + "`deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser, kill)` instead."); + return this.deleteEngine(engineType, shareLevel, subdomain, hs2ProxyUser, false); + } + + public String deleteEngine( + String engineType, String shareLevel, String subdomain, String hs2ProxyUser, boolean kill) { Map params = new HashMap<>(); params.put("type", engineType); params.put("sharelevel", shareLevel); params.put("subdomain", subdomain); params.put("hive.server2.proxy.user", hs2ProxyUser); + params.put("kill", kill); return this.getClient().delete(API_BASE_PATH + "/engine", params, client.getAuthHeader()); } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala index 23a49c1ae5f..339df2f7857 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala @@ -17,6 +17,13 @@ package org.apache.kyuubi.engine +import java.nio.charset.StandardCharsets +import java.util.Base64 + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule + +import org.apache.kyuubi.Logging import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.ApplicationState.ApplicationState @@ -132,8 +139,11 @@ case class ApplicationManagerInfo( resourceManager: Option[String], kubernetesInfo: KubernetesInfo = KubernetesInfo()) -object ApplicationManagerInfo { +object ApplicationManagerInfo extends Logging { final val DEFAULT_KUBERNETES_NAMESPACE = "default" + val mapper: ObjectMapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) + .registerModule(DefaultScalaModule) def apply( resourceManager: Option[String], @@ -143,4 +153,22 @@ object ApplicationManagerInfo { resourceManager, KubernetesInfo(kubernetesContext, kubernetesNamespace)) } + + def serialize(appMgrInfo: ApplicationManagerInfo): String = { + Base64.getEncoder.encodeToString( + mapper.writeValueAsString(appMgrInfo).getBytes(StandardCharsets.UTF_8)) + } + + def deserialize(encodedStr: String): ApplicationManagerInfo = { + try { + val json = new String( + Base64.getDecoder.decode(encodedStr.getBytes), + StandardCharsets.UTF_8) + mapper.readValue(json, classOf[ApplicationManagerInfo]) + } catch { + case _: Throwable => + error(s"Fail to deserialize the encoded string: $encodedStr") + ApplicationManagerInfo(None) + } + } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala index 566bc18ad7e..0e21bb9f730 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala @@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils import org.apache.commons.lang3.StringUtils.containsIgnoreCase import org.apache.kyuubi._ -import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.config.KyuubiConf.KYUUBI_HOME import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.util.{JavaUtils, NamedThreadFactory} @@ -169,6 +169,11 @@ trait ProcBuilder { @volatile private[kyuubi] var process: Process = _ @volatile private[kyuubi] var processLaunched: Boolean = false + // Set engine application manger info conf + conf.set( + KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY, + ApplicationManagerInfo.serialize(appMgrInfo())) + private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized { val engineLogTimeout = conf.get(KyuubiConf.ENGINE_LOG_TIMEOUT) val currentTime = System.currentTimeMillis() diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 2e61e6b0816..0f1544fe685 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -31,15 +31,16 @@ import org.apache.commons.lang3.StringUtils import org.apache.kyuubi.{KYUUBI_VERSION, Logging} import org.apache.kyuubi.client.api.v1.dto._ -import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.config.KyuubiConf._ +import org.apache.kyuubi.engine.ApplicationManagerInfo import org.apache.kyuubi.ha.HighAvailabilityConf.HA_NAMESPACE import org.apache.kyuubi.ha.client.{DiscoveryPaths, ServiceNodeInfo} import org.apache.kyuubi.ha.client.DiscoveryClientProvider.withDiscoveryClient import org.apache.kyuubi.operation.{KyuubiOperation, OperationHandle} import org.apache.kyuubi.server.KyuubiServer import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils} -import org.apache.kyuubi.session.{KyuubiSession, SessionHandle} +import org.apache.kyuubi.session.{KyuubiSession, KyuubiSessionManager, SessionHandle} @Tag(name = "Admin") @Produces(Array(MediaType.APPLICATION_JSON)) @@ -277,7 +278,8 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { @QueryParam("sharelevel") shareLevel: String, @QueryParam("subdomain") subdomain: String, @QueryParam("proxyUser") kyuubiProxyUser: String, - @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Response = { + @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String, + @QueryParam("kill") @DefaultValue("false") kill: Boolean): Response = { val activeProxyUser = Option(kyuubiProxyUser).getOrElse(hs2ProxyUser) val userName = if (fe.isAdministrator(fe.getRealUser())) { Option(activeProxyUser).getOrElse(fe.getRealUser()) @@ -286,24 +288,38 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { } val engine = normalizeEngineInfo(userName, engineType, shareLevel, subdomain, "default") val engineSpace = calculateEngineSpace(engine) + val responseMsgBuilder = new StringBuilder() withDiscoveryClient(fe.getConf) { discoveryClient => - val engineNodes = discoveryClient.getChildren(engineSpace) - engineNodes.foreach { node => - val nodePath = s"$engineSpace/$node" + val engineNodes = discoveryClient.getServiceNodesInfo(engineSpace, silent = true) + engineNodes.foreach { engineNode => + val nodePath = s"$engineSpace/${engineNode.nodeName}" + val engineRefId = engineNode.engineRefId.orNull info(s"Deleting engine node:$nodePath") try { discoveryClient.delete(nodePath) + responseMsgBuilder + .append(s"Engine $engineSpace refId=$engineRefId is deleted successfully.") } catch { case e: Exception => error(s"Failed to delete engine node:$nodePath", e) throw new NotFoundException(s"Failed to delete engine node:$nodePath," + s"${e.getMessage}") } + + if (kill && engineRefId != null) { + val appMgrInfo = + engineNode.attributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_APP_MGR_INFO_KEY) + .map(ApplicationManagerInfo.deserialize).getOrElse(ApplicationManagerInfo(None)) + val killResponse = fe.be.sessionManager.asInstanceOf[KyuubiSessionManager] + .applicationManager.killApplication(appMgrInfo, engineRefId) + responseMsgBuilder + .append(s"\nKilled engine with $appMgrInfo/$engineRefId: $killResponse") + } } } - Response.ok(s"Engine $engineSpace is deleted successfully.").build() + Response.ok(responseMsgBuilder.toString()).build() } @ApiResponse( diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 8786ef79814..7895ff83f41 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -27,7 +27,9 @@ import scala.util.matching.Regex import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY} -import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY +import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_APP_MGR_INFO_KEY, KYUUBI_ENGINE_CREDENTIALS_KEY} +import org.apache.kyuubi.engine.ApplicationManagerInfo +import org.apache.kyuubi.engine.ApplicationManagerInfo.serialize import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._ class FlinkProcessBuilderSuite extends KyuubiFunSuite { @@ -39,6 +41,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { ENGINE_FLINK_JAVA_OPTIONS, "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") .set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used") + .set(KYUUBI_ENGINE_APP_MGR_INFO_KEY, serialize(ApplicationManagerInfo(None))) private def applicationModeConf = KyuubiConf() .set("flink.execution.target", "yarn-application") @@ -46,6 +49,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { .set(APP_KEY, "kyuubi_connection_flink_paul") .set("kyuubi.on", "off") .set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used") + .set(KYUUBI_ENGINE_APP_MGR_INFO_KEY, serialize(ApplicationManagerInfo(None))) private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile private val tempOpt = diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index 2360dea600b..c5aa3fbd330 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -33,7 +33,7 @@ import org.apache.kyuubi.client.api.v1.dto._ import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_CONNECTION_URL_KEY -import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KyuubiApplicationManager} +import org.apache.kyuubi.engine.{ApplicationManagerInfo, ApplicationState, EngineRef, KubernetesInfo, KyuubiApplicationManager} import org.apache.kyuubi.engine.EngineType.SPARK_SQL import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, USER} import org.apache.kyuubi.ha.HighAvailabilityConf @@ -301,6 +301,54 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert(!operations.map(op => op.getIdentifier).contains(operation.identifier.toString)) } + test("force to kill engine - user share level") { + val id = UUID.randomUUID().toString + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test") + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engine = + new EngineRef( + conf.clone, + Utils.currentUser, + true, + PluginLoader.loadGroupProvider(conf), + id, + null) + + val engineSpace = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL", + Utils.currentUser, + "default") + + withDiscoveryClient(conf) { client => + engine.getOrCreate(client) + assert(client.pathExists(engineSpace)) + assert(client.getChildren(engineSpace).size == 1) + + val response = webTarget.path("api/v1/admin/engine") + .queryParam("sharelevel", "USER") + .queryParam("type", "spark_sql") + .queryParam("kill", "true") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser)) + .delete() + + assert(response.getStatus === 200) + eventually(timeout(5.seconds), interval(100.milliseconds)) { + assert(client.getChildren(engineSpace).isEmpty, s"refId same with $id?") + } + + eventually(timeout(30.seconds), interval(100.milliseconds)) { + val appMgrInfo = ApplicationManagerInfo(None, KubernetesInfo(None, None)) + assert(engineMgr.getApplicationInfo(appMgrInfo, id) + .exists(_.state == ApplicationState.NOT_FOUND)) + } + } + } + test("delete engine - user share level") { val id = UUID.randomUUID().toString conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala index 0c262d2389a..c11410da89b 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminCtlSuite.scala @@ -93,7 +93,7 @@ class AdminCtlSuite extends RestClientTestHelper with TestPrematureExit { ldapUserPasswd) testPrematureExitForAdminControlCli( args, - s"Engine ${engineSpace} is deleted successfully.") + s"Engine ${engineSpace} refId=${id} is deleted successfully.") args = Array( "list", diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala index 43eb06482e7..9aaefd2735c 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala @@ -85,8 +85,9 @@ class AdminRestApiSuite extends RestClientTestHelper { assert(engines(0).getNamespace == engineSpace) assert(engines(0).getAttributes.get(KyuubiReservedKeys.KYUUBI_ENGINE_ID).startsWith("local-")) - val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "") - assert(result == s"Engine ${engineSpace} is deleted successfully.") + // kill engine to release memory quickly + val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "", true) + assert(result startsWith s"Engine ${engineSpace} refId=${id} is deleted successfully.") engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala assert(engines.isEmpty)