Skip to content

Commit

Permalink
storage: Add STDERR to exception thrown. Fix max_bytes_per_map. #TASK…
Browse files Browse the repository at this point in the history
…-6722
  • Loading branch information
j-coll committed Oct 10, 2024
1 parent 4b8dad2 commit 9ea00eb
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.opencb.opencga.core.models.variant.VariantWalkerParams;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.opencb.opencga.core.tools.annotations.ToolParams;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;

import java.net.URI;
Expand Down Expand Up @@ -55,6 +54,15 @@ protected void check() throws Exception {
}

format = VariantWriterFactory.toOutputFormat(toolParams.getOutputFileName(), toolParams.getOutputFileName());
if (!format.isPlain()) {
format = format.inPlain();
}

if (StringUtils.isEmpty(toolParams.getOutputFileName())) {
toolParams.setOutputFileName("output." + format.toString().toLowerCase() + ".gz");
} else if (!toolParams.getOutputFileName().endsWith(".gz")) {
toolParams.setOutputFileName(toolParams.getOutputFileName() + ".gz");
}
}

@Override
Expand All @@ -70,15 +78,11 @@ protected void run() throws Exception {
// The scratch directory is expected to be faster than the final directory
// This also avoids moving files to final directory if the tool fails
Path outDir = getScratchDir();
String outputFile = StringUtils.isEmpty(toolParams.getOutputFileName())
? outDir.toString()
: outDir.resolve(toolParams.getOutputFileName()).toString();
String outputFile = outDir.resolve(toolParams.getOutputFileName()).toString();
Query query = toolParams.toQuery();
QueryOptions queryOptions = new QueryOptions(params);
for (VariantQueryParam param : VariantQueryParam.values()) {
queryOptions.remove(param.key());
}
uris.addAll(variantStorageManager.walkData(outputFile,
QueryOptions queryOptions = new QueryOptions().append(QueryOptions.INCLUDE, toolParams.getInclude())
.append(QueryOptions.EXCLUDE, toolParams.getExclude());
uris.add(variantStorageManager.walkData(outputFile,
format, query, queryOptions, toolParams.getDockerImage(), toolParams.getCommandLine(), token));
});
step("move-files", () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;
import org.opencb.opencga.storage.core.variant.adaptors.*;
import org.opencb.opencga.storage.core.variant.adaptors.iterators.VariantDBIterator;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory.VariantOutputFormat;
import org.opencb.opencga.storage.core.variant.query.ParsedQuery;
import org.opencb.opencga.storage.core.variant.query.VariantQueryResult;
Expand All @@ -98,6 +99,7 @@

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -202,14 +204,19 @@ public List<URI> exportData(String outputFile, VariantOutputFormat outputFormat,
* @throws StorageEngineException If there is any error exporting variants
* @return generated files
*/
public List<URI> walkData(String outputFile, VariantOutputFormat format,
public URI walkData(String outputFile, VariantOutputFormat format,
Query query, QueryOptions queryOptions, String dockerImage, String commandLine, String token)
throws CatalogException, StorageEngineException {
String anyStudy = catalogUtils.getAnyStudy(query, token);
return secureAnalysis(VariantWalkerTool.ID, anyStudy, queryOptions, token, engine -> {
Query finalQuery = catalogUtils.parseQuery(query, queryOptions, engine.getCellBaseUtils(), token);
checkSamplesPermissions(finalQuery, queryOptions, token);
URI outputUri = new VariantExportOperationManager(this, engine).getOutputUri(outputFile, format, finalQuery, token);
URI outputUri;
try {
outputUri = UriUtils.createUri(outputFile);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
return engine.walkData(outputUri, format, finalQuery, queryOptions, dockerImage, commandLine);
});
}
Expand Down Expand Up @@ -533,7 +540,7 @@ public boolean hasVariantSetup(String studyStr, String token) throws CatalogExce

public ObjectMap configureProject(String projectStr, ObjectMap params, String token) throws CatalogException, StorageEngineException {
return secureOperationByProject("configure", projectStr, params, token, engine -> {
validateNewConfiguration(engine, params);
validateNewConfiguration(engine, params, token);

DataStore dataStore = getDataStoreByProjectId(projectStr, token);

Expand All @@ -546,7 +553,7 @@ public ObjectMap configureProject(String projectStr, ObjectMap params, String to

public ObjectMap configureStudy(String studyStr, ObjectMap params, String token) throws CatalogException, StorageEngineException {
return secureOperation("configure", studyStr, params, token, engine -> {
validateNewConfiguration(engine, params);
validateNewConfiguration(engine, params, token);
Study study = catalogManager.getStudyManager()
.get(studyStr,
new QueryOptions(INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE_OPTIONS.key()),
Expand All @@ -570,12 +577,13 @@ public ObjectMap configureStudy(String studyStr, ObjectMap params, String token)
});
}

private void validateNewConfiguration(VariantStorageEngine engine, ObjectMap params) throws StorageEngineException {
for (VariantStorageOptions option : VariantStorageOptions.values()) {
if (option.isProtected() && params.get(option.key()) != null) {
throw new StorageEngineException("Unable to update protected option '" + option.key() + "'");
}
/**
 * Validates a new variant storage configuration on behalf of the user owning the token.
 * OpenCGA administrators skip the validation; for any other user the check is delegated
 * to the storage engine.
 *
 * @param engine Storage engine that validates the new configuration
 * @param params New configuration values
 * @param token  Token of the user requesting the configuration change
 * @throws StorageEngineException if the storage engine rejects the new configuration
 * @throws CatalogException       if raised by the catalog while validating the token or checking the administrator role
 */
private void validateNewConfiguration(VariantStorageEngine engine, ObjectMap params, String token)
        throws StorageEngineException, CatalogException {
    boolean requesterIsAdmin = catalogManager.getAuthorizationManager()
            .isOpencgaAdministrator(catalogManager.getUserManager().validateToken(token));
    if (!requesterIsAdmin) {
        engine.validateNewConfiguration(params);
        return;
    }
    logger.info("Skip configuration validation. User is an admin.");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opencb.opencga.core.testclassification.duration.MediumTests;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;

import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -101,6 +102,21 @@ public void testConfigure() throws CatalogException, StorageEngineException {
assertNotNull(vse2.getOptions().get("KeyFromTheSecondStudy"));
}

@Test
public void testConfigureProtectedValues() throws Exception {
    // Pick an option that is flagged as protected and build a configuration that tries to change it.
    VariantStorageOptions protectedOption = VariantStorageOptions.WALKER_DOCKER_MEMORY;
    assertTrue(protectedOption.isProtected());
    String protectedKey = protectedOption.key();
    ObjectMap protectedConf = new ObjectMap(protectedKey, "30g");

    String projectFqn = catalogManager.getProjectManager().get(projectId, null, sessionId).first().getFqn();

    // With the admin token the protected option must be accepted (no exception expected here).
    variantManager.configureProject(projectFqn, new ObjectMap(protectedConf), opencga.getAdminToken());

    // With sessionId (presumably a regular, non-admin user session) the same change must be rejected.
    thrown.expect(StorageEngineException.class);
    thrown.expectMessage("Unable to update protected option '" + protectedKey + "'");
    variantManager.configureProject(projectId, new ObjectMap(protectedConf), sessionId);
}

@Test
public void testConfigureSampleIndex() throws Exception {
SampleIndexConfiguration conf = getRandomConf();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public List<URI> exportData(URI outputFile, VariantOutputFormat outputFormat, UR
return exporter.export(outputFile, outputFormat, variantsFile, parsedVariantQuery);
}

public List<URI> walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions,
public URI walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions,
String dockerImage, String commandLine)
throws IOException, StorageEngineException {
if (format == VariantWriterFactory.VariantOutputFormat.VCF || format == VariantWriterFactory.VariantOutputFormat.VCF_GZ) {
Expand All @@ -304,8 +304,11 @@ public List<URI> walkData(URI outputFile, VariantWriterFactory.VariantOutputForm

String dockerCommandLine = "docker run --rm -i "
+ "--memory " + memory + " "
+ "--cpus " + cpu + " "
+ "--user " + user + " ";
+ "--cpus " + cpu + " ";

if (StringUtils.isNotEmpty(user)) {
dockerCommandLine += "--user " + user + " ";
}

if (StringUtils.isNotEmpty(volume)) {
dockerCommandLine += "-v " + volume + ":/data ";
Expand All @@ -323,7 +326,7 @@ public List<URI> walkData(URI outputFile, VariantWriterFactory.VariantOutputForm
}


public abstract List<URI> walkData(URI outputFile, VariantOutputFormat format, Query query, QueryOptions queryOptions,
public abstract URI walkData(URI outputFile, VariantOutputFormat format, Query query, QueryOptions queryOptions,
String commandLine)
throws StorageEngineException;

Expand Down Expand Up @@ -1202,6 +1205,14 @@ public abstract void loadVariantScore(URI scoreFile, String study, String scoreN
@Override
public abstract void testConnection() throws StorageEngineException;

/**
 * Ensures that the given configuration does not set any protected {@link VariantStorageOptions}.
 *
 * @param params New configuration values to validate
 * @throws StorageEngineException if {@code params} holds a non-null value for a protected option
 */
public void validateNewConfiguration(ObjectMap params) throws StorageEngineException {
    for (VariantStorageOptions storageOption : VariantStorageOptions.values()) {
        if (!storageOption.isProtected()) {
            // Unprotected options may be freely updated.
            continue;
        }
        String optionKey = storageOption.key();
        if (params.get(optionKey) != null) {
            throw new StorageEngineException("Unable to update protected option '" + optionKey + "'");
        }
    }
}

/**
 * Discards the current {@code cellBaseUtils} reference by setting it to {@code null}.
 */
public void reloadCellbaseConfiguration() {
// NOTE(review): presumably cellBaseUtils is rebuilt lazily with the new configuration on next access - confirm.
cellBaseUtils = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public enum VariantStorageOptions implements ConfigurationOption {

WALKER_DOCKER_MEMORY("walker.docker.memory", "512m", true),
WALKER_DOCKER_CPU("walker.docker.cpu", "1", true),
WALKER_DOCKER_USER("walker.docker.user", "root", true),
WALKER_DOCKER_USER("walker.docker.user", "", true),
WALKER_DOCKER_ENV("walker.docker.env", "", true),
WALKER_DOCKER_MOUNT("walker.docker.mount", "", true),
WALKER_DOCKER_OPTS("walker.docker.opts", "", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public boolean isSnappy() {
return extension.endsWith(".snappy");
}

public VariantOutputFormat inPlan() {
public VariantOutputFormat inPlain() {
if (!isPlain()) {
return VariantOutputFormat.valueOf(name().replace("_GZ", "").replace("_SNAPPY", ""));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,13 @@ variant:
search.intersect.always: false # Force intersect queries
search.intersect.params.threshold: 3 # Minimum number of QueryParams in the query to intersect

walker.docker.memory: "512m" # Memory limit for the docker executor
walker.docker.cpu: "1" # CPU limit for the docker executor
walker.docker.user: "" # User to run the docker executor
walker.docker.env: "" # Environment variables to be passed to the docker executor. e.g. key=value,key2=value2
walker.docker.mount: "" # Volumes to be mounted in the docker executor
walker.docker.opts: "" # Additional docker options

## The following section defines all available storage engine plugins installed
engines:
## Hadoop Storage Engine
Expand Down Expand Up @@ -177,6 +184,9 @@ variant:
# See opencb/opencga#352 for more info.
storage.hadoop.mr.scanner.timeout: 300000

# DOCKER_HOST environment variable to be used by the docker executor inside the MapReduce job
storage.hadoop.mr.stream.docker.host: ""

mapreduce.map.memory.mb: 2048
DeleteHBaseColumnDriver:
storage.hadoop.write.mappers.limit.factor: 4
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void importData(URI input, VariantMetadata metadata, List<StudyConfigurat
}

@Override
public List<URI> walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException {
// Walking data is not available in this storage engine: every call fails with an UnsupportedOperationException.
public URI walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException {
// The message includes the storage engine id so the caller can tell which engine rejected the request.
throw new UnsupportedOperationException("Unable to walk data in " + getStorageEngineId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,20 +317,38 @@ protected VariantExporter newVariantExporter(VariantMetadataFactory metadataFact
}

@Override
public List<URI> walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format,
public URI walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format,
Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException {
ParsedVariantQuery variantQuery = parseQuery(query, queryOptions);
int studyId = variantQuery.getStudyQuery().getDefaultStudy().getId();
ObjectMap params = new ObjectMap(getOptions()).appendAll(variantQuery.getQuery()).appendAll(variantQuery.getInputOptions());
params.remove(StreamVariantDriver.COMMAND_LINE_PARAM);

// Translate the docker memory limit (e.g. "512m", "30g") into a number of bytes.
// Docker accepts the unit suffixes b, k, m and g (case insensitive); a value without suffix is taken as bytes.
// The multiplication is done with long arithmetic: computed as an int, any limit from "2g" upwards
// silently overflows (e.g. "30g" turned into a negative number of bytes).
String memory = getOptions().getString(WALKER_DOCKER_MEMORY.key(), WALKER_DOCKER_MEMORY.defaultValue()).trim();
String memoryNumber = memory;
long memoryUnitBytes = 1L;
if (!memory.isEmpty()) {
    long suffixBytes;
    switch (Character.toLowerCase(memory.charAt(memory.length() - 1))) {
        case 'g':
            suffixBytes = 1024L * 1024 * 1024;
            break;
        case 'm':
            suffixBytes = 1024L * 1024;
            break;
        case 'k':
            suffixBytes = 1024L;
            break;
        case 'b':
            suffixBytes = 1L;
            break;
        default:
            // Not a known unit: keep the whole string, so a plain number is read as bytes
            // and anything else fails below with a NumberFormatException.
            suffixBytes = 0L;
            break;
    }
    if (suffixBytes > 0) {
        memoryUnitBytes = suffixBytes;
        memoryNumber = memory.substring(0, memory.length() - 1).trim();
    }
}
// multiplyExact fails loudly (ArithmeticException) instead of wrapping around on absurdly large values.
long memoryBytesLong = Math.multiplyExact(Long.parseLong(memoryNumber), memoryUnitBytes);
// The value is consumed as an int afterwards, so clamp instead of letting the narrowing cast wrap around.
int memoryBytes = (int) Math.min(memoryBytesLong, Integer.MAX_VALUE);

String dockerHost = getOptions().getString(MR_STREAM_DOCKER_HOST.key(), MR_STREAM_DOCKER_HOST.defaultValue());
if (StringUtils.isNotEmpty(dockerHost)) {
params.put(StreamVariantDriver.ENVIRONMENT_VARIABLES, "DOCKER_HOST=" + dockerHost);
}

getMRExecutor().run(StreamVariantDriver.class, StreamVariantDriver.buildArgs(
null,
getVariantTableName(), studyId, null,
new ObjectMap().appendAll(variantQuery.getQuery()).appendAll(variantQuery.getInputOptions())
.append(StreamVariantDriver.MAX_BYTES_PER_MAP_PARAM, 1024 * 10)
params
.append(StreamVariantDriver.MAX_BYTES_PER_MAP_PARAM, memoryBytes / 2)
.append(StreamVariantDriver.COMMAND_LINE_BASE64_PARAM, Base64.getEncoder().encodeToString(commandLine.getBytes()))
.append(StreamVariantDriver.INPUT_FORMAT_PARAM, format.toString())
.append(StreamVariantDriver.OUTPUT_PARAM, outputFile)
), "");
return null;
), "Walk data");
return outputFile;
}

@Override
Expand Down Expand Up @@ -1335,4 +1353,15 @@ public void testConnection() throws StorageEngineException {
}
}

/**
 * Extends the generic validation with the Hadoop specific options: after the parent checks,
 * no protected {@link HadoopVariantStorageOptions} may be present in the new configuration.
 *
 * @param params New configuration values to validate
 * @throws StorageEngineException if {@code params} holds a non-null value for a protected option
 */
@Override
public void validateNewConfiguration(ObjectMap params) throws StorageEngineException {
    // Generic options first, so that their errors take precedence over the Hadoop ones.
    super.validateNewConfiguration(params);

    for (HadoopVariantStorageOptions hadoopOption : HadoopVariantStorageOptions.values()) {
        if (!hadoopOption.isProtected()) {
            continue;
        }
        String optionKey = hadoopOption.key();
        if (params.get(optionKey) != null) {
            throw new StorageEngineException("Unable to update protected option '" + optionKey + "'");
        }
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption {
MR_EXECUTOR_SSH_HADOOP_SCP_BIN("storage.hadoop.mr.executor.ssh.hadoop-scp.bin", "misc/scripts/hadoop-scp.sh"),
MR_EXECUTOR_SSH_HADOOP_TERMINATION_GRACE_PERIOD_SECONDS("storage.hadoop.mr.executor.ssh.terminationGracePeriodSeconds", 120),

MR_STREAM_DOCKER_HOST("storage.hadoop.mr.stream.docker.host", "", true),

/////////////////////////
// Variant table configuration
/////////////////////////
Expand Down Expand Up @@ -134,6 +136,7 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption {

private final String key;
private final Object value;
private final boolean isProtected;

HadoopVariantStorageOptions(String key) {
this(key, null);
Expand All @@ -142,6 +145,13 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption {
/**
 * Creates an unprotected option.
 *
 * @param key   Configuration key of the option
 * @param value Default value of the option
 */
HadoopVariantStorageOptions(String key, Object value) {
    this(key, value, false);
}

/**
 * Creates an option, stating explicitly whether it is protected.
 *
 * @param key         Configuration key of the option
 * @param value       Default value of the option
 * @param isProtected Whether the option is flagged as protected
 */
HadoopVariantStorageOptions(String key, Object value, boolean isProtected) {
    this.isProtected = isProtected;
    this.key = key;
    this.value = value;
}

@Override
Expand All @@ -157,4 +167,11 @@ public String key() {
public <T> T defaultValue() {
return (T) value;
}

/**
 * Tells whether this option was declared as protected.
 *
 * @return the protected flag given at construction time
 */
@Override
public boolean isProtected() {
    return this.isProtected;
}


}
Loading

0 comments on commit 9ea00eb

Please sign in to comment.