diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md index b14a8ada1f8..a1ff4fdfcd3 100644 --- a/docs/configuration/settings.md +++ b/docs/configuration/settings.md @@ -310,23 +310,25 @@ You can configure the Kyuubi properties in `$KYUUBI_HOME/conf/kyuubi-defaults.co ### Kubernetes -| Key | Default | Meaning | Type | Since | -|----------------------------------------------------------------------|-------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| -| kyuubi.kubernetes.authenticate.caCertFile | <undefined> | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | -| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> | Path to the client cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | -| kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | -| kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 | -| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. 
Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | -| kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 | -| kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. | set | 1.8.0 | -| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | -| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | -| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | -| kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled | false | If set to true then Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. | boolean | 1.8.1 | -| kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi--driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | -| kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. 
| boolean | 1.8.1 | -| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 | -| kyuubi.kubernetes.trust.certificates | false | If set to true then client can submit to kubernetes cluster only with token | boolean | 1.7.0 | +| Key | Default | Meaning | Type | Since | +|----------------------------------------------------------------------|-------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|-------| +| kyuubi.kubernetes.application.state.container | spark-kubernetes-driver | The container name to retrieve the application state from. | string | 1.8.1 | +| kyuubi.kubernetes.application.state.source | POD | The source to retrieve the application state from. The valid values are pod and container. If the source is container and there is container inside the pod with the name of kyuubi.kubernetes.application.state.container, the application state will be from the matched container state. Otherwise, the application state will be from the pod state. | string | 1.8.1 | +| kyuubi.kubernetes.authenticate.caCertFile | <undefined> | Path to the CA cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | +| kyuubi.kubernetes.authenticate.clientCertFile | <undefined> | Path to the client cert file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. 
do not provide a scheme) | string | 1.7.0 | +| kyuubi.kubernetes.authenticate.clientKeyFile | <undefined> | Path to the client key file for connecting to the Kubernetes API server over TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | +| kyuubi.kubernetes.authenticate.oauthToken | <undefined> | The OAuth token to use when authenticating against the Kubernetes API server. Note that unlike, the other authentication options, this must be the exact string value of the token to use for the authentication. | string | 1.7.0 | +| kyuubi.kubernetes.authenticate.oauthTokenFile | <undefined> | Path to the file containing the OAuth token to use when authenticating against the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do not provide a scheme) | string | 1.7.0 | +| kyuubi.kubernetes.context | <undefined> | The desired context from your kubernetes config file used to configure the K8s client for interacting with the cluster. | string | 1.6.0 | +| kyuubi.kubernetes.context.allow.list || The allowed kubernetes context list, if it is empty, there is no kubernetes context limitation. | set | 1.8.0 | +| kyuubi.kubernetes.master.address | <undefined> | The internal Kubernetes master (API server) address to be used for kyuubi. | string | 1.7.0 | +| kyuubi.kubernetes.namespace | default | The namespace that will be used for running the kyuubi pods and find engines. | string | 1.7.0 | +| kyuubi.kubernetes.namespace.allow.list || The allowed kubernetes namespace list, if it is empty, there is no kubernetes namespace limitation. | set | 1.8.0 | +| kyuubi.kubernetes.spark.deleteDriverPodOnTermination.enabled | false | If set to true then Kyuubi server will delete the spark driver pod after the application terminates for kyuubi.kubernetes.terminatedApplicationRetainPeriod. 
| boolean | 1.8.1 | +| kyuubi.kubernetes.spark.forciblyRewriteDriverPodName.enabled | false | Whether to forcibly rewrite Spark driver pod name with 'kyuubi--driver'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | +| kyuubi.kubernetes.spark.forciblyRewriteExecutorPodNamePrefix.enabled | false | Whether to forcibly rewrite Spark executor pod name prefix with 'kyuubi-'. If disabled, Kyuubi will try to preserve the application name while satisfying K8s' pod name policy, but some vendors may have stricter Pod name policies, thus the generated name may become illegal. | boolean | 1.8.1 | +| kyuubi.kubernetes.terminatedApplicationRetainPeriod | PT5M | The period for which the Kyuubi server retains application information after the application terminates. | duration | 1.7.1 | +| kyuubi.kubernetes.trust.certificates | false | If set to true then client can submit to kubernetes cluster only with token | boolean | 1.7.0 | ### Lineage diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala index 373c0190b51..e412bb96579 100644 --- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala +++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala @@ -1239,6 +1239,30 @@ object KyuubiConf { .booleanConf .createWithDefault(false) + val KUBERNETES_APPLICATION_STATE_CONTAINER: ConfigEntry[String] = + buildConf("kyuubi.kubernetes.application.state.container") + .doc("The container name to retrieve the application state from.") + .version("1.8.1") + .stringConf + .createWithDefault("spark-kubernetes-driver") + + val KUBERNETES_APPLICATION_STATE_SOURCE: ConfigEntry[String] = + buildConf("kyuubi.kubernetes.application.state.source") + .doc("The source to retrieve the application state 
from. The valid values are " + + "pod and container. If the source is container and there is container inside the pod " + + s"with the name of ${KUBERNETES_APPLICATION_STATE_CONTAINER.key}, the application state " + + s"will be from the matched container state. " + + s"Otherwise, the application state will be from the pod state.") + .version("1.8.1") + .stringConf + .checkValues(KubernetesApplicationStateSource) + .createWithDefault(KubernetesApplicationStateSource.POD.toString) + + object KubernetesApplicationStateSource extends Enumeration { + type KubernetesApplicationStateSource = Value + val POD, CONTAINER = Value + } + // /////////////////////////////////////////////////////////////////////////////////////////////// // SQL Engine Configuration // // /////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala index 731b9d7b5ba..64569f7d82a 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationAuditLogger.scala @@ -20,14 +20,19 @@ package org.apache.kyuubi.engine import io.fabric8.kubernetes.api.model.Pod import org.apache.kyuubi.Logging -import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL} +import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource +import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationStateAndError, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL} object KubernetesApplicationAuditLogger extends Logging { final private val AUDIT_BUFFER = new ThreadLocal[StringBuilder]() { override protected def initialValue: StringBuilder = new 
StringBuilder() } - def audit(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = { + def audit( + kubernetesInfo: KubernetesInfo, + pod: Pod, + appStateSource: KubernetesApplicationStateSource, + appStateContainer: String): Unit = { val sb = AUDIT_BUFFER.get() sb.setLength(0) sb.append(s"label=${pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY)}").append("\t") @@ -35,7 +40,10 @@ object KubernetesApplicationAuditLogger extends Logging { sb.append(s"namespace=${kubernetesInfo.namespace.orNull}").append("\t") sb.append(s"pod=${pod.getMetadata.getName}").append("\t") sb.append(s"appId=${pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL)}").append("\t") - sb.append(s"appState=${toApplicationState(pod.getStatus.getPhase)}") + val (appState, appError) = + toApplicationStateAndError(pod, appStateSource, appStateContainer) + sb.append(s"appState=$appState").append("\t") + sb.append(s"appError='${appError.getOrElse("")}'") info(sb.toString()) } } diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala index dcdadf983fa..95f68d4b639 100644 --- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala +++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala @@ -24,17 +24,19 @@ import scala.collection.JavaConverters._ import scala.util.control.NonFatal import com.google.common.cache.{Cache, CacheBuilder, RemovalNotification} -import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.api.model.{ContainerState, Pod} import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.informers.{ResourceEventHandler, SharedIndexInformer} import org.apache.kyuubi.{KyuubiException, Logging, Utils} import org.apache.kyuubi.config.KyuubiConf +import org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource +import 
org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.KubernetesApplicationStateSource import org.apache.kyuubi.engine.ApplicationState.{isTerminated, ApplicationState, FAILED, FINISHED, NOT_FOUND, PENDING, RUNNING, UNKNOWN} -import org.apache.kyuubi.engine.KubernetesApplicationOperation.{toApplicationState, toLabel, LABEL_KYUUBI_UNIQUE_KEY, SPARK_APP_ID_LABEL} import org.apache.kyuubi.util.KubernetesUtils class KubernetesApplicationOperation extends ApplicationOperation with Logging { + import KubernetesApplicationOperation._ private val kubernetesClients: ConcurrentHashMap[KubernetesInfo, KubernetesClient] = new ConcurrentHashMap[KubernetesInfo, KubernetesClient] @@ -49,6 +51,12 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { private def allowedNamespaces: Set[String] = kyuubiConf.get(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST) + private def appStateSource: KubernetesApplicationStateSource = + KubernetesApplicationStateSource.withName( + kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_SOURCE)) + private def appStateContainer: String = + kyuubiConf.get(KyuubiConf.KUBERNETES_APPLICATION_STATE_CONTAINER) + // key is kyuubi_unique_key private val appInfoStore: ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] = new ConcurrentHashMap[String, (KubernetesInfo, ApplicationInfo)] @@ -244,18 +252,26 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { override def onAdd(pod: Pod): Unit = { if (isSparkEnginePod(pod)) { updateApplicationState(kubernetesInfo, pod) - KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod) + KubernetesApplicationAuditLogger.audit( + kubernetesInfo, + pod, + appStateSource, + appStateContainer) } } override def onUpdate(oldPod: Pod, newPod: Pod): Unit = { if (isSparkEnginePod(newPod)) { updateApplicationState(kubernetesInfo, newPod) - val appState = toApplicationState(newPod.getStatus.getPhase) + val appState = toApplicationState(newPod, 
appStateSource, appStateContainer) if (isTerminated(appState)) { markApplicationTerminated(newPod) } - KubernetesApplicationAuditLogger.audit(kubernetesInfo, newPod) + KubernetesApplicationAuditLogger.audit( + kubernetesInfo, + newPod, + appStateSource, + appStateContainer) } } @@ -263,7 +279,11 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { if (isSparkEnginePod(pod)) { updateApplicationState(kubernetesInfo, pod) markApplicationTerminated(pod) - KubernetesApplicationAuditLogger.audit(kubernetesInfo, pod) + KubernetesApplicationAuditLogger.audit( + kubernetesInfo, + pod, + appStateSource, + appStateContainer) } } } @@ -274,7 +294,8 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { } private def updateApplicationState(kubernetesInfo: KubernetesInfo, pod: Pod): Unit = { - val appState = toApplicationState(pod.getStatus.getPhase) + val (appState, appError) = + toApplicationStateAndError(pod, appStateSource, appStateContainer) debug(s"Driver Informer changes pod: ${pod.getMetadata.getName} to state: $appState") appInfoStore.put( pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY), @@ -282,13 +303,15 @@ class KubernetesApplicationOperation extends ApplicationOperation with Logging { id = pod.getMetadata.getLabels.get(SPARK_APP_ID_LABEL), name = pod.getMetadata.getName, state = appState, - error = Option(pod.getStatus.getReason))) + error = appError)) } private def markApplicationTerminated(pod: Pod): Unit = synchronized { val key = pod.getMetadata.getLabels.get(LABEL_KYUUBI_UNIQUE_KEY) if (cleanupTerminatedAppInfoTrigger.getIfPresent(key) == null) { - cleanupTerminatedAppInfoTrigger.put(key, toApplicationState(pod.getStatus.getPhase)) + cleanupTerminatedAppInfoTrigger.put( + key, + toApplicationState(pod, appStateSource, appStateContainer)) } } } @@ -301,16 +324,60 @@ object KubernetesApplicationOperation extends Logging { def toLabel(tag: String): String = s"label: $LABEL_KYUUBI_UNIQUE_KEY=$tag" - 
/**
 * Resolves only the application state of a Spark driver pod.
 *
 * Thin wrapper over [[toApplicationStateAndError]] that drops the error part.
 *
 * @param pod the driver pod to inspect
 * @param appStateSource whether the state is derived from the pod phase or from a container
 * @param appStateContainer name of the container to inspect when the source is CONTAINER
 * @return the resolved application state
 */
def toApplicationState(
    pod: Pod,
    appStateSource: KubernetesApplicationStateSource,
    appStateContainer: String): ApplicationState = {
  toApplicationStateAndError(pod, appStateSource, appStateContainer)._1
}

/**
 * Resolves the application state together with an optional error reason.
 *
 * When the source is CONTAINER and the pod reports a container status whose name equals
 * `appStateContainer` (and that status carries a state), both the application state and the
 * error are derived from that container state. In every other case (source is POD, no
 * container with that name, or the container has no state yet) they are derived from the
 * pod phase and the pod status reason.
 *
 * @param pod the driver pod to inspect
 * @param appStateSource whether the state is derived from the pod phase or from a container
 * @param appStateContainer name of the container to inspect when the source is CONTAINER
 * @return a pair of (application state, optional error reason)
 */
def toApplicationStateAndError(
    pod: Pod,
    appStateSource: KubernetesApplicationStateSource,
    appStateContainer: String): (ApplicationState, Option[String]) = {
  val podStatus = pod.getStatus
  val matchedContainerState: Option[ContainerState] = appStateSource match {
    case KubernetesApplicationStateSource.CONTAINER =>
      // Match on the container *name*. Comparing the ContainerState object itself with the
      // configured name can never be true, which would silently disable the CONTAINER source.
      Option(podStatus.getContainerStatuses)
        .flatMap(_.asScala.find(_.getName == appStateContainer))
        // A status without a state is treated like "container not found" so that we fall
        // back to the pod phase instead of passing null downstream.
        .flatMap(status => Option(status.getState))
    case KubernetesApplicationStateSource.POD => None
  }
  val applicationState = matchedContainerState
    .map(containerStateToApplicationState)
    .getOrElse(podStateToApplicationState(podStatus.getPhase))
  val applicationError = matchedContainerState
    .map(containerStateToApplicationError)
    .getOrElse(Option(podStatus.getReason))
  applicationState -> applicationError
}

/**
 * Maps a Kubernetes container state to an application state.
 *
 * waiting -> PENDING, running -> RUNNING, terminated with exit code 0 -> FINISHED,
 * terminated with any other exit code -> FAILED, none of the three set -> UNKNOWN.
 *
 * @param containerState the container state reported by Kubernetes, must not be null
 * @return the corresponding application state
 */
def containerStateToApplicationState(containerState: ContainerState): ApplicationState = {
  // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#container-states
  if (containerState.getWaiting != null) {
    PENDING
  } else if (containerState.getRunning != null) {
    RUNNING
  } else if (containerState.getTerminated == null) {
    UNKNOWN
  } else if (containerState.getTerminated.getExitCode == 0) {
    FINISHED
  } else {
    FAILED
  }
}

/**
 * Extracts the error reason from a Kubernetes container state.
 *
 * The reason of the waiting state is preferred; if it is absent the reason of the terminated
 * state is used. A missing (null) reason yields `None` rather than `Some(null)`.
 *
 * @param containerState the container state reported by Kubernetes, must not be null
 * @return the reason, if any
 */
def containerStateToApplicationError(containerState: ContainerState): Option[String] = {
  // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#container-states
  Option(containerState.getWaiting).flatMap(waiting => Option(waiting.getReason))
    .orElse(Option(containerState.getTerminated).flatMap(term => Option(term.getReason)))
}

/**
 * Maps a Kubernetes pod phase to an application state.
 *
 * Unrecognised phases (including null) are logged with a warning and mapped to UNKNOWN.
 *
 * @param podState the pod phase string as reported in the pod status
 * @return the corresponding application state
 */
def podStateToApplicationState(podState: String): ApplicationState = podState match {
  // https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase
  case "Pending" => PENDING
  case "Running" => RUNNING
  case "Succeeded" => FINISHED
  case "Failed" | "Error" => FAILED
  case "Unknown" => UNKNOWN
  case _ =>
    warn(s"The spark driver pod state: $podState is not supported, " +
      "mark the application state as UNKNOWN.")
    UNKNOWN
}