From 32c5033568cd396fa80173743f0286a722575e27 Mon Sep 17 00:00:00 2001 From: Paul Lin Date: Tue, 5 Sep 2023 04:23:28 +0800 Subject: [PATCH] [KYUUBI #5238] Fix credentials may break Flink engine launch command ### _Why are the changes needed?_ Currently, Flink engine doesn't use delegation tokens and these tokens need to be filtered out from the Flink engine launch command, or the command may be corrupted because the credentials could contain new lines. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request ### _Was this patch authored or co-authored using generative AI tooling?_ No. Closes #5238 from link3280/filter_engine_credential. Closes #5238 5e2403a53 [Paul Lin] Optimize code style 41df6e2a4 [Paul Lin] Fix test error 524189443 [Paul Lin] Fix credentials may break Flink engine launch command Authored-by: Paul Lin Signed-off-by: Cheng Pan --- .../org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala | 5 +++-- .../kyuubi/engine/flink/FlinkProcessBuilderSuite.scala | 4 ++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala index 716db0fcc54..77b9cd6d3b1 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.{ArrayBuffer, ListBuffer} import com.google.common.annotations.VisibleForTesting import org.apache.kyuubi._ -import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys} import org.apache.kyuubi.config.KyuubiConf._ import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY import org.apache.kyuubi.engine.{ApplicationManagerInfo, KyuubiApplicationManager, ProcBuilder} @@ -79,7 +79,8 @@ class FlinkProcessBuilder( override protected val commands: Array[String] = { KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf) - + // unset engine credentials because Flink doesn't support them at the moment + conf.unset(KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY) // flink.execution.target are required in Kyuubi conf currently executionTarget match { case Some("yarn-application") => diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala index 25295c37431..990d56f15c4 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala @@ -27,6 +27,7 @@ import scala.util.matching.Regex import org.apache.kyuubi.KyuubiFunSuite import org.apache.kyuubi.config.KyuubiConf import org.apache.kyuubi.config.KyuubiConf.{ENGINE_FLINK_APPLICATION_JARS, ENGINE_FLINK_EXTRA_CLASSPATH, ENGINE_FLINK_JAVA_OPTIONS, ENGINE_FLINK_MEMORY} +import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_CREDENTIALS_KEY import org.apache.kyuubi.engine.flink.FlinkProcessBuilder._ class FlinkProcessBuilderSuite extends KyuubiFunSuite { @@ -37,12 +38,14 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { .set( ENGINE_FLINK_JAVA_OPTIONS, "-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005") + .set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used") private def applicationModeConf = KyuubiConf() .set("flink.execution.target", "yarn-application") .set(ENGINE_FLINK_APPLICATION_JARS, tempUdfJar.toString) .set(APP_KEY, "kyuubi_connection_flink_paul") .set("kyuubi.on", "off") + .set(KYUUBI_ENGINE_CREDENTIALS_KEY, "should-not-be-used") private val tempFlinkHome = Files.createTempDirectory("flink-home").toFile private val tempOpt = @@ -65,6 +68,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite { (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop") private def confStr: String = { sessionModeConf.clone.getAll + .filter(!_._1.equals(KYUUBI_ENGINE_CREDENTIALS_KEY)) .map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" } .mkString(" ") }