diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index 1bb82a0ef6d..221fc0dfea2 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -54,21 +54,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( + val newConf = conf.map { + case (k, v) => k.stripPrefix("flink.") -> v + } + newConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( getSessionOption).getOrElse { val flinkInternalSession = sessionManager.openSession( SessionEnvironment.newBuilder .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) - .addSessionConfig(mapAsJavaMap(conf)) + .addSessionConfig(mapAsJavaMap(newConf)) .build) val sessionConfig = flinkInternalSession.getSessionConfig - sessionConfig.putAll(conf.asJava) + sessionConfig.putAll(newConf.asJava) val session = new FlinkSessionImpl( protocol, user, password, ipAddress, - conf, + newConf, this, flinkInternalSession) session