[NOSQUASH] Resync with Kubernetes #269

Merged: 26 commits, merged on Oct 17, 2017. Changes shown below are from all commits.

Commits (26):
- b61f495  Move executor pod construction to a separate class. (#452)  (mccheah, Sep 6, 2017)
- f28cb17  Added configuration properties to inject arbitrary secrets into the d…  (liyinan926, Sep 7, 2017)
- c0e3a5e  Extract more of the shuffle management to a different class. (#454)  (mccheah, Sep 7, 2017)
- 701fe97  Unit Tests for KubernetesClusterSchedulerBackend (#459)  (mccheah, Sep 8, 2017)
- c0df462  Use a headless service to give a hostname to the driver. (#483)  (mccheah, Sep 8, 2017)
- 8fb4b5d  Code enhancement: Replaced explicit synchronized access to a hashmap …  (varunkatta, Sep 15, 2017)
- a771a02  docs: Fix path to spark-base Dockerfile (#493)  (somcsel, Sep 15, 2017)
- be394c6  Improve the image building workflow (#488)  (foxish, Sep 16, 2017)
- b84056e  Fail submission if submitter-local files are provided without resourc…  (Sep 16, 2017)
- 8b17246  Rename package to k8s (#497)  (foxish, Sep 21, 2017)
- 84f4602  Added reference YAML files for RBAC configs for driver and shuffle se…  (liyinan926, Sep 22, 2017)
- 0dff8be  Removing deprecated configuration (#503)  (liyinan926, Sep 22, 2017)
- c42952d  [SPARK-21642][CORE] Use FQDN for DRIVER_HOST_ADDRESS instead of ip ad…  (akitanaka, Aug 17, 2017)
- f0a50aa  Fix scalastyle  (mccheah, Sep 26, 2017)
- 375fa39  Empty commit to retrigger build  (mccheah, Oct 12, 2017)
- 3a9a591  Update poms for 2.2 release 0.4.0 (#508)  (foxish, Sep 25, 2017)
- 8314cbc  Update POM to 0.5.0-SNAPSHOT (#512)  (foxish, Sep 26, 2017)
- 0fba119  Add unit-testing for executorpodfactory (#491)  (foxish, Oct 10, 2017)
- b4e3880  Merge remote-tracking branch 'palantir/master' into mccheah/resync-kube  (ash211, Oct 16, 2017)
- 56310db  Fix compilation error with external shuffle client.  (mccheah, Oct 16, 2017)
- 0e04996  Add hadoop delegation creds  (mccheah, Oct 16, 2017)
- ea36f4c  Fix missing import  (mccheah, Oct 16, 2017)
- 6a39244  Fix more imports  (mccheah, Oct 16, 2017)
- a86c002  Fix more imports  (mccheah, Oct 16, 2017)
- 7960510  Mount emptyDir volumes for temporary directories on executors in stat…  (mccheah, Oct 16, 2017)
- 76a5544  Reorder imports  (ash211, Oct 16, 2017)
KubernetesExternalShuffleClient.java (rewritten from a concrete class into an interface)

@@ -17,64 +17,13 @@

 package org.apache.spark.network.shuffle.kubernetes;

-import org.apache.spark.network.client.RpcResponseCallback;
-import org.apache.spark.network.client.TransportClient;
-import org.apache.spark.network.sasl.SecretKeyHolder;
-import org.apache.spark.network.shuffle.ExternalShuffleClient;
-import org.apache.spark.network.shuffle.protocol.RegisterDriver;
-import org.apache.spark.network.util.TransportConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Closeable;
 import java.io.IOException;
-import java.nio.ByteBuffer;

-/**
- * A client for talking to the external shuffle service in Kubernetes cluster mode.
- *
- * This is used by the each Spark executor to register with a corresponding external
- * shuffle service on the cluster. The purpose is for cleaning up shuffle files
- * reliably if the application exits unexpectedly.
- */
-public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
-  private static final Logger logger = LoggerFactory
-      .getLogger(KubernetesExternalShuffleClient.class);
-
-  /**
-   * Creates an Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
-   * Please refer to docs on {@link ExternalShuffleClient} for more information.
-   */
-  public KubernetesExternalShuffleClient(
-      TransportConf conf,
-      SecretKeyHolder secretKeyHolder,
-      boolean authEnabled,
-      long registrationTimeoutMs) {
-    super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
-  }
-
-  public void registerDriverWithShuffleService(String host, int port)
-      throws IOException, InterruptedException {
-    checkInit();
-    ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer();
-    TransportClient client = clientFactory.createClient(host, port);
-    client.sendRpc(registerDriver, new RegisterDriverCallback());
-  }
-
-  private class RegisterDriverCallback implements RpcResponseCallback {
-    @Override
-    public void onSuccess(ByteBuffer response) {
-      logger.info("Successfully registered app " + appId + " with external shuffle service.");
-    }
-
-    @Override
-    public void onFailure(Throwable e) {
-      logger.warn("Unable to register app " + appId + " with external shuffle service. " +
-        "Please manually remove shuffle data after driver exit. Error: " + e);
-    }
-  }
-
-  @Override
-  public void close() {
-    super.close();
-  }
-}
+public interface KubernetesExternalShuffleClient extends Closeable {
+
+  void init(String appId);
+
+  void registerDriverWithShuffleService(String host, int port)
+      throws IOException, InterruptedException;
+}
KubernetesExternalShuffleClientImpl.java (new file, @@ -0,0 +1,78 @@)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle.kubernetes;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.sasl.SecretKeyHolder;
import org.apache.spark.network.shuffle.ExternalShuffleClient;
import org.apache.spark.network.shuffle.protocol.RegisterDriver;
import org.apache.spark.network.util.TransportConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;

/**
 * A client for talking to the external shuffle service in Kubernetes cluster mode.
 *
 * This is used by the each Spark executor to register with a corresponding external
 * shuffle service on the cluster. The purpose is for cleaning up shuffle files
 * reliably if the application exits unexpectedly.
 */
public class KubernetesExternalShuffleClientImpl
    extends ExternalShuffleClient implements KubernetesExternalShuffleClient {

  private static final Logger logger = LoggerFactory
      .getLogger(KubernetesExternalShuffleClientImpl.class);

  /**
   * Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
   * Please refer to docs on {@link ExternalShuffleClient} for more information.
   */
  public KubernetesExternalShuffleClientImpl(
      TransportConf conf,
      SecretKeyHolder secretKeyHolder,
      boolean saslEnabled,
      long registrationTimeoutMs) {
    super(conf, secretKeyHolder, saslEnabled, registrationTimeoutMs);
  }

  @Override
  public void registerDriverWithShuffleService(String host, int port)
      throws IOException, InterruptedException {
    checkInit();
    ByteBuffer registerDriver = new RegisterDriver(appId, 0).toByteBuffer();
    TransportClient client = clientFactory.createClient(host, port);
    client.sendRpc(registerDriver, new RegisterDriverCallback());
  }

  private class RegisterDriverCallback implements RpcResponseCallback {
    @Override
    public void onSuccess(ByteBuffer response) {
      logger.info("Successfully registered app " + appId + " with external shuffle service.");
    }

    @Override
    public void onFailure(Throwable e) {
      logger.warn("Unable to register app " + appId + " with external shuffle service. " +
          "Please manually remove shuffle data after driver exit. Error: " + e);
    }
  }
}
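
For orientation, here is a minimal, hypothetical sketch (not part of this PR) of how a driver-side caller might exercise the new interface; the `conf`, `secretKeyHolder`, and host-list parameters are placeholders for wiring that actually lives in the Kubernetes scheduler backend:

```scala
import org.apache.spark.network.sasl.SecretKeyHolder
import org.apache.spark.network.shuffle.kubernetes.{KubernetesExternalShuffleClient, KubernetesExternalShuffleClientImpl}
import org.apache.spark.network.util.TransportConf

// Hypothetical registration helper. The caller keeps ownership of the
// returned client and closes it on shutdown (the interface extends Closeable).
def registerWithShuffleServices(
    conf: TransportConf,
    secretKeyHolder: SecretKeyHolder,
    appId: String,
    shuffleServiceHosts: Seq[String],
    port: Int): KubernetesExternalShuffleClient = {
  val client: KubernetesExternalShuffleClient = new KubernetesExternalShuffleClientImpl(
    conf, secretKeyHolder, /* saslEnabled = */ false, /* registrationTimeoutMs = */ 5000L)
  client.init(appId)  // must be called before any RPC, as in ExternalShuffleClient
  // Register the driver with every shuffle-service pod so each one can clean
  // up this application's shuffle files if the driver exits unexpectedly.
  shuffleServiceHosts.foreach { host =>
    client.registerDriverWithShuffleService(host, port)
  }
  client
}
```

Splitting the interface from the `ExternalShuffleClient`-backed implementation also lets tests substitute a mock client, presumably one motivation given the unit-test commit (#459) in this resync.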
conf/k8s-shuffle-service-rbac.yaml (new file, 80 additions)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark-shuffle-service-service-account
  namespace: default
  labels:
    app: spark-shuffle-service
    spark-version: 2.2.0
---
apiVersion: extensions/v1beta1
kind: PodSecurityPolicy
metadata:
  name: spark-shuffle-service-pod-security-policy
  labels:
    app: spark-shuffle-service
    spark-version: 2.2.0
spec:
  privileged: false
  fsGroup:
    rule: RunAsAny
  runAsUser:
    rule: RunAsAny
  volumes:
  - "hostPath"
  - "secret"
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRole
metadata:
  name: spark-shuffle-service-role
  labels:
    app: spark-shuffle-service
    spark-version: 2.2.0
rules:
- apiGroups:
  - "extensions"
  resources:
  - "podsecuritypolicies"
  resourceNames:
  - "spark-shuffle-service-pod-security-policy"
  verbs:
  - "use"
- apiGroups:
  - "" # "" indicates the core API group
  resources:
  - "pods"
  verbs:
  - "get"
  - "list"
  - "watch"
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: spark-shuffle-service-role-binding
subjects:
- kind: ServiceAccount
  name: spark-shuffle-service-service-account
  namespace: default
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-shuffle-service-role
conf/k8s-spark-rbac.yaml (new file, 55 additions)

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: default
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: Role
metadata:
  namespace: default
  name: spark-role
rules:
- apiGroups:
  - "" # "" indicates the core API group
  resources:
  - "pods"
  verbs:
  - "*"
- apiGroups:
  - "" # "" indicates the core API group
  resources:
  - "services"
  verbs:
  - "*"
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: RoleBinding
metadata:
  name: spark-role-binding
  namespace: default
subjects:
- kind: ServiceAccount
  name: spark
  namespace: default
roleRef:
  kind: Role
  name: spark-role
  apiGroup: rbac.authorization.k8s.io
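
As a hedged usage sketch (the property names are taken from this fork's documentation and should be verified against your release), a submission can bind the driver pod to the `spark` ServiceAccount defined above, so the Role grants it the pod and service permissions it needs:

```scala
import org.apache.spark.SparkConf

// Hypothetical submission-side settings; "spark" and "default" match the
// ServiceAccount and namespace declared in conf/k8s-spark-rbac.yaml.
val rbacConf = new SparkConf()
  .set("spark.kubernetes.namespace", "default")
  .set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark")
```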
conf/kubernetes-resource-staging-server.yaml (6 changes: 3 additions & 3 deletions)

@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 ---
-apiVersion: extensions/v1beta1
+apiVersion: apps/v1beta1
 kind: Deployment
 metadata:
   name: spark-resource-staging-server
@@ -32,14 +32,14 @@ spec:
           name: spark-resource-staging-server-config
       containers:
       - name: spark-resource-staging-server
-        image: kubespark/spark-resource-staging-server:v2.2.0-kubernetes-0.3.0
+        image: kubespark/spark-resource-staging-server:v2.2.0-kubernetes-0.4.0
         resources:
           requests:
             cpu: 100m
             memory: 256Mi
           limits:
             cpu: 100m
-            memory: 256Mi
+            memory: 1Gi
         volumeMounts:
         - name: resource-staging-server-properties
           mountPath: '/etc/spark-resource-staging-server'
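
For reference, a sketch of how a job might point at this staging server so submitter-local files can be uploaded; `spark.kubernetes.resourceStagingServer.uri` is the property documented by this fork, and the host and port below are placeholders for wherever the server's Service is exposed. This also connects to the commit above that fails submission when submitter-local files are provided without (presumably) this server configured:

```scala
import org.apache.spark.SparkConf

// Hypothetical client-side setting; replace the URI with the address at
// which the resource staging server is reachable in your cluster.
val stagingConf = new SparkConf()
  .set("spark.kubernetes.resourceStagingServer.uri", "http://staging-server-host:31000")
```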
conf/kubernetes-shuffle-service.yaml (6 changes: 3 additions & 3 deletions)

@@ -32,16 +32,16 @@ spec:
       volumes:
       - name: temp-volume
         hostPath:
-          path: '/var/tmp' # change this path according to your cluster configuration.
+          path: '/tmp/spark-local' # change this path according to your cluster configuration.
       containers:
       - name: shuffle
         # This is an official image that is built
         # from the dockerfiles/shuffle directory
         # in the spark distribution.
-        image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.3.0
+        image: kubespark/spark-shuffle:v2.2.0-kubernetes-0.4.0
         imagePullPolicy: IfNotPresent
         volumeMounts:
-        - mountPath: '/tmp'
+        - mountPath: '/tmp/spark-local'
           name: temp-volume
         # more volumes can be mounted here.
         # The spark job must be configured to use these
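
The path change matters because executors and the shuffle-service pods must agree on the directory holding shuffle files. A hedged sketch of the matching job-side configuration (property names per this fork's dynamic-allocation docs; verify against your release):

```scala
import org.apache.spark.SparkConf

// Hypothetical job settings: spark.local.dir must match the hostPath mounted
// into the shuffle-service pods ('/tmp/spark-local' after this change), and
// the labels must select the shuffle-service pods deployed by this YAML.
val shuffleConf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.kubernetes.shuffle.namespace", "default")
  .set("spark.kubernetes.shuffle.labels", "app=spark-shuffle-service,spark-version=2.2.0")
  .set("spark.local.dir", "/tmp/spark-local")
```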
SparkSubmit.scala

@@ -715,7 +715,7 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }

     if (isKubernetesCluster) {
-      childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
+      childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
       if (args.isPython) {
         childArgs ++= Array("--primary-py-file", args.primaryResource)
         childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
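
Only the internal entry class is renamed here (following the package rename in commit 8b17246); the user-facing selection of the Kubernetes path is unchanged. As a reminder, a sketch with a placeholder API-server address: `isKubernetesCluster` is driven by a `k8s://` master URL together with cluster deploy mode:

```scala
import org.apache.spark.SparkConf

// Hypothetical master URL; substitute your API server's host and port.
val submitConf = new SparkConf()
  .setMaster("k8s://https://apiserver-host:6443")
  .set("spark.submit.deployMode", "cluster")
```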
config package object (org.apache.spark.internal.config)

@@ -262,7 +262,7 @@ package object config {
   private[spark] val DRIVER_HOST_ADDRESS = ConfigBuilder("spark.driver.host")
     .doc("Address of driver endpoints.")
     .stringConf
-    .createWithDefault(Utils.localHostName())
+    .createWithDefault(Utils.localCanonicalHostName())

   private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
     .doc("Address where to bind network listen sockets on the driver.")
core/src/main/scala/org/apache/spark/util/Utils.scala (7 changes: 7 additions & 0 deletions)

@@ -941,6 +941,13 @@ private[spark] object Utils extends Logging {
     customHostname = Some(hostname)
   }

+  /**
+   * Get the local machine's FQDN.
+   */
+  def localCanonicalHostName(): String = {
+    customHostname.getOrElse(localIpAddress.getCanonicalHostName)
+  }
+
   /**
    * Get the local machine's hostname.
    */
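
The new helper resolves the machine's FQDN rather than its bare hostname; in JDK terms this is `getCanonicalHostName` versus `getHostName` on `InetAddress`. A tiny illustration (output depends entirely on your DNS configuration):

```scala
import java.net.InetAddress

// getHostName may return a short name such as "driver-0", while
// getCanonicalHostName does a reverse lookup and returns the fully qualified
// name (for example "driver-0.default.svc.cluster.local" under the headless
// service introduced by commit c0df462), which is what remote executors can
// actually resolve.
val addr = InetAddress.getLocalHost
println(addr.getHostName)
println(addr.getCanonicalHostName)
```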