From 13af6aeb708c16a5db85c02876c6a97122d91eed Mon Sep 17 00:00:00 2001 From: liangbowen Date: Sat, 2 Dec 2023 01:25:02 +0800 Subject: [PATCH] [KYUUBI #5767] Extract common utils for assembling key value pairs with config option prefix in processbuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— As described. ## Describe Your Solution ๐Ÿ”ง - Focus on key points for configuration option assembling, instead of repeating manually command configs assembling - Avoid using magic string value "--conf" / "-cp" in each processbuilder - Extract common utils for assembling key value pairs with config option prefix in processbuilder - Use `mutable.ListBuffer` for command assembling - Extract common method for redact config value by key names - NO changes in expected string value for processbuilder command assertions in test suites ## Types of changes :bookmark: - [ ] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: No behavior changes. #### Behavior With This Pull Request :tada: No behavior changes. #### Related Unit Tests Added `CommandUtilsSuite`. --- # Checklists ## ๐Ÿ“ Author Self Checklist - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project - [x] I have performed a self-review - [x] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) ## ๐Ÿ“ Committer Pre-Merge Checklist - [ ] Pull request title is okay. - [ ] No license issues. - [ ] Milestone correctly set? - [ ] Test coverage is ok - [ ] Assignees are selected. - [ ] Minimum number of approvals - [ ] No changes are requested **Be nice. Be informative.** Closes #5767 from bowenliang123/config-option. Closes #5767 b043888d6 [liangbowen] use ++ for command configs 16a3c27d1 [liangbowen] .key bc285004e [liangbowen] use raw literal in test suites ab018cf2d [Bowen Liang] config option Lead-authored-by: liangbowen Co-authored-by: Bowen Liang Signed-off-by: liangbowen --- .../flink/WithFlinkSQLEngineLocal.scala | 16 ++-- .../flink/WithFlinkSQLEngineOnYarn.scala | 6 +- .../HiveCatalogDatabaseOperationSuite.scala | 5 +- .../hive/operation/HiveOperationSuite.scala | 3 +- .../main/scala/org/apache/kyuubi/Utils.scala | 38 +++++----- .../scala/org/apache/kyuubi/UtilsSuite.scala | 29 ++++--- .../engine/chat/ChatProcessBuilder.scala | 31 +++----- .../engine/flink/FlinkProcessBuilder.scala | 37 ++++----- .../engine/hive/HiveProcessBuilder.scala | 24 +++--- .../engine/jdbc/JdbcProcessBuilder.scala | 31 +++----- .../spark/SparkBatchProcessBuilder.scala | 13 ++-- .../engine/spark/SparkProcessBuilder.scala | 18 ++--- .../engine/trino/TrinoProcessBuilder.scala | 38 ++++------ .../spark/SparkProcessBuilderSuite.scala | 1 + .../util/command/CommandLineUtils.scala | 75 +++++++++++++++++++ .../util/command/CommandUtilsSuite.scala | 50 +++++++++++++ 16 files changed, 248 insertions(+), 167 deletions(-) create mode 100644 kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala create mode 100644 kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala index ccaefb496b0..1c4adce189d 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala @@ -23,7 +23,7 @@ import java.net.URI import java.nio.file.{Files, Paths} import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import org.apache.flink.configuration.{Configuration, RestOptions} import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} @@ -32,6 +32,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiFunSuite, SCALA import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES +import org.apache.kyuubi.util.command.CommandLineUtils._ import org.apache.kyuubi.zookeeper.EmbeddedZookeeper import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS} @@ -111,7 +112,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources processBuilder.environment().putAll(envs.asJava) conf.set(ENGINE_FLINK_EXTRA_CLASSPATH, udfJar.getAbsolutePath) - val command = new ArrayBuffer[String]() + val command = new mutable.ListBuffer[String]() command += envs("JAVA_EXEC") @@ -122,8 +123,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources command += javaOptions.get } - command += "-cp" - val classpathEntries = new java.util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // flink engine runtime jar mainResource(envs).foreach(classpathEntries.add) // flink sql jars @@ -163,13 +163,11 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources classpathEntries.add(s"$devHadoopJars${File.separator}*") } } - command += classpathEntries.asScala.mkString(File.pathSeparator) + command ++= genClasspathOption(classpathEntries) + command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine" - conf.getAll.foreach { case (k, v) => - command += "--conf" - command += s"$k=$v" - } + command ++= confKeyValues(conf.getAll) processBuilder.command(command.toList.asJava) processBuilder.redirectOutput(Redirect.INHERIT) diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala index 49fb947a3ec..730a2646bed 100644 --- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala +++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala @@ -34,6 +34,7 @@ import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiFunSuite, SCALA_COMPILE_VERSION, import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, KYUUBI_HOME} import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES +import org.apache.kyuubi.util.command.CommandLineUtils._ import org.apache.kyuubi.zookeeper.EmbeddedZookeeper import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS} @@ -179,10 +180,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource conf.set(k, v) } - for ((k, v) <- conf.getAll) { - command += "--conf" - command += s"$k=$v" - } + command ++= confKeyValues(conf.getAll) processBuilder.command(command.toList.asJava) processBuilder.redirectOutput(Redirect.INHERIT) diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala index a63de20c7de..7db2d7fdca3 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveCatalogDatabaseOperationSuite.scala @@ -23,6 +23,7 @@ import org.apache.kyuubi.Utils import org.apache.kyuubi.config.KyuubiConf.ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.operation.HiveJDBCTestHelper +import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper { @@ -30,9 +31,9 @@ class HiveCatalogDatabaseOperationSuite extends HiveJDBCTestHelper { val metastore = Utils.createTempDir(prefix = getClass.getSimpleName) metastore.toFile.delete() val args = Array( - "--conf", + CONF, s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true", - "--conf", + CONF, s"${ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key}=true") HiveSQLEngine.main(args) super.beforeAll() diff --git a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala index eb10e0b4144..53cc9457ae1 100644 --- a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala +++ b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala @@ -22,6 +22,7 @@ import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.apache.kyuubi.{HiveEngineTests, KYUUBI_VERSION, Utils} import org.apache.kyuubi.engine.hive.HiveSQLEngine import org.apache.kyuubi.jdbc.hive.KyuubiStatement +import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveOperationSuite extends HiveEngineTests { @@ -29,7 +30,7 @@ class HiveOperationSuite extends HiveEngineTests { val metastore = Utils.createTempDir(prefix = getClass.getSimpleName) metastore.toFile.delete() val args = Array( - "--conf", + CONF, s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$metastore;create=true") HiveSQLEngine.main(args) super.beforeAll() diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala index 0144dadbb86..896ed9df29d 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/Utils.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.util.ShutdownHookManager import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.internal.Tests.IS_TESTING +import org.apache.kyuubi.util.command.CommandLineUtils._ object Utils extends Logging { @@ -325,7 +326,7 @@ object Utils extends Logging { require(args.length % 2 == 0, s"Illegal size of arguments.") for (i <- args.indices by 2) { require( - args(i) == "--conf", + args(i) == CONF, s"Unrecognized main arguments prefix ${args(i)}," + s"the argument format is '--conf k=v'.") @@ -336,25 +337,24 @@ object Utils extends Logging { } } - val REDACTION_REPLACEMENT_TEXT = "*********(redacted)" - - private val PATTERN_FOR_KEY_VALUE_ARG = "(.+?)=(.+)".r - def redactCommandLineArgs(conf: KyuubiConf, commands: Iterable[String]): Iterable[String] = { - val redactionPattern = conf.get(SERVER_SECRET_REDACTION_PATTERN) - var nextKV = false - commands.map { - case PATTERN_FOR_KEY_VALUE_ARG(key, value) if nextKV => - val (_, newValue) = redact(redactionPattern, Seq((key, value))).head - nextKV = false - s"$key=$newValue" - - case cmd if cmd == "--conf" => - nextKV = true - cmd - - case cmd => - cmd + conf.get(SERVER_SECRET_REDACTION_PATTERN) match { + case Some(redactionPattern) => + var nextKV = false + commands.map { + case PATTERN_FOR_KEY_VALUE_ARG(key, value) if nextKV => + val (_, newValue) = redact(redactionPattern, Seq((key, value))).head + nextKV = false + genKeyValuePair(key, newValue) + + case cmd if cmd == CONF => + nextKV = true + cmd + + case cmd => + cmd + } + case _ => commands } } diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala index 97d9cd1b552..60bdd3d22a6 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/UtilsSuite.scala @@ -23,12 +23,13 @@ import java.nio.file.{Files, Paths} import java.security.PrivilegedExceptionAction import java.util.Properties -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import org.apache.hadoop.security.UserGroupInformation import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.SERVER_SECRET_REDACTION_PATTERN +import org.apache.kyuubi.util.command.CommandLineUtils._ class UtilsSuite extends KyuubiFunSuite { @@ -156,30 +157,26 @@ class UtilsSuite extends KyuubiFunSuite { val conf = new KyuubiConf() conf.set(SERVER_SECRET_REDACTION_PATTERN, "(?i)secret|password".r) - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += "main" - buffer += "--conf" - buffer += "kyuubi.my.password=sensitive_value" - buffer += "--conf" - buffer += "kyuubi.regular.property1=regular_value" - buffer += "--conf" - buffer += "kyuubi.my.secret=sensitive_value" - buffer += "--conf" - buffer += "kyuubi.regular.property2=regular_value" + buffer ++= confKeyValue("kyuubi.my.password", "sensitive_value") + buffer ++= confKeyValue("kyuubi.regular.property1", "regular_value") + buffer ++= confKeyValue("kyuubi.my.secret", "sensitive_value") + buffer ++= confKeyValue("kyuubi.regular.property2", "regular_value") val commands = buffer // Redact sensitive information val redactedCmdArgs = Utils.redactCommandLineArgs(conf, commands) - val expectBuffer = new ArrayBuffer[String]() + val expectBuffer = new mutable.ListBuffer[String]() expectBuffer += "main" expectBuffer += "--conf" - expectBuffer += "kyuubi.my.password=" + Utils.REDACTION_REPLACEMENT_TEXT + expectBuffer += "kyuubi.my.password=" + REDACTION_REPLACEMENT_TEXT expectBuffer += "--conf" expectBuffer += "kyuubi.regular.property1=regular_value" expectBuffer += "--conf" - expectBuffer += "kyuubi.my.secret=" + Utils.REDACTION_REPLACEMENT_TEXT + expectBuffer += "kyuubi.my.secret=" + REDACTION_REPLACEMENT_TEXT expectBuffer += "--conf" expectBuffer += "kyuubi.regular.property2=regular_value" @@ -189,11 +186,11 @@ class UtilsSuite extends KyuubiFunSuite { test("redact sensitive information") { val secretKeys = Some("my.password".r) assert(Utils.redact(secretKeys, Seq(("kyuubi.my.password", "12345"))) === - Seq(("kyuubi.my.password", Utils.REDACTION_REPLACEMENT_TEXT))) + Seq(("kyuubi.my.password", REDACTION_REPLACEMENT_TEXT))) assert(Utils.redact(secretKeys, Seq(("anything", "kyuubi.my.password=12345"))) === - Seq(("anything", Utils.REDACTION_REPLACEMENT_TEXT))) + Seq(("anything", REDACTION_REPLACEMENT_TEXT))) assert(Utils.redact(secretKeys, Seq((999, "kyuubi.my.password=12345"))) === - Seq((999, Utils.REDACTION_REPLACEMENT_TEXT))) + Seq((999, REDACTION_REPLACEMENT_TEXT))) // Do not redact when value type is not string assert(Utils.redact(secretKeys, Seq(("my.password", 12345))) === Seq(("my.password", 12345))) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala index ade6026b18f..ddf88e14924 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/chat/ChatProcessBuilder.scala @@ -19,20 +19,18 @@ package org.apache.kyuubi.engine.chat import java.io.File import java.nio.file.{Files, Paths} -import java.util -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils} -import org.apache.kyuubi.Utils.REDACTION_REPLACEMENT_TEXT import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.ProcBuilder import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class ChatProcessBuilder( override val proxyUser: String, @@ -60,7 +58,7 @@ class ChatProcessBuilder( override protected def mainClass: String = "org.apache.kyuubi.engine.chat.ChatEngine" override protected val commands: Iterable[String] = { - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable val memory = conf.get(ENGINE_CHAT_MEMORY) @@ -69,8 +67,7 @@ class ChatProcessBuilder( val javaOptions = conf.get(ENGINE_CHAT_JAVA_OPTIONS) javaOptions.foreach(buffer += _) - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] mainResource.foreach(classpathEntries.add) mainResource.foreach { path => val parent = Paths.get(path).getParent @@ -88,16 +85,14 @@ class ChatProcessBuilder( val extraCp = conf.get(ENGINE_CHAT_EXTRA_CLASSPATH) extraCp.foreach(classpathEntries.add) - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - conf.getAll.foreach { case (k, v) => - buffer += "--conf" - buffer += s"$k=$v" - } buffer } @@ -105,11 +100,9 @@ class ChatProcessBuilder( if (commands == null) { super.toString } else { - Utils.redactCommandLineArgs(conf, commands).map { - case arg if arg.contains(ENGINE_CHAT_GPT_API_KEY.key) => - s"${ENGINE_CHAT_GPT_API_KEY.key}=$REDACTION_REPLACEMENT_TEXT" - case arg => arg - }.map { + redactConfValues( + Utils.redactCommandLineArgs(conf, commands), + Set(ENGINE_CHAT_GPT_API_KEY.key)).map { case arg if arg.startsWith("-") || arg == mainClass => s"\\\n\t$arg" case arg => arg }.mkString(" ") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 52364f1894c..a1e8cdcd38b 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -20,8 +20,7 @@ package org.apache.kyuubi.engine.flink import java.io.{File, FilenameFilter} import java.nio.file.{Files, Paths} -import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, ListBuffer} +import scala.collection.mutable import com.google.common.annotations.VisibleForTesting @@ -32,6 +31,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._ import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ /** * A builder to build flink sql engine progress. @@ -84,11 +84,11 @@ class FlinkProcessBuilder( // flink.execution.target are required in Kyuubi conf currently executionTarget match { case Some("yarn-application") => - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += flinkExecutable buffer += "run-application" - val flinkExtraJars = new ListBuffer[String] + val flinkExtraJars = new mutable.ListBuffer[String] // locate flink sql jars val flinkSqlJars = Paths.get(flinkHome) .resolve("opt") @@ -134,18 +134,14 @@ class FlinkProcessBuilder( buffer += s"$mainClass" buffer += s"${mainResource.get}" - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" - conf.getAll.foreach { case (k, v) => - if (k.startsWith("kyuubi.")) { - buffer += "--conf" - buffer += s"$k=$v" - } - } + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll.filter(_._1.startsWith("kyuubi."))) + buffer case _ => - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable val memory = conf.get(ENGINE_FLINK_MEMORY) @@ -155,8 +151,7 @@ class FlinkProcessBuilder( buffer += javaOptions.get } - buffer += "-cp" - val classpathEntries = new java.util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // flink engine runtime jar mainResource.foreach(classpathEntries.add) // flink sql jars @@ -200,16 +195,14 @@ class FlinkProcessBuilder( classpathEntries.add(s"$devHadoopJars${File.separator}*") } } - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - conf.getAll.foreach { case (k, v) => - buffer += "--conf" - buffer += s"$k=$v" - } buffer } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala index d7e2709119f..d8e4454b610 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/hive/HiveProcessBuilder.scala @@ -19,10 +19,8 @@ package org.apache.kyuubi.engine.hive import java.io.File import java.nio.file.{Files, Paths} -import java.util -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import com.google.common.annotations.VisibleForTesting @@ -33,6 +31,7 @@ import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_ENGINE_ID, KYUUBI_SES import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.engine.hive.HiveProcessBuilder._ import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class HiveProcessBuilder( override val proxyUser: String, @@ -54,7 +53,7 @@ class HiveProcessBuilder( override protected val commands: Iterable[String] = { KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf) - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable val memory = conf.get(ENGINE_HIVE_MEMORY) @@ -65,8 +64,7 @@ class HiveProcessBuilder( } // -Xmx5g // java options - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // hive engine runtime jar mainResource.foreach(classpathEntries.add) // classpath contains hive configurations, default to hive.home/conf @@ -101,18 +99,14 @@ class HiveProcessBuilder( classpathEntries.add(s"$devHadoopJars${File.separator}*") } } - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" - buffer += "--conf" - buffer += s"$KYUUBI_ENGINE_ID=$engineRefId" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + buffer ++= confKeyValue(KYUUBI_ENGINE_ID, engineRefId) + + buffer ++= confKeyValues(conf.getAll) - for ((k, v) <- conf.getAll) { - buffer += "--conf" - buffer += s"$k=$v" - } buffer } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala index 5b52dbbb471..2d08d510199 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/jdbc/JdbcProcessBuilder.scala @@ -19,20 +19,18 @@ package org.apache.kyuubi.engine.jdbc import java.io.File import java.nio.file.Paths -import java.util -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils} -import org.apache.kyuubi.Utils.REDACTION_REPLACEMENT_TEXT import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_JDBC_CONNECTION_PASSWORD, ENGINE_JDBC_CONNECTION_URL, ENGINE_JDBC_EXTRA_CLASSPATH, ENGINE_JDBC_JAVA_OPTIONS, ENGINE_JDBC_MEMORY} import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.ProcBuilder import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class JdbcProcessBuilder( override val proxyUser: String, @@ -63,7 +61,7 @@ class JdbcProcessBuilder( require( conf.get(ENGINE_JDBC_CONNECTION_URL).nonEmpty, s"Jdbc server url can not be null! Please set ${ENGINE_JDBC_CONNECTION_URL.key}") - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable val memory = conf.get(ENGINE_JDBC_MEMORY) @@ -72,8 +70,7 @@ class JdbcProcessBuilder( val javaOptions = conf.get(ENGINE_JDBC_JAVA_OPTIONS) javaOptions.foreach(buffer += _) - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] mainResource.foreach(classpathEntries.add) mainResource.foreach { path => val parent = Paths.get(path).getParent @@ -91,16 +88,14 @@ class JdbcProcessBuilder( val extraCp = conf.get(ENGINE_JDBC_EXTRA_CLASSPATH) extraCp.foreach(classpathEntries.add) - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - for ((k, v) <- conf.getAll) { - buffer += "--conf" - buffer += s"$k=$v" - } buffer } @@ -108,11 +103,9 @@ class JdbcProcessBuilder( if (commands == null) { super.toString } else { - Utils.redactCommandLineArgs(conf, commands).map { - case arg if arg.contains(ENGINE_JDBC_CONNECTION_PASSWORD.key) => - s"${ENGINE_JDBC_CONNECTION_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" - case arg => arg - }.map { + redactConfValues( + Utils.redactCommandLineArgs(conf, commands), + Set(ENGINE_JDBC_CONNECTION_PASSWORD.key)).map { case arg if arg.startsWith("-") => s"\\\n\t$arg" case arg => arg }.mkString(" ") diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala index 7d69b90d5db..0167f95516d 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkBatchProcessBuilder.scala @@ -17,11 +17,12 @@ package org.apache.kyuubi.engine.spark -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.engine.KyuubiApplicationManager import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkBatchProcessBuilder( override val proxyUser: String, @@ -37,7 +38,7 @@ class SparkBatchProcessBuilder( import SparkProcessBuilder._ override protected lazy val commands: Iterable[String] = { - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable Option(mainClass).foreach { cla => buffer += CLASS @@ -51,13 +52,11 @@ class SparkBatchProcessBuilder( // tag batch application KyuubiApplicationManager.tagApplication(batchId, "spark", clusterManager(), batchKyuubiConf) - (batchKyuubiConf.getAll ++ + val allConfigs = batchKyuubiConf.getAll ++ sparkAppNameConf() ++ engineLogPathConf() ++ - appendPodNameConf(batchConf)).foreach { case (k, v) => - buffer += CONF - buffer += s"${convertConfigKey(k)}=$v" - } + appendPodNameConf(batchConf) + buffer ++= confKeyValues(allConfigs) setupKerberos(buffer) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala index 57d5f73357d..d147e529031 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala @@ -22,7 +22,6 @@ import java.nio.file.Paths import java.util.Locale import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting import org.apache.commons.lang3.StringUtils @@ -38,6 +37,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.util.{KubernetesUtils, Validator} +import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkProcessBuilder( override val proxyUser: String, @@ -127,7 +127,7 @@ class SparkProcessBuilder( completeMasterUrl(conf) KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf) - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable buffer += CLASS buffer += mainClass @@ -141,8 +141,7 @@ class SparkProcessBuilder( } // pass spark engine log path to spark conf (allConf ++ engineLogPathConf ++ appendPodNameConf(allConf)).foreach { case (k, v) => - buffer += CONF - buffer += s"${convertConfigKey(k)}=$v" + buffer ++= confKeyValue(convertConfigKey(k), v) } setupKerberos(buffer) @@ -154,7 +153,7 @@ class SparkProcessBuilder( override protected def module: String = "kyuubi-spark-sql-engine" - protected def setupKerberos(buffer: ArrayBuffer[String]): Unit = { + protected def setupKerberos(buffer: mutable.Buffer[String]): Unit = { // if the keytab is specified, PROXY_USER is not supported tryKeytab() match { case None => @@ -286,13 +285,11 @@ class SparkProcessBuilder( override def validateConf: Unit = Validator.validateConf(conf) // For spark on kubernetes, spark pod using env SPARK_USER_NAME as current user - def setSparkUserName(userName: String, buffer: ArrayBuffer[String]): Unit = { + def setSparkUserName(userName: String, buffer: mutable.Buffer[String]): Unit = { clusterManager().foreach { cm => if (cm.toUpperCase.startsWith("K8S")) { - buffer += CONF - buffer += s"spark.kubernetes.driverEnv.SPARK_USER_NAME=$userName" - buffer += CONF - buffer += s"spark.executorEnv.SPARK_USER_NAME=$userName" + buffer ++= confKeyValue("spark.kubernetes.driverEnv.SPARK_USER_NAME", userName) + buffer ++= confKeyValue("spark.executorEnv.SPARK_USER_NAME", userName) } } } @@ -335,7 +332,6 @@ object SparkProcessBuilder { "spark.kubernetes.kerberos.krb5.path", "spark.kubernetes.file.upload.path") - final private[spark] val CONF = "--conf" final private[spark] val CLASS = "--class" final private[spark] val PROXY_USER = "--proxy-user" final private[spark] val SPARK_FILES = "spark.files" diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala index 04dc49e037a..96502fb9607 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/trino/TrinoProcessBuilder.scala @@ -19,20 +19,18 @@ package org.apache.kyuubi.engine.trino import java.io.File import java.nio.file.Paths -import java.util -import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi.{Logging, SCALA_COMPILE_VERSION, Utils} -import org.apache.kyuubi.Utils.REDACTION_REPLACEMENT_TEXT import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.{KyuubiApplicationManager, ProcBuilder} import org.apache.kyuubi.operation.log.OperationLog +import org.apache.kyuubi.util.command.CommandLineUtils._ class TrinoProcessBuilder( override val proxyUser: String, @@ -58,7 +56,7 @@ class TrinoProcessBuilder( require( conf.get(ENGINE_TRINO_CONNECTION_CATALOG).nonEmpty, s"Trino default catalog can not be null! Please set ${ENGINE_TRINO_CONNECTION_CATALOG.key}") - val buffer = new ArrayBuffer[String]() + val buffer = new mutable.ListBuffer[String]() buffer += executable val memory = conf.get(ENGINE_TRINO_MEMORY) @@ -68,8 +66,7 @@ class TrinoProcessBuilder( buffer += javaOptions.get } - buffer += "-cp" - val classpathEntries = new util.LinkedHashSet[String] + val classpathEntries = new mutable.LinkedHashSet[String] // trino engine runtime jar mainResource.foreach(classpathEntries.add) @@ -90,20 +87,18 @@ class TrinoProcessBuilder( val extraCp = conf.get(ENGINE_TRINO_EXTRA_CLASSPATH) extraCp.foreach(classpathEntries.add) - buffer += classpathEntries.asScala.mkString(File.pathSeparator) + buffer ++= genClasspathOption(classpathEntries) + buffer += mainClass // TODO: How shall we deal with proxyUser, // user.name // kyuubi.session.user // or just leave it, because we can handle it at operation layer - buffer += "--conf" - buffer += s"$KYUUBI_SESSION_USER_KEY=$proxyUser" + buffer ++= confKeyValue(KYUUBI_SESSION_USER_KEY, proxyUser) + + buffer ++= confKeyValues(conf.getAll) - for ((k, v) <- conf.getAll) { - buffer += "--conf" - buffer += s"$k=$v" - } buffer } @@ -113,15 +108,12 @@ class TrinoProcessBuilder( if (commands == null) { super.toString } else { - Utils.redactCommandLineArgs(conf, commands).map { - case arg if arg.contains(ENGINE_TRINO_CONNECTION_PASSWORD.key) => - s"${ENGINE_TRINO_CONNECTION_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" - case arg if arg.contains(ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key) => - s"${ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" - case arg if arg.contains(ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key) => - s"${ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key}=$REDACTION_REPLACEMENT_TEXT" - case arg => arg - }.map { + redactConfValues( + Utils.redactCommandLineArgs(conf, commands), + Set( + ENGINE_TRINO_CONNECTION_PASSWORD.key, + ENGINE_TRINO_CONNECTION_KEYSTORE_PASSWORD.key, + ENGINE_TRINO_CONNECTION_TRUSTSTORE_PASSWORD.key)).map { case arg if arg.startsWith("-") => s"\\\n\t$arg" case arg => arg }.mkString(" ") diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 27fd36815f8..6b498628b00 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -35,6 +35,7 @@ import org.apache.kyuubi.ha.HighAvailabilityConf import org.apache.kyuubi.ha.client.AuthTypes import org.apache.kyuubi.service.ServiceUtils import org.apache.kyuubi.util.AssertionUtils._ +import org.apache.kyuubi.util.command.CommandLineUtils._ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { private def conf = KyuubiConf().set("kyuubi.on", "off") diff --git a/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala new file mode 100644 index 00000000000..91327223a60 --- /dev/null +++ b/kyuubi-util-scala/src/main/scala/org/apache/kyuubi/util/command/CommandLineUtils.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.util.command + +import java.io.File + +import scala.util.matching.Regex + +object CommandLineUtils { + val CONF = "--conf" + + val PATTERN_FOR_KEY_VALUE_ARG: Regex = "(.+?)=(.+)".r + + val REDACTION_REPLACEMENT_TEXT = "*********(redacted)" + + /** + * The Java command's option name for classpath + */ + val CP = "-cp" + + /** + * Assemble key value pair with "=" seperator + */ + def genKeyValuePair(key: String, value: String): String = s"$key=$value".trim + + /** + * Assemble key value pair with config option prefix + */ + def confKeyValue(key: String, value: String, confOption: String = CONF): Iterable[String] = + Seq(confOption, genKeyValuePair(key, value)) + + def confKeyValueStr(key: String, value: String, confOption: String = CONF): String = + confKeyValue(key, value, confOption).mkString(" ") + + def confKeyValues(configs: Iterable[(String, String)]): Iterable[String] = + configs.flatMap { case (k, v) => confKeyValue(k, v) }.toSeq + + /** + * Generate classpath option by assembling the classpath entries with "-cp" prefix + */ + def genClasspathOption(classpathEntries: Iterable[String]): Iterable[String] = + Seq(CP, classpathEntries.mkString(File.pathSeparator)) + + /** + * Match the conf string in the form of "key=value" + * and redact the value with the replacement text if keys are contained in given config keys + */ + def redactConfValues( + commands: Iterable[String], + redactKeys: Iterable[String]): Iterable[String] = { + redactKeys.toSet match { + case redactKeySet if redactKeySet.isEmpty => commands + case redactKeySet => commands.map { + case PATTERN_FOR_KEY_VALUE_ARG(key, _) if redactKeySet.contains(key) => + genKeyValuePair(key, REDACTION_REPLACEMENT_TEXT) + case part => part + } + } + } +} diff --git a/kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala b/kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala new file mode 100644 index 00000000000..e000e7478b6 --- /dev/null +++ b/kyuubi-util-scala/src/test/scala/org/apache/kyuubi/util/command/CommandUtilsSuite.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kyuubi.util.command +// scalastyle:off +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.kyuubi.util.AssertionUtils._ +import org.apache.kyuubi.util.command.CommandLineUtils._ + +// scalastyle:off +class CommandUtilsSuite extends AnyFunSuite { +// scalastyle:on + + test("assemble key value pair") { + assertResult("abc=123")(genKeyValuePair("abc", "123")) + assertResult("abc=123")(genKeyValuePair(" abc", "123 ")) + assertResult("abc.def=xyz.123")(genKeyValuePair("abc.def", "xyz.123")) + + assertMatches(genKeyValuePair("abc", "123"), PATTERN_FOR_KEY_VALUE_ARG) + assertMatches(genKeyValuePair(" abc", "123 "), PATTERN_FOR_KEY_VALUE_ARG) + assertMatches(genKeyValuePair("abc.def", "xyz.123"), PATTERN_FOR_KEY_VALUE_ARG) + } + + test("assemble key value pair with config option") { + assertResult("--conf abc=123")(confKeyValueStr("abc", "123")) + assertResult("--conf abc.def=xyz.123")(confKeyValueStr("abc.def", "xyz.123")) + + assertResult(Seq("--conf", "abc=123"))(confKeyValue("abc", "123")) + assertResult(Seq("--conf", "abc.def=xyz.123"))(confKeyValue("abc.def", "xyz.123")) + } + + test("assemble classpath options") { + assertResult(Seq("-cp", "/path/a.jar:/path2/b*.jar"))( + genClasspathOption(Seq("/path/a.jar", "/path2/b*.jar"))) + } +}