diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 38b57c1a01279..0da9a04684fe7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -573,7 +573,7 @@ object SparkSubmit extends CommandLineUtils { } // assure a keytab is available from any place in a JVM - if (clusterManager == YARN || clusterManager == LOCAL) { + if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) { if (args.principal != null) { require(args.keytab != null, "Keytab must be specified when principal is specified") if (!new File(args.keytab).exists()) { diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 99d356044b146..1b070973afe7e 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -752,6 +752,61 @@ from the other deployment modes. See the [configuration page](configuration.html + spark.kubernetes.kerberos.enabled + false + + Specify whether your job requires Kerberos authentication to access HDFS. By default, we + assume that you do not require secure HDFS access. + + + + spark.kubernetes.kerberos.keytab + (none) + + Assuming you have set spark.kubernetes.kerberos.enabled to true, this lets you specify + the location of the Kerberos keytab to be used to access secure HDFS. This is optional, as you + may instead log in by running kinit before running the spark-submit, and the submission client + will then look within your local TGT cache to resolve your credentials. + + + + spark.kubernetes.kerberos.principal + (none) + + Assuming you have set spark.kubernetes.kerberos.enabled to true, this lets you specify + the Kerberos principal that you wish to use to access secure HDFS. This is optional, as you + may instead log in by running kinit before running the spark-submit, and the submission client + will then look within your local TGT cache to resolve your credentials. + + + + spark.kubernetes.kerberos.renewer.principal + (none) + + Assuming you have set spark.kubernetes.kerberos.enabled to true, this lets you specify + the principal that you wish to use to handle renewing of delegation tokens. This is optional, as + we will set the renewer to the job user's principal by default. + + + + spark.kubernetes.kerberos.tokensecret.name + (none) + + Assuming you have set spark.kubernetes.kerberos.enabled to true, this lets you specify + the name of the secret where your existing delegation token data is stored. You must also specify the + item key spark.kubernetes.kerberos.tokensecret.itemkey under which the data is stored in the secret. + Set this only if you want to use a pre-existing secret; otherwise a new secret will be automatically + created. + + + + spark.kubernetes.kerberos.tokensecret.itemkey + spark.kubernetes.kerberos.dt.label + + Assuming you have set spark.kubernetes.kerberos.enabled to true, this lets you specify + the data item key within the pre-specified secret where your existing delegation token data is stored. + This defaults to spark.kubernetes.kerberos.dt.label should you not include it, but + you should always set it when providing a pre-existing secret that contains the delegation token data. spark.executorEnv.[EnvironmentVariableName] (none) @@ -791,4 +846,3 @@ from the other deployment modes.
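A minimal sketch of how the Kerberos options documented above fit together on the submission side; this assumes you have pre-created a delegation token secret, and the secret name and item key are hypothetical placeholders rather than values defined by this patch:

```scala
import org.apache.spark.SparkConf

// Reuse a pre-created delegation token secret instead of supplying a keytab.
// "my-hdfs-dt-secret" and "hadoop-tokens-dt" are illustrative placeholders.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.tokensecret.name", "my-hdfs-dt-secret")
  .set("spark.kubernetes.kerberos.tokensecret.itemkey", "hadoop-tokens-dt")
```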
See the [configuration page](configuration.html Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that should be lifted in the future include: * Applications can only run in cluster mode. -* Only Scala and Java applications can be run. diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala new file mode 100644 index 0000000000000..ad6c805c20b2e --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfBootstrap.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder} + +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.internal.Logging + +/** + * This is separated out from the HadoopConf steps API because this component can be reused to + * set up the Hadoop Configuration for executors as well. + */ +private[spark] trait HadoopConfBootstrap { + /** + * Bootstraps a main container with the ConfigMaps containing Hadoop config files + * mounted as volumes and an ENV variable pointing to the mounted file. + */ + def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) + : PodWithMainContainer +} + +private[spark] class HadoopConfBootstrapImpl( + hadoopConfConfigMapName: String, + hadoopConfigFiles: Seq[File], + hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging { + + override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) + : PodWithMainContainer = { + logInfo("HADOOP_CONF_DIR defined. 
Mounting Hadoop specific files") + val keyPaths = hadoopConfigFiles.map { file => + val fileStringPath = file.toPath.getFileName.toString + new KeyToPathBuilder() + .withKey(fileStringPath) + .withPath(fileStringPath) + .build() } + val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod) + .editSpec() + .addNewVolume() + .withName(HADOOP_FILE_VOLUME) + .withNewConfigMap() + .withName(hadoopConfConfigMapName) + .withItems(keyPaths.asJava) + .endConfigMap() + .endVolume() + .endSpec() + .build() + val hadoopSupportedContainer = new ContainerBuilder( + originalPodWithMainContainer.mainContainer) + .addNewVolumeMount() + .withName(HADOOP_FILE_VOLUME) + .withMountPath(HADOOP_CONF_DIR_PATH) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_CONF_DIR) + .withValue(HADOOP_CONF_DIR_PATH) + .endEnv() + .build() + + originalPodWithMainContainer.copy( + pod = hadoopSupportedPod, + mainContainer = hadoopSupportedContainer) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfSparkUserBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfSparkUserBootstrap.scala new file mode 100644 index 0000000000000..6ddabbbf1e0c8 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopConfSparkUserBootstrap.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.ContainerBuilder + +import org.apache.spark.deploy.k8s.constants._ + +// This trait is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected; +// however, this step is not run if Kerberos is enabled, as Kerberos sets SPARK_USER +private[spark] trait HadoopConfSparkUserBootstrap { + def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) + : PodWithMainContainer +} + +private[spark] class HadoopConfSparkUserBootstrapImpl(hadoopUGIUtil: HadoopUGIUtil) + extends HadoopConfSparkUserBootstrap { + + override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) + : PodWithMainContainer = { + val envModifiedContainer = new ContainerBuilder( + originalPodWithMainContainer.mainContainer) + .addNewEnv() + .withName(ENV_SPARK_USER) + .withValue(hadoopUGIUtil.getShortUserName) + .endEnv() + .build() + originalPodWithMainContainer.copy( + pod = originalPodWithMainContainer.pod, + mainContainer = envModifiedContainer) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopUGIUtil.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopUGIUtil.scala new file mode 100644 index 0000000000000..4dcfe6642d919 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/HadoopUGIUtil.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.deploy.k8s + +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} + +import scala.util.Try + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier + +import org.apache.spark.util.{Clock, SystemClock, Utils} + +private[spark] trait HadoopUGIUtil { + def getCurrentUser: UserGroupInformation + def getShortUserName: String + def getFileSystem(hadoopConf: Configuration): FileSystem + def isSecurityEnabled: Boolean + def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String) : + UserGroupInformation + def dfsAddDelegationToken(fileSystem: FileSystem, + hadoopConf: Configuration, + renewer: String, + creds: Credentials) : Iterable[Token[_ <: TokenIdentifier]] + def getCurrentTime: Long + def getTokenRenewalInterval( + renewedTokens: Iterable[Token[_ <: TokenIdentifier]], + hadoopConf: Configuration) : Option[Long] + def serialize(creds: Credentials): Array[Byte] + def deserialize(tokenBytes: Array[Byte]): Credentials +} + +private[spark] class HadoopUGIUtilImpl extends HadoopUGIUtil { + + private val clock: Clock = new SystemClock() + def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser + def getShortUserName : String = getCurrentUser.getShortUserName + def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf) + def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled + + def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation = + UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) + + def dfsAddDelegationToken(fileSystem: FileSystem, + hadoopConf: Configuration, + renewer: String, + creds: Credentials) : Iterable[Token[_ <: TokenIdentifier]] = + fileSystem.addDelegationTokens(renewer, creds) + + def getCurrentTime: Long = clock.getTimeMillis() + + // Functions that should be in Core with Rebase to 2.3 + @deprecated("Moved to core in 2.3", "2.3") + def getTokenRenewalInterval( + renewedTokens: Iterable[Token[_ <: TokenIdentifier]], + hadoopConf: Configuration): Option[Long] = { + val renewIntervals = renewedTokens.filter { + _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] + }.flatMap { token => + Try { + val newExpiration = token.renew(hadoopConf) + val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] + val interval = newExpiration - identifier.getIssueDate + interval + }.toOption + } + renewIntervals.reduceLeftOption(_ min _) + } + + @deprecated("Moved to core in 2.3", "2.3") + def serialize(creds: Credentials): Array[Byte] = { + Utils.tryWithResource(new ByteArrayOutputStream()) { byteStream => + Utils.tryWithResource(new DataOutputStream(byteStream)) { dataStream => + creds.writeTokenStorageToStream(dataStream) + } + byteStream.toByteArray + } + } + + @deprecated("Moved to core in 2.3", "2.3") + def deserialize(tokenBytes: Array[Byte]): Credentials = { + val creds = new Credentials() + Utils.tryWithResource(new ByteArrayInputStream(tokenBytes)) { byteStream => + Utils.tryWithResource(new DataInputStream(byteStream)) { dataStream => + creds.readTokenStorageStream(dataStream) + } + } + creds + } +} diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KerberosTokenConfBootstrap.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KerberosTokenConfBootstrap.scala new file mode 100644 index 0000000000000..c873f72aac956 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KerberosTokenConfBootstrap.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder} + +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.internal.Logging + + + /** + * This is separated out from the HadoopConf steps API because this component can be reused to + * mount the DT secret for executors as well. + */ +private[spark] trait KerberosTokenConfBootstrap { + // Bootstraps a main container with the Secret mounted as volumes and an ENV variable + // pointing to the mounted file containing the DT for Secure HDFS interaction + def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) + : PodWithMainContainer +} + +private[spark] class KerberosTokenConfBootstrapImpl( + secretName: String, + secretItemKey: String, + userName: String) extends KerberosTokenConfBootstrap with Logging { + + override def bootstrapMainContainerAndVolumes( + originalPodWithMainContainer: PodWithMainContainer) : PodWithMainContainer = { + logInfo(s"Mounting HDFS DT from Secret $secretName for Secure HDFS") + val secretMountedPod = new PodBuilder(originalPodWithMainContainer.pod) + .editOrNewSpec() + .addNewVolume() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withNewSecret() + .withSecretName(secretName) + .endSecret() + .endVolume() + .endSpec() + .build() + // TODO: ENV_HADOOP_TOKEN_FILE_LOCATION should point to the latest token data item key.
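+ // The volume mount below surfaces the token data at + // SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/<secretItemKey>, i.e. + // /mnt/secrets/hadoop-credentials/<secretItemKey>, and Hadoop's UserGroupInformation + // loads credentials from that path automatically by reading HADOOP_TOKEN_FILE_LOCATION.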
+ val secretMountedContainer = new ContainerBuilder( + originalPodWithMainContainer.mainContainer) + .addNewVolumeMount() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) + .endVolumeMount() + .addNewEnv() + .withName(ENV_HADOOP_TOKEN_FILE_LOCATION) + .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey") + .endEnv() + .addNewEnv() + .withName(ENV_SPARK_USER) + .withValue(userName) + .endEnv() + .build() + originalPodWithMainContainer.copy( + pod = secretMountedPod, + mainContainer = secretMountedContainer) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala index 0e35e04ff5803..e395fed810a3d 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/config.scala @@ -496,6 +496,49 @@ package object config extends Logging { private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector." + private[spark] val KUBERNETES_KERBEROS_SUPPORT = + ConfigBuilder("spark.kubernetes.kerberos.enabled") + .doc("Specify whether your job requires a delegation token to access HDFS") + .booleanConf + .createWithDefault(false) + + private[spark] val KUBERNETES_KERBEROS_KEYTAB = + ConfigBuilder("spark.kubernetes.kerberos.keytab") + .doc("Specify the location of the keytab " + + "for Kerberos in order to access Secure HDFS") + .stringConf + .createOptional + + private[spark] val KUBERNETES_KERBEROS_PRINCIPAL = + ConfigBuilder("spark.kubernetes.kerberos.principal") + .doc("Specify the principal " + + "for Kerberos in order to access Secure HDFS") + .stringConf + .createOptional + + private[spark] val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL = + ConfigBuilder("spark.kubernetes.kerberos.renewer.principal") + .doc("Specify the principal " + + "to be used as the renewer of your delegation tokens") + .stringConf + .createOptional + + private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME = + ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name") + .doc("Specify the name of the secret where " + + "your existing delegation token is stored. This removes the need " + + "for the job user to provide any keytab for launching a job") + .stringConf + .createOptional + + private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY = + ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey") + .doc("Specify the item key of the data where " + + "your existing delegation token is stored.
This removes the need " + + "for the job user to provide any keytab for launching a job") + .stringConf + .createOptional + private[spark] def resolveK8sMaster(rawMasterString: String): String = { if (!rawMasterString.startsWith("k8s://")) { throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala index 26cdcaa7f67c8..939ac8533553c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/constants.scala @@ -43,6 +43,10 @@ package object constants { s"$DRIVER_CREDENTIALS_SECRETS_BASE_DIR/$DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME" private[spark] val DRIVER_CREDENTIALS_SECRET_VOLUME_NAME = "kubernetes-credentials" + // Hadoop credentials secrets for the Spark app. + private[spark] val SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR = "/mnt/secrets/hadoop-credentials" + private[spark] val SPARK_APP_HADOOP_SECRET_VOLUME_NAME = "hadoop-secret" + // Default and fixed ports private[spark] val DEFAULT_DRIVER_PORT = 7078 private[spark] val DEFAULT_BLOCKMANAGER_PORT = 7079 @@ -73,6 +77,8 @@ package object constants { private[spark] val ENV_R_FILE = "R_FILE" private[spark] val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" private[spark] val ENV_MOUNTED_FILES_FROM_SECRET_DIR = "SPARK_MOUNTED_FILES_FROM_SECRET_DIR" + private[spark] val ENV_HADOOP_TOKEN_FILE_LOCATION = "HADOOP_TOKEN_FILE_LOCATION" + private[spark] val ENV_SPARK_USER = "SPARK_USER" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH = @@ -94,6 +100,31 @@ package object constants { private[spark] val DEFAULT_SHUFFLE_MOUNT_NAME = "shuffle" private[spark] val INIT_CONTAINER_SECRET_VOLUME_NAME = "spark-init-secret" + // Hadoop Configuration + private[spark] val HADOOP_FILE_VOLUME = "hadoop-properties" + private[spark] val HADOOP_CONF_DIR_PATH = "/etc/hadoop/conf" + private[spark] val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR" + private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir" + private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME = + "spark.kubernetes.hadoop.executor.hadoopConfigMapName" + + // Kerberos Configuration + private[spark] val KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME = + "spark.kubernetes.kerberos.delegationTokenSecretName" + private[spark] val KERBEROS_KEYTAB_SECRET_NAME = + "spark.kubernetes.kerberos.keyTabSecretName" + private[spark] val KERBEROS_KEYTAB_SECRET_KEY = + "spark.kubernetes.kerberos.keyTabSecretKey" + private[spark] val KERBEROS_SECRET_LABEL_PREFIX = + "hadoop-tokens" + private[spark] val SPARK_HADOOP_PREFIX = "spark.hadoop." 
+ private[spark] val HADOOP_SECURITY_AUTHENTICATION = + SPARK_HADOOP_PREFIX + "hadoop.security.authentication" + + // Kerberos Token-Refresh Server + private[spark] val KERBEROS_REFRESH_LABEL_KEY = "refresh-hadoop-tokens" + private[spark] val KERBEROS_REFRESH_LABEL_VALUE = "yes" + // Bootstrapping dependencies via a secret private[spark] val MOUNTED_SMALL_FILES_SECRET_MOUNT_PATH = "/etc/spark-submitted-files" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala index 6ed497130429f..fadd13dbe0b62 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala @@ -35,7 +35,8 @@ private[spark] case class ClientArguments( mainAppResource: MainAppResource, otherPyFiles: Seq[String], mainClass: String, - driverArgs: Array[String]) + driverArgs: Array[String], + hadoopConfDir: Option[String]) private[spark] object ClientArguments { def fromCommandLineArgs(args: Array[String]): ClientArguments = { @@ -67,7 +68,8 @@ private[spark] object ClientArguments { mainAppResource.get, otherPyFiles, mainClass.get, - driverArgs.toArray) + driverArgs.toArray, + sys.env.get(ENV_HADOOP_CONF_DIR)) } } @@ -81,6 +83,11 @@ private[spark] class Client( private val driverJavaOptions = submissionSparkConf.get( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT) + // HADOOP_SECURITY_AUTHENTICATION is defined as simple for the driver and executors as + // they need only the delegation token to access secure HDFS, no need to sign in to Kerberos + private val maybeSimpleAuthentication = + if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None /** * Run command that initalizes a DriverSpec that will be updated after each @@ -101,7 +108,8 @@ private[spark] class Client( .getAll .map { case (confKey, confValue) => s"-D$confKey=$confValue" - } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) + } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) ++ + maybeSimpleAuthentication val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map { case (option, index) => new EnvVarBuilder() .withName(s"$ENV_JAVA_OPT_PREFIX$index") @@ -174,6 +182,7 @@ private[spark] object Client { clientArguments.mainClass, clientArguments.driverArgs, clientArguments.otherPyFiles, + clientArguments.hadoopConfDir, sparkConf) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala index d5710fab5a207..edd27e81f03ff 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala @@ -20,7 +20,8 @@ import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.ConfigurationUtils import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ -import 
org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.HadoopStepsOrchestrator import org.apache.spark.deploy.k8s.submit.submitsteps.initcontainer.InitContainerConfigurationStepsOrchestrator import org.apache.spark.deploy.k8s.submit.submitsteps.LocalDirectoryMountConfigurationStep import org.apache.spark.launcher.SparkLauncher @@ -38,6 +39,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( mainClass: String, appArgs: Array[String], additionalPythonFiles: Seq[String], + hadoopConfDir: Option[String], submissionSparkConf: SparkConf) { // The resource name prefix is derived from the application name, making it easy to connect the @@ -52,6 +54,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION) private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY) private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config" + private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config" def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = { val additionalMainAppJar = mainAppResource match { @@ -114,6 +117,18 @@ private[spark] class DriverConfigurationStepsOrchestrator( val localDirectoryMountConfigurationStep = new LocalDirectoryMountConfigurationStep( submissionSparkConf) + val hadoopConfigSteps = + hadoopConfDir.map { conf => + val hadoopStepsOrchestrator = + new HadoopStepsOrchestrator( + kubernetesResourceNamePrefix, + namespace, + hadoopConfigMapName, + submissionSparkConf, + conf) + val hadoopConfSteps = hadoopStepsOrchestrator.getHadoopSteps() + Some(new HadoopConfigBootstrapStep(hadoopConfSteps, hadoopConfigMapName))} + .getOrElse(Option.empty[DriverConfigurationStep]) val resourceStep = mainAppResource match { case PythonMainAppResource(mainPyResource) => Option(new PythonStep(mainPyResource, additionalPythonFiles, filesDownloadPath)) @@ -203,6 +218,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( dependencyResolutionStep, localDirectoryMountConfigurationStep) ++ submittedDependenciesBootstrapSteps ++ + hadoopConfigSteps.toSeq ++ resourceStep.toSeq ++ mountSecretsStep.toSeq } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/PodWithMainContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/PodWithMainContainer.scala new file mode 100644 index 0000000000000..42f3343acee7d --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/PodWithMainContainer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.{Container, Pod} + + /** + * This case class packages together the driver pod with its main container + * so that both can be bootstrapped and modified as a unit + * instead of each component separately + */ +private[spark] case class PodWithMainContainer( + pod: Pod, + mainContainer: Container) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala new file mode 100644 index 0000000000000..916619475bc2a --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStep.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ConfigMapBuilder + +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep} + + /** + * This class configures the driverSpec with Hadoop configuration logic, which includes + * volume mounts, config maps, and environment variable manipulation. The steps are + * resolved by the orchestrator and each one runs by modifying the HadoopConfigSpec in + * turn. The final HadoopConfigSpec's contents will be appended to the driverSpec.
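+ * Note that each step receives the HadoopConfigSpec produced by the step before it, so the + * order of the steps supplied by the orchestrator matters.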
+ */ +private[spark] class HadoopConfigBootstrapStep( + hadoopConfigurationSteps: Seq[HadoopConfigurationStep], + hadoopConfigMapName: String) + extends DriverConfigurationStep { + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { + var currentHadoopSpec = HadoopConfigSpec( + driverPod = driverSpec.driverPod, + driverContainer = driverSpec.driverContainer, + configMapProperties = Map.empty[String, String], + additionalDriverSparkConf = Map.empty[String, String], + dtSecret = None, + dtSecretName = KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME, + dtSecretItemKey = "") + for (nextStep <- hadoopConfigurationSteps) { + currentHadoopSpec = nextStep.configureContainers(currentHadoopSpec) + } + val configMap = + new ConfigMapBuilder() + .withNewMetadata() + .withName(hadoopConfigMapName) + .endMetadata() + .addToData(currentHadoopSpec.configMapProperties.asJava) + .build() + val driverSparkConfWithExecutorSetup = driverSpec.driverSparkConf.clone() + .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, hadoopConfigMapName) + .setAll(currentHadoopSpec.additionalDriverSparkConf) + driverSpec.copy( + driverPod = currentHadoopSpec.driverPod, + driverContainer = currentHadoopSpec.driverContainer, + driverSparkConf = driverSparkConfWithExecutorSetup, + otherKubernetesResources = + driverSpec.otherKubernetesResources ++ + Seq(configMap) ++ currentHadoopSpec.dtSecret.toSeq + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala new file mode 100644 index 0000000000000..37a41d71ba616 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStep.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files + +import org.apache.spark.deploy.k8s.{HadoopConfBootstrap, PodWithMainContainer} +import org.apache.spark.deploy.k8s.constants._ + + /** + * This step is responsible for reading each file in + * HADOOP_CONF_DIR, grabbing its contents as a string, and storing each of them + * as a key-value pair in a ConfigMap. Each key-value pair will later be mounted + * back as a file via volume mounts. The HADOOP_CONF_DIR_LOC is passed into the + * SchedulerBackend via the sparkConf.
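+ * For example, $HADOOP_CONF_DIR/core-site.xml is stored under the ConfigMap key + * core-site.xml and later appears in the driver and executor pods as + * /etc/hadoop/conf/core-site.xml.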
+ */ +private[spark] class HadoopConfMounterStep( + hadoopConfigMapName: String, + hadoopConfigurationFiles: Seq[File], + hadoopConfBootstrapConf: HadoopConfBootstrap, + hadoopConfDir: String) + extends HadoopConfigurationStep { + + override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = { + val bootstrappedPodAndMainContainer = + hadoopConfBootstrapConf.bootstrapMainContainerAndVolumes( + PodWithMainContainer( + hadoopConfigSpec.driverPod, + hadoopConfigSpec.driverContainer)) + hadoopConfigSpec.copy( + driverPod = bootstrappedPodAndMainContainer.pod, + driverContainer = bootstrappedPodAndMainContainer.mainContainer, + configMapProperties = + hadoopConfigurationFiles.map(file => + (file.toPath.getFileName.toString, Files.toString(file, Charsets.UTF_8))).toMap, + additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf ++ + Map(HADOOP_CONF_DIR_LOC -> hadoopConfDir) + ) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfSparkUserStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfSparkUserStep.scala new file mode 100644 index 0000000000000..43562dcc9f340 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfSparkUserStep.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import org.apache.spark.deploy.k8s.{HadoopConfSparkUserBootstrap, PodWithMainContainer} + +private[spark] class HadoopConfSparkUserStep(hadoopUserBootstrapConf: HadoopConfSparkUserBootstrap) + extends HadoopConfigurationStep { + + override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = { + val bootstrappedPodAndMainContainer = + hadoopUserBootstrapConf.bootstrapMainContainerAndVolumes( + PodWithMainContainer( + hadoopConfigSpec.driverPod, + hadoopConfigSpec.driverContainer)) + hadoopConfigSpec.copy( + driverPod = bootstrappedPodAndMainContainer.pod, + driverContainer = bootstrappedPodAndMainContainer.mainContainer, + configMapProperties = hadoopConfigSpec.configMapProperties, + additionalDriverSparkConf = hadoopConfigSpec.additionalDriverSparkConf) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala new file mode 100644 index 0000000000000..b38cae25dca26 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigSpec.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import io.fabric8.kubernetes.api.model.{Container, Pod, Secret} + + /** + * Represents a given configuration of the hadoop configuration logic, informing the + * HadoopConfigBootstrapStep of how the driver should be configured. This includes: + *

+ * - What Spark properties should be set on the driver's SparkConf for the executors + * - The spec of the main container so that it can be modified to share volumes + * - The spec of the driver pod EXCEPT for the addition of the given hadoop configs (e.g. volumes + * the hadoop logic needs) + * - The properties that will be stored into the config map which have (key, value) + * pairs of (path, data) + * - The secret containing a DT, either previously specified or built on the fly + * - The name of the secret where the DT will be stored + * - The data item-key on the secret which correlates with where the current DT data is stored + */ +private[spark] case class HadoopConfigSpec( + additionalDriverSparkConf: Map[String, String], + driverPod: Pod, + driverContainer: Container, + configMapProperties: Map[String, String], + dtSecret: Option[Secret], + dtSecretName: String, + dtSecretItemKey: String) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigurationStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigurationStep.scala new file mode 100644 index 0000000000000..b08b180ce8531 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfigurationStep.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + + /** + * Represents a step in preparing the driver with Hadoop Configuration logic. + */ +private[spark] trait HadoopConfigurationStep { + + def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala new file mode 100644 index 0000000000000..635de4a859969 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStep.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import java.io._ +import java.security.PrivilegedExceptionAction + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.SecretBuilder +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.security.Credentials +import org.apache.hadoop.security.token.{Token, TokenIdentifier} + +import org.apache.spark.SparkConf +import org.apache.spark.SparkException +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.{HadoopUGIUtil, KerberosTokenConfBootstrapImpl, PodWithMainContainer} +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.internal.Logging + + /** + * This step does the heavy lifting for delegation token logic. It assumes + * that the job user has either specified a principal and keytab or has run + * kinit before running spark-submit. With a TGT stored locally, running + * UGI.getCurrentUser yields the current user; alternatively, running + * UGI.loginUserFromKeytabAndReturnUGI and then .doAs acts + * as the logged-in user instead of the current user. With the job user's principal, + * the step retrieves the delegation tokens from the NameNode and stores them in + * the Credentials. Lastly, the class puts the token data into a secret. All of this is + * appended to the current HadoopConfigSpec, which in turn is appended to the current + * DriverSpec.
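+ * For example, a token obtained at time t with renewal interval r is stored in a freshly + * created secret under the data item key hadoop-tokens-t-r, which the driver and executors + * then resolve through HADOOP_TOKEN_FILE_LOCATION.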
+ */ +private[spark] class HadoopKerberosKeytabResolverStep( + kubernetesResourceNamePrefix: String, + submissionSparkConf: SparkConf, + maybePrincipal: Option[String], + maybeKeytab: Option[File], + maybeRenewerPrincipal: Option[String], + hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging { + + private var credentials: Credentials = _ + + override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = { + val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf) + if (!hadoopUGI.isSecurityEnabled) { + throw new SparkException("Hadoop not configured with Kerberos") + } + val maybeJobUserUGI = + for { + principal <- maybePrincipal + keytab <- maybeKeytab + } yield { + // Not necessary with [SPARK-16742] + // Reliant on [SPARK-20328] for changing to YARN principal + submissionSparkConf.set("spark.yarn.principal", principal) + submissionSparkConf.set("spark.yarn.keytab", keytab.toURI.toString) + logDebug("Logged into KDC with keytab using Job User UGI") + hadoopUGI.loginUserFromKeytabAndReturnUGI( + principal, + keytab.toURI.toString) + } + // In the case that a keytab is not specified, we read from the local ticket cache + val jobUserUGI = maybeJobUserUGI.getOrElse(hadoopUGI.getCurrentUser) + // It is necessary to run as jobUserUGI because the logged-in user may differ from the current user + val tokens = jobUserUGI.doAs( + new PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]] { + override def run(): Iterable[Token[_ <: TokenIdentifier]] = { + val originalCredentials = jobUserUGI.getCredentials + // TODO: This is not necessary with [SPARK-20328] since we would be using + // Spark core providers to handle delegation token renewal + val renewerPrincipal = maybeRenewerPrincipal.getOrElse(jobUserUGI.getShortUserName) + credentials = new Credentials(originalCredentials) + hadoopUGI.dfsAddDelegationToken(hadoopUGI.getFileSystem(hadoopConf), + hadoopConf, + renewerPrincipal, + credentials) + credentials.getAllTokens.asScala + }}) + + if (tokens.isEmpty) throw new SparkException("Did not obtain any delegation tokens") + val data = hadoopUGI.serialize(credentials) + val renewalInterval = + hadoopUGI.getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue) + val currentTime = hadoopUGI.getCurrentTime + val initialTokenDataKeyName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalInterval" + val uniqueSecretName = + s"$kubernetesResourceNamePrefix-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME.$currentTime" + val secretDT = + new SecretBuilder() + .withNewMetadata() + .withName(uniqueSecretName) + .withLabels(Map(KERBEROS_REFRESH_LABEL_KEY -> KERBEROS_REFRESH_LABEL_VALUE).asJava) + .endMetadata() + .addToData(initialTokenDataKeyName, Base64.encodeBase64String(data)) + .build() + val bootstrapKerberos = new KerberosTokenConfBootstrapImpl( + uniqueSecretName, + initialTokenDataKeyName, + jobUserUGI.getShortUserName) + val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes( + PodWithMainContainer( + hadoopConfigSpec.driverPod, + hadoopConfigSpec.driverContainer)) + hadoopConfigSpec.copy( + additionalDriverSparkConf = + hadoopConfigSpec.additionalDriverSparkConf ++ Map( + KERBEROS_KEYTAB_SECRET_KEY -> initialTokenDataKeyName, + KERBEROS_KEYTAB_SECRET_NAME -> uniqueSecretName), + driverPod = withKerberosEnvPod.pod, + driverContainer = withKerberosEnvPod.mainContainer, + dtSecret = Some(secretDT), + dtSecretName = uniqueSecretName, + dtSecretItemKey = initialTokenDataKeyName) + } +} diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala new file mode 100644 index 0000000000000..f93192c6628c3 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStep.scala @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import org.apache.hadoop.security.UserGroupInformation + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.{KerberosTokenConfBootstrapImpl, PodWithMainContainer} +import org.apache.spark.deploy.k8s.constants._ + + /** + * This step assumes that you have already done the heavy lifting of retrieving a + * delegation token and storing it in a secret before running this job. + * It requires only the secret name and the data item key corresponding + * to the data where the delegation token is stored.
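+ * For example, a secret produced by an earlier keytab-based submission can be reused by + * setting spark.kubernetes.kerberos.tokensecret.name and + * spark.kubernetes.kerberos.tokensecret.itemkey to that secret's name and data item key.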
+ */ +private[spark] class HadoopKerberosSecretResolverStep( + submissionSparkConf: SparkConf, + tokenSecretName: String, + tokenItemKeyName: String) extends HadoopConfigurationStep { + + override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = { + val bootstrapKerberos = new KerberosTokenConfBootstrapImpl( + tokenSecretName, + tokenItemKeyName, + UserGroupInformation.getCurrentUser.getShortUserName) + val withKerberosEnvPod = bootstrapKerberos.bootstrapMainContainerAndVolumes( + PodWithMainContainer( + hadoopConfigSpec.driverPod, + hadoopConfigSpec.driverContainer)) + hadoopConfigSpec.copy( + driverPod = withKerberosEnvPod.pod, + driverContainer = withKerberosEnvPod.mainContainer, + additionalDriverSparkConf = + hadoopConfigSpec.additionalDriverSparkConf ++ Map( + KERBEROS_KEYTAB_SECRET_KEY -> tokenItemKeyName, + KERBEROS_KEYTAB_SECRET_NAME -> tokenSecretName), + dtSecret = None, + dtSecretName = tokenSecretName, + dtSecretItemKey = tokenItemKeyName) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala new file mode 100644 index 0000000000000..06de3fc6c74b9 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestrator.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtilImpl, OptionRequirements} +import org.apache.spark.deploy.k8s.HadoopConfSparkUserBootstrapImpl +import org.apache.spark.deploy.k8s.config._ +import org.apache.spark.internal.Logging + + /** + * Returns the complete ordered list of steps required to configure the Hadoop configuration.
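+ * The returned list always begins with a HadoopConfMounterStep; when Kerberos is enabled it is + * followed by either a HadoopKerberosSecretResolverStep (pre-existing secret) or a + * HadoopKerberosKeytabResolverStep (keytab or TGT cache), and otherwise by a + * HadoopConfSparkUserStep.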
+ */ +private[spark] class HadoopStepsOrchestrator( + kubernetesResourceNamePrefix: String, + namespace: String, + hadoopConfigMapName: String, + submissionSparkConf: SparkConf, + hadoopConfDir: String) extends Logging { + + private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT) + private val maybePrincipal = submissionSparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL) + private val maybeKeytab = submissionSparkConf.get(KUBERNETES_KERBEROS_KEYTAB) + .map(k => new File(k)) + private val maybeExistingSecret = submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_NAME) + private val maybeExistingSecretItemKey = + submissionSparkConf.get(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY) + private val maybeRenewerPrincipal = + submissionSparkConf.get(KUBERNETES_KERBEROS_RENEWER_PRINCIPAL) + private val hadoopConfigurationFiles = getHadoopConfFiles(hadoopConfDir) + private val hadoopUGI = new HadoopUGIUtilImpl + logInfo(s"Hadoop Conf directory: $hadoopConfDir") + + require(maybeKeytab.forall( _ => isKerberosEnabled ), + "You must enable Kerberos support if you are specifying a Kerberos Keytab") + + require(maybeExistingSecret.forall( _ => isKerberosEnabled ), + "You must enable Kerberos support if you are specifying a Kerberos Secret") + + OptionRequirements.requireBothOrNeitherDefined( + maybeKeytab, + maybePrincipal, + "If a Kerberos keytab is specified you must also specify a Kerberos principal", + "If a Kerberos principal is specified you must also specify a Kerberos keytab") + + OptionRequirements.requireBothOrNeitherDefined( + maybeExistingSecret, + maybeExistingSecretItemKey, + "If a secret storing a Kerberos Delegation Token is specified you must also" + + " specify the label where the data is stored", + "If a secret data item-key where the data of the Kerberos Delegation Token is specified" + + " you must also specify the name of the secret") + + def getHadoopSteps(): Seq[HadoopConfigurationStep] = { + val hadoopConfBootstrapImpl = new HadoopConfBootstrapImpl( + hadoopConfigMapName, + hadoopConfigurationFiles, + hadoopUGI) + val hadoopConfMounterStep = new HadoopConfMounterStep( + hadoopConfigMapName, + hadoopConfigurationFiles, + hadoopConfBootstrapImpl, + hadoopConfDir) + val maybeKerberosStep = + if (isKerberosEnabled) { + maybeExistingSecret.map(existingSecretName => Some(new HadoopKerberosSecretResolverStep( + submissionSparkConf, + existingSecretName, + maybeExistingSecretItemKey.get))).getOrElse(Some( + new HadoopKerberosKeytabResolverStep( + kubernetesResourceNamePrefix, + submissionSparkConf, + maybePrincipal, + maybeKeytab, + maybeRenewerPrincipal, + hadoopUGI))) + } else { + Some(new HadoopConfSparkUserStep(new HadoopConfSparkUserBootstrapImpl(hadoopUGI))) + } + Seq(hadoopConfMounterStep) ++ maybeKerberosStep.toSeq + } + + private def getHadoopConfFiles(path: String) : Seq[File] = { + val dir = new File(path) + if (dir.isDirectory) { + dir.listFiles.flatMap { file => Some(file).filter(_.isFile) }.toSeq + } else { + Seq.empty[File] + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 73e08b170a4a0..1c7ccba394a3c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -16,11 
+16,12 @@ */ package org.apache.spark.scheduler.cluster.k8s -import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder} import scala.collection.JavaConverters._ -import org.apache.spark.SparkConf -import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPlugin, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, ContainerPortBuilder, EnvVar, EnvVarBuilder, EnvVarSourceBuilder, Pod, PodBuilder, QuantityBuilder, VolumeBuilder, VolumeMountBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrap, HadoopConfSparkUserBootstrap, InitContainerResourceStagingServerSecretPlugin, KerberosTokenConfBootstrap, PodWithDetachedInitContainer, PodWithMainContainer, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.{InitContainerUtil, MountSecretsBootstrap, MountSmallFilesBootstrap} @@ -46,7 +47,10 @@ private[spark] class ExecutorPodFactoryImpl( executorInitContainerBootstrap: Option[SparkPodInitContainerBootstrap], executorInitContainerMountSecretsBootstrap: Option[MountSecretsBootstrap], executorMountInitContainerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin], - executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider) + executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider, + hadoopBootStrap: Option[HadoopConfBootstrap], + kerberosBootstrap: Option[KerberosTokenConfBootstrap], + hadoopUserBootstrap: Option[HadoopConfSparkUserBootstrap]) extends ExecutorPodFactory { import ExecutorPodFactoryImpl._ @@ -55,6 +59,11 @@ private[spark] class ExecutorPodFactoryImpl( org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) + private val isKerberosEnabled = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT) + // HADOOP_SECURITY_AUTHENTICATION is set to simple for the driver and executors because + // they need only the delegation token to access secure HDFS and never log in to Kerberos + private val maybeSimpleAuthentication = + if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX, @@ -133,15 +142,16 @@ private[spark] class ExecutorPodFactoryImpl( .withValue(cp) .build() } - val executorExtraJavaOptionsEnv = sparkConf - .get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) - .map { opts => - val delimitedOpts = Utils.splitCommandString(opts) - delimitedOpts.zipWithIndex.map { - case (opt, index) => - new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() - } - }.getOrElse(Seq.empty[EnvVar]) + val executorExtraJavaOptions = Option(( + sparkConf.get(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS) + ++ maybeSimpleAuthentication).mkString(" ")).filterNot(_.isEmpty) + val executorExtraJavaOptionsEnv = executorExtraJavaOptions.map { opts => + val delimitedOpts = Utils.splitCommandString(opts) + delimitedOpts.zipWithIndex.map { + case (opt, index) => + new EnvVarBuilder().withName(s"$ENV_JAVA_OPT_PREFIX$index").withValue(opt).build() + } + }.getOrElse(Seq.empty[EnvVar]) val executorEnv = (Seq(
(ENV_EXECUTOR_PORT, executorPort.toString), (ENV_DRIVER_URL, driverUrl), @@ -264,10 +274,30 @@ private[spark] class ExecutorPodFactoryImpl( val executorPodWithNodeAffinity = nodeAffinityExecutorPodModifier.addNodeAffinityAnnotationIfUseful( executorPodWithInitContainer, nodeToLocalTaskCount) + val (executorHadoopConfPod, executorHadoopConfContainer) = + hadoopBootStrap.map { bootstrap => + val podWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes( + PodWithMainContainer(executorPodWithNodeAffinity, initBootstrappedExecutorContainer)) + (podWithMainContainer.pod, podWithMainContainer.mainContainer) + }.getOrElse((executorPodWithNodeAffinity, initBootstrappedExecutorContainer)) + + val (executorKerberosPod, executorKerberosContainer) = + kerberosBootstrap.map { bootstrap => + val podWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes( + PodWithMainContainer(executorHadoopConfPod, executorHadoopConfContainer)) + (podWithMainContainer.pod, podWithMainContainer.mainContainer) + }.getOrElse((executorHadoopConfPod, executorHadoopConfContainer)) + + val (executorSparkUserPod, executorSparkUserContainer) = + hadoopUserBootstrap.map { bootstrap => + val podWithMainContainer = bootstrap.bootstrapMainContainerAndVolumes( + PodWithMainContainer(executorKerberosPod, executorKerberosContainer)) + (podWithMainContainer.pod, podWithMainContainer.mainContainer) + }.getOrElse((executorKerberosPod, executorKerberosContainer)) - new PodBuilder(executorPodWithNodeAffinity) + new PodBuilder(executorSparkUserPod) .editSpec() - .addToContainers(initBootstrappedExecutorContainer) + .addToContainers(executorSparkUserContainer) .endSpec() .build() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 709f3b72b92f5..bd90766d07002 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -21,7 +21,7 @@ import java.io.File import io.fabric8.kubernetes.client.Config import org.apache.spark.SparkContext -import org.apache.spark.deploy.k8s.{ConfigurationUtils, InitContainerResourceStagingServerSecretPluginImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} +import org.apache.spark.deploy.k8s.{ConfigurationUtils, HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopUGIUtilImpl, InitContainerResourceStagingServerSecretPluginImpl, KerberosTokenConfBootstrapImpl, SparkKubernetesClientFactory, SparkPodInitContainerBootstrapImpl} import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrapImpl} @@ -29,7 +29,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClientImpl import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ThreadUtils, Utils} @@ -44,6 +44,10 @@ private[spark] class KubernetesClusterManager
extends ExternalClusterManager wit override def createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler) : SchedulerBackend = { val sparkConf = sc.getConf + val maybeHadoopConfigMap = sparkConf.getOption(HADOOP_CONFIG_MAP_SPARK_CONF_NAME) + val maybeHadoopConfDir = sparkConf.getOption(HADOOP_CONF_DIR_LOC) + val maybeDTSecretName = sparkConf.getOption(KERBEROS_KEYTAB_SECRET_NAME) + val maybeDTDataItem = sparkConf.getOption(KERBEROS_KEYTAB_SECRET_KEY) val maybeInitContainerConfigMap = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP) val maybeInitContainerConfigMapKey = sparkConf.get(EXECUTOR_INIT_CONTAINER_CONFIG_MAP_KEY) val maybeSubmittedFilesSecret = sparkConf.get(EXECUTOR_SUBMITTED_SMALL_FILES_SECRET) @@ -83,6 +87,35 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit sparkConf) } + val hadoopUtil = new HadoopUGIUtilImpl + val hadoopBootStrap = maybeHadoopConfigMap.map { hadoopConfigMap => + val hadoopConfigurations = maybeHadoopConfDir.map( + confDir => getHadoopConfFiles(confDir)).getOrElse(Array.empty[File]) + new HadoopConfBootstrapImpl( + hadoopConfigMap, + hadoopConfigurations, + hadoopUtil) + } + + val kerberosBootstrap = + maybeHadoopConfigMap.flatMap { _ => + for { + secretName <- maybeDTSecretName + secretItemKey <- maybeDTDataItem + } yield { + new KerberosTokenConfBootstrapImpl( + secretName, + secretItemKey, + Utils.getCurrentUserName()) } + } + + val hadoopUserBootstrap = + if (hadoopBootStrap.isDefined && kerberosBootstrap.isEmpty) { + Some(new HadoopConfSparkUserBootstrapImpl(hadoopUtil)) + } else { + None + } + val mountSmallFilesBootstrap = for { secretName <- maybeSubmittedFilesSecret secretMountPath <- maybeSubmittedFilesSecretMountPath @@ -107,11 +140,17 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit logWarning("The executor's init-container config map was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } + if (maybeInitContainerConfigMapKey.isEmpty) { logWarning("The executor's init-container config map key was not specified. Executors will" + " therefore not attempt to fetch remote or submitted dependencies.") } + if (maybeHadoopConfigMap.isEmpty) { + logWarning("The executor's Hadoop config map was not specified.
Executors will" + + " therefore not attempt to mount hadoop configuration files.") + } + val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( KUBERNETES_MASTER_INTERNAL_URL, Some(sparkConf.get(KUBERNETES_NAMESPACE)), @@ -142,7 +181,10 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit executorInitContainerBootstrap, executorInitContainerMountSecretsBootstrap, executorInitContainerSecretVolumePlugin, - executorLocalDirVolumeProvider) + executorLocalDirVolumeProvider, + hadoopBootStrap, + kerberosBootstrap, + hadoopUserBootstrap) val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( @@ -160,4 +202,13 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) } + + private def getHadoopConfFiles(path: String) : Array[File] = { + val dir = new File(path) + if (dir.isDirectory) { + dir.listFiles.flatMap { file => Some(file).filter(_.isFile) } + } else { + Array.empty[File] + } + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala index 041f51e912002..846335889ee54 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.mock.MockitoSugar._ import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.config._ import org.apache.spark.deploy.k8s.constants._ import org.apache.spark.deploy.k8s.submit.submitsteps.{DriverConfigurationStep, KubernetesDriverSpec} @@ -136,6 +137,7 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { .set( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS, "-XX:+HeapDumpOnOutOfMemoryError -XX:+PrintGCDetails") + .set(KUBERNETES_KERBEROS_SUPPORT, true) val submissionClient = new Client( submissionSteps, sparkConf, @@ -150,14 +152,16 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter { val driverJvmOptsEnvs = driverContainer.getEnv.asScala.filter { env => env.getName.startsWith(ENV_JAVA_OPT_PREFIX) }.sortBy(_.getName) - assert(driverJvmOptsEnvs.size === 4) + assert(driverJvmOptsEnvs.size === 6) val expectedJvmOptsValues = Seq( + "-Dspark.kubernetes.kerberos.enabled=true", "-Dspark.logConf=true", s"-D${SecondTestConfigurationStep.sparkConfKey}=" + s"${SecondTestConfigurationStep.sparkConfValue}", s"-XX:+HeapDumpOnOutOfMemoryError", - s"-XX:+PrintGCDetails") + s"-XX:+PrintGCDetails", + s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") driverJvmOptsEnvs.zip(expectedJvmOptsValues).zipWithIndex.foreach { case ((resolvedEnv, expectedJvmOpt), index) => assert(resolvedEnv.getName === s"$ENV_JAVA_OPT_PREFIX$index") diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala index c3ec943e3e87f..39ab330786943 100644 --- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.submit import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.config._ -import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} +import org.apache.spark.deploy.k8s.submit.submitsteps.{BaseDriverConfigurationStep, DependencyResolutionStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep, HadoopConfigBootstrapStep, InitContainerBootstrapStep, LocalDirectoryMountConfigurationStep, MountSecretsStep, MountSmallLocalFilesStep, PythonStep, RStep} private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { @@ -45,7 +45,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - Seq.empty[String], + ADDITIONAL_PYTHON_FILES, + None, sparkConf) validateStepTypes( orchestrator, @@ -69,7 +70,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - Seq.empty[String], + ADDITIONAL_PYTHON_FILES, + None, sparkConf) validateStepTypes( orchestrator, @@ -93,6 +95,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS MAIN_CLASS, APP_ARGS, ADDITIONAL_PYTHON_FILES, + None, sparkConf) validateStepTypes( orchestrator, @@ -116,6 +119,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS MAIN_CLASS, APP_ARGS, Seq.empty[String], + None, sparkConf) validateStepTypes( orchestrator, @@ -127,7 +131,6 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[RStep]) } - test("Only submitter local files without a resource staging server.") { val sparkConf = new SparkConf(false).set("spark.files", "/var/spark/file1.txt") val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") @@ -139,7 +142,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - Seq.empty[String], + ADDITIONAL_PYTHON_FILES, + None, sparkConf) validateStepTypes( orchestrator, @@ -164,6 +168,7 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS MAIN_CLASS, APP_ARGS, Seq.empty[String], + None, sparkConf) validateStepTypes( orchestrator, @@ -188,7 +193,8 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS APP_NAME, MAIN_CLASS, APP_ARGS, - Seq.empty[String], + ADDITIONAL_PYTHON_FILES, + None, sparkConf) validateStepTypes( orchestrator, @@ -200,6 +206,32 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS classOf[MountSecretsStep]) } + test("Submission steps with hdfs interaction and HADOOP_CONF_DIR defined") { + val sparkConf = new SparkConf(false) + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val hadoopConf = Some("/etc/hadoop/conf") + val orchestrator = new DriverConfigurationStepsOrchestrator( + NAMESPACE, + APP_ID, + LAUNCH_TIME, + mainAppResource, + 
APP_NAME, + MAIN_CLASS, + APP_ARGS, + ADDITIONAL_PYTHON_FILES, + hadoopConf, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BaseDriverConfigurationStep], + classOf[DriverServiceBootstrapStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[LocalDirectoryMountConfigurationStep], + classOf[HadoopConfigBootstrapStep]) + } + private def validateStepTypes( orchestrator: DriverConfigurationStepsOrchestrator, types: Class[_ <: DriverConfigurationStep]*): Unit = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala new file mode 100644 index 0000000000000..03ada4090a56a --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfBootstrapSuite.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.spark.deploy.k8s.submit + +import java.io.File +import java.util.UUID + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model._ +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopUGIUtilImpl, PodWithMainContainer} +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.util.Utils + +private[spark] class HadoopConfBootstrapSuite extends SparkFunSuite with BeforeAndAfter{ + private val CONFIG_MAP_NAME = "config-map" + private val TEMP_HADOOP_FILE = createTempFile("core-site.xml") + private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE) + private val SPARK_USER_VALUE = "sparkUser" + + @Mock + private var hadoopUtil: HadoopUGIUtilImpl = _ + + before { + MockitoAnnotations.initMocks(this) + when(hadoopUtil.getShortUserName).thenReturn(SPARK_USER_VALUE) + } + + test("Test of bootstrapping hadoop_conf_dir files") { + val hadoopConfStep = new HadoopConfBootstrapImpl( + CONFIG_MAP_NAME, + HADOOP_FILES, + hadoopUtil) + val expectedKeyPaths = Seq( + new KeyToPathBuilder() + .withKey(TEMP_HADOOP_FILE.toPath.getFileName.toString) + .withPath(TEMP_HADOOP_FILE.toPath.getFileName.toString) + .build()) + val expectedPod = new PodBuilder() + .editOrNewSpec() + .addNewVolume() + .withName(HADOOP_FILE_VOLUME) + .withNewConfigMap() + .withName(CONFIG_MAP_NAME) + .withItems(expectedKeyPaths.asJava) + .endConfigMap() + .endVolume() + .endSpec() + .build() + + val podWithMain = PodWithMainContainer( + new PodBuilder().withNewSpec().endSpec().build(), + new Container()) + val returnedPodContainer = hadoopConfStep.bootstrapMainContainerAndVolumes(podWithMain) + assert(expectedPod === returnedPodContainer.pod) + assert(returnedPodContainer.mainContainer.getVolumeMounts.asScala.map(vm => + (vm.getName, vm.getMountPath)).head === (HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH)) + assert(returnedPodContainer.mainContainer.getEnv.asScala.head === + new EnvVarBuilder().withName(ENV_HADOOP_CONF_DIR).withValue(HADOOP_CONF_DIR_PATH).build()) + } + + private def createTempFile(contents: String): File = { + val dir = Utils.createTempDir() + val file = new File(dir, s"${UUID.randomUUID().toString}") + Files.write(contents.getBytes, file) + file + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfSparkUserBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfSparkUserBootstrapSuite.scala new file mode 100644 index 0000000000000..f252799377c05 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/HadoopConfSparkUserBootstrapSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{HadoopUGIUtilImpl, PodWithMainContainer} +import org.apache.spark.deploy.k8s.HadoopConfSparkUserBootstrapImpl +import org.apache.spark.deploy.k8s.constants._ + +private[spark] class HadoopConfSparkUserBootstrapSuite extends SparkFunSuite with BeforeAndAfter{ + private val SPARK_USER_VALUE = "sparkUser" + + @Mock + private var hadoopUtil: HadoopUGIUtilImpl = _ + + before { + MockitoAnnotations.initMocks(this) + when(hadoopUtil.getShortUserName).thenReturn(SPARK_USER_VALUE) + } + + test("Test of bootstrapping ENV_VARs for SPARK_USER") { + val hadoopConfStep = new HadoopConfSparkUserBootstrapImpl(hadoopUtil) + val emptyPod = new PodBuilder().withNewSpec().endSpec().build() + val podWithMain = PodWithMainContainer( + emptyPod, + new Container()) + val returnedPodContainer = hadoopConfStep.bootstrapMainContainerAndVolumes(podWithMain) + assert(emptyPod === returnedPodContainer.pod) + assert(returnedPodContainer.mainContainer.getEnv.asScala.head === + new EnvVarBuilder().withName(ENV_SPARK_USER).withValue(SPARK_USER_VALUE).build()) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KerberosTokenConfBootstrapSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KerberosTokenConfBootstrapSuite.scala new file mode 100644 index 0000000000000..dae097a22d3fb --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KerberosTokenConfBootstrapSuite.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{KerberosTokenConfBootstrapImpl, PodWithMainContainer} +import org.apache.spark.deploy.k8s.constants._ + + +private[spark] class KerberosTokenConfBootstrapSuite extends SparkFunSuite { + private val SECRET_NAME = "dtSecret" + private val SECRET_LABEL = "dtLabel" + private val TEST_SPARK_USER = "hdfs" + + test("Test of bootstrapping kerberos secrets and env") { + val kerberosConfStep = new KerberosTokenConfBootstrapImpl( + SECRET_NAME, + SECRET_LABEL, + TEST_SPARK_USER) + val expectedPod = new PodBuilder() + .editOrNewSpec() + .addNewVolume() + .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) + .withNewSecret() + .withSecretName(SECRET_NAME) + .endSecret() + .endVolume() + .endSpec() + .build() + val podWithMain = PodWithMainContainer( + new PodBuilder().withNewSpec().endSpec().build(), + new Container()) + val returnedPodContainer = kerberosConfStep.bootstrapMainContainerAndVolumes(podWithMain) + assert(expectedPod === returnedPodContainer.pod) + assert(returnedPodContainer.mainContainer.getVolumeMounts.asScala.map(vm => + (vm.getName, vm.getMountPath)).head === + (SPARK_APP_HADOOP_SECRET_VOLUME_NAME, SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)) + assert(returnedPodContainer.mainContainer.getEnv.asScala.head.getName === + ENV_HADOOP_TOKEN_FILE_LOCATION) + assert(returnedPodContainer.mainContainer.getEnv.asScala(1).getName === ENV_SPARK_USER) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStepSuite.scala new file mode 100644 index 0000000000000..d9dae9a9d5257 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/HadoopConfigBootstrapStepSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit.submitsteps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Matchers.any +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps.{HadoopConfigSpec, HadoopConfigurationStep} + + +private[spark] class HadoopConfigBootstrapStepSuite extends SparkFunSuite with BeforeAndAfter{ + private val CONFIG_MAP_NAME = "config-map" + private val POD_LABEL = Map("bootstrap" -> "true") + private val DRIVER_CONTAINER_NAME = "driver-container" + private val EXPECTED_SECRET = new SecretBuilder() + .withNewMetadata() + .withName(KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME) + .endMetadata() + .addToData("data", "secretdata") + .build() + + @Mock + private var hadoopConfigStep : HadoopConfigurationStep = _ + + before { + MockitoAnnotations.initMocks(this) + when(hadoopConfigStep.configureContainers(any[HadoopConfigSpec])).thenReturn( + HadoopConfigSpec( + configMapProperties = Map("data" -> "dataBytesToString"), + driverPod = new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + driverContainer = new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + additionalDriverSparkConf = Map("sparkConf" -> "confValue"), + dtSecret = + Some(EXPECTED_SECRET), + dtSecretName = KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME, + dtSecretItemKey = "")) + } + + test("Test modification of driverSpec with Hadoop Steps") { + val hadoopConfStep = new HadoopConfigBootstrapStep( + Seq(hadoopConfigStep), + CONFIG_MAP_NAME) + val expectedDriverSparkConf = new SparkConf(true) + .set(HADOOP_CONFIG_MAP_SPARK_CONF_NAME, CONFIG_MAP_NAME) + .set("sparkConf", "confValue") + val expectedConfigMap = new ConfigMapBuilder() + .withNewMetadata() + .withName(CONFIG_MAP_NAME) + .endMetadata() + .addToData(Map("data" -> "dataBytesToString").asJava) + .build() + val expectedResources = Seq(expectedConfigMap, EXPECTED_SECRET) + val driverSpec = KubernetesDriverSpec( + driverPod = new Pod(), + driverContainer = new Container(), + driverSparkConf = new SparkConf(true), + otherKubernetesResources = Seq.empty[HasMetadata]) + val returnContainerSpec = hadoopConfStep.configureDriver(driverSpec) + assert(expectedDriverSparkConf.getAll === returnContainerSpec.driverSparkConf.getAll) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + assert(returnContainerSpec.otherKubernetesResources === expectedResources) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala new file mode 100644 index 0000000000000..18bb3b631cf28 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfMounterStepSuite.scala @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import java.io.File +import java.util.UUID + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model._ +import org.apache.commons.io.FileUtils.readFileToString +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Matchers.any +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{HadoopConfBootstrap, PodWithMainContainer} +import org.apache.spark.deploy.k8s.constants.HADOOP_CONF_DIR_LOC +import org.apache.spark.util.Utils + + +private[spark] class HadoopConfMounterStepSuite extends SparkFunSuite with BeforeAndAfter{ + private val CONFIG_MAP_NAME = "config-map" + private val HADOOP_CONF_DIR_VAL = "/etc/hadoop" + private val POD_LABEL = Map("bootstrap" -> "true") + private val DRIVER_CONTAINER_NAME = "driver-container" + private val TEMP_HADOOP_FILE = createTempFile("core-site.xml") + private val HADOOP_FILES = Seq(TEMP_HADOOP_FILE) + + @Mock + private var hadoopConfBootstrap : HadoopConfBootstrap = _ + + before { + MockitoAnnotations.initMocks(this) + when(hadoopConfBootstrap.bootstrapMainContainerAndVolumes( + any[PodWithMainContainer])).thenAnswer(new Answer[PodWithMainContainer] { + override def answer(invocation: InvocationOnMock) : PodWithMainContainer = { + val pod = invocation.getArgumentAt(0, classOf[PodWithMainContainer]) + pod.copy( + pod = + new PodBuilder(pod.pod) + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + mainContainer = + new ContainerBuilder() + .withName(DRIVER_CONTAINER_NAME).build() + )}}) + } + + test("Test of mounting hadoop_conf_dir files into HadoopConfigSpec") { + val hadoopConfStep = new HadoopConfMounterStep( + CONFIG_MAP_NAME, + HADOOP_FILES, + hadoopConfBootstrap, + HADOOP_CONF_DIR_VAL) + val expectedDriverSparkConf = Map(HADOOP_CONF_DIR_LOC -> HADOOP_CONF_DIR_VAL) + val expectedConfigMap = Map( + TEMP_HADOOP_FILE.toPath.getFileName.toString -> + readFileToString(TEMP_HADOOP_FILE)) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new Pod(), + new Container(), + Map.empty[String, String], + None, + "", + "") + val returnContainerSpec = hadoopConfStep.configureContainers(hadoopConfSpec) + assert(expectedDriverSparkConf === returnContainerSpec.additionalDriverSparkConf) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + assert(returnContainerSpec.configMapProperties === expectedConfigMap) + } + + private def createTempFile(contents: String): File = { + val 
dir = Utils.createTempDir() + val file = new File(dir, s"${UUID.randomUUID().toString}") + Files.write(contents.getBytes, file) + file + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfSparkUserStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfSparkUserStepSuite.scala new file mode 100644 index 0000000000000..82085fb979309 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopConfSparkUserStepSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Matchers.any +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{HadoopConfSparkUserBootstrap, PodWithMainContainer} + + +private[spark] class HadoopConfSparkUserStepSuite extends SparkFunSuite with BeforeAndAfter{ + private val POD_LABEL = Map("bootstrap" -> "true") + private val DRIVER_CONTAINER_NAME = "driver-container" + + @Mock + private var hadoopConfSparkUserBootstrap : HadoopConfSparkUserBootstrap = _ + + before { + MockitoAnnotations.initMocks(this) + when(hadoopConfSparkUserBootstrap.bootstrapMainContainerAndVolumes( + any[PodWithMainContainer])).thenAnswer(new Answer[PodWithMainContainer] { + override def answer(invocation: InvocationOnMock) : PodWithMainContainer = { + val pod = invocation.getArgumentAt(0, classOf[PodWithMainContainer]) + pod.copy( + pod = + new PodBuilder(pod.pod) + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + mainContainer = + new ContainerBuilder() + .withName(DRIVER_CONTAINER_NAME).build() + )}}) + } + + test("Test of calling the SPARK_USER bootstrap to modify the HadoopConfSpec") { + val hadoopSparkUserStep = new HadoopConfSparkUserStep(hadoopConfSparkUserBootstrap) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new Pod(), + new Container(), + Map.empty[String, String], + None, + "", + "") + val returnContainerSpec = hadoopSparkUserStep.configureContainers(hadoopConfSpec) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + } +} diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala new file mode 100644 index 0000000000000..7ae64f5bd34f2 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosKeytabResolverStepSuite.scala @@ -0,0 +1,358 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import java.io.File +import java.security.PrivilegedExceptionAction +import java.util.UUID + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files +import io.fabric8.kubernetes.api.model._ +import org.apache.commons.codec.binary.Base64 +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier +import org.apache.hadoop.io.Text +import org.apache.hadoop.security.{Credentials, UserGroupInformation} +import org.apache.hadoop.security.token.{Token, TokenIdentifier} +import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.Matchers.{any, eq => mockitoEq} +import org.mockito.Mockito.{verify, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SparkConf, SparkException, SparkFunSuite} +import org.apache.spark.deploy.k8s.HadoopUGIUtilImpl +import org.apache.spark.deploy.k8s.constants._ +import org.apache.spark.util.{Clock, SystemClock, Utils} + + +private[spark] class HadoopKerberosKeytabResolverStepSuite + extends SparkFunSuite with BeforeAndAfter{ + private val clock: Clock = new SystemClock() + private val POD_LABEL = Map("bootstrap" -> "true") + private val DRIVER_CONTAINER_NAME = "driver-container" + private val TEMP_KEYTAB_FILE = createTempFile("keytab") + private val KERB_PRINCIPAL = "user@k8s.com" + private val SPARK_USER_VALUE = "sparkUser" + private val TEST_TOKEN_VALUE = "data" + private def getByteArray(input: String) = input.toCharArray.map(_.toByte) + private val TEST_DATA = getByteArray(TEST_TOKEN_VALUE) + private val OUTPUT_TEST_DATA = Base64.encodeBase64String(TEST_DATA) + private val TEST_TOKEN_SERVICE = new Text("hdfsService") + private val TEST_TOKEN = + new Token[DelegationTokenIdentifier](TEST_DATA, TEST_DATA, + DelegationTokenIdentifier.HDFS_DELEGATION_KIND, TEST_TOKEN_SERVICE) + private val INTERVAL = 500L + private val 
CURR_TIME = clock.getTimeMillis() + private val KUBE_TEST_NAME = "spark-testing" + private val DATA_KEY_NAME = + s"$KERBEROS_SECRET_LABEL_PREFIX-$CURR_TIME-$INTERVAL" + private val SECRET_NAME = + s"$KUBE_TEST_NAME-$KERBEROS_DELEGEGATION_TOKEN_SECRET_NAME.$CURR_TIME" + + @Mock + private var fileSystem: FileSystem = _ + + @Mock + private var hadoopUtil: HadoopUGIUtilImpl = _ + + @Mock + private var ugi: UserGroupInformation = _ + + @Mock + private var token: Token[AbstractDelegationTokenIdentifier] = _ + + @Mock + private var identifier: AbstractDelegationTokenIdentifier = _ + + before { + MockitoAnnotations.initMocks(this) + when(hadoopUtil.loginUserFromKeytabAndReturnUGI(any[String], any[String])) + .thenAnswer(new Answer[UserGroupInformation] { + override def answer(invocation: InvocationOnMock): UserGroupInformation = { + ugi + } + }) + when(hadoopUtil.getCurrentUser).thenReturn(ugi) + when(ugi.getShortUserName).thenReturn(SPARK_USER_VALUE) + when(hadoopUtil.getShortUserName).thenReturn(SPARK_USER_VALUE) + when(hadoopUtil.getFileSystem(any[Configuration])).thenReturn(fileSystem) + val tokens = Iterable[Token[_ <: TokenIdentifier]](token) + when(hadoopUtil.serialize(any())) + .thenReturn(TEST_DATA) + when(token.decodeIdentifier()).thenReturn(identifier) + when(hadoopUtil.getCurrentTime).thenReturn(CURR_TIME) + when(hadoopUtil.getTokenRenewalInterval(mockitoEq(tokens), + any[Configuration])).thenReturn(Some(INTERVAL)) + } + + test("Testing error catching when security is not enabled") { + when(hadoopUtil.isSecurityEnabled).thenReturn(false) + val keytabStep = new HadoopKerberosKeytabResolverStep( + KUBE_TEST_NAME, + new SparkConf(), + Some(KERB_PRINCIPAL), + Some(TEMP_KEYTAB_FILE), + None, + hadoopUtil) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + Map.empty[String, String], + None, + "", + "") + withClue("A SparkException should be thrown when Hadoop security is not enabled") { + intercept[SparkException] { keytabStep.configureContainers(hadoopConfSpec) } + } + } + + test("Testing error catching when no delegation tokens are obtained") { + when(hadoopUtil.isSecurityEnabled).thenReturn(true) + when(ugi.doAs(any(classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]))) + .thenReturn(Iterable[Token[_ <: TokenIdentifier]]()) + val keytabStep = new HadoopKerberosKeytabResolverStep( + KUBE_TEST_NAME, + new SparkConf(), + Some(KERB_PRINCIPAL), + Some(TEMP_KEYTAB_FILE), + None, + hadoopUtil) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + Map.empty[String, String], + None, + "", + "") + withClue("A SparkException should be thrown when no delegation tokens are obtained") { + intercept[SparkException] { keytabStep.configureContainers(hadoopConfSpec) } + } + } + + test("Testing keytab login with Principal and Keytab") { + when(hadoopUtil.isSecurityEnabled).thenReturn(true) + when(ugi.doAs(any(classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]))) + .thenReturn(Iterable[Token[_ <: TokenIdentifier]](token)) + val creds = new Credentials() + when(ugi.getCredentials).thenReturn(creds) + val actionCaptor: ArgumentCaptor[
PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]] = + ArgumentCaptor.forClass( + classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]) + val keytabStep = new HadoopKerberosKeytabResolverStep( + KUBE_TEST_NAME, + new SparkConf(), + Some(KERB_PRINCIPAL), + Some(TEMP_KEYTAB_FILE), + None, + hadoopUtil) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + Map.empty[String, String], + None, + "", + "") + val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec) + verify(ugi).doAs(actionCaptor.capture()) + val action = actionCaptor.getValue + when(hadoopUtil.dfsAddDelegationToken(mockitoEq(fileSystem), + any[Configuration], + mockitoEq(SPARK_USER_VALUE), + any())).thenAnswer(new Answer[Iterable[Token[_ <: TokenIdentifier]]] { + override def answer(invocation: InvocationOnMock) + : Iterable[Token[_ <: TokenIdentifier]] = { + creds.addToken(TEST_TOKEN_SERVICE, TEST_TOKEN) + Iterable[Token[_ <: TokenIdentifier]](TEST_TOKEN) + } + }) + // TODO: ACTION.run() is still not calling the above function + // assert(action.run() == Iterable[Token[_ <: TokenIdentifier]](TEST_TOKEN)) + assert(returnContainerSpec.additionalDriverSparkConf(KERBEROS_KEYTAB_SECRET_KEY) + .contains(KERBEROS_SECRET_LABEL_PREFIX)) + assert(returnContainerSpec.additionalDriverSparkConf === + Map(KERBEROS_KEYTAB_SECRET_KEY -> DATA_KEY_NAME, + KERBEROS_KEYTAB_SECRET_NAME -> SECRET_NAME)) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + assert(returnContainerSpec.dtSecretItemKey === DATA_KEY_NAME) + assert(returnContainerSpec.dtSecret.get.getData.asScala === Map( + DATA_KEY_NAME -> OUTPUT_TEST_DATA)) + assert(returnContainerSpec.dtSecretName === SECRET_NAME) + assert(returnContainerSpec.dtSecret.get.getMetadata.getLabels.asScala === + Map(KERBEROS_REFRESH_LABEL_KEY -> KERBEROS_REFRESH_LABEL_VALUE)) + assert(returnContainerSpec.dtSecret.nonEmpty) + assert(returnContainerSpec.dtSecret.get.getMetadata.getName === SECRET_NAME) + } + + test("Testing keytab login w/o Principal and Keytab") { + when(hadoopUtil.isSecurityEnabled).thenReturn(true) + when(ugi.doAs(any(classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]))) + .thenReturn(Iterable[Token[_ <: TokenIdentifier]](token)) + val creds = new Credentials() + when(ugi.getCredentials).thenReturn(creds) + val actionCaptor: ArgumentCaptor[ + PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]] = + ArgumentCaptor.forClass( + classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]) + val keytabStep = new HadoopKerberosKeytabResolverStep( + KUBE_TEST_NAME, + new SparkConf(), + None, + None, + None, + hadoopUtil) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + Map.empty[String, String], + None, + "", + "") + val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec) + verify(ugi).doAs(actionCaptor.capture()) + val action = actionCaptor.getValue + when(hadoopUtil.dfsAddDelegationToken(mockitoEq(fileSystem), + any[Configuration], + 
mockitoEq(SPARK_USER_VALUE), + any())).thenAnswer(new Answer[Iterable[Token[_ <: TokenIdentifier]]] { + override def answer(invocation: InvocationOnMock) + : Iterable[Token[_ <: TokenIdentifier]] = { + creds.addToken(TEST_TOKEN_SERVICE, TEST_TOKEN) + Iterable[Token[_ <: TokenIdentifier]](TEST_TOKEN) + } + }) + // TODO: ACTION.run() is still not calling the above function + // assert(action.run() == Iterable[Token[_ <: TokenIdentifier]](TEST_TOKEN)) + assert(returnContainerSpec.additionalDriverSparkConf(KERBEROS_KEYTAB_SECRET_KEY) + .contains(KERBEROS_SECRET_LABEL_PREFIX)) + assert(returnContainerSpec.additionalDriverSparkConf === + Map(KERBEROS_KEYTAB_SECRET_KEY -> DATA_KEY_NAME, + KERBEROS_KEYTAB_SECRET_NAME -> SECRET_NAME)) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + assert(returnContainerSpec.dtSecretItemKey === DATA_KEY_NAME) + assert(returnContainerSpec.dtSecret.get.getData.asScala === Map( + DATA_KEY_NAME -> OUTPUT_TEST_DATA)) + assert(returnContainerSpec.dtSecretName === SECRET_NAME) + assert(returnContainerSpec.dtSecret.get.getMetadata.getLabels.asScala === + Map(KERBEROS_REFRESH_LABEL_KEY -> KERBEROS_REFRESH_LABEL_VALUE)) + assert(returnContainerSpec.dtSecret.nonEmpty) + assert(returnContainerSpec.dtSecret.get.getMetadata.getName === SECRET_NAME) + } + + test("Testing keytab login with Principal, Keytab, and Renewer Principal") { + when(hadoopUtil.isSecurityEnabled).thenReturn(true) + when(ugi.doAs(any(classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]))) + .thenReturn(Iterable[Token[_ <: TokenIdentifier]](token)) + val creds = new Credentials() + when(ugi.getCredentials).thenReturn(creds) + val actionCaptor: ArgumentCaptor[ + PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]] = + ArgumentCaptor.forClass( + classOf[PrivilegedExceptionAction[Iterable[Token[_ <: TokenIdentifier]]]]) + val keytabStep = new HadoopKerberosKeytabResolverStep( + KUBE_TEST_NAME, + new SparkConf(), + Some(KERB_PRINCIPAL), + Some(TEMP_KEYTAB_FILE), + Some("SHORT_NAME"), + hadoopUtil) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + Map.empty[String, String], + None, + "", + "") + val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec) + verify(ugi).doAs(actionCaptor.capture()) + val action = actionCaptor.getValue + when(hadoopUtil.dfsAddDelegationToken(mockitoEq(fileSystem), + any[Configuration], + mockitoEq("SHORT_NAME"), + any())).thenAnswer(new Answer[Iterable[Token[_ <: TokenIdentifier]]] { + override def answer(invocation: InvocationOnMock) + : Iterable[Token[_ <: TokenIdentifier]] = { + creds.addToken(TEST_TOKEN_SERVICE, TEST_TOKEN) + Iterable[Token[_ <: TokenIdentifier]](TEST_TOKEN) + } + }) + // TODO: ACTION.run() is still not calling the above function + // assert(action.run() == Iterable[Token[_ <: TokenIdentifier]](TEST_TOKEN)) + assert(returnContainerSpec.additionalDriverSparkConf(KERBEROS_KEYTAB_SECRET_KEY) + .contains(KERBEROS_SECRET_LABEL_PREFIX)) + assert(returnContainerSpec.additionalDriverSparkConf === + Map(KERBEROS_KEYTAB_SECRET_KEY -> DATA_KEY_NAME, + KERBEROS_KEYTAB_SECRET_NAME -> SECRET_NAME)) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) +
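// The remaining assertions mirror the keytab tests above: the driver pod labels must be + // preserved and the delegation token secret must carry the expected name, item key, data, + // and refresh label. +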
assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + assert(returnContainerSpec.dtSecretItemKey === DATA_KEY_NAME) + assert(returnContainerSpec.dtSecret.get.getData.asScala === Map( + DATA_KEY_NAME -> OUTPUT_TEST_DATA)) + assert(returnContainerSpec.dtSecretName === SECRET_NAME) + assert(returnContainerSpec.dtSecret.get.getMetadata.getLabels.asScala === + Map(KERBEROS_REFRESH_LABEL_KEY -> KERBEROS_REFRESH_LABEL_VALUE)) + assert(returnContainerSpec.dtSecret.nonEmpty) + assert(returnContainerSpec.dtSecret.get.getMetadata.getName === SECRET_NAME) + } + + private def createTempFile(contents: String): File = { + val dir = Utils.createTempDir() + val file = new File(dir, s"${UUID.randomUUID().toString}") + Files.write(contents.getBytes, file) + file + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala new file mode 100644 index 0000000000000..5215704a317d3 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopKerberosSecretResolverStepSuite.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.constants._ + +private[spark] class HadoopKerberosSecretResolverStepSuite extends SparkFunSuite { + private val CONFIG_MAP_NAME = "config-map" + private val HADOOP_CONF_DIR_VAL = "/etc/hadoop" + private val POD_LABEL = Map("bootstrap" -> "true") + private val DRIVER_CONTAINER_NAME = "driver-container" + private val TOKEN_SECRET_NAME = "secretName" + private val TOKEN_SECRET_DATA_ITEM_KEY = "secretItemKey" + + test("Testing kerberos with Secret") { + val keytabStep = new HadoopKerberosSecretResolverStep( + new SparkConf(), + TOKEN_SECRET_NAME, + TOKEN_SECRET_DATA_ITEM_KEY) + val expectedDriverSparkConf = Map( + KERBEROS_KEYTAB_SECRET_KEY -> TOKEN_SECRET_DATA_ITEM_KEY, + KERBEROS_KEYTAB_SECRET_NAME -> TOKEN_SECRET_NAME) + val hadoopConfSpec = HadoopConfigSpec( + Map.empty[String, String], + new PodBuilder() + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + new ContainerBuilder().withName(DRIVER_CONTAINER_NAME).build(), + Map.empty[String, String], + None, + "", + "") + val returnContainerSpec = keytabStep.configureContainers(hadoopConfSpec) + assert(expectedDriverSparkConf === returnContainerSpec.additionalDriverSparkConf) + assert(returnContainerSpec.driverContainer.getName == DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + assert(returnContainerSpec.dtSecret === None) + assert(returnContainerSpec.dtSecretItemKey === TOKEN_SECRET_DATA_ITEM_KEY) + assert(returnContainerSpec.dtSecretName === TOKEN_SECRET_NAME) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala new file mode 100644 index 0000000000000..b7701b12c5b0c --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
new file mode 100644
index 0000000000000..b7701b12c5b0c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/submitsteps/hadoopsteps/HadoopStepsOrchestratorSuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.submitsteps.hadoopsteps
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.config._
+
+private[spark] class HadoopStepsOrchestratorSuite extends SparkFunSuite {
+  private val NAMESPACE = "testNamespace"
+  private val HADOOP_CONFIG_MAP = "hadoop-config-map"
+  private val HADOOP_CONF_DIR_VAL = "/etc/hadoop/conf"
+  private val KUBE_PREFIX = "spark-test"
+
+  test("Testing without Kerberos") {
+    val sparkTestConf = new SparkConf(true)
+      .set(KUBERNETES_KERBEROS_SUPPORT, false)
+    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+      KUBE_PREFIX,
+      NAMESPACE,
+      HADOOP_CONFIG_MAP,
+      sparkTestConf,
+      HADOOP_CONF_DIR_VAL)
+    val steps = hadoopOrchestrator.getHadoopSteps()
+    assert(steps.length === 2)
+    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+    assert(steps(1).isInstanceOf[HadoopConfSparkUserStep])
+  }
+
+  test("Testing with Keytab Kerberos Login") {
+    val sparkTestConf = new SparkConf(true)
+      .set(KUBERNETES_KERBEROS_SUPPORT, true)
+      .set(KUBERNETES_KERBEROS_KEYTAB, "keytab.file")
+      .set(KUBERNETES_KERBEROS_PRINCIPAL, "user@kerberos")
+    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+      KUBE_PREFIX,
+      NAMESPACE,
+      HADOOP_CONFIG_MAP,
+      sparkTestConf,
+      HADOOP_CONF_DIR_VAL)
+    val steps = hadoopOrchestrator.getHadoopSteps()
+    assert(steps.length === 2)
+    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+    assert(steps(1).isInstanceOf[HadoopKerberosKeytabResolverStep])
+  }
+
+  test("Testing with kinit Kerberos Login") {
+    val sparkTestConf = new SparkConf(true)
+      .set(KUBERNETES_KERBEROS_SUPPORT, true)
+    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+      KUBE_PREFIX,
+      NAMESPACE,
+      HADOOP_CONFIG_MAP,
+      sparkTestConf,
+      HADOOP_CONF_DIR_VAL)
+    val steps = hadoopOrchestrator.getHadoopSteps()
+    assert(steps.length === 2)
+    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+    assert(steps(1).isInstanceOf[HadoopKerberosKeytabResolverStep])
+  }
+
+  test("Testing with Secret stored Kerberos") {
+    val sparkTestConf = new SparkConf(true)
+      .set(KUBERNETES_KERBEROS_SUPPORT, true)
+      .set(KUBERNETES_KERBEROS_DT_SECRET_NAME, "dtSecret")
+      .set(KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY, "dtItemKey")
+    val hadoopOrchestrator = new HadoopStepsOrchestrator(
+      KUBE_PREFIX,
+      NAMESPACE,
+      HADOOP_CONFIG_MAP,
+      sparkTestConf,
+      HADOOP_CONF_DIR_VAL)
+    val steps = hadoopOrchestrator.getHadoopSteps()
+    assert(steps.length === 2)
+    assert(steps.head.isInstanceOf[HadoopConfMounterStep])
+    assert(steps(1).isInstanceOf[HadoopKerberosSecretResolverStep])
+  }
+}
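Taken together, the four cases above pin down how the orchestrator chooses its second step: with Kerberos disabled it emits the plain SPARK_USER step, a pre-existing secret reference takes precedence, and both the keytab/principal and kinit (local TGT cache) cases fall through to the keytab resolver. A sketch of that selection written out as plain logic, names only, not the actual implementation:

  // Sketch of the branching implied by the tests above; step construction
  // is elided and only the step class names are returned.
  def hadoopStepNames(kerberosEnabled: Boolean,
      secretRef: Option[(String, String)]): Seq[String] = {
    val kerberosStep =
      if (!kerberosEnabled) {
        "HadoopConfSparkUserStep"
      } else if (secretRef.isDefined) {
        "HadoopKerberosSecretResolverStep"
      } else {
        "HadoopKerberosKeytabResolverStep" // keytab/principal or kinit TGT
      }
    Seq("HadoopConfMounterStep", kerberosStep)
  }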
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
index 14cadb4acaa0f..47493c827ddb5 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala
@@ -16,19 +16,27 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
+import java.io.File
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
 import io.fabric8.kubernetes.api.model.{Pod, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+import io.fabric8.kubernetes.api.model.KeyToPathBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.mockito.{AdditionalAnswers, Mock, Mockito, MockitoAnnotations}
 import org.mockito.Matchers.any
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach}
-import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.k8s.{PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
+import org.apache.spark.deploy.k8s.{HadoopConfBootstrapImpl, HadoopConfSparkUserBootstrapImpl, HadoopUGIUtilImpl, KerberosTokenConfBootstrapImpl, PodWithDetachedInitContainer, SparkPodInitContainerBootstrap}
 import org.apache.spark.deploy.k8s.config._
 import org.apache.spark.deploy.k8s.constants._
 import org.apache.spark.deploy.k8s.submit.{MountSecretsBootstrapImpl, MountSmallFilesBootstrap, MountSmallFilesBootstrapImpl}
+import org.apache.spark.util.Utils
 
 class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach {
   private val driverPodName: String = "driver-pod"
@@ -56,6 +64,9 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
   @Mock
   private var executorLocalDirVolumeProvider: ExecutorLocalDirVolumeProvider = _
 
+  @Mock
+  private var hadoopUGI: HadoopUGIUtilImpl = _
+
   before {
     MockitoAnnotations.initMocks(this)
     baseConf = new SparkConf()
@@ -78,7 +89,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       None,
       None,
-      executorLocalDirVolumeProvider)
+      executorLocalDirVolumeProvider,
+      None,
+      None,
+      None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -119,7 +133,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       None,
       None,
-      executorLocalDirVolumeProvider)
+      executorLocalDirVolumeProvider,
+      None,
+      None,
+      None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -141,7 +158,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       None,
       None,
-      executorLocalDirVolumeProvider)
+      executorLocalDirVolumeProvider,
+      None,
+      None,
+      None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -176,7 +196,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       Some(initContainerBootstrap),
       None,
       None,
-      executorLocalDirVolumeProvider)
+      executorLocalDirVolumeProvider,
+      None,
+      None,
+      None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -203,7 +226,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       Some(initContainerBootstrap),
       Some(secretsBootstrap),
       None,
-      executorLocalDirVolumeProvider)
+      executorLocalDirVolumeProvider,
+      None,
+      None,
+      None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
 
@@ -239,7 +265,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
       None,
       None,
       None,
-      executorLocalDirVolumeProvider)
+      executorLocalDirVolumeProvider,
+      None,
+      None,
+      None)
     val executor = factory.createExecutorPod(
       "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]())
     assert(executor.getSpec.getVolumes.size === 1)
@@ -251,7 +280,6 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef
with Bef test("Small-files add a secret & secret volume mount to the container") { val conf = baseConf.clone() - val smallFiles = new MountSmallFilesBootstrapImpl("secret1", "/var/secret1") val factory = new ExecutorPodFactoryImpl( conf, @@ -261,7 +289,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, - executorLocalDirVolumeProvider) + executorLocalDirVolumeProvider, + None, + None, + None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -295,7 +326,10 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef None, None, None, - executorLocalDirVolumeProvider) + executorLocalDirVolumeProvider, + None, + None, + None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) @@ -309,6 +343,132 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef checkOwnerReferences(executor, driverPodUid) } + test("check that hadoop bootstrap mounts files w/o SPARK_USER") { + when(hadoopUGI.getShortUserName).thenReturn("test-user") + val conf = baseConf.clone() + val configName = "hadoop-test" + val hadoopFile = createTempFile + val hadoopFiles = Seq(hadoopFile) + val hadoopBootsrap = new HadoopConfBootstrapImpl( + hadoopConfConfigMapName = configName, + hadoopConfigFiles = hadoopFiles, + hadoopUGI = hadoopUGI) + + val factory = new ExecutorPodFactoryImpl( + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider, + Some(hadoopBootsrap), + None, + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + + checkEnv(executor, + Map(ENV_HADOOP_CONF_DIR -> HADOOP_CONF_DIR_PATH, + "qux" -> "quux")) + checkOwnerReferences(executor, driverPodUid) + checkConfigMapVolumes(executor, + HADOOP_FILE_VOLUME, + configName, + hadoopFile.toPath.getFileName.toString) + checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH) + } + + test("check that hadoop bootstrap mounts files w/ SPARK_USER") { + when(hadoopUGI.getShortUserName).thenReturn("test-user") + val conf = baseConf.clone() + val configName = "hadoop-test" + val hadoopFile = createTempFile + val hadoopFiles = Seq(hadoopFile) + val hadoopBootstrap = new HadoopConfBootstrapImpl( + hadoopConfConfigMapName = configName, + hadoopConfigFiles = hadoopFiles, + hadoopUGI = hadoopUGI) + val hadoopUserBootstrap = new HadoopConfSparkUserBootstrapImpl(hadoopUGI) + + val factory = new ExecutorPodFactoryImpl( + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider, + Some(hadoopBootstrap), + None, + Some(hadoopUserBootstrap)) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + + checkEnv(executor, + Map(ENV_SPARK_USER -> "test-user", + ENV_HADOOP_CONF_DIR -> HADOOP_CONF_DIR_PATH, + "qux" -> "quux")) + checkOwnerReferences(executor, driverPodUid) + checkConfigMapVolumes(executor, + HADOOP_FILE_VOLUME, + configName, + hadoopFile.toPath.getFileName.toString) + checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH) + } + + test("check that hadoop and kerberos bootstrap function properly") { + when(hadoopUGI.getShortUserName).thenReturn("test-user") + val conf = baseConf.clone() + val configName = 
"hadoop-test" + val hadoopFile = createTempFile + val hadoopFiles = Seq(hadoopFile) + val hadoopBootstrap = new HadoopConfBootstrapImpl( + hadoopConfConfigMapName = configName, + hadoopConfigFiles = hadoopFiles, + hadoopUGI = hadoopUGI) + val secretName = "secret-test" + val secretItemKey = "item-test" + val userName = "sparkUser" + val kerberosBootstrap = new KerberosTokenConfBootstrapImpl( + secretName = secretName, + secretItemKey = secretItemKey, + userName = userName) + val factory = new ExecutorPodFactoryImpl( + conf, + nodeAffinityExecutorPodModifier, + None, + None, + None, + None, + None, + executorLocalDirVolumeProvider, + Some(hadoopBootstrap), + Some(kerberosBootstrap), + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) + + checkEnv(executor, + Map(ENV_SPARK_USER -> userName, + ENV_HADOOP_CONF_DIR -> HADOOP_CONF_DIR_PATH, + ENV_HADOOP_TOKEN_FILE_LOCATION -> + s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey", + "qux" -> "quux")) + checkOwnerReferences(executor, driverPodUid) + checkConfigMapVolumes(executor, + HADOOP_FILE_VOLUME, + configName, + hadoopFile.toPath.getFileName.toString) + checkSecretVolumes(executor, SPARK_APP_HADOOP_SECRET_VOLUME_NAME, secretName) + checkVolumeMounts(executor, HADOOP_FILE_VOLUME, HADOOP_CONF_DIR_PATH) + checkVolumeMounts(executor, + SPARK_APP_HADOOP_SECRET_VOLUME_NAME, + SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) @@ -335,4 +495,39 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef }.toMap assert(defaultEnvs === mapEnvs) } + + private def checkVolumeMounts(executor: Pod, name: String, mountPath: String) : Unit = { + assert(executor.getSpec.getContainers.size() === 1) + val volumeMount = executor.getSpec.getContainers + .get(0).getVolumeMounts.asScala.find(_.getName == name) + assert(volumeMount.nonEmpty) + assert(volumeMount.get.getMountPath == mountPath) + } + + private def checkConfigMapVolumes(executor: Pod, + volName: String, + configMapName: String, + content: String) : Unit = { + val volume = executor.getSpec.getVolumes.asScala.find(_.getName == volName) + assert(volume.nonEmpty) + assert(volume.get.getConfigMap.getName == configMapName) + assert(volume.get.getConfigMap.getItems.asScala.find(_.getKey == content).get == + new KeyToPathBuilder() + .withKey(content) + .withPath(content).build() ) + } + + private def checkSecretVolumes(executor: Pod, volName: String, secretName: String) : Unit = { + val volume = executor.getSpec.getVolumes.asScala.find(_.getName == volName) + assert(volume.nonEmpty) + assert(volume.get.getSecret.getSecretName == secretName) + } + + // Creates temp files for the purpose of testing file mounting + private def createTempFile: File = { + val dir = Utils.createTempDir() + val file = new File(dir, s"${UUID.randomUUID().toString}") + Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8) + file + } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 9d48d488bf967..50ee46c93b592 100644 --- 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 9d48d488bf967..50ee46c93b592 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -366,7 +366,8 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       mainAppResource = appResource,
       mainClass = mainClass,
       driverArgs = appArgs,
-      otherPyFiles = otherPyFiles)
+      otherPyFiles = otherPyFiles,
+      hadoopConfDir = None)
     Client.run(sparkConf, clientArguments)
     val driverPod = kubernetesTestComponents.kubernetesClient
       .pods()