From 7562a9711506ee922729a73336ff6c7b17c21904 Mon Sep 17 00:00:00 2001 From: wforget <643348094@qq.com> Date: Tue, 12 Mar 2024 19:39:38 +0800 Subject: [PATCH] [KYUUBI #6156] Remove `flink.` prefix for create session configurations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # :mag: Description ## Issue References ๐Ÿ”— This pull request fixes #6156 ## Describe Your Solution ๐Ÿ”ง Remove `flink.` prefix for open flink session configurations. ## Types of changes :bookmark: - [x] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan ๐Ÿงช #### Behavior Without This Pull Request :coffin: #### Behavior With This Pull Request :tada: #### Related Unit Tests --- # Checklist ๐Ÿ“ - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6157 from wForget/KYUUBI-6156. Closes #6156 fc750dc5a [wforget] comment f6134919c [wforget] Remove `flink.` prefix for create session configurations Authored-by: wforget <643348094@qq.com> Signed-off-by: Cheng Pan --- .../session/FlinkSQLSessionManager.scala | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index bbdcdc437b5..73908bef8ef 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -54,23 +54,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( - getSessionOption).getOrElse { - val flinkInternalSession = sessionManager.openSession( - SessionEnvironment.newBuilder - .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) - .addSessionConfig(mapAsJavaMap(conf)) - .build) - val session = new FlinkSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - flinkInternalSession) - session - } + val normalizedConf = conf.map { case (k, v) => k.stripPrefix("flink.") -> v } + normalizedConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID) + .flatMap(getSessionOption).getOrElse { + val flinkInternalSession = sessionManager.openSession( + SessionEnvironment.newBuilder + .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) + .addSessionConfig(mapAsJavaMap(normalizedConf)) + .build) + val session = new FlinkSessionImpl( + protocol, + user, + password, + ipAddress, + normalizedConf, + this, + flinkInternalSession) + session + } } override def getSessionOption(sessionHandle: SessionHandle): Option[Session] = {