ALS-6511: Add PFB result type (#116)
ramari16 authored Aug 19, 2024
1 parent b2c6346 commit 1cdfd3e
Showing 16 changed files with 730 additions and 240 deletions.
@@ -179,7 +179,6 @@ public String toString() {
writePartFormat("Observation Count Fields", fields, builder, true);
break;
case DATAFRAME:
case DATAFRAME_MERGED:
case SECRET_ADMIN_DATAFRAME:
writePartFormat("Data Export Fields", fields, builder, true);
break;
@@ -49,12 +49,7 @@ public enum ResultType {
 	 * Return the number of observations for included patients and
 	 * included fields, broken up across the included cross count fields.
 	 */
-	OBSERVATION_CROSS_COUNT,
-	/**
-	 * This was developed for UDN, but is completely useless and should
-	 * be deleted.
-	 */
-	DATAFRAME_MERGED,
+	OBSERVATION_CROSS_COUNT,
 	/**
 	 * Not completely implemented and currently dead code. Someone with
 	 * statistics experience needs to develop a p-value based filter for
@@ -94,5 +89,10 @@
 	 * is suitable to time series analysis and/or loading into another
 	 * instance of HPDS.
 	 */
-	DATAFRAME_TIMESERIES
+	DATAFRAME_TIMESERIES,
+	/**
+	 * Exports data as PFB, using avro
+	 * <a href="https://uc-cdis.github.io/pypfb/">https://uc-cdis.github.io/pypfb/</a>
+	 */
+	DATAFRAME_PFB
 }
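
For background, PFB (Portable Format for Biomedical Data) is Gen3's Avro-based format that packages schema and data in a single file. A caller would opt into the new export roughly as sketched below; setExpectedResultType is an assumed name, mirroring the query.getExpectedResultType() getter that appears later in this diff:

    // Hypothetical client code; only ResultType.DATAFRAME_PFB comes from this commit.
    Query query = new Query();
    query.setExpectedResultType(ResultType.DATAFRAME_PFB); // assumed setter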
10 changes: 10 additions & 0 deletions pom.xml
@@ -310,6 +310,16 @@
 			<version>1.18.30</version>
 			<scope>provided</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.11.3</version>
+		</dependency>
+		<dependency>
+			<groupId>org.xerial.snappy</groupId>
+			<artifactId>snappy-java</artifactId>
+			<version>1.1.10.5</version>
+		</dependency>
 
 
 	</dependencies>
10 changes: 10 additions & 0 deletions processing/pom.xml
@@ -33,5 +33,15 @@
 			<groupId>io.projectreactor.netty</groupId>
 			<artifactId>reactor-netty</artifactId>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+		</dependency>
+		<!-- https://mvnrepository.com/artifact/org.xerial.snappy/snappy-java -->
+		<dependency>
+			<groupId>org.xerial.snappy</groupId>
+			<artifactId>snappy-java</artifactId>
+		</dependency>
+
 	</dependencies>
 </project>
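
These two dependencies travel together: Avro supplies the container format PFB is built on, and snappy-java provides the compression codec Avro expects on the classpath when a file is written with CodecFactory.snappyCodec(). A self-contained sketch of the pairing; the schema and file name are illustrative, not taken from this commit:

    import java.io.File;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.file.CodecFactory;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    public class AvroSnappyDemo {
        public static void main(String[] args) throws IOException {
            // Toy two-column schema; a real PFB schema is far richer.
            Schema schema = SchemaBuilder.record("Row").fields()
                    .requiredString("patient_id")
                    .requiredString("value")
                    .endRecord();

            GenericRecord row = new GenericData.Record(schema);
            row.put("patient_id", "42");
            row.put("value", "asthma");

            // snappyCodec() is the reason snappy-java must be declared explicitly.
            try (DataFileWriter<GenericRecord> writer =
                         new DataFileWriter<>(new GenericDatumWriter<>(schema))) {
                writer.setCodec(CodecFactory.snappyCodec());
                writer.create(schema, new File("rows.avro"));
                writer.append(row);
            }
        }
    }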
@@ -1,9 +1,15 @@
 package edu.harvard.hms.dbmi.avillach.hpds.processing;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 
+import edu.harvard.hms.dbmi.avillach.hpds.processing.io.CsvWriter;
+import edu.harvard.hms.dbmi.avillach.hpds.processing.io.ResultWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -13,11 +19,30 @@
 import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
 import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;
 import edu.harvard.hms.dbmi.avillach.hpds.exception.NotEnoughMemoryException;
+import org.springframework.http.MediaType;
 
 public class AsyncResult implements Runnable, Comparable<AsyncResult>{
 
 	private static Logger log = LoggerFactory.getLogger(AsyncResult.class);
 
+	public byte[] readAllBytes() {
+		try {
+			return stream.readAllBytes();
+		} catch (IOException e) {
+			throw new UncheckedIOException(e);
+		}
+	}
+
+	public void closeWriter() {
+		stream.closeWriter();
+	}
+
+	private MediaType responseType;
+
+	public MediaType getResponseType() {
+		return responseType;
+	}
+
 	public static enum Status{
 		SUCCESS {
 			@Override
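
The readAllBytes()/getResponseType() pair lets the web layer hand back a finished result with the matching Content-Type (CSV or the new Avro-based PFB) without touching the underlying stream. A hypothetical Spring handler using them; the endpoint itself is not part of this commit:

    // Illustrative only: readAllBytes() and getResponseType() are the new API.
    public ResponseEntity<byte[]> fetchResult(AsyncResult result) {
        return ResponseEntity.ok()
                .contentType(result.getResponseType())
                .body(result.readAllBytes());
    }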
@@ -52,29 +77,82 @@ public PicSureStatus toPicSureStatus() {
 		public abstract PicSureStatus toPicSureStatus();
 	}
 
-	public Query query;
-
-	public Status status;
-
-	public long queuedTime;
-
-	public long completedTime;
-
-	public int retryCount;
-
-	public int queueDepth;
-
-	public int positionInQueue;
-
-	public int numRows;
+	private Query query;
+
+	public Query getQuery() {
+		return query;
+	}
+
+	private Status status;
+
+	public Status getStatus() {
+		return status;
+	}
+
+	public AsyncResult setStatus(Status status) {
+		this.status = status;
+		return this;
+	}
+
+	private long queuedTime;
+
+	public long getQueuedTime() {
+		return queuedTime;
+	}
+
+	public AsyncResult setQueuedTime(long queuedTime) {
+		this.queuedTime = queuedTime;
+		return this;
+	}
+
+	private long completedTime;
+
+	public long getCompletedTime() {
+		return completedTime;
+	}
+
+	private int retryCount;
 
-	public int numColumns;
+	private int queueDepth;
+
+	public int getQueueDepth() {
+		return queueDepth;
+	}
+
+	public AsyncResult setQueueDepth(int queueDepth) {
+		this.queueDepth = queueDepth;
+		return this;
+	}
+
+	private int positionInQueue;
+
+	public AsyncResult setPositionInQueue(int positionInQueue) {
+		this.positionInQueue = positionInQueue;
+		return this;
+	}
+
+	private int numRows;
 
-	public String id;
+	private int numColumns;
+
+	private String id;
+
+	public String getId() {
+		return id;
+	}
+
+	public AsyncResult setId(String id) {
+		this.id = id;
+		return this;
+	}
 
 	@JsonIgnore
-	public ResultStoreStream stream;
+	private ResultStoreStream stream;
+
+	public ResultStoreStream getStream() {
+		return stream;
+	}
 
 	@JsonIgnore
 	private String[] headerRow;
 
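Since every setter now returns this, call sites can configure a result in a single chained expression. A sketch with placeholder values; Status.PENDING and the local variables are assumptions, since the enum's constants are only partially visible in this diff:

    AsyncResult result = new AsyncResult(query, processor, writer)
            .setStatus(AsyncResult.Status.PENDING) // assumed constant
            .setQueuedTime(System.currentTimeMillis())
            .setQueueDepth(queueDepth)
            .setPositionInQueue(position)
            .setId(query.getId());
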
@@ -86,21 +164,48 @@ public PicSureStatus toPicSureStatus() {
 	 * The actual exception is thrown in @see ResultStore#constructor
 	 */
 	@JsonIgnore
-	public ExecutorService jobQueue;
+	private ExecutorService jobQueue;
+
+	public ExecutorService getJobQueue() {
+		return jobQueue;
+	}
+
+	public AsyncResult setJobQueue(ExecutorService jobQueue) {
+		this.jobQueue = jobQueue;
+		return this;
+	}
 
 	@JsonIgnore
-	public HpdsProcessor processor;
+	private HpdsProcessor processor;
+
+	public HpdsProcessor getProcessor() {
+		return processor;
+	}
 
-	public AsyncResult(Query query, String[] headerRow) {
+	public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
 		this.query = query;
-		this.headerRow = headerRow;
+		this.processor = processor;
+		this.headerRow = processor.getHeaderRow(query);
+		this.responseType = writer.getResponseType();
 		try {
-			stream = new ResultStoreStream(headerRow, query.getExpectedResultType() == ResultType.DATAFRAME_MERGED);
+			stream = new ResultStoreStream(headerRow, writer);
 		} catch (IOException e) {
 			log.error("Exception creating result stream", e);
 		}
 	}
+
+	public void appendResults(List<String[]> dataEntries) {
+		stream.appendResults(dataEntries);
+	}
+	public void appendMultiValueResults(List<List<List<String>>> dataEntries) {
+		stream.appendMultiValueResults(dataEntries);
+	}
+
+	public void appendResultStore(ResultStore resultStore) {
+		stream.appendResultStore(resultStore);
+	}
+
 
 	@Override
 	public void run() {
 		status = AsyncResult.Status.RUNNING;
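
The new append methods delegate straight to the stream, but the appendMultiValueResults signature is worth unpacking: the nesting is plausibly rows, then columns, then the values within one column, which matters for PFB because a single field can carry multiple values. A hypothetical call under that reading; the interpretation is inferred from the type, not stated in the commit:

    // One row with two columns; the second column is multi-valued.
    List<List<List<String>>> rows = List.of(
            List.of(
                    List.of("patient-1"),               // single-valued column
                    List.of("asthma", "hypertension")   // multi-valued column
            )
    );
    result.appendMultiValueResults(rows);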
@@ -127,9 +232,15 @@ public void enqueue() {
 		}
 	}
 
+	public void open() {
+		stream.open();
+	}
+
 	@Override
 	public int compareTo(AsyncResult o) {
 		return this.query.getId().compareTo(o.query.getId());
 	}
 
+
+
 }
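
Taken together, the additions give AsyncResult an explicit write-then-read lifecycle around its ResultStoreStream. The ordering below is inferred from the method names and the constructor wiring rather than spelled out in the commit:

    result.open();                          // open the backing ResultStoreStream
    result.appendResults(batch);            // rows flow through the ResultWriter chosen at construction
    result.closeWriter();                   // finalize the writer, e.g. closing the Avro container
    byte[] payload = result.readAllBytes(); // served with getResponseType() as the Content-Type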