From 9940b593ccf48df918ed85d5326d35d5165a03ac Mon Sep 17 00:00:00 2001 From: wangjunbo Date: Thu, 18 Jan 2024 15:22:17 -0800 Subject: [PATCH] [KYUUBI #5797][FOLLOWUP] Desc engine command support show engine registered attributes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description desc engine command support show engine registered attributes. ## Issue References ๐Ÿ”— This pull request fixes #5797 ## Describe Your Solution ๐Ÿ”ง https://github.com/apache/kyuubi/pull/5931#discussion_r1440679545 ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #5948 from Kwafoor/kyuubi_5797_desc_engine_follow. Closes #5797 da8909919 [wangjunbo] fix code 085ffc5c1 [wangjunbo] fix code d58e8ec1b [wangjunbo] delete empty lines cca773a80 [wangjunbo] fix code 9fcc2c6a9 [wangjunbo] delete ENGINE_NAMESPACE column 9dfb2f509 [wangjunbo] [KYUUBI #5797][FOLLOWUP] desc engine command support show engine registered attributes Authored-by: wangjunbo Signed-off-by: Fei Wang --- .../kyuubi/client/KyuubiSyncThriftClient.scala | 2 ++ .../org/apache/kyuubi/engine/EngineRef.scala | 9 ++++++++- .../kyuubi/session/KyuubiSessionImpl.scala | 7 +++++++ .../sql/plan/command/DescribeEngine.scala | 17 ++++++++++++++--- .../operation/parser/DescribeEngineSuite.scala | 5 ++++- 5 files changed, 35 insertions(+), 5 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala index 0dc6692da43..d24387341e6 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala @@ -42,6 +42,7 @@ import org.apache.kyuubi.util.{ThreadUtils, ThriftUtils} import org.apache.kyuubi.util.ThreadUtils.scheduleTolerableRunnableWithFixedDelay class KyuubiSyncThriftClient private ( + val hostPort: (String, Int), protocol: TProtocol, engineAliveProbeProtocol: Option[TProtocol], engineAliveProbeInterval: Long, @@ -483,6 +484,7 @@ private[kyuubi] object KyuubiSyncThriftClient extends Logging { None } new KyuubiSyncThriftClient( + (host, port), tProtocol, aliveProbeProtocol, aliveProbeInterval, diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index ae32b04f0f1..eb9c7ab47c9 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -38,7 +38,7 @@ import org.apache.kyuubi.engine.jdbc.JdbcProcessBuilder import org.apache.kyuubi.engine.spark.SparkProcessBuilder import org.apache.kyuubi.engine.trino.TrinoProcessBuilder import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE} -import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths} +import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider, DiscoveryPaths, ServiceNodeInfo} import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT, ENGINE_TOTAL} import org.apache.kyuubi.metrics.MetricsSystem import org.apache.kyuubi.operation.log.OperationLog @@ -337,6 +337,13 @@ private[kyuubi] class EngineRef( } } + def getServiceNode( + discoveryClient: DiscoveryClient, + hostPort: (String, Int)): Option[ServiceNodeInfo] = { + val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace) + serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption + } + def close(): Unit = { if (shareLevel == CONNECTION && builder != null) { try { diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala index a5d160e0714..e34f7b2a06d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala @@ -30,6 +30,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_CREDENTIALS_KE import org.apache.kyuubi.engine.{EngineRef, KyuubiApplicationManager} import org.apache.kyuubi.events.{EventBus, KyuubiSessionEvent} import org.apache.kyuubi.ha.client.DiscoveryClientProvider._ +import org.apache.kyuubi.ha.client.ServiceNodeInfo import org.apache.kyuubi.operation.{Operation, OperationHandle} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.service.authentication.InternalSecurityAccessor @@ -119,6 +120,12 @@ class KyuubiSessionImpl( engineLastAlive = System.currentTimeMillis() } + def getEngineNode: Option[ServiceNodeInfo] = { + withDiscoveryClient(sessionConf) { discoveryClient => + engine.getServiceNode(discoveryClient, _client.hostPort) + } + } + private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = handleSessionException { withDiscoveryClient(sessionConf) { discoveryClient => diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala index 85ec536853a..0c9a0bfa567 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/sql/plan/command/DescribeEngine.scala @@ -35,12 +35,20 @@ import org.apache.kyuubi.sql.schema.{Column, Row, Schema} case class DescribeEngine() extends RunnableCommand { override def run(kyuubiSession: KyuubiSession): Unit = { - val rows = Seq(kyuubiSession).map { session => - lazy val client = session.asInstanceOf[KyuubiSessionImpl].client + val rows = Seq(kyuubiSession.asInstanceOf[KyuubiSessionImpl]).map { session => + lazy val client = session.client val values = new ListBuffer[String]() values += client.engineId.getOrElse("") values += client.engineName.getOrElse("") values += client.engineUrl.getOrElse("") + session.getEngineNode match { + case Some(nodeInfo) => + values += s"${nodeInfo.host}:${nodeInfo.port}" + values += nodeInfo.version.getOrElse("") + values += nodeInfo.attributes.mkString(",") + case None => + values += ("", "", "") + } Row(values.toList) } iter = new IterableFetchIterator(rows) @@ -59,6 +67,9 @@ object DescribeEngine { Seq( Column("ENGINE_ID", TTypeId.STRING_TYPE, Some("Kyuubi engine identify")), Column("ENGINE_NAME", TTypeId.STRING_TYPE, Some("Kyuubi engine name")), - Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url"))) + Column("ENGINE_URL", TTypeId.STRING_TYPE, Some("Kyuubi engine url")), + Column("ENGINE_INSTANCE", TTypeId.STRING_TYPE, Some("Kyuubi engine instance host and port")), + Column("ENGINE_VERSION", TTypeId.STRING_TYPE, Some("Kyuubi engine version")), + Column("ENGINE_ATTRIBUTES", TTypeId.STRING_TYPE, Some("Kyuubi engine attributes"))) } } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala index d9488abd62f..1b11fb827ef 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/parser/DescribeEngineSuite.scala @@ -25,10 +25,13 @@ class DescribeEngineSuite extends ExecutedCommandExecSuite { val resultSet = statement.executeQuery(s"KYUUBI $desc ENGINE") assert(resultSet.next()) - assert(resultSet.getMetaData.getColumnCount == 3) + assert(resultSet.getMetaData.getColumnCount == 6) assert(resultSet.getMetaData.getColumnName(1) == "ENGINE_ID") assert(resultSet.getMetaData.getColumnName(2) == "ENGINE_NAME") assert(resultSet.getMetaData.getColumnName(3) == "ENGINE_URL") + assert(resultSet.getMetaData.getColumnName(4) == "ENGINE_INSTANCE") + assert(resultSet.getMetaData.getColumnName(5) == "ENGINE_VERSION") + assert(resultSet.getMetaData.getColumnName(6) == "ENGINE_ATTRIBUTES") } } }