Skip to content

Commit 0483ba5

Browse files
committed
[FLINK-3675][yarn] improvements to library shipping
- always ship the lib folder - properly set up the classpath from the supplied ship files - clean up the deploy() method of YarnClusterDescriptor - add test case. This closes apache#2187
1 parent 0e8be41 commit 0483ba5

File tree

17 files changed

+352
-168
lines changed

17 files changed

+352
-168
lines changed

flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ public class CliFrontend {
113113
private static final String ACTION_SAVEPOINT = "savepoint";
114114

115115
// config dir parameters
116-
public static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
117116
private static final String CONFIG_DIRECTORY_FALLBACK_1 = "../conf";
118117
private static final String CONFIG_DIRECTORY_FALLBACK_2 = "conf";
119118

@@ -153,7 +152,7 @@ public CliFrontend(String configDir) throws Exception {
153152
// load the configuration
154153
LOG.info("Trying to load configuration file");
155154
GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
156-
System.setProperty(ENV_CONFIG_DIRECTORY, configDirectory.getAbsolutePath());
155+
System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, configDirectory.getAbsolutePath());
157156
this.config = GlobalConfiguration.getConfiguration();
158157

159158
try {
@@ -1022,16 +1021,16 @@ public static void main(String[] args) {
10221021
// --------------------------------------------------------------------------------------------
10231022

10241023
public static String getConfigurationDirectoryFromEnv() {
1025-
String envLocation = System.getenv(ENV_CONFIG_DIRECTORY);
1026-
String location = envLocation != null ? envLocation : System.getProperty(ENV_CONFIG_DIRECTORY);
1024+
String envLocation = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
1025+
String location = envLocation != null ? envLocation : System.getProperty(ConfigConstants.ENV_FLINK_CONF_DIR);
10271026

10281027
if (location != null) {
10291028
if (new File(location).exists()) {
10301029
return location;
10311030
}
10321031
else {
10331032
throw new RuntimeException("The config directory '" + location + "', specified in the '" +
1034-
ENV_CONFIG_DIRECTORY + "' environment variable, does not exist.");
1033+
ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable, does not exist.");
10351034
}
10361035
}
10371036
else if (new File(CONFIG_DIRECTORY_FALLBACK_1).exists()) {
@@ -1043,7 +1042,7 @@ else if (new File(CONFIG_DIRECTORY_FALLBACK_2).exists()) {
10431042
else {
10441043
throw new RuntimeException("The configuration directory was not specified. " +
10451044
"Please specify the directory containing the configuration file through the '" +
1046-
ENV_CONFIG_DIRECTORY + "' environment variable.");
1045+
ConfigConstants.ENV_FLINK_CONF_DIR + "' environment variable.");
10471046
}
10481047
return location;
10491048
}

flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java

+9
Original file line numberDiff line numberDiff line change
@@ -950,6 +950,15 @@ public final class ConfigConstants {
950950
/** ZooKeeper default leader port. */
951951
public static final int DEFAULT_ZOOKEEPER_LEADER_PORT = 3888;
952952

953+
// ----------------------------- Environment Variables ----------------------------
954+
955+
/** The name of the environment variable that contains the location of the configuration directory. */
956+
public static final String ENV_FLINK_CONF_DIR = "FLINK_CONF_DIR";
957+
958+
/** The name of the environment variable that contains the location of the lib folder. */
959+
public static final String ENV_FLINK_LIB_DIR = "FLINK_LIB_DIR";
960+
961+
953962
/**
954963
* Not instantiable.
955964
*/

flink-dist/src/main/flink-bin/bin/config.sh

+5
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
135135
FLINK_ROOT_DIR=`dirname "$SYMLINK_RESOLVED_BIN"`
136136
FLINK_LIB_DIR=$FLINK_ROOT_DIR/lib
137137

138+
### Exported environment variables ###
139+
export FLINK_CONF_DIR
140+
# export /lib dir to access it during deployment of the Yarn staging files
141+
export FLINK_LIB_DIR
142+
138143
# These need to be mangled because they are directly passed to java.
139144
# The above lib path is used by the shell script to retrieve jars in a
140145
# directory, so it needs to be unmangled.

flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,5 +52,5 @@ log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4
5252

5353
export FLINK_CONF_DIR
5454

55-
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -ship $bin/../lib/ -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
55+
$JAVA_RUN $JVM_ARGS -classpath $CC_CLASSPATH:$HADOOP_CLASSPATH:$HADOOP_CONF_DIR:$YARN_CONF_DIR $log_setting org.apache.flink.yarn.cli.FlinkYarnSessionCli -j $FLINK_LIB_DIR/flink-dist*.jar "$@"
5656

flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public void write(int b) {}
8686

8787
// Unset FLINK_CONF_DIR, as this is a precondition for this test to work properly
8888
Map<String, String> map = new HashMap<>(System.getenv());
89-
map.remove(CliFrontend.ENV_CONFIG_DIRECTORY);
89+
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
9090
TestBaseUtils.setEnv(map);
9191
}
9292

flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import akka.actor.ActorSystem;
2222
import org.apache.commons.cli.CommandLine;
2323
import org.apache.commons.cli.CommandLineParser;
24+
import org.apache.commons.cli.DefaultParser;
2425
import org.apache.commons.cli.Options;
25-
import org.apache.commons.cli.PosixParser;
26-
26+
import org.apache.flink.configuration.ConfigConstants;
2727
import org.apache.flink.client.CliFrontend;
2828
import org.apache.flink.client.cli.CliFrontendParser;
2929
import org.apache.flink.client.cli.RunOptions;
@@ -61,14 +61,14 @@ public void testDynamicProperties() throws IOException {
6161
File tmpFolder = tmp.newFolder();
6262
File fakeConf = new File(tmpFolder, "flink-conf.yaml");
6363
fakeConf.createNewFile();
64-
map.put("FLINK_CONF_DIR", tmpFolder.getAbsolutePath());
64+
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, tmpFolder.getAbsolutePath());
6565
TestBaseUtils.setEnv(map);
6666
FlinkYarnSessionCli cli = new FlinkYarnSessionCli("", "", false);
6767
Options options = new Options();
6868
cli.addGeneralOptions(options);
6969
cli.addRunOptions(options);
7070

71-
CommandLineParser parser = new PosixParser();
71+
CommandLineParser parser = new DefaultParser();
7272
CommandLine cmd = null;
7373
try {
7474
cmd = parser.parse(options, new String[]{"run", "-j", "fake.jar", "-n", "15", "-D", "akka.ask.timeout=5 min"});

flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingYarnClusterDescriptor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public TestingYarnClusterDescriptor() {
4646
filesToShip.add(testingJar);
4747
filesToShip.add(testingRuntimeJar);
4848

49-
setShipFiles(filesToShip);
49+
addShipFiles(filesToShip);
5050
}
5151

5252
@Override

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,9 @@ public void testMultipleAMKill() throws Exception {
103103
flinkYarnClient.setJobManagerMemory(768);
104104
flinkYarnClient.setTaskManagerMemory(1024);
105105
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
106-
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
106+
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
107107

108-
String confDirPath = System.getenv("FLINK_CONF_DIR");
108+
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
109109
flinkYarnClient.setConfigurationDirectory(confDirPath);
110110

111111
String fsStateHandlePath = tmp.getRoot().getPath();

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.yarn;
2020

2121
import org.apache.flink.client.program.ClusterClient;
22+
import org.apache.flink.configuration.ConfigConstants;
2223
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
2324
import org.apache.flink.configuration.GlobalConfiguration;
2425
import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
@@ -221,8 +222,8 @@ public void testJavaAPI() {
221222
flinkYarnClient.setJobManagerMemory(768);
222223
flinkYarnClient.setTaskManagerMemory(1024);
223224
flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
224-
flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
225-
String confDirPath = System.getenv("FLINK_CONF_DIR");
225+
flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
226+
String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
226227
flinkYarnClient.setConfigurationDirectory(confDirPath);
227228
flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
228229
flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.flink.yarn;
19+
20+
import org.apache.flink.configuration.ConfigConstants;
21+
import org.apache.flink.configuration.Configuration;
22+
import org.apache.flink.test.util.TestBaseUtils;
23+
import org.apache.hadoop.fs.Path;
24+
import org.junit.Assert;
25+
import org.junit.Rule;
26+
import org.junit.Test;
27+
import org.junit.rules.TemporaryFolder;
28+
29+
import java.io.File;
30+
import java.util.ArrayList;
31+
import java.util.HashMap;
32+
import java.util.HashSet;
33+
import java.util.List;
34+
import java.util.Map;
35+
import java.util.Set;
36+
37+
public class YarnClusterDescriptorTest {
38+
39+
@Rule
40+
public TemporaryFolder temporaryFolder = new TemporaryFolder();
41+
42+
/**
43+
* Tests to ship a lib folder through the {@code YarnClusterDescriptor.addShipFiles}
44+
*/
45+
@Test
46+
public void testExplicitLibShipping() throws Exception {
47+
AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
48+
descriptor.setLocalJarPath(new Path("/path/to/flink.jar"));
49+
50+
descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
51+
descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
52+
descriptor.setFlinkConfiguration(new Configuration());
53+
54+
File libFile = temporaryFolder.newFile("libFile.jar");
55+
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
56+
57+
Assert.assertFalse(descriptor.shipFiles.contains(libFile));
58+
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
59+
60+
List<File> shipFiles = new ArrayList<>();
61+
shipFiles.add(libFile);
62+
shipFiles.add(libFolder);
63+
64+
descriptor.addShipFiles(shipFiles);
65+
66+
Assert.assertTrue(descriptor.shipFiles.contains(libFile));
67+
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
68+
69+
// only execute part of the deployment to test for shipped files
70+
Set<File> effectiveShipFiles = new HashSet<>();
71+
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
72+
73+
Assert.assertEquals(0, effectiveShipFiles.size());
74+
Assert.assertEquals(2, descriptor.shipFiles.size());
75+
Assert.assertTrue(descriptor.shipFiles.contains(libFile));
76+
Assert.assertTrue(descriptor.shipFiles.contains(libFolder));
77+
}
78+
79+
/**
80+
* Tests to ship a lib folder through the {@code ConfigConstants.ENV_FLINK_LIB_DIR}
81+
*/
82+
@Test
83+
public void testEnvironmentLibShipping() throws Exception {
84+
AbstractYarnClusterDescriptor descriptor = new YarnClusterDescriptor();
85+
86+
descriptor.setConfigurationDirectory(temporaryFolder.getRoot().getAbsolutePath());
87+
descriptor.setConfigurationFilePath(new Path(temporaryFolder.getRoot().getPath()));
88+
descriptor.setFlinkConfiguration(new Configuration());
89+
90+
File libFolder = temporaryFolder.newFolder().getAbsoluteFile();
91+
File libFile = new File(libFolder, "libFile.jar");
92+
libFile.createNewFile();
93+
94+
Set<File> effectiveShipFiles = new HashSet<>();
95+
96+
final Map<String, String> oldEnv = System.getenv();
97+
try {
98+
Map<String, String> env = new HashMap<>(1);
99+
env.put(ConfigConstants.ENV_FLINK_LIB_DIR, libFolder.getAbsolutePath());
100+
TestBaseUtils.setEnv(env);
101+
// only execute part of the deployment to test for shipped files
102+
descriptor.addLibFolderToShipFiles(effectiveShipFiles);
103+
} finally {
104+
TestBaseUtils.setEnv(oldEnv);
105+
}
106+
107+
// only ship the folder itself, not its contents
108+
Assert.assertFalse(effectiveShipFiles.contains(libFile));
109+
Assert.assertTrue(effectiveShipFiles.contains(libFolder));
110+
Assert.assertFalse(descriptor.shipFiles.contains(libFile));
111+
Assert.assertFalse(descriptor.shipFiles.contains(libFolder));
112+
}
113+
114+
}

flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.commons.io.FileUtils;
2222
import org.apache.flink.client.CliFrontend;
23+
import org.apache.flink.configuration.ConfigConstants;
2324
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
2425
import org.apache.flink.test.util.TestBaseUtils;
2526
import org.apache.flink.util.TestLogger;
@@ -365,7 +366,7 @@ public static void startYARNWithConfig(Configuration conf) {
365366
File flinkConfDirPath = findFile(flinkDistRootDir, new ContainsName(new String[]{"flink-conf.yaml"}));
366367
Assert.assertNotNull(flinkConfDirPath);
367368

368-
map.put(CliFrontend.ENV_CONFIG_DIRECTORY, flinkConfDirPath.getParent());
369+
map.put(ConfigConstants.ENV_FLINK_CONF_DIR, flinkConfDirPath.getParent());
369370

370371
File yarnConfFile = writeYarnSiteConfigXML(conf);
371372
map.put("YARN_CONF_DIR", yarnConfFile.getParentFile().getAbsolutePath());
@@ -596,7 +597,7 @@ public void sendStop() {
596597
public static void teardown() throws Exception {
597598
// Unset FLINK_CONF_DIR, as it might change the behavior of other tests
598599
Map<String, String> map = new HashMap<>(System.getenv());
599-
map.remove(CliFrontend.ENV_CONFIG_DIRECTORY);
600+
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
600601
TestBaseUtils.setEnv(map);
601602

602603
// When we are on travis, we copy the tmp files of JUnit (containing the MiniYARNCluster log files)

0 commit comments

Comments
 (0)