From 16ae8528919730f553fcfd73703061b340aae488 Mon Sep 17 00:00:00 2001 From: senmiaoliu Date: Sat, 23 Sep 2023 23:18:32 +0800 Subject: [PATCH] [KYUUBI #4994][REST] Support listing all engines ### _Why are the changes needed?_ close #4994 ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request Closes #5157 from lsm1/branch-kyuubi_4994. Closes #4994 02e7eb5fa [senmiaoliu] use abbr a 6d001e519 [senmiaoliu] replace 'allengine' with 'all' f9c548299 [senmiaoliu] fix style df64e773e [senmiaoliu] fix style 6a7d40e63 [senmiaoliu] remove allenginecommand 6306dd8a2 [senmiaoliu] support list all engine Authored-by: senmiaoliu Signed-off-by: Shaoyun Chen --- docs/tools/kyuubi-admin.rst | 2 + .../ctl/cli/AdminControlCliArguments.scala | 1 + .../ctl/cmd/list/AdminListEngineCommand.scala | 3 +- .../kyuubi/ctl/opt/AdminCommandLine.scala | 24 ++- .../org/apache/kyuubi/ctl/opt/CliConfig.scala | 3 +- .../ctl/AdminControlCliArgumentsSuite.scala | 3 +- .../apache/kyuubi/client/AdminRestApi.java | 3 +- .../kyuubi/server/api/v1/AdminResource.scala | 46 +++++- .../server/api/v1/AdminResourceSuite.scala | 151 ++++++++++++++++++ .../rest/client/AdminRestApiSuite.scala | 4 +- 10 files changed, 230 insertions(+), 10 deletions(-) diff --git a/docs/tools/kyuubi-admin.rst b/docs/tools/kyuubi-admin.rst index 29149e92f5f..bd37f7e684f 100644 --- a/docs/tools/kyuubi-admin.rst +++ b/docs/tools/kyuubi-admin.rst @@ -99,6 +99,8 @@ Usage: ``bin/kyuubi-admin list engine [options]`` - The subdomain for the share level of an engine. If not specified, it will read the configuration item kyuubi.engine.share.level.subdomain from kyuubi-defaults.conf. * - --hs2ProxyUser - The proxy user to impersonate. When specified, it will list engines for the hs2ProxyUser. + * - -a --all + - All the engine. .. _list_server: diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/AdminControlCliArguments.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/AdminControlCliArguments.scala index 5a45630c685..e015525b3aa 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/AdminControlCliArguments.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cli/AdminControlCliArguments.scala @@ -61,6 +61,7 @@ class AdminControlCliArguments(args: Seq[String], env: Map[String, String] = sys | type ${cliConfig.engineOpts.engineType} | sharelevel ${cliConfig.engineOpts.engineShareLevel} | sharesubdomain ${cliConfig.engineOpts.engineSubdomain} + | all ${cliConfig.engineOpts.all} """.stripMargin case ControlObject.SERVER => s"""Parsed arguments: diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/AdminListEngineCommand.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/AdminListEngineCommand.scala index acd6fe44416..96be5cc4744 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/AdminListEngineCommand.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/cmd/list/AdminListEngineCommand.scala @@ -38,7 +38,8 @@ class AdminListEngineCommand(cliConfig: CliConfig) normalizedCliConfig.engineOpts.engineType, normalizedCliConfig.engineOpts.engineShareLevel, normalizedCliConfig.engineOpts.engineSubdomain, - normalizedCliConfig.commonOpts.hs2ProxyUser).asScala + normalizedCliConfig.commonOpts.hs2ProxyUser, + normalizedCliConfig.engineOpts.all).asScala } } diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala index c7e367405e8..c02826b6875 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/AdminCommandLine.scala @@ -52,7 +52,7 @@ object AdminCommandLine extends CommonCommandLine { .text("\tDelete resources.") .action((_, c) => c.copy(action = ControlAction.DELETE)) .children( - engineCmd(builder).text("\tDelete the specified engine node for user."))) + deleteEngineCmd(builder).text("\tDelete the specified engine node for user."))) } @@ -64,7 +64,7 @@ object AdminCommandLine extends CommonCommandLine { .text("\tList information about resources.") .action((_, c) => c.copy(action = ControlAction.LIST)) .children( - engineCmd(builder).text("\tList all the engine nodes for a user"), + listEngineCmd(builder).text("\tList the engine nodes"), serverCmd(builder).text("\tList all the server nodes"))) } @@ -80,7 +80,7 @@ object AdminCommandLine extends CommonCommandLine { refreshConfigCmd(builder).text("\tRefresh the config with specified type."))) } - private def engineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = { + private def deleteEngineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = { import builder._ cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE)) .children( @@ -95,6 +95,24 @@ object AdminCommandLine extends CommonCommandLine { .text("The engine share level this engine belong to.")) } + private def listEngineCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = { + import builder._ + cmd("engine").action((_, c) => c.copy(resource = ControlObject.ENGINE)) + .children( + opt[String]("engine-type").abbr("et") + .action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineType = v))) + .text("The engine type this engine belong to."), + opt[String]("engine-subdomain").abbr("es") + .action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineSubdomain = v))) + .text("The engine subdomain this engine belong to."), + opt[String]("engine-share-level").abbr("esl") + .action((v, c) => c.copy(engineOpts = c.engineOpts.copy(engineShareLevel = v))) + .text("The engine share level this engine belong to."), + opt[String]("all").abbr("a") + .action((v, c) => c.copy(engineOpts = c.engineOpts.copy(all = v))) + .text("All the engine.")) + } + private def serverCmd(builder: OParserBuilder[CliConfig]): OParser[_, CliConfig] = { import builder._ cmd("server").action((_, c) => c.copy(resource = ControlObject.SERVER)) diff --git a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala index 7818f694a3f..4ccae109c6a 100644 --- a/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala +++ b/kyuubi-ctl/src/main/scala/org/apache/kyuubi/ctl/opt/CliConfig.scala @@ -77,6 +77,7 @@ case class EngineOpts( user: String = null, engineType: String = null, engineSubdomain: String = null, - engineShareLevel: String = null) + engineShareLevel: String = null, + all: String = null) case class AdminConfigOpts(configType: String = null) diff --git a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala index 52a2796f463..ae7c0fa1b96 100644 --- a/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala +++ b/kyuubi-ctl/src/test/scala/org/apache/kyuubi/ctl/AdminControlCliArgumentsSuite.scala @@ -158,13 +158,14 @@ class AdminControlCliArgumentsSuite extends KyuubiFunSuite with TestPrematureExi |Command: list [engine|server] | List information about resources. |Command: list engine [options] - | List all the engine nodes for a user + | List the engine nodes | -et, --engine-type | The engine type this engine belong to. | -es, --engine-subdomain | The engine subdomain this engine belong to. | -esl, --engine-share-level | The engine share level this engine belong to. + | -a, --all All the engine. |Command: list server | List all the server nodes | diff --git a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java index e315a96cc56..3b220cbc234 100644 --- a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java +++ b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/AdminRestApi.java @@ -73,12 +73,13 @@ public String deleteEngine( } public List listEngines( - String engineType, String shareLevel, String subdomain, String hs2ProxyUser) { + String engineType, String shareLevel, String subdomain, String hs2ProxyUser, String all) { Map params = new HashMap<>(); params.put("type", engineType); params.put("sharelevel", shareLevel); params.put("subdomain", subdomain); params.put("hive.server2.proxy.user", hs2ProxyUser); + params.put("all", all); Engine[] result = this.getClient() .get(API_BASE_PATH + "/engine", params, Engine[].class, client.getAuthHeader()); diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala index 0c2065ff1dd..5f410ab7de9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala @@ -286,7 +286,51 @@ private[v1] class AdminResource extends ApiRequestContext with Logging { @QueryParam("type") engineType: String, @QueryParam("sharelevel") shareLevel: String, @QueryParam("subdomain") subdomain: String, - @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String): Seq[Engine] = { + @QueryParam("hive.server2.proxy.user") hs2ProxyUser: String, + @QueryParam("all") @DefaultValue("false") all: String): Seq[Engine] = { + if (all.toBoolean) { + val userName = fe.getSessionUser(Map.empty[String, String]) + val ipAddress = fe.getIpAddress + info(s"Received list all kyuubi engine request from $userName/$ipAddress") + if (!isAdministrator(userName)) { + throw new NotAllowedException( + s"$userName is not allowed to list all kyuubi engine") + } + val engines = ListBuffer[Engine]() + val engineSpace = fe.getConf.get(HA_NAMESPACE) + val shareLevel = fe.getConf.get(ENGINE_SHARE_LEVEL) + val engineType = fe.getConf.get(ENGINE_TYPE) + withDiscoveryClient(fe.getConf) { discoveryClient => + val commonParent = s"/${engineSpace}_${KYUUBI_VERSION}_${shareLevel}_$engineType" + info(s"Listing engine nodes for $commonParent") + try { + discoveryClient.getChildren(commonParent).map { + user => + val engine = getEngine(user, engineType, shareLevel, "", "") + val engineSpace = getEngineSpace(engine) + discoveryClient.getChildren(engineSpace).map { child => + info(s"Listing engine nodes for $engineSpace/$child") + engines ++= discoveryClient.getServiceNodesInfo(s"$engineSpace/$child").map(node => + new Engine( + engine.getVersion, + engine.getUser, + engine.getEngineType, + engine.getSharelevel, + node.namespace.split("/").last, + node.instance, + node.namespace, + node.attributes.asJava)) + } + } + } catch { + case nne: NoNodeException => + error(s"No such engine for engine type: $engineType, share level: $shareLevel", nne) + throw new NotFoundException( + s"No such engine for engine type: $engineType, share level: $shareLevel") + } + } + return engines.toSeq + } val userName = if (isAdministrator(fe.getRealUser())) { Option(hs2ProxyUser).getOrElse(fe.getRealUser()) } else { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index ff5a7e84412..6ca00c802c9 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -587,4 +587,155 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { assert("Running".equals(testServer.getStatus)) } } + + test("list all engine - user share level") { + val id = UUID.randomUUID().toString + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, USER.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test") + conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L) + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engine = + new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + + val engineSpace = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_USER_SPARK_SQL", + Utils.currentUser, + "") + + withDiscoveryClient(conf) { client => + engine.getOrCreate(client) + + assert(client.pathExists(engineSpace)) + assert(client.getChildren(engineSpace).size == 1) + + val response = webTarget.path("api/v1/admin/engine") + .queryParam("all", "true") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .get + + assert(200 == response.getStatus) + val engines = response.readEntity(new GenericType[Seq[Engine]]() {}) + assert(engines.size == 1) + assert(engines(0).getEngineType == "SPARK_SQL") + assert(engines(0).getSharelevel == "USER") + assert(engines(0).getSubdomain == "default") + + // kill the engine application + engineMgr.killApplication(ApplicationManagerInfo(None), id) + eventually(timeout(30.seconds), interval(100.milliseconds)) { + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists( + _.state == ApplicationState.NOT_FOUND)) + } + } + } + + test("list all engines - group share level") { + val id = UUID.randomUUID().toString + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, GROUP.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test") + conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L) + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engine = + new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id, null) + + val engineSpace = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_GROUP_SPARK_SQL", + fe.asInstanceOf[KyuubiRestFrontendService].sessionManager.groupProvider.primaryGroup( + Utils.currentUser, + null), + "") + + withDiscoveryClient(conf) { client => + engine.getOrCreate(client) + + assert(client.pathExists(engineSpace)) + assert(client.getChildren(engineSpace).size == 1) + + val response = webTarget.path("api/v1/admin/engine") + .queryParam("all", "true") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .get + + assert(200 == response.getStatus) + val engines = response.readEntity(new GenericType[Seq[Engine]]() {}) + assert(engines.size == 1) + assert(engines(0).getEngineType == "SPARK_SQL") + assert(engines(0).getSharelevel == "GROUP") + assert(engines(0).getSubdomain == "default") + + // kill the engine application + engineMgr.killApplication(ApplicationManagerInfo(None), id) + eventually(timeout(30.seconds), interval(100.milliseconds)) { + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id).exists( + _.state == ApplicationState.NOT_FOUND)) + } + } + } + + test("list all engines - connection share level") { + conf.set(KyuubiConf.ENGINE_SHARE_LEVEL, CONNECTION.toString) + conf.set(KyuubiConf.ENGINE_TYPE, SPARK_SQL.toString) + conf.set(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0) + conf.set(HighAvailabilityConf.HA_NAMESPACE, "kyuubi_test") + conf.set(KyuubiConf.ENGINE_IDLE_TIMEOUT, 180000L) + conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") + + val engineSpace = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL", + Utils.currentUser, + "") + + val id1 = UUID.randomUUID().toString + val engine1 = + new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id1, null) + val engineSpace1 = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL", + Utils.currentUser, + id1) + + val id2 = UUID.randomUUID().toString + val engine2 = + new EngineRef(conf.clone, Utils.currentUser, PluginLoader.loadGroupProvider(conf), id2, null) + val engineSpace2 = DiscoveryPaths.makePath( + s"kyuubi_test_${KYUUBI_VERSION}_CONNECTION_SPARK_SQL", + Utils.currentUser, + id2) + + withDiscoveryClient(conf) { client => + engine1.getOrCreate(client) + engine2.getOrCreate(client) + + assert(client.pathExists(engineSpace)) + assert(client.getChildren(engineSpace).size == 2) + assert(client.pathExists(engineSpace1)) + assert(client.pathExists(engineSpace2)) + + val response = webTarget.path("api/v1/admin/engine") + .queryParam("all", "true") + .request(MediaType.APPLICATION_JSON_TYPE) + .header(AUTHORIZATION_HEADER, s"BASIC $encodeAuthorization") + .get + assert(200 == response.getStatus) + val result = response.readEntity(new GenericType[Seq[Engine]]() {}) + assert(result.size == 2) + + // kill the engine application + engineMgr.killApplication(ApplicationManagerInfo(None), id1) + engineMgr.killApplication(ApplicationManagerInfo(None), id2) + eventually(timeout(30.seconds), interval(100.milliseconds)) { + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id1) + .exists(_.state == ApplicationState.NOT_FOUND)) + assert(engineMgr.getApplicationInfo(ApplicationManagerInfo(None), id2) + .exists(_.state == ApplicationState.NOT_FOUND)) + } + } + } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala index e3bb298e092..d63e4660772 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/AdminRestApiSuite.scala @@ -74,7 +74,7 @@ class AdminRestApiSuite extends RestClientTestHelper { .build() val adminRestApi = new AdminRestApi(basicKyuubiRestClient) - var engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala + var engines = adminRestApi.listEngines("spark_sql", "user", "default", "", "false").asScala assert(engines.size == 1) assert(engines(0).getUser == user) assert(engines(0).getVersion == KYUUBI_VERSION) @@ -87,7 +87,7 @@ class AdminRestApiSuite extends RestClientTestHelper { val result = adminRestApi.deleteEngine("spark_sql", "user", "default", "") assert(result == s"Engine ${engineSpace} is deleted successfully.") - engines = adminRestApi.listEngines("spark_sql", "user", "default", "").asScala + engines = adminRestApi.listEngines("spark_sql", "user", "default", "", "false").asScala assert(engines.isEmpty) }