Skip to content

Commit

Permalink
[KYUUBI #5243] Distinguish metadata between batch impl v2 and recovery
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_

The `recoveryMetadata` is not accurate after batch impl is introduced. This PR proposes to rename `recoveryMetadata` to `metadata` and introduce a dedicated flay `fromRecovery` to distinguish metadata between them.

This PR also partially reverts #4798, by removing unnecessary constructor parameters `shouldRunAsync` and `batchConf`

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No.

Closes #5243 from pan3793/meta-recov.

Closes #5243

0718fbe [Cheng Pan] nit
b835846 [Cheng Pan] simplify
a2d6519 [Cheng Pan] fix test
2dad868 [Cheng Pan] refactor
f83d2a6 [Cheng Pan] Distinguish batch impl v2 metadata from recovery

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
  • Loading branch information
pan3793 committed Sep 5, 2023
1 parent c3b7af0 commit 6a23f88
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.kyuubi.kubernetes.test.spark

import java.util.UUID

import scala.collection.JavaConverters._
import scala.concurrent.duration._

import org.apache.hadoop.conf.Configuration
Expand Down Expand Up @@ -149,7 +148,6 @@ class KyuubiOperationKubernetesClusterClientModeSuite
"kyuubi",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)

eventually(timeout(3.minutes), interval(50.milliseconds)) {
Expand Down Expand Up @@ -217,7 +215,6 @@ class KyuubiOperationKubernetesClusterClusterModeSuite
"runner",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)

// wait for driver pod start
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,12 @@ class BatchJobSubmission(
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata],
override val shouldRunAsync: Boolean)
metadata: Option[Metadata])
extends KyuubiApplicationOperation(session) {
import BatchJobSubmission._

override def shouldRunAsync: Boolean = true

private val _operationLog = OperationLog.createOperationLog(session, getHandle)

private val applicationManager = session.sessionManager.applicationManager
Expand All @@ -75,7 +76,7 @@ class BatchJobSubmission(
private var killMessage: KillResponse = (false, "UNKNOWN")
def getKillMessage: KillResponse = killMessage

@volatile private var _appStartTime = recoveryMetadata.map(_.engineOpenTime).getOrElse(0L)
@volatile private var _appStartTime = metadata.map(_.engineOpenTime).getOrElse(0L)
def appStartTime: Long = _appStartTime
def appStarted: Boolean = _appStartTime > 0

Expand Down Expand Up @@ -184,21 +185,24 @@ class BatchJobSubmission(
override protected def runInternal(): Unit = session.handleSessionException {
val asyncOperation: Runnable = () => {
try {
recoveryMetadata match {
metadata match {
case Some(metadata) if metadata.peerInstanceClosed =>
setState(OperationState.CANCELED)
case Some(metadata) if metadata.state == OperationState.PENDING.toString =>
// In recovery mode, only submit batch job when previous state is PENDING
// and fail to fetch the status including appId from resource manager.
// Otherwise, monitor the submitted batch application.
// case 1: new batch job created using batch impl v2
// case 2: batch job from recovery, do submission only when previous state is
// PENDING and fail to fetch the status by appId from resource manager, which
// is similar with case 1; otherwise, monitor the submitted batch application.
_applicationInfo = currentApplicationInfo()
applicationId(_applicationInfo) match {
case Some(appId) => monitorBatchJob(appId)
case None => submitAndMonitorBatchJob()
case Some(appId) => monitorBatchJob(appId)
}
case Some(metadata) =>
// batch job from recovery which was submitted
monitorBatchJob(metadata.engineId)
case None =>
// brand-new job created using batch impl v1
submitAndMonitorBatchJob()
}
setStateIfNotCanceled(OperationState.FINISHED)
Expand All @@ -219,7 +223,6 @@ class BatchJobSubmission(
updateBatchMetadata()
}
}
if (!shouldRunAsync) getBackgroundHandle.get()
}

private def submitAndMonitorBatchJob(): Unit = {
Expand Down Expand Up @@ -295,19 +298,19 @@ class BatchJobSubmission(
}
if (_applicationInfo.isEmpty) {
info(s"The $batchType batch[$batchId] job: $appId not found, assume that it has finished.")
} else if (applicationFailed(_applicationInfo)) {
return
}
if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
updateApplicationInfoMetadataIfNeeded()
}
if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
} else {
updateBatchMetadata()
// TODO: add limit for max batch job submission lifetime
while (_applicationInfo.isDefined && !applicationTerminated(_applicationInfo)) {
Thread.sleep(applicationCheckInterval)
updateApplicationInfoMetadataIfNeeded()
}

if (applicationFailed(_applicationInfo)) {
throw new KyuubiException(s"$batchType batch[$batchId] job failed: ${_applicationInfo}")
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata],
shouldRunAsync: Boolean): BatchJobSubmission = {
metadata: Option[Metadata]): BatchJobSubmission = {
val operation = new BatchJobSubmission(
session,
batchType,
Expand All @@ -91,8 +90,7 @@ class KyuubiOperationManager private (name: String) extends OperationManager(nam
className,
batchConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
metadata)
addOperation(operation)
operation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.server.metadata.api.Metadata
import org.apache.kyuubi.service.{AbstractService, Serverable}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.ThreadUtils
Expand Down Expand Up @@ -81,16 +80,9 @@ class KyuubiBatchService(
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestConf,
metadata.requestArgs,
Some(metadata), // TODO some logic need to fix since it's not from recovery
shouldRunAsync = true)
val metadataForUpdate = Metadata(
identifier = batchId,
kyuubiInstance = kyuubiInstance,
requestConf = batchSession.optimizedConf,
clusterManager = batchSession.batchJobSubmissionOp.builder.clusterManager())
metadataManager.updateMetadata(metadataForUpdate, asyncRetryOnError = false)
Some(metadata),
fromRecovery = false)
val sessionHandle = sessionManager.openBatchSession(batchSession)
var submitted = false
while (!submitted) { // block until batch job submitted
Expand All @@ -113,7 +105,7 @@ class KyuubiBatchService(
// }
if (!submitted) Thread.sleep(1000)
}
info(s"$batchId is submitted.")
info(s"$batchId is submitted or finished.")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ private[v1] class BatchesResource extends ApiRequestContext with Logging {
userName,
"anonymous",
ipAddress,
request.getConf.asScala.toMap,
request)
} match {
case Success(sessionHandle) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ class KyuubiBatchSession(
batchName: Option[String],
resource: String,
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
shouldRunAsync: Boolean)
metadata: Option[Metadata] = None,
fromRecovery: Boolean)
extends KyuubiSession(
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
user,
Expand All @@ -55,11 +54,11 @@ class KyuubiBatchSession(
override val sessionType: SessionType = SessionType.BATCH

override val handle: SessionHandle = {
val batchId = recoveryMetadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
val batchId = metadata.map(_.identifier).getOrElse(conf(KYUUBI_BATCH_ID_KEY))
SessionHandle.fromUUID(batchId)
}

override def createTime: Long = recoveryMetadata.map(_.createTime).getOrElse(super.createTime)
override def createTime: Long = metadata.map(_.createTime).getOrElse(super.createTime)

override def getNoOperationTime: Long = {
if (batchJobSubmissionOp != null && !OperationState.isTerminal(
Expand All @@ -74,7 +73,7 @@ class KyuubiBatchSession(
sessionManager.getConf.get(KyuubiConf.BATCH_SESSION_IDLE_TIMEOUT)

override val normalizedConf: Map[String, String] =
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(batchConf)
sessionConf.getBatchConf(batchType) ++ sessionManager.validateBatchConf(conf)

val optimizedConf: Map[String, String] = {
val confOverlay = sessionManager.sessionConfAdvisor.getConfOverlay(
Expand All @@ -95,7 +94,7 @@ class KyuubiBatchSession(

// whether the resource file is from uploading
private[kyuubi] val isResourceUploaded: Boolean =
batchConf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean
conf.getOrElse(KyuubiReservedKeys.KYUUBI_BATCH_RESOURCE_UPLOADED_KEY, "false").toBoolean

private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
.newBatchJobSubmissionOperation(
Expand All @@ -106,8 +105,7 @@ class KyuubiBatchSession(
className,
optimizedConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
metadata)

private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
Expand All @@ -122,7 +120,9 @@ class KyuubiBatchSession(
}

private val sessionEvent = KyuubiSessionEvent(this)
recoveryMetadata.foreach(metadata => sessionEvent.engineId = metadata.engineId)
if (fromRecovery) {
metadata.foreach { m => sessionEvent.engineId = m.engineId }
}
EventBus.post(sessionEvent)

override def getSessionEvent: Option[KyuubiSessionEvent] = {
Expand All @@ -142,32 +142,47 @@ class KyuubiBatchSession(
override def open(): Unit = handleSessionException {
traceMetricsOnOpen()

if (recoveryMetadata.isEmpty) {
lazy val kubernetesInfo: Map[String, String] = {
val appMgrInfo = batchJobSubmissionOp.builder.appMgrInfo()
val kubernetesInfo = appMgrInfo.kubernetesInfo.context.map { context =>
appMgrInfo.kubernetesInfo.context.map { context =>
Map(KyuubiConf.KUBERNETES_CONTEXT.key -> context)
}.getOrElse(Map.empty) ++ appMgrInfo.kubernetesInfo.namespace.map { namespace =>
Map(KyuubiConf.KUBERNETES_NAMESPACE.key -> namespace)
}.getOrElse(Map.empty)
val metaData = Metadata(
identifier = handle.identifier.toString,
sessionType = sessionType,
realUser = realUser,
username = user,
ipAddress = ipAddress,
kyuubiInstance = connectionUrl,
state = OperationState.PENDING.toString,
resource = resource,
className = className,
requestName = name.orNull,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info into request conf
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(metaData)
}

(metadata, fromRecovery) match {
case (Some(initialMetadata), false) =>
// new batch job created using batch impl v2
val metadataToUpdate = Metadata(
identifier = initialMetadata.identifier,
kyuubiInstance = connectionUrl,
requestName = name.orNull,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info
clusterManager = batchJobSubmissionOp.builder.clusterManager())
sessionManager.updateMetadata(metadataToUpdate)
case (None, _) =>
// new batch job created using batch impl v1
val newMetadata = Metadata(
identifier = handle.identifier.toString,
sessionType = sessionType,
realUser = realUser,
username = user,
ipAddress = ipAddress,
kyuubiInstance = connectionUrl,
state = OperationState.PENDING.toString,
resource = resource,
className = className,
requestName = name.orNull,
requestConf = optimizedConf ++ kubernetesInfo, // save the kubernetes info
requestArgs = batchArgs,
createTime = createTime,
engineType = batchType,
clusterManager = batchJobSubmissionOp.builder.clusterManager())

// there is a chance that operation failed w/ duplicated key error
sessionManager.insertMetadata(newMetadata)
case _ =>
}

checkSessionAccessPathURIs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
batchName: Option[String],
resource: String,
className: String,
batchConf: Map[String, String],
batchArgs: Seq[String],
recoveryMetadata: Option[Metadata] = None,
shouldRunAsync: Boolean): KyuubiBatchSession = {
metadata: Option[Metadata] = None,
fromRecovery: Boolean): KyuubiBatchSession = {
// scalastyle:on
val username = Option(user).filter(_.nonEmpty).getOrElse("anonymous")
val sessionConf = this.getConf.getUserDefaults(user)
Expand All @@ -162,10 +161,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
batchName,
resource,
className,
batchConf,
batchArgs,
recoveryMetadata,
shouldRunAsync)
metadata,
fromRecovery)
}

private[kyuubi] def openBatchSession(batchSession: KyuubiBatchSession): SessionHandle = {
Expand Down Expand Up @@ -202,22 +200,19 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
user: String,
password: String,
ipAddress: String,
conf: Map[String, String],
batchRequest: BatchRequest,
shouldRunAsync: Boolean = true): SessionHandle = {
batchRequest: BatchRequest): SessionHandle = {
val batchSession = createBatchSession(
user,
password,
ipAddress,
conf,
batchRequest.getConf.asScala.toMap,
batchRequest.getBatchType,
Option(batchRequest.getName),
batchRequest.getResource,
batchRequest.getClassName,
batchRequest.getConf.asScala.toMap,
batchRequest.getArgs.asScala.toSeq,
None,
shouldRunAsync)
fromRecovery = false)
openBatchSession(batchSession)
}

Expand Down Expand Up @@ -313,10 +308,9 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
Option(metadata.requestName),
metadata.resource,
metadata.className,
metadata.requestConf,
metadata.requestArgs,
Some(metadata),
shouldRunAsync = true)
fromRecovery = true)
}).getOrElse(Seq.empty)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
"kyuubi",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)

val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
Expand Down Expand Up @@ -180,7 +179,6 @@ class KyuubiOperationYarnClusterSuite extends WithKyuubiServerOnYarn with HiveJD
"kyuubi",
"passwd",
"localhost",
batchRequest.getConf.asScala.toMap,
batchRequest)

val session = sessionManager.getSession(sessionHandle).asInstanceOf[KyuubiBatchSession]
Expand Down
Loading

0 comments on commit 6a23f88

Please sign in to comment.