ALS-4978: Separate genomic processing logic from AbstractProcessor
ramari16 committed Oct 11, 2023
1 parent 3c88970 commit ebced5a
Showing 19 changed files with 365 additions and 247 deletions.
4 changes: 2 additions & 2 deletions client-api/pom.xml
@@ -4,12 +4,12 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>

<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
<artifactId>client-api</artifactId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>

<name>client-api</name>
<!-- FIXME change it to the project's website -->
2 changes: 1 addition & 1 deletion common/pom.xml
@@ -6,7 +6,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>common</artifactId>
2 changes: 1 addition & 1 deletion data/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>data</artifactId>
2 changes: 1 addition & 1 deletion docker/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>docker</artifactId>
2 changes: 1 addition & 1 deletion etl/pom.xml
@@ -6,7 +6,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>etl</artifactId>
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
<artifactId>pic-sure-hpds</artifactId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>pic-sure-hpds</name>
<modules>
2 changes: 1 addition & 1 deletion processing/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>pic-sure-hpds</artifactId>
<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
- <version>1.0-SNAPSHOT</version>
+ <version>3.0.0-SNAPSHOT</version>
</parent>

<artifactId>processing</artifactId>

Large diffs are not rendered by default.

@@ -56,7 +56,7 @@ public int runCounts(Query query) {
* @return
*/
public int runObservationCount(Query query) {
- TreeSet<Integer> patients = abstractProcessor.getPatientSubsetForQuery(query);
+ Set<Integer> patients = abstractProcessor.getPatientSubsetForQuery(query);
int[] observationCount = {0};
query.getFields().stream().forEach(field -> {
observationCount[0] += Arrays.stream(abstractProcessor.getCube(field).sortedByKey()).filter(keyAndValue->{
@@ -75,7 +75,7 @@ public int runObservationCount(Query query) {
*/
public Map<String, Integer> runObservationCrossCounts(Query query) {
TreeMap<String, Integer> counts = new TreeMap<>();
- TreeSet<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
+ Set<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
query.getCrossCountFields().parallelStream().forEach((String concept)->{
try {
//breaking these statements to allow += operator to cast long to int.
@@ -100,7 +100,7 @@ public Map<String, Integer> runObservationCrossCounts(Query query) {
*/
public Map<String, Integer> runCrossCounts(Query query) {
TreeMap<String, Integer> counts = new TreeMap<>();
- TreeSet<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
+ Set<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
query.getCrossCountFields().parallelStream().forEach((String concept)->{
try {
Query safeCopy = new Query();
@@ -121,7 +121,7 @@ public Map<String, Integer> runCrossCounts(Query query) {
*/
public Map<String, Map<String, Integer>> runCategoryCrossCounts(Query query) {
Map<String, Map<String, Integer>> categoryCounts = new TreeMap<>();
- TreeSet<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
+ Set<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
query.getRequiredFields().parallelStream().forEach(concept -> {
Map<String, Integer> varCount = new TreeMap<>();;
TreeMap<String, TreeSet<Integer>> categoryMap = abstractProcessor.getCube(concept).getCategoryMap();
@@ -176,7 +176,7 @@ public Map<String, Map<String, Integer>> runCategoryCrossCounts(Query query) {
*/
public Map<String, Map<Double, Integer>> runContinuousCrossCounts(Query query) {
TreeMap<String, Map<Double, Integer>> conceptMap = new TreeMap<>();
- TreeSet<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
+ Set<Integer> baseQueryPatientSet = abstractProcessor.getPatientSubsetForQuery(query);
query.getNumericFilters().forEach((String concept, Filter.DoubleFilter range)-> {
KeyAndValue[] pairs = abstractProcessor.getCube(concept).getEntriesForValueRange(range.getMin(), range.getMax());
Map<Double, Integer> countMap = new TreeMap<>();
@@ -2,43 +2,51 @@

import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;

- import java.util.ArrayList;
- import java.util.List;
- import java.util.Set;
+ import java.util.*;

public class DistributableQuery {

- private Query genomicQuery;
- private List<Set<Integer>> phenotypicQueryPatientSets;
+ private List<Query.VariantInfoFilter> variantInfoFilters = new ArrayList<>();

- public DistributableQuery() {
- genomicQuery = new Query();
- phenotypicQueryPatientSets = new ArrayList<>();
+ private Map<String, String[]> categoryFilters = new HashMap<>();
+
+ private List<String> requiredFields = new ArrayList<>();
+
+ private Set<Integer> patientIds;
+
+
+ public void addRequiredVariantField(String path) {
+ requiredFields.add(path);
}
+ public List<String> getRequiredFields() {
+ return requiredFields;
+ }

- public Query getGenomicQuery() {
- return genomicQuery;
+ public void addVariantSpecCategoryFilter(String key, String[] values) {
+ categoryFilters.put(key, values);
+ }
+ public Map<String, String[]> getCategoryFilters() {
+ return categoryFilters;
}

- public List<Set<Integer>> getPhenotypicQueryPatientSets() {
- return phenotypicQueryPatientSets;
+ public void setVariantInfoFilters(Collection<Query.VariantInfoFilter> variantInfoFilters) {
+ this.variantInfoFilters = variantInfoFilters != null ? new ArrayList<>(variantInfoFilters) : new ArrayList<>();
+ }
+ public List<Query.VariantInfoFilter> getVariantInfoFilters() {
+ return new ArrayList<>(variantInfoFilters);
}

- public void addAndClausePatients(Set<Integer> patientSet) {
- synchronized (patientSet) {
- phenotypicQueryPatientSets.add(patientSet);
- }
+
+ public DistributableQuery setPatientIds(Set<Integer> patientIds) {
+ this.patientIds = patientIds;
+ return this;
}

- public void addRequiredVariantField(String path) {
- synchronized (genomicQuery) {
- genomicQuery.getRequiredFields().add(path);
- }
+ public Set<Integer> getPatientIds() {
+ return patientIds;
+ }

- public void addVariantSpecCategoryFilter(String key, String[] categoryFilters) {
- synchronized (genomicQuery) {
- genomicQuery.getCategoryFilters().put(key, categoryFilters);
- }
+ public boolean hasFilters() {
+ return !variantInfoFilters.isEmpty() || !categoryFilters.isEmpty() || !requiredFields.isEmpty();
+ }
}
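
For orientation, here is a minimal usage sketch of the reworked DistributableQuery above. It is not part of this commit: the caller method, concept path, filter values, and patient IDs are placeholders invented for illustration.

// Illustrative only, not in this diff: shows how a caller might populate the new DistributableQuery.
static DistributableQuery buildExampleDistributableQuery() {
    DistributableQuery distributableQuery = new DistributableQuery()
            .setPatientIds(java.util.Set.of(1, 2, 3));                     // placeholder patient IDs
    distributableQuery.addRequiredVariantField("4,9856624,CAAAAA,C");      // placeholder variant spec path
    distributableQuery.addVariantSpecCategoryFilter("Gene_with_variant", new String[]{"FRG1"}); // placeholder category filter
    distributableQuery.setVariantInfoFilters(java.util.List.of());         // no variant info filters in this example
    return distributableQuery;                                             // hasFilters() returns true because of the entries above
}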
@@ -0,0 +1,192 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import com.google.common.collect.Range;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.FileBackedByteIndexedInfoStore;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Filter;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.*;
import java.math.BigInteger;
import java.util.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;

@Component
public class GenomicProcessor {

private static Logger log = LoggerFactory.getLogger(GenomicProcessor.class);

private final PatientVariantJoinHandler patientVariantJoinHandler;

private final VariantIndexCache variantIndexCache;

private final Map<String, FileBackedByteIndexedInfoStore> infoStores;

private final List<String> infoStoreColumns;

private final String genomicDataDirectory;

private final VariantService variantService;

@Autowired
public GenomicProcessor(PatientVariantJoinHandler patientVariantJoinHandler, VariantIndexCache variantIndexCache, VariantService variantService) {
this.patientVariantJoinHandler = patientVariantJoinHandler;
this.variantIndexCache = variantIndexCache;
this.variantService = variantService;

genomicDataDirectory = System.getProperty("HPDS_GENOMIC_DATA_DIRECTORY", "/opt/local/hpds/all/");

infoStores = new HashMap<>();
File genomicDataDirectory = new File(this.genomicDataDirectory);
if(genomicDataDirectory.exists() && genomicDataDirectory.isDirectory()) {
Arrays.stream(genomicDataDirectory.list((file, filename)->{return filename.endsWith("infoStore.javabin");}))
.forEach((String filename)->{
try (
FileInputStream fis = new FileInputStream(this.genomicDataDirectory + filename);
GZIPInputStream gis = new GZIPInputStream(fis);
ObjectInputStream ois = new ObjectInputStream(gis)
){
log.info("loading " + filename);
FileBackedByteIndexedInfoStore infoStore = (FileBackedByteIndexedInfoStore) ois.readObject();
infoStores.put(filename.replace("_infoStore.javabin", ""), infoStore);
ois.close();
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
});
}
infoStoreColumns = new ArrayList<>(infoStores.keySet());

variantIndexCache = new VariantIndexCache(variantService.getVariantIndex(), infoStores);
}

protected BigInteger getPatientMaskForVariantInfoFilters(DistributableQuery distributableQuery) {
// log.debug("filterdIDSets START size: " + filteredIdSets.size());
/* VARIANT INFO FILTER HANDLING IS MESSY */
if(!distributableQuery.getVariantInfoFilters().isEmpty()) {
for(Query.VariantInfoFilter filter : distributableQuery.getVariantInfoFilters()){
ArrayList<VariantIndex> variantSets = new ArrayList<>();
addVariantsMatchingFilters(filter, variantSets);
log.info("Found " + variantSets.size() + " groups of sets for patient identification");
//log.info("found " + variantSets.stream().mapToInt(Set::size).sum() + " variants for identification");
if(!variantSets.isEmpty()) {
// INTERSECT all the variant sets.
VariantIndex intersectionOfInfoFilters = variantSets.get(0);
for(VariantIndex variantSet : variantSets) {
intersectionOfInfoFilters = intersectionOfInfoFilters.intersection(variantSet);
}
// Apparently set.size() is really expensive with large sets... I just saw it take 17 seconds for a set with 16.7M entries
if(log.isDebugEnabled()) {
//IntSummaryStatistics stats = variantSets.stream().collect(Collectors.summarizingInt(set->set.size()));
//log.debug("Number of matching variants for all sets : " + stats.getSum());
//log.debug("Number of matching variants for intersection of sets : " + intersectionOfInfoFilters.size());
}
// add filteredIdSet for patients who have matching variants, heterozygous or homozygous for now.
return patientVariantJoinHandler.getPatientIdsForIntersectionOfVariantSets(distributableQuery.getPatientIds(), intersectionOfInfoFilters);
}
}
}
return createMaskForPatientSet(distributableQuery.getPatientIds());
/* END OF VARIANT INFO FILTER HANDLING */
}

public Set<Integer> patientMaskToPatientIdSet(BigInteger patientMask) {
Set<Integer> ids = new TreeSet<Integer>();
String bitmaskString = patientMask.toString(2);
for(int x = 2;x < bitmaskString.length()-2;x++) {
if('1'==bitmaskString.charAt(x)) {
String patientId = variantService.getPatientIds()[x-2].trim();
ids.add(Integer.parseInt(patientId));
}
}
return ids;
}

protected void addVariantsMatchingFilters(Query.VariantInfoFilter filter, ArrayList<VariantIndex> variantSets) {
// Add variant sets for each filter
if(filter.categoryVariantInfoFilters != null && !filter.categoryVariantInfoFilters.isEmpty()) {
filter.categoryVariantInfoFilters.entrySet().parallelStream().forEach((Map.Entry<String,String[]> entry) ->{
addVariantsMatchingCategoryFilter(variantSets, entry);
});
}
if(filter.numericVariantInfoFilters != null && !filter.numericVariantInfoFilters.isEmpty()) {
filter.numericVariantInfoFilters.forEach((String column, Filter.FloatFilter doubleFilter)->{
FileBackedByteIndexedInfoStore infoStore = getInfoStore(column);

doubleFilter.getMax();
Range<Float> filterRange = Range.closed(doubleFilter.getMin(), doubleFilter.getMax());
List<String> valuesInRange = infoStore.continuousValueIndex.getValuesInRange(filterRange);
VariantIndex variants = new SparseVariantIndex(Set.of());
for(String value : valuesInRange) {
variants = variants.union(variantIndexCache.get(column, value));
}
variantSets.add(variants);
});
}
}

private void addVariantsMatchingCategoryFilter(ArrayList<VariantIndex> variantSets, Map.Entry<String, String[]> entry) {
String column = entry.getKey();
String[] values = entry.getValue();
Arrays.sort(values);
FileBackedByteIndexedInfoStore infoStore = getInfoStore(column);

List<String> infoKeys = filterInfoCategoryKeys(values, infoStore);
/*
* We want to union all the variants for each selected key, so we need an intermediate set
*/
VariantIndex[] categoryVariantSets = new VariantIndex[] {new SparseVariantIndex(Set.of())};

if(infoKeys.size()>1) {
infoKeys.stream().forEach((key)->{
VariantIndex variantsForColumnAndValue = variantIndexCache.get(column, key);
categoryVariantSets[0] = categoryVariantSets[0].union(variantsForColumnAndValue);
});
} else {
categoryVariantSets[0] = variantIndexCache.get(column, infoKeys.get(0));
}
variantSets.add(categoryVariantSets[0]);
}

private List<String> filterInfoCategoryKeys(String[] values, FileBackedByteIndexedInfoStore infoStore) {
List<String> infoKeys = infoStore.getAllValues().keys().stream().filter((String key)->{
// iterate over the values for the specific category and find which ones match the search
int insertionIndex = Arrays.binarySearch(values, key);
return insertionIndex > -1 && insertionIndex < values.length;
}).collect(Collectors.toList());
log.info("found " + infoKeys.size() + " keys");
return infoKeys;
}

protected BigInteger createMaskForPatientSet(Set<Integer> patientSubset) {
return patientVariantJoinHandler.createMaskForPatientSet(patientSubset);
}

public FileBackedByteIndexedInfoStore getInfoStore(String column) {
return infoStores.get(column);
}

public VariantIndex addVariantsForInfoFilter(VariantIndex unionOfInfoFilters, Query.VariantInfoFilter filter) {
ArrayList<VariantIndex> variantSets = new ArrayList<>();
addVariantsMatchingFilters(filter, variantSets);

if(!variantSets.isEmpty()) {
VariantIndex intersectionOfInfoFilters = variantSets.get(0);
for(VariantIndex variantSet : variantSets) {
// log.info("Variant Set : " + Arrays.deepToString(variantSet.toArray()));
intersectionOfInfoFilters = intersectionOfInfoFilters.intersection(variantSet);
}
unionOfInfoFilters = unionOfInfoFilters.union(intersectionOfInfoFilters);
} else {
log.warn("No info filters included in query.");
}
return unionOfInfoFilters;
}

}
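
As a reference for the bitmask handling above, the following standalone sketch is not part of this commit. It assumes the patient mask is framed by two '1' bits on each end with one bit per patient in between, which is what the loop bounds in patientMaskToPatientIdSet imply, and it uses made-up patient IDs in place of variantService.getPatientIds().

import java.math.BigInteger;
import java.util.Set;
import java.util.TreeSet;

// Sketch only: decodes a framed patient mask the same way patientMaskToPatientIdSet does.
public class PatientMaskSketch {
    public static void main(String[] args) {
        String[] patientIds = {"1001", "1002", "1003"};           // stand-in for variantService.getPatientIds()
        BigInteger mask = new BigInteger("11" + "101" + "11", 2); // frame bits + one bit per patient + frame bits

        Set<Integer> ids = new TreeSet<>();
        String bits = mask.toString(2);
        for (int x = 2; x < bits.length() - 2; x++) {             // skip the two framing bits on each end
            if (bits.charAt(x) == '1') {
                ids.add(Integer.parseInt(patientIds[x - 2].trim()));
            }
        }
        System.out.println(ids);                                  // prints [1001, 1003]
    }
}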