Skip to content

Commit

Permalink
[KYUUBI apache#6156] Remove flink. prefix for create session config…
Browse files Browse the repository at this point in the history
…urations

# 🔍 Description
## Issue References 🔗

This pull request fixes apache#6156

## Describe Your Solution 🔧

Remove `flink.` prefix for open flink session configurations.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes apache#6157 from wForget/KYUUBI-6156.

Closes apache#6156

fc750dc [wforget] comment
f613491 [wforget] Remove `flink.` prefix for create session configurations

Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
wForget authored and zhaohehuhu committed Mar 21, 2024
1 parent f580271 commit 5100901
Showing 1 changed file with 18 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,23 +54,24 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
password: String,
ipAddress: String,
conf: Map[String, String]): Session = {
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
val flinkInternalSession = sessionManager.openSession(
SessionEnvironment.newBuilder
.setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1)
.addSessionConfig(mapAsJavaMap(conf))
.build)
val session = new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
flinkInternalSession)
session
}
val normalizedConf = conf.map { case (k, v) => k.stripPrefix("flink.") -> v }
normalizedConf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID)
.flatMap(getSessionOption).getOrElse {
val flinkInternalSession = sessionManager.openSession(
SessionEnvironment.newBuilder
.setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1)
.addSessionConfig(mapAsJavaMap(normalizedConf))
.build)
val session = new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
normalizedConf,
this,
flinkInternalSession)
session
}
}

override def getSessionOption(sessionHandle: SessionHandle): Option[Session] = {
Expand Down

0 comments on commit 5100901

Please sign in to comment.