This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Basic Secure HDFS Support [514] #540

Merged
@@ -573,7 +573,7 @@ object SparkSubmit extends CommandLineUtils {
     }

     // assure a keytab is available from any place in a JVM
-    if (clusterManager == YARN || clusterManager == LOCAL) {
+    if (clusterManager == YARN || clusterManager == KUBERNETES || clusterManager == LOCAL) {
       if (args.principal != null) {
         require(args.keytab != null, "Keytab must be specified when principal is specified")
         if (!new File(args.keytab).exists()) {
56 changes: 55 additions & 1 deletion docs/running-on-kubernetes.md
@@ -752,6 +752,61 @@ from the other deployment modes. See the [configuration page](configuration.html
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.enabled</code></td>
<td>false</td>
<td>
Specify whether your job requires Kerberos authentication to access HDFS. By default, secure
HDFS access is assumed to be disabled.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.keytab</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the location of the Kerberos keytab used to access secure HDFS. This is optional, as you may instead
log in by running <code>kinit</code> before running spark-submit, in which case the submission client
will resolve your credentials from the local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the Kerberos principal you wish to use to access secure HDFS. This is optional, as you may instead
log in by running <code>kinit</code> before running spark-submit, in which case the submission client
will resolve your credentials from the local TGT cache.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.renewer.principal</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the principal used to renew delegation tokens. This is optional; the renewer defaults to the
job user's principal.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.name</code></td>
<td>(none)</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the name of the secret in which your existing delegation token data is stored. You must also specify the
item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> under which the data is stored in the secret.
This is optional and only needed if you want to use a pre-existing secret; otherwise a new secret will be
created automatically.
</td>
</tr>
<tr>
<td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td>
<td>spark.kubernetes.kerberos.dt.label</td>
<td>
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to true, this lets you specify
the data item key within the pre-specified secret under which your existing delegation token data is stored.
This defaults to <code>spark.kubernetes.kerberos.dt.label</code> if you do not include it, but
you should always include it when supplying a pre-existing secret that contains the delegation token data.
</td>
</tr>

Review comment (Member): Curious. Is the token refresh server supposed to renew this pre-populated token as well? Or is it supposed to be renewed by the job user? We may want to comment on that.

Reply (Member Author): The token refresh server is supposed to renew this pre-populated token. The assumption is that if you supply a pre-populated token, it will be automatically updated by either an administrator or the token refresh server. If you think it is worth calling out, we should probably note this in the later PR.

<tr>
<td><code>spark.executorEnv.[EnvironmentVariableName]</code></td>
<td>(none)</td>
<td>
@@ -791,4 +846,3 @@ from the other deployment modes. See the [configuration page](configuration.html
 Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that
 should be lifted in the future include:
 * Applications can only run in cluster mode.
-* Only Scala and Java applications can be run.
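
To make the new configuration options documented above concrete, the following is a hypothetical sketch (not part of this diff) of wiring them into a SparkConf before submission; the keytab path, principal, and secret names are placeholder values.

import org.apache.spark.SparkConf

// Hypothetical values; substitute your own keytab, principal, and secret names.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  .set("spark.kubernetes.kerberos.keytab", "/etc/security/keytabs/alice.keytab")
  .set("spark.kubernetes.kerberos.principal", "alice@EXAMPLE.COM")
  // Alternatively, omit the keytab and principal and rely on a local TGT cache from kinit.
  .set("spark.kubernetes.kerberos.tokensecret.name", "existing-hdfs-dt-secret")
  .set("spark.kubernetes.kerberos.tokensecret.itemkey", "spark.kubernetes.kerberos.dt.label")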
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.File

import scala.collection.JavaConverters._

Review comment:
Imports are not consistent with the rest of the project. Order should be as follows everywhere:

  1. java.io.*
  2. Empty Space
  3. Everything that isn't java.io.* or org.apache.spark.* (this includes scala.*)
  4. Empty space
  5. org.apache.spark.*

Please look over all files and fix all imports.

Review comment (Member):
I was curious about the import order. According to http://spark.apache.org/contributing.html, the recommended import order is slightly different: scala.* and other third-party libraries are separated by an empty line. Do we know which one is correct?

In addition, sort imports in the following order (use alphabetical order within each group):

  • java.* and javax.*
  • scala.*
  • Third-party libraries (org., com., etc)
  • Project classes (org.apache.spark.*)

An example from the same page:

import java.*
import javax.*
 
import scala.*
 
import *
 
import org.apache.spark.*

Reply:
Actually @kimoonkim and I think our code is incorrect in most places.

Reply (Member):
@mccheah Cool. Should we then follow the import order suggested in http://spark.apache.org/contributing.html going forward?

Reply:
Yes we should. We can fix the ordering as we merge upstream.


import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging

/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * set up the Hadoop Configuration for executors as well.
 */
private[spark] trait HadoopConfBootstrap {
  /**
   * Bootstraps a main container with the ConfigMaps containing Hadoop config files
   * mounted as volumes and an ENV variable pointing to the mounted file.
   */
  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}

private[spark] class HadoopConfBootstrapImpl(
    hadoopConfConfigMapName: String,
    hadoopConfigFiles: Seq[File],
    hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer = {
    logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific files")
    val keyPaths = hadoopConfigFiles.map { file =>
      val fileStringPath = file.toPath.getFileName.toString
      new KeyToPathBuilder()
        .withKey(fileStringPath)
        .withPath(fileStringPath)
        .build()
    }
    val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editSpec()
        .addNewVolume()
          .withName(HADOOP_FILE_VOLUME)
          .withNewConfigMap()
            .withName(hadoopConfConfigMapName)
            .withItems(keyPaths.asJava)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    val hadoopSupportedContainer = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(HADOOP_FILE_VOLUME)
        .withMountPath(HADOOP_CONF_DIR_PATH)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_CONF_DIR)
        .withValue(HADOOP_CONF_DIR_PATH)
        .endEnv()
      .build()

    originalPodWithMainContainer.copy(
      pod = hadoopSupportedPod,
      mainContainer = hadoopSupportedContainer)
  }
}
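
For orientation, here is a hypothetical usage sketch (not part of this diff) of how a submission step might apply the bootstrap above. It assumes PodWithMainContainer is the simple case class wrapping a pod and its main container that the copy calls suggest; the ConfigMap name, pod name, and container name are placeholders.

import java.io.File

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Hypothetical: collect the *-site.xml files from a local HADOOP_CONF_DIR.
val hadoopConfDir = sys.env.getOrElse("HADOOP_CONF_DIR", "/etc/hadoop/conf")
val hadoopConfigFiles = new File(hadoopConfDir).listFiles().toSeq.filter(_.isFile)

val confBootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "spark-hadoop-conf",  // placeholder ConfigMap name
  hadoopConfigFiles = hadoopConfigFiles,
  hadoopUGI = new HadoopUGIUtilImpl)

val bootstrapped = confBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(
    pod = new PodBuilder().withNewMetadata().withName("spark-driver").endMetadata().build(),
    mainContainer = new ContainerBuilder().withName("spark-kubernetes-driver").build()))
// bootstrapped.pod now carries the ConfigMap-backed volume, and bootstrapped.mainContainer
// mounts it at HADOOP_CONF_DIR_PATH and exports HADOOP_CONF_DIR pointing at that path.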
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.ContainerBuilder

import org.apache.spark.deploy.k8s.constants._

// This trait is responsible for setting ENV_SPARK_USER when HADOOP_FILES are detected
// however, this step would not be run if Kerberos is enabled, as Kerberos sets SPARK_USER
private[spark] trait HadoopConfSparkUserBootstrap {
  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}

private[spark] class HadoopConfSparkUserBootstrapImpl(hadoopUGIUtil: HadoopUGIUtil)
  extends HadoopConfSparkUserBootstrap {

  override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer = {
    val envModifiedContainer = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(hadoopUGIUtil.getShortUserName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = originalPodWithMainContainer.pod,
      mainContainer = envModifiedContainer)
  }
}
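
Continuing the hypothetical sketch from the previous file, when Kerberos is not enabled only the SPARK_USER environment variable needs to be exported, which this bootstrap handles; the names below reuse the earlier placeholder example.

// Hypothetical continuation: export SPARK_USER so Hadoop's UserGroupInformation resolves
// the submitting user inside the container when Kerberos is disabled.
val sparkUserBootstrap = new HadoopConfSparkUserBootstrapImpl(new HadoopUGIUtilImpl)
val withSparkUser = sparkUserBootstrap.bootstrapMainContainerAndVolumes(bootstrapped)
// withSparkUser.mainContainer now carries ENV_SPARK_USER set to the UGI's short user name.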
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

import org.apache.spark.util.{Clock, SystemClock, Utils}

private[spark] trait HadoopUGIUtil {
  def getCurrentUser: UserGroupInformation
  def getShortUserName: String
  def getFileSystem(hadoopConf: Configuration): FileSystem
  def isSecurityEnabled: Boolean
  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation
  def dfsAddDelegationToken(
      fileSystem: FileSystem,
      hadoopConf: Configuration,
      renewer: String,
      creds: Credentials): Iterable[Token[_ <: TokenIdentifier]]
  def getCurrentTime: Long
  def getTokenRenewalInterval(
      renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
      hadoopConf: Configuration): Option[Long]
  def serialize(creds: Credentials): Array[Byte]
  def deserialize(tokenBytes: Array[Byte]): Credentials
}

private[spark] class HadoopUGIUtilImpl extends HadoopUGIUtil {

  private val clock: Clock = new SystemClock()

  def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
  def getShortUserName: String = getCurrentUser.getShortUserName
  def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
  def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled

  def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation =
    UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)

  def dfsAddDelegationToken(
      fileSystem: FileSystem,
      hadoopConf: Configuration,
      renewer: String,
      creds: Credentials): Iterable[Token[_ <: TokenIdentifier]] =
    fileSystem.addDelegationTokens(renewer, creds)

  def getCurrentTime: Long = clock.getTimeMillis()

  // Functions that should be in Core with Rebase to 2.3
  @deprecated("Moved to core in 2.3", "2.3")
  def getTokenRenewalInterval(
      renewedTokens: Iterable[Token[_ <: TokenIdentifier]],
      hadoopConf: Configuration): Option[Long] = {
    val renewIntervals = renewedTokens.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
      Try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
        val interval = newExpiration - identifier.getIssueDate
        interval
      }.toOption
    }
    renewIntervals.reduceLeftOption(_ min _)
  }

  @deprecated("Moved to core in 2.3", "2.3")
  def serialize(creds: Credentials): Array[Byte] = {

Review comment (Member): Ditto. It has business logic that should be tested rather than just being mocked. Move to some other class?

    Utils.tryWithResource(new ByteArrayOutputStream()) { byteStream =>
      Utils.tryWithResource(new DataOutputStream(byteStream)) { dataStream =>
        creds.writeTokenStorageToStream(dataStream)
      }
      byteStream.toByteArray
    }
  }

  @deprecated("Moved to core in 2.3", "2.3")
  def deserialize(tokenBytes: Array[Byte]): Credentials = {
    val creds = new Credentials()
    Utils.tryWithResource(new ByteArrayInputStream(tokenBytes)) { byteStream =>
      Utils.tryWithResource(new DataInputStream(byteStream)) { dataStream =>
        creds.readTokenStorageStream(dataStream)
      }
    }
    creds
  }
}
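
As a hedged sketch (not in this diff) of how the submission client might use this utility: obtain HDFS delegation tokens, compute a renewal interval, and round-trip the credentials through the byte form that would be stored in a Kubernetes secret. The renewer principal shown is a placeholder, and this only works after an existing Kerberos login (via kinit or a keytab).

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

val ugiUtil = new HadoopUGIUtilImpl
val hadoopConf = new Configuration()  // assumes core-site.xml/hdfs-site.xml on the classpath
val creds = new Credentials()

// Requires an existing Kerberos login; "hdfs/renewer@EXAMPLE.COM" is a placeholder renewer.
val fs = ugiUtil.getFileSystem(hadoopConf)
val tokens = ugiUtil.dfsAddDelegationToken(fs, hadoopConf, "hdfs/renewer@EXAMPLE.COM", creds)
val renewalInterval = ugiUtil.getTokenRenewalInterval(tokens, hadoopConf)

// Serialize the credentials for storage as a secret data item, and read them back.
val tokenBytes: Array[Byte] = ugiUtil.serialize(creds)
val restored: Credentials = ugiUtil.deserialize(tokenBytes)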
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.k8s

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

import org.apache.spark.deploy.k8s.constants._
import org.apache.spark.internal.Logging


/**
 * This is separated out from the HadoopConf steps API because this component can be reused to
 * mount the DT secret for executors as well.
 */
private[spark] trait KerberosTokenConfBootstrap {
  // Bootstraps a main container with the Secret mounted as volumes and an ENV variable
  // pointing to the mounted file containing the DT for Secure HDFS interaction
  def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer)
    : PodWithMainContainer
}

private[spark] class KerberosTokenConfBootstrapImpl(
    secretName: String,
    secretItemKey: String,
    userName: String) extends KerberosTokenConfBootstrap with Logging {

  override def bootstrapMainContainerAndVolumes(
      originalPodWithMainContainer: PodWithMainContainer): PodWithMainContainer = {
    logInfo(s"Mounting HDFS DT from Secret $secretName for Secure HDFS")
    val secretMountedPod = new PodBuilder(originalPodWithMainContainer.pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
          .withNewSecret()
            .withSecretName(secretName)
            .endSecret()
          .endVolume()
        .endSpec()
      .build()
    // TODO: ENV_HADOOP_TOKEN_FILE_LOCATION should point to the latest token data item key.
    val secretMountedContainer = new ContainerBuilder(
        originalPodWithMainContainer.mainContainer)
      .addNewVolumeMount()
        .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME)
        .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR)
        .endVolumeMount()
      .addNewEnv()
        .withName(ENV_HADOOP_TOKEN_FILE_LOCATION)
        .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey")
        .endEnv()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(userName)
        .endEnv()
      .build()
    originalPodWithMainContainer.copy(
      pod = secretMountedPod,
      mainContainer = secretMountedContainer)
  }
}
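
Finally, a minimal hypothetical sketch of mounting a pre-created delegation-token secret with this bootstrap, reusing the pod wrapper from the earlier sketch; the secret name is a placeholder and the item key mirrors the documented default for spark.kubernetes.kerberos.tokensecret.itemkey.

// Hypothetical values: the secret name would come from
// spark.kubernetes.kerberos.tokensecret.name, and the item key from
// spark.kubernetes.kerberos.tokensecret.itemkey (default spark.kubernetes.kerberos.dt.label).
val kerberosBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "existing-hdfs-dt-secret",
  secretItemKey = "spark.kubernetes.kerberos.dt.label",
  userName = new HadoopUGIUtilImpl().getShortUserName)

val withTokens = kerberosBootstrap.bootstrapMainContainerAndVolumes(bootstrapped)
// withTokens.mainContainer exposes HADOOP_TOKEN_FILE_LOCATION pointing at the mounted token
// file, so Hadoop clients inside the container pick up the delegation tokens automatically.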