Skip to content

Commit

Permalink
[KYUUBI apache#6130] Stop engine immediately after close session for …
Browse files Browse the repository at this point in the history
…`CONNECTION` level FlinkSQLEngine

# 🔍 Description
## Issue References 🔗

This pull request fixes apache#6130

## Describe Your Solution 🔧

Stop engine immediately after close session for `CONNECTION` level FlinkSQLEngine

## Types of changes 🔖

- [X] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

#### Behavior With This Pull Request 🎉

#### Related Unit Tests

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes apache#6132 from wForget/KYUUBI-6130.

Closes apache#6130

033c0ff [wforget] assert exception
c0ce68e [wforget] debug
07e0320 [wforget] add test
a3c4ae3 [wforget] [KYUUBI-6130] Stop engine immediately after close session for `CONNECTION` level FlinkSQLEngine

Authored-by: wforget <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
wForget authored and zhaohehuhu committed Mar 21, 2024
1 parent d02536d commit 7c15693
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import org.apache.flink.table.gateway.api.session.SessionEnvironment
import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
import org.apache.flink.table.gateway.service.context.DefaultContext

import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.flink.FlinkSQLEngine
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
Expand All @@ -35,6 +38,8 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)

override protected def isServer: Boolean = false

private lazy val shareLevel = ShareLevel.withName(conf.get(ENGINE_SHARE_LEVEL))

val operationManager = new FlinkSQLOperationManager()
val sessionManager = new FlinkSessionManager(engineContext)

Expand Down Expand Up @@ -77,10 +82,23 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
}

override def closeSession(sessionHandle: SessionHandle): Unit = {
val fSession = super.getSessionOption(sessionHandle)
fSession.foreach(s =>
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
super.closeSession(sessionHandle)
try {
val fSession = super.getSessionOption(sessionHandle)
fSession.foreach(s =>
sessionManager.closeSession(s.asInstanceOf[FlinkSessionImpl].fSession.getSessionHandle))
super.closeSession(sessionHandle)
} catch {
case t: Throwable =>
warn(s"Error closing session $sessionHandle", t)
}
if (shareLevel == ShareLevel.CONNECTION) {
info("Session stopped due to shared level is Connection.")
stopSession()
}
}

private def stopSession(): Unit = {
FlinkSQLEngine.currentEngine.foreach(_.stop())
}

override def stop(): Unit = synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ package org.apache.kyuubi.engine.flink.operation

import java.util.UUID

import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_USER_KEY
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.operation.{HiveJDBCTestHelper, NoneMode}

class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
trait FlinkEngineInitializeSuite extends HiveJDBCTestHelper
with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {

protected def jdbcUrl: String = getFlinkEngineServiceUrl
Expand All @@ -51,7 +55,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
HA_NAMESPACE.key -> namespace,
HA_ENGINE_REF_ID.key -> engineRefId,
ENGINE_TYPE.key -> "FLINK_SQL",
ENGINE_SHARE_LEVEL.key -> shareLevel,
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name,
ENGINE_FLINK_INITIALIZE_SQL.key -> ENGINE_INITIALIZE_SQL_VALUE,
ENGINE_SESSION_FLINK_INITIALIZE_SQL.key -> ENGINE_SESSION_INITIALIZE_SQL_VALUE,
Expand All @@ -62,7 +66,7 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper

def namespace: String = "/kyuubi/flink-local-engine-test"

def shareLevel: String = ShareLevel.USER.toString
def shareLevel: ShareLevel

def engineType: String = "flink"

Expand Down Expand Up @@ -100,5 +104,33 @@ class FlinkEngineInitializeSuite extends HiveJDBCTestHelper
assert(dropResult.next())
assert(dropResult.getString(1) === "OK")
}
// check engine alive status after close session with connection level engine
if (shareLevel == ShareLevel.CONNECTION) {
eventually(Timeout(10.seconds)) {
assert(!engineProcess.isAlive)
}
val e = intercept[Exception] {
withJdbcStatement() { statement =>
statement.executeQuery("select 1")
}
}
assert(e.getMessage() == "Time out retrieving Flink engine service url.")
}
// check engine alive status after close session with user level engine
if (shareLevel == ShareLevel.USER) {
assert(engineProcess.isAlive)
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 1")
assert(resultSet.next())
}
}
}
}

class FlinkConnectionLevelEngineInitializeSuite extends FlinkEngineInitializeSuite {
def shareLevel: ShareLevel = ShareLevel.CONNECTION
}

class FlinkUserLevelEngineInitializeSuite extends FlinkEngineInitializeSuite {
def shareLevel: ShareLevel = ShareLevel.USER
}

0 comments on commit 7c15693

Please sign in to comment.