Skip to content

Commit

Permalink
[KYUUBI apache#6130] Stop engine immediately after close session for …
Browse files Browse the repository at this point in the history
…`CONNECTION` level FlinkSQLEngine

# 🔍 Description
## Issue References 🔗

This pull request fixes apache#6130

## Describe Your Solution 🔧

Stop engine immediately after close session for `CONNECTION` level FlinkSQLEngine

## Types of changes 🔖

- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes apache#6132 from wForget/KYUUBI-6130.

Closes apache#6130

033c0ff [wforget] assert exception
c0ce68e [wforget] debug
07e0320 [wforget] add test
a3c4ae3 [wforget] [KYUUBI-6130] Stop engine immediately after close session for `CONNECTION` level FlinkSQLEngine

Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>

(cherry picked from commit 9c18a7e)
  • Loading branch information
wForget committed Mar 14, 2024
1 parent 734fdbd commit e539e70
Showing 1 changed file with 22 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion

import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.FlinkSQLEngine
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
Expand All @@ -35,6 +38,8 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)

override protected def isServer: Boolean = false

private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))

val operationManager = new FlinkSQLOperationManager()
val sessionManager = new FlinkSessionManager(engineContext)

Expand Down Expand Up @@ -77,10 +82,23 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
}

override def closeSession(sessionHandle: SessionHandle): Unit = {
val fSession = super.getSessionOption(sessionHandle)
fSession.foreach(s =>
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
super.closeSession(sessionHandle)
try {
val fSession = super.getSessionOption(sessionHandle)
fSession.foreach(s =>
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
super.closeSession(sessionHandle)
} catch {
case t: Throwable =>
warn(s"Error closing session $sessionHandle", t)
}
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
}

private def stopSession(): Unit = {
FlinkSQLEngine.currentEngine.foreach(_.stop())
}

override def stop(): Unit = synchronized {
Expand Down

0 comments on commit e539e70

Please sign in to comment.