Skip to content

Commit

Permalink
Remove flink. prefix for create session configurations
Browse files Browse the repository at this point in the history
  • Loading branch information
wForget committed Mar 11, 2024
1 parent 1dddd08 commit 4e2f602
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,21 +54,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
val newConf = conf.map {
case (k, v) => k.stripPrefix("flink.") -> v
}
newConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
val flinkInternalSession = sessionManager.openSession(
SessionEnvironment.newBuilder
.setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1)
.addSessionConfig(mapAsJavaMap(conf))
.addSessionConfig(mapAsJavaMap(newConf))
.build)
val sessionConfig = flinkInternalSession.getSessionConfig
sessionConfig.putAll(conf.asJava)
sessionConfig.putAll(newConf.asJava)
val session = new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
newConf,
this,
flinkInternalSession)
session
Expand Down

0 comments on commit 4e2f602

Please sign in to comment.