Skip to content

Commit

Permalink
Merge pull request #2454 from opencb/TASK-5448
Browse files Browse the repository at this point in the history
TASK-5448 - Add mandatory variant setup step
  • Loading branch information
j-coll authored Sep 3, 2024
2 parents e3565be + fc2e0d9 commit 8bc7529
Show file tree
Hide file tree
Showing 78 changed files with 1,929 additions and 268 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@
import org.opencb.opencga.core.models.sample.SamplePermissions;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.models.study.StudyPermissions;
import org.opencb.opencga.core.models.study.VariantSetupResult;
import org.opencb.opencga.core.models.variant.VariantSetupParams;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.opencb.opencga.storage.core.variant.query.VariantQueryResult;
import org.opencb.opencga.core.tools.ToolParams;
import org.opencb.opencga.storage.core.StorageEngineFactory;
import org.opencb.opencga.storage.core.StoragePipelineResult;
Expand All @@ -88,6 +89,7 @@
import org.opencb.opencga.storage.core.variant.adaptors.iterators.VariantDBIterator;
import org.opencb.opencga.storage.core.variant.io.VariantWriterFactory.VariantOutputFormat;
import org.opencb.opencga.storage.core.variant.query.ParsedQuery;
import org.opencb.opencga.storage.core.variant.query.VariantQueryResult;
import org.opencb.opencga.storage.core.variant.query.VariantQueryUtils;
import org.opencb.opencga.storage.core.variant.query.projection.VariantQueryProjectionParser;
import org.opencb.opencga.storage.core.variant.score.VariantScoreFormatDescriptor;
Expand Down Expand Up @@ -490,6 +492,18 @@ public void aggregate(String studyStr, VariantAggregateParams params, String tok
});
}

/**
 * Runs the variant setup operation ({@link VariantSetupOperationManager#ID}) on a study.
 *
 * The call goes through {@code secureOperation}, and the actual work is delegated to
 * {@link VariantSetupOperationManager#setup} using the study FQN resolved from {@code studyStr}.
 *
 * @param studyStr study identifier, as accepted by {@code getStudyFqn}
 * @param params   setup parameters; also passed, as an ObjectMap, to {@code secureOperation}
 * @param token    user token
 * @return the result produced by {@link VariantSetupOperationManager#setup}
 * @throws CatalogException       propagated from the underlying calls
 * @throws StorageEngineException propagated from the underlying calls
 */
public VariantSetupResult variantSetup(String studyStr, VariantSetupParams params, String token)
        throws CatalogException, StorageEngineException {
    ObjectMap paramsMap = params.toObjectMap();
    return secureOperation(VariantSetupOperationManager.ID, studyStr, paramsMap, token, engine -> {
        VariantSetupOperationManager setupManager = new VariantSetupOperationManager(this, engine);
        return setupManager.setup(getStudyFqn(studyStr, token), params, token);
    });
}

/**
 * Tells whether the variant setup has been executed for the given study.
 *
 * Only the variant engine section of the study internal configuration is requested from catalog;
 * the decision itself is taken by {@link VariantSetupOperationManager#hasVariantSetup(Study)}.
 *
 * @param studyStr study identifier
 * @param token    user token
 * @return whatever {@link VariantSetupOperationManager#hasVariantSetup(Study)} reports for the fetched study
 * @throws CatalogException if the study can not be read from catalog
 */
public boolean hasVariantSetup(String studyStr, String token) throws CatalogException {
    QueryOptions variantEngineOnly =
            new QueryOptions(INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE.key());
    Study study = catalogManager.getStudyManager().get(studyStr, variantEngineOnly, token).first();
    return VariantSetupOperationManager.hasVariantSetup(study);
}

public ObjectMap configureProject(String projectStr, ObjectMap params, String token) throws CatalogException, StorageEngineException {
return secureOperationByProject("configure", projectStr, params, token, engine -> {
DataStore dataStore = getDataStoreByProjectId(projectStr, token);
Expand Down Expand Up @@ -1181,7 +1195,7 @@ private interface VariantOperationFunction<R> {
private <R> R secureOperationByProject(String operationName, String project, ObjectMap params, String token, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {
try (VariantStorageEngine variantStorageEngine = getVariantStorageEngineByProject(project, params, token)) {
return secureTool(operationName, true, params, token, variantStorageEngine, operation);
return secureTool(operationName, true, null, params, token, variantStorageEngine, operation);
} catch (IOException e) {
throw new StorageEngineException("Error closing the VariantStorageEngine", e);
}
Expand All @@ -1190,7 +1204,7 @@ private <R> R secureOperationByProject(String operationName, String project, Obj
private <R> R secureOperation(String operationName, String study, ObjectMap params, String token, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {
try (VariantStorageEngine variantStorageEngine = getVariantStorageEngineForStudyOperation(study, params, token)) {
return secureTool(operationName, true, params, token, variantStorageEngine, operation);
return secureTool(operationName, true, study, params, token, variantStorageEngine, operation);
} catch (IOException e) {
throw new StorageEngineException("Error closing the VariantStorageEngine", e);
}
Expand All @@ -1199,7 +1213,7 @@ private <R> R secureOperation(String operationName, String study, ObjectMap para
private <R> R secureAnalysis(String operationName, String study, ObjectMap params, String token, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {
try (VariantStorageEngine variantStorageEngine = getVariantStorageEngineForStudyOperation(study, params, token)) {
return secureTool(operationName, false, params, token, variantStorageEngine, operation);
return secureTool(operationName, false, study, params, token, variantStorageEngine, operation);
} catch (IOException e) {
throw new StorageEngineException("Error closing the VariantStorageEngine", e);
}
Expand All @@ -1221,7 +1235,7 @@ private <R> R secureOperationByProject(String operationName, String projectStr,
return secureOperationByProject(operationName, projectStr, params, token, operation);
}

private <R> R secureTool(String toolId, boolean isOperation, ObjectMap params, String token,
private <R> R secureTool(String toolId, boolean isOperation, String study, ObjectMap params, String token,
VariantStorageEngine variantStorageEngine, VariantOperationFunction<R> operation)
throws CatalogException, StorageEngineException {

Expand All @@ -1241,6 +1255,15 @@ private <R> R secureTool(String toolId, boolean isOperation, ObjectMap params, S
throw new StorageEngineException("Unable to execute operation '" + toolId + "'. "
+ "The storage engine is in mode=" + storageConfiguration.getMode());
}
if (isOperation && study != null && !VariantSetupOperationManager.ID.equals(toolId)) {
// Ensure that the variant setup has been executed
// do not check for the setup operation itself
// Project level operations can not be checked for setup.
if (!hasVariantSetup(study, token)) {
throw new StorageEngineException("Unable to execute operation '" + toolId + "'. "
+ "The variant storage has not been setup for study '" + study + "'");
}
}
result = operation.apply(variantStorageEngine);
return result;
} catch (CatalogException | StorageEngineException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package org.opencb.opencga.analysis.variant.manager.operations;

import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.opencga.analysis.variant.manager.VariantStorageManager;
import org.opencb.opencga.catalog.db.api.StudyDBAdaptor;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.models.study.VariantSetupResult;
import org.opencb.opencga.core.models.variant.VariantSetupParams;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operation manager for the {@value #ID} operation.
 *
 * The setup validates the input parameters, fills in the parameters that were not provided,
 * asks the variant storage engine for the configuration options derived from them and stores
 * the outcome in the study through the catalog study manager.
 */
public class VariantSetupOperationManager extends OperationManager {

    public static final String ID = "variant-setup";

    /** Value used for variantsPerSample when it is neither provided nor derived from the fileType. */
    private static final int DEFAULT_VARIANTS_PER_SAMPLE = 5000000;

    private static final Logger logger = LoggerFactory.getLogger(VariantSetupOperationManager.class);

    public VariantSetupOperationManager(VariantStorageManager variantStorageManager, VariantStorageEngine variantStorageEngine) {
        super(variantStorageManager, variantStorageEngine);
    }

    /**
     * Execute the variant setup for a study.
     *
     * The input object is never modified: all the work is done on a copy. The params recorded in
     * the result are captured before the inference step runs, while the options are computed by
     * the storage engine from the params after inference.
     *
     * @param studyFqn study where the setup is executed
     * @param params   setup params
     * @param token    user token
     * @return the setup result, with status READY, already stored in the study
     * @throws CatalogException         on catalog errors
     * @throws StorageEngineException   on storage engine errors
     * @throws IllegalArgumentException if the study already has indexed files, if a mandatory
     *                                  param is missing, or if an enum value is not supported
     */
    public VariantSetupResult setup(String studyFqn, VariantSetupParams params, String token)
            throws CatalogException, StorageEngineException {
        // Copy params to avoid modifying input object
        params = new VariantSetupParams(params);
        check(studyFqn, params, token);

        VariantSetupResult result = new VariantSetupResult();
        result.setDate(TimeUtils.getTime());
        result.setUserId(catalogManager.getUserManager().getUserIdContextStudy(studyFqn, token));
        // Recorded before inferParams, so the stored params do not contain the inferred values.
        result.setParams(params.toObjectMap());
        result.setStatus(VariantSetupResult.Status.READY);

        inferParams(params);

        ObjectMap options = variantStorageEngine.inferConfigurationParams(params);
        result.setOptions(options);

        catalogManager.getStudyManager().setVariantEngineSetupOptions(studyFqn, result, token);

        return result;
    }

    /**
     * Infer some parameters from others. Values already present are never overwritten.
     *  - averageFileSize inferred from fileType
     *  - variantsPerSample inferred from fileType, or set to a default value if there is no fileType
     *  - averageSamplesPerFile inferred from dataDistribution, or from expectedSamples and expectedFiles
     *
     * Expects expectedSamples and expectedFiles to be set (see {@link #check}).
     *
     * @param params params to infer. Modified in place.
     * @throws IllegalArgumentException if the fileType or the dataDistribution is not supported
     */
    private void inferParams(VariantSetupParams params) {
        if (params.getFileType() != null) {
            switch (params.getFileType()) {
                case GENOME_gVCF:
                    applyFileTypeDefaults(params, "1GiB", 5000000);
                    break;
                case GENOME_VCF:
                    applyFileTypeDefaults(params, "500MiB", 5000000);
                    break;
                case EXOME:
                    applyFileTypeDefaults(params, "100MiB", 100000);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown fileType " + params.getFileType());
            }
        }
        // Unable to tell. Use a default value for variantsPerSample
        if (params.getVariantsPerSample() == null) {
            params.setVariantsPerSample(DEFAULT_VARIANTS_PER_SAMPLE);
        }

        if (params.getAverageSamplesPerFile() == null) {
            params.setAverageSamplesPerFile(inferAverageSamplesPerFile(params));
        }
    }

    /**
     * Set averageFileSize and variantsPerSample to the given values, only where they are missing.
     *
     * @param params            params to complete
     * @param averageFileSize   value for averageFileSize if not defined
     * @param variantsPerSample value for variantsPerSample if not defined
     */
    private static void applyFileTypeDefaults(VariantSetupParams params, String averageFileSize, int variantsPerSample) {
        if (params.getAverageFileSize() == null) {
            params.setAverageFileSize(averageFileSize);
        }
        if (params.getVariantsPerSample() == null) {
            params.setVariantsPerSample(variantsPerSample);
        }
    }

    /**
     * Compute the value for averageSamplesPerFile from the dataDistribution.
     * Without dataDistribution, use the ratio between expectedSamples and expectedFiles.
     *
     * @param params setup params
     * @return inferred number of samples per file
     * @throws IllegalArgumentException if the dataDistribution is not supported
     */
    private static float inferAverageSamplesPerFile(VariantSetupParams params) {
        if (params.getDataDistribution() == null) {
            return expectedSamplesPerFile(params);
        }
        switch (params.getDataDistribution()) {
            case SINGLE_SAMPLE_PER_FILE:
                return 1f;
            case MULTIPLE_SAMPLES_PER_FILE:
                return expectedSamplesPerFile(params);
            case MULTIPLE_FILES_PER_SAMPLE:
                // Hard to tell. Let's assume 2 samples per file
                return 2f;
            case FILES_SPLIT_BY_CHROMOSOME:
            case FILES_SPLIT_BY_REGION:
                return params.getExpectedSamples().floatValue();
            default:
                throw new IllegalArgumentException("Unknown dataDistribution " + params.getDataDistribution());
        }
    }

    /**
     * @param params setup params
     * @return expectedSamples divided by expectedFiles
     */
    private static float expectedSamplesPerFile(VariantSetupParams params) {
        return params.getExpectedSamples().floatValue() / params.getExpectedFiles().floatValue();
    }

    /**
     * Validate that the setup can be executed on the study with the given params.
     *
     * Running the setup again on a study that was already setup is allowed, and only logged.
     *
     * @param studyStr study
     * @param params   setup params
     * @param token    user token
     * @throws CatalogException         if the study can not be read from catalog
     * @throws StorageEngineException   on storage engine errors
     * @throws IllegalArgumentException if the study already has indexed files, if expectedFiles or
     *                                  expectedSamples are missing or not positive, or if both
     *                                  averageFileSize and fileType are missing
     */
    private void check(String studyStr, VariantSetupParams params, String token) throws CatalogException, StorageEngineException {
        Study study = catalogManager.getStudyManager().get(studyStr,
                        new QueryOptions(QueryOptions.INCLUDE, StudyDBAdaptor.QueryParams.INTERNAL_CONFIGURATION_VARIANT_ENGINE.key()), token)
                .first();

        VariantStorageMetadataManager metadataManager = variantStorageEngine.getMetadataManager();
        if (metadataManager.studyExists(studyStr)) {
            int studyId = metadataManager.getStudyId(studyStr);
            if (!metadataManager.getIndexedFiles(studyId).isEmpty()) {
                throw new IllegalArgumentException("Unable to execute variant-setup on study '" + studyStr + "'. "
                        + "It already has indexed files.");
            }
        }
        if (hasVariantSetup(study)) {
            logger.info("Study {} was already setup. Re executing variant-setup", studyStr);
        }

        if (params.getExpectedFiles() == null || params.getExpectedFiles() <= 0) {
            throw new IllegalArgumentException("Missing expectedFiles");
        }
        if (params.getExpectedSamples() == null || params.getExpectedSamples() <= 0) {
            throw new IllegalArgumentException("Missing expectedSamples");
        }

        if (params.getAverageFileSize() == null && params.getFileType() == null) {
            throw new IllegalArgumentException("Missing averageFileSize or fileType");
        }
    }

    /**
     * Check if the study contains a variant setup result with status READY.
     *
     * @param study study to check. A null study is reported as not setup.
     * @return true if the study has a setup result with status READY
     */
    public static boolean hasVariantSetup(Study study) {
        VariantSetupResult setup = getVariantSetupResult(study);
        return setup != null && setup.getStatus() == VariantSetupResult.Status.READY;
    }

    /**
     * Read the setup result from the variant engine configuration of the study.
     *
     * @param study study to read from
     * @return the setup result, or null if the study or any of the intermediate objects is null
     */
    private static VariantSetupResult getVariantSetupResult(Study study) {
        if (study == null
                || study.getInternal() == null
                || study.getInternal().getConfiguration() == null
                || study.getInternal().getConfiguration().getVariantEngine() == null) {
            return null;
        }
        return study.getInternal().getConfiguration().getVariantEngine().getSetup();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ protected void check() throws Exception {
params.putIfNotEmpty(VariantStorageOptions.INCLUDE_GENOTYPE.key(), indexParams.getIncludeGenotypes());
params.put(VariantStorageOptions.STATS_AGGREGATION.key(), indexParams.getAggregated());
params.putIfNotEmpty(VariantStorageOptions.STATS_AGGREGATION_MAPPING_FILE.key(), indexParams.getAggregationMappingFile());
params.put(VariantStorageOptions.GVCF.key(), indexParams.isGvcf());
if (indexParams.isGvcf()) {
params.put(VariantStorageOptions.GVCF.key(), indexParams.isGvcf());
}

// queryOptions.putIfNotNull(VariantFileIndexerStorageOperation.TRANSFORMED_FILES, indexParams.transformedPaths);

Expand All @@ -92,7 +94,9 @@ protected void check() throws Exception {
params.put(VariantStorageOptions.FAMILY.key(), indexParams.isFamily());
params.put(VariantStorageOptions.SOMATIC.key(), indexParams.isSomatic());
params.putIfNotEmpty(VariantStorageOptions.LOAD_SPLIT_DATA.key(), indexParams.getLoadSplitData());
params.put(VariantStorageOptions.LOAD_MULTI_FILE_DATA.key(), indexParams.isLoadMultiFileData());
if (indexParams.isLoadMultiFileData()) {
params.put(VariantStorageOptions.LOAD_MULTI_FILE_DATA.key(), indexParams.isLoadMultiFileData());
}
params.putIfNotEmpty(VariantStorageOptions.LOAD_SAMPLE_INDEX.key(), indexParams.getLoadSampleIndex());
params.putIfNotEmpty(VariantStorageOptions.LOAD_ARCHIVE.key(), indexParams.getLoadArchive());
params.putIfNotEmpty(VariantStorageOptions.LOAD_HOM_REF.key(), indexParams.getLoadHomRef());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.opencb.biodata.models.variant.avro.SequenceOntologyTerm;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.opencga.analysis.variant.OpenCGATestExternalResource;
import org.opencb.opencga.analysis.variant.manager.VariantOperationsTest;
import org.opencb.opencga.analysis.variant.manager.VariantStorageManager;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.managers.AbstractClinicalManagerTest;
Expand Down Expand Up @@ -56,11 +57,12 @@ public static AbstractClinicalManagerTest getClinicalTest(OpenCGATestExternalRes
.append(VariantStorageOptions.ANNOTATE.key(), true)
.append(VariantStorageOptions.STATS_CALCULATE.key(), false);

VariantStorageManager variantStorageManager = new VariantStorageManager(opencga.getCatalogManager(), opencga.getStorageEngineFactory());
VariantStorageManager variantStorageManager = opencga.getVariantStorageManager();

Path outDir = Paths.get("target/test-data").resolve("junit_clinical_analysis_" + RandomStringUtils.randomAlphabetic(10));
Files.createDirectories(outDir);

VariantOperationsTest.dummyVariantSetup(variantStorageManager, clinicalTest.studyFqn, clinicalTest.token);
variantStorageManager.index(clinicalTest.studyFqn, "family.vcf", outDir.toString(), storageOptions, clinicalTest.token);
variantStorageManager.index(clinicalTest.studyFqn, "exomiser.vcf.gz", outDir.toString(), storageOptions, clinicalTest.token);
variantStorageManager.index(clinicalTest.studyFqn, "HG004.1k.vcf.gz", outDir.toString(), storageOptions, clinicalTest.token);
Expand Down
Loading

0 comments on commit 8bc7529

Please sign in to comment.