Skip to content

Commit

Permalink
[KYUUBI #4746] Do not recreate async request executor if has been shu…
Browse files Browse the repository at this point in the history
…tdown

### _Why are the changes needed?_

After #4480, there should be only one asyncRequestExecutor in one KyuubiSyncThriftClient

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4746 from turboFei/engine_alive.

Closes #4746

9d2fa5f [fwang12] fix typo
d90263b [fwang12] check
38aefdf [fwang12] close protocol first
6eb6178 [fwang12] do not renew executor
64e66a8 [fwang12] close protocol first

Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
  • Loading branch information
turboFei committed Apr 21, 2023
1 parent ccacb33 commit 8d424ef
Showing 1 changed file with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,16 @@ class KyuubiSyncThriftClient private (
private var engineAliveThreadPool: ScheduledExecutorService = _
@volatile private var engineLastAlive: Long = _

private var asyncRequestExecutor: ExecutorService = _
private lazy val asyncRequestExecutor: ExecutorService =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"async-request-executor-" + SessionHandle(_remoteSessionHandle))

@VisibleForTesting
@volatile private[kyuubi] var asyncRequestInterrupted: Boolean = false

@VisibleForTesting
private[kyuubi] def getEngineAliveProbeProtocol: Option[TProtocol] = engineAliveProbeProtocol

private def newAsyncRequestExecutor(): ExecutorService = {
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
"async-request-executor-" + _remoteSessionHandle)
}

private def shutdownAsyncRequestExecutor(): Unit = {
Option(asyncRequestExecutor).filterNot(_.isShutdown).foreach(ThreadUtils.shutdown(_))
asyncRequestInterrupted = true
Expand Down Expand Up @@ -109,18 +106,18 @@ class KyuubiSyncThriftClient private (
}
}
} else {
shutdownAsyncRequestExecutor()
warn(s"Removing Clients for ${_remoteSessionHandle}")
Seq(protocol).union(engineAliveProbeProtocol.toSeq).foreach { tProtocol =>
Utils.tryLogNonFatalError {
if (tProtocol.getTransport.isOpen) {
tProtocol.getTransport.close()
}
}
clientClosedOnEngineBroken = true
Option(engineAliveThreadPool).foreach { pool =>
ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
}
}
clientClosedOnEngineBroken = true
shutdownAsyncRequestExecutor()
Option(engineAliveThreadPool).foreach { pool =>
ThreadUtils.shutdown(pool, Duration(engineAliveProbeInterval, TimeUnit.MILLISECONDS))
}
}
}
Expand All @@ -144,8 +141,8 @@ class KyuubiSyncThriftClient private (
}

private def withLockAcquiredAsyncRequest[T](block: => T): T = withLockAcquired {
if (asyncRequestExecutor == null || asyncRequestExecutor.isShutdown) {
asyncRequestExecutor = newAsyncRequestExecutor()
if (asyncRequestExecutor.isShutdown) {
throw KyuubiSQLException.connectionDoesNotExist()
}

val task = asyncRequestExecutor.submit(() => {
Expand Down

0 comments on commit 8d424ef

Please sign in to comment.