Skip to content

Commit

Permalink
some issues fix
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiankun committed Jun 7, 2024
2 parents 745768f + 7b3e9b7 commit b24e7de
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 17 deletions.
2 changes: 2 additions & 0 deletions common/src/main/java/net/qihoo/hbox/api/HboxConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ enum Environment {

HBOX_FILES_LOCATION("HBOX_FILES_LOCATION"),

HBOX_ARCHIVE_FILES_LOCATION("HBOX_ARCHIVE_FILES_LOCATION"),

HBOX_LIBJARS_LOCATION("HBOX_LIBJARS_LOCATION"),

APP_JAR_LOCATION("APP_JAR_LOCATION"),
Expand Down
51 changes: 48 additions & 3 deletions core/src/main/java/net/qihoo/hbox/AM/ApplicationMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public class ApplicationMaster extends CompositeService {
private Path appConfRemoteLocation;
// location of files on HDFS
private String appFilesRemoteLocation;
// location of archive files on HDFS
private String appArchiveFilesRemoteLocation;
// location of lib jars on HDFS
private String appLibJarsRemoteLocation;
// location of cacheFiles on HDFS
Expand Down Expand Up @@ -322,6 +324,11 @@ public void run() {
LOG.info("Application files location: " + appFilesRemoteLocation);
}

if (envs.containsKey(HboxConstants.Environment.HBOX_ARCHIVE_FILES_LOCATION.toString())) {
appArchiveFilesRemoteLocation = envs.get(HboxConstants.Environment.HBOX_ARCHIVE_FILES_LOCATION.toString());
LOG.info("Application archive files location: " + appArchiveFilesRemoteLocation);
}

if (envs.containsKey(HboxConstants.Environment.HBOX_LIBJARS_LOCATION.toString())) {
appLibJarsRemoteLocation = envs.get(HboxConstants.Environment.HBOX_LIBJARS_LOCATION.toString());
LOG.info("Application lib Jars location: " + appLibJarsRemoteLocation);
Expand Down Expand Up @@ -1365,6 +1372,40 @@ private void buildContainerLocalResource() {
}
}

if (appArchiveFilesRemoteLocation != null) {
String[] archiveFiles = StringUtils.split(appArchiveFilesRemoteLocation, ",");
for (String archiveFile : archiveFiles) {
Path pathRemote;
String aliasName;
if (archiveFile.contains("#")) {
String[] paths = StringUtils.split(archiveFile, "#");
if (paths.length != 2) {
throw new RuntimeException("Error cacheArchive path format " + appArchiveFilesRemoteLocation);
}
pathRemote = new Path(paths[0]);
aliasName = paths[1];
} else {
pathRemote = new Path(archiveFile);
aliasName = pathRemote.getName();
}
URI pathRemoteUri = pathRemote.toUri();
if (Boolean.parseBoolean(conf.get(HboxConfiguration.HBOX_APPEND_DEFAULTFS_ENABLE, String.valueOf(HboxConfiguration.DEFAULT_HBOX_APPEND_DEFAULTFS_ENABLE)))) {
if (pathRemoteUri.getScheme() == null || pathRemoteUri.getHost() == null) {
pathRemote = new Path(defaultUri.toString(), pathRemote.toString());
}
}
LOG.info("archive file remote path is " + pathRemote + " and alias name is " + aliasName);
containerLocalResource.put(aliasName,
Utilities.createApplicationResource(pathRemote.getFileSystem(conf),
pathRemote,
LocalResourceType.ARCHIVE,
LocalResourceVisibility.APPLICATION));
if (hboxAppType.equals("MPI") || hboxAppType.equals("TENSORNET") || hboxAppType.equals("HOROVOD")) {
reLinkFiles.append(aliasName).append(",");
}
}
}

if (appLibJarsRemoteLocation != null) {
String[] tfFiles = StringUtils.split(appLibJarsRemoteLocation, ",");
for (String file : tfFiles) {
Expand Down Expand Up @@ -1491,6 +1532,10 @@ private Map<String, String> buildContainerEnv(String role) {
} else {
containerEnv.put("CLASSPATH", System.getenv("CLASSPATH") + ":" + libJarsClassPath);
}

// for libhdfs.so
containerEnv.computeIfPresent("CLASSPATH", (k, v) -> v + ":$(hadoop classpath --glob)");

containerEnv.put(HboxConstants.Environment.APP_ATTEMPTID.toString(), applicationAttemptID.toString());
containerEnv.put(HboxConstants.Environment.APP_ID.toString(), applicationAttemptID.getApplicationId().toString());

Expand Down Expand Up @@ -1624,8 +1669,8 @@ private void launchMpiExec() throws IOException {
// MPI related options
// bind-to none option
envLists.add("OMPI_MCA_hwloc_base_binding_policy=none");
// -mca plm_rsh_agent /bin/echo
envLists.add("OMPI_MCA_plm_rsh_agent=/bin/echo");
// -mca plm_rsh_agent /bin/true
envLists.add("OMPI_MCA_plm_rsh_agent=/bin/true");
// -mca plm_base_verbose 1
envLists.add("OMPI_MCA_plm_base_verbose=1");
// --oversubscribe
Expand Down Expand Up @@ -1699,7 +1744,7 @@ public void run() {
* @return
*/
private void processMpiExecOutput(String mpiExecOutput) {
if (mpiExecOutput.startsWith("command") || mpiExecOutput.contains("<template>")) {
if (mpiContainerCommand == null && (mpiExecOutput.startsWith("command") || mpiExecOutput.contains("<template>"))) {
LOG.info("Container mpi Command " + mpiExecOutput);
appendMessage(new Message(LogType.STDERR, mpiExecOutput));
if (mpiExecOutput.startsWith("command")) {
Expand Down
46 changes: 46 additions & 0 deletions core/src/main/java/net/qihoo/hbox/client/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class Client {
private ApplicationMessageProtocol hboxClient;
private transient AtomicBoolean isRunning;
private StringBuffer appFilesRemotePath;
private StringBuffer appArchiveFilesRemotePath;
private StringBuffer appLibJarsRemotePath;
private ApplicationId applicationId;
private int maxContainerMem;
Expand All @@ -75,6 +76,7 @@ private Client(String[] args) throws IOException, ParseException, ClassNotFoundE
this.clientArguments = new ClientArguments(args);
this.isRunning = new AtomicBoolean(false);
this.appFilesRemotePath = new StringBuffer(1000);
this.appArchiveFilesRemotePath = new StringBuffer(1000);
this.appLibJarsRemotePath = new StringBuffer(1000);
this.inputPaths = new ConcurrentHashMap<>();
this.s3InputPaths = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -724,6 +726,46 @@ private void prepareFilesForAM(Map<String, String> appMasterEnv, Map<String, Loc
}
}

private void prepareArchiveFilesForAM(Map<String, String> appMasterEnv, Map<String, LocalResource> localResources) throws IOException {
Path[] filesDst = new Path[clientArguments.hboxArchiveFiles.length];
LOG.info("Copy hbox archive files from local filesystem to remote.");
boolean amEnableFiles = conf.getBoolean(HboxConfiguration.HBOX_AM_CMD_ENABLE, HboxConfiguration.DEFAULT_HBOX_AM_ENABLE);
for (int i = 0; i < clientArguments.hboxArchiveFiles.length; i++) {
assert (!clientArguments.hboxArchiveFiles[i].isEmpty());
Path filesSrc;
String aliasName;
if (clientArguments.hboxArchiveFiles[i].contains("#")) {
String[] paths = StringUtils.split(clientArguments.hboxArchiveFiles[i], "#");
if (paths.length != 2) {
throw new RuntimeException("Error cacheArchive path format " + clientArguments.hboxArchiveFiles[i]);
}
filesSrc = new Path(paths[0]);
aliasName = paths[1];
} else {
filesSrc = new Path(clientArguments.hboxArchiveFiles[i]);
aliasName = filesSrc.getName();
}

filesDst[i] = Utilities.getRemotePath(
conf, applicationId, filesSrc.getName());
LOG.info("Copying " + filesSrc + " to remote path " + filesDst[i].toString());
dfs.copyFromLocalFile(false, true, filesSrc, filesDst[i]);
appArchiveFilesRemotePath.append(filesDst[i].toUri().toString() + "#" + aliasName).append(",");

if (amEnableFiles || (clientArguments.appType.equals("MXNET") && !conf.getBoolean(HboxConfiguration.HBOX_MXNET_MODE_SINGLE, HboxConfiguration.DEFAULT_HBOX_MXNET_MODE_SINGLE))
|| clientArguments.appType.equals("XFLOW")
|| (clientArguments.appType.equals("XDL") && !conf.getBoolean(HboxConfiguration.HBOX_TF_MODE_SINGLE, HboxConfiguration.DEFAULT_HBOX_TF_MODE_SINGLE))) {
localResources.put(aliasName,
Utilities.createApplicationResource(filesDst[i].getFileSystem(conf),
filesDst[i],
LocalResourceType.ARCHIVE,
LocalResourceVisibility.APPLICATION));
}
}
appMasterEnv.put(HboxConstants.Environment.HBOX_ARCHIVE_FILES_LOCATION.toString(),
appArchiveFilesRemotePath.deleteCharAt(appArchiveFilesRemotePath.length() - 1).toString());
}

private void prepareJarsForAM(Map<String, String> appMasterEnv, Map<String, LocalResource> localResources) throws IOException {
Path[] tfFilesDst = new Path[clientArguments.libJars.length];
LOG.info("Copy hbox lib jars from local filesystem to remote.");
Expand Down Expand Up @@ -1011,6 +1053,10 @@ private void prepareOtherEnvsForAM(Map<String, String> appMasterEnv, Map<String,
prepareFilesForAM(appMasterEnv, localResources);
}

if (clientArguments.hboxArchiveFiles != null) {
prepareArchiveFilesForAM(appMasterEnv, localResources);
}

if (clientArguments.libJars != null) {
prepareJarsForAM(appMasterEnv, localResources);
}
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/net/qihoo/hbox/client/ClientArguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class ClientArguments {
int psNum;
String duration;
String[] hboxFiles;
String[] hboxArchiveFiles;
String[] libJars;
String[] hboxCommandArgs;
String inputStrategy;
Expand Down Expand Up @@ -91,6 +92,7 @@ private void init() {
psGCores = HboxConfiguration.DEFAULT_HBOX_PS_GPU;
psNum = HboxConfiguration.DEFAULT_HBOX_PS_NUM;
hboxFiles = null;
hboxArchiveFiles = null;
libJars = null;
hboxCacheFiles = "";
hboxCacheArchives = "";
Expand Down Expand Up @@ -163,6 +165,9 @@ private void init() {
allOptions.addOption("files", "files", true,
"Location of the hbox files used in container");

allOptions.addOption("archiveFiles", "archiveFiles", true,
"Location of local archive files will be uploaded to container and be decompressed");

allOptions.addOption("jars", "jars", true,
"Location of the hbox lib jars used in container");

Expand Down Expand Up @@ -426,6 +431,10 @@ private void cliParser(String[] args) throws ParseException, IOException, ClassN
hboxFiles = StringUtils.split(cliParser.getOptionValue("files"), ',');
}

if (cliParser.hasOption("archiveFiles")) {
hboxArchiveFiles = StringUtils.split(cliParser.getOptionValue("archiveFiles"), ',');
}

if (cliParser.hasOption("jars")) {
libJars = StringUtils.split(cliParser.getOptionValue("jars"), ',');
}
Expand Down
33 changes: 20 additions & 13 deletions core/src/main/java/net/qihoo/hbox/util/Utilities.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,39 @@ public static boolean isSubPath(FileSystem fs, Path parent, Path sub) {

public static LocalResource createApplicationResource(FileSystem fs, Path path, LocalResourceType type)
throws IOException {
LocalResource localResource = Records.newRecord(LocalResource.class);
FileStatus fileStatus = fs.getFileStatus(path);
if (!fs.getConf().get(HboxConfiguration.HBOX_REMOTE_DEFAULTFS, HboxConfiguration.DEFAULT_HBOX_REMOTE_DEFAULTFS).equals("")) {
path = new Path(fs.getConf().get(HboxConfiguration.HBOX_REMOTE_DEFAULTFS), path);
}
localResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
localResource.setSize(fileStatus.getLen());
localResource.setTimestamp(fileStatus.getModificationTime());
localResource.setType(type);
LocalResource localResource;
switch (fs.getConf().get(HboxConfiguration.HBOX_LOCAL_RESOURCE_VISIBILITY, HboxConfiguration.DEFAULT_HBOX_LOCAL_RESOURCE_VISIBILITY).toUpperCase()) {
case "PUBLIC":
localResource.setVisibility(LocalResourceVisibility.PUBLIC);
localResource = createApplicationResource(fs, path, type, LocalResourceVisibility.PUBLIC);
break;
case "APPLICATION":
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource = createApplicationResource(fs, path, type, LocalResourceVisibility.APPLICATION);
break;
case "PRIVATE":
localResource.setVisibility(LocalResourceVisibility.PRIVATE);
localResource = createApplicationResource(fs, path, type, LocalResourceVisibility.PRIVATE);
break;
default:
localResource.setVisibility(LocalResourceVisibility.PUBLIC);
localResource = createApplicationResource(fs, path, type, LocalResourceVisibility.PUBLIC);
break;
}
return localResource;
}

public static LocalResource createApplicationResource(FileSystem fs, Path path, LocalResourceType type, LocalResourceVisibility visibility)
throws IOException {
LocalResource localResource = Records.newRecord(LocalResource.class);
FileStatus fileStatus = fs.getFileStatus(path);
if (!fs.getConf().get(HboxConfiguration.HBOX_REMOTE_DEFAULTFS, HboxConfiguration.DEFAULT_HBOX_REMOTE_DEFAULTFS).equals("")) {
path = new Path(fs.getConf().get(HboxConfiguration.HBOX_REMOTE_DEFAULTFS), path);
}
localResource.setResource(ConverterUtils.getYarnUrlFromPath(path));
localResource.setSize(fileStatus.getLen());
localResource.setTimestamp(fileStatus.getModificationTime());
localResource.setType(type);
localResource.setVisibility(visibility);
return localResource;
}

public static void addPathToEnvironment(Map<String, String> env, String userEnvKey, String userEnvValue) {
if (env.containsKey(userEnvKey)) {
env.put(userEnvKey, userEnvValue + System.getProperty("path.separator") + env.get(userEnvKey) + System.getProperty("path.separator") + System.getenv(userEnvKey));
Expand Down
2 changes: 1 addition & 1 deletion doc/submit.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,4 @@ output-strategy | the strategy of the output file, default as the configure of h
outputformat | specify the class of outputformat when output-strategy is "STREAM", default as the configure of hbox.outputformat.class
tf-evaluator | whether to set the last worker as evaluator of distributed TensorFlow job type, default as the configure of hbox.tf.evaluator
output-index | specify the index of the worker which to upload the output, default upload the output of all the workers.

archiveFiles | Location of local archive files will be uploaded to container and be decompressed. use comma as separator, # with alias name
1 change: 1 addition & 0 deletions doc/submit_cn.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,4 @@ output-strategy | 输出文件加载策略,默认为系统配置hbox.output.st
outputformat | 当输出文件加载模式为STREAM时,指定outputformat类,默认为系统配置hbox.outputformat.class
tf-evaluator | 在分布式TensorFlow作业类型下,是否设置evaluator角色,默认为系统配置hbox.tf.evaluator
output-index | 指定保存index对应worker的输出文件,默认保存所有worker的输出结果
archiveFiles | 上传本地压缩文件, container内自动解压. 逗号分割, 路径后#可带alias名字

0 comments on commit b24e7de

Please sign in to comment.