Basic Secure HDFS Support [514] #540
Conversation
I have gone through about half of the PR. Haven't touched much of the Kerberos logic though. Will need more time on that.
docs/running-on-kubernetes.md
Outdated
<td>(none)</td> | ||
<td> | ||
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify | ||
the principal that you wish to use to handle renewing of Delegation Tokens. This is optional as you |
Delete "you".
.withNewConfigMap() | ||
.withName(hadoopConfConfigMapName) | ||
.withItems(keyPaths.asJava) | ||
.endConfigMap() |
Wrong indentation.
@deprecated("Moved to core in 2.3", "2.3") | ||
def serialize(creds: Credentials): Array[Byte] = { | ||
val byteStream = new ByteArrayOutputStream | ||
val dataStream = new DataOutputStream(byteStream) |
Is writeTokenStorageToStream calling close on dataStream?
handled this below
Use Utils.tryWithResource. That will close even if an exception is thrown.
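For illustration, a minimal sketch of what that could look like (assuming access to Spark's private[spark] Utils.tryWithResource helper; not the PR's exact code):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

import org.apache.hadoop.security.Credentials

import org.apache.spark.util.Utils

def serialize(creds: Credentials): Array[Byte] = {
  val byteStream = new ByteArrayOutputStream
  // tryWithResource closes the stream even if writeTokenStorageToStream throws.
  Utils.tryWithResource(new DataOutputStream(byteStream)) { dataStream =>
    creds.writeTokenStorageToStream(dataStream)
  }
  byteStream.toByteArray
}
```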
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey") | ||
.endEnv() | ||
.addNewEnv() | ||
.withName(ENV_SPARK_USER) |
HadoopConfBootstrapImpl also sets SPARK_USER, but to hadoopUGI.getShortName. So one will override the value set by the other, right?
SPARK_USER could be different from the job user, so yes, we are overwriting it.
Can we resolve it in one place and set it consistently everywhere? Right now the ordering and overwriting is ambiguous.
It's not ambiguous, as there are scenarios where the UGI could be either the job user or taken from the TGT.
|
||
// Bootstrapping dependencies with the init-container | ||
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" |
We changed to use the new initContainer field in Kubernetes 1.8 and removed this annotation in #528.
@@ -81,6 +81,9 @@ private[spark] class Client( | |||
|
|||
private val driverJavaOptions = submissionSparkConf.get( | |||
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) | |||
private val isKerberosEnabled = submissionSparkConf.get(KUBERNETES_KERBEROS_SUPPORT) | |||
private val maybeSimpleAuthentication = | |||
if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None |
Should the value be kerberos or simple?
simple
Why is this the case? The intuition is to set this to kerberos.
Right. The same question was asked for the executor side as well. Copying my answer below. I'd love to hear your opinions about this:
Excellent question.
If this is set to "kerberos", then UserGroupInformation code crashes complaining it cannot read kerberos config files like /etc/krb5.conf. So we want to prevent that by suppressing the code path.
As an alternative, we could create a config map containing /etc/krb5.conf and mount it in the driver and executor pods. But that seems to be overkill.
Now, there is another question. How can setting it to "simple" allow the driver and executor to access secure HDFS? It works because the driver and executor need only the delegation token to access secure HDFS, i.e. they don't need to sign on to Kerberos on their own.
This is counter-intuitive and hard to explain. I am open to suggestions to make this part easier to read. Maybe we can call the associated variable something like maybeTokenOnlyAuthentication.
Fine as long as we document this.
driverContainer = bootstrappedPodAndMainContainer.mainContainer, | ||
configMapProperties = | ||
hadoopConfigurationFiles.map(file => | ||
(file.toPath.getFileName.toString, readFileToString(file))).toMap, |
Does the encoding of the strings from the files matter?
No, it doesn't. I thought this method was the simplest.
Looks like this version of readFileToString has been deprecated: https://commons.apache.org/proper/commons-io/javadocs/api-2.5/org/apache/commons/io/FileUtils.html#readFileToString(java.io.File). I would suggest using https://commons.apache.org/proper/commons-io/javadocs/api-2.5/org/apache/commons/io/FileUtils.html#readFileToString(java.io.File,%20java.lang.String) with UTF-8.
Or you can use Guava's Files.toString: https://google.github.io/guava/releases/19.0/api/docs/com/google/common/io/Files.html#toString(java.io.File, java.nio.charset.Charset) with https://google.github.io/guava/releases/19.0/api/docs/com/google/common/base/Charsets.html#UTF_8.
Always hard-code to UTF-8. It's unclear whether the default encoding will be consistent between JVMs. +1 for using Guava's Files.toString. I think we read files into string contents elsewhere in the codebase - be consistent with those.
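For illustration, a minimal sketch of the Guava variant with an explicit charset (the method name and surrounding map are assumptions based on the diff above):

```scala
import java.io.File

import com.google.common.base.Charsets
import com.google.common.io.Files

// Read each Hadoop conf file with a hard-coded UTF-8 charset instead of the
// platform default used by the deprecated readFileToString(File) overload.
def confFilesAsStrings(hadoopConfigurationFiles: Seq[File]): Map[String, String] =
  hadoopConfigurationFiles.map { file =>
    (file.toPath.getFileName.toString, Files.toString(file, Charsets.UTF_8))
  }.toMap
```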
maybePrincipal: Option[String], | ||
maybeKeytab: Option[File], | ||
maybeRenewerPrincipal: Option[String], | ||
hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging{ |
Add a space after Logging.
|
||
override def configureContainers(hadoopConfigSpec: HadoopConfigSpec): HadoopConfigSpec = { | ||
val hadoopConf = SparkHadoopUtil.get.newConfiguration(submissionSparkConf) | ||
if (hadoopUGI.isSecurityEnabled) logDebug("Hadoop not configured with Kerberos") |
"Hadoop not configured with Kerberos" or "Hadoop configured with Kerberos"?
keytab <- maybeKeytab | ||
} yield { | ||
// Not necessary with [Spark-16742] | ||
// Reliant on [Spark-20328] for changing to YARN principal |
I need more time to understand this.
@@ -54,6 +57,9 @@ private[spark] class ExecutorPodFactoryImpl( | |||
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) | |||
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) | |||
|
|||
private val isKerberosEnabled = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT) | |||
private val maybeSimpleAuthentication = | |||
if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None |
Seems simple is the default. I don't quite understand why you need to set it to simple when Kerberos is enabled. Can you elaborate on this?
Excellent question.
If this is set to "kerberos", then UserGroupInformation code crashes complaining it cannot read kerberos config files like /etc/krb5.conf. So we want to prevent that by suppressing the code path.
As an alternative, we could create a config map containing /etc/krb5.conf and mount it in the driver and executor pods. But that seems to be overkill.
Now, there is another question. How can setting it to "simple" allow the driver and executor to access secure HDFS? It works because the driver and executor need only the delegation token to access secure HDFS, i.e. they don't need to sign on to Kerberos on their own.
This is counter-intuitive and hard to explain. I am open to suggestions to make this part easier to read. Maybe we can call the associated variable something like maybeTokenOnlyAuthentication.
Or at least some comments on why this is set to simple would be very helpful.
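For illustration, the kind of in-code comment being asked for might look like the following sketch; it reuses the names from the diff above, and the comment wording is illustrative rather than taken from the PR:

```scala
private val isKerberosEnabled = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT)

// Force "simple" authentication inside the pods on purpose: the driver and
// executors authenticate to secure HDFS with the mounted delegation token
// alone, and setting this to "kerberos" would make UserGroupInformation try
// to load /etc/krb5.conf, which is not available in the containers.
private val maybeSimpleAuthentication =
  if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None
```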
val bootstrapKerberos = new KerberosTokenConfBootstrapImpl( | ||
tokenSecretName, | ||
tokenItemKeyName, | ||
UserGroupInformation.getCurrentUser.getShortUserName) |
Wondering why you use UserGroupInformation.getCurrentUser.getShortUserName here but jobUserUGI.getShortUserName in HadoopKerberosKeytabResolverStep.
+1 - make this consistent.
jobUserUGI is different from the UGI obtained in the LTC via UserGroupInformation.getCurrentUser.
Got it.
override def bootstrapMainContainerAndVolumes( | ||
originalPodWithMainContainer: PodWithMainContainer) | ||
: PodWithMainContainer = { | ||
logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files") |
HDFS -> Hadoop.
Seq(hadoopConfMounterStep) ++ maybeKerberosStep.toSeq | ||
} | ||
|
||
private def getHadoopConfFiles(path: String) : Seq[File] = { |
Should we filter out .xml files only?
I mean, the goal is to mount all files in the hadoop conf directory, .xml or not. But if we wish to filter, we can do that as well.
@@ -67,6 +67,7 @@ private[spark] class HadoopUGIUtil{ | |||
val byteStream = new ByteArrayOutputStream | |||
val dataStream = new DataOutputStream(byteStream) | |||
creds.writeTokenStorageToStream(dataStream) | |||
dataStream.close() |
We need to make sure this is called even if creds.writeTokenStorageToStream(dataStream) throws an exception (unlikely but still worth considering). Not sure what the best practice for this is in Scala.
+1
@@ -107,7 +106,7 @@ package object constants { | |||
private[spark] val ENV_HADOOP_CONF_DIR = "HADOOP_CONF_DIR" | |||
private[spark] val HADOOP_CONF_DIR_LOC = "spark.kubernetes.hadoop.conf.dir" | |||
private[spark] val HADOOP_CONFIG_MAP_SPARK_CONF_NAME = | |||
"spark.kubernetes.hadoop.executor.hadoopconfigmapname" | |||
"spark.kubernetes.hadoop.executor.hadoopConfigMapName" |
Also the same for properties below.
maybeKeytab: Option[File], | ||
maybeRenewerPrincipal: Option[String], | ||
hadoopUGI: HadoopUGIUtil) extends HadoopConfigurationStep with Logging { | ||
private var originalCredentials: Credentials = _ |
Avoid using var in general.
These are Hadoop objects, in Java, that are being modified; I believe I need var.
We only have one method in this class - can't all of the fields be defined as vals as they are being created?
namespace: String, | ||
hadoopConfigMapName: String, | ||
submissionSparkConf: SparkConf, | ||
hadoopConfDir: String) extends Logging{ |
nit: space between Logging and { at the end of the line.
.endMetadata() | ||
.addToData(currentHadoopSpec.configMapProperties.asJava) | ||
.build() | ||
val executorSparkConf = driverSpec.driverSparkConf.clone() |
Perhaps a clearer name - especially since driverSparkConf = executorSparkConf below doesn't read very well.
But that is because we want the executorSparkConf to clone the driverSparkConf and append extra env variables. I don't see what is wrong with the naming convention here.
Think we want driverSparkConfWithExecutorSetup or something like that. Basically we want to say "This is the driver's spark configuration, that configures executors to behave such and such a way". Current naming suggests that this is the SparkConf that the executor itself will get.
docs/running-on-kubernetes.md
Outdated
</td> | ||
</tr> | ||
<tr> | ||
<td><code>spark.kubernetes.kerberos.rewewer.principal</code></td> |
Typo. s/rewewer/renewer/
<td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td> | ||
<td>spark.kubernetes.kerberos.dt.label</td> | ||
<td> | ||
Assuming you have set <code>spark.kubernetes.kerberos.enabled</code> to be true. This will let you specify |
Curious. Is the token refresh server supposed to renew this pre-populated token as well? Or is it supposed to be renewed by the job user? We may want to comment on that.
The token refresh server is supposed to renew this pre-populated token. The assumption is that if you supply a pre-populated token, it will be automatically updated by either an administrator or the token refresh server. If you think it's needed, you should probably note this in the later PR.
val keyPaths = hadoopConfigFiles.map(file => | ||
new KeyToPathBuilder() | ||
.withKey(file.toPath.getFileName.toString) | ||
.withPath(file.toPath.getFileName.toString) |
file.toPath.getFileName.toString is used repeatedly at lines 53 and 54. Extract it to a val, say fileName, at a line before 52 and use the variable in these lines?
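For illustration, a minimal sketch of the suggested refactor (the method wrapper and imports are assumptions; only the val extraction is the point):

```scala
import java.io.File

import io.fabric8.kubernetes.api.model.{KeyToPath, KeyToPathBuilder}

def buildKeyPaths(hadoopConfigFiles: Seq[File]): Seq[KeyToPath] =
  hadoopConfigFiles.map { file =>
    // Compute the file name once instead of repeating the expression.
    val fileName = file.toPath.getFileName.toString
    new KeyToPathBuilder()
      .withKey(fileName)
      .withPath(fileName)
      .build()
  }
```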
.endVolume() | ||
.endSpec() | ||
.build() | ||
val mainContainerWithMountedHadoopConf = new ContainerBuilder( |
Maybe s/mainContainerWithMountedHadoopConf/hadoopSupportedContainer/ to be consistent with hadoopSupportedPod at line 56?
private[spark] class HadoopUGIUtil{ | ||
def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser | ||
|
||
def getShortName: String = getCurrentUser.getShortUserName |
Maybe we don't need this method. There is only one caller, and the caller can easily just do hadoopUgiUtil.getCurrentUser.getShortUserName itself.
This is used purely for mocking
We should wrap a minimal set of methods for mocking. And we already wrap getCurrentUser, so this wrapping is unnecessary. Besides, this method name getShortName makes the caller code a bit difficult to read by masking that the short name is for the current user. The reader will question "short name of what?".
|
||
// Functions that should be in Core with Rebase to 2.3 | ||
@deprecated("Moved to core in 2.3", "2.3") | ||
def getTokenRenewalInterval( |
Line 30 says "Function of this class is merely for mocking reasons". But it seems this function has real business logic, beyond just mocking purposes. Move it to some other class?
} | ||
|
||
@deprecated("Moved to core in 2.3", "2.3") | ||
def serialize(creds: Credentials): Array[Byte] = { |
Ditto. It has business logic that should be tested rather than just being mocked. Move to some other class?
@@ -52,6 +54,7 @@ private[spark] class DriverConfigurationStepsOrchestrator( | |||
private val filesDownloadPath = submissionSparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION) | |||
private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY) | |||
private val initContainerConfigMapName = s"$kubernetesResourceNamePrefix-init-config" | |||
private val hadoopConfigMapName = s"$kubernetesResourceNamePrefix-hadoop-config" |
Can we also name the auto-generated secret using kubernetesResourceNamePrefix and pass it down below, so that the secret is named after the job name? That makes it easier to find which secret is used by which Spark job.
We find secrets based on labels... the secret name is irrelevant, though...
Although, you are right... this could be helpful. It will break my unit tests... but I guess it is worth it for the sake of naming conventions :P
The generated secret needs to have a unique name, and we've been using the kubernetes resource name prefix to guarantee uniqueness everywhere.
hadoopUGI.getTokenRenewalInterval(tokens, hadoopConf).getOrElse(Long.MaxValue) | ||
val currentTime: Long = hadoopUGI.getCurrentTime | ||
val initialTokenDataKeyName = s"$KERBEROS_SECRET_LABEL_PREFIX-$currentTime-$renewalInterval" | ||
val uniqueSecretName = s"$HADOOP_KERBEROS_SECRET_NAME.$currentTime" |
Can we name this using $kubernetesResourceNamePrefix like the hadoop config map name so it's easier to tell which secret is for which Spark job?
new SecretBuilder() | ||
.withNewMetadata() | ||
.withName(uniqueSecretName) | ||
.withLabels(Map("refresh-hadoop-tokens" -> "yes").asJava) |
Maybe put "refresh-hadoop-tokens" and "yes" in named constants and indicate that they are expected by the token refresh server in the constant names and/or comments?
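For illustration, a hedged sketch of such constants inside the constants package object (the names here are invented, not taken from the PR):

```scala
// Label the token refresh server watches for when deciding which secrets
// contain delegation tokens it should renew.
private[spark] val KERBEROS_REFRESH_LABEL_KEY = "refresh-hadoop-tokens"
private[spark] val KERBEROS_REFRESH_LABEL_VALUE = "yes"
```

The secret builder would then use `.withLabels(Map(KERBEROS_REFRESH_LABEL_KEY -> KERBEROS_REFRESH_LABEL_VALUE).asJava)`.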
@@ -54,6 +57,9 @@ private[spark] class ExecutorPodFactoryImpl( | |||
org.apache.spark.internal.config.EXECUTOR_CLASS_PATH) | |||
private val executorJarsDownloadDir = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION) | |||
|
|||
private val isKerberosEnabled = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT) | |||
private val maybeSimpleAuthentication = | |||
if (isKerberosEnabled) Some(s"-D$HADOOP_SECURITY_AUTHENTICATION=simple") else None |
Excellent question.
If this is set to "kerberos", then UserGroupInformation code crashes complaining it cannot read kerberos config files like /etc/krb5.conf. So we want to prevent that by suppressing the code path.
As an alternative, we could create a config map containing /etc/krb5.conf and mount it in the driver and executor pods. But that seems to be overkill.
Now, there is another question. How can setting it to "simple" allow the driver and executor to access secure HDFS? It works because the driver and executor need only the delegation token to access secure HDFS, i.e. they don't need to sign on to Kerberos on their own.
This is counter-intuitive and hard to explain. I am open to suggestions to make this part easier to read. Maybe we can call the associated variable something like maybeTokenOnlyAuthentication.
.endVolumeMount() | ||
.addNewEnv() | ||
.withName(ENV_HADOOP_TOKEN_FILE_LOCATION) | ||
.withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey") |
I just realized we have an edge case that this will fail. Imagine a job ran for many weeks and the refresh server added new weekly tokens. And also imagine the dynamic allocation is enabled and new executors are launching.
Those new executors should use the latest token, not the initial token. i.e. ENV_HADOOP_TOKEN_FILE_LOCATION should point to the latest token data item key.
I don't know how we can solve this yet. And we should probably address it later in a follow-up PR that we'll write for picking up the new token. Maybe add a TODO here so we don't forget this?
private[spark] class HadoopConfBootstrapImpl( | ||
hadoopConfConfigMapName: String, | ||
hadoopConfigFiles: Seq[File], | ||
hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging{ |
Add a space after Logging.
@deprecated("Moved to core in 2.3", "2.3") | ||
def deserialize(tokenBytes: Array[Byte]): Credentials = { | ||
val creds = new Credentials() | ||
creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes))) |
The DataInputStream also needs to be closed.
private[spark] class KerberosTokenConfBootstrapImpl( | ||
secretName: String, | ||
secretItemKey: String, | ||
userName: String) extends KerberosTokenConfBootstrap with Logging{ |
Space after Logging.
creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes))) | ||
val dataStream = new DataInputStream(new ByteArrayInputStream(tokenBytes)) | ||
creds.readTokenStorageStream(dataStream) | ||
dataStream.close() |
Wrap close in a finally block.
|
||
import java.io.File | ||
|
||
import scala.collection.JavaConverters._ |
Imports are not consistent with the rest of the project. Order should be as follows everywhere:
- java.io.*
- Empty Space
- Everything that isn't java.io.* or org.apache.spark.* (this includes scala.*)
- Empty space
- org.apache.spark.*
Please look over all files and fix all imports.
I was curious about the import order. According to http://spark.apache.org/contributing.html, the recommended import order is slightly different: scala.* and other third-party libraries are separated by an empty space. Do we know which one is correct?
In addition, sort imports in the following order (use alphabetical order within each group):
- java.* and javax.*
- scala.*
- Third-party libraries (org., com., etc)
- Project classes (org.apache.spark.*)
An example from the same page:
import java.*
import javax.*

import scala.*

import *

import org.apache.spark.*
Actually @kimoonkim and I think our code is incorrect in most places.
@mccheah Cool. Should we then follow the import order suggested in http://spark.apache.org/contributing.html going forward?
Yes we should. We can fix the ordering as we merge upstream.
* mounted as volumes and an ENV variable pointing to the mounted file. | ||
*/ | ||
def bootstrapMainContainerAndVolumes( | ||
originalPodWithMainContainer: PodWithMainContainer) |
Can this all fit on one line?
|
||
|
||
// Function of this class is merely for mocking reasons | ||
private[spark] class HadoopUGIUtil{ |
Put a trait over this and extend the trait. Then, only mock the trait.
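For illustration, a minimal sketch of the suggested shape (the Impl name is illustrative):

```scala
import org.apache.hadoop.security.UserGroupInformation

// Tests mock the trait; production code instantiates the Impl.
private[spark] trait HadoopUGIUtil {
  def getCurrentUser: UserGroupInformation
}

private[spark] class HadoopUGIUtilImpl extends HadoopUGIUtil {
  override def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser
}
```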
|
||
def dfsAddDelegationToken(hadoopConf: Configuration, renewer: String, creds: Credentials) | ||
: Iterable[Token[_ <: TokenIdentifier]] = | ||
FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds) |
Just return a FileSystem; the test can mock the FileSystem object, and then call addDelegationTokens on the mock FileSystem.
How exactly can this be done? This has been tripping me up, as I am trying to mock this FileSystem object with no luck (while ensuring that it passes integration tests).
I think you can add a method to this class like:
def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
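For illustration, a sketch of how such a wrapper could be mocked in a test (assuming Mockito is on the test classpath; the wrapper class and object names here are hypothetical, not from the PR):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.mockito.Mockito.{mock, verify, when}

// Hypothetical wrapper mirroring the suggestion above.
class HadoopFsUtil {
  def getFileSystem(hadoopConf: Configuration): FileSystem = FileSystem.get(hadoopConf)
}

object MockFileSystemExample {
  def main(args: Array[String]): Unit = {
    val fsUtil = mock(classOf[HadoopFsUtil])
    val fileSystem = mock(classOf[FileSystem])
    val hadoopConf = new Configuration()
    val creds = new Credentials()
    when(fsUtil.getFileSystem(hadoopConf)).thenReturn(fileSystem)

    // Code under test resolves the FileSystem through the wrapper, so the
    // delegation-token call lands on the mock instead of a real cluster.
    fsUtil.getFileSystem(hadoopConf).addDelegationTokens("renewer", creds)
    verify(fileSystem).addDelegationTokens("renewer", creds)
  }
}
```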
@deprecated("Moved to core in 2.3", "2.3") | ||
def deserialize(tokenBytes: Array[Byte]): Credentials = { | ||
val creds = new Credentials() | ||
val dataStream = new DataInputStream(new ByteArrayInputStream(tokenBytes)) |
Use Utils.tryWithResource.
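For illustration, the deserialize counterpart under the same assumption (access to the private[spark] Utils.tryWithResource helper):

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

import org.apache.hadoop.security.Credentials

import org.apache.spark.util.Utils

def deserialize(tokenBytes: Array[Byte]): Credentials = {
  val creds = new Credentials()
  // The stream is closed even if readTokenStorageStream throws.
  Utils.tryWithResource(
      new DataInputStream(new ByteArrayInputStream(tokenBytes))) { dataStream =>
    creds.readTokenStorageStream(dataStream)
  }
  creds
}
```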
// Bootstraps a main container with the Secret mounted as volumes and an ENV variable | ||
// pointing to the mounted file containing the DT for Secure HDFS interaction | ||
def bootstrapMainContainerAndVolumes( | ||
originalPodWithMainContainer: PodWithMainContainer) |
Indentation is off here - think we want this line and the next line indented in one more.
userName: String) extends KerberosTokenConfBootstrap with Logging { | ||
|
||
override def bootstrapMainContainerAndVolumes( | ||
originalPodWithMainContainer: PodWithMainContainer) |
Indentation is off here, indent in one more along with the line below.
def isFile(file: File) = if (file.isFile) Some(file) else None | ||
val dir = new File(path) | ||
if (dir.isDirectory) { | ||
dir.listFiles.flatMap { file => isFile(file) } |
Should be simple enough to inline the isFile method. Alternatively: Some(file).filter(_.isFile)
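For illustration, a sketch of the inlined variant (the else branch is assumed, since it is not shown in the diff):

```scala
import java.io.File

private def getHadoopConfFiles(path: String): Seq[File] = {
  val dir = new File(path)
  if (dir.isDirectory) {
    // Keep only regular files; Some(file).filter(_.isFile) replaces the helper.
    dir.listFiles.flatMap(file => Some(file).filter(_.isFile)).toSeq
  } else {
    Seq.empty[File]
  }
}
```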
No unit tests exist for the new classes - were those drafted up in the original as well? Can we include them here? Some of the utility classes created for testing lack that context.
Unit tests were a massive portion of the PR that would almost double the line count. Should I include them in this or a separate PR?
rerun integration tests please
1 similar comment
rerun integration tests please
6ee8b1e to 765455d
rerun integration tests please
Rerun integration tests please
rerun integration tests please
Most of my comments are addressed in the latest commit. LGTM.
Thanks for writing this change, @ifilonenko!
override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) | ||
: PodWithMainContainer = { | ||
logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific .xml files") | ||
val keyPaths = hadoopConfigFiles.map{ file => |
nit: empty space after map.
|
||
override def bootstrapMainContainerAndVolumes(originalPodWithMainContainer: PodWithMainContainer) | ||
: PodWithMainContainer = { | ||
logInfo("HADOOP_CONF_DIR defined. Mounting Hadoop specific .xml files") |
Should we filter .xml files only?
.withValue(HADOOP_CONF_DIR_PATH) | ||
.endEnv() | ||
.build() | ||
originalPodWithMainContainer.copy( |
nit: put an empty line before the returned value.
private[spark] val KUBERNETES_KERBEROS_KEYTAB = | ||
ConfigBuilder("spark.kubernetes.kerberos.keytab") | ||
.doc("Specify the location of keytab" + | ||
" for Kerberos in order to access Secure HDFS") |
nit: empty space at the end of the first part. Ditto below.
@@ -67,7 +68,8 @@ private[spark] object ClientArguments { | |||
mainAppResource.get, | |||
otherPyFiles, | |||
mainClass.get, | |||
driverArgs.toArray) | |||
driverArgs.toArray, | |||
sys.env.get("HADOOP_CONF_DIR")) |
nit: use ENV_HADOOP_CONF_DIR.
"-Dspark.logConf=true", | ||
s"-D${SecondTestConfigurationStep.sparkConfKey}=" + | ||
s"${SecondTestConfigurationStep.sparkConfValue}", | ||
s"-XX:+HeapDumpOnOutOfMemoryError", | ||
s"-XX:+PrintGCDetails") | ||
s"-XX:+PrintGCDetails", | ||
"-Dspark.hadoop.hadoop.security.authentication=simple") |
Use HADOOP_SECURITY_AUTHENTICATION.
@@ -199,6 +205,31 @@ private[spark] class DriverConfigurationStepsOrchestratorSuite extends SparkFunS | |||
classOf[LocalDirectoryMountConfigurationStep], | |||
classOf[MountSecretsStep]) | |||
} | |||
test("Submission steps with hdfs interaction and HADOOP_CONF_DIR defined") { |
nit: empty line before.
assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) | ||
assert(returnContainerSpec.configMapProperties === expectedConfigMap) | ||
} | ||
private def createTempFile(contents: String): File = { |
nit: empty line before.
)}}) | ||
} | ||
|
||
test("Test of mounting hadoop_conf_dir files into HadoopConfigSpec") { |
Is the description accurate, and are we missing something here? I don't see asserts that are HadoopConfSparkUserBootstrap-specific.
The description is not accurate. But for the steps, since we are leveraging the bootstrap method, this allows us to mock the call to the bootstrap. As such, we can just mock the method with a label change.
any[Configuration])).thenReturn(Some(INTERVAL)) | ||
} | ||
|
||
test("Testing Error Catching for Security Enabling") { |
We should be consistent in using capitals.
Overall the changes LGTM, with one minor comment.
"spark.kubernetes.hadoop.executor.hadoopConfigMapName" | ||
|
||
// Kerberos Configuration | ||
private[spark] val HADOOP_KERBEROS_SECRET_NAME = |
Can we also rename the constant val names?
Would like to merge since I have already received two LGTMs. Any contentions / comments?
Any more comments? If not, will merge by EOD today.
…tion ids (apache-spark-on-k8s#540) We originally made the shuffle map output writer API behave like an iterator in fetching the "next" partition writer. However, the shuffle writer implementations tend to skip opening empty partitions. If we used an iterator-like API though we would be tied down to opening a partition writer for every single partition, even if some of them are empty. Here, we go back to using specific partition identifiers to give us more freedom to avoid needing to create writers for empty partitions.
What changes were proposed in this pull request?
This is the ongoing work of setting up Secure HDFS interaction with Spark-on-K8S (#514).
The architecture is discussed in this community-wide Google doc.
This initiative can be broken down into 3 stages.
STAGE 1
- Using the HADOOP_CONF_DIR environmental variable and Config Maps to store all Hadoop config files locally, while also setting HADOOP_CONF_DIR locally in the driver / executors

STAGE 2
- Grabbing the TGT from LTC or using keytabs + principal and creating a DT that will be mounted as a secret

STAGE 3