Skip to content

Commit

Permalink
config option
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenliang123 committed Nov 28, 2023
1 parent b920836 commit cbb4b41
Show file tree
Hide file tree
Showing 17 changed files with 192 additions and 124 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.net.URI
import java.nio.file.{Files, Paths}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
Expand All @@ -32,6 +32,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiFunSuite, SCALA
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
import org.apache.kyuubi.util.command.CommandLineUtils._
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}

Expand Down Expand Up @@ -111,7 +112,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
processBuilder.environment().putAll(envs.asJava)

conf.set(ENGINE_FLINK_EXTRA_CLASSPATH, udfJar.getAbsolutePath)
val command = new ArrayBuffer[String]()
val command = new mutable.ListBuffer[String]()

command += envs("JAVA_EXEC")

Expand All @@ -122,8 +123,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
command += javaOptions.get
}

command += "-cp"
val classpathEntries = new java.util.LinkedHashSet[String]
val classpathEntries = new mutable.LinkedHashSet[String]
// flink engine runtime jar
mainResource(envs).foreach(classpathEntries.add)
// flink sql jars
Expand Down Expand Up @@ -163,13 +163,11 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
classpathEntries.add(s"$devHadoopJars${File.separator}*")
}
}
command += classpathEntries.asScala.mkString(File.pathSeparator)
command ++= genClasspathOption(classpathEntries)

command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine"

conf.getAll.foreach { case (k, v) =>
command += "--conf"
command += s"$k=$v"
}
command ++= confKeyValues(conf.getAll)

processBuilder.command(command.toList.asJava)
processBuilder.redirectOutput(Redirect.INHERIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION,
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, KYUUBI_HOME}
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
import org.apache.kyuubi.util.command.CommandLineUtils._
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}

Expand Down Expand Up @@ -179,10 +180,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource
conf.set(k, v)
}

for ((k, v) <- conf.getAll) {
command += "--conf"
command += s"$k=$v"
}
command ++= confKeyValues(conf.getAll)

processBuilder.command(command.toList.asJava)
processBuilder.redirectOutput(Redirect.INHERIT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.util.command.CommandLineUtils._

class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper {

override def beforeAll(): Unit = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
val args = Array(
"--conf",
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true",
"--conf",
CONF,
s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true")
HiveSQLEngine.main(args)
super.beforeAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils}
import org.apache.kyuubi.engine.hive.HiveSQLEngine
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.util.command.CommandLineUtils._

class HiveOperationSuite extends HiveEngineTests {

override def beforeAll(): Unit = {
val metastore = Utils.createTempDir(prefix = getClass.getSimpleName)
metastore.toFile.delete()
val args = Array(
"--conf",
CONF,
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true")
HiveSQLEngine.main(args)
super.beforeAll()
Expand Down
7 changes: 4 additions & 3 deletions kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.hadoop.util.ShutdownHookManager

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.internal.Tests.IS_TESTING
import org.apache.kyuubi.util.command.CommandLineUtils._

object Utils extends Logging {

Expand Down Expand Up @@ -325,7 +326,7 @@ object Utils extends Logging {
require(args.length % 2 == 0, s"Illegal size of arguments.")
for (i <- args.indices by 2) {
require(
args(i) == "--conf",
args(i) == CONF,
s"Unrecognized main arguments prefix ${args(i)}," +
s"the argument format is '--conf k=v'.")

Expand All @@ -347,9 +348,9 @@ object Utils extends Logging {
case PATTERN_FOR_KEY_VALUE_ARG(key, value) if nextKV =>
val (_, newValue) = redact(redactionPattern, Seq((key, value))).head
nextKV = false
s"$key=$newValue"
genKeyValuePair(key, newValue)

case cmd if cmd == "--conf" =>
case cmd if cmd == CONF =>
nextKV = true
cmd

Expand Down
13 changes: 5 additions & 8 deletions kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SERVER_SECRET_REDACTION_PATTERN
import org.apache.kyuubi.util.command.CommandLineUtils._

class UtilsSuite extends KyuubiFunSuite {

Expand Down Expand Up @@ -158,14 +159,10 @@ class UtilsSuite extends KyuubiFunSuite {

val buffer = new ArrayBuffer[String]()
buffer += "main"
buffer += "--conf"
buffer += "kyuubi.my.password=sensitive_value"
buffer += "--conf"
buffer += "kyuubi.regular.property1=regular_value"
buffer += "--conf"
buffer += "kyuubi.my.secret=sensitive_value"
buffer += "--conf"
buffer += "kyuubi.regular.property2=regular_value"
buffer ++= confKeyValue("kyuubi.my.password", "sensitive_value")
buffer ++= confKeyValue("kyuubi.regular.property1", "regular_value")
buffer ++= confKeyValue("kyuubi.my.secret", "sensitive_value")
buffer ++= confKeyValue("kyuubi.regular.property2", "regular_value")

val commands = buffer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package org.apache.kyuubi.engine.chat

import java.io.File
import java.nio.file.{Files, Paths}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting

Expand All @@ -33,6 +31,7 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandLineUtils._

class ChatProcessBuilder(
override val proxyUser: String,
Expand Down Expand Up @@ -60,7 +59,7 @@ class ChatProcessBuilder(
override protected def mainClass: String = "org.apache.kyuubi.engine.chat.ChatEngine"

override protected val commands: Iterable[String] = {
val buffer = new ArrayBuffer[String]()
val buffer = new mutable.ListBuffer[String]()
buffer += executable

val memory = conf.get(ENGINE_CHAT_MEMORY)
Expand All @@ -69,8 +68,7 @@ class ChatProcessBuilder(
val javaOptions = conf.get(ENGINE_CHAT_JAVA_OPTIONS)
javaOptions.foreach(buffer += _)

buffer += "-cp"
val classpathEntries = new util.LinkedHashSet[String]
val classpathEntries = new mutable.LinkedHashSet[String]
mainResource.foreach(classpathEntries.add)
mainResource.foreach { path =>
val parent = Paths.get(path).getParent
Expand All @@ -88,16 +86,14 @@ class ChatProcessBuilder(

val extraCp = conf.get(ENGINE_CHAT_EXTRA_CLASSPATH)
extraCp.foreach(classpathEntries.add)
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer ++= genClasspathOption(classpathEntries)

buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll)

conf.getAll.foreach { case (k, v) =>
buffer += "--conf"
buffer += s"$k=$v"
}
buffer
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ package org.apache.kyuubi.engine.flink
import java.io.{File, FilenameFilter}
import java.nio.file.{Files, Paths}

import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting

Expand All @@ -32,6 +31,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandLineUtils._

/**
* A builder to build flink sql engine progress.
Expand Down Expand Up @@ -84,11 +84,11 @@ class FlinkProcessBuilder(
// flink.execution.target are required in Kyuubi conf currently
executionTarget match {
case Some("yarn-application") =>
val buffer = new ArrayBuffer[String]()
val buffer = new mutable.ListBuffer[String]()
buffer += flinkExecutable
buffer += "run-application"

val flinkExtraJars = new ListBuffer[String]
val flinkExtraJars = new mutable.ListBuffer[String]
// locate flink sql jars
val flinkSqlJars = Paths.get(flinkHome)
.resolve("opt")
Expand Down Expand Up @@ -134,18 +134,14 @@ class FlinkProcessBuilder(
buffer += s"$mainClass"
buffer += s"${mainResource.get}"

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
conf.getAll.foreach { case (k, v) =>
if (k.startsWith("kyuubi.")) {
buffer += "--conf"
buffer += s"$k=$v"
}
}
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll.filter(_._1.startsWith("kyuubi.")))

buffer

case _ =>
val buffer = new ArrayBuffer[String]()
val buffer = new mutable.ListBuffer[String]()
buffer += executable

val memory = conf.get(ENGINE_FLINK_MEMORY)
Expand All @@ -155,8 +151,7 @@ class FlinkProcessBuilder(
buffer += javaOptions.get
}

buffer += "-cp"
val classpathEntries = new java.util.LinkedHashSet[String]
val classpathEntries = new mutable.LinkedHashSet[String]
// flink engine runtime jar
mainResource.foreach(classpathEntries.add)
// flink sql jars
Expand Down Expand Up @@ -200,16 +195,14 @@ class FlinkProcessBuilder(
classpathEntries.add(s"$devHadoopJars${File.separator}*")
}
}
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer ++= genClasspathOption(classpathEntries)

buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)

buffer ++= confKeyValues(conf.getAll)

conf.getAll.foreach { case (k, v) =>
buffer += "--conf"
buffer += s"$k=$v"
}
buffer
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@ package org.apache.kyuubi.engine.hive

import java.io.File
import java.nio.file.{Files, Paths}
import java.util

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable

import com.google.common.annotations.VisibleForTesting

Expand All @@ -33,6 +31,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SES
import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder}
import org.apache.kyuubi.engine.hive.HiveProcessBuilder._
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.command.CommandLineUtils._

class HiveProcessBuilder(
override val proxyUser: String,
Expand All @@ -54,7 +53,7 @@ class HiveProcessBuilder(

override protected val commands: Iterable[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
val buffer = new ArrayBuffer[String]()
val buffer = new mutable.ListBuffer[String]()
buffer += executable

val memory = conf.get(ENGINE_HIVE_MEMORY)
Expand All @@ -65,8 +64,7 @@ class HiveProcessBuilder(
}
// -Xmx5g
// java options
buffer += "-cp"
val classpathEntries = new util.LinkedHashSet[String]
val classpathEntries = new mutable.LinkedHashSet[String]
// hive engine runtime jar
mainResource.foreach(classpathEntries.add)
// classpath contains hive configurations, default to hive.home/conf
Expand Down Expand Up @@ -101,18 +99,14 @@ class HiveProcessBuilder(
classpathEntries.add(s"$devHadoopJars${File.separator}*")
}
}
buffer += classpathEntries.asScala.mkString(File.pathSeparator)
buffer ++= genClasspathOption(classpathEntries)
buffer += mainClass

buffer += "--conf"
buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser"
buffer += "--conf"
buffer += s"$KYUUBI_ENGINE_ID=$engineRefId"
buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser)
buffer ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId)

buffer ++= confKeyValues(conf.getAll)

for ((k, v) <- conf.getAll) {
buffer += "--conf"
buffer += s"$k=$v"
}
buffer
}

Expand Down
Loading

0 comments on commit cbb4b41

Please sign in to comment.