diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/VariantWalkerTool.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/VariantWalkerTool.java index 3e826de405..a3eddd7eef 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/VariantWalkerTool.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/VariantWalkerTool.java @@ -26,7 +26,6 @@ import org.opencb.opencga.core.models.variant.VariantWalkerParams; import org.opencb.opencga.core.tools.annotations.Tool; import org.opencb.opencga.core.tools.annotations.ToolParams; -import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam; import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory; import java.net.URI; @@ -55,6 +54,15 @@ protected void check() throws Exception { } format = VariantWriterFactory.toOutputFormat(toolParams.getOutputFileName(), toolParams.getOutputFileName()); + if (!format.isPlain()) { + format = format.inPlain(); + } + + if (StringUtils.isEmpty(toolParams.getOutputFileName())) { + toolParams.setOutputFileName("output." + format.toString().toLowerCase() + ".gz"); + } else if (!toolParams.getOutputFileName().endsWith(".gz")) { + toolParams.setOutputFileName(toolParams.getOutputFileName() + ".gz"); + } } @Override @@ -70,15 +78,11 @@ protected void run() throws Exception { // The scratch directory is expected to be faster than the final directory // This also avoids moving files to final directory if the tool fails Path outDir = getScratchDir(); - String outputFile = StringUtils.isEmpty(toolParams.getOutputFileName()) - ? outDir.toString() - : outDir.resolve(toolParams.getOutputFileName()).toString(); + String outputFile = outDir.resolve(toolParams.getOutputFileName()).toString(); Query query = toolParams.toQuery(); - QueryOptions queryOptions = new QueryOptions(params); - for (VariantQueryParam param : VariantQueryParam.values()) { - queryOptions.remove(param.key()); - } - uris.addAll(variantStorageManager.walkData(outputFile, + QueryOptions queryOptions = new QueryOptions().append(QueryOptions.INCLUDE, toolParams.getInclude()) + .append(QueryOptions.EXCLUDE, toolParams.getExclude()); + uris.add(variantStorageManager.walkData(outputFile, format, query, queryOptions, toolParams.getDockerImage(), toolParams.getCommandLine(), token)); }); step("move-files", () -> { diff --git a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java index d1e276fbf3..f292e6d6a3 100644 --- a/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java +++ b/opencga-analysis/src/main/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManager.java @@ -88,6 +88,7 @@ import org.opencb.opencga.storage.core.variant.VariantStorageOptions; import org.opencb.opencga.storage.core.variant.adaptors.*; import org.opencb.opencga.storage.core.variant.adaptors.iterators.VariantDBIterator; +import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory; import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory.VariantOutputFormat; import org.opencb.opencga.storage.core.variant.query.ParsedQuery; import org.opencb.opencga.storage.core.variant.query.VariantQueryResult; @@ -98,6 +99,7 @@ import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.nio.file.Paths; import java.util.*; import java.util.concurrent.TimeUnit; @@ -202,14 +204,19 @@ public List exportData(String outputFile, VariantOutputFormat outputFormat, * @throws StorageEngineException If there is any error exporting variants * @return generated files */ - public List walkData(String outputFile, VariantOutputFormat format, + public URI walkData(String outputFile, VariantOutputFormat format, Query query, QueryOptions queryOptions, String dockerImage, String commandLine, String token) throws CatalogException, StorageEngineException { String anyStudy = catalogUtils.getAnyStudy(query, token); return secureAnalysis(VariantWalkerTool.ID, anyStudy, queryOptions, token, engine -> { Query finalQuery = catalogUtils.parseQuery(query, queryOptions, engine.getCellBaseUtils(), token); checkSamplesPermissions(finalQuery, queryOptions, token); - URI outputUri = new VariantExportOperationManager(this, engine).getOutputUri(outputFile, format, finalQuery, token); + URI outputUri; + try { + outputUri = UriUtils.createUri(outputFile); + } catch (URISyntaxException e) { + throw new IllegalArgumentException(e); + } return engine.walkData(outputUri, format, finalQuery, queryOptions, dockerImage, commandLine); }); } @@ -533,7 +540,7 @@ public boolean hasVariantSetup(String studyStr, String token) throws CatalogExce public ObjectMap configureProject(String projectStr, ObjectMap params, String token) throws CatalogException, StorageEngineException { return secureOperationByProject("configure", projectStr, params, token, engine -> { - validateNewConfiguration(engine, params); + validateNewConfiguration(engine, params, token); DataStore dataStore = getDataStoreByProjectId(projectStr, token); @@ -546,7 +553,7 @@ public ObjectMap configureProject(String projectStr, ObjectMap params, String to public ObjectMap configureStudy(String studyStr, ObjectMap params, String token) throws CatalogException, StorageEngineException { return secureOperation("configure", studyStr, params, token, engine -> { - validateNewConfiguration(engine, params); + validateNewConfiguration(engine, params, token); Study study = catalogManager.getStudyManager() .get(studyStr, new QueryOptions(INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE_OPTIONS.key()), @@ -570,12 +577,13 @@ public ObjectMap configureStudy(String studyStr, ObjectMap params, String token) }); } - private void validateNewConfiguration(VariantStorageEngine engine, ObjectMap params) throws StorageEngineException { - for (VariantStorageOptions option : VariantStorageOptions.values()) { - if (option.isProtected() && params.get(option.key()) != null) { - throw new StorageEngineException("Unable to update protected option '" + option.key() + "'"); - } + private void validateNewConfiguration(VariantStorageEngine engine, ObjectMap params, String token) + throws StorageEngineException, CatalogException { + if (catalogManager.getAuthorizationManager().isOpencgaAdministrator(catalogManager.getUserManager().validateToken(token))) { + logger.info("Skip configuration validation. User is an admin."); + return; } + engine.validateNewConfiguration(params); } /** diff --git a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManagerTest.java b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManagerTest.java index 4aeedde871..6f371c7fa8 100644 --- a/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManagerTest.java +++ b/opencga-analysis/src/test/java/org/opencb/opencga/analysis/variant/manager/VariantStorageManagerTest.java @@ -35,6 +35,7 @@ import org.opencb.opencga.core.testclassification.duration.MediumTests; import org.opencb.opencga.storage.core.exceptions.StorageEngineException; import org.opencb.opencga.storage.core.variant.VariantStorageEngine; +import org.opencb.opencga.storage.core.variant.VariantStorageOptions; import java.util.Collections; import java.util.HashSet; @@ -101,6 +102,21 @@ public void testConfigure() throws CatalogException, StorageEngineException { assertNotNull(vse2.getOptions().get("KeyFromTheSecondStudy")); } + @Test + public void testConfigureProtectedValues() throws Exception { + VariantStorageOptions key = VariantStorageOptions.WALKER_DOCKER_MEMORY; + assertTrue(key.isProtected()); + ObjectMap conf = new ObjectMap(key.key(), "30g"); + + String fqn = catalogManager.getProjectManager().get(projectId, null, sessionId).first().getFqn(); + + variantManager.configureProject(fqn, new ObjectMap(conf), opencga.getAdminToken()); + + thrown.expect(StorageEngineException.class); + thrown.expectMessage("Unable to update protected option '" + key.key() + "'"); + variantManager.configureProject(projectId, new ObjectMap(conf), sessionId); + } + @Test public void testConfigureSampleIndex() throws Exception { SampleIndexConfiguration conf = getRandomConf(); diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java index bf46887740..b10b2c7305 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageEngine.java @@ -285,7 +285,7 @@ public List exportData(URI outputFile, VariantOutputFormat outputFormat, UR return exporter.export(outputFile, outputFormat, variantsFile, parsedVariantQuery); } - public List walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, + public URI walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, String dockerImage, String commandLine) throws IOException, StorageEngineException { if (format == VariantWriterFactory.VariantOutputFormat.VCF || format == VariantWriterFactory.VariantOutputFormat.VCF_GZ) { @@ -304,8 +304,11 @@ public List walkData(URI outputFile, VariantWriterFactory.VariantOutputForm String dockerCommandLine = "docker run --rm -i " + "--memory " + memory + " " - + "--cpus " + cpu + " " - + "--user " + user + " "; + + "--cpus " + cpu + " "; + + if (StringUtils.isNotEmpty(user)) { + dockerCommandLine += "--user " + user + " "; + } if (StringUtils.isNotEmpty(volume)) { dockerCommandLine += "-v " + volume + ":/data "; @@ -323,7 +326,7 @@ public List walkData(URI outputFile, VariantWriterFactory.VariantOutputForm } - public abstract List walkData(URI outputFile, VariantOutputFormat format, Query query, QueryOptions queryOptions, + public abstract URI walkData(URI outputFile, VariantOutputFormat format, Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException; @@ -1202,6 +1205,14 @@ public abstract void loadVariantScore(URI scoreFile, String study, String scoreN @Override public abstract void testConnection() throws StorageEngineException; + public void validateNewConfiguration(ObjectMap params) throws StorageEngineException { + for (VariantStorageOptions option : VariantStorageOptions.values()) { + if (option.isProtected() && params.get(option.key()) != null) { + throw new StorageEngineException("Unable to update protected option '" + option.key() + "'"); + } + } + } + public void reloadCellbaseConfiguration() { cellBaseUtils = null; } diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java index f7736bd5b1..b00bd525bd 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/VariantStorageOptions.java @@ -102,7 +102,7 @@ public enum VariantStorageOptions implements ConfigurationOption { WALKER_DOCKER_MEMORY("walker.docker.memory", "512m", true), WALKER_DOCKER_CPU("walker.docker.cpu", "1", true), - WALKER_DOCKER_USER("walker.docker.user", "root", true), + WALKER_DOCKER_USER("walker.docker.user", "", true), WALKER_DOCKER_ENV("walker.docker.env", "", true), WALKER_DOCKER_MOUNT("walker.docker.mount", "", true), WALKER_DOCKER_OPTS("walker.docker.opts", "", true), diff --git a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/VariantWriterFactory.java b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/VariantWriterFactory.java index fa002facbd..61c2e6552d 100644 --- a/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/VariantWriterFactory.java +++ b/opencga-storage/opencga-storage-core/src/main/java/org/opencb/opencga/storage/core/variant/io/VariantWriterFactory.java @@ -122,7 +122,7 @@ public boolean isSnappy() { return extension.endsWith(".snappy"); } - public VariantOutputFormat inPlan() { + public VariantOutputFormat inPlain() { if (!isPlain()) { return VariantOutputFormat.valueOf(name().replace("_GZ", "").replace("_SNAPPY", "")); } else { diff --git a/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml b/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml index b9970d18ea..21c2dd12f7 100644 --- a/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml +++ b/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml @@ -128,6 +128,13 @@ variant: search.intersect.always: false # Force intersect queries search.intersect.params.threshold: 3 # Minimum number of QueryParams in the query to intersect + walker.docker.memory: "512m" # Memory limit for the docker executor + walker.docker.cpu: "1" # CPU limit for the docker executor + walker.docker.user: "" # User to run the docker executor + walker.docker.env: "" # Environment variables to be passed to the docker executor. e.g. key=value,key2=value2 + walker.docker.mount: "" # Volumes to be mounted in the docker executor + walker.docker.opts: "" # Additional docker options + ## The following section defines all available storage engine plugins installed engines: ## Hadoop Storage Engine @@ -177,6 +184,9 @@ variant: # See opencb/opencga#352 for more info. storage.hadoop.mr.scanner.timeout: 300000 + # DOCKER_HOST environment variable to be used by the docker executor inside the MapReduce job + storage.hadoop.mr.stream.docker.host: "" + mapreduce.map.memory.mb: 2048 DeleteHBaseColumnDriver: storage.hadoop.write.mappers.limit.factor: 4 diff --git a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyVariantStorageEngine.java b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyVariantStorageEngine.java index e10370dcaa..65a0169ef8 100644 --- a/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyVariantStorageEngine.java +++ b/opencga-storage/opencga-storage-core/src/test/java/org/opencb/opencga/storage/core/variant/dummy/DummyVariantStorageEngine.java @@ -143,7 +143,7 @@ public void importData(URI input, VariantMetadata metadata, List walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException { + public URI walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException { throw new UnsupportedOperationException("Unable to walk data in " + getStorageEngineId()); } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java index fdee34d313..061bc95642 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageEngine.java @@ -317,20 +317,38 @@ protected VariantExporter newVariantExporter(VariantMetadataFactory metadataFact } @Override - public List walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, + public URI walkData(URI outputFile, VariantWriterFactory.VariantOutputFormat format, Query query, QueryOptions queryOptions, String commandLine) throws StorageEngineException { ParsedVariantQuery variantQuery = parseQuery(query, queryOptions); int studyId = variantQuery.getStudyQuery().getDefaultStudy().getId(); + ObjectMap params = new ObjectMap(getOptions()).appendAll(variantQuery.getQuery()).appendAll(variantQuery.getInputOptions()); + params.remove(StreamVariantDriver.COMMAND_LINE_PARAM); + + String memory = getOptions().getString(WALKER_DOCKER_MEMORY.key(), WALKER_DOCKER_MEMORY.defaultValue()); + int memoryBytes; + if (memory.endsWith("M") || memory.endsWith("m")) { + memoryBytes = Integer.parseInt(memory.substring(0, memory.length() - 1)) * 1024 * 1024; + } else if (memory.endsWith("G") || memory.endsWith("g")) { + memoryBytes = Integer.parseInt(memory.substring(0, memory.length() - 1)) * 1024 * 1024 * 1024; + } else { + memoryBytes = Integer.parseInt(memory); + } + + String dockerHost = getOptions().getString(MR_STREAM_DOCKER_HOST.key(), MR_STREAM_DOCKER_HOST.defaultValue()); + if (StringUtils.isNotEmpty(dockerHost)) { + params.put(StreamVariantDriver.ENVIRONMENT_VARIABLES, "DOCKER_HOST=" + dockerHost); + } + getMRExecutor().run(StreamVariantDriver.class, StreamVariantDriver.buildArgs( null, getVariantTableName(), studyId, null, - new ObjectMap().appendAll(variantQuery.getQuery()).appendAll(variantQuery.getInputOptions()) - .append(StreamVariantDriver.MAX_BYTES_PER_MAP_PARAM, 1024 * 10) + params + .append(StreamVariantDriver.MAX_BYTES_PER_MAP_PARAM, memoryBytes / 2) .append(StreamVariantDriver.COMMAND_LINE_BASE64_PARAM, Base64.getEncoder().encodeToString(commandLine.getBytes())) .append(StreamVariantDriver.INPUT_FORMAT_PARAM, format.toString()) .append(StreamVariantDriver.OUTPUT_PARAM, outputFile) - ), ""); - return null; + ), "Walk data"); + return outputFile; } @Override @@ -1335,4 +1353,15 @@ public void testConnection() throws StorageEngineException { } } + @Override + public void validateNewConfiguration(ObjectMap params) throws StorageEngineException { + super.validateNewConfiguration(params); + + for (HadoopVariantStorageOptions option : HadoopVariantStorageOptions.values()) { + if (option.isProtected() && params.get(option.key()) != null) { + throw new StorageEngineException("Unable to update protected option '" + option.key() + "'"); + } + } + } + } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java index 817605be87..363b07e9fb 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/HadoopVariantStorageOptions.java @@ -60,6 +60,8 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption { MR_EXECUTOR_SSH_HADOOP_SCP_BIN("storage.hadoop.mr.executor.ssh.hadoop-scp.bin", "misc/scripts/hadoop-scp.sh"), MR_EXECUTOR_SSH_HADOOP_TERMINATION_GRACE_PERIOD_SECONDS("storage.hadoop.mr.executor.ssh.terminationGracePeriodSeconds", 120), + MR_STREAM_DOCKER_HOST("storage.hadoop.mr.stream.docker.host", "", true), + ///////////////////////// // Variant table configuration ///////////////////////// @@ -134,6 +136,7 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption { private final String key; private final Object value; + private final boolean isProtected; HadoopVariantStorageOptions(String key) { this(key, null); @@ -142,6 +145,13 @@ public enum HadoopVariantStorageOptions implements ConfigurationOption { HadoopVariantStorageOptions(String key, Object value) { this.key = key; this.value = value; + this.isProtected = false; + } + + HadoopVariantStorageOptions(String key, Object value, boolean isProtected) { + this.key = key; + this.value = value; + this.isProtected = isProtected; } @Override @@ -157,4 +167,11 @@ public String key() { public T defaultValue() { return (T) value; } + + @Override + public boolean isProtected() { + return isProtected; + } + + } diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java index bb31552ad6..5a248e190e 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantDriver.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; +import java.util.HashMap; import java.util.Map; public class StreamVariantDriver extends VariantDriver { @@ -28,11 +29,13 @@ public class StreamVariantDriver extends VariantDriver { public static final String COMMAND_LINE_PARAM = "commandLine"; public static final String COMMAND_LINE_BASE64_PARAM = "commandLineBase64"; public static final String MAX_BYTES_PER_MAP_PARAM = "maxBytesPerMap"; + public static final String ENVIRONMENT_VARIABLES = "envVars"; private VariantWriterFactory.VariantOutputFormat format; private int maxBytesPerMap; private static Logger logger = LoggerFactory.getLogger(StreamVariantDriver.class); private String commandLine; + private Map envVars; private Class mapperClass; private Class reducerClass; @@ -76,6 +79,20 @@ protected void parseAndValidateParameters() throws IOException { commandLine = new String(java.util.Base64.getDecoder().decode(commandLineBase64)); } + envVars = new HashMap<>(); + String envVarsStr = getParam(ENVIRONMENT_VARIABLES); + if (StringUtils.isNotEmpty(envVarsStr)) { + String[] split = envVarsStr.split(","); + for (String s : split) { + String[] split1 = s.split("="); + if (split1.length != 2) { + throw new IllegalArgumentException("Invalid environment variable '" + s + "'"); + } + envVars.put(split1[0], split1[1]); + } + } + + String outdirStr = getParam(OUTPUT_PARAM); if (StringUtils.isEmpty(outdirStr)) { throw new IllegalArgumentException("Missing argument " + OUTPUT_PARAM); @@ -115,6 +132,7 @@ protected void setupJob(Job job) throws IOException { StreamVariantMapper.setCommandLine(job, commandLine); StreamVariantMapper.setVariantFormat(job, format); StreamVariantMapper.setMaxInputBytesPerProcess(job, maxBytesPerMap); + StreamVariantMapper.setEnvironment(job, envVars); reducerClass = Reducer.class; diff --git a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java index 163d6bd964..df5425e8d4 100644 --- a/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java +++ b/opencga-storage/opencga-storage-hadoop/opencga-storage-hadoop-core/src/main/java/org/opencb/opencga/storage/hadoop/variant/mr/StreamVariantMapper.java @@ -18,10 +18,7 @@ import org.opencb.opencga.storage.hadoop.variant.metadata.HBaseVariantStorageMetadataDBAdaptorFactory; import java.io.*; -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicReference; +import java.util.*; import static org.opencb.opencga.storage.hadoop.variant.mr.VariantsTableMapReduceHelper.COUNTER_GROUP_NAME; @@ -30,8 +27,9 @@ public class StreamVariantMapper extends VariantMapper throwable = new AtomicReference<>(); + protected final List throwables = Collections.synchronizedList(new ArrayList<>()); private volatile boolean processProvidedStatus_ = false; public static void setCommandLine(Job job, String commandLine) { String commandLineBase64 = Base64.getEncoder().encodeToString(commandLine.getBytes()); - job.getConfiguration().set(STREAMPROCESSOR, commandLineBase64); + job.getConfiguration().set(COMMANDLINE_BASE64, commandLineBase64); } public static void setVariantFormat(Job job, VariantWriterFactory.VariantOutputFormat format) { @@ -82,16 +80,15 @@ public static void setMaxInputBytesPerProcess(Job job, int maxInputBytesPerProce protected void setup(Context context) throws IOException, InterruptedException { super.setup(context); Configuration conf = context.getConfiguration(); - commandLine = new String(Base64.getDecoder().decode(conf.get(STREAMPROCESSOR))); + commandLine = new String(Base64.getDecoder().decode(conf.get(COMMANDLINE_BASE64))); maxInputBytesPerProcess = conf.getInt(MAX_INPUT_BYTES_PER_PROCESS, 1024 * 1024 * 1024); format = VariantWriterFactory.toOutputFormat(conf.get(VARIANT_FORMAT), ""); if (!format.isPlain()) { - format = format.inPlan(); + format = format.inPlain(); } - envs = new HashMap<>(); - addEnvironment(envs, conf.get("stream.addenvironment")); + addEnvironment(envs, conf); // add TMPDIR environment variable with the value of java.io.tmpdir envs.put("TMPDIR", System.getProperty("java.io.tmpdir")); @@ -135,24 +132,33 @@ public void run(Context context) throws IOException, InterruptedException { } private boolean hasExceptions() { - return throwable.get() != null; + return !throwables.isEmpty(); } private void setException(Throwable th) { - if (!throwable.compareAndSet(null, th)) { - synchronized (throwable) { - // addSuppressed is not thread safe - throwable.get().addSuppressed(th); - } - } + throwables.add(th); LOG.warn("{}", th); } private void throwExceptionIfAny() throws IOException { if (hasExceptions()) { - Throwable cause = throwable.get(); - throwable.set(null); - throw new IOException("MROutput/MRErrThread failed:", cause); + String message = "StreamVariantMapper failed:"; + if (stderrThread != null) { + String stderr = String.join("\n", stderrThread.stderrBuffer); + message += "\nSTDERR: " + stderr; + } + if (throwables.size() == 1) { + Throwable cause = throwables.get(0); + throwables.clear(); + throw new IOException(message, cause); + } else { + IOException exception = new IOException(message); + for (int i = 1; i < throwables.size(); i++) { + exception.addSuppressed(throwables.get(i)); + } + throwables.clear(); + throw exception; + } } } @@ -247,7 +253,6 @@ private void startProcess(Context context) throws IOException { processedBytes = 0; numRecordsRead = 0; numRecordsWritten = 0; - throwable.set(null); variantDataWriter.open(); variantDataWriter.pre(); @@ -255,7 +260,30 @@ private void startProcess(Context context) throws IOException { } - void addEnvironment(Map env, String nameVals) { + public static void setEnvironment(Job job, Map env) { + if (env == null || env.isEmpty()) { + return; + } + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : env.entrySet()) { + if (entry.getKey().contains(" ") || entry.getValue().contains(" ")) { + throw new IllegalArgumentException("Environment variables cannot contain spaces: " + + "'" + entry.getKey() + "' = '" + entry.getValue() + "'"); + } + if (entry.getKey().contains("=") || entry.getValue().contains("=")) { + throw new IllegalArgumentException("Environment variables cannot contain '=': " + + "'" + entry.getKey() + "' = '" + entry.getValue() + "'"); + } + if (sb.length() > 0) { + sb.append(" "); + } + sb.append(entry.getKey()).append("=").append(entry.getValue()); + } + job.getConfiguration().set(ADDENVIRONMENT_PARAM, sb.toString()); + } + + public static void addEnvironment(Map env, Configuration conf) { + String nameVals = conf.get(ADDENVIRONMENT_PARAM); // encoding "a=b c=d" from StreamJob if (nameVals == null) { return; @@ -264,7 +292,7 @@ void addEnvironment(Map env, String nameVals) { for (int i = 0; i < nv.length; i++) { String[] pair = nv[i].split("=", 2); if (pair.length != 2) { - LOG.info("Skip env entry:" + nv[i]); + throw new IllegalArgumentException("Invalid name=value: " + nv[i]); } else { env.put(pair[0], pair[1]); } @@ -319,6 +347,9 @@ private class MRErrorThread extends Thread { private final String reporterPrefix; private final String counterPrefix; private final String statusPrefix; + private final LinkedList stderrBuffer = new LinkedList<>(); + private int stderrBufferSize = 0; + private static final int STDERR_BUFFER_CAPACITY = 10 * 1024; MRErrorThread(Context context) { this.context = context; @@ -345,6 +376,12 @@ public void run() { LOG.warn("Cannot parse reporter line: " + lineStr); } } else { + // Store STDERR in a circular buffer (just the last 10KB), and include it in case of exception + stderrBuffer.add(lineStr); + stderrBufferSize += lineStr.length(); + while (stderrBufferSize > STDERR_BUFFER_CAPACITY && stderrBuffer.size() > 3) { + stderrBufferSize -= stderrBuffer.remove().length(); + } LOG.info("[STDERR] - " + lineStr); // System.err.println(lineStr); }