Skip to content

Commit

Permalink
Merge branch 'TASK-6445' of github.com:opencb/opencga into TASK-6445
Browse files Browse the repository at this point in the history
  • Loading branch information
imedina committed Oct 3, 2024
2 parents ccbba00 + bc2cc11 commit 31811c1
Show file tree
Hide file tree
Showing 11 changed files with 433 additions and 152 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.opencb.commons.utils.DockerUtils;
import org.opencb.opencga.analysis.tools.OpenCgaToolScopeStudy;
import org.opencb.opencga.catalog.utils.InputFileUtils;
import org.opencb.opencga.analysis.tools.OpenCgaDockerToolScopeStudy;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.job.JobRunDockerParams;
import org.opencb.opencga.core.models.job.JobRunParams;
import org.opencb.opencga.core.models.job.ToolInfoExecutor;
Expand All @@ -17,12 +14,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.LinkedList;
import java.util.List;

@Tool(id = CustomToolExecutor.ID, resource = Enums.Resource.JOB, description = CustomToolExecutor.DESCRIPTION)
public class CustomToolExecutor extends OpenCgaToolScopeStudy {
public class CustomToolExecutor extends OpenCgaDockerToolScopeStudy {

public final static String ID = "custom-tool";
public static final String DESCRIPTION = "Execute an analysis from a custom binary.";
Expand All @@ -31,8 +27,6 @@ public class CustomToolExecutor extends OpenCgaToolScopeStudy {
protected JobRunParams runParams = new JobRunParams();

private String cliParams;
// Build list of inputfiles in case we need to specifically mount them in read only mode
List<AbstractMap.SimpleEntry<String, String>> inputBindings;
private String dockerImage;

private final static Logger logger = LoggerFactory.getLogger(CustomToolExecutor.class);
Expand Down Expand Up @@ -66,38 +60,15 @@ protected void check() throws Exception {
tags.add(this.dockerImage);
updateJobInformation(tags, toolInfoExecutor);

// Build input bindings
this.inputBindings = new LinkedList<>();
InputFileUtils inputFileUtils = new InputFileUtils(catalogManager);

StringBuilder cliParamsBuilder = new StringBuilder();
String[] params = runParams.getCommandLine().split(" ");
Map<String, String> inputDirectoryMounts = new HashMap<>();
for (String param : params) {
if (inputFileUtils.isValidOpenCGAFile(param)) {
File file = inputFileUtils.findOpenCGAFileFromPattern(study, param, token);
Path parent = Paths.get(file.getUri()).getParent();
if (!inputDirectoryMounts.containsKey(parent.toString())) {
inputDirectoryMounts.put(parent.toString(), "/data/input" + inputDirectoryMounts.size());
}
String directoryMount = inputDirectoryMounts.get(parent.toString());
inputBindings.add(new AbstractMap.SimpleEntry<>(parent.toString(), directoryMount));
cliParamsBuilder.append(directoryMount).append("/").append(file.getName()).append(" ");
} else {
cliParamsBuilder.append(param).append(" ");
}
}
processInputParams(runParams.getCommandLine(), cliParamsBuilder);
this.cliParams = cliParamsBuilder.toString();
}

/**
 * Runs the configured Docker image with the command line prepared by {@code check()} and logs the
 * value returned by {@code runDocker} together with the elapsed time.
 *
 * @throws Exception if the Docker execution fails.
 */
@Override
protected void run() throws Exception {
    // Input and output bindings are assembled by runDocker() in the parent class, so only the image
    // and the processed command line are needed here.
    StopWatch stopWatch = StopWatch.createStarted();
    String cmdline = runDocker(dockerImage, cliParams);
    logger.info("Docker command line: {}", cmdline);
    logger.info("Execution time: {}", TimeUtils.durationToString(stopWatch));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package org.opencb.opencga.analysis.tools;

import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.utils.DockerUtils;
import org.opencb.opencga.catalog.db.api.JobDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.utils.InputFileUtils;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.job.ToolInfoExecutor;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Stream;

public abstract class OpenCgaDockerToolScopeStudy extends OpenCgaToolScopeStudy {

// Build list of inputfiles in case we need to specifically mount them in read only mode
protected List<AbstractMap.SimpleEntry<String, String>> dockerInputBindings;
// Directory where temporal input files will be stored
protected Path temporalInputDir;

protected InputFileUtils inputFileUtils;

@Override
protected void check() throws Exception {
super.check();
this.dockerInputBindings = new LinkedList<>();
this.temporalInputDir = Files.createDirectory(getOutDir().resolve(".opencga_input"));
this.inputFileUtils = new InputFileUtils(catalogManager);
}

@Override
protected void close() {
super.close();
deleteTemporalFiles();
}

private void deleteTemporalFiles() {
// Delete input files and temporal directory
try (Stream<Path> paths = Files.walk(temporalInputDir)) {
paths.sorted(Comparator.reverseOrder())
.map(Path::toFile)
.forEach(java.io.File::delete);
} catch (IOException e) {
logger.warn("Could not delete temporal input directory: " + temporalInputDir, e);
}
}

/**
* Process the input parameters of the command line.
* @param value
* @param inputFileUtils
* @param cliParamsBuilder
* @throws CatalogException
*/
protected void processInputCli(String value, InputFileUtils inputFileUtils, StringBuilder cliParamsBuilder) throws CatalogException {
if (inputFileUtils.isValidOpenCGAFile(value)) {
File file = inputFileUtils.findOpenCGAFileFromPattern(study, value, token);
if (inputFileUtils.fileMayContainReferencesToOtherFiles(file)) {
Path outputFile = temporalInputDir.resolve(file.getName());
List<File> files = inputFileUtils.findAndReplaceFilePathToUrisFromFile(study, file, outputFile, token);

// Write outputFile as inputBinding
dockerInputBindings.add(new AbstractMap.SimpleEntry<>(outputFile.toString(), outputFile.toString()));
logger.info("Params: OpenCGA input file: '{}'", outputFile);
cliParamsBuilder.append(outputFile).append(" ");

// Add files to inputBindings to ensure they are also mounted (if any)
for (File tmpFile : files) {
dockerInputBindings.add(new AbstractMap.SimpleEntry<>(tmpFile.getUri().getPath(), tmpFile.getUri().getPath()));
logger.info("Inner files from '{}': OpenCGA input file: '{}'", outputFile, tmpFile.getUri().getPath());
}
} else {
String path = file.getUri().getPath();
dockerInputBindings.add(new AbstractMap.SimpleEntry<>(path, path));
logger.info("Params: OpenCGA input file: '{}'", path);
cliParamsBuilder.append(path).append(" ");
}
} else {
cliParamsBuilder.append(value).append(" ");
}

}

protected void processOutputCli(String value, InputFileUtils inputFileUtils, StringBuilder cliParamsBuilder) throws CatalogException {
String dynamicOutputFolder;
if (inputFileUtils.isDynamicOutputFolder(value)) {
// If it starts with $OUTPUT/...
dynamicOutputFolder = inputFileUtils.getDynamicOutputFolder(value, getOutDir().toAbsolutePath().toString());
} else {
// If it starts directly with the subpath...
dynamicOutputFolder = inputFileUtils.appendSubpath(getOutDir().toAbsolutePath().toString(), value);
}
logger.info("Params: Dynamic output folder: '{}'", dynamicOutputFolder);
cliParamsBuilder.append(dynamicOutputFolder).append(" ");
}

/**
* Given a variable name, it removes the prefix '-' if present.
* Example: --input -> input; -input -> input; input -> input
*
* @param variable A parameter of a command line.
* @return the variable removing any '-' prefix.
*/
protected String removePrefix(String variable) {
String value = variable;
while (value.startsWith("-")) {
value = value.substring(1);
}
return value;
}

protected void updateJobInformation(List<String> tags, ToolInfoExecutor executor) throws CatalogException {
ObjectMap params = new ObjectMap()
.append(JobDBAdaptor.QueryParams.TAGS.key(), tags)
.append(JobDBAdaptor.QueryParams.TOOL_EXTERNAL_EXECUTOR.key(), executor);
catalogManager.getJobManager().update(getStudyFqn(), getJobId(), params, QueryOptions.empty(), token);
}

protected void processInputParams(String commandLineParams, StringBuilder builder) throws CatalogException {
String[] params = commandLineParams.split(" ");
for (String param : params) {
if (inputFileUtils.isDynamicOutputFolder(param)) {
processOutputCli(param, inputFileUtils, builder);
} else {
processInputCli(param, inputFileUtils, builder);
}
}
}

protected String runDocker(String image, String cli) throws IOException {
return runDocker(image, null, cli, null);
}

protected String runDocker(String image, AbstractMap.SimpleEntry<String, String> userOutputBinding, String cmdParams,
Map<String, String> userDockerParams) throws IOException {
AbstractMap.SimpleEntry<String, String> outputBinding = userOutputBinding != null
? userOutputBinding
: new AbstractMap.SimpleEntry<>(getOutDir().toAbsolutePath().toString(), getOutDir().toAbsolutePath().toString());

Map<String, String> dockerParams = new HashMap<>();
// Establish working directory
dockerParams.put("-w", getOutDir().toAbsolutePath().toString());
dockerParams.put("--volume", "/var/run/docker.sock:/var/run/docker.sock");
dockerParams.put("--env", "DOCKER_HOST='tcp://localhost:2375'");
dockerParams.put("--network", "host");
if (userDockerParams != null) {
dockerParams.putAll(userDockerParams);
}

return DockerUtils.run(image, dockerInputBindings, outputBinding, cmdParams, dockerParams);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ public final ExecutionResult start() throws ToolException {
logException(exception);
privateLogger.info("------- Tool '" + getId() + "' executed in "
+ TimeUtils.durationToString(result.getEnd().getTime() - result.getStart().getTime()) + " -------");
close();
}
return result;
}
Expand Down Expand Up @@ -341,6 +342,12 @@ protected void check() throws Exception {
*/
protected abstract void run() throws Exception;

/**
 * Hook that subclasses may override to clean up resources once the tool has been executed.
 * This default implementation does nothing.
 */
protected void close() {
}

/**
* Method to be called by the Runtime shutdownHook in case of an unexpected system shutdown.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@

package org.opencb.opencga.analysis.tools;

import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.catalog.db.api.JobDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.StudyManager;
import org.opencb.opencga.core.api.ParamConstants;
import org.opencb.opencga.core.models.job.ToolInfoExecutor;

import java.util.List;

public abstract class OpenCgaToolScopeStudy extends OpenCgaTool {

Expand Down Expand Up @@ -56,11 +50,4 @@ protected final String getStudyFqn() throws CatalogException {
getToken()).first().getFqn();
}

/**
 * Updates the current job in Catalog with the given tags and external executor information.
 *
 * @param tags     Value stored under the job TAGS key.
 * @param executor Value stored under the job TOOL_EXTERNAL_EXECUTOR key.
 * @throws CatalogException if the study FQN cannot be obtained or the job update fails.
 */
protected void updateJobInformation(List<String> tags, ToolInfoExecutor executor) throws CatalogException {
ObjectMap params = new ObjectMap()
.append(JobDBAdaptor.QueryParams.TAGS.key(), tags)
.append(JobDBAdaptor.QueryParams.TOOL_EXTERNAL_EXECUTOR.key(), executor);
// The update is addressed by study FQN and job id, using the tool's own token
catalogManager.getJobManager().update(getStudyFqn(), getJobId(), params, QueryOptions.empty(), token);
}

}
Loading

0 comments on commit 31811c1

Please sign in to comment.