Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #5952] Disconnect connections without running operations after engine maxlife time graceful period #6040

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 52 additions & 51 deletions docs/configuration/settings.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
info(s"Spark engine is de-registering from engine discovery space.")
frontendServices.flatMap(_.discoveryService).foreach(_.stop())
while (backendService.sessionManager.getOpenSessionCount > 0) {
Thread.sleep(1000 * 60)
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
info(s"Spark engine has no open session now, terminating.")
stop()
Expand Down Expand Up @@ -170,8 +170,10 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
val maxLifetime = conf.get(ENGINE_SPARK_MAX_LIFETIME)
val deregistered = new AtomicBoolean(false)
if (maxLifetime > 0) {
val gracefulPeriod = conf.get(ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD)
val checkTask: Runnable = () => {
if (!shutdown.get && System.currentTimeMillis() - getStartTime > maxLifetime) {
val elapsedTime = System.currentTimeMillis() - getStartTime
if (!shutdown.get && elapsedTime > maxLifetime) {
if (deregistered.compareAndSet(false, true)) {
info(s"Spark engine has been running for more than $maxLifetime ms," +
s" deregistering from engine discovery space.")
Expand All @@ -182,6 +184,24 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin
info(s"Spark engine has been running for more than $maxLifetime ms" +
s" and no open session now, terminating.")
stop()
} else if (gracefulPeriod > 0 && elapsedTime > maxLifetime + gracefulPeriod) {
backendService.sessionManager.allSessions().foreach { session =>
val operationCount =
backendService.sessionManager.operationManager.allOperations()
.filter(_.getSession == session)
.size
if (operationCount == 0) {
warn(s"Closing session ${session.handle.identifier} forcibly that has no" +
s" operation and has been running for more than $gracefulPeriod ms after engine" +
s" max lifetime.")
try {
backendService.sessionManager.closeSession(session.handle)
} catch {
case e: Throwable =>
error(s"Error closing session ${session.handle.identifier}", e)
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,13 @@

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel

trait EtcdShareLevelSparkEngineSuite
extends ShareLevelSparkEngineTests with WithEtcdCluster {
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++
etcdConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
super.withKyuubiConf ++ etcdConf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.UUID
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime

import org.apache.kyuubi.config.KyuubiConf.{ENGINE_CHECK_INTERVAL, ENGINE_SHARE_LEVEL, ENGINE_SPARK_MAX_INITIAL_WAIT, ENGINE_SPARK_MAX_LIFETIME, ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD}
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel
import org.apache.kyuubi.operation.HiveJDBCTestHelper
Expand All @@ -35,6 +36,13 @@ trait ShareLevelSparkEngineTests
extends WithDiscoverySparkSQLEngine with HiveJDBCTestHelper {
def shareLevel: ShareLevel

// Layers the engine-lifetime settings shared by every share-level suite on top of
// the inherited configuration: the share level under test, a 5s max lifetime, no
// initial wait, a 2s check interval and a graceful period of "100".
override def withKyuubiConf: Map[String, String] = {
  val inherited = super.withKyuubiConf
  val lifetimeSettings: Map[String, String] = Map(
    ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
    ENGINE_SPARK_MAX_LIFETIME.key -> "PT5s",
    ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
    ENGINE_CHECK_INTERVAL.key -> "PT2s",
    ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD.key -> "100")
  // Entries on the right-hand side win over inherited ones with the same key.
  inherited ++ lifetimeSettings
}

override protected def jdbcUrl: String = getJdbcUrl
override val namespace: String = {
// for test, we always use uuid as namespace
Expand Down Expand Up @@ -76,4 +84,23 @@ trait ShareLevelSparkEngineTests
}
}
}

// Verifies that an engine past its max lifetime still shuts down while a JDBC
// connection stays open, as long as that connection runs no operation.
test("test spark engine max life-time with graceful period") {
  withDiscoveryClient { client =>
    // Precondition: the engine is up and registered under the test namespace.
    assert(engine.getServiceState == ServiceState.STARTED)
    assert(client.pathExists(namespace))
    // The statement is deliberately unused: the connection is held open idle.
    withJdbcStatement() { _ =>
      eventually(Timeout(30.seconds)) {
        // Every share level expects the engine to have stopped by now.
        assert(engine.getServiceState == ServiceState.STOPPED)
        // Only the CONNECTION share level expects the namespace path to be gone.
        if (shareLevel == ShareLevel.CONNECTION) {
          assert(client.pathNonExists(namespace))
        } else {
          assert(client.pathExists(namespace))
        }
      }
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,13 @@

package org.apache.kyuubi.engine.spark

import org.apache.kyuubi.config.KyuubiConf.ENGINE_CHECK_INTERVAL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SHARE_LEVEL
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_INITIAL_WAIT
import org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAX_LIFETIME
import org.apache.kyuubi.engine.ShareLevel
import org.apache.kyuubi.engine.ShareLevel.ShareLevel

trait ZookeeperShareLevelSparkEngineSuite
extends ShareLevelSparkEngineTests with WithEmbeddedZookeeper {
override def withKyuubiConf: Map[String, String] = {
super.withKyuubiConf ++
zookeeperConf ++ Map(
ENGINE_SHARE_LEVEL.key -> shareLevel.toString,
ENGINE_SPARK_MAX_LIFETIME.key -> "PT20s",
ENGINE_SPARK_MAX_INITIAL_WAIT.key -> "0",
ENGINE_CHECK_INTERVAL.key -> "PT5s")
super.withKyuubiConf ++ zookeeperConf
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1330,6 +1330,16 @@ object KyuubiConf {
.timeConf
.createWithDefault(0)

/**
 * Time-valued setting for how long a Spark engine that has passed its max lifetime
 * keeps waiting for clients to disconnect on their own. Per the doc text below, once
 * the period has elapsed, connections without running operations are forcibly
 * disconnected; the default of 0 (or any negative value) means waiting indefinitely.
 */
val ENGINE_SPARK_MAX_LIFETIME_GRACEFUL_PERIOD: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.lifetime.gracefulPeriod")
.doc("Graceful period for Spark engine to wait the connections disconnected after reaching" +
" the end of life. After the graceful period, all the connections without running" +
" operations will be forcibly disconnected. 0 or negative means always waiting the" +
" connections disconnected.")
.version("1.8.1")
.timeConf
.createWithDefault(0)

val ENGINE_SPARK_MAX_INITIAL_WAIT: ConfigEntry[Long] =
buildConf("kyuubi.session.engine.spark.max.initial.wait")
.doc("Max wait time for the initial connection to Spark engine. The engine will" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.kyuubi.ha.client

import java.util.concurrent.CountDownLatch
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.kyuubi.Logging
Expand Down Expand Up @@ -68,7 +68,7 @@ abstract class ServiceDiscovery(
def stopGracefully(isLost: Boolean = false): Unit = {
while (fe.be.sessionManager.getOpenSessionCount > 0) {
info(s"${fe.be.sessionManager.getOpenSessionCount} connection(s) are active, delay shutdown")
Thread.sleep(1000 * 60)
Thread.sleep(TimeUnit.SECONDS.toMillis(10))
}
isServerLost.set(isLost)
gracefulShutdownLatch.countDown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import org.apache.kyuubi.ha.client.DiscoveryPaths
import org.apache.kyuubi.ha.client.ServiceDiscovery
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.ha.client.etcd.EtcdDiscoveryClient._
import org.apache.kyuubi.util.ThreadUtils

class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {

Expand Down Expand Up @@ -381,7 +382,12 @@ class EtcdDiscoveryClient(conf: KyuubiConf) extends DiscoveryClient {
.filter(_.getEventType == WatchEvent.EventType.DELETE).foreach(_ => {
warn(s"This Kyuubi instance ${instance} is now de-registered from" +
s" ETCD. The server will be shut down after the last client session completes.")
serviceDiscovery.stopGracefully()
// for jetcd, the watcher event process might block the main thread,
// so start a new thread to do the de-register work as a workaround,
// see details in https://github.com/etcd-io/jetcd/issues/1089
ThreadUtils.runInNewThread("deregister-watcher-thread", isDaemon = false) {
serviceDiscovery.stopGracefully()
}
})
}

Expand Down
Loading