Skip to content

Commit

Permalink
HiveEngine support run on YARN mode
Browse files Browse the repository at this point in the history
  • Loading branch information
yikf committed Dec 16, 2023
1 parent a0fdead commit 5a36d6e
Show file tree
Hide file tree
Showing 21 changed files with 1,193 additions and 15 deletions.
9 changes: 9 additions & 0 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,16 @@ class HiveSQLEngine extends Serverable("HiveSQLEngine") {
override val frontendServices: Seq[AbstractFrontendService] =
Seq(new HiveTBinaryFrontendService(this))
private[hive] val engineStartTime = System.currentTimeMillis()
private[hive] var selfExist: Boolean = false

override def start(): Unit = {
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker(() => stop())
backendService.sessionManager.startTerminatingChecker(() => {
selfExist = true
stop()
})
}

override protected def stopServer(): Unit = {
Expand Down Expand Up @@ -114,6 +118,13 @@ object HiveSQLEngine extends Logging {
currentEngine = Some(engine)
}

def runEngine(args: Array[String]): Unit = main(args)

def getCurrentEngine: HiveSQLEngine = {
require(currentEngine.isDefined, "HiveSQLEngine has not been initialized")
currentEngine.get
}

private def initLoggerEventHandler(conf: KyuubiConf): Unit = {
HiveEventHandlerRegister.registerEventLoggers(conf)
}
Expand All @@ -122,6 +133,7 @@ object HiveSQLEngine extends Logging {
SignalRegister.registerLogger(logger)
try {
Utils.fromCommandLineArgs(args, kyuubiConf)
kyuubiConf.loadFileDefaults()
val sessionUser = kyuubiConf.getOption(KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY)
val realUser = UserGroupInformation.getLoginUser

Expand All @@ -143,7 +155,8 @@ object HiveSQLEngine extends Logging {
}

} catch {
case t: Throwable => currentEngine match {
case t: Throwable =>
currentEngine match {
case Some(engine) =>
engine.stop()
val event = HiveEngineEvent(engine)
Expand All @@ -152,6 +165,7 @@ object HiveSQLEngine extends Logging {
case _ =>
error(s"Failed to start Hive SQL engine: ${t.getMessage}.", t)
}
throw t
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.hive.{HiveSQLEngine, HiveTBinaryFrontendService}
import org.apache.kyuubi.util.KyuubiHadoopUtils

object ApplicationMaster extends Logging {

private var amClient: AMRMClient[ContainerRequest] = _
private var yarnConf: YarnConfiguration = _

private var currentEngine: HiveSQLEngine = _
private val kyuubiConf = new KyuubiConf()

private var finalMsg: String = _

@volatile private var registered: Boolean = false
@volatile private var unregistered: Boolean = false
@volatile private var finalStatus = FinalApplicationStatus.UNDEFINED

def main(args: Array[String]): Unit = {
try {
kyuubiConf.loadFileDefaults()
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
Utils.addShutdownHook(() => {
if (!unregistered) {
if (currentEngine != null && currentEngine.selfExist) {
finalMsg = "Kyuubi Application Master is shutting down."
finalStatus = FinalApplicationStatus.SUCCEEDED
} else {
finalMsg = "Kyuubi Application Master is shutting down with error."
finalStatus = FinalApplicationStatus.FAILED
}
unregister(finalStatus, finalMsg)
}
})
runApplicationMaster(args)
} catch {
case t: Throwable =>
error("Error running ApplicationMaster", t)
finalStatus = FinalApplicationStatus.FAILED
finalMsg = t.getMessage
unregister(finalStatus, finalMsg)
if (currentEngine != null) {
currentEngine.stop()
}
}
}

def runApplicationMaster(args: Array[String]): Unit = {
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
amClient.start()

runHiveEngine(args)

registryAM()
}

def runHiveEngine(args: Array[String]): Unit = {
HiveSQLEngine.runEngine(args)
currentEngine = HiveSQLEngine.getCurrentEngine
}

def initAmClient(): Unit = {
amClient = AMRMClient.createAMRMClient()
amClient.init(yarnConf)
amClient.start()
}

def registryAM(): Unit = {
val frontendService =
currentEngine.frontendServices.head.asInstanceOf[HiveTBinaryFrontendService]
val (host, port) = frontendService.listenerTBinaryFrontendAddress()
val trackingUrl = frontendService.connectionUrl
info("Registering the HiveSQLEngine ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(host, port, trackingUrl)
registered = true
}
}

def unregister(status: FinalApplicationStatus, diagnostics: String): Unit = {
synchronized {
if (registered && !unregistered) {
info(s"Unregistering ApplicationMaster with $status" +
Option(diagnostics).map(msg => s" (diagnostics message: $msg)").getOrElse(""))
unregistered = true
amClient.unregisterApplicationMaster(status, diagnostics, "")
if (amClient != null) {
amClient.stop()
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.engine.hive.deploy

import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.deploy.EngineYarnModeSubmitter
import org.apache.kyuubi.util.KyuubiHadoopUtils

object HiveYarnModeSubmitter extends EngineYarnModeSubmitter {

def main(args: Array[String]): Unit = {
Utils.fromCommandLineArgs(args, kyuubiConf)
// Initialize the engine submitter.
init()
// Submit the engine application to YARN.
submitApplication()
}

private def init(): Unit = {
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(kyuubiConf)
hadoopConf = KyuubiHadoopUtils.newHadoopConf(kyuubiConf)
}

override var engineType: String = "hive"

override def amClass(): String = "org.apache.kyuubi.engine.hive.deploy.ApplicationMaster"
}
31 changes: 31 additions & 0 deletions integration-tests/kyuubi-hive-it/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,37 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>

<!-- YARN -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client-minicluster</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.activation</groupId>
<artifactId>jakarta.activation-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>jakarta.xml.bind</groupId>
<artifactId>jakarta.xml.bind-api</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kyuubi.it.hive.operation

import org.apache.kyuubi.{HiveEngineTests, Utils, WithKyuubiServerAndHadoopMiniCluster}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_IDLE_TIMEOUT, ENGINE_TYPE, KYUUBI_ENGINE_ENV_PREFIX, KYUUBI_HOME}
import org.apache.kyuubi.engine.deploy.YarnMode

class KyuubiOperationHiveEngineYarnModeSuite extends HiveEngineTests
with WithKyuubiServerAndHadoopMiniCluster {

override protected val conf: KyuubiConf = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
KyuubiConf()
.set(s"$KYUUBI_ENGINE_ENV_PREFIX.$KYUUBI_HOME", kyuubiHome)
.set(ENGINE_TYPE, "HIVE_SQL")
.set(KyuubiConf.ENGINE_DEPLOY_MODE, YarnMode.name)
// increase this to 30s as hive session state and metastore client is slow initializing
.setIfMissing(ENGINE_IDLE_TIMEOUT, 30000L)
.set("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastore;create=true")
}

override def beforeAll(): Unit = {
super.beforeAll()
conf
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MEMORY, Math.min(getYarnMaximumAllocationMb, 1024))
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_MAX_APP_ATTEMPTS, 1)
.set(KyuubiConf.ENGINE_DEPLOY_YARN_MODE_CORES, 1)
}

override protected def jdbcUrl: String = getJdbcUrl
}
11 changes: 11 additions & 0 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ object Utils extends Logging {
dir
}

/**
* List the files recursively in a directory.
*/
def listFilesRecursive(file: File): Seq[File] = {
if (!file.isDirectory) {
file :: Nil
} else {
file.listFiles().flatMap(listFilesRecursive)
}
}

/**
* Copies bytes from an InputStream source to a newly created temporary file
* created in the directory destination. The temporary file will be created
Expand Down
Loading

0 comments on commit 5a36d6e

Please sign in to comment.