diff --git a/pom.xml b/pom.xml index 56328deb5..2295bebdf 100644 --- a/pom.xml +++ b/pom.xml @@ -198,6 +198,7 @@ true test-mongo.properties + test-stats.properties opencga/conf/storage-mongodb.properties @@ -208,6 +209,7 @@ false test-mongo.properties + test-stats.properties opencga/conf/storage-mongodb.properties diff --git a/src/main/java/uk/ac/ebi/eva/commons/models/data/VariantStats.java b/src/main/java/uk/ac/ebi/eva/commons/models/data/VariantStats.java index 46a205fed..544678780 100644 --- a/src/main/java/uk/ac/ebi/eva/commons/models/data/VariantStats.java +++ b/src/main/java/uk/ac/ebi/eva/commons/models/data/VariantStats.java @@ -239,7 +239,7 @@ private Genotype normalizeGenotypeAlleles(Genotype g) { } } - void setGenotypesCount(Map genotypesCount) { + public void setGenotypesCount(Map genotypesCount) { this.genotypesCount = genotypesCount; } diff --git a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/VariantDocument.java b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/VariantDocument.java index 375c3c25d..6b95d4252 100644 --- a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/VariantDocument.java +++ b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/VariantDocument.java @@ -266,4 +266,8 @@ public Set getVariantStatsMongo() { public Set getAnnotations() { return annotations; } + + public void setStats(Set variantStats) { + this.variantStatsMongo = variantStats; + } } diff --git a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java index 8d96ad103..4ddb8a235 100644 --- a/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java +++ b/src/main/java/uk/ac/ebi/eva/commons/models/mongo/entity/subdocuments/VariantSourceEntryMongo.java @@ -174,4 +174,19 @@ private BasicDBObject buildAttributes(Map attributes) { return attrs; } + public BasicDBObject getSampleData() { + return samp; + } + + public String getStudyId() { + return studyId; + } + + public String getFileId() { + return fileId; + } + + public String[] getAlternates() { + return alternates; + } } diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java index 8e7c7e843..dcacd0488 100644 --- a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/BeanNames.java @@ -26,10 +26,12 @@ public class BeanNames { public static final String VARIANT_ANNOTATION_READER = "variant-annotation-reader"; public static final String VARIANT_READER = "variant-reader"; public static final String ACCESSION_REPORT_READER = "accession-report-reader"; + public static final String VARIANT_STATS_READER = "variant-stats-reader"; public static final String VEP_ANNOTATION_PROCESSOR = "vep-annotation-processor"; public static final String ANNOTATION_PARSER_PROCESSOR = "annotation-parser-processor"; public static final String ANNOTATION_COMPOSITE_PROCESSOR = "annotation-composite-processor"; + public static final String VARIANT_STATS_PROCESSOR = "variant-stats-processor"; public static final String GENE_WRITER = "gene-writer"; public static final String ANNOTATION_WRITER = "annotation-writer"; @@ -37,6 +39,7 @@ public class BeanNames { public static final String COMPOSITE_ANNOTATION_VARIANT_WRITER = "composite-annotation-variant-writer"; public static final String VARIANT_WRITER = "variant-writer"; public static final String ACCESSION_IMPORTER = "accession-importer"; + public static final String VARIANT_STATS_WRITER = "variant-stats-writer"; public static final String ANNOTATION_SKIP_STEP_DECIDER = "annotation-skip-step-decider"; public static final String STATISTICS_SKIP_STEP_DECIDER = "statistics-skip-step-decider"; @@ -60,6 +63,7 @@ public class BeanNames { public static final String DROP_FILES_BY_STUDY_STEP = "drop-files-by-study-step"; public static final String LOAD_ANNOTATION_METADATA_STEP = "annotation-metadata-step"; public static final String ACCESSION_IMPORT_STEP = "accession-import-step"; + public static final String CALCULATE_AND_LOAD_STATISTICS_STEP = "calculate-load-statistics-step"; public static final String AGGREGATED_VCF_JOB = "aggregated-vcf-job"; public static final String ANNOTATE_VARIANTS_JOB = "annotate-variants-job"; @@ -69,4 +73,5 @@ public class BeanNames { public static final String CALCULATE_STATISTICS_JOB = "calculate-statistics-job"; public static final String DROP_STUDY_JOB = "drop-study-job"; public static final String ACCESSION_IMPORT_JOB = "accession-import-job"; + public static final String CALCULATE_AND_LOAD_STATISTICS_JOB = "calculate-load-statistics-job"; } diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/io/readers/VariantStatsReaderConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/io/readers/VariantStatsReaderConfiguration.java new file mode 100644 index 000000000..5ef46b40f --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/io/readers/VariantStatsReaderConfiguration.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.io.readers; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemStreamReader; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.MongoTemplate; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.pipeline.io.readers.VariantStatsReader; +import uk.ac.ebi.eva.pipeline.parameters.ChunkSizeParameters; +import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters; +import uk.ac.ebi.eva.pipeline.parameters.InputParameters; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_READER; + +@Configuration +public class VariantStatsReaderConfiguration { + + @Bean(VARIANT_STATS_READER) + @StepScope + public ItemStreamReader variantStatsReader(DatabaseParameters databaseParameters, + MongoTemplate mongoTemplate, + InputParameters inputParameters, + ChunkSizeParameters chunkSizeParameters) { + + return new VariantStatsReader(databaseParameters, mongoTemplate, inputParameters.getStudyId(), chunkSizeParameters.getChunkSize()); + } +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/io/writers/VariantStatsWriterConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/io/writers/VariantStatsWriterConfiguration.java new file mode 100644 index 000000000..84fb2390e --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/io/writers/VariantStatsWriterConfiguration.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.io.writers; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemWriter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.mongodb.core.MongoTemplate; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.pipeline.io.writers.VariantStatsWriter; +import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_WRITER; + +@Configuration +public class VariantStatsWriterConfiguration { + + @Bean(VARIANT_STATS_WRITER) + @StepScope + public ItemWriter variantStatsWriter(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate) { + return new VariantStatsWriter(databaseParameters, mongoTemplate); + } +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java new file mode 100644 index 000000000..ea85cefca --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/CalculateAndLoadStatisticsJobConfiguration.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.JobBuilderFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Scope; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.CalculateAndLoadStatisticsStepConfiguration; +import uk.ac.ebi.eva.pipeline.parameters.NewJobIncrementer; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_JOB; +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP; + +/** + * Configuration to run a full Statistics job: variantStatsFlow: statsCreate --> statsLoad + *

+ * TODO add a new PopulationStatisticsJobParametersValidator + */ +@Configuration +@EnableBatchProcessing +@Import({CalculateAndLoadStatisticsStepConfiguration.class}) +public class CalculateAndLoadStatisticsJobConfiguration { + + private static final Logger logger = LoggerFactory.getLogger(CalculateAndLoadStatisticsJobConfiguration.class); + + @Autowired + @Qualifier(CALCULATE_AND_LOAD_STATISTICS_STEP) + private Step calculateAndLoadStatisticsStep; + + @Bean(CALCULATE_AND_LOAD_STATISTICS_JOB) + @Scope("prototype") + public Job calculateAndLoadStatisticsJob(JobBuilderFactory jobBuilderFactory) { + logger.debug("Building '" + CALCULATE_AND_LOAD_STATISTICS_JOB + "'"); + + return jobBuilderFactory + .get(CALCULATE_AND_LOAD_STATISTICS_JOB) + .incrementer(new NewJobIncrementer()) + .start(calculateAndLoadStatisticsStep) + .build(); + } + +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java new file mode 100644 index 000000000..3aaa813e7 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepConfiguration.java @@ -0,0 +1,63 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs.steps; + +import org.springframework.batch.core.Step; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.batch.core.configuration.annotation.StepBuilderFactory; +import org.springframework.batch.core.step.tasklet.TaskletStep; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.batch.item.ItemStreamReader; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.policy.SimpleCompletionPolicy; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.pipeline.configuration.ChunkSizeCompletionPolicyConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.io.readers.VariantStatsReaderConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.io.writers.VariantStatsWriterConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors.VariantStatsProcessorConfiguration; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP; +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_PROCESSOR; +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_READER; +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_WRITER; + + +@Configuration +@EnableBatchProcessing +@Import({VariantStatsReaderConfiguration.class, VariantStatsWriterConfiguration.class, + VariantStatsProcessorConfiguration.class, ChunkSizeCompletionPolicyConfiguration.class}) +public class CalculateAndLoadStatisticsStepConfiguration { + + @Bean(CALCULATE_AND_LOAD_STATISTICS_STEP) + public Step calculateAndLoadStatisticsStep( + @Qualifier(VARIANT_STATS_READER) ItemStreamReader variantStatsReader, + @Qualifier(VARIANT_STATS_PROCESSOR) ItemProcessor variantStatsProcessor, + @Qualifier(VARIANT_STATS_WRITER) ItemWriter variantStatsWriter, + StepBuilderFactory stepBuilderFactory, + SimpleCompletionPolicy chunkSizeCompletionPolicy) { + TaskletStep step = stepBuilderFactory.get(CALCULATE_AND_LOAD_STATISTICS_STEP) + .chunk(chunkSizeCompletionPolicy) + .reader(variantStatsReader) + .processor(variantStatsProcessor) + .writer(variantStatsWriter) + .build(); + return step; + } +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantStatsProcessorConfiguration.java b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantStatsProcessorConfiguration.java new file mode 100644 index 000000000..23b1b92fb --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/processors/VariantStatsProcessorConfiguration.java @@ -0,0 +1,35 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs.steps.processors; + +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.pipeline.io.processors.VariantStatsProcessor; + +import static uk.ac.ebi.eva.pipeline.configuration.BeanNames.VARIANT_STATS_PROCESSOR; + +@Configuration +public class VariantStatsProcessorConfiguration { + + @Bean(VARIANT_STATS_PROCESSOR) + @StepScope + public ItemProcessor variantStatsProcessor() { + return new VariantStatsProcessor(); + } +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java new file mode 100644 index 000000000..6e510345e --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/processors/VariantStatsProcessor.java @@ -0,0 +1,186 @@ +package uk.ac.ebi.eva.pipeline.io.processors; + +import com.mongodb.BasicDBObject; +import org.opencb.biodata.models.feature.Genotype; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemProcessor; +import uk.ac.ebi.eva.commons.models.data.VariantStats; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo; +import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantStatsMongo; +import uk.ac.ebi.eva.pipeline.io.readers.VariantStatsReader; + +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +public class VariantStatsProcessor implements ItemProcessor { + private static final Logger logger = LoggerFactory.getLogger(VariantStatsProcessor.class); + private static final String GENOTYPE_COUNTS_MAP = "genotypeCountsMap"; + private static final String ALLELE_COUNTS_MAP = "alleleCountsMap"; + private static final String MISSING_GENOTYPE = "missingGenotype"; + private static final String MISSING_ALLELE = "missingAllele"; + private static final String DEFAULT_GENOTYPE = "def"; + private static final List MISSING_GENOTYPE_ALLELE_REPRESENTATIONS = Arrays.asList(".", "-1"); + + public VariantStatsProcessor() { + } + + @Override + public VariantDocument process(VariantDocument variant) { + Map filesIdNumberOfSamplesMap = VariantStatsReader.getFilesIdAndNumberOfSamplesMap(); + + String variantRef = variant.getReference(); + String variantAlt = variant.getAlternate(); + Set variantStatsSet = new HashSet<>(); + + Set variantSourceEntrySet = variant.getVariantSources(); + for (VariantSourceEntryMongo variantSourceEntry : variantSourceEntrySet) { + String studyId = variantSourceEntry.getStudyId(); + String fileId = variantSourceEntry.getFileId(); + + BasicDBObject sampleData = variantSourceEntry.getSampleData(); + if (sampleData == null || sampleData.isEmpty()) { + continue; + } + + VariantStats variantStats = getVariantStats(variantRef, variantAlt, variantSourceEntry.getAlternates(), sampleData, filesIdNumberOfSamplesMap.get(fileId)); + VariantStatsMongo variantStatsMongo = new VariantStatsMongo(studyId, fileId, "ALL", variantStats); + + variantStatsSet.add(variantStatsMongo); + } + + if (!variantStatsSet.isEmpty()) { + variant.setStats(variantStatsSet); + } + + return variant; + } + + public VariantStats getVariantStats(String variantRef, String variantAlt, String[] fileAlternates, BasicDBObject sampleData, int totalSamplesForFileId) { + Map> countsMap = getGenotypeAndAllelesCounts(sampleData, totalSamplesForFileId); + Map genotypeCountsMap = countsMap.get(GENOTYPE_COUNTS_MAP); + Map alleleCountsMap = countsMap.get(ALLELE_COUNTS_MAP); + + // Calculate Genotype Stats + int missingGenotypes = genotypeCountsMap.getOrDefault(MISSING_GENOTYPE, 0); + genotypeCountsMap.remove(MISSING_GENOTYPE); + Map genotypeCount = genotypeCountsMap.entrySet().stream() + .collect(Collectors.toMap(entry -> new Genotype(entry.getKey(), variantRef, variantAlt), entry -> entry.getValue())); + // find the minor genotype i.e. second highest entry in terms of counts + Optional> minorGenotypeEntry = genotypeCountsMap.entrySet().stream() + .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) + .skip(1) + .findFirst(); + String minorGenotype = ""; + float minorGenotypeFrequency = 0.0f; + if (minorGenotypeEntry.isPresent()) { + minorGenotype = minorGenotypeEntry.get().getKey(); + int totalGenotypes = genotypeCountsMap.values().stream().reduce(0, Integer::sum); + minorGenotypeFrequency = (float) minorGenotypeEntry.get().getValue() / totalGenotypes; + } + + + // Calculate Allele Stats + int missingAlleles = alleleCountsMap.getOrDefault(MISSING_ALLELE, 0); + alleleCountsMap.remove(MISSING_ALLELE); + // find the minor allele i.e. second highest entry in terms of counts + Optional> minorAlleleEntry = alleleCountsMap.entrySet().stream() + .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) + .skip(1) + .findFirst(); + String minorAllele = ""; + float minorAlleleFrequency = 0.0f; + if (minorAlleleEntry.isPresent()) { + int minorAlleleEntryCount = minorAlleleEntry.get().getValue(); + int totalAlleles = alleleCountsMap.values().stream().reduce(0, Integer::sum); + minorAlleleFrequency = (float) minorAlleleEntryCount / totalAlleles; + + String minorAlleleKey = alleleCountsMap.entrySet().stream() + .filter(entry -> entry.getValue().equals(minorAlleleEntryCount)) + .sorted(Map.Entry.comparingByKey()) + .findFirst() + .get() + .getKey(); + + minorAllele = minorAlleleKey.equals("0") ? variantRef : minorAlleleKey.equals("1") ? variantAlt : fileAlternates[Integer.parseInt(minorAlleleKey) - 2]; + } + + VariantStats variantStats = new VariantStats(); + variantStats.setRefAllele(variantRef); + variantStats.setAltAllele(variantAlt); + variantStats.setMissingGenotypes(missingGenotypes); + variantStats.setMgf(minorGenotypeFrequency); + variantStats.setMgfGenotype(minorGenotype); + variantStats.setGenotypesCount(genotypeCount); + variantStats.setMissingAlleles(missingAlleles); + variantStats.setMaf(minorAlleleFrequency); + variantStats.setMafAllele(minorAllele); + + return variantStats; + } + + private Map> getGenotypeAndAllelesCounts(BasicDBObject sampleData, int totalSamplesForFileId) { + Map> genotypeAndAllelesCountsMap = new HashMap<>(); + Map genotypeCountsMap = new HashMap<>(); + Map alleleCountsMap = new HashMap<>(); + + String defaultGenotype = ""; + for (Map.Entry entry : sampleData.entrySet()) { + String genotype = entry.getKey(); + if (genotype.equals(DEFAULT_GENOTYPE)) { + defaultGenotype = entry.getValue().toString(); + continue; + } + + int noOfSamples = ((List) entry.getValue()).size(); + String[] genotypeParts = genotype.split("\\||/"); + + if (Arrays.stream(genotypeParts).anyMatch(gp -> MISSING_GENOTYPE_ALLELE_REPRESENTATIONS.contains(gp))) { + genotypeCountsMap.put(MISSING_GENOTYPE, genotypeCountsMap.getOrDefault(MISSING_GENOTYPE, 0) + 1); + } else { + genotypeCountsMap.put(genotype, noOfSamples); + } + + for (String genotypePart : genotypeParts) { + if (MISSING_GENOTYPE_ALLELE_REPRESENTATIONS.contains(genotypePart)) { + alleleCountsMap.put(MISSING_ALLELE, alleleCountsMap.getOrDefault(MISSING_ALLELE, 0) + noOfSamples); + } else { + alleleCountsMap.put(genotypePart, alleleCountsMap.getOrDefault(genotypePart, 0) + noOfSamples); + } + } + } + + if (!defaultGenotype.isEmpty()) { + int defaultGenotypeCount = totalSamplesForFileId - genotypeCountsMap.values().stream().reduce(0, Integer::sum); + + String[] genotypeParts = defaultGenotype.split("\\||/"); + if (Arrays.stream(genotypeParts).anyMatch(gp -> MISSING_GENOTYPE_ALLELE_REPRESENTATIONS.contains(gp))) { + genotypeCountsMap.put(MISSING_GENOTYPE, genotypeCountsMap.getOrDefault(MISSING_GENOTYPE, 0) + 1); + } else { + genotypeCountsMap.put(defaultGenotype, defaultGenotypeCount); + } + + for (String genotypePart : genotypeParts) { + if (MISSING_GENOTYPE_ALLELE_REPRESENTATIONS.contains(genotypePart)) { + alleleCountsMap.put(MISSING_ALLELE, alleleCountsMap.getOrDefault(MISSING_ALLELE, 0) + defaultGenotypeCount); + } else { + alleleCountsMap.put(genotypePart, alleleCountsMap.getOrDefault(genotypePart, 0) + defaultGenotypeCount); + } + } + } + + genotypeAndAllelesCountsMap.put(GENOTYPE_COUNTS_MAP, genotypeCountsMap); + genotypeAndAllelesCountsMap.put(ALLELE_COUNTS_MAP, alleleCountsMap); + + return genotypeAndAllelesCountsMap; + } + +} diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java new file mode 100644 index 000000000..d845d0bd3 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/readers/VariantStatsReader.java @@ -0,0 +1,119 @@ +package uk.ac.ebi.eva.pipeline.io.readers; + +import com.mongodb.BasicDBObject; +import com.mongodb.client.FindIterable; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.model.Filters; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamReader; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.commons.models.mongo.entity.subdocuments.VariantSourceEntryMongo; +import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +import static com.mongodb.client.model.Aggregates.match; +import static com.mongodb.client.model.Aggregates.project; +import static com.mongodb.client.model.Projections.computed; +import static com.mongodb.client.model.Projections.fields; +import static java.util.Arrays.asList; + +public class VariantStatsReader implements ItemStreamReader { + private static final Logger logger = LoggerFactory.getLogger(VariantStatsReader.class); + + private DatabaseParameters databaseParameters; + private MongoTemplate mongoTemplate; + private MongoCursor cursor; + private MongoConverter converter; + private int chunkSize; + private String studyId; + + // Store the map of files to number of sample from the file_2_0 collection + private static Map filesIdNumberOfSamplesMap = new HashMap<>(); + + public VariantStatsReader(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate, String studyId, int chunkSize) { + this.databaseParameters = databaseParameters; + this.mongoTemplate = mongoTemplate; + this.studyId = studyId; + this.chunkSize = chunkSize; + } + + @Override + public VariantDocument read() { + Document nextElement = cursor.tryNext(); + return (nextElement != null) ? getVariant(nextElement) : null; + } + + private VariantDocument getVariant(Document variantDocument) { + return converter.read(VariantDocument.class, new BasicDBObject(variantDocument)); + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + initializeReader(); + } + + public void initializeReader() { + cursor = initializeCursor(); + converter = mongoTemplate.getConverter(); + if (filesIdNumberOfSamplesMap.isEmpty()) { + populateFilesIdAndNumberOfSamplesMap(); + } + } + + private MongoCursor initializeCursor() { + Bson query = Filters.elemMatch(VariantDocument.FILES_FIELD, Filters.eq(VariantSourceEntryMongo.STUDYID_FIELD, studyId)); + logger.info("Issuing find: {}", query); + + FindIterable statsVariantDocuments = getVariants(query); + return statsVariantDocuments.iterator(); + } + + private FindIterable getVariants(Bson query) { + return mongoTemplate.getCollection(databaseParameters.getCollectionVariantsName()) + .find(query) + .noCursorTimeout(true) + .batchSize(chunkSize); + } + + private void populateFilesIdAndNumberOfSamplesMap() { + Bson matchStage = match(Filters.eq("sid", studyId)); + Bson projectStage = project(fields( + computed("fid", "$fid"), + computed("numOfSamples", new Document("$size", new Document("$objectToArray", "$samp"))) + )); + filesIdNumberOfSamplesMap = mongoTemplate.getCollection(databaseParameters.getCollectionFilesName()) + .aggregate(asList(matchStage, projectStage)) + .into(new ArrayList<>()) + .stream() + .collect(Collectors.toMap(doc -> doc.getString("fid"), doc -> doc.getInteger("numOfSamples"))); + } + + public static Map getFilesIdAndNumberOfSamplesMap() { + return filesIdNumberOfSamplesMap; + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + + } + + @Override + public void close() throws ItemStreamException { + if (cursor != null) { + cursor.close(); + } + } +} + + diff --git a/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java b/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java new file mode 100644 index 000000000..6752dc6b4 --- /dev/null +++ b/src/main/java/uk/ac/ebi/eva/pipeline/io/writers/VariantStatsWriter.java @@ -0,0 +1,46 @@ +package uk.ac.ebi.eva.pipeline.io.writers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.ItemWriter; +import org.springframework.data.mongodb.core.BulkOperations; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import uk.ac.ebi.eva.commons.models.mongo.entity.VariantDocument; +import uk.ac.ebi.eva.pipeline.parameters.DatabaseParameters; + +import java.util.List; + +public class VariantStatsWriter implements ItemWriter { + private static final Logger logger = LoggerFactory.getLogger(VariantStatsWriter.class); + private DatabaseParameters databaseParameters; + private MongoTemplate mongoTemplate; + + public VariantStatsWriter(DatabaseParameters databaseParameters, MongoTemplate mongoTemplate) { + this.databaseParameters = databaseParameters; + this.mongoTemplate = mongoTemplate; + } + + @Override + public void write(List variants) { + BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, VariantDocument.class, + databaseParameters.getCollectionVariantsName()); + for (VariantDocument variant : variants) { + if (variant.getVariantStatsMongo() == null || variant.getVariantStatsMongo().isEmpty()) { + continue; + } + Query query = new Query(Criteria.where("_id").is(variant.getId())); + Update update = new Update(); + update.set("st", variant.getVariantStatsMongo()); + + bulkOperations.updateOne(query, update); + } + + bulkOperations.execute(); + } + +} + + diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java index d6ce5e36d..8a85d9979 100644 --- a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java +++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/PopulationStatisticsJobTest.java @@ -15,6 +15,7 @@ */ package uk.ac.ebi.eva.pipeline.configuration.jobs; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -32,7 +33,6 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit4.SpringRunner; - import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration; import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration; import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule; @@ -60,6 +60,8 @@ public class PopulationStatisticsJobTest { private static final String MONGO_DUMP = "/dump/VariantStatsConfigurationTest_vl"; + private static final String DATABASE_NAME = "calculate_stats_test_db"; + @Rule public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule(); @@ -73,6 +75,12 @@ public class PopulationStatisticsJobTest { @Before public void setUp() throws Exception { Config.setOpenCGAHome(GenotypedVcfJobTestUtils.getDefaultOpencgaHome()); + mongoRule.restoreDump(getResourceUrl(MONGO_DUMP), DATABASE_NAME); + } + + @After + public void cleanUp() { + mongoRule.getTemporaryDatabase(DATABASE_NAME).drop(); } @Test @@ -80,14 +88,13 @@ public void fullPopulationStatisticsJob() throws Exception { //Given a valid VCF input file String input = SMALL_VCF_FILE; String statsDir = temporaryFolderRule.getRoot().getPath(); - String dbName = mongoRule.restoreDumpInTemporaryDatabase(getResourceUrl(MONGO_DUMP)); String fileId = "1"; String studyId = "1"; JobParameters jobParameters = new EvaJobParameterBuilder() .collectionFilesName("files") .collectionVariantsName("variants") - .databaseName(dbName) + .databaseName(DATABASE_NAME) .inputStudyId(studyId) .inputVcf(getResource(input).getAbsolutePath()) .inputVcfAggregation("BASIC") @@ -107,7 +114,7 @@ public void fullPopulationStatisticsJob() throws Exception { // The DB docs should have the field "st" VariantStorageManager variantStorageManager = StorageManagerFactory.getVariantStorageManager(); - VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(dbName, null); + VariantDBAdaptor variantDBAdaptor = variantStorageManager.getDBAdaptor(DATABASE_NAME, null); VariantDBIterator iterator = variantDBAdaptor.iterator(new QueryOptions()); assertEquals(1, iterator.next().getSourceEntries().values().iterator().next().getCohortStats().size()); diff --git a/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java new file mode 100644 index 000000000..f44291f47 --- /dev/null +++ b/src/test/java/uk/ac/ebi/eva/pipeline/configuration/jobs/steps/CalculateAndLoadStatisticsStepTest.java @@ -0,0 +1,125 @@ +/* + * Copyright 2024 EMBL - European Bioinformatics Institute + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package uk.ac.ebi.eva.pipeline.configuration.jobs.steps; + +import org.bson.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.test.JobLauncherTestUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.TestPropertySource; +import org.springframework.test.context.junit4.SpringRunner; +import uk.ac.ebi.eva.pipeline.configuration.BeanNames; +import uk.ac.ebi.eva.pipeline.configuration.MongoConfiguration; +import uk.ac.ebi.eva.pipeline.configuration.jobs.CalculateAndLoadStatisticsJobConfiguration; +import uk.ac.ebi.eva.test.configuration.BatchTestConfiguration; +import uk.ac.ebi.eva.test.configuration.TemporaryRuleConfiguration; +import uk.ac.ebi.eva.test.rules.PipelineTemporaryFolderRule; +import uk.ac.ebi.eva.test.rules.TemporaryMongoRule; +import uk.ac.ebi.eva.utils.EvaJobParameterBuilder; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static uk.ac.ebi.eva.test.utils.JobTestUtils.assertCompleted; +import static uk.ac.ebi.eva.test.utils.TestFileUtils.getResourceUrl; + +/** + * Test for {@link CalculateAndLoadStatisticsStepConfiguration} + */ +@RunWith(SpringRunner.class) +@TestPropertySource({"classpath:test-stats.properties"}) +@ContextConfiguration(classes = {CalculateAndLoadStatisticsJobConfiguration.class, BatchTestConfiguration.class, + TemporaryRuleConfiguration.class, MongoConfiguration.class}) +public class CalculateAndLoadStatisticsStepTest { + private static final String MONGO_DUMP = "/dump/VariantStatsConfigurationTest_vl"; + + private static final String COLLECTION_VARIANTS_NAME = "variants"; + + private static final String COLLECTION_FILES_NAME = "files"; + + private static final String DATABASE_NAME = "calculate_load_stats_test_db"; + + private static final String STUDY_ID = "1"; + + @Autowired + @Rule + public TemporaryMongoRule mongoRule; + + @Rule + public PipelineTemporaryFolderRule temporaryFolderRule = new PipelineTemporaryFolderRule(); + + @Autowired + private JobLauncherTestUtils jobLauncherTestUtils; + + @Before + public void setUp() throws Exception { + mongoRule.getTemporaryDatabase(DATABASE_NAME).drop(); + mongoRule.restoreDump(getResourceUrl(MONGO_DUMP), DATABASE_NAME); + } + + @After + public void cleanUp() { + mongoRule.getTemporaryDatabase(DATABASE_NAME).drop(); + } + + @Test + public void calculateAndLoadStatisticsStepShouldCalculateAndLoadStats() { + JobParameters jobParameters = new EvaJobParameterBuilder() + .collectionFilesName(COLLECTION_FILES_NAME) + .collectionVariantsName(COLLECTION_VARIANTS_NAME) + .databaseName(DATABASE_NAME) + .inputStudyId(STUDY_ID) + .chunkSize("100") + .toJobParameters(); + + JobExecution jobExecution = jobLauncherTestUtils.launchStep(BeanNames.CALCULATE_AND_LOAD_STATISTICS_STEP, jobParameters); + + // check job completed successfully + assertCompleted(jobExecution); + List documents = mongoRule.getTemporaryDatabase(DATABASE_NAME).getCollection(COLLECTION_VARIANTS_NAME) + .find().into(new ArrayList<>()); + Assert.assertTrue(documents.size() == 300); + // assert all statistics are calculated for all documents + Assert.assertTrue(documents.stream().allMatch(doc -> doc.containsKey("st"))); + + // assert statistics for the variant with 20_61098_C_T + ArrayList variantStatsList = documents.stream().filter(doc -> doc.get("_id").equals("20_61098_C_T")) + .findFirst().get().get("st", ArrayList.class); + assertEquals(1, variantStatsList.size()); + Document variantStats = variantStatsList.get(0); + Document numOfGT = (Document) variantStats.get("numGt"); + assertEquals(1290, numOfGT.get("0|0")); + assertEquals(417, numOfGT.get("1|0")); + assertEquals(573, numOfGT.get("0|1")); + assertEquals(224, numOfGT.get("1|1")); + assertEquals(0.2871405780315399, variantStats.get("maf")); + assertEquals(0.228833869099617, variantStats.get("mgf")); + assertEquals("T", variantStats.get("mafAl")); + assertEquals("0|1", variantStats.get("mgfGt")); + assertEquals(0, variantStats.get("missAl")); + assertEquals(0, variantStats.get("missGt")); + } + +} diff --git a/src/test/resources/test-stats.properties b/src/test/resources/test-stats.properties new file mode 100644 index 000000000..9d40d0c3b --- /dev/null +++ b/src/test/resources/test-stats.properties @@ -0,0 +1,15 @@ +db.collections.variants.name=variants +db.collections.files.name=files +db.collections.features.name=features +db.collections.stats.name=populationStatistics + +spring.data.mongodb.host=|eva.mongo.host.test| +spring.data.mongodb.port=27017 +spring.data.mongodb.password= +mongodb.read-preference=primary +spring.data.mongodb.authentication-mechanism=SCRAM-SHA-1 + +config.db.read-preference=primary + +# See https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.1-Release-Notes#bean-overriding +spring.main.allow-bean-definition-overriding=true \ No newline at end of file