Save to file timeseries #95
Merged (9 commits), Jan 5, 2024
==== AbstractProcessor.java ====

@@ -29,6 +29,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+

@Component
public class AbstractProcessor {
@@ -187,6 +190,16 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
        return ids[0];
    }

+    private class IntersectingSetContainer<T> {
+        Set<T> set = null;
+        boolean initialized = false;
+
+        void intersect(@NotNull Set<T> toIntersect) {
+            initialized = true;
+            set = set == null ? toIntersect : Sets.intersection(set, toIntersect);
+        }
+    }
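A note on the container's semantics, with a standalone sketch (the demo class is hypothetical; the container is copied out of AbstractProcessor, where it is a private inner class, purely for illustration): the initialized flag lets callers distinguish "no filter ever ran" (set stays null, meaning no restriction) from "filters ran and matched nobody" (set is empty). Also worth flagging: later hunks call intersect from parallelStream().forEach(...), and the merged method is unsynchronized; the copy below adds synchronized to make the read-modify-write on set safe under that usage.

import com.google.common.collect.Sets;
import java.util.Set;

// Hypothetical demo; IntersectingSetContainer is a private inner class of
// AbstractProcessor in the PR and is copied out here for illustration.
public class IntersectingSetDemo {

    static class IntersectingSetContainer<T> {
        Set<T> set = null;
        boolean initialized = false;

        // synchronized is an addition: the PR calls intersect from
        // parallelStream().forEach(...), which may run on several threads.
        synchronized void intersect(Set<T> toIntersect) {
            initialized = true;
            set = set == null ? toIntersect : Sets.intersection(set, toIntersect);
        }
    }

    public static void main(String[] args) {
        IntersectingSetContainer<Integer> ids = new IntersectingSetContainer<>();
        System.out.println(ids.initialized + " / " + ids.set); // false / null: no filters ran
        ids.intersect(Set.of(1, 2, 3));
        ids.intersect(Set.of(2, 3, 4));
        System.out.println(ids.initialized + " / " + ids.set); // true / [2, 3] (iteration order may vary)
        ids.intersect(Set.of(9));
        System.out.println(ids.initialized + " / " + ids.set); // true / []: filters matched nobody
    }
}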

    /**
     * For each filter in the query, return a set of patient ids that match. The order of these sets in the
     * returned list of sets does not matter and cannot currently be tied back to the filter that generated
@@ -196,26 +209,27 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
     * @return
     */
    protected List<Set<Integer>> idSetsForEachFilter(Query query) {
-        final ArrayList<Set<Integer>> filteredIdSets = new ArrayList<>();
+        IntersectingSetContainer<Integer> ids = new IntersectingSetContainer<>();

        try {
-            query.getAllAnyRecordOf().forEach(anyRecordOfFilterList -> {
-                addIdSetsForAnyRecordOf(anyRecordOfFilterList, filteredIdSets);
-            });
-            addIdSetsForRequiredFields(query, filteredIdSets);
-            addIdSetsForNumericFilters(query, filteredIdSets);
-            addIdSetsForCategoryFilters(query, filteredIdSets);
+            for (List<String> anyRecordOfFilterList : query.getAllAnyRecordOf()) {
+                ids = addIdSetsForAnyRecordOf(anyRecordOfFilterList, ids);
+            }
+            ids = addIdSetsForRequiredFields(query, ids);
+            ids = addIdSetsForNumericFilters(query, ids);
+            ids = addIdSetsForCategoryFilters(query, ids);
        } catch (InvalidCacheLoadException e) {
            log.warn("Invalid query supplied: " + e.getLocalizedMessage());
-            filteredIdSets.add(new HashSet<>()); // if an invalid path is supplied, no patients should match.
+            return List.of(new HashSet<>());
        }

-        //AND logic to make sure all patients match each filter
-        if(filteredIdSets.size()>1) {
-            List<Set<Integer>> processedFilteredIdSets = new ArrayList<>(List.of(applyBooleanLogic(filteredIdSets)));
-            return addIdSetsForVariantInfoFilters(query, processedFilteredIdSets);
+        if (ids.initialized) {
+            return addIdSetsForVariantInfoFilters(query, new ArrayList<>(List.of(ids.set)));
        } else {
-            return addIdSetsForVariantInfoFilters(query, filteredIdSets);
+            return addIdSetsForVariantInfoFilters(query, new ArrayList<>());
        }
    }

@@ -249,22 +263,23 @@ public TreeSet<Integer> getPatientSubsetForQuery(Query query) {
        return idList;
    }

-    private void addIdSetsForRequiredFields(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+    private IntersectingSetContainer<Integer> addIdSetsForRequiredFields(Query query, IntersectingSetContainer<Integer> filteredIdSets) {
        if(!query.getRequiredFields().isEmpty()) {
            VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
-            filteredIdSets.addAll(query.getRequiredFields().parallelStream().map(path->{
-                if(VariantUtils.pathIsVariantSpec(path)) {
+            query.getRequiredFields().parallelStream().map(path -> {
+                if (VariantUtils.pathIsVariantSpec(path)) {
                    TreeSet<Integer> patientsInScope = new TreeSet<>();
-                    addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1","1/1"}, path, patientsInScope, bucketCache);
+                    addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
                    return patientsInScope;
                } else {
-                    return new TreeSet<Integer>(getCube(path).keyBasedIndex());
+                    return (Set<Integer>) new TreeSet<Integer>(getCube(path).keyBasedIndex());
                }
-            }).collect(Collectors.toSet()));
+            }).forEach(filteredIdSets::intersect);
        }
+        return filteredIdSets;
    }

-    private void addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, ArrayList<Set<Integer>> filteredIdSets) {
+    private IntersectingSetContainer<Integer> addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, IntersectingSetContainer<Integer> filteredIdSets) {
        if(!anyRecordOfFilters.isEmpty()) {
            VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
            Set<Integer> anyRecordOfPatientSet = anyRecordOfFilters.parallelStream().flatMap(path -> {
@@ -281,35 +296,37 @@ private void addIdSetsForAnyRecordOf(List<String> anyRecordOfFilters, ArrayList<
                    }
                }
            }).collect(Collectors.toSet());
-            filteredIdSets.add(anyRecordOfPatientSet);
+            filteredIdSets.intersect(anyRecordOfPatientSet);
        }
+        return filteredIdSets;
    }

-    private void addIdSetsForNumericFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
+    private IntersectingSetContainer<Integer> addIdSetsForNumericFilters(Query query, IntersectingSetContainer<Integer> filteredIdSets) {
        if(!query.getNumericFilters().isEmpty()) {
-            filteredIdSets.addAll((Set<TreeSet<Integer>>)(query.getNumericFilters().entrySet().parallelStream().map(entry->{
-                return (TreeSet<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
-            }).collect(Collectors.toSet())));
+            query.getNumericFilters().entrySet().parallelStream().map(entry-> {
+                return (Set<Integer>)(getCube(entry.getKey()).getKeysForRange(entry.getValue().getMin(), entry.getValue().getMax()));
+            }).forEach(filteredIdSets::intersect);
        }
+        return filteredIdSets;
    }

-    private void addIdSetsForCategoryFilters(Query query, ArrayList<Set<Integer>> filteredIdSets) {
-        if(!query.getCategoryFilters().isEmpty()) {
-            VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
-            Set<Set<Integer>> idsThatMatchFilters = query.getCategoryFilters().entrySet().parallelStream().map(entry->{
-                Set<Integer> ids = new TreeSet<>();
-                if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
-                    addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
-                } else {
-                    String[] categoryFilter = entry.getValue();
-                    for(String category : categoryFilter) {
-                        ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
-                    }
-                }
-                return ids;
-            }).collect(Collectors.toSet());
-            filteredIdSets.addAll(idsThatMatchFilters);
-        }
-    }
+    private IntersectingSetContainer<Integer> addIdSetsForCategoryFilters(
+            Query query, IntersectingSetContainer<Integer> startingIds
+    ) {
+        VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
+        query.getCategoryFilters().entrySet().parallelStream().map(entry->{
+            Set<Integer> ids = new TreeSet<>();
+            if(VariantUtils.pathIsVariantSpec(entry.getKey())) {
+                addIdSetsForVariantSpecCategoryFilters(entry.getValue(), entry.getKey(), ids, bucketCache);
+            } else {
+                String[] categoryFilter = entry.getValue();
+                for(String category : categoryFilter) {
+                    ids.addAll(getCube(entry.getKey()).getKeysForValue(category));
+                }
+            }
+            return ids;
+        }).forEach(startingIds::intersect);
+        return startingIds;
+    }

    private void addIdSetsForVariantSpecCategoryFilters(String[] zygosities, String key, Set<Integer> ids, VariantBucketHolder<VariantMasks> bucketCache) {
@@ -661,4 +678,12 @@ public String[] getPatientIds() {
    public VariantMasks getMasks(String path, VariantBucketHolder<VariantMasks> variantMasksVariantBucketHolder) {
        return variantService.getMasks(path, variantMasksVariantBucketHolder);
    }
+
+    /**
+     * BE CAREFUL WITH THIS METHOD! NOT A TRUE INTERSECTION
+     * This intersects the sets, but if the second set is null, it just returns the first.
+     */
+    private <T> Set<T> nullSafeIntersect(@NotNull Set<T> first, @Nullable Set<T> second) {
+        return second == null ? first : Sets.intersection(first, second);
+    }
}
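Both intersect and nullSafeIntersect lean on Guava's Sets.intersection, which returns an unmodifiable live view rather than a snapshot: chained calls stack views whose membership checks are re-evaluated on every access, and mutating a backing set changes the view. A small sketch (class name hypothetical):

import com.google.common.collect.Sets;

import java.util.HashSet;
import java.util.Set;

// Hypothetical demo: Sets.intersection returns a live view, not a copy.
public class IntersectionViewDemo {
    public static void main(String[] args) {
        Set<Integer> first = new HashSet<>(Set.of(1, 2, 3));
        Set<Integer> second = new HashSet<>(Set.of(2, 3, 4));

        Set<Integer> view = Sets.intersection(first, second);
        System.out.println(view); // e.g. [2, 3]

        first.remove(2); // changes to a backing set show through the view
        System.out.println(view); // [3]

        // Copying materializes a stable snapshot if the inputs may change later.
        Set<Integer> snapshot = new HashSet<>(view);
        System.out.println(snapshot); // [3]
    }
}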
==== AsyncResult.java ====
@@ -1,6 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.IOException;
+import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

@@ -131,5 +132,9 @@ public void enqueue() {
    public int compareTo(AsyncResult o) {
        return this.query.getId().compareTo(o.query.getId());
    }
+
+    public Path getTempFilePath() {
+        return stream.getTempFilePath();
+    }

}
==== (result stream class backing AsyncResult; file name not shown in this capture) ====
@@ -1,6 +1,7 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import java.io.*;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
@@ -225,4 +226,8 @@ public long estimatedSize() {
        return tempFile.length();
    }

+    public Path getTempFilePath() {
+        return Path.of(tempFile.getAbsolutePath());
+    }

}
==== TimeseriesProcessor.java ====
@@ -38,7 +38,9 @@ public class TimeseriesProcessor implements HpdsProcessor {
    private AbstractProcessor abstractProcessor;

    private final String ID_CUBE_NAME;
+
    private final int ID_BATCH_SIZE;
+
    private final int CACHE_SIZE;

    @Autowired
@@ -71,7 +73,6 @@ public void runQuery(Query query, AsyncResult result) {
        } else {
            throw new NotAuthorizedException("Data Export is not authorized for this system");
        }
-        return;
    }

    /**
@@ -83,32 +84,28 @@ public void runQuery(Query query, AsyncResult result) {
     * @throws IOException
     */
    private void exportTimeData(Query query, AsyncResult result, TreeSet<Integer> idList) throws IOException {

-        Set<String> exportedConceptPaths = new HashSet<String>();
+        log.info("Starting export for time series data of query {} (HPDS ID {})", query.getPicSureId(), query.getId());
        //get a list of all fields mentioned in the query; export all data associated with any included field
-        List<String> pathList = new LinkedList<String>();
+        Set<String> pathList = new HashSet<>();
        pathList.addAll(query.getAnyRecordOf());
        pathList.addAll(query.getFields());
        pathList.addAll(query.getRequiredFields());
        pathList.addAll(query.getCategoryFilters().keySet());
        pathList.addAll(query.getNumericFilters().keySet());

-        addDataForConcepts(pathList, exportedConceptPaths, idList, result);
-
+        addDataForConcepts(pathList, idList, result);
+        log.info("Completed export for time series data of query {} (HPDS ID {})", query.getPicSureId(), query.getId());
    }

-    private void addDataForConcepts(Collection<String> pathList, Set<String> exportedConceptPaths, TreeSet<Integer> idList, AsyncResult result) throws IOException {
+    private void addDataForConcepts(Set<String> pathList, TreeSet<Integer> idList, AsyncResult result) throws IOException {
        for (String conceptPath : pathList) {
-            //skip concepts we may already have encountered
-            if(exportedConceptPaths.contains(conceptPath)) {
-                continue;
-            }
            ArrayList<String[]> dataEntries = new ArrayList<String[]>();
            PhenoCube<?> cube = abstractProcessor.getCube(conceptPath);
            if(cube == null) {
                log.warn("Attempting export of non-existant concept: " + conceptPath);
                continue;
            }
-            log.debug("Exporting " + conceptPath);
+            log.info("Exporting " + conceptPath);
            List<?> valuesForKeys = cube.getValuesForKeys(idList);
            for (Object kvObj : valuesForKeys) {
                if (cube.isStringType()) {
@@ -131,7 +128,6 @@ private void addDataForConcepts(Collection<String> pathList, Set<String> exporte
                }
            }
            result.stream.appendResults(dataEntries);
-            exportedConceptPaths.add(conceptPath);
        }
    }
}
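The exporter changes are behavior-preserving: pathList is now a HashSet, so concept paths mentioned in several query sections collapse to one entry when the lists are merged, which is what makes the old exportedConceptPaths bookkeeping and its skip check in addDataForConcepts unnecessary. A tiny sketch with hypothetical concept paths:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical concept paths: a HashSet collapses duplicates up front,
// so each concept is exported exactly once without extra bookkeeping.
public class PathDedupDemo {
    public static void main(String[] args) {
        Set<String> pathList = new HashSet<>();
        pathList.addAll(List.of("\\demo\\age\\", "\\demo\\sex\\")); // e.g. from getFields()
        pathList.addAll(List.of("\\demo\\age\\")); // e.g. from getRequiredFields()
        System.out.println(pathList.size()); // 2: the duplicate path was dropped
    }
}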
==== (query REST resource class; file name not shown in this capture) ====
@@ -11,6 +11,7 @@
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;

+import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
import edu.harvard.hms.dbmi.avillach.hpds.service.filesharing.FileSharingService;
import edu.harvard.hms.dbmi.avillach.hpds.service.util.Paginator;
import edu.harvard.hms.dbmi.avillach.hpds.service.util.QueryDecorator;
@@ -266,11 +267,26 @@ public Response queryResult(@PathParam("resourceQueryId") UUID queryId, QueryReq
    public Response writeQueryResult(
        @RequestBody() Query query, @PathParam("dataType") String datatype
    ) {
-        // query IDs within HPDS are a different concept that query IDs in PIC-SURE
-        // Generally, equivalent queries with different PIC-SURE query IDs will have the SAME
-        // HPDS query ID.
-        queryDecorator.setId(query);
-        AsyncResult result = queryService.getResultFor(query.getId());
+        if (query.getExpectedResultType() != ResultType.DATAFRAME_TIMESERIES) {
+            return Response
+                .status(400, "The write endpoint only writes time series dataframes. Fix result type.")
+                .build();
+        }
+        String hpdsQueryID;
+        try {
+            QueryStatus queryStatus = convertToQueryStatus(queryService.runQuery(query));
+            String status = queryStatus.getResourceStatus();
+            hpdsQueryID = queryStatus.getResourceResultId();
+            while ("RUNNING".equalsIgnoreCase(status) || "PENDING".equalsIgnoreCase(status)) {
+                Thread.sleep(10000); // Yea, this is not restful. Sorry.
+                status = convertToQueryStatus(queryService.getStatusFor(hpdsQueryID)).getResourceStatus();
+            }
+        } catch (ClassNotFoundException | IOException | InterruptedException e) {
+            log.warn("Error waiting for response", e);
+            return Response.serverError().build();
+        }
+
+        AsyncResult result = queryService.getResultFor(hpdsQueryID);
        // the queryResult has this DIY retry logic that blocks a system thread.
        // I'm not going to do that here. If the service can't find it, you get a 404.
        // Retry it client side.
@@ -287,6 +303,7 @@ public Response writeQueryResult(
        // at least for now, this is going to block until we finish writing
        // Not very restful, but it will make this API very easy to consume
        boolean success = false;
+        query.setId(hpdsQueryID);
        if ("phenotypic".equals(datatype)) {
            success = fileSystemService.createPhenotypicData(query);
        } else if ("genomic".equals(datatype)) {
@@ -321,7 +338,7 @@ public Response queryFormat(QueryRequest resultRequest) {
    public Response querySync(QueryRequest resultRequest) {
        if (Crypto.hasKey(Crypto.DEFAULT_KEY_NAME)) {
            try {
-                return _querySync(resultRequest);
+                return submitQueryAndWaitForCompletion(resultRequest);
            } catch (IOException e) {
                log.error("IOException caught: ", e);
                return Response.serverError().build();
@@ -350,7 +367,7 @@ public PaginatedSearchResult<String> searchGenomicConceptValues(
        return paginator.paginate(matchingValues, page, size);
    }

-    private Response _querySync(QueryRequest resultRequest) throws IOException {
+    private Response submitQueryAndWaitForCompletion(QueryRequest resultRequest) throws IOException {
        Query incomingQuery;
        incomingQuery = convertIncomingQuery(resultRequest);
        log.info("Query Converted");
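On the writeQueryResult hunk above: the endpoint now submits the query itself and blocks the request thread, polling getStatusFor every 10 seconds until the status leaves RUNNING/PENDING (the in-code comment owns up to this not being RESTful). If the flat 10-second sleep becomes a problem, a bounded exponential backoff is a drop-in variant of that loop; this sketch reuses the queryService and convertToQueryStatus helpers visible in the diff, while maxWaitMillis is an assumed configuration value:

    QueryStatus queryStatus = convertToQueryStatus(queryService.runQuery(query));
    String hpdsQueryID = queryStatus.getResourceResultId();
    String status = queryStatus.getResourceStatus();
    long waitMillis = 1_000; // start with short waits so fast queries return quickly
    long deadline = System.currentTimeMillis() + maxWaitMillis; // maxWaitMillis is hypothetical
    while (("RUNNING".equalsIgnoreCase(status) || "PENDING".equalsIgnoreCase(status))
            && System.currentTimeMillis() < deadline) {
        Thread.sleep(waitMillis);
        waitMillis = Math.min(waitMillis * 2, 10_000); // back off toward the PR's 10s interval
        status = convertToQueryStatus(queryService.getStatusFor(hpdsQueryID)).getResourceStatus();
    }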
==== FileSystemService.java ====
@@ -14,6 +14,8 @@
import java.nio.file.Files;
import java.nio.file.Path;

+import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
+
@Service
public class FileSystemService {

@@ -26,6 +28,10 @@ public class FileSystemService {
    private boolean enableFileSharing;

    public boolean writeResultToFile(String fileName, AsyncResult result, String id) {
+        if (Files.exists(result.getTempFilePath())) {
+            LOG.info("A temp file already exists for query {}. Moving that rather than rewriting.", id);
+            return moveFile(fileName, result.getTempFilePath(), id);
+        }
        result.stream.open();
        return writeStreamToFile(fileName, result.stream, id);
    }
@@ -34,6 +40,27 @@ public boolean writeResultToFile(String fileName, String result, String id) {
        return writeStreamToFile(fileName, new ByteArrayInputStream(result.getBytes()), id);
    }

+
+    private boolean moveFile(String destinationName, Path sourceFile, String queryId) {
+        if (!enableFileSharing) {
+            LOG.warn("Attempted to write query result to file while file sharing is disabled. No-op.");
+            return false;
+        }
+
+        Path dirPath = Path.of(sharingRoot.toString(), queryId);
+        Path filePath = Path.of(sharingRoot.toString(), queryId, destinationName);
+
+        try {
+            LOG.info("Moving query {} to file: {}", queryId, filePath);
+            makeDirIfDNE(dirPath);
+            Path result = Files.move(sourceFile, filePath, REPLACE_EXISTING);
+            return Files.exists(result);
+        } catch (IOException e) {
+            LOG.error("Error moving.", e);
+            return false;
+        }
+    }
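moveFile relies on Files.move with REPLACE_EXISTING, which is a cheap rename when the temp file and sharingRoot live on the same file store and silently degrades to copy-plus-delete across stores. If consumers could observe a half-copied file in the sharing directory, trying an atomic rename first is a common hardening step; a self-contained sketch with hypothetical names:

import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;

import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

// Hypothetical helper: prefer an atomic rename so readers never see a
// partially written file; fall back to a plain move across file stores.
final class MoveUtil {
    static Path moveAtomicallyIfPossible(Path source, Path target) throws IOException {
        try {
            return Files.move(source, target, ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            return Files.move(source, target, REPLACE_EXISTING);
        }
    }
}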

    private boolean writeStreamToFile(String fileName, InputStream content, String queryId) {
        if (!enableFileSharing) {
            LOG.warn("Attempted to write query result to file while file sharing is disabled. No-op.");