From add20fd572bd0477b33565247da70b020ea853a2 Mon Sep 17 00:00:00 2001 From: Cheng Pan Date: Mon, 29 Jan 2024 16:25:02 +0800 Subject: [PATCH] nit --- .../org/apache/kyuubi/engine/EngineRef.scala | 34 +++++++++---------- .../server/api/v1/AdminResourceSuite.scala | 4 +-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala index da45a472093..0d1b0adc775 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala @@ -55,7 +55,7 @@ import org.apache.kyuubi.server.KyuubiServer private[kyuubi] class EngineRef( conf: KyuubiConf, sessionUser: String, - val doAsEnabled: Boolean, + doAsEnabled: Boolean, groupProvider: GroupProvider, engineRefId: String, engineManager: KyuubiApplicationManager, @@ -192,32 +192,32 @@ private[kyuubi] class EngineRef( builder = engineType match { case SPARK_SQL => conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName) - new SparkProcessBuilder(routingUser, doAsEnabled, conf, engineRefId, extraEngineLog) + new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case FLINK_SQL => conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName) - new FlinkProcessBuilder(routingUser, doAsEnabled, conf, engineRefId, extraEngineLog) + new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case TRINO => - new TrinoProcessBuilder(routingUser, doAsEnabled, conf, engineRefId, extraEngineLog) + new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case HIVE_SQL => conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName) HiveProcessBuilder( - routingUser, + appUser, doAsEnabled, conf, engineRefId, extraEngineLog, defaultEngineName) case JDBC => - new JdbcProcessBuilder(routingUser, doAsEnabled, conf, engineRefId, extraEngineLog) + new JdbcProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) case CHAT => - new ChatProcessBuilder(routingUser, doAsEnabled, conf, engineRefId, extraEngineLog) + new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog) } MetricsSystem.tracing(_.incCount(ENGINE_TOTAL)) var acquiredPermit = false try { if (!startupProcessSemaphore.forall(_.tryAcquire(timeout, TimeUnit.MILLISECONDS))) { - MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, routingUser))) + MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" + s" acquires a permit from engine builder semaphore.") @@ -232,10 +232,10 @@ private[kyuubi] class EngineRef( while (engineRef.isEmpty) { if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) { exitValue = Some(process.exitValue()) - if (exitValue != Some(0)) { + if (!exitValue.contains(0)) { val error = builder.getError MetricsSystem.tracing { ms => - ms.incCount(MetricRegistry.name(ENGINE_FAIL, routingUser)) + ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser)) ms.incCount(MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName)) } throw error @@ -244,9 +244,9 @@ private[kyuubi] class EngineRef( if (started + timeout <= System.currentTimeMillis()) { val killMessage = - engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(routingUser)) + engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser)) builder.close(true) - MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, routingUser))) + MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser))) throw KyuubiSQLException( s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" + s" launched $engineType engine with $redactedCmd. $killMessage", @@ -256,7 +256,7 @@ private[kyuubi] class EngineRef( // even the submit process succeeds, the application might meet failure when initializing, // check the engine application state from engine manager and fast fail on engine terminate - if (engineRef.isEmpty && exitValue == Some(0)) { + if (engineRef.isEmpty && exitValue.contains(0)) { Option(engineManager).foreach { engineMgr => if (lastApplicationInfo.isDefined) { TimeUnit.SECONDS.sleep(1) @@ -265,13 +265,13 @@ private[kyuubi] class EngineRef( val applicationInfo = engineMgr.getApplicationInfo( builder.appMgrInfo(), engineRefId, - Some(routingUser), + Some(appUser), Some(started)) applicationInfo.foreach { appInfo => if (ApplicationState.isTerminated(appInfo.state)) { MetricsSystem.tracing { ms => - ms.incCount(MetricRegistry.name(ENGINE_FAIL, routingUser)) + ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser)) ms.incCount(MetricRegistry.name(ENGINE_FAIL, "ENGINE_TERMINATE")) } throw new KyuubiSQLException( @@ -351,7 +351,7 @@ private[kyuubi] class EngineRef( discoveryClient: DiscoveryClient, hostPort: (String, Int)): Option[ServiceNodeInfo] = { val serviceNodes = discoveryClient.getServiceNodesInfo(engineSpace) - serviceNodes.filter { sn => (sn.host, sn.port) == hostPort }.headOption + serviceNodes.find { sn => (sn.host, sn.port) == hostPort } } def close(): Unit = { @@ -359,7 +359,7 @@ private[kyuubi] class EngineRef( try { val appMgrInfo = builder.appMgrInfo() builder.close(true) - engineManager.killApplication(appMgrInfo, engineRefId, Some(routingUser)) + engineManager.killApplication(appMgrInfo, engineRefId, Some(appUser)) } catch { case e: Exception => warn(s"Error closing engine builder, engineRefId: $engineRefId", e) diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala index b9efedbd9c9..95aa3de025d 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala @@ -416,7 +416,7 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") // In EngineRef, when use hive.server2.proxy.user or kyuubi.session.proxy.user - // the user is the proxyUser, and in our test it is normalUser + // the sessionUser is the proxyUser, and in our test it is normalUser val engine = new EngineRef( conf.clone, @@ -667,7 +667,7 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper { conf.set(KyuubiConf.GROUP_PROVIDER, "hadoop") // In EngineRef, when use hive.server2.proxy.user or kyuubi.session.proxy.user - // the user is the proxyUser, and in our test it is normalUser + // the sessionUser is the proxyUser, and in our test it is normalUser val engine = new EngineRef( conf.clone,