diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 212208b833c..f07b51aee1c 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -137,6 +137,7 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.chat.memory | 1g | The heap memory for the Chat engine | string | 1.8.0 |
| kyuubi.engine.chat.provider | ECHO | The provider for the Chat engine. Candidates:
- ECHO: simply replies a welcome message.
- GPT: a.k.a ChatGPT, powered by OpenAI.
- ERNIE: ErnieBot, powered by Baidu.
| string | 1.8.0 |
| kyuubi.engine.connection.url.use.hostname | true | (deprecated) When true, the engine registers with hostname to zookeeper. When Spark runs on K8s with cluster mode, set to false to ensure that server can connect to engine | boolean | 1.3.0 |
+| kyuubi.engine.deploy.mode | local | Configures the engine deploy mode, The value can be 'local', 'yarn'. In local mode, the engine operates on the same node as the KyuubiServer. In YARN mode, the engine runs within the Application Master (AM) container of YARN. Currently, only Hive engine supports YARN mode. | string | 1.9.0 |
| kyuubi.engine.deregister.exception.classes || A comma-separated list of exception classes. If there is any exception thrown, whose class matches the specified classes, the engine would deregister itself. | set | 1.2.0 |
| kyuubi.engine.deregister.exception.messages || A comma-separated list of exception messages. If there is any exception thrown, whose message or stacktrace matches the specified message list, the engine would deregister itself. | set | 1.2.0 |
| kyuubi.engine.deregister.exception.ttl | PT30M | Time to live(TTL) for exceptions pattern specified in kyuubi.engine.deregister.exception.classes and kyuubi.engine.deregister.exception.messages to deregister engines. Once the total error count hits the kyuubi.engine.deregister.job.max.failures within the TTL, an engine will deregister itself and wait for self-terminated. Otherwise, we suppose that the engine has recovered from temporary failures. | duration | 1.2.0 |
@@ -203,7 +204,15 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.engine.user.isolated.spark.session | true | When set to false, if the engine is running in a group or server share level, all the JDBC/ODBC connections will be isolated against the user. Including the temporary views, function registries, SQL configuration, and the current database. Note that, it does not affect if the share level is connection or user. | boolean | 1.6.0 |
| kyuubi.engine.user.isolated.spark.session.idle.interval | PT1M | The interval to check if the user-isolated Spark session is timeout. | duration | 1.6.0 |
| kyuubi.engine.user.isolated.spark.session.idle.timeout | PT6H | If kyuubi.engine.user.isolated.spark.session is false, we will release the Spark session if its corresponding user is inactive after this configured timeout. | duration | 1.6.0 |
+| kyuubi.engine.yarn.cores | 1 | kyuubi engine container core number when `kyuubi.engine.deploy.mode` is YARN. | int | 1.9.0 |
+| kyuubi.engine.yarn.java.options | <undefined> | The extra Java options for the AM when `kyuubi.engine.deploy.mode` is YARN. | string | 1.9.0 |
+| kyuubi.engine.yarn.maxAppAttempts | <undefined> | Maximum number of AM attempts before failing the app when `kyuubi.engine.deploy.mode` is YARN. | int | 1.9.0 |
+| kyuubi.engine.yarn.memory | 1024 | kyuubi engine container memory in mb when `kyuubi.engine.deploy.mode` is YARN. | int | 1.9.0 |
+| kyuubi.engine.yarn.queue | default | kyuubi engine yarn queue when `kyuubi.engine.deploy.mode` is YARN. | string | 1.9.0 |
+| kyuubi.engine.yarn.report.interval | PT1S | Interval between reports of the current engine on yarn app status. | duration | 1.9.0 |
+| kyuubi.engine.yarn.stagingDir | <undefined> | Staging directory used while submitting kyuubi engine to YARN. | string | 1.9.0 |
| kyuubi.engine.yarn.submit.timeout | PT30S | The engine submit timeout for YARN application. | duration | 1.7.2 |
+| kyuubi.engine.yarn.tags | <undefined> | kyuubi engine yarn tags when `kyuubi.engine.deploy.mode` is YARN. | seq | 1.9.0 |
### Event
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
index 3e6b8729db1..aff96dec5cb 100644
--- a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/HiveSQLEngine.scala
@@ -40,12 +40,16 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
override val frontendServices: Seq[AbstractFrontendService] =
Seq(new HiveTBinaryFrontendService(this))
private[hive] val engineStartTime = System.currentTimeMillis()
+ private[hive] var selfExist: Boolean = false
override def start(): Unit = {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
- backendService.sessionManager.startTerminatingChecker(() => stop())
+ backendService.sessionManager.startTerminatingChecker(() => {
+ selfExist = true
+ stop()
+ })
}
override protected def stopServer(): Unit = {
@@ -114,6 +118,13 @@ object HiveSQLEngine extends Logging {
currentEngine = Some(engine)
}
+ def runEngine(args: Array[String]): Unit = main(args)
+
+ def getCurrentEngine: HiveSQLEngine = {
+ require(currentEngine.isDefined, "HiveSQLEngine has not been initialized")
+ currentEngine.get
+ }
+
private def initLoggerEventHandler(conf: KyuubiConf): Unit = {
HiveEventHandlerRegister.registerEventLoggers(conf)
}
@@ -122,6 +133,7 @@ object HiveSQLEngine extends Logging {
SignalRegister.registerLogger(logger)
try {
Utils.fromCommandLineArgs(args, kyuubiConf)
+ kyuubiConf.loadFileDefaults()
val sessionUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
val realUser = UserGroupInformation.getLoginUser
@@ -143,7 +155,8 @@ object HiveSQLEngine extends Logging {
}
} catch {
- case t: Throwable => currentEngine match {
+ case t: Throwable =>
+ currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
@@ -152,6 +165,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
+ throw t
}
}
}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/ApplicationMaster.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/ApplicationMaster.scala
new file mode 100644
index 00000000000..ab6b5c14d28
--- /dev/null
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/ApplicationMaster.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive.deploy
+
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.{Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.hive.{HiveSQLEngine, HiveTBinaryFrontendService}
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+object ApplicationMaster extends Logging {
+
+ private var amClient: AMRMClient[ContainerRequest] = _
+ private var yarnConf: YarnConfiguration = _
+
+ private var currentEngine: HiveSQLEngine = _
+ private val kyuubiConf = new KyuubiConf()
+
+ private var finalMsg: String = _
+
+ @volatile private var registered: Boolean = false
+ @volatile private var unregistered: Boolean = false
+ @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
+
+ def main(args: Array[String]): Unit = {
+ try {
+ kyuubiConf.loadFileDefaults()
+ yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
+ Utils.addShutdownHook(() => {
+ if (!unregistered) {
+ if (currentEngine != null && currentEngine.selfExist) {
+ finalMsg = "Kyuubi Application Master is shutting down."
+ finalStatus = FinalApplicationStatus.SUCCEEDED
+ } else {
+ finalMsg = "Kyuubi Application Master is shutting down with error."
+ finalStatus = FinalApplicationStatus.FAILED
+ }
+ unregister(finalStatus, finalMsg)
+ }
+ })
+ runApplicationMaster(args)
+ } catch {
+ case t: Throwable =>
+ error("Error running ApplicationMaster", t)
+ finalStatus = FinalApplicationStatus.FAILED
+ finalMsg = t.getMessage
+ unregister(finalStatus, finalMsg)
+ if (currentEngine != null) {
+ currentEngine.stop()
+ }
+ }
+ }
+
+ def runApplicationMaster(args: Array[String]): Unit = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(yarnConf)
+ amClient.start()
+
+ runHiveEngine(args)
+
+ registryAM()
+ }
+
+ def runHiveEngine(args: Array[String]): Unit = {
+ HiveSQLEngine.runEngine(args)
+ currentEngine = HiveSQLEngine.getCurrentEngine
+ }
+
+ def initAmClient(): Unit = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(yarnConf)
+ amClient.start()
+ }
+
+ def registryAM(): Unit = {
+ val frontendService =
+ currentEngine.frontendServices.head.asInstanceOf[HiveTBinaryFrontendService]
+ val (host, port) = frontendService.listenerTBinaryFrontendAddress()
+ val trackingUrl = frontendService.connectionUrl
+ info("Registering the HiveSQLEngine ApplicationMaster")
+ synchronized {
+ amClient.registerApplicationMaster(host, port, trackingUrl)
+ registered = true
+ }
+ }
+
+ def unregister(status: FinalApplicationStatus, diagnostics: String): Unit = {
+ synchronized {
+ if (registered && !unregistered) {
+ info(s"Unregistering ApplicationMaster with $status" +
+ Option(diagnostics).map(msg => s" (diagnostics message: $msg)").getOrElse(""))
+ unregistered = true
+ amClient.unregisterApplicationMaster(status, diagnostics, "")
+ if (amClient != null) {
+ amClient.stop()
+ }
+ }
+ }
+ }
+}
diff --git a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
new file mode 100644
index 00000000000..7507679b45d
--- /dev/null
+++ b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/deploy/HiveYarnModeSubmitter.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive.deploy
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.deploy.EngineYarnModeSubmitter
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {
+
+ def main(args: Array[String]): Unit = {
+ Utils.fromCommandLineArgs(args, kyuubiConf)
+ // Initialize the engine submitter.
+ init()
+ // Submit the engine application to YARN.
+ submitApplication()
+ }
+
+ private def init(): Unit = {
+ yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
+ hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
+ }
+
+ override var engineType: String = "hive"
+
+ override def amClass(): String = "org.apache.kyuubi.engine.hive.deploy.ApplicationMaster"
+}
diff --git a/integration-tests/kyuubi-hive-it/pom.xml b/integration-tests/kyuubi-hive-it/pom.xml
index c4e9f320c95..cdd9fa4d99b 100644
--- a/integration-tests/kyuubi-hive-it/pom.xml
+++ b/integration-tests/kyuubi-hive-it/pom.xml
@@ -68,6 +68,37 @@
test-jar
test
+
+
+
+ org.apache.hadoop
+ hadoop-client-minicluster
+ test
+
+
+
+ org.bouncycastle
+ bcprov-jdk15on
+ test
+
+
+
+ org.bouncycastle
+ bcpkix-jdk15on
+ test
+
+
+
+ jakarta.activation
+ jakarta.activation-api
+ test
+
+
+
+ jakarta.xml.bind
+ jakarta.xml.bind-api
+ test
+
diff --git a/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEngineYarnModeSuite.scala b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEngineYarnModeSuite.scala
new file mode 100644
index 00000000000..4e11b8606bb
--- /dev/null
+++ b/integration-tests/kyuubi-hive-it/src/test/scala/org/apache/kyuubi/it/hive/operation/KyuubiOperationHiveEngineYarnModeSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.it.hive.operation
+
+import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
+import org.apache.kyuubi.engine.deploy.YarnMode
+
+class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests
+ with WithKyuubiServerAndHadoopMiniCluster {
+
+ override protected val conf: KyuubiConf = {
+ val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
+ metastore.toFile.delete()
+ KyuubiConf()
+ .set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
+ .set(ENGINE_TYPE, "HIVE_SQL")
+ .set(KyuubiConf.ENGINE_DEPLOY_MODE, YarnMode.name)
+ // increase this to 30s as hive session state and metastore client is slow initializing
+ .setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
+ .set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
+ }
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf
+ .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, Math.min(getYarnMaximumAllocationMb, 1024))
+ .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS, 1)
+ .set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
+ }
+
+ override protected def jdbcUrl: String = getJdbcUrl
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
index 896ed9df29d..ea4fd8eb4fe 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
@@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}
+ /**
+ * List the files recursively in a directory.
+ */
+ def listFilesRecursive(file: File): Seq[File] = {
+ if (!file.isDirectory) {
+ file :: Nil
+ } else {
+ file.listFiles().flatMap(listFilesRecursive)
+ }
+ }
+
/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 00c1b89956e..a6e2ee85f2c 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -30,6 +30,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
+import org.apache.kyuubi.engine.deploy.LocalMode
import org.apache.kyuubi.operation.{NoneMode, PlainStyle}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}
@@ -231,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
+ final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
@@ -2677,6 +2679,79 @@ object KyuubiConf {
.stringConf
.createOptional
+ val ENGINE_DEPLOY_MODE: ConfigEntry[String] =
+ buildConf("kyuubi.engine.deploy.mode")
+ .doc("Configures the engine deploy mode, The value can be 'local', 'yarn'. " +
+ "In local mode, the engine operates on the same node as the KyuubiServer. " +
+ "In YARN mode, the engine runs within the Application Master (AM) container of YARN. " +
+ "Currently, only Hive engine supports YARN mode.")
+ .version("1.9.0")
+ .stringConf
+ .transformToUpperCase
+ .checkValue(
+ mode => Set("LOCAL", "YARN").contains(mode),
+ "Invalid value for 'kyuubi.engine.hive.deploy.mode'. Valid values are 'local', 'yarn'.")
+ .createWithDefault(LocalMode.name)
+
+ val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.stagingDir")
+ .doc("Staging directory used while submitting kyuubi engine to YARN.")
+ .version("1.9.0")
+ .stringConf
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] =
+ buildConf("kyuubi.engine.yarn.report.interval")
+ .doc("Interval between reports of the current engine on yarn app status.")
+ .version("1.9.0")
+ .timeConf
+ .checkValue(t => t > 0, "must be positive integer")
+ .createWithDefault(Duration.ofSeconds(1).toMillis)
+
+ val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] =
+ buildConf("kyuubi.engine.yarn.tags")
+ .doc(s"kyuubi engine yarn tags when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
+ .version("1.9.0")
+ .stringConf
+ .toSequence()
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.queue")
+ .doc(s"kyuubi engine yarn queue when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
+ .version("1.9.0")
+ .stringConf
+ .createWithDefault("default")
+
+ val ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS: OptionalConfigEntry[Int] =
+ buildConf("kyuubi.engine.yarn.maxAppAttempts")
+ .doc(s"Maximum number of AM attempts before failing the app " +
+ s"when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
+ .version("1.9.0")
+ .intConf
+ .createOptional
+
+ val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.yarn.memory")
+ .doc(s"kyuubi engine container memory in mb when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
+ .version("1.9.0")
+ .intConf
+ .createWithDefault(1024)
+
+ val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] =
+ buildConf("kyuubi.engine.yarn.cores")
+ .doc(s"kyuubi engine container core number when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
+ .version("1.9.0")
+ .intConf
+ .createWithDefault(1)
+
+ val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] =
+ buildConf("kyuubi.engine.yarn.java.options")
+ .doc(s"The extra Java options for the AM when `${ENGINE_DEPLOY_MODE.key}` is YARN.")
+ .version("1.9.0")
+ .stringConf
+ .createOptional
+
val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/DeployMode.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/DeployMode.scala
new file mode 100644
index 00000000000..004f13b9dfa
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/DeployMode.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy
+
+import java.util.Locale
+
+import org.apache.kyuubi.Logging
+
+sealed trait DeployMode {
+
+ /**
+ * String name of the engine deploy mode.
+ */
+ def name: String
+}
+
+case object LocalMode extends DeployMode { val name = "local" }
+
+case object YarnMode extends DeployMode { val name = "yarn" }
+
+case object KubernetesMode extends DeployMode { val name = "kubernetes" }
+
+object DeployMode extends Logging {
+
+ /**
+ * Returns the engine deploy mode from the given string.
+ */
+ def fromString(mode: String): DeployMode = mode.toLowerCase(Locale.ROOT) match {
+ case LocalMode.name => LocalMode
+ case YarnMode.name => YarnMode
+ case KubernetesMode.name => KubernetesMode
+ case _ =>
+ warn(s"Unknown deploy mode: $mode, fallback to local mode.")
+ LocalMode
+ }
+}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/EngineYarnModeSubmitter.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/EngineYarnModeSubmitter.scala
new file mode 100644
index 00000000000..5dc13bb724b
--- /dev/null
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/EngineYarnModeSubmitter.scala
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy
+
+import java.io.{File, FileOutputStream, InterruptedIOException, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.util
+import java.util.{Locale, Properties}
+import java.util.zip.{ZipEntry, ZipOutputStream}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.permission.FsPermission
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
+import org.apache.hadoop.yarn.util.Records
+
+import org.apache.kyuubi.{KyuubiException, Logging, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.deploy.EngineYarnModeSubmitter._
+import org.apache.kyuubi.util.KyuubiHadoopUtils
+
+abstract class EngineYarnModeSubmitter extends Logging {
+
+ val KYUUBI_ENGINE_STAGING: String = ".kyuubiEngineStaging"
+
+ val LOCALIZED_LIB_DIR = "__kyuubi_engine_libs__"
+ val LOCALIZED_CONF_DIR = "__kyuubi_engine_conf__"
+
+ val KYUUBI_CONF_FILE = "kyuubi-defaults.conf"
+ val CORE_SITE_FILE = "core-site.xml"
+ val YARN_SITE_FILE = "yarn-site.xml"
+
+ val STAGING_DIR_PERMISSION: FsPermission =
+ FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
+
+ @volatile private var yarnClient: YarnClient = _
+ private var appId: ApplicationId = _
+
+ private var stagingDirPath: Path = _
+
+ val kyuubiConf = new KyuubiConf()
+
+ var yarnConf: Configuration = _
+ var hadoopConf: Configuration = _
+
+ def amClass(): String
+
+ var engineType: String
+
+ protected def submitApplication(): Unit = {
+ assert(
+ hadoopConf != null && yarnConf != null,
+ "Hadoop Configuration is not initialized. " +
+ "Please initialize it before submitting application.")
+ try {
+ yarnClient = YarnClient.createYarnClient()
+ yarnClient.init(yarnConf)
+ yarnClient.start()
+
+ debug("Requesting a new application from cluster with %d NodeManagers"
+ .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
+
+ val newApp = yarnClient.createApplication()
+ val newAppResponse = newApp.getNewApplicationResponse
+ appId = newAppResponse.getApplicationId
+
+ // The app staging dir based on the STAGING_DIR configuration if configured
+ // otherwise based on the users home directory.
+ val appStagingBaseDir = kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_STAGING_DIR)
+ .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
+ .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+ stagingDirPath = new Path(appStagingBaseDir, buildPath(KYUUBI_ENGINE_STAGING, appId.toString))
+
+ // Set up the appropriate contexts to launch AM
+ val containerContext = createContainerLaunchContext()
+ val appContext = createApplicationSubmissionContext(newApp, containerContext)
+
+ // Finally, submit and monitor the application
+ info(s"Submitting application $appId to ResourceManager")
+ yarnClient.submitApplication(appContext)
+ monitorApplication(appId)
+ } catch {
+ case e: Throwable =>
+ if (stagingDirPath != null) {
+ cleanupStagingDir()
+ }
+ throw new KyuubiException("Failed to submit application to YARN", e)
+ } finally {
+ if (yarnClient != null) {
+ yarnClient.stop()
+ }
+ }
+ }
+
+ private def createContainerLaunchContext(): ContainerLaunchContext = {
+ info("Setting up container launch context for engine AM")
+ val env = setupLaunchEnv(kyuubiConf)
+ val localResources = prepareLocalResources(stagingDirPath, env)
+
+ val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
+ amContainer.setLocalResources(localResources.asJava)
+ amContainer.setEnvironment(env.asJava)
+
+ val javaOpts = ListBuffer[String]()
+
+ val javaOptions = kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS)
+ if (javaOptions.isDefined) {
+ javaOpts += javaOptions.get
+ }
+
+ val mainClass = Seq(amClass())
+
+ val commands =
+ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
+ javaOpts ++ mainClass ++
+ Seq(
+ "1>",
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
+ "2>",
+ ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
+
+ val printableCommands = commands.map(s => if (s == null) "null" else s).toList
+ amContainer.setCommands(printableCommands.asJava)
+ info(s"Commands: ${printableCommands.mkString(" ")}")
+
+ amContainer
+ }
+
+ private def prepareLocalResources(
+ destDir: Path,
+ env: mutable.HashMap[String, String]): mutable.HashMap[String, LocalResource] = {
+ info("Preparing resources for engine AM container")
+ // Upload kyuubi engine and the extra JAR to the remote file system if necessary,
+ // and add them as local resources to the application master.
+ val fs = destDir.getFileSystem(hadoopConf)
+
+ val localResources = mutable.HashMap[String, LocalResource]()
+ FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
+
+ distributeJars(localResources, env)
+ distributeConf(localResources)
+ localResources
+ }
+
+ private def distributeJars(
+ localResources: mutable.HashMap[String, LocalResource],
+ env: mutable.HashMap[String, String]): Unit = {
+ val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", Utils.createTempDir().toFile)
+ val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
+ try {
+ jarsStream.setLevel(0)
+ val jars = kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY)
+ assert(jars.isDefined, "No jars specified for engine AM")
+ val putedEntry = new ListBuffer[String]
+ jars.get.split(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR).foreach { path =>
+ val jars = Utils.listFilesRecursive(new File(path))
+ jars.foreach { f =>
+ if (!putedEntry.contains(f.getName) && f.isFile && f.getName.toLowerCase(
+ Locale.ROOT).endsWith(".jar") && f.canRead) {
+ jarsStream.putNextEntry(new ZipEntry(f.getName))
+ Files.copy(f.toPath, jarsStream)
+ jarsStream.closeEntry()
+ putedEntry += f.getName
+ addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_LIB_DIR, f.getName), env)
+ }
+ }
+ }
+ putedEntry.clear()
+ } finally {
+ jarsStream.close()
+ }
+
+ distribute(
+ new Path(jarsArchive.getAbsolutePath),
+ resType = LocalResourceType.ARCHIVE,
+ destName = LOCALIZED_LIB_DIR,
+ localResources)
+ }
+
+ private def distributeConf(localResources: mutable.HashMap[String, LocalResource]): Unit = {
+ val confArchive = File.createTempFile(LOCALIZED_CONF_DIR, ".zip", Utils.createTempDir().toFile)
+ val confStream = new ZipOutputStream(new FileOutputStream(confArchive))
+ try {
+ confStream.setLevel(0)
+ val confs = kyuubiConf.getOption(KYUUBI_ENGINE_DEPLOY_YARN_MODE_CONF_KEY)
+ assert(confs.isDefined, "No conf specified for engine AM")
+ listDistinctFiles(confs.get).foreach { f =>
+ if (f.isFile && f.canRead) {
+ confStream.putNextEntry(new ZipEntry(f.getName))
+ Files.copy(f.toPath, confStream)
+ confStream.closeEntry()
+ }
+ }
+
+ val properties = confToProperties(kyuubiConf)
+ writePropertiesToArchive(properties, KYUUBI_CONF_FILE, confStream)
+ } finally {
+ confStream.close()
+ }
+
+ distribute(
+ new Path(confArchive.getAbsolutePath),
+ resType = LocalResourceType.ARCHIVE,
+ destName = LOCALIZED_CONF_DIR,
+ localResources)
+ }
+
+ def listDistinctFiles(archive: String): Seq[File] = {
+ val distinctFiles = new mutable.LinkedHashSet[File]
+ archive.split(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR).foreach {
+ path =>
+ val file = new File(path)
+ val files = Utils.listFilesRecursive(file)
+ files.foreach { f =>
+ if (f.isFile && f.canRead) {
+ distinctFiles += f
+ }
+ }
+ }
+ distinctFiles.groupBy(_.getName).map {
+ case (_, items) => items.head
+ }.toSeq
+ }
+
+ private def distribute(
+ srcPath: Path,
+ resType: LocalResourceType,
+ destName: String,
+ localResources: mutable.HashMap[String, LocalResource]): Unit = {
+ val fs = stagingDirPath.getFileSystem(hadoopConf)
+ val destPath = new Path(stagingDirPath, srcPath.getName)
+ info(s"Copying $srcPath to $destPath")
+ fs.copyFromLocalFile(srcPath, destPath)
+ fs.setPermission(destPath, new FsPermission(STAGING_DIR_PERMISSION))
+
+ val destFs = FileSystem.get(destPath.toUri, hadoopConf)
+ val destStatus = destFs.getFileStatus(destPath)
+
+ val destResource = Records.newRecord(classOf[LocalResource])
+ destResource.setType(resType)
+ destResource.setVisibility(LocalResourceVisibility.APPLICATION)
+ destResource.setResource(URL.fromPath(destPath))
+ destResource.setTimestamp(destStatus.getModificationTime)
+ destResource.setSize(destStatus.getLen)
+ localResources(destName) = destResource
+ }
+
+ private[kyuubi] def setupLaunchEnv(kyuubiConf: KyuubiConf): mutable.HashMap[String, String] = {
+ info("Setting up the launch environment for engine AM container")
+ val env = new mutable.HashMap[String, String]()
+
+ kyuubiConf.getAll
+ .filter { case (k, _) => k.startsWith(KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX) }
+ .map { case (k, v) =>
+ (k.substring(KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX.length + 1), v)
+ }
+ .foreach { case (k, v) => KyuubiHadoopUtils.addPathToEnvironment(env, k, v) }
+
+ addClasspathEntry(buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR), env)
+ env.put(Environment.HADOOP_CONF_DIR.name(), buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR))
+ env
+ }
+
+ private def createApplicationSubmissionContext(
+ newApp: YarnClientApplication,
+ containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
+
+ val appContext = newApp.getApplicationSubmissionContext
+ appContext.setApplicationName(s"Apache Kyuubi $engineType Engine")
+ appContext.setQueue(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_QUEUE))
+ appContext.setAMContainerSpec(containerContext)
+
+ val allTags = new util.HashSet[String]
+ kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_TAGS).foreach { tags =>
+ allTags.addAll(tags.asJava)
+ }
+ appContext.setApplicationTags(allTags)
+
+ kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS) match {
+ case Some(v) => appContext.setMaxAppAttempts(v)
+ case None => debug(s"${ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS.key} is not set. " +
+ "Cluster's default value will be used.")
+ }
+
+ val capability = Records.newRecord(classOf[Resource])
+ capability.setMemorySize(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_MEMORY))
+ capability.setVirtualCores(kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_CORES))
+ debug(s"Created resource capability for AM request: $capability")
+ appContext.setResource(capability)
+
+ appContext
+ }
+
+ private def monitorApplication(appId: ApplicationId): Unit = {
+ if (kyuubiConf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)) {
+ val YarnAppReport(appState, finalState, diags) = monitorApplication()
+ info(s"Application report for $appId (state: $appState, final state: $finalState)")
+ if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
+ diags.foreach { err =>
+ error(s"Application diagnostics message: $err")
+ }
+ throw new KyuubiException(s"Application $appId finished with failed status")
+ }
+ if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
+ throw new KyuubiException(s"Application $appId is killed")
+ }
+ } else {
+ val report = yarnClient.getApplicationReport(appId)
+ val state = report.getYarnApplicationState
+ info(s"Application report for $appId (state: $state)")
+ if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+ throw new KyuubiException(s"Application $appId finished with status: $state")
+ }
+ }
+ }
+
+ private def cleanupStagingDir(): Unit = {
+ try {
+ val fs = stagingDirPath.getFileSystem(hadoopConf)
+ if (fs.delete(stagingDirPath, true)) {
+ info(s"Deleted staging directory $stagingDirPath")
+ }
+ } catch {
+ case ioe: IOException =>
+ warn("Failed to cleanup staging dir " + stagingDirPath, ioe)
+ }
+ }
+
+ private def monitorApplication(
+ interval: Long = kyuubiConf.get(ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL)): YarnAppReport = {
+ var lastState: YarnApplicationState = null
+ while (true) {
+ Thread.sleep(interval)
+ val report: ApplicationReport =
+ try {
+ yarnClient.getApplicationReport(appId)
+ } catch {
+ case e: ApplicationNotFoundException =>
+ error(s"Application $appId not found.")
+ cleanupStagingDir()
+ return YarnAppReport(YarnApplicationState.KILLED, FinalApplicationStatus.KILLED, None)
+ case NonFatal(e) if !e.isInstanceOf[InterruptedIOException] =>
+ val msg = s"Failed to contact YARN for application $appId."
+ error(msg, e)
+ // Don't necessarily clean up staging dir because status is unknown
+ return YarnAppReport(
+ YarnApplicationState.FAILED,
+ FinalApplicationStatus.FAILED,
+ Some(msg))
+ }
+ val state = report.getYarnApplicationState
+ if (state == YarnApplicationState.FINISHED ||
+ state == YarnApplicationState.FAILED ||
+ state == YarnApplicationState.KILLED) {
+ cleanupStagingDir()
+ return createAppReport(report)
+ }
+
+ if (state == YarnApplicationState.RUNNING) {
+ return createAppReport(report)
+ }
+ lastState = state
+ }
+ // Never reached, but keeps compiler happy
+ throw new KyuubiException("While loop is depleted! This should never happen...")
+ }
+
+ private def createAppReport(report: ApplicationReport): YarnAppReport = {
+ val diags = report.getDiagnostics
+ val diagsOpt = if (diags != null && diags.nonEmpty) Some(diags) else None
+ YarnAppReport(report.getYarnApplicationState, report.getFinalApplicationStatus, diagsOpt)
+ }
+
+ /**
+ * Joins all the path components using Path.SEPARATOR.
+ */
+ private def buildPath(components: String*): String = {
+ components.mkString(Path.SEPARATOR)
+ }
+
+ /**
+ * Add the given path to the classpath entry of the given environment map.
+ * If the classpath is already set, this appends the new path to the existing classpath.
+ */
+ private def addClasspathEntry(path: String, env: mutable.HashMap[String, String]): Unit =
+ KyuubiHadoopUtils.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
+
+ private def confToProperties(conf: KyuubiConf): Properties = {
+ val props = new Properties()
+ conf.getAll.foreach { case (k, v) =>
+ props.setProperty(k, v)
+ }
+ props
+ }
+
+ def writePropertiesToArchive(props: Properties, name: String, out: ZipOutputStream): Unit = {
+ out.putNextEntry(new ZipEntry(name))
+ val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
+ props.store(writer, "Kyuubi configuration.")
+ writer.flush()
+ out.closeEntry()
+ }
+
+ def writeConfigurationToArchive(
+ conf: Configuration,
+ name: String,
+ out: ZipOutputStream): Unit = {
+ out.putNextEntry(new ZipEntry(name))
+ val writer = new OutputStreamWriter(out, StandardCharsets.UTF_8)
+ conf.writeXml(writer)
+ writer.flush()
+ out.closeEntry()
+ }
+}
+
+object EngineYarnModeSubmitter {
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY = "kyuubi.engine.deploy.yarn.mode.jars"
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_CONF_KEY = "kyuubi.engine.deploy.yarn.mode.conf"
+
+ final val KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR = ","
+}
+
+case class YarnAppReport(
+ appState: YarnApplicationState,
+ finalState: FinalApplicationStatus,
+ diagnostics: Option[String])
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
index 19e2e31eafe..0afa3dbe6e8 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TBinaryFrontendService.scala
@@ -186,4 +186,8 @@ abstract class TBinaryFrontendService(name: String)
server.foreach(_.stop())
server = None
}
+
+ def listenerTBinaryFrontendAddress(): (String, Int) = {
+ (serverAddr.getHostName, actualPort)
+ }
}
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
index 4959c845d49..2d9ea4a8ad5 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/util/KyuubiHadoopUtils.scala
@@ -21,6 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
import java.util.{Base64, Map => JMap}
import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.conf.Configuration
@@ -29,6 +30,7 @@ import org.apache.hadoop.io.Text
import org.apache.hadoop.security.{Credentials, SecurityUtil}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
+import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.kyuubi.Logging
@@ -98,4 +100,18 @@ object KyuubiHadoopUtils extends Logging {
None
}
}
+
+ /**
+ * Add a path variable to the given environment map.
+ * If the map already contains this key, append the value to the existing value instead.
+ */
+ def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = {
+ val newValue =
+ if (env.contains(key)) {
+ env(key) + ApplicationConstants.CLASS_PATH_SEPARATOR + value
+ } else {
+ value
+ }
+ env.put(key, newValue)
+ }
}
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/deploy/EngineYarnModeSubmitterSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/deploy/EngineYarnModeSubmitterSuite.scala
new file mode 100644
index 00000000000..dd17f8d9554
--- /dev/null
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/engine/deploy/EngineYarnModeSubmitterSuite.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.deploy
+
+import java.io.File
+
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.scalatest.matchers.must.Matchers
+import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.deploy.EngineYarnModeSubmitter.KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY
+
+class EngineYarnModeSubmitterSuite extends KyuubiFunSuite with Matchers {
+
+ val kyuubiHome: String = Utils.getCodeSourceLocation(getClass).split("kyuubi-common").head
+
+ test("Classpath should contain engine jars dir and conf dir") {
+ val kyuubiConf = new KyuubiConf()
+ .set(KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY, "mock.jar")
+
+ val env = MockEngineYarnModeSubmitter.setupLaunchEnv(kyuubiConf)
+ assert(env.contains(Environment.HADOOP_CONF_DIR.name()))
+
+ val cp = env("CLASSPATH").split(":|;|")
+
+ assert(cp.length == 1)
+ cp should contain("{{PWD}}/__kyuubi_engine_conf__")
+ }
+
+ test("container env should contain engine env") {
+ val kyuubiConf = new KyuubiConf()
+ .set(s"${KyuubiConf.KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX}.KYUUBI_HOME", kyuubiHome)
+
+ val env = MockEngineYarnModeSubmitter.setupLaunchEnv(kyuubiConf)
+ assert(env.nonEmpty)
+ assert(env.contains("KYUUBI_HOME"))
+ assert(env("KYUUBI_HOME") == kyuubiHome)
+ }
+
+ test("distinct archive files") {
+ val targetJars: String = s"${Utils.getCodeSourceLocation(getClass)}"
+ // double the jars to make sure the distinct works
+ val archives = s"$targetJars,$targetJars"
+ val files = MockEngineYarnModeSubmitter.listDistinctFiles(archives)
+ val targetFiles = Utils.listFilesRecursive(new File(targetJars))
+ assert(targetFiles != null)
+ assert(targetFiles.length == files.length)
+ }
+
+}
+
+object MockEngineYarnModeSubmitter extends EngineYarnModeSubmitter {
+ override var engineType: String = "mock"
+
+ override def amClass(): String = "org.apache.kyuubi.engine.deploy.Mock"
+}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 6122a6f138f..6486a657ba7 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -196,7 +196,7 @@ private[kyuubi] class EngineRef(
new TrinoProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case HIVE_SQL =>
conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName)
- new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
+ HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case JDBC =>
new JdbcProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
case CHAT =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index f8b64005359..eca846d2414 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -103,6 +103,13 @@ object KyuubiApplicationManager {
conf.set(SparkProcessBuilder.TAG_KEY, newTag)
}
+ private def setupEngineYarnModeTag(tag: String, conf: KyuubiConf): Unit = {
+ val originalTag =
+ conf.getOption(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_TAGS.key).map(_ + ",").getOrElse("")
+ val newTag = s"${originalTag}KYUUBI" + Some(tag).filterNot(_.isEmpty).map("," + _).getOrElse("")
+ conf.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_TAGS.key, newTag)
+ }
+
private def setupSparkK8sTag(tag: String, conf: KyuubiConf): Unit = {
conf.set("spark.kubernetes.driver.label." + LABEL_KYUUBI_UNIQUE_KEY, tag)
}
@@ -182,6 +189,8 @@ object KyuubiApplicationManager {
// running flink on other platforms is not yet supported
setupFlinkYarnTag(applicationTag, conf)
// other engine types are running locally yet
+ case ("Hive", Some("YARN")) =>
+ setupEngineYarnModeTag(applicationTag, conf)
case _ =>
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
index d8e4454b610..46919e2e003 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala
@@ -26,10 +26,11 @@ import com.google.common.annotations.VisibleForTesting
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_DEPLOY_MODE, ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_JAVA_OPTIONS, ENGINE_HIVE_MEMORY}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY}
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
-import org.apache.kyuubi.engine.hive.HiveProcessBuilder._
+import org.apache.kyuubi.engine.deploy.{DeployMode, LocalMode, YarnMode}
+import org.apache.kyuubi.engine.hive.HiveProcessBuilder.HIVE_HADOOP_CLASSPATH_KEY
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandLineUtils._
@@ -45,7 +46,7 @@ class HiveProcessBuilder(
this(proxyUser, conf, "")
}
- private val hiveHome: String = getEngineHome(shortName)
+ protected val hiveHome: String = getEngineHome(shortName)
override protected def module: String = "kyuubi-hive-sql-engine"
@@ -113,7 +114,24 @@ class HiveProcessBuilder(
override def shortName: String = "hive"
}
-object HiveProcessBuilder {
+object HiveProcessBuilder extends Logging {
final val HIVE_HADOOP_CLASSPATH_KEY = "HIVE_HADOOP_CLASSPATH"
final val HIVE_ENGINE_NAME = "hive.engine.name"
+
+ def apply(
+ appUser: String,
+ conf: KyuubiConf,
+ engineRefId: String,
+ extraEngineLog: Option[OperationLog]): HiveProcessBuilder = {
+ val modeStr = conf.get(ENGINE_DEPLOY_MODE)
+ DeployMode.fromString(conf.get(ENGINE_DEPLOY_MODE)) match {
+ case LocalMode => new HiveProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
+ case YarnMode =>
+ warn(s"Hive on YARN model is experimental, please use it carefully. " +
+ s"Set `${ENGINE_DEPLOY_MODE.key}` to `${LocalMode.toString}` " +
+ s"if you want to use it on production.")
+ new HiveYarnModeProcessBuilder(appUser, conf, engineRefId, extraEngineLog)
+ case _ => throw new KyuubiException(s"Unsupported deploy mode: $modeStr")
+ }
+ }
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala
new file mode 100644
index 00000000000..e0f6332664d
--- /dev/null
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveYarnModeProcessBuilder.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.hive
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.kyuubi.{KyuubiException, Logging, SCALA_COMPILE_VERSION}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_HIVE_EXTRA_CLASSPATH, ENGINE_HIVE_MEMORY}
+import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SESSION_USER_KEY}
+import org.apache.kyuubi.engine.KyuubiApplicationManager
+import org.apache.kyuubi.engine.deploy.EngineYarnModeSubmitter._
+import org.apache.kyuubi.engine.hive.HiveProcessBuilder.HIVE_HADOOP_CLASSPATH_KEY
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.util.command.CommandLineUtils.{confKeyValue, confKeyValues}
+
+/**
+ * A process builder for Hive on Yarn.
+ *
+ * It will new a process on kyuubi server side to submit hive engine to yarn.
+ */
+class HiveYarnModeProcessBuilder(
+ override val proxyUser: String,
+ override val conf: KyuubiConf,
+ override val engineRefId: String,
+ override val extraEngineLog: Option[OperationLog] = None)
+ extends HiveProcessBuilder(proxyUser, conf, engineRefId, extraEngineLog) with Logging {
+
+ override protected def mainClass: String =
+ "org.apache.kyuubi.engine.hive.deploy.HiveYarnModeSubmitter"
+
+ override def isClusterMode(): Boolean = true
+
+ override def clusterManager(): Option[String] = Some("yarn")
+
+ override protected val commands: Iterable[String] = {
+ KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
+ val buffer = new ArrayBuffer[String]()
+ buffer += executable
+
+ val memory = conf.get(ENGINE_HIVE_MEMORY)
+ buffer += s"-Xmx$memory"
+ buffer += "-cp"
+
+ val classpathEntries = new util.LinkedHashSet[String]
+ classpathEntries.addAll(confFiles(true))
+ classpathEntries.addAll(jarFiles(true))
+
+ buffer += classpathEntries.asScala.mkString(File.pathSeparator)
+ buffer += mainClass
+
+ buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)
+ buffer ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId)
+
+ buffer ++= confKeyValue(
+ KYUUBI_ENGINE_DEPLOY_YARN_MODE_JARS_KEY,
+ jarFiles(false).asScala.mkString(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR))
+ buffer ++= confKeyValue(
+ KYUUBI_ENGINE_DEPLOY_YARN_MODE_CONF_KEY,
+ confFiles(false).asScala.mkString(KYUUBI_ENGINE_DEPLOY_YARN_MODE_ARCHIVE_SEPARATOR))
+
+ buffer ++= confKeyValues(conf.getAll)
+
+ buffer
+ }
+
+ private def jarFiles(isClasspath: Boolean): util.LinkedHashSet[String] = {
+ val jarEntries = new util.LinkedHashSet[String]
+
+ mainResource.foreach(jarEntries.add)
+
+ jarEntries.add(s"$hiveHome${File.separator}lib${appendClasspathSuffix(isClasspath)}")
+
+ val hadoopCp = env.get(HIVE_HADOOP_CLASSPATH_KEY)
+ hadoopCp.foreach(jarEntries.add)
+
+ val extraCp = conf.get(ENGINE_HIVE_EXTRA_CLASSPATH)
+ extraCp.foreach(jarEntries.add)
+
+ if (hadoopCp.isEmpty && extraCp.isEmpty) {
+ mainResource.foreach { path =>
+ val devHadoopJars = Paths.get(path).getParent
+ .resolve(s"scala-$SCALA_COMPILE_VERSION")
+ .resolve("jars")
+ if (!Files.exists(devHadoopJars)) {
+ throw new KyuubiException(s"The path $devHadoopJars does not exists. " +
+ s"Please set ${HIVE_HADOOP_CLASSPATH_KEY} or ${ENGINE_HIVE_EXTRA_CLASSPATH.key} for " +
+ s"configuring location of hadoop client jars, etc")
+ }
+ jarEntries.add(s"$devHadoopJars${appendClasspathSuffix(isClasspath)}")
+ }
+ }
+
+ jarEntries
+ }
+
+ private def confFiles(isClasspath: Boolean): util.LinkedHashSet[String] = {
+ val confEntries = new util.LinkedHashSet[String]
+ confEntries.add(env.getOrElse(
+ "HIVE_CONF_DIR",
+ s"$hiveHome${File.separator}conf${appendClasspathSuffix(isClasspath)}"))
+ env.get("HADOOP_CONF_DIR").foreach(confEntries.add)
+ env.get("YARN_CONF_DIR").foreach(confEntries.add)
+
+ confEntries
+ }
+
+ private def appendClasspathSuffix(isClasspath: Boolean): String = {
+ if (isClasspath) {
+ s"${File.separator}*"
+ } else {
+ ""
+ }
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndHadoopMiniCluster.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndHadoopMiniCluster.scala
new file mode 100644
index 00000000000..bd11de08d43
--- /dev/null
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerAndHadoopMiniCluster.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi
+
+import java.io.File
+
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.server.{MiniDFSService, MiniYarnService}
+
+trait WithKyuubiServerAndHadoopMiniCluster extends KyuubiFunSuite with WithKyuubiServer {
+
+ val kyuubiHome: String = Utils.getCodeSourceLocation(getClass).split("integration-tests").head
+
+ override protected val conf: KyuubiConf = new KyuubiConf(false)
+
+ private val hadoopConfDir: File = Utils.createTempDir().toFile
+
+ protected var miniHdfsService: MiniDFSService = _
+
+ protected var miniYarnService: MiniYarnService = _
+
+ override def beforeAll(): Unit = {
+ miniHdfsService = new MiniDFSService()
+ miniHdfsService.initialize(conf)
+ miniHdfsService.start()
+
+ miniYarnService = new MiniYarnService()
+ miniYarnService.initialize(conf)
+ miniYarnService.start()
+
+ miniHdfsService.saveHadoopConf(hadoopConfDir)
+ miniYarnService.saveYarnConf(hadoopConfDir)
+
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.KYUUBI_HOME", kyuubiHome)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", hadoopConfDir.getAbsolutePath)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.YARN_CONF_DIR", hadoopConfDir.getAbsolutePath)
+
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ if (miniYarnService != null) {
+ miniYarnService.stop()
+ miniYarnService = null
+ }
+ if (miniHdfsService != null) {
+ miniHdfsService.stop()
+ miniHdfsService = null
+ }
+ }
+
+ def getYarnMaximumAllocationMb: Int = {
+ require(miniYarnService != null, "MiniYarnService is not initialized")
+ miniYarnService.getYarnConf.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024)
+ }
+}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 5a674d98fd0..8d3f7b17d8b 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -54,7 +54,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer {
miniYarnService = new MiniYarnService()
miniYarnService.initialize(conf)
miniYarnService.start()
- conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", miniYarnService.getHadoopConfDir)
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", miniYarnService.getYarnConfDir)
super.beforeAll()
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
index caacbb6bf39..dbc20be8796 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
@@ -60,7 +60,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
s"NameNode address in configuration is " +
s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
super.start()
- saveHadoopConf()
+ saveHadoopConf(hadoopConfDir)
}
override def stop(): Unit = {
@@ -68,7 +68,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
super.stop()
}
- private def saveHadoopConf(): Unit = {
+ def saveHadoopConf(hadoopConfDir: File): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
hdfsConf.iterator().asScala.foreach { kv =>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
index 68a175efc4e..deaeae3bed1 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniYarnService.scala
@@ -33,7 +33,7 @@ import org.apache.kyuubi.service.AbstractService
class MiniYarnService extends AbstractService("TestMiniYarnService") {
- private val hadoopConfDir: File = Utils.createTempDir().toFile
+ private val yarnConfDir: File = Utils.createTempDir().toFile
private var yarnConf: YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
// Disable the disk utilization check to avoid the test hanging when people's disks are
@@ -82,7 +82,7 @@ class MiniYarnService extends AbstractService("TestMiniYarnService") {
override def start(): Unit = {
yarnCluster.start()
- saveHadoopConf()
+ saveYarnConf(yarnConfDir)
super.start()
}
@@ -91,7 +91,7 @@ class MiniYarnService extends AbstractService("TestMiniYarnService") {
super.stop()
}
- private def saveHadoopConf(): Unit = {
+ def saveYarnConf(yarnConfDir: File): Unit = {
val configToWrite = new Configuration(false)
val hostName = InetAddress.getLocalHost.getHostName
yarnCluster.getConfig.iterator().asScala.foreach { kv =>
@@ -100,10 +100,12 @@ class MiniYarnService extends AbstractService("TestMiniYarnService") {
configToWrite.set(key, value)
getConf.set(key, value)
}
- val writer = new FileWriter(new File(hadoopConfDir, "yarn-site.xml"))
+ val writer = new FileWriter(new File(yarnConfDir, "yarn-site.xml"))
configToWrite.writeXml(writer)
writer.close()
}
- def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
+ def getYarnConfDir: String = yarnConfDir.getAbsolutePath
+
+ def getYarnConf: YarnConfiguration = yarnConf
}