From fd4e724db7f2653c891885a86f8066b288335ca1 Mon Sep 17 00:00:00 2001 From: jiangxinglei Date: Thu, 6 Jun 2024 17:16:13 +0800 Subject: [PATCH 1/4] add archive file option --- .../net/qihoo/hbox/api/HboxConstants.java | 2 + .../net/qihoo/hbox/AM/ApplicationMaster.java | 41 +++++++++++++++++ .../java/net/qihoo/hbox/client/Client.java | 46 +++++++++++++++++++ .../qihoo/hbox/client/ClientArguments.java | 9 ++++ .../java/net/qihoo/hbox/util/Utilities.java | 33 +++++++------ 5 files changed, 118 insertions(+), 13 deletions(-) diff --git a/common/src/main/java/net/qihoo/hbox/api/HboxConstants.java b/common/src/main/java/net/qihoo/hbox/api/HboxConstants.java index fb0fe2e..98d18ce 100644 --- a/common/src/main/java/net/qihoo/hbox/api/HboxConstants.java +++ b/common/src/main/java/net/qihoo/hbox/api/HboxConstants.java @@ -106,6 +106,8 @@ enum Environment { HBOX_FILES_LOCATION("HBOX_FILES_LOCATION"), + HBOX_ARCHIVE_FILES_LOCATION("HBOX_ARCHIVE_FILES_LOCATION"), + HBOX_LIBJARS_LOCATION("HBOX_LIBJARS_LOCATION"), APP_JAR_LOCATION("APP_JAR_LOCATION"), diff --git a/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java b/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java index 240f818..f17afb3 100644 --- a/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java +++ b/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java @@ -89,6 +89,8 @@ public class ApplicationMaster extends CompositeService { private Path appConfRemoteLocation; // location of files on HDFS private String appFilesRemoteLocation; + // location of archive files on HDFS + private String appArchiveFilesRemoteLocation; // location of lib jars on HDFS private String appLibJarsRemoteLocation; // location of cacheFiles on HDFS @@ -322,6 +324,11 @@ public void run() { LOG.info("Application files location: " + appFilesRemoteLocation); } + if (envs.containsKey(HboxConstants.Environment.HBOX_ARCHIVE_FILES_LOCATION.toString())) { + appArchiveFilesRemoteLocation = 
envs.get(HboxConstants.Environment.HBOX_ARCHIVE_FILES_LOCATION.toString()); + LOG.info("Application archive files location: " + appArchiveFilesRemoteLocation); + } + if (envs.containsKey(HboxConstants.Environment.HBOX_LIBJARS_LOCATION.toString())) { appLibJarsRemoteLocation = envs.get(HboxConstants.Environment.HBOX_LIBJARS_LOCATION.toString()); LOG.info("Application lib Jars location: " + appLibJarsRemoteLocation); @@ -1365,6 +1372,40 @@ private void buildContainerLocalResource() { } } + if (appArchiveFilesRemoteLocation != null) { + String[] archiveFiles = StringUtils.split(appArchiveFilesRemoteLocation, ","); + for (String archiveFile : archiveFiles) { + Path pathRemote; + String aliasName; + if (archiveFile.contains("#")) { + String[] paths = StringUtils.split(archiveFile, "#"); + if (paths.length != 2) { + throw new RuntimeException("Error cacheArchive path format " + appArchiveFilesRemoteLocation); + } + pathRemote = new Path(paths[0]); + aliasName = paths[1]; + } else { + pathRemote = new Path(archiveFile); + aliasName = pathRemote.getName(); + } + URI pathRemoteUri = pathRemote.toUri(); + if (Boolean.parseBoolean(conf.get(HboxConfiguration.HBOX_APPEND_DEFAULTFS_ENABLE, String.valueOf(HboxConfiguration.DEFAULT_HBOX_APPEND_DEFAULTFS_ENABLE)))) { + if (pathRemoteUri.getScheme() == null || pathRemoteUri.getHost() == null) { + pathRemote = new Path(defaultUri.toString(), pathRemote.toString()); + } + } + LOG.info("archive file remote path is " + pathRemote + " and alias name is " + aliasName); + containerLocalResource.put(aliasName, + Utilities.createApplicationResource(pathRemote.getFileSystem(conf), + pathRemote, + LocalResourceType.ARCHIVE, + LocalResourceVisibility.APPLICATION)); + if (hboxAppType.equals("MPI") || hboxAppType.equals("TENSORNET") || hboxAppType.equals("HOROVOD")) { + reLinkFiles.append(aliasName).append(","); + } + } + } + if (appLibJarsRemoteLocation != null) { String[] tfFiles = StringUtils.split(appLibJarsRemoteLocation, ","); for (String 
file : tfFiles) { diff --git a/core/src/main/java/net/qihoo/hbox/client/Client.java b/core/src/main/java/net/qihoo/hbox/client/Client.java index ff3d924..8f8c4c4 100644 --- a/core/src/main/java/net/qihoo/hbox/client/Client.java +++ b/core/src/main/java/net/qihoo/hbox/client/Client.java @@ -54,6 +54,7 @@ public class Client { private ApplicationMessageProtocol hboxClient; private transient AtomicBoolean isRunning; private StringBuffer appFilesRemotePath; + private StringBuffer appArchiveFilesRemotePath; private StringBuffer appLibJarsRemotePath; private ApplicationId applicationId; private int maxContainerMem; @@ -75,6 +76,7 @@ private Client(String[] args) throws IOException, ParseException, ClassNotFoundE this.clientArguments = new ClientArguments(args); this.isRunning = new AtomicBoolean(false); this.appFilesRemotePath = new StringBuffer(1000); + this.appArchiveFilesRemotePath = new StringBuffer(1000); this.appLibJarsRemotePath = new StringBuffer(1000); this.inputPaths = new ConcurrentHashMap<>(); this.s3InputPaths = new ConcurrentHashMap<>(); @@ -724,6 +726,46 @@ private void prepareFilesForAM(Map appMasterEnv, Map appMasterEnv, Map localResources) throws IOException { + Path[] filesDst = new Path[clientArguments.hboxArchiveFiles.length]; + LOG.info("Copy hbox archive files from local filesystem to remote."); + boolean amEnableFiles = conf.getBoolean(HboxConfiguration.HBOX_AM_CMD_ENABLE, HboxConfiguration.DEFAULT_HBOX_AM_ENABLE); + for (int i = 0; i < clientArguments.hboxArchiveFiles.length; i++) { + assert (!clientArguments.hboxArchiveFiles[i].isEmpty()); + Path filesSrc; + String aliasName; + if (clientArguments.hboxArchiveFiles[i].contains("#")) { + String[] paths = StringUtils.split(clientArguments.hboxArchiveFiles[i], "#"); + if (paths.length != 2) { + throw new RuntimeException("Error cacheArchive path format " + clientArguments.hboxArchiveFiles[i]); + } + filesSrc = new Path(paths[0]); + aliasName = paths[1]; + } else { + filesSrc = new 
Path(clientArguments.hboxArchiveFiles[i]); + aliasName = filesSrc.getName(); + } + + filesDst[i] = Utilities.getRemotePath( + conf, applicationId, filesSrc.getName()); + LOG.info("Copying " + filesSrc + " to remote path " + filesDst[i].toString()); + dfs.copyFromLocalFile(false, true, filesSrc, filesDst[i]); + appArchiveFilesRemotePath.append(filesDst[i].toUri().toString() + "#" + aliasName).append(","); + + if (amEnableFiles || (clientArguments.appType.equals("MXNET") && !conf.getBoolean(HboxConfiguration.HBOX_MXNET_MODE_SINGLE, HboxConfiguration.DEFAULT_HBOX_MXNET_MODE_SINGLE)) + || clientArguments.appType.equals("XFLOW") + || (clientArguments.appType.equals("XDL") && !conf.getBoolean(HboxConfiguration.HBOX_TF_MODE_SINGLE, HboxConfiguration.DEFAULT_HBOX_TF_MODE_SINGLE))) { + localResources.put(aliasName, + Utilities.createApplicationResource(filesDst[i].getFileSystem(conf), + filesDst[i], + LocalResourceType.ARCHIVE, + LocalResourceVisibility.APPLICATION)); + } + } + appMasterEnv.put(HboxConstants.Environment.HBOX_ARCHIVE_FILES_LOCATION.toString(), + appArchiveFilesRemotePath.deleteCharAt(appArchiveFilesRemotePath.length() - 1).toString()); + } + private void prepareJarsForAM(Map appMasterEnv, Map localResources) throws IOException { Path[] tfFilesDst = new Path[clientArguments.libJars.length]; LOG.info("Copy hbox lib jars from local filesystem to remote."); @@ -1011,6 +1053,10 @@ private void prepareOtherEnvsForAM(Map appMasterEnv, Map env, String userEnvKey, String userEnvValue) { if (env.containsKey(userEnvKey)) { env.put(userEnvKey, userEnvValue + System.getProperty("path.separator") + env.get(userEnvKey) + System.getProperty("path.separator") + System.getenv(userEnvKey)); From 66e31ea69efda245162ee96eea7bf661ea7142cf Mon Sep 17 00:00:00 2001 From: jiangxinglei Date: Thu, 6 Jun 2024 18:53:55 +0800 Subject: [PATCH 2/4] add hadoop classpath glob --- core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git 
a/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java b/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java index f17afb3..a51c7cc 100644 --- a/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java +++ b/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java @@ -1532,6 +1532,10 @@ private Map buildContainerEnv(String role) { } else { containerEnv.put("CLASSPATH", System.getenv("CLASSPATH") + ":" + libJarsClassPath); } + + // for libhdfs.so + containerEnv.computeIfPresent("CLASSPATH", (k, v) -> v + ":$(hadoop classpath --glob)"); + containerEnv.put(HboxConstants.Environment.APP_ATTEMPTID.toString(), applicationAttemptID.toString()); containerEnv.put(HboxConstants.Environment.APP_ID.toString(), applicationAttemptID.getApplicationId().toString()); From 747c56cdcac6a118dc4626c1345d1c711b88cb6d Mon Sep 17 00:00:00 2001 From: jiangxinglei Date: Fri, 7 Jun 2024 09:55:51 +0800 Subject: [PATCH 3/4] add doc --- doc/submit.md | 2 +- doc/submit_cn.md | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/doc/submit.md b/doc/submit.md index bc849d9..1ec025d 100644 --- a/doc/submit.md +++ b/doc/submit.md @@ -46,4 +46,4 @@ output-strategy | the strategy of the output file, default as the configure of h outputformat | specify the class of outputformat when output-strategy is "STREAM", default as the configure of hbox.outputformat.class tf-evaluator | whether to set the last worker as evaluator of distributed TensorFlow job type, default as the configure of hbox.tf.evaluator output-index | specify the index of the worker which to upload the output, default upload the output of all the workers. - +archiveFiles | Location of local archive files will be uploaded to container and be decompressed. 
Use commas as the separator; append #alias to a path to give it an alias name diff --git a/doc/submit_cn.md b/doc/submit_cn.md index fce2b84..72b27ae 100644 --- a/doc/submit_cn.md +++ b/doc/submit_cn.md @@ -46,3 +46,4 @@ output-strategy | 输出文件加载策略,默认为系统配置hbox.output.st outputformat | 当输出文件加载模式为STREAM时,指定outputformat类,默认为系统配置hbox.outputformat.class tf-evaluator | 在分布式TensorFlow作业类型下,是否设置evaluator角色,默认为系统配置hbox.tf.evaluator output-index | 指定保存index对应worker的输出文件,默认保存所有worker的输出结果 +archiveFiles | 上传本地压缩文件, container内自动解压. 逗号分割, 路径后#可带alias名字 \ No newline at end of file From 7044e566c45cfbe8ba1193ea7f9d5536d1585adb Mon Sep 17 00:00:00 2001 From: jiangxinglei Date: Fri, 7 Jun 2024 14:11:19 +0800 Subject: [PATCH 4/4] use /bin/true for rsh_agent to avoid extra output --- core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java b/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java index a51c7cc..79a8230 100644 --- a/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java +++ b/core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java @@ -1669,8 +1669,8 @@ private void launchMpiExec() throws IOException { // MPI related options // bind-to none option envLists.add("OMPI_MCA_hwloc_base_binding_policy=none"); - // -mca plm_rsh_agent /bin/echo - envLists.add("OMPI_MCA_plm_rsh_agent=/bin/echo"); + // -mca plm_rsh_agent /bin/true + envLists.add("OMPI_MCA_plm_rsh_agent=/bin/true"); // -mca plm_base_verbose 1 envLists.add("OMPI_MCA_plm_base_verbose=1"); // --oversubscribe @@ -1744,7 +1744,7 @@ public void run() { * @return */ private void processMpiExecOutput(String mpiExecOutput) { - if (mpiExecOutput.startsWith("command") || mpiExecOutput.contains("