Skip to content

Commit

Permalink
Exited spark-submit process should not block batch submit queue
Browse files Browse the repository at this point in the history
  • Loading branch information
pan3793 committed Jan 30, 2024
1 parent 3f993f4 commit 148ebf8
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import java.nio.file.{Files, Path, Paths}

import scala.collection.JavaConverters._

import com.google.common.annotations.VisibleForTesting
import com.google.common.collect.EvictingQueue
import org.apache.commons.lang3.StringUtils.containsIgnoreCase

Expand Down Expand Up @@ -166,8 +165,7 @@ trait ProcBuilder {
// Visible for test
@volatile private[kyuubi] var logCaptureThreadReleased: Boolean = true
private var logCaptureThread: Thread = _
private var process: Process = _
@VisibleForTesting
@volatile private[kyuubi] var process: Process = _
@volatile private[kyuubi] var processLaunched: Boolean = _

private[kyuubi] lazy val engineLog: File = ProcBuilder.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class BatchJobSubmission(
getOperationLog)
}

def startupProcessAlive: Boolean =
builder.processLaunched && Option(builder.process).exists(_.isAlive)

override def currentApplicationInfo(): Option[ApplicationInfo] = {
if (isTerminal(state) && _applicationInfo.map(_.state).exists(ApplicationState.isTerminated)) {
return _applicationInfo
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ class KyuubiBatchService(
metadata.appState match {
// app that is not submitted to resource manager
case None | Some(ApplicationState.NOT_FOUND) => false
// app that is pending in resource manager
case Some(ApplicationState.PENDING) => false
// app that is pending in resource manager while the local startup
// process is alive. For example, in Spark YARN cluster mode, if set
// spark.yarn.submit.waitAppCompletion=false, the local spark-submit
// process exits immediately once Application goes ACCEPTED status,
// even no resource could be allocated for the AM container.
case Some(ApplicationState.PENDING) if batchSession.startupProcessAlive =>
false
// not sure, added for safe
case Some(ApplicationState.UNKNOWN) => false
case _ => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ class KyuubiBatchSession(
batchArgs,
metadata)

def startupProcessAlive: Boolean = batchJobSubmissionOp.startupProcessAlive

private def waitMetadataRequestsRetryCompletion(): Unit = {
val batchId = batchJobSubmissionOp.batchId
sessionManager.getMetadataRequestsRetryRef(batchId).foreach {
Expand Down

0 comments on commit 148ebf8

Please sign in to comment.