Skip to content

Commit

Permalink
- Create endpoint for writing data to file
Browse files Browse the repository at this point in the history
- Create service for writing out to file
- Add configuration to specify where to write
- Add PIC-SURE uuid to query object
  - This is how we namespace file writes
- Make abstract processor intersect ID sets more proactively
  - Addresses memory issues
- Most of this work will be done by another service, but
  • Loading branch information
Luke Sikina committed Jan 8, 2024
1 parent 2efc3ee commit 35fdd58
Show file tree
Hide file tree
Showing 14 changed files with 560 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public Query(Query query) {
this.fields = new ArrayList<String>(query.fields);
this.requiredFields = new ArrayList<String>(query.requiredFields);
this.anyRecordOf = new ArrayList<String>(query.anyRecordOf);
this.numericFilters = new TreeMap<String, DoubleFilter>(query.numericFilters);
this.numericFilters = new TreeMap<String, Filter.DoubleFilter>(query.numericFilters);
this.categoryFilters = new TreeMap<String, String[]>(query.categoryFilters);
this.variantInfoFilters = new ArrayList<VariantInfoFilter>();
if (query.variantInfoFilters != null) {
Expand All @@ -30,6 +30,7 @@ public Query(Query query) {
});
}
this.id = query.id;
this.picSureId = query.picSureId;
}

private ResultType expectedResultType = ResultType.COUNT;
Expand All @@ -38,11 +39,12 @@ public Query(Query query) {
private List<String> requiredFields = new ArrayList<>();
private List<String> anyRecordOf = new ArrayList<>();
private List<List<String>> anyRecordOfMulti = new ArrayList<>();
private Map<String, DoubleFilter> numericFilters = new HashMap<>();
private Map<String, Filter.DoubleFilter> numericFilters = new HashMap<>();
private Map<String, String[]> categoryFilters = new HashMap<>();
private List<VariantInfoFilter> variantInfoFilters = new ArrayList<>();
private String id;

private String picSureId;

public ResultType getExpectedResultType() {
return expectedResultType;
Expand Down Expand Up @@ -72,7 +74,7 @@ public List<List<String>> getAllAnyRecordOf() {
return anyRecordOfMultiCopy;
}

public Map<String, DoubleFilter> getNumericFilters() {
public Map<String, Filter.DoubleFilter> getNumericFilters() {
return numericFilters;
}

Expand Down Expand Up @@ -111,7 +113,7 @@ public void setAnyRecordOfMulti(Collection<List<String>> anyRecordOfMulti) {
this.anyRecordOfMulti = anyRecordOfMulti != null ? new ArrayList<>(anyRecordOfMulti) : new ArrayList<>();
}

public void setNumericFilters(Map<String, DoubleFilter> numericFilters) {
public void setNumericFilters(Map<String, Filter.DoubleFilter> numericFilters) {
this.numericFilters = numericFilters != null ? new HashMap<>(numericFilters) : new HashMap<>();
}

Expand All @@ -127,19 +129,27 @@ public void setId(String id) {
this.id = id;
}

/**
 * Returns the PIC-SURE id currently stored on this query.
 *
 * @return the value of the {@code picSureId} field; the field has no initializer, so this is
 *         {@code null} unless it was set via the setter or copied from another query
 */
public String getPicSureId() {
return picSureId;
}

/**
 * Stores the PIC-SURE id on this query. No validation is performed; the value is assigned as-is.
 * NOTE(review): the commit description says this id is used to namespace file writes; confirm
 * against the file-writing service before relying on that.
 *
 * @param picSureId the id to store (null is accepted and simply overwrites the field)
 */
public void setPicSureId(String picSureId) {
this.picSureId = picSureId;
}

public static class VariantInfoFilter {
public VariantInfoFilter() {

}

public VariantInfoFilter(VariantInfoFilter filter) {
this.numericVariantInfoFilters = new TreeMap<String, FloatFilter>(filter.numericVariantInfoFilters);
this.categoryVariantInfoFilters = new TreeMap<String, String[]>(filter.categoryVariantInfoFilters);
this.numericVariantInfoFilters = new TreeMap<>(filter.numericVariantInfoFilters);
this.categoryVariantInfoFilters = new TreeMap<>(filter.categoryVariantInfoFilters);
}

public Map<String, FloatFilter> numericVariantInfoFilters;
public Map<String, Filter.FloatFilter> numericVariantInfoFilters;
public Map<String, String[]> categoryVariantInfoFilters;

public String toString() {
StringBuilder builder = new StringBuilder();
writePartFormat("Numeric Variant Info Filters", numericVariantInfoFilters, builder);
Expand Down Expand Up @@ -207,7 +217,7 @@ public String toString() {

return builder.toString();
}

/**
* For some elements of the query, we will iterate over the list of items and send them each to the string builder
* @param queryPart
Expand All @@ -218,7 +228,7 @@ public String toString() {
private static void writePartFormat(String queryPart, Collection items, StringBuilder builder, boolean allowRollup) {
final Collection collectionToWrite = Optional.ofNullable(items).orElseGet(Collections::emptyList);
//same beginning
builder.append(queryPart + ": [");
builder.append(queryPart + ": [");
//if there are many elements, we want to truncate the display
if(allowRollup && collectionToWrite.size() > 5) {
builder.append("\n");
Expand All @@ -233,17 +243,17 @@ private static void writePartFormat(String queryPart, Collection items, StringBu
//same ending
builder.append("]\n");
}

@SuppressWarnings("rawtypes")
private static void showTopLevelValues(Collection varList, StringBuilder builder) {

Map<String, Integer> countMap = new HashMap<String, Integer>();

for(Object var : varList) {
if(var instanceof String) {
int index = ((String) var).startsWith("\\") ? 1 : 0;
String firstLevel = ((String)var).split("\\\\")[index];

Integer count = countMap.get(firstLevel);
if(count == null) {
count = 1;
Expand All @@ -255,7 +265,7 @@ private static void showTopLevelValues(Collection varList, StringBuilder builder
System.out.println("Object is not string! " + var);
}
}

for(String key : countMap.keySet()) {
builder.append("\t" + countMap.get(key) + " values under " + key + "\n");
}
Expand All @@ -275,12 +285,12 @@ private static void writePartFormat(String queryPart, Map varMap, StringBuilder
}

//for the mapped elements, we never want to roll up the values; always show
builder.append(queryPart + ": [");
builder.append(queryPart + ": [");
String sep1 = "";
for(Object key : varMap.keySet()) {
builder.append(sep1 + key + ": ");
Object value = varMap.get(key);

if(value instanceof Object[]) {
builder.append("{");
String sep2 = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;


@Component
public class AbstractProcessor {
Expand Down Expand Up @@ -187,6 +190,16 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
return ids[0];
}

private class IntersectingSetContainer<T> {
Set<T> set = null;
boolean initialized = false;

void intersect(@NotNull Set<T> toIntersect) {
initialized = true;
set = set == null ? toIntersect : Sets.intersection(set, toIntersect);
}
}

/**
* For each filter in the query, return a set of patient ids that match. The order of these sets in the
* returned list of sets does not matter and cannot currently be tied back to the filter that generated
Expand All @@ -196,26 +209,27 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
* @return
*/
protected List<Set<Integer>> idSetsForEachFilter(Query query) {
final ArrayList<Set<Integer>> filteredIdSets = new ArrayList<>();
IntersectingSetContainer<Integer> ids = new IntersectingSetContainer<>();

try {
query.getAllAnyRecordOf().forEach(anyRecordOfFilterList -> {
addIdSetsForAnyRecordOf(anyRecordOfFilterList, filteredIdSets);
});
addIdSetsForRequiredFields(query, filteredIdSets);
addIdSetsForNumericFilters(query, filteredIdSets);
addIdSetsForCategoryFilters(query, filteredIdSets);
for (List<String> anyRecordOfFilterList : query.getAllAnyRecordOf()) {
ids = addIdSetsForAnyRecordOf(anyRecordOfFilterList, ids);
}
ids = addIdSetsForRequiredFields(query, ids);
ids = addIdSetsForNumericFilters(query, ids);
ids = addIdSetsForCategoryFilters(query, ids);
} catch (InvalidCacheLoadException e) {
log.warn("Invalid query supplied: " + e.getLocalizedMessage());
filteredIdSets.add(new HashSet<>()); // if an invalid path is supplied, no patients should match.
return List.of(new HashSet<>());
}



//AND logic to make sure all patients match each filter
if(filteredIdSets.size()>1) {
List<Set<Integer>> processedFilteredIdSets = new ArrayList<>(List.of(applyBooleanLogic(filteredIdSets)));
return addIdSetsForVariantInfoFilters(query, processedFilteredIdSets);
if (ids.initialized) {
return addIdSetsForVariantInfoFilters(query, new ArrayList<>(List.of(ids.set)));
} else {
return addIdSetsForVariantInfoFilters(query, filteredIdSets);
return addIdSetsForVariantInfoFilters(query, new ArrayList<>());
}
}

Expand Down Expand Up @@ -249,22 +263,23 @@ public TreeSet<Integer> getPatientSubsetForQuery(Query query) {
return idList;
}

private void addIdSetsForRequiredFields(Query query, ArrayList<Set<Integer>> filteredIdSets) {
private IntersectingSetContainer<Integer> addIdSetsForRequiredFields(Query query, IntersectingSetContainer<Integer> filteredIdSets) {
if(!query.getRequiredFields().isEmpty()) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
filteredIdSets.addAll(query.getRequiredFields().parallelStream().map(path->{
if(VariantUtils.pathIsVariantSpec(path)) {
query.getRequiredFields().parallelStream().map(path -> {
if (VariantUtils.pathIsVariantSpec(path)) {
TreeSet<Integer> patientsInScope = new TreeSet<>();
addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
return patientsInScope;
} else {
return new TreeSet<Integer>(getCube(path).keyBasedIndex());
return (Set<Integer>) new TreeSet<Integer>(getCube(path).keyBasedIndex());
}
}).collect(Collectors.toSet()));
}).forEach(filteredIdSets::intersect);
}
return filteredIdSets;
}

private void addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, ArrayList<Set<Integer>> filteredIdSets) {
private IntersectingSetContainer<Integer> addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, IntersectingSetContainer<Integer> filteredIdSets) {
if(!anyRecordOfFilters.isEmpty()) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
Set<Integer> anyRecordOfPatientSet = anyRecordOfFilters.parallelStream().flatMap(path -> {
Expand All @@ -281,35 +296,37 @@ private void addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, ArrayList<
}
}
}).collect(Collectors.toSet());
filteredIdSets.add(anyRecordOfPatientSet);
filteredIdSets.intersect(anyRecordOfPatientSet);
}
return filteredIdSets;
}

private void addIdSetsForNumericFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
private IntersectingSetContainer<Integer> addIdSetsForNumericFilters(Query query, IntersectingSetContainer<Integer> filteredIdSets) {
if(!query.getNumericFilters().isEmpty()) {
filteredIdSets.addAll((Set<TreeSet<Integer>>)(query.getNumericFilters().entrySet().parallelStream().map(entry->{
return (TreeSet<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
}).collect(Collectors.toSet())));
query.getNumericFilters().entrySet().parallelStream().map(entry-> {
return (Set<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
}).forEach(filteredIdSets::intersect);
}
return filteredIdSets;
}

private void addIdSetsForCategoryFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
if(!query.getCategoryFilters().isEmpty()) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
Set<Set<Integer>> idsThatMatchFilters = query.getCategoryFilters().entrySet().parallelStream().map(entry->{
Set<Integer> ids = new TreeSet<>();
if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
} else {
String[] categoryFilter = entry.getValue();
for(String category : categoryFilter) {
ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
}
private IntersectingSetContainer<Integer> addIdSetsForCategoryFilters(
Query query, IntersectingSetContainer<Integer> startingIds
) {
VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
query.getCategoryFilters().entrySet().parallelStream().map(entry->{
Set<Integer> ids = new TreeSet<>();
if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
} else {
String[] categoryFilter = entry.getValue();
for(String category : categoryFilter) {
ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
}
return ids;
}).collect(Collectors.toSet());
filteredIdSets.addAll(idsThatMatchFilters);
}
}
return ids;
}).forEach(startingIds::intersect);
return startingIds;
}

private void addIdSetsForVariantSpecCategoryFilters(String[] zygosities, String key, Set<Integer> ids, VariantBucketHolder<VariantMasks> bucketCache) {
Expand Down Expand Up @@ -661,4 +678,12 @@ public String[] getPatientIds() {
/**
 * Looks up variant masks by delegating directly to {@code variantService.getMasks} with the
 * same arguments; no additional logic is applied here.
 *
 * @param path passed through unchanged to the variant service
 * @param variantMasksVariantBucketHolder passed through unchanged to the variant service
 *        (presumably a bucket cache reused across lookups; confirm in the variant service)
 * @return whatever {@code variantService.getMasks} returns for these arguments
 */
public VariantMasks getMasks(String path, VariantBucketHolder<VariantMasks> variantMasksVariantBucketHolder) {
return variantService.getMasks(path, variantMasksVariantBucketHolder);
}

/**
 * Intersects {@code first} with {@code second}, treating a null {@code second} as
 * "no constraint".
 *
 * <p>WARNING: this is NOT a true intersection. When {@code second} is null the result is
 * {@code first} itself rather than an empty set, so use it with care.
 *
 * @param first the base set; must not be null
 * @param second the set to intersect with, or null to leave {@code first} untouched
 * @return {@code first} when {@code second} is null, otherwise the intersection of the two
 */
private <T> Set<T> nullSafeIntersect(@NotNull Set<T> first, @Nullable Set<T> second) {
    if (second != null) {
        return Sets.intersection(first, second);
    }
    return first;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.IOException;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

Expand Down Expand Up @@ -131,5 +132,9 @@ public void enqueue() {
public int compareTo(AsyncResult o) {
return this.query.getId().compareTo(o.query.getId());
}

/**
 * Returns the temp file path reported by this result's {@code stream}, by delegating to
 * {@code stream.getTempFilePath()}.
 *
 * @return the path returned by the underlying stream
 */
public Path getTempFilePath() {
return stream.getTempFilePath();
}

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
Expand Down Expand Up @@ -225,4 +226,8 @@ public long estimatedSize() {
return tempFile.length();
}

/**
 * Exposes the location of the backing temp file as a {@link Path}.
 *
 * @return a path built from the absolute path string of {@code tempFile}
 */
public Path getTempFilePath() {
    final String absoluteLocation = tempFile.getAbsolutePath();
    return Path.of(absoluteLocation);
}

}
Loading

0 comments on commit 35fdd58

Please sign in to comment.