From 10c49b3bea089f7e49247a5772d8f2be364f0081 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Sun, 9 Jun 2019 16:28:37 -0500 Subject: [PATCH 1/3] [SPARK-27872][K8S] Fix executor service account inconsistency Fixes the service account inconsistency that breaks pull secrets. It gives the option to the user to setup a specific service account for the executors if he has to (via `spark.kubernetes.authenticate.executor.serviceAccountName`). Defaults to the driver's one. We are not supporting special authentication credentials for the executors with this PR. Tested manually by launching a Spark job exercising the introduced settings. Added a new integration tests for this fix. Closes #24748 from skonto/fix_executor_sa. Authored-by: Stavros Kontopoulos Signed-off-by: Sean Owen (cherry picked from commit 7912ab85a6fc086b814d93e7c71af1f50515517a) --- .../org/apache/spark/deploy/k8s/Config.scala | 7 ++-- .../spark/deploy/k8s/KubernetesUtils.scala | 28 ++++++++++++++ ...iverKubernetesCredentialsFeatureStep.scala | 11 +----- ...utorKubernetesCredentialsFeatureStep.scala | 37 +++++++++++++++++++ .../k8s/integrationtest/BasicTestsSuite.scala | 7 ++++ .../k8s/integrationtest/KubernetesSuite.scala | 4 ++ 6 files changed, 81 insertions(+), 13 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index c7338a721595f..02471af989743 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -61,10 +61,9 @@ private[spark] object Config extends Logging { .stringConf .createOptional - val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = - "spark.kubernetes.authenticate.driver" - val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = - "spark.kubernetes.authenticate.driver.mounted" + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = "spark.kubernetes.authenticate.driver" + val KUBERNETES_AUTH_EXECUTOR_CONF_PREFIX = "spark.kubernetes.authenticate.executor" + val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = "spark.kubernetes.authenticate.driver.mounted" val KUBERNETES_AUTH_CLIENT_MODE_PREFIX = "spark.kubernetes.authenticate" val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 588cd9d40f9a0..511f5d37252ae 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -60,4 +60,32 @@ private[spark] object KubernetesUtils { } def parseMasterUrl(url: String): String = url.substring("k8s://".length) + + /** + * Upload a file to a Hadoop-compatible filesystem. + */ + private def uploadFileToHadoopCompatibleFS( + src: Path, + dest: Path, + fs: FileSystem, + delSrc : Boolean = false, + overwrite: Boolean = true): Unit = { + try { + fs.copyFromLocalFile(false, true, src, dest) + } catch { + case e: IOException => + throw new SparkException(s"Error uploading file ${src.getName}", e) + } + } + + def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = { + serviceAccount.map { account => + new PodBuilder(pod.pod) + .editOrNewSpec() + .withServiceAccount(account) + .withServiceAccountName(account) + .endSpec() + .build() + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala index ff5ad6673b309..60793817ec405 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilde import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf[_]) extends KubernetesFeatureConfigStep { @@ -70,15 +71,7 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube override def configurePod(pod: SparkPod): SparkPod = { if (!shouldMountSecret) { - pod.copy( - pod = driverServiceAccount.map { account => - new PodBuilder(pod.pod) - .editOrNewSpec() - .withServiceAccount(account) - .withServiceAccountName(account) - .endSpec() - .build() - }.getOrElse(pod.pod)) + pod.copy(pod = buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod)) } else { val driverPodWithMountedKubernetesCredentials = new PodBuilder(pod.pod) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala new file mode 100644 index 0000000000000..8b84aad553855 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExecutorKubernetesCredentialsFeatureStep.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import org.apache.spark.deploy.k8s.{KubernetesConf, SparkPod} +import org.apache.spark.deploy.k8s.Config.KUBERNETES_SERVICE_ACCOUNT_NAME +import org.apache.spark.deploy.k8s.KubernetesUtils.buildPodWithServiceAccount + +private[spark] class ExecutorKubernetesCredentialsFeatureStep(kubernetesConf: KubernetesConf) + extends KubernetesFeatureConfigStep { + private lazy val driverServiceAccount = kubernetesConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configurePod(pod: SparkPod): SparkPod = { + pod.copy( + // if not setup by the pod template fallback to the driver's sa, + // last option is the default sa. + pod = if (Option(pod.pod.getSpec.getServiceAccount).isEmpty) { + buildPodWithServiceAccount(driverServiceAccount, pod).getOrElse(pod.pod) + } else { + pod.pod + }) + } +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index 4e749c40563dc..efa17e2451a0e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -81,6 +81,13 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => }) } + test("All pods have the same service account by default", k8sTestTag) { + runSparkPiAndVerifyCompletion( + executorPodChecker = (executorPod: Pod) => { + doExecutorServiceAccountCheck(executorPod, kubernetesTestComponents.serviceAccountName) + }) + } + test("Run extraJVMOptions check on driver", k8sTestTag) { sparkAppConf .set("spark.driver.extraJavaOptions", "-Dspark.test.foo=spark.test.bar") diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index e6840ce818c1f..1c8962cc75105 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -263,6 +263,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite === baseMemory) } + protected def doExecutorServiceAccountCheck(executorPod: Pod, account: String): Unit = { + doBasicExecutorPodCheck(executorPod) + assert(executorPod.getSpec.getServiceAccount == kubernetesTestComponents.serviceAccountName) + } protected def doBasicDriverPyPodCheck(driverPod: Pod): Unit = { assert(driverPod.getMetadata.getName === driverPodName) From b6a13844340e12c4c000237132d6ce8ad7f2642b Mon Sep 17 00:00:00 2001 From: Greg Sterin Date: Thu, 17 Sep 2020 21:57:49 -0700 Subject: [PATCH 2/3] Unneeded func --- .../spark/deploy/k8s/KubernetesUtils.scala | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index 511f5d37252ae..7d6f0f1af151e 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -61,23 +61,6 @@ private[spark] object KubernetesUtils { def parseMasterUrl(url: String): String = url.substring("k8s://".length) - /** - * Upload a file to a Hadoop-compatible filesystem. - */ - private def uploadFileToHadoopCompatibleFS( - src: Path, - dest: Path, - fs: FileSystem, - delSrc : Boolean = false, - overwrite: Boolean = true): Unit = { - try { - fs.copyFromLocalFile(false, true, src, dest) - } catch { - case e: IOException => - throw new SparkException(s"Error uploading file ${src.getName}", e) - } - } - def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = { serviceAccount.map { account => new PodBuilder(pod.pod) From c19cd14ca6289d63082d789799fb1b36d8ec4411 Mon Sep 17 00:00:00 2001 From: Greg Sterin Date: Thu, 17 Sep 2020 22:08:51 -0700 Subject: [PATCH 3/3] Add explicit ExecutorCredentialsFeature --- .../spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 364b6fb367722..3a9522441808c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -51,7 +51,7 @@ private[spark] class KubernetesExecutorBuilder( Seq(provideVolumesStep(kubernetesConf)) } else Nil - val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature + val allFeatures = baseFeatures ++ secretFeature ++ secretEnvFeature ++ volumesFeature ++ Seq(new ExecutorKubernetesCredentialsFeatureStep(kubernetesConf)) var executorPod = SparkPod.initialPod() for (feature <- allFeatures) {