diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index bbdcdc437b5..73908bef8ef 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -54,23 +54,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) password: String, ipAddress: String, conf: Map[String, String]): Session = { - conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap( - getSessionOption).getOrElse { - val flinkInternalSession = sessionManager.openSession( - SessionEnvironment.newBuilder - .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) - .addSessionConfig(mapAsJavaMap(conf)) - .build) - val session = new FlinkSessionImpl( - protocol, - user, - password, - ipAddress, - conf, - this, - flinkInternalSession) - session - } + val normalizedConf = conf.map { case (k, v) => k.stripPrefix("flink.") -> v } + normalizedConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID) + .flatMap(getSessionOption).getOrElse { + val flinkInternalSession = sessionManager.openSession( + SessionEnvironment.newBuilder + .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1) + .addSessionConfig(mapAsJavaMap(normalizedConf)) + .build) + val session = new FlinkSessionImpl( + protocol, + user, + password, + ipAddress, + normalizedConf, + this, + flinkInternalSession) + session + } } override def getSessionOption(sessionHandle: SessionHandle): Option[Session] = {