Skip to content

Commit

Permalink
[KYUUBI #5867] HiveEngine support run on YARN mode
Browse files Browse the repository at this point in the history
# 🔍 Description
## Issue References 🔗

This PR aims to support hive engine run on yarn mode, close #5867

## Describe Your Solution 🔧

Please include a summary of the change and which issue is fixed. Please also include relevant motivation and context. List any dependencies that are required for this change.

## Types of changes 🔖

- [ ] Bugfix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklists
## 📝 Author Self Checklist

- [ ] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
- [ ] I have performed a self-review
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

## 📝 Committer Pre-Merge Checklist

- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested

**Be nice. Be informative.**

Closes #5868 from Yikf/hive-on-yarn.

Closes #5867

44f7287 [yikaifei] fix
3c17d2c [yikaifei] fix test
5474ebf [yikaifei] parse classpath
6b97c42 [Cheng Pan] Update kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
34a67b4 [Cheng Pan] Update kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
5e5045e [yikaifei] fix app type
d1eb5ae [yikaifei] fix
d89d09c [Cheng Pan] Update kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
1fa18ba [Cheng Pan] Update kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
1b0b77f [Cheng Pan] Update kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/ApplicationMaster.scala
2ed1d44 [Cheng Pan] Update kyuubi-common/src/main/scala/org/apache/kyuubi/engine/deploy/yarn/EngineYarnModeSubmitter.scala
98ff19c [yikaifei] HiveEngine support run on YARN mode

Lead-authored-by: yikaifei <[email protected]>
Co-authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
yikf and pan3793 committed Dec 29, 2023
1 parent c18dd61 commit 679aca5
Show file tree
Hide file tree
Showing 24 changed files with 1,412 additions and 15 deletions.
10 changes: 10 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,10 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => stop())
backendService.sessionManager.startTerminatingChecker(() => {
selfExist = true
stop()
})
}

override protected def stopServer(): Unit = {
Expand Down Expand Up @@ -151,7 +154,8 @@ object HiveSQLEngine extends Logging {
}

} catch {
case t: Throwable => currentEngine match {
case t: Throwable =>
currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
Expand All @@ -160,6 +164,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
throw t
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.ENGINE_HIVE_EXTRA_CLASSPATH
import org.apache.kyuubi.engine.deploy.yarn.EngineYarnModeSubmitter
import org.apache.kyuubi.engine.hive.HiveSQLEngine

object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)
submitApplication()
}

override var engineType: String = "hive"

override def engineMainClass(): String = HiveSQLEngine.getClass.getName

/**
* Jar list for the Hive engine.
*/
override def engineExtraJars(): Seq[File] = {
val hadoopCp = sys.env.get("HIVE_HADOOP_CLASSPATH")
val extraCp = kyuubiConf.get(ENGINE_HIVE_EXTRA_CLASSPATH)
val jars = new ListBuffer[File]
hadoopCp.foreach(cp => parseClasspath(cp, jars))
extraCp.foreach(cp => parseClasspath(cp, jars))
jars.toSeq
}

private[hive] def parseClasspath(classpath: String, jars: ListBuffer[File]): Unit = {
classpath.split(":").filter(_.nonEmpty).foreach { cp =>
if (cp.endsWith("/*")) {
val dir = cp.substring(0, cp.length - 2)
new File(dir) match {
case f if f.isDirectory =>
f.listFiles().filter(_.getName.endsWith(".jar")).foreach(jars += _)
case _ =>
}
} else {
jars += new File(cp)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import java.io.File

import scala.collection.mutable.ListBuffer

import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION, Utils}

class HiveYarnModeSubmitterSuite extends KyuubiFunSuite {
val hiveEngineHome: String = Utils.getCodeSourceLocation(getClass).split("/target")(0)

test("hadoop class path") {
val jars = new ListBuffer[File]
val classpath =
s"$hiveEngineHome/target/scala-$SCALA_COMPILE_VERSION/jars/*:" +
s"$hiveEngineHome/target/kyuubi-hive-sql-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
HiveYarnModeSubmitter.parseClasspath(classpath, jars)
assert(jars.nonEmpty)
assert(jars.exists(
_.getName == s"kyuubi-hive-sql-engine-$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"))
}

}
31 changes: 31 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.hive.operation

import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
import org.apache.kyuubi.engine.deploy.DeployMode

class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests
with WithKyuubiServerAndHadoopMiniCluster {

override protected val conf: KyuubiConf = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
KyuubiConf()
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "HIVE_SQL")
.set(KyuubiConf.ENGINE_HIVE_DEPLOY_MODE, DeployMode.YARN.toString)
// increase this to 30s as hive session state and metastore client is slow initializing
.setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
.set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
}

override def beforeAll(): Unit = {
super.beforeAll()
conf
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, Math.min(getYarnMaximumAllocationMb, 1024))
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
}

override protected def jdbcUrl: String = getJdbcUrl
}
11 changes: 11 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}

/**
* List the files recursively in a directory.
*/
def listFilesRecursively(file: File): Seq[File] = {
if (!file.isDirectory) {
file :: Nil
} else {
file.listFiles().flatMap(listFilesRecursively)
}
}

/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import scala.util.matching.Regex
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.{EngineType, ShareLevel}
import org.apache.kyuubi.engine.deploy.DeployMode
import org.apache.kyuubi.operation.{NoneMode, PlainStyle}
import org.apache.kyuubi.service.authentication.{AuthTypes, SaslQOP}

Expand Down Expand Up @@ -231,6 +232,7 @@ object KyuubiConf {
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
final val KYUUBI_HOME = "KYUUBI_HOME"
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
final val KYUUBI_ENGINE_YARN_MODE_ENV_PREFIX = "kyuubi.engine.yarn.AMEnv"
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
final val KYUUBI_KUBERNETES_CONF_PREFIX = "kyuubi.kubernetes"
final val USER_DEFAULTS_CONF_QUOTE = "___"
Expand Down Expand Up @@ -2685,6 +2687,85 @@ object KyuubiConf {
.stringConf
.createOptional

val ENGINE_HIVE_DEPLOY_MODE: ConfigEntry[String] =
buildConf("kyuubi.engine.hive.deploy.mode")
.doc("Configures the hive engine deploy mode, The value can be 'local', 'yarn'. " +
"In local mode, the engine operates on the same node as the KyuubiServer. " +
"In YARN mode, the engine runs within the Application Master (AM) container of YARN. ")
.version("1.9.0")
.stringConf
.transformToUpperCase
.checkValue(
mode => Set("LOCAL", "YARN").contains(mode),
"Invalid value for 'kyuubi.engine.hive.deploy.mode'. Valid values are 'local', 'yarn'.")
.createWithDefault(DeployMode.LOCAL.toString)

val ENGINE_DEPLOY_YARN_MODE_STAGING_DIR: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.stagingDir")
.doc("Staging directory used while submitting kyuubi engine to YARN, " +
"It should be a absolute path in HDFS.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_DEPLOY_YARN_MODE_REPORT_INTERVAL: ConfigEntry[Long] =
buildConf("kyuubi.engine.yarn.report.interval")
.doc("Interval between reports of the current engine on yarn app status.")
.version("1.9.0")
.timeConf
.checkValue(t => t > 0, "must be positive integer")
.createWithDefault(Duration.ofSeconds(1).toMillis)

val ENGINE_DEPLOY_YARN_MODE_TAGS: OptionalConfigEntry[Seq[String]] =
buildConf("kyuubi.engine.yarn.tags")
.doc(s"kyuubi engine yarn tags when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.toSequence()
.createOptional

val ENGINE_DEPLOY_YARN_MODE_QUEUE: ConfigEntry[String] =
buildConf("kyuubi.engine.yarn.queue")
.doc(s"kyuubi engine yarn queue when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.createWithDefault("default")

val ENGINE_DEPLOY_YARN_MODE_PRIORITY: OptionalConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.priority")
.doc(s"kyuubi engine yarn priority when the engine deploy mode is YARN.")
.version("1.9.0")
.intConf
.createOptional

val ENGINE_DEPLOY_YARN_MODE_APP_NAME: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.app.name")
.doc(s"The YARN app name when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_DEPLOY_YARN_MODE_MEMORY: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.memory")
.doc(s"kyuubi engine container memory in mb when the engine deploy mode is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1024)

val ENGINE_DEPLOY_YARN_MODE_CORES: ConfigEntry[Int] =
buildConf("kyuubi.engine.yarn.cores")
.doc(s"kyuubi engine container core number when the engine deploy mode is YARN.")
.version("1.9.0")
.intConf
.createWithDefault(1)

val ENGINE_DEPLOY_YARN_MODE_JAVA_OPTIONS: OptionalConfigEntry[String] =
buildConf("kyuubi.engine.yarn.java.options")
.doc(s"The extra Java options for the AM when the engine deploy mode is YARN.")
.version("1.9.0")
.stringConf
.createOptional

val ENGINE_FLINK_MEMORY: ConfigEntry[String] =
buildConf("kyuubi.engine.flink.memory")
.doc("The heap memory for the Flink SQL engine. Only effective in yarn session mode.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.deploy

object DeployMode extends Enumeration {
type DeployMode = Value
val
/**
* In this mode, the engine will be launched locally.
*/
LOCAL,
/**
* In this mode, the engine will be launched on YARN.
*/
YARN,
/**
* In this mode, the engine will be launched on Kubernetes.
*/
KUBERNETES = Value
}
Loading

0 comments on commit 679aca5

Please sign in to comment.