Skip to content

Commit

Permalink
analysis: improve nextflow executor, #TASK-6445
Browse files Browse the repository at this point in the history
  • Loading branch information
pfurio committed Sep 30, 2024
1 parent c15799b commit d74a555
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opencb.commons.utils.DockerUtils;
import org.opencb.opencga.analysis.tools.OpenCgaToolScopeStudy;
import org.opencb.opencga.catalog.db.api.WorkflowDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.utils.InputFileUtils;
import org.opencb.opencga.core.common.JacksonUtils;
import org.opencb.opencga.core.common.TimeUtils;
Expand All @@ -22,6 +23,7 @@
import org.opencb.opencga.core.models.workflow.NextFlowRunParams;
import org.opencb.opencga.core.models.workflow.Workflow;
import org.opencb.opencga.core.models.workflow.WorkflowScript;
import org.opencb.opencga.core.models.workflow.WorkflowVariable;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.opencb.opencga.core.tools.annotations.ToolParams;
Expand Down Expand Up @@ -50,8 +52,7 @@ public class NextFlowExecutor extends OpenCgaToolScopeStudy {

private Workflow workflow;
private String cliParams;
// Build list of inputfiles in case we need to specifically mount them in read only mode
private List<String> inputFileUris;

// Build list of inputfiles in case we need to specifically mount them in read only mode
List<AbstractMap.SimpleEntry<String, String>> inputBindings;

Expand All @@ -62,9 +63,6 @@ public class NextFlowExecutor extends OpenCgaToolScopeStudy {
private Thread thread;
private final int monitorThreadPeriod = 5000;

// private final Path inputDir = Paths.get("/data/input");
// private final String outputDir = "/data/output";

private final static Logger logger = LoggerFactory.getLogger(NextFlowExecutor.class);

@Override
Expand Down Expand Up @@ -97,6 +95,18 @@ protected void check() throws Exception {

outDirPath = getOutDir().toAbsolutePath().toString();

Set<String> mandatoryParams = new HashSet<>();
Map<String, WorkflowVariable> variableMap = new HashMap<>();
if (CollectionUtils.isNotEmpty(workflow.getVariables())) {
for (WorkflowVariable variable : workflow.getVariables()) {
String variableId = removePrefix(variable.getId());
variableMap.put(variableId, variable);
if (variable.isRequired()) {
mandatoryParams.add(variableId);
}
}
}

// Update job tags and attributes
ToolInfoExecutor toolInfoExecutor = new ToolInfoExecutor(workflow.getManager().getId().name(), workflow.getManager().getVersion());
Set<String> tags = new HashSet<>();
Expand All @@ -111,55 +121,123 @@ protected void check() throws Exception {

this.inputBindings = new LinkedList<>();
if (MapUtils.isNotEmpty(nextflowParams.getParams())) {
this.inputFileUris = new LinkedList<>();
InputFileUtils inputFileUtils = new InputFileUtils(catalogManager);

StringBuilder cliParamsBuilder = new StringBuilder();
for (Map.Entry<String, String> entry : nextflowParams.getParams().entrySet()) {
String variableId = removePrefix(entry.getKey());
// Remove from the mandatoryParams set
mandatoryParams.remove(variableId);

WorkflowVariable workflowVariable = variableMap.get(variableId);

if (entry.getKey().startsWith("-")) {
cliParamsBuilder.append(entry.getKey()).append(" ");
} else {
cliParamsBuilder.append("--").append(entry.getKey()).append(" ");
}
if (StringUtils.isNotEmpty(entry.getValue())) {
if (inputFileUtils.isValidOpenCGAFile(entry.getValue())) {
File file = inputFileUtils.findOpenCGAFileFromPattern(study, entry.getValue(), token);
if (inputFileUtils.fileMayContainReferencesToOtherFiles(file)) {
Path outputFile = temporalInputDir.resolve(file.getName());
List<File> files = inputFileUtils.findAndReplaceFilePathToUrisFromFile(study, file, outputFile, token);

// Write outputFile as inputBinding
inputBindings.add(new AbstractMap.SimpleEntry<>(outputFile.toString(), outputFile.toString()));
logger.info("Params: OpenCGA input file: {}", outputFile);
cliParamsBuilder.append(outputFile).append(" ");

// Add files to inputBindings to ensure they are also mounted (if any)
for (File tmpFile : files) {
inputBindings.add(new AbstractMap.SimpleEntry<>(tmpFile.getUri().getPath(), tmpFile.getUri().getPath()));
logger.info("Inner files from '{}': OpenCGA input file: '{}'", outputFile, tmpFile.getUri().getPath());
}
} else {
String path = file.getUri().getPath();
inputBindings.add(new AbstractMap.SimpleEntry<>(path, path));
logger.info("Params: OpenCGA input file: {}", path);
cliParamsBuilder.append(path).append(" ");
}
} else if (inputFileUtils.isDynamicOutputFolder(entry.getValue())) {
String dynamicOutputFolder = inputFileUtils.getDynamicOutputFolder(entry.getValue(), outDirPath);
logger.info("Params: Dynamic output folder: {}", dynamicOutputFolder);
cliParamsBuilder.append(dynamicOutputFolder).append(" ");
if ((workflowVariable != null && workflowVariable.isOutput()) || inputFileUtils.isDynamicOutputFolder(entry.getValue())) {
processOutputCli(entry.getValue(), inputFileUtils, cliParamsBuilder);
} else {
cliParamsBuilder.append(entry.getValue()).append(" ");
processInputCli(entry.getValue(), inputFileUtils, cliParamsBuilder);
}
} else if (workflowVariable != null) {
if (StringUtils.isNotEmpty(workflowVariable.getDefaultValue())) {
cliParamsBuilder.append(workflowVariable.getDefaultValue()).append(" ");
} else if (workflowVariable.isOutput()) {
processOutputCli("", inputFileUtils, cliParamsBuilder);
} else if (workflowVariable.isRequired() && workflowVariable.getType() != WorkflowVariable.WorkflowType.FLAG) {
throw new ToolException("Missing value for mandatory parameter: " + variableId);
}
}
}
for (String mandatoryParam : mandatoryParams) {
logger.info("Processing missing mandatory param: '{}'", mandatoryParam);
WorkflowVariable workflowVariable = variableMap.get(mandatoryParam);

if (workflowVariable.getId().startsWith("-")) {
cliParamsBuilder.append(workflowVariable.getId()).append(" ");
} else {
cliParamsBuilder.append("--").append(workflowVariable.getId()).append(" ");
}

if (StringUtils.isNotEmpty(workflowVariable.getDefaultValue())) {
if (workflowVariable.isOutput()) {
processOutputCli(workflowVariable.getDefaultValue(), inputFileUtils, cliParamsBuilder);
} else {
processInputCli(workflowVariable.getDefaultValue(), inputFileUtils, cliParamsBuilder);
}
} else if (workflowVariable.isOutput()) {
processOutputCli("", inputFileUtils, cliParamsBuilder);
} else {
throw new ToolException("Missing mandatory parameter: " + mandatoryParam);
}
}

this.cliParams = cliParamsBuilder.toString();
} else {
this.cliParams = "";
this.inputFileUris = Collections.emptyList();
}
}

void processInputCli(String value, InputFileUtils inputFileUtils, StringBuilder cliParamsBuilder) throws CatalogException {
if (inputFileUtils.isValidOpenCGAFile(value)) {
File file = inputFileUtils.findOpenCGAFileFromPattern(study, value, token);
if (inputFileUtils.fileMayContainReferencesToOtherFiles(file)) {
Path outputFile = temporalInputDir.resolve(file.getName());
List<File> files = inputFileUtils.findAndReplaceFilePathToUrisFromFile(study, file, outputFile, token);

// Write outputFile as inputBinding
inputBindings.add(new AbstractMap.SimpleEntry<>(outputFile.toString(), outputFile.toString()));
logger.info("Params: OpenCGA input file: {}", outputFile);
cliParamsBuilder.append(outputFile).append(" ");

// Add files to inputBindings to ensure they are also mounted (if any)
for (File tmpFile : files) {
inputBindings.add(new AbstractMap.SimpleEntry<>(tmpFile.getUri().getPath(), tmpFile.getUri().getPath()));
logger.info("Inner files from '{}': OpenCGA input file: '{}'", outputFile, tmpFile.getUri().getPath());
}
} else {
String path = file.getUri().getPath();
inputBindings.add(new AbstractMap.SimpleEntry<>(path, path));
logger.info("Params: OpenCGA input file: {}", path);
cliParamsBuilder.append(path).append(" ");
}
} else {
cliParamsBuilder.append(value).append(" ");
}

}

void processOutputCli(String value, InputFileUtils inputFileUtils, StringBuilder cliParamsBuilder) {
String dynamicOutputFolder;
if (inputFileUtils.isDynamicOutputFolder(value)) {
// If it starts with $OUTPUT/...
dynamicOutputFolder = inputFileUtils.getDynamicOutputFolder(value, outDirPath);
} else {
// If it starts directly with the subpath...
dynamicOutputFolder = inputFileUtils.appendSubpath(outDirPath, value);
}
logger.info("Params: Dynamic output folder: {}", dynamicOutputFolder);
cliParamsBuilder.append(dynamicOutputFolder).append(" ");
}

/**
* Given a variable name, it removes the prefix '-' if present.
* Example: --input -> input; -input -> input; input -> input
*
* @param variable A parameter of the nextflow command line.
* @return the variable removing any '-' prefix.
*/
private String removePrefix(String variable) {
String value = variable;
while (value.startsWith("-")) {
value = value.substring(1);
}
return value;
}

@Override
protected void run() throws Exception {
for (WorkflowScript script : workflow.getScripts()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,14 @@ public String getDynamicOutputFolder(String file, String outDir) {
return outDir + file.substring(OUTPUT.length());
}

public String appendSubpath(String path, String subpath) {
if ((path.endsWith("/") && !subpath.startsWith("/")) || (!path.endsWith("/") && subpath.startsWith("/"))) {
return path + subpath;
} else if (path.endsWith("/") && subpath.startsWith("/")) {
return path + subpath.substring(1);
} else {
return path + "/" + subpath;
}
}

}

0 comments on commit d74a555

Please sign in to comment.