
Commit e9b2070

liyinan926 authored and rxin committed
[SPARK-18278][SCHEDULER] Spark on Kubernetes - Basic Scheduler Backend
## What changes were proposed in this pull request?

This is a stripped down version of the `KubernetesClusterSchedulerBackend` for Spark with the following components:

- Static Allocation of Executors
- Executor Pod Factory
- Executor Recovery Semantics

It's step 1 from the step-wise plan documented [here](apache-spark-on-k8s#441 (comment)). This addition is covered by the [SPIP vote](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html) which passed on Aug 31.

## How was this patch tested?

- The patch contains unit tests which are passing.
- Manual testing: `./build/mvn -Pkubernetes clean package` succeeded.
- It is a **subset** of the entire changelist hosted in http://github.com/apache-spark-on-k8s/spark which is in active use in several organizations.
- There is integration testing enabled in the fork currently [hosted by PepperData](spark-k8s-jenkins.pepperdata.org:8080) which is being moved over to RiseLAB CI.
- Detailed documentation on trying out the patch in its entirety is in: https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

cc rxin felixcheung mateiz (shepherd)

k8s-big-data SIG members & contributors: mccheah ash211 ssuchter varunkatta kimoonkim erikerlandson liyinan926 tnachen ifilonenko

Author: Yinan Li <[email protected]>
Author: foxish <[email protected]>
Author: mcheah <[email protected]>

Closes #19468 from foxish/spark-kubernetes-3.
1 parent 475a29f commit e9b2070


22 files changed: +1832 -34 lines changed


.travis.yml

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ notifications:
 # 5. Run maven install before running lint-java.
 install:
   - export MAVEN_SKIP_RC=1
-  - build/mvn -T 4 -q -DskipTests -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install
+  - build/mvn -T 4 -q -DskipTests -Pkubernetes -Pmesos -Pyarn -Pkinesis-asl -Phive -Phive-thriftserver install

 # 6. Run lint-java.
 script:

NOTICE

Lines changed: 6 additions & 0 deletions
@@ -448,6 +448,12 @@ Copyright (C) 2011 Google Inc.
 Apache Commons Pool
 Copyright 1999-2009 The Apache Software Foundation

+This product includes/uses Kubernetes & OpenShift 3 Java Client (https://github.com/fabric8io/kubernetes-client)
+Copyright (C) 2015 Red Hat, Inc.
+
+This product includes/uses OkHttp (https://github.com/square/okhttp)
+Copyright (C) 2012 The Android Open Source Project
+
 =========================================================================
 == NOTICE file corresponding to section 4(d) of the Apache License, ==
 == Version 2.0, in this case for the DataNucleus distribution. ==
SchedulerBackendUtils.scala (new file, package org.apache.spark.scheduler.cluster)

Lines changed: 47 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS, EXECUTOR_INSTANCES}
import org.apache.spark.util.Utils

private[spark] object SchedulerBackendUtils {
  val DEFAULT_NUMBER_EXECUTORS = 2

  /**
   * Getting the initial target number of executors depends on whether dynamic allocation is
   * enabled.
   * If not using dynamic allocation it gets the number of executors requested by the user.
   */
  def getInitialTargetExecutorNumber(
      conf: SparkConf,
      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
    if (Utils.isDynamicAllocationEnabled(conf)) {
      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
        s"initial executor number $initialNumExecutors must between min executor number " +
          s"$minNumExecutors and max executor number $maxNumExecutors")

      initialNumExecutors
    } else {
      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
    }
  }
}
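
For context, a rough usage sketch of the helper above (not part of this commit; the configuration values and the wrapper object are illustrative, and it is placed in `org.apache.spark.scheduler.cluster` only so it can see the `private[spark]` object):

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.SparkConf

object SchedulerBackendUtilsExample {
  def main(args: Array[String]): Unit = {
    // Static allocation: with dynamic allocation disabled, the helper returns
    // spark.executor.instances, or DEFAULT_NUMBER_EXECUTORS (2) if unset.
    val staticConf = new SparkConf()
      .set("spark.executor.instances", "8")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(staticConf)) // 8

    // Dynamic allocation: the helper validates min <= initial <= max and
    // returns the configured initial executor count.
    val dynamicConf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.initialExecutors", "3")
      .set("spark.dynamicAllocation.maxExecutors", "10")
    println(SchedulerBackendUtils.getInitialTargetExecutorNumber(dynamicConf)) // 3
  }
}
```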

dev/sparktestsupport/modules.py

Lines changed: 8 additions & 0 deletions
@@ -532,6 +532,14 @@ def __hash__(self):
     sbt_test_goals=["mesos/test"]
 )

+kubernetes = Module(
+    name="kubernetes",
+    dependencies=[],
+    source_file_regexes=["resource-managers/kubernetes/core"],
+    build_profile_flags=["-Pkubernetes"],
+    sbt_test_goals=["kubernetes/test"]
+)
+
 # The root module is a dummy module which is used to run all of the tests.
 # No other modules should directly depend on this module.
 root = Module(

docs/configuration.md

Lines changed: 2 additions & 2 deletions
@@ -1438,10 +1438,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.scheduler.minRegisteredResourcesRatio</code></td>
-  <td>0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
+  <td>0.8 for KUBERNETES mode; 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode</td>
   <td>
     The minimum ratio of registered resources (registered resources / total expected resources)
-    (resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained
+    (resources are executors in yarn mode and Kubernetes mode, CPU cores in standalone mode and Mesos coarsed-grained
     mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] )
     to wait for before scheduling begins. Specified as a double between 0.0 and 1.0.
     Regardless of whether the minimum ratio of resources has been reached,
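
To illustrate the documented behavior, a hedged sketch of tuning this ratio for a Kubernetes-backed job follows; the master URL, executor count, and ratio are placeholders, not values from this commit:

```scala
import org.apache.spark.SparkConf

// On Kubernetes (as on YARN) the "registered resources" are executors, so this
// asks the scheduler to wait until 90% of the 10 requested executors have
// registered, or until the registration wait time elapses, before scheduling.
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.example.com:6443") // placeholder API server URL
  .set("spark.executor.instances", "10")
  .set("spark.scheduler.minRegisteredResourcesRatio", "0.9")
  .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")
```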

pom.xml

Lines changed: 7 additions & 0 deletions
@@ -2664,6 +2664,13 @@
     </modules>
   </profile>

+  <profile>
+    <id>kubernetes</id>
+    <modules>
+      <module>resource-managers/kubernetes/core</module>
+    </modules>
+  </profile>
+
   <profile>
     <id>hive-thriftserver</id>
     <modules>

project/SparkBuild.scala

Lines changed: 4 additions & 4 deletions
@@ -53,11 +53,11 @@ object BuildCommons {
     "tags", "sketch", "kvstore"
   ).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects

-  val optionallyEnabledProjects@Seq(mesos, yarn,
+  val optionallyEnabledProjects@Seq(kubernetes, mesos, yarn,
     streamingFlumeSink, streamingFlume,
     streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
     dockerIntegrationTests, hadoopCloud) =
-    Seq("mesos", "yarn",
+    Seq("kubernetes", "mesos", "yarn",
       "streaming-flume-sink", "streaming-flume",
       "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
       "docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
@@ -671,9 +671,9 @@ object Unidoc {
     publish := {},

     unidocProjectFilter in(ScalaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),
     unidocProjectFilter in(JavaUnidoc, unidoc) :=
-      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn, tags, streamingKafka010, sqlKafka010),
+      inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, kubernetes, yarn, tags, streamingKafka010, sqlKafka010),

     unidocAllClasspaths in (ScalaUnidoc, unidoc) := {
       ignoreClasspaths((unidocAllClasspaths in (ScalaUnidoc, unidoc)).value)
resource-managers/kubernetes/core/pom.xml (new file)

Lines changed: 100 additions & 0 deletions

<?xml version="1.0" encoding="UTF-8"?>
<!--
  ~ Licensed to the Apache Software Foundation (ASF) under one or more
  ~ contributor license agreements. See the NOTICE file distributed with
  ~ this work for additional information regarding copyright ownership.
  ~ The ASF licenses this file to You under the Apache License, Version 2.0
  ~ (the "License"); you may not use this file except in compliance with
  ~ the License. You may obtain a copy of the License at
  ~
  ~    http://www.apache.org/licenses/LICENSE-2.0
  ~
  ~ Unless required by applicable law or agreed to in writing, software
  ~ distributed under the License is distributed on an "AS IS" BASIS,
  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  ~ See the License for the specific language governing permissions and
  ~ limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-parent_2.11</artifactId>
    <version>2.3.0-SNAPSHOT</version>
    <relativePath>../../../pom.xml</relativePath>
  </parent>

  <artifactId>spark-kubernetes_2.11</artifactId>
  <packaging>jar</packaging>
  <name>Spark Project Kubernetes</name>
  <properties>
    <sbt.project.name>kubernetes</sbt.project.name>
    <kubernetes.client.version>3.0.0</kubernetes.client.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>io.fabric8</groupId>
      <artifactId>kubernetes-client</artifactId>
      <version>${kubernetes.client.version}</version>
      <exclusions>
        <exclusion>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>*</artifactId>
        </exclusion>
        <exclusion>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-yaml</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- Required by kubernetes-client but we exclude it -->
    <dependency>
      <groupId>com.fasterxml.jackson.dataformat</groupId>
      <artifactId>jackson-dataformat-yaml</artifactId>
      <version>${fasterxml.jackson.version}</version>
    </dependency>

    <!-- Explicitly depend on shaded dependencies from the parent, since shaded deps aren't transitive -->
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
    </dependency>
    <!-- End of shaded deps. -->

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>com.squareup.okhttp3</groupId>
      <artifactId>okhttp</artifactId>
      <version>3.8.1</version>
    </dependency>

  </dependencies>

  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
  </build>

</project>
Config.scala (new file, package org.apache.spark.deploy.k8s)

Lines changed: 123 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object Config extends Logging {

  val KUBERNETES_NAMESPACE =
    ConfigBuilder("spark.kubernetes.namespace")
      .doc("The namespace that will be used for running the driver and executor pods. When using " +
        "spark-submit in cluster mode, this can also be passed to spark-submit via the " +
        "--kubernetes-namespace command line argument.")
      .stringConf
      .createWithDefault("default")

  val EXECUTOR_DOCKER_IMAGE =
    ConfigBuilder("spark.kubernetes.executor.docker.image")
      .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
        "format.")
      .stringConf
      .createOptional

  val DOCKER_IMAGE_PULL_POLICY =
    ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
      .doc("Kubernetes image pull policy. Valid values are Always, Never, and IfNotPresent.")
      .stringConf
      .checkValues(Set("Always", "Never", "IfNotPresent"))
      .createWithDefault("IfNotPresent")

  val APISERVER_AUTH_DRIVER_CONF_PREFIX =
    "spark.kubernetes.authenticate.driver"
  val APISERVER_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
    "spark.kubernetes.authenticate.driver.mounted"
  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"

  val KUBERNETES_SERVICE_ACCOUNT_NAME =
    ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
      .doc("Service account that is used when running the driver pod. The driver pod uses " +
        "this service account when requesting executor pods from the API server. If specific " +
        "credentials are given for the driver pod to use, the driver will favor " +
        "using those credentials instead.")
      .stringConf
      .createOptional

  // Note that while we set a default for this when we start up the
  // scheduler, the specific default value is dynamically determined
  // based on the executor memory.
  val KUBERNETES_EXECUTOR_MEMORY_OVERHEAD =
    ConfigBuilder("spark.kubernetes.executor.memoryOverhead")
      .doc("The amount of off-heap memory (in megabytes) to be allocated per executor. This " +
        "is memory that accounts for things like VM overheads, interned strings, other native " +
        "overheads, etc. This tends to grow with the executor size. (typically 6-10%).")
      .bytesConf(ByteUnit.MiB)
      .createOptional

  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation."

  val KUBERNETES_DRIVER_POD_NAME =
    ConfigBuilder("spark.kubernetes.driver.pod.name")
      .doc("Name of the driver pod.")
      .stringConf
      .createOptional

  val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
      .doc("Prefix to use in front of the executor pod names.")
      .internal()
      .stringConf
      .createWithDefault("spark")

  val KUBERNETES_ALLOCATION_BATCH_SIZE =
    ConfigBuilder("spark.kubernetes.allocation.batch.size")
      .doc("Number of pods to launch at once in each round of executor allocation.")
      .intConf
      .checkValue(value => value > 0, "Allocation batch size should be a positive integer")
      .createWithDefault(5)

  val KUBERNETES_ALLOCATION_BATCH_DELAY =
    ConfigBuilder("spark.kubernetes.allocation.batch.delay")
      .doc("Number of seconds to wait between each round of executor allocation.")
      .longConf
      .checkValue(value => value > 0, "Allocation batch delay should be a positive integer")
      .createWithDefault(1)

  val KUBERNETES_EXECUTOR_LIMIT_CORES =
    ConfigBuilder("spark.kubernetes.executor.limit.cores")
      .doc("Specify the hard cpu limit for a single executor pod")
      .stringConf
      .createOptional

  val KUBERNETES_EXECUTOR_LOST_REASON_CHECK_MAX_ATTEMPTS =
    ConfigBuilder("spark.kubernetes.executor.lostCheck.maxAttempts")
      .doc("Maximum number of attempts allowed for checking the reason of an executor loss " +
        "before it is assumed that the executor failed.")
      .intConf
      .checkValue(value => value > 0, "Maximum attempts of checks of executor lost reason " +
        "must be a positive integer")
      .createWithDefault(10)

  val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
}
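
As a rough sketch of how these entries could be set and read (the namespace, image name, and batch size are placeholders; the typed `SparkConf.get(ConfigEntry)` lookup and `Config` itself are `private[spark]`, so code like this would have to live under the `org.apache.spark` namespace):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._

val sparkConf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-jobs")                          // placeholder namespace
  .set("spark.kubernetes.executor.docker.image", "my-spark-executor:2.3.0") // placeholder image
  .set("spark.kubernetes.allocation.batch.size", "10")

// ConfigEntry-based lookups return typed values and fall back to the
// defaults declared in Config above.
val namespace: String = sparkConf.get(KUBERNETES_NAMESPACE)          // "spark-jobs"
val image: Option[String] = sparkConf.get(EXECUTOR_DOCKER_IMAGE)     // Some("my-spark-executor:2.3.0")
val batchSize: Int = sparkConf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) // 10
val pullPolicy: String = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)     // "IfNotPresent" (default)
```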
