From 1cdfd3e41347f378682d4df6141ae7cf940e3fc9 Mon Sep 17 00:00:00 2001 From: ramari16 Date: Mon, 19 Aug 2024 07:35:11 -0400 Subject: [PATCH] ALS-6511: Add PFB result type (#116) --- .../dbmi/avillach/hpds/data/query/Query.java | 1 - .../avillach/hpds/data/query/ResultType.java | 14 +- pom.xml | 10 + processing/pom.xml | 10 + .../avillach/hpds/processing/AsyncResult.java | 163 ++++++++++++--- .../hpds/processing/PfbProcessor.java | 117 +++++++++++ .../hpds/processing/QueryProcessor.java | 6 +- .../avillach/hpds/processing/ResultStore.java | 2 +- .../hpds/processing/ResultStoreStream.java | 170 +++------------ .../hpds/processing/TimeseriesProcessor.java | 4 +- .../hpds/processing/io/CsvWriter.java | 73 +++++++ .../hpds/processing/io/PfbWriter.java | 195 ++++++++++++++++++ .../hpds/processing/io/ResultWriter.java | 21 ++ .../hpds/processing/io/PfbWriterTest.java | 59 ++++++ .../avillach/hpds/service/PicSureService.java | 43 ++-- .../avillach/hpds/service/QueryService.java | 82 +++++--- 16 files changed, 730 insertions(+), 240 deletions(-) create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java create mode 100644 processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java index 8e28e74e..5a8a1fb0 100644 --- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java +++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/Query.java @@ -179,7 +179,6 @@ public String toString() { writePartFormat("Observation Count Fields", fields, builder, true); break; case DATAFRAME: - case DATAFRAME_MERGED: case SECRET_ADMIN_DATAFRAME: writePartFormat("Data Export Fields", fields, builder, true); break; diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java index 4481190b..f7cd8165 100644 --- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java +++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java @@ -49,12 +49,7 @@ public enum ResultType { * Return the number of observations for included patients and * included fields, broken up across the included cross count fields. */ - OBSERVATION_CROSS_COUNT, - /** - * This was developed for UDN, but is completely useless and should - * be deleted. - */ - DATAFRAME_MERGED, + OBSERVATION_CROSS_COUNT, /** * Not completely implemented and currently dead code. Someone with * statistics experience needs to develop a p-value based filter for @@ -94,5 +89,10 @@ public enum ResultType { * is suitable to time series analysis and/or loading into another * instance of HPDS. 
*/ - DATAFRAME_TIMESERIES + DATAFRAME_TIMESERIES, + /** + * Exports data as PFB, using avro + * https://uc-cdis.github.io/pypfb/ + */ + DATAFRAME_PFB } diff --git a/pom.xml b/pom.xml index b2bbf69d..9754c284 100644 --- a/pom.xml +++ b/pom.xml @@ -310,6 +310,16 @@ 1.18.30 provided + + org.apache.avro + avro + 1.11.3 + + + org.xerial.snappy + snappy-java + 1.1.10.5 + diff --git a/processing/pom.xml b/processing/pom.xml index 66d9d1e3..df00b787 100644 --- a/processing/pom.xml +++ b/processing/pom.xml @@ -33,5 +33,15 @@ io.projectreactor.netty reactor-netty + + org.apache.avro + avro + + + + org.xerial.snappy + snappy-java + + diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java index e4434abf..7312b6f9 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AsyncResult.java @@ -1,9 +1,15 @@ package edu.harvard.hms.dbmi.avillach.hpds.processing; +import java.io.File; import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,11 +19,30 @@ import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType; import edu.harvard.hms.dbmi.avillach.hpds.exception.NotEnoughMemoryException; +import org.springframework.http.MediaType; public class AsyncResult implements Runnable, Comparable{ private static Logger log = LoggerFactory.getLogger(AsyncResult.class); - + + public byte[] readAllBytes() { + try { + return stream.readAllBytes(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public void closeWriter() { + stream.closeWriter(); + } + + private MediaType responseType; + + public MediaType getResponseType() { + return responseType; + } + public static enum Status{ SUCCESS { @Override @@ -52,29 +77,82 @@ public PicSureStatus toPicSureStatus() { public abstract PicSureStatus toPicSureStatus(); } - public Query query; - - public Status status; - - public long queuedTime; - - public long completedTime; - - public int retryCount; - - public int queueDepth; - - public int positionInQueue; - - public int numRows; + private Query query; + + public Query getQuery() { + return query; + } + + private Status status; + + public Status getStatus() { + return status; + } + + public AsyncResult setStatus(Status status) { + this.status = status; + return this; + } + + private long queuedTime; + + public long getQueuedTime() { + return queuedTime; + } + + public AsyncResult setQueuedTime(long queuedTime) { + this.queuedTime = queuedTime; + return this; + } + + private long completedTime; + + public long getCompletedTime() { + return completedTime; + } + + private int retryCount; - public int numColumns; + private int queueDepth; + + public int getQueueDepth() { + return queueDepth; + } + + public AsyncResult setQueueDepth(int queueDepth) { + this.queueDepth = queueDepth; + return this; + } + + private int positionInQueue; + + public AsyncResult setPositionInQueue(int positionInQueue) { + this.positionInQueue = 
positionInQueue; + return this; + } + + private int numRows; - public String id; + private int numColumns; + private String id; + + public String getId() { + return id; + } + + public AsyncResult setId(String id) { + this.id = id; + return this; + } + @JsonIgnore - public ResultStoreStream stream; - + private ResultStoreStream stream; + + public ResultStoreStream getStream() { + return stream; + } + @JsonIgnore private String[] headerRow; @@ -86,21 +164,48 @@ public PicSureStatus toPicSureStatus() { * The actual exception is thrown in @see ResultStore#constructor */ @JsonIgnore - public ExecutorService jobQueue; + private ExecutorService jobQueue; + + public ExecutorService getJobQueue() { + return jobQueue; + } + + public AsyncResult setJobQueue(ExecutorService jobQueue) { + this.jobQueue = jobQueue; + return this; + } @JsonIgnore - public HpdsProcessor processor; + private HpdsProcessor processor; + + public HpdsProcessor getProcessor() { + return processor; + } - public AsyncResult(Query query, String[] headerRow) { + public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) { this.query = query; - this.headerRow = headerRow; + this.processor = processor; + this.headerRow = processor.getHeaderRow(query); + this.responseType = writer.getResponseType(); try { - stream = new ResultStoreStream(headerRow, query.getExpectedResultType() == ResultType.DATAFRAME_MERGED); + stream = new ResultStoreStream(headerRow, writer); } catch (IOException e) { log.error("Exception creating result stream", e); } } + public void appendResults(List dataEntries) { + stream.appendResults(dataEntries); + } + public void appendMultiValueResults(List>> dataEntries) { + stream.appendMultiValueResults(dataEntries); + } + + public void appendResultStore(ResultStore resultStore) { + stream.appendResultStore(resultStore); + } + + @Override public void run() { status = AsyncResult.Status.RUNNING; @@ -127,9 +232,15 @@ public void enqueue() { } } + public void open() { + stream.open(); + } + @Override public int compareTo(AsyncResult o) { return this.query.getId().compareTo(o.query.getId()); } - + + + } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java new file mode 100644 index 00000000..6968a587 --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PfbProcessor.java @@ -0,0 +1,117 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing; + +import com.google.common.collect.Lists; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue; +import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube; +import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Component +public class PfbProcessor implements HpdsProcessor { + + public static final String PATIENT_ID_FIELD_NAME = "patient_id"; + private final int ID_BATCH_SIZE; + private final AbstractProcessor abstractProcessor; + + private Logger log = LoggerFactory.getLogger(PfbProcessor.class); + + + @Autowired + public PfbProcessor(AbstractProcessor abstractProcessor) { + 
this.abstractProcessor = abstractProcessor; + ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0")); + } + + @Override + public String[] getHeaderRow(Query query) { + String[] header = new String[query.getFields().size()+1]; + header[0] = PATIENT_ID_FIELD_NAME; + System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size()); + return header; + } + + @Override + public void runQuery(Query query, AsyncResult result) { + Set idList = abstractProcessor.getPatientSubsetForQuery(query); + log.info("Processing " + idList.size() + " rows for result " + result.getId()); + Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream() + .forEach(patientIds -> { + Map>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds)); + List>> fieldValuesPerPatient = patientIds.stream().map(patientId -> { + List> objectStream = Arrays.stream(getHeaderRow(query)).map(field -> { + if (PATIENT_ID_FIELD_NAME.equals(field)) { + return List.of(patientId.toString()); + } else { + return pathToPatientToValueMap.get(field).get(patientId); + } + }).collect(Collectors.toList()); + return objectStream; + }).collect(Collectors.toList()); + result.appendMultiValueResults(fieldValuesPerPatient); + }); + result.closeWriter(); + } + + private Map>> buildResult(AsyncResult result, Query query, TreeSet ids) { + ConcurrentHashMap>> pathToPatientToValueMap = new ConcurrentHashMap<>(); + List columns = query.getFields().stream() + .map(abstractProcessor.getDictionary()::get) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + List paths = columns.stream() + .map(ColumnMeta::getName) + .collect(Collectors.toList()); + int columnCount = paths.size() + 1; + + ArrayList columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount); + + // todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes + columnIndex.parallelStream().forEach((columnId)->{ + String columnPath = paths.get(columnId-1); + Map> patientIdToValueMap = processColumn(ids, columnPath); + pathToPatientToValueMap.put(columnPath, patientIdToValueMap); + }); + + return pathToPatientToValueMap; + } + + private Map> processColumn(TreeSet patientIds, String path) { + + Map> patientIdToValueMap = new HashMap<>(); + PhenoCube cube = abstractProcessor.getCube(path); + + KeyAndValue[] cubeValues = cube.sortedByKey(); + + int idPointer = 0; + for(int patientId : patientIds) { + while(idPointer < cubeValues.length) { + int key = cubeValues[idPointer].getKey(); + if(key < patientId) { + idPointer++; + } else if(key == patientId){ + String value = getResultField(cube, cubeValues, idPointer); + patientIdToValueMap.computeIfAbsent(patientId, k -> new ArrayList<>()).add(value); + idPointer++; + } else { + break; + } + } + } + return patientIdToValueMap; + } + + private String getResultField(PhenoCube cube, KeyAndValue[] cubeValues, + int idPointer) { + Comparable value = cubeValues[idPointer].getValue(); + return value.toString(); + } +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java index 08d83bc6..f02e91f4 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/QueryProcessor.java @@ -53,11 +53,11 @@ public String[] getHeaderRow(Query query) { public void 
runQuery(Query query, AsyncResult result) { Set idList = abstractProcessor.getPatientSubsetForQuery(query); - log.info("Processing " + idList.size() + " rows for result " + result.id); + log.info("Processing " + idList.size() + " rows for result " + result.getId()); Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).parallelStream() .map(list -> buildResult(result, query, new TreeSet<>(list))) .sequential() - .forEach(result.stream::appendResultStore); + .forEach(result::appendResultStore); } @@ -72,7 +72,7 @@ private ResultStore buildResult(AsyncResult result, Query query, TreeSet columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount); - ResultStore results = new ResultStore(result.id, columns, ids); + ResultStore results = new ResultStore(result.getId(), columns, ids); columnIndex.parallelStream().forEach((column)->{ clearColumn(paths, ids, results, column); diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java index b4ec7631..4fc2f7c4 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStore.java @@ -124,7 +124,7 @@ private int getFieldOffset(int row, int column) { * @param row * @throws IOException */ - public void readRowIntoStringArray(int rowNumber, int[] columnWidths, String[] row) throws IOException { + public void readRowIntoStringArray(int rowNumber, int[] columnWidths, String[] row) { if(wrappedResultArray == null) { wrappedResultArray = ByteBuffer.wrap(resultArray); } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java index 5bb23d81..d0ffc2f7 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/ResultStoreStream.java @@ -3,111 +3,56 @@ import java.io.*; import java.util.ArrayList; import java.util.List; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.regex.Pattern; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter; -import org.apache.commons.csv.CSVFormat; -import org.apache.commons.csv.CSVParser; -import org.apache.commons.csv.CSVPrinter; -import org.apache.commons.csv.CSVRecord; - -import de.siegmar.fastcsv.writer.CsvWriter; import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta; public class ResultStoreStream extends InputStream { - private CsvWriter writer; - private File tempFile; + private ResultWriter writer; private InputStream in; private int value; private boolean streamIsClosed = false; private int numRows; - private String[] expandedHeader; - private TreeMap> mergedColumnIndex; private String[] originalHeader; - private boolean mergeColumns; - public ResultStoreStream(String[] header, boolean mergeColumns) throws IOException { - writer = new CsvWriter(); - tempFile = File.createTempFile("result-"+ System.nanoTime(), ".sstmp"); + public ResultStoreStream(String[] header, ResultWriter writer) throws IOException { + this.writer = writer; this.originalHeader = header; - if(mergeColumns) { - this.expandedHeader = createMergedColumns(header); - writeHeader(this.expandedHeader); - 
}else { - writeHeader(this.originalHeader); - } - this.mergeColumns = mergeColumns; + writeHeader(this.originalHeader); numRows = 0; } - private String[] createMergedColumns(String[] header) { - ArrayList allColumns = new ArrayList(); - allColumns.add(header[0]); - TreeMap> mergedColumns = new TreeMap<>(); - this.mergedColumnIndex = new TreeMap<>(); - int columnNumber = 0; - for(String column : header) { - String[] split = column.split("\\\\"); - if(split.length > 1) { - String key = split[1]; - TreeSet subColumns = mergedColumns.get(key); - ArrayList columnIndex = mergedColumnIndex.get(key); - if(subColumns == null) { - subColumns = new TreeSet(); - mergedColumns.put(key, subColumns); - allColumns.add(key); - columnIndex = new ArrayList(); - mergedColumnIndex.put(key, columnIndex); - } - columnIndex.add(columnNumber); - subColumns.add(column); - } - columnNumber++; - } - for(int x = 1;x headerEntries = new ArrayList(); - headerEntries.add(header); - try(FileWriter out = new FileWriter(tempFile);){ - writer.write(out, headerEntries); + public void appendResultStore(ResultStore results) { + int batchSize = 100; + List entries = new ArrayList<>(batchSize); + for(int x = 0;x entries = new ArrayList(batchSize); - for(int x = 0;x entries) { - try (FileWriter out = new FileWriter(tempFile, true);){ - writer.write(out, entries); - numRows += entries.size(); - } catch (IOException e) { - throw new RuntimeException("IOException while appending temp file : " + tempFile.getAbsolutePath(), e); - } + writer.writeEntity(entries); + } + /** + * Appending data to the writer that supports multiple values per patient/variable combination + */ + public void appendMultiValueResults(List>> entries) { + writer.writeMultiValueEntity(entries); } - private List writeResultsToTempFile(ResultStore results, FileWriter out, int batchSize, - List entries) throws IOException { + private List writeResultsToTempFile(ResultStore results, int batchSize, + List entries) { List columns = results.getColumns(); int[] columnWidths = new int[columns.size()]; @@ -123,7 +68,7 @@ private List writeResultsToTempFile(ResultStore results, FileWriter ou if(rowsInBatch < batchSize) { entries = entries.subList(0, rowsInBatch); } - writer.write(out, entries); + writer.writeEntity(entries); numRows += rowsInBatch; } return entries; @@ -139,68 +84,12 @@ public void close() { public void open() { try { - in = new BufferedInputStream(new FileInputStream(new File(tempFile.getAbsolutePath())), 1024 * 1024 * 8); - if(mergeColumns) { - File mergedFile = File.createTempFile(tempFile.getName(), "_merged"); - FileWriter out = new FileWriter(mergedFile); - CSVParser parser = CSVFormat.DEFAULT.withDelimiter(',').parse(new InputStreamReader(in)); - CSVPrinter writer = new CSVPrinter(out, CSVFormat.DEFAULT.withDelimiter(',')); - final boolean[] firstRow = new boolean[] {true}; - parser.forEach((CSVRecord record)->{ - if(firstRow[0]) { - try { - ArrayList header = new ArrayList<>(); - header.add("Patient ID"); - header.addAll(mergedColumnIndex.keySet()); - writer.printRecord(header); - firstRow[0] = false; - } catch (IOException e) { - throw new UncheckedIOException(e); - } - }else { - ArrayList records = new ArrayList(); - records.add(record.get(0)); - for(String column : mergedColumnIndex.keySet()) { - ArrayList valuesToMerge = new ArrayList<>(); - for(Integer columnNumber : mergedColumnIndex.get(column)) { - String value = record.get(columnNumber); - if( value != null && ! 
value.isEmpty() ) { - value = value.replaceAll("\"", "'"); - String label = originalHeader[columnNumber].replaceAll("\\\\"+ column, ""); - if(label.length()>1) { - label = label.substring(1, label.length()-1); - }else { - label = null; - } - if(label==null || label.trim().contentEquals(value.trim())) { - valuesToMerge.add(value); - } else { - valuesToMerge.add(label==null ? value : label.replaceAll("\\\\"+Pattern.quote(value), "") + " : " + value); - } - } - } - records.add(String.join(";", valuesToMerge)); - } - try { - writer.printRecord(records); - } catch (IOException e) { - throw new UncheckedIOException(e); - } - } - }); - parser.close(); - writer.close(); - out.close(); - in.close(); - in = new BufferedInputStream(new FileInputStream(mergedFile), 1024 * 1024 * 8); - } + in = new BufferedInputStream(new FileInputStream(writer.getFile().getAbsolutePath()), 1024 * 1024 * 8); streamIsClosed = false; } catch (FileNotFoundException e) { - throw new RuntimeException("temp file for result not found : " + tempFile.getAbsolutePath()); - } catch (IOException e) { - throw new UncheckedIOException(e); + throw new RuntimeException("temp file for result not found : " + writer.getFile().getAbsolutePath()); } - } + } @@ -222,7 +111,10 @@ int getNumRows() { } public long estimatedSize() { - return tempFile.length(); + return writer.getFile().length(); } + public void closeWriter() { + writer.close(); + } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java index fe539659..9e01ad92 100644 --- a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/TimeseriesProcessor.java @@ -127,11 +127,11 @@ private void addDataForConcepts(Collection pathList, Set exporte } //batch exports so we don't take double memory (valuesForKeys + dataEntries could be a lot of data points) if(dataEntries.size() >= ID_BATCH_SIZE) { - result.stream.appendResults(dataEntries); + result.appendResults(dataEntries); dataEntries = new ArrayList(); } } - result.stream.appendResults(dataEntries); + result.appendResults(dataEntries); exportedConceptPaths.add(conceptPath); } } diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java new file mode 100644 index 00000000..3302d06e --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/CsvWriter.java @@ -0,0 +1,73 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.io; + +import org.springframework.http.MediaType; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class CsvWriter implements ResultWriter { + + private final de.siegmar.fastcsv.writer.CsvWriter csvWriter; + + private final FileWriter fileWriter; + + private final File file; + + public CsvWriter(File file) { + this.file = file; + csvWriter = new de.siegmar.fastcsv.writer.CsvWriter(); + try { + this.fileWriter = new FileWriter(file); + } catch (IOException e) { + throw new RuntimeException("IOException while appending temp file : " + file.getAbsolutePath(), e); + } + } + + @Override + public void writeHeader(String[] data) { 
+ try { + List dataList = new ArrayList<>(); + dataList.add(data); + csvWriter.write(fileWriter, dataList); + } catch (IOException e) { + throw new RuntimeException("IOException while appending to CSV file", e); + } + } + @Override + public void writeEntity(Collection data) { + try { + csvWriter.write(fileWriter, data); + } catch (IOException e) { + throw new RuntimeException("IOException while appending to CSV file", e); + } + } + + @Override + public void writeMultiValueEntity(Collection>> data) { + throw new RuntimeException("Method not implemented"); + } + + @Override + public File getFile() { + return file; + } + + @Override + public MediaType getResponseType() { + return MediaType.TEXT_PLAIN; + } + + @Override + public void close() { + try { + fileWriter.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java new file mode 100644 index 00000000..9f09bbd8 --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriter.java @@ -0,0 +1,195 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.io; + +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.file.Codec; +import org.apache.avro.file.CodecFactory; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; + +import java.io.File; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.*; +import java.util.stream.Collectors; + +public class PfbWriter implements ResultWriter { + + private Logger log = LoggerFactory.getLogger(PfbWriter.class); + + private final Schema metadataSchema; + private final Schema nodeSchema; + private SchemaBuilder.FieldAssembler entityFieldAssembler; + + private List fields; + private DataFileWriter dataFileWriter; + private File file; + private Schema entitySchema; + private Schema patientDataSchema; + + private static final Set SINGULAR_FIELDS = Set.of("patient_id"); + + public PfbWriter(File tempFile) { + file = tempFile; + entityFieldAssembler = SchemaBuilder.record("entity") + .namespace("edu.harvard.dbmi") + .fields(); + + SchemaBuilder.FieldAssembler nodeRecord = SchemaBuilder.record("nodes") + .fields() + .requiredString("name") + .nullableString("ontology_reference", "null") + .name("values").type(SchemaBuilder.map().values(SchemaBuilder.nullable().stringType())).noDefault(); + nodeSchema = nodeRecord.endRecord(); + + SchemaBuilder.FieldAssembler metadataRecord = SchemaBuilder.record("metadata") + .fields(); + metadataRecord.requiredString("misc"); + metadataRecord = metadataRecord.name("nodes").type(SchemaBuilder.array().items(nodeSchema)).noDefault(); + metadataSchema = metadataRecord.endRecord(); + } + + @Override + public void writeHeader(String[] data) { + fields = Arrays.stream(data.clone()).map(this::formatFieldName).collect(Collectors.toList()); + SchemaBuilder.FieldAssembler patientRecords = SchemaBuilder.record("patientData") + .fields(); + + fields.forEach(field -> { + if (isSingularField(field)) { + patientRecords.nullableString(field, "null"); + } else { + 
patientRecords.name(field).type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault(); + } + + }); + patientDataSchema = patientRecords.endRecord(); + + Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema); + + entityFieldAssembler = entityFieldAssembler.name("object").type(objectSchema).noDefault(); + entityFieldAssembler.nullableString("id", "null"); + entityFieldAssembler.requiredString("name"); + entitySchema = entityFieldAssembler.endRecord(); + + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(entitySchema); + dataFileWriter = new DataFileWriter<>(datumWriter); + try { + log.info("Creating temp avro file at " + file.getAbsoluteFile()); + dataFileWriter.setCodec(CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL)); + dataFileWriter.create(entitySchema, file); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + writeMetadata(); + } + + private boolean isSingularField(String field) { + return SINGULAR_FIELDS.contains(field); + } + + /** + * Transforms our variable names to ones that are valid Avro fields. We replace invalid characters with underscores + * and add a leading underscore if the variable starts with a number + */ + protected String formatFieldName(String s) { + String formattedFieldName = s.replaceAll("\\W", "_"); + if (Character.isDigit(formattedFieldName.charAt(0))) { + return "_" + formattedFieldName; + } + return formattedFieldName; + } + + private void writeMetadata() { + GenericRecord entityRecord = new GenericData.Record(entitySchema); + + List<GenericRecord> nodeList = new ArrayList<>(); + for (String field : fields) { + GenericRecord nodeData = new GenericData.Record(nodeSchema); + nodeData.put("name", field); + nodeData.put("ontology_reference", ""); + nodeData.put("values", Map.of()); + nodeList.add(nodeData); + } + GenericRecord metadata = new GenericData.Record(metadataSchema); + metadata.put("misc", ""); + metadata.put("nodes", nodeList); + + + entityRecord.put("object", metadata); + entityRecord.put("name", "metadata"); + entityRecord.put("id", "null"); + + try { + dataFileWriter.append(entityRecord); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public void writeEntity(Collection<String[]> entities) { + throw new RuntimeException("Method not supported, use writeMultiValueEntity instead"); + } + + @Override + public void writeMultiValueEntity(Collection<List<List<String>>> entities) { + entities.forEach(entity -> { + if (entity.size() != fields.size()) { + throw new IllegalArgumentException("Entity length must match the number of fields in this document"); + } + GenericRecord patientData = new GenericData.Record(patientDataSchema); + String patientId = ""; + for(int i = 0; i < fields.size(); i++) { + if ("patient_id".equals(fields.get(i))) { + patientId = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : ""; + } + if (isSingularField(fields.get(i))) { + String entityValue = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : ""; + patientData.put(fields.get(i), entityValue); + } else { + List<String> fieldValue = entity.get(i) != null ?
entity.get(i) : List.of(); + patientData.put(fields.get(i), fieldValue); + } + } + + + GenericRecord entityRecord = new GenericData.Record(entitySchema); + entityRecord.put("object", patientData); + entityRecord.put("name", "patientData"); + entityRecord.put("id", patientId); + + try { + dataFileWriter.append(entityRecord); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + + @Override + public void close() { + try { + dataFileWriter.close(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public File getFile() { + return file; + } + + @Override + public MediaType getResponseType() { + return MediaType.APPLICATION_OCTET_STREAM; + } +} diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java new file mode 100644 index 00000000..c0a81929 --- /dev/null +++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/ResultWriter.java @@ -0,0 +1,21 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.io; + +import org.springframework.http.MediaType; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +public interface ResultWriter { + void writeHeader(String[] data); + + void writeEntity(Collection<String[]> data); + void writeMultiValueEntity(Collection<List<List<String>>> data); + + File getFile(); + + MediaType getResponseType(); + + void close(); +} diff --git a/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java new file mode 100644 index 00000000..7e18d1a7 --- /dev/null +++ b/processing/src/test/java/edu/harvard/hms/dbmi/avillach/hpds/processing/io/PfbWriterTest.java @@ -0,0 +1,59 @@ +package edu.harvard.hms.dbmi.avillach.hpds.processing.io; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; + + +public class PfbWriterTest { + + @Test + public void writeValidPFB() { + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro")); + + pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"}); + List<List<String>> nullableList = new ArrayList<>(); + nullableList.add(List.of("123")); + nullableList.add(null); + nullableList.add(List.of("Y")); + pfbWriter.writeMultiValueEntity(List.of( + nullableList, + List.of(List.of("456"), List.of("80") ,List.of("N", "Y")), + List.of(List.of(), List.of("75"), List.of()) + )); + pfbWriter.writeMultiValueEntity(List.of( + List.of(List.of("123"), List.of("80"), List.of("Y")), + List.of(List.of("456"), List.of("70"),List.of("N", "Y")), + List.of(List.of(), List.of("75"), List.of()) + )); + pfbWriter.close(); + // todo: validate this programmatically + } + + @Test + public void formatFieldName_spacesAndBackslashes_replacedWithUnderscore() { + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro")); + String formattedName = pfbWriter.formatFieldName("\\Topmed Study Accession with Subject ID\\\\"); + assertEquals("_Topmed_Study_Accession_with_Subject_ID__", formattedName); + } + + @Test + public void formatFieldName_startsWithDigit_prependUnderscore() { + PfbWriter pfbWriter = new PfbWriter(new
File("target/test-result.avro")); + String formattedName = pfbWriter.formatFieldName("123Topmed Study Accession with Subject ID\\\\"); + assertEquals("_123Topmed_Study_Accession_with_Subject_ID__", formattedName); + } + + @Test + public void formatFieldName_randomGarbage_replaceWithUnderscore() { + PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro")); + String formattedName = pfbWriter.formatFieldName("$$$my garbage @vro var!able nam#"); + assertEquals("___my_garbage__vro_var_able_nam_", formattedName); + } +} \ No newline at end of file diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java index 6c47f882..b88c97f7 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java @@ -13,6 +13,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.InputStreamResource; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; @@ -193,22 +194,22 @@ private Query convertIncomingQuery(QueryRequest queryJson) private QueryStatus convertToQueryStatus(AsyncResult entity) { QueryStatus status = new QueryStatus(); - status.setDuration(entity.completedTime == 0 ? 0 : entity.completedTime - entity.queuedTime); - status.setResourceResultId(entity.id); - status.setResourceStatus(entity.status.name()); - if (entity.status == AsyncResult.Status.SUCCESS) { - status.setSizeInBytes(entity.stream.estimatedSize()); + status.setDuration(entity.getCompletedTime() == 0 ? 
0 : entity.getCompletedTime() - entity.getQueuedTime()); + status.setResourceResultId(entity.getId()); + status.setResourceStatus(entity.getStatus().name()); + if (entity.getStatus() == AsyncResult.Status.SUCCESS) { + status.setSizeInBytes(entity.getStream().estimatedSize()); } - status.setStartTime(entity.queuedTime); - status.setStatus(entity.status.toPicSureStatus()); + status.setStartTime(entity.getQueuedTime()); + status.setStatus(entity.getStatus().toPicSureStatus()); Map metadata = new HashMap(); - metadata.put("picsureQueryId", UUIDv5.UUIDFromString(entity.query.toString())); + metadata.put("picsureQueryId", UUIDv5.UUIDFromString(entity.getQuery().toString())); status.setResultMetadata(metadata); return status; } - @PostMapping(value = "/query/{resourceQueryId}/result", produces = MediaType.TEXT_PLAIN_VALUE) + @PostMapping(value = "/query/{resourceQueryId}/result") public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId, @RequestBody QueryRequest resultRequest) throws IOException { AsyncResult result = queryService.getResultFor(queryId.toString()); if (result == null) { @@ -226,13 +227,13 @@ public ResponseEntity queryResult(@PathVariable("resourceQueryId") UUID queryId, return ResponseEntity.status(404).build(); } } - if (result.status == AsyncResult.Status.SUCCESS) { - result.stream.open(); + if (result.getStatus() == AsyncResult.Status.SUCCESS) { + result.open(); return ResponseEntity.ok() - .contentType(MediaType.TEXT_PLAIN) - .body(new String(result.stream.readAllBytes(), StandardCharsets.UTF_8)); + .contentType(result.getResponseType()) + .body(new InputStreamResource(result.getStream())); } else { - return ResponseEntity.status(400).body("Status : " + result.status.name()); + return ResponseEntity.status(400).body("Status : " + result.getStatus().name()); } } @@ -295,20 +296,6 @@ private ResponseEntity _querySync(QueryRequest resultRequest) throws IOException case DATAFRAME: case SECRET_ADMIN_DATAFRAME: - case DATAFRAME_MERGED: - QueryStatus status = query(resultRequest).getBody(); - while (status.getResourceStatus().equalsIgnoreCase("RUNNING") - || status.getResourceStatus().equalsIgnoreCase("PENDING")) { - status = queryStatus(UUID.fromString(status.getResourceResultId()), null); - } - log.info(status.toString()); - - AsyncResult result = queryService.getResultFor(status.getResourceResultId()); - if (result.status == AsyncResult.Status.SUCCESS) { - result.stream.open(); - return queryOkResponse(new String(result.stream.readAllBytes(), StandardCharsets.UTF_8), incomingQuery, MediaType.TEXT_PLAIN); - } - return ResponseEntity.status(400).contentType(MediaType.APPLICATION_JSON).body("Status : " + result.status.name()); case CROSS_COUNT: return queryOkResponse(countProcessor.runCrossCounts(incomingQuery), incomingQuery, MediaType.APPLICATION_JSON); diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java index c2fc7b78..5fe1a0a9 100644 --- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java +++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/QueryService.java @@ -1,5 +1,6 @@ package edu.harvard.hms.dbmi.avillach.hpds.service; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.*; @@ -7,6 +8,10 @@ import java.util.function.Predicate; import java.util.stream.Collectors; +import 
edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.PfbWriter; +import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,6 +48,7 @@ public class QueryService { private final QueryProcessor queryProcessor; private final TimeseriesProcessor timeseriesProcessor; private final CountProcessor countProcessor; + private final PfbProcessor pfbProcessor; HashMap results = new HashMap<>(); @@ -52,6 +58,7 @@ public QueryService (AbstractProcessor abstractProcessor, QueryProcessor queryProcessor, TimeseriesProcessor timeseriesProcessor, CountProcessor countProcessor, + PfbProcessor pfbProcessor, @Value("${SMALL_JOB_LIMIT}") Integer smallJobLimit, @Value("${SMALL_TASK_THREADS}") Integer smallTaskThreads, @Value("${LARGE_TASK_THREADS}") Integer largeTaskThreads) { @@ -59,6 +66,7 @@ public QueryService (AbstractProcessor abstractProcessor, this.queryProcessor = queryProcessor; this.timeseriesProcessor = timeseriesProcessor; this.countProcessor = countProcessor; + this.pfbProcessor = pfbProcessor; SMALL_JOB_LIMIT = smallJobLimit; SMALL_TASK_THREADS = smallTaskThreads; @@ -85,17 +93,17 @@ public AsyncResult runQuery(Query query) throws ClassNotFoundException, IOExcept // This is all the validation we do for now. if(!ensureAllFieldsExist(query)) { - result.status = Status.ERROR; + result.setStatus(Status.ERROR); }else { if(query.getFields().size() > SMALL_JOB_LIMIT) { - result.jobQueue = largeTaskExecutor; + result.setJobQueue(largeTaskExecutor); } else { - result.jobQueue = smallTaskExecutor; + result.setJobQueue(smallTaskExecutor); } result.enqueue(); } - return getStatusFor(result.id); + return getStatusFor(result.getId()); } ExecutorService countExecutor = Executors.newSingleThreadExecutor(); @@ -108,30 +116,38 @@ private AsyncResult initializeResult(Query query) throws ClassNotFoundException, HpdsProcessor p; switch(query.getExpectedResultType()) { - case DATAFRAME : - case SECRET_ADMIN_DATAFRAME: - case DATAFRAME_MERGED : - p = queryProcessor; - break; - case DATAFRAME_TIMESERIES : - p = timeseriesProcessor; - break; - case COUNT : - case CATEGORICAL_CROSS_COUNT : - case CONTINUOUS_CROSS_COUNT : - p = countProcessor; - break; - default : - throw new RuntimeException("UNSUPPORTED RESULT TYPE"); + case DATAFRAME : + case SECRET_ADMIN_DATAFRAME: + p = queryProcessor; + break; + case DATAFRAME_TIMESERIES : + p = timeseriesProcessor; + break; + case COUNT : + case CATEGORICAL_CROSS_COUNT : + case CONTINUOUS_CROSS_COUNT : + p = countProcessor; + break; + case DATAFRAME_PFB: + p = pfbProcessor; + break; + default : + throw new RuntimeException("UNSUPPORTED RESULT TYPE"); } - - AsyncResult result = new AsyncResult(query, p.getHeaderRow(query)); - result.status = AsyncResult.Status.PENDING; - result.queuedTime = System.currentTimeMillis(); - result.id = UUIDv5.UUIDFromString(query.toString()).toString(); - result.processor = p; - query.setId(result.id); - results.put(result.id, result); + + ResultWriter writer; + if (ResultType.DATAFRAME_PFB.equals(query.getExpectedResultType())) { + writer = new PfbWriter(File.createTempFile("result-" + System.nanoTime(), ".avro")); + } else { + writer = new CsvWriter(File.createTempFile("result-" + System.nanoTime(), ".sstmp")); + } + + AsyncResult result = new AsyncResult(query, p, writer) + .setStatus(AsyncResult.Status.PENDING) + 
.setQueuedTime(System.currentTimeMillis()) + .setId(UUIDv5.UUIDFromString(query.toString()).toString()); + query.setId(result.getId()); + results.put(result.getId(), result); return result; } @@ -209,21 +225,21 @@ private List includingOnlyDictionaryFields(Set fields, Set SMALL_JOB_LIMIT ? + AsyncResult[] queue = asyncResult.getQuery().getFields().size() > SMALL_JOB_LIMIT ? largeTaskExecutionQueue.toArray(new AsyncResult[largeTaskExecutionQueue.size()]) : smallTaskExecutionQueue.toArray(new AsyncResult[smallTaskExecutionQueue.size()]); - if(asyncResult.status == Status.PENDING) { + if(asyncResult.getStatus() == Status.PENDING) { ArrayList queueSnapshot = new ArrayList(); for(int x = 0;x