diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala index b7cd462172f..5bc58b1722d 100644 --- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala +++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala @@ -25,7 +25,10 @@ import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion import org.apache.flink.table.gateway.service.context.DefaultContext import org.apache.hive.service.rpc.thrift.TProtocolVersion +import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY +import org.apache.kyuubi.engine.ShareLevel +import org.apache.kyuubi.engine.flink.FlinkSQLEngine import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager} @@ -35,6 +38,8 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) override protected def isServer: Boolean = false + private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL)) + val operationManager = new FlinkSQLOperationManager() val sessionManager = new FlinkSessionManager(engineContext) @@ -77,10 +82,23 @@ class FlinkSQLSessionManager(engineContext: DefaultContext) } override def closeSession(sessionHandle: SessionHandle): Unit = { - val fSession = super.getSessionOption(sessionHandle) - fSession.foreach(s => - sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle)) - super.closeSession(sessionHandle) + try { + val fSession = super.getSessionOption(sessionHandle) + fSession.foreach(s => + sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle)) + super.closeSession(sessionHandle) + } catch { + case t: Throwable => + warn(s"Error closing session $sessionHandle", t) + } + if (shareLevel == ShareLevel.CONNECTION) { + info("Session stopped due to shared level is Connection.") + stopSession() + } + } + + private def stopSession(): Unit = { + FlinkSQLEngine.currentEngine.foreach(_.stop()) } override def stop(): Unit = synchronized {