From 0047619f34fcd50069224e21b8c0d29b0e409f57 Mon Sep 17 00:00:00 2001
From: Ryan Amari
Date: Wed, 13 Sep 2023 10:29:17 -0400
Subject: [PATCH] ALS-4979: Initial commit for avro proof of concept. In progress

---
 .../avillach/hpds/data/query/ResultType.java | 6 +-
 pom.xml | 5 +
 processing/pom.xml | 4 +
 .../hpds/processing/AvroQueryProcessor.java | 291 ++++++++++++++++++
 .../avillach/hpds/processing/PFBWriter.java | 45 +++
 .../avillach/hpds/service/PicSureService.java | 10 +-
 6 files changed, 358 insertions(+), 3 deletions(-)
 create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AvroQueryProcessor.java
 create mode 100644 processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PFBWriter.java

diff --git a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java
index 4481190b..78c083e6 100644
--- a/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java
+++ b/client-api/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/data/query/ResultType.java
@@ -94,5 +94,9 @@ public enum ResultType {
      * is suitable to time series analysis and/or loading into another
      * instance of HPDS.
      */
-    DATAFRAME_TIMESERIES
+    DATAFRAME_TIMESERIES,
+    /**
+     * This exports a dataframe in PFB Avro format.
+     */
+    DATAFRAME_AVRO
 }
diff --git a/pom.xml b/pom.xml
index df3ca3fb..54f13dd8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -320,6 +320,11 @@
             <artifactId>spring-test</artifactId>
             <version>4.3.30.RELEASE</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>1.11.2</version>
+        </dependency>
     </dependencies>
diff --git a/processing/pom.xml b/processing/pom.xml
index f989a9d9..23d103e2 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -35,5 +35,9 @@
         <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
        </dependency>
+       <dependency>
+           <groupId>org.apache.avro</groupId>
+           <artifactId>avro</artifactId>
+       </dependency>
    </dependencies>
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AvroQueryProcessor.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AvroQueryProcessor.java
new file mode 100644
index 00000000..ebbbacc2
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/AvroQueryProcessor.java
@@ -0,0 +1,291 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing;
+
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
+import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
+import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
+import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+/**
+ * This class handles Avro export queries for HPDS.
+ * @author ramari16
+ *
+ */
+@Component
+public class AvroQueryProcessor {
+
+    private static final byte[] EMPTY_STRING_BYTES = "".getBytes();
+    private Logger log = LoggerFactory.getLogger(AvroQueryProcessor.class);
+
+    private final String ID_CUBE_NAME;
+    private final int ID_BATCH_SIZE;
+
+    private final AbstractProcessor abstractProcessor;
+
+    @Autowired
+    public AvroQueryProcessor(AbstractProcessor abstractProcessor) {
+        this.abstractProcessor = abstractProcessor;
+        ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
+        ID_CUBE_NAME = System.getProperty("ID_CUBE_NAME", "NONE");
+    }
+
+    public String[] getHeaderRow(Query query) {
+        String[] header = new String[query.getFields().size()+1];
+        header[0] = "Patient ID";
+        System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size());
+        return header;
+    }
+
+    public Schema generateSchema(Query query) {
+        SchemaBuilder.FieldAssembler<Schema> record = SchemaBuilder.record("pfb")
+                .namespace("edu.harvard.hms.dbmi.avillach")
+                .fields();
+        query.getFields().stream()
+                .map(abstractProcessor.getDictionary()::get)
+                .filter(Objects::nonNull)
+                .forEach(columnMeta -> {
+                    if (columnMeta.isCategorical()) {
+                        record.optionalString(getValidAvroName(columnMeta.getName()));
+                    } else {
+                        record.optionalDouble(getValidAvroName(columnMeta.getName()));
+                    }
+                });
+        record.requiredInt("patientId");
+        return record.endRecord();
+    }
+
+    private String getValidAvroName(String string) {
+        return string.replaceAll("\\\\", "_").replaceAll(" ", "_");
+    }
+
+    public String runQuery(Query query) throws IOException {
+        TreeSet<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
+        log.info("Processing " + idList.size() + " rows for result");
+        Schema schema = generateSchema(query);
+        GenericRecord[] genericRecords = buildResult(query, idList, schema);
+
+
+        DatumWriter datumWriter = new GenericDatumWriter(schema);
+        DataFileWriter dataFileWriter = new DataFileWriter(datumWriter);
+
+        File file = new File("/tmp/test.avro");
+        dataFileWriter.create(schema, file);
+        for (GenericRecord record : genericRecords) {
+            dataFileWriter.append(record);
+        }
+        dataFileWriter.close();
+        return Files.readString(Path.of("/tmp/test.avro"), StandardCharsets.ISO_8859_1);
+    }
+
+
+    private GenericRecord[] buildResult(Query query, TreeSet<Integer> ids, Schema schema) {
+        List<ColumnMeta> columns = query.getFields().stream()
+                .map(abstractProcessor.getDictionary()::get)
+                .filter(Objects::nonNull)
+                .collect(Collectors.toList());
+        List<String> paths = columns.stream()
+                .map(ColumnMeta::getName)
+                .collect(Collectors.toList());
+        int columnCount = paths.size() + 1;
+
+        ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
+
+        GenericRecord[] allRecords = ids.stream().map(id -> {
+            GenericData.Record record = new GenericData.Record(schema);
+            record.put("patientId", id);
+            return record;
+        }).toArray(GenericRecord[]::new);
+
+        Integer[] patientIds = ids.toArray(new Integer[]{});
+
+        columnIndex.parallelStream().forEach((column)->{
+            // todo: this is only pheno fields right now
+            PhenoCube cube = abstractProcessor.getCube(paths.get(column - 1));
+
+            KeyAndValue[] cubeValues = cube.sortedByKey();
+
+            int cubeIdPointer = 0;
+            for (int patientRow = 0; patientRow < ids.size(); patientRow++) {
+                int patientId = patientIds[patientRow];
+                while(cubeIdPointer < cubeValues.length) {
+                    KeyAndValue cubeKeyValue = cubeValues[cubeIdPointer];
+                    int key = cubeKeyValue.getKey();
+                    if(key < patientId) {
+                        cubeIdPointer++;
+                    } else if(key == patientId){
+                        Comparable value = cubeKeyValue.getValue();
+                        GenericRecord patientRecord = allRecords[patientRow];
+                        if(cube.isStringType()) {
+                            patientRecord.put(getValidAvroName(columns.get(column - 1).getName()), value.toString());
+                        }else {
+                            patientRecord.put(getValidAvroName(columns.get(column - 1).getName()), (Double)value);
+                        }
+                        cubeIdPointer++;
+                        break;
+                    } else {
+                        // no value found
+                        break;
+                    }
+                }
+            }
+        });
+
+        return allRecords;
+    }
+
+    private void clearColumn(List<String> paths, TreeSet<Integer> ids, ResultStore results, Integer x) {
+        String path = paths.get(x-1);
+        if(VariantUtils.pathIsVariantSpec(path)) {
+            ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
+            int idInSubsetPointer = 0;
+            for(int id : ids) {
+                writeVariantNullResultField(results, x, doubleBuffer, idInSubsetPointer);
+                idInSubsetPointer++;
+            }
+        } else {
+            PhenoCube cube = abstractProcessor.getCube(path);
+            ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
+            int idInSubsetPointer = 0;
+            for(int id : ids) {
+                writeNullResultField(results, x, cube, doubleBuffer, idInSubsetPointer);
+                idInSubsetPointer++;
+            }
+        }
+    }
+
+    private void processColumn(List<String> paths, TreeSet<Integer> ids, ResultStore results,
+            Integer x) {
+        String path = paths.get(x-1);
+        if(VariantUtils.pathIsVariantSpec(path)) {
+            VariantMasks masks = abstractProcessor.getMasks(path, new VariantBucketHolder());
+            String[] patientIds = abstractProcessor.getPatientIds();
+            int idPointer = 0;
+
+            ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
+            int idInSubsetPointer = 0;
+            for(int id : ids) {
+                while(idPointer < patientIds.length) {
+                    int key = Integer.parseInt(patientIds[idPointer]);
+                    if(key < id) {
+                        idPointer++;
+                    } else if(key == id){
+                        idPointer = writeVariantResultField(results, x, masks, idPointer, doubleBuffer,
+                                idInSubsetPointer);
+                        break;
+                    } else {
+                        writeVariantNullResultField(results, x, doubleBuffer, idInSubsetPointer);
+                        break;
+                    }
+                }
+                idInSubsetPointer++;
+            }
+        }else {
+            PhenoCube cube = abstractProcessor.getCube(path);
+
+            KeyAndValue[] cubeValues = cube.sortedByKey();
+
+            int idPointer = 0;
+
+            ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
+            int idInSubsetPointer = 0;
+            for(int id : ids) {
+                while(idPointer < cubeValues.length) {
+                    int key = cubeValues[idPointer].getKey();
+                    if(key < id) {
+                        idPointer++;
+                    } else if(key == id){
+                        idPointer = writeResultField(results, x, cube, cubeValues, idPointer, doubleBuffer,
+                                idInSubsetPointer);
+                        break;
+                    } else {
+                        writeNullResultField(results, x, cube, doubleBuffer, idInSubsetPointer);
+                        break;
+                    }
+                }
+                idInSubsetPointer++;
+            }
+        }
+    }
+
+    private void writeVariantNullResultField(ResultStore results, Integer x, ByteBuffer doubleBuffer,
+            int idInSubsetPointer) {
+        byte[] valueBuffer = null;
+        valueBuffer = EMPTY_STRING_BYTES;
+        results.writeField(x,idInSubsetPointer, valueBuffer);
+    }
+
+    private int writeVariantResultField(ResultStore results, Integer x, VariantMasks masks, int idPointer,
+            ByteBuffer doubleBuffer, int idInSubsetPointer) {
+        byte[] valueBuffer;
+        if(masks.heterozygousMask != null && masks.heterozygousMask.testBit(idPointer)) {
+            valueBuffer = "0/1".getBytes();
+        }else if(masks.homozygousMask != null && masks.homozygousMask.testBit(idPointer)) {
+            valueBuffer = "1/1".getBytes();
+        }else {
+            valueBuffer = "0/0".getBytes();
+        }
+        valueBuffer = masks.toString().getBytes();
+        results.writeField(x,idInSubsetPointer, valueBuffer);
+        return idPointer;
+    }
+
+    private int writeResultField(ResultStore results, Integer x, PhenoCube cube, KeyAndValue[] cubeValues,
+            int idPointer, ByteBuffer doubleBuffer, int idInSubsetPointer) {
+        byte[] valueBuffer;
+        Comparable value = cubeValues[idPointer++].getValue();
+        if(cube.isStringType()) {
+            valueBuffer = value.toString().getBytes();
+        }else {
+            valueBuffer = doubleBuffer.putDouble((Double)value).array();
+            doubleBuffer.clear();
+        }
+        results.writeField(x,idInSubsetPointer, valueBuffer);
+        return idPointer;
+    }
+
+    /**
+     * Correctly handle null records. A numerical value should be NaN if it is missing, to distinguish it from zero. A
+     * String based value (categorical) should be empty instead of null because the word null might be a valid value.
+     *
+     * @param results
+     * @param x
+     * @param cube
+     * @param doubleBuffer
+     * @param idInSubsetPointer
+     */
+    private void writeNullResultField(ResultStore results, Integer x, PhenoCube cube, ByteBuffer doubleBuffer, int idInSubsetPointer) {
+        byte[] valueBuffer = null;
+        if(cube.isStringType()) {
+            valueBuffer = EMPTY_STRING_BYTES;
+        }else {
+            Double nullDouble = Double.NaN;
+            valueBuffer = doubleBuffer.putDouble(nullDouble).array();
+            doubleBuffer.clear();
+        }
+        results.writeField(x,idInSubsetPointer, valueBuffer);
+    }
+}
diff --git a/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PFBWriter.java b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PFBWriter.java
new file mode 100644
index 00000000..e3253f24
--- /dev/null
+++ b/processing/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/processing/PFBWriter.java
@@ -0,0 +1,45 @@
+package edu.harvard.hms.dbmi.avillach.hpds.processing;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+
+import java.io.File;
+import java.io.IOException;
+
+public class PFBWriter {
+
+    public Schema generateSchema() {
+        SchemaBuilder.FieldAssembler<Schema> record = SchemaBuilder.record("pfb")
+                .namespace("edu.harvard.hms.dbmi.avillach")
+                .fields();
+        record.requiredInt("patientId");
+        record.requiredString("parentConsent");
+        record.requiredString("topmedConsent");
+        record.requiredString("consents");
+        return record.endRecord();
+    }
+
+    public void write() throws IOException {
+        Schema schema = generateSchema();
+        DatumWriter datumWriter = new GenericDatumWriter(schema);
+        DataFileWriter dataFileWriter = new DataFileWriter(datumWriter);
+        File file = new File("/tmp/test.avro");
+        dataFileWriter.create(schema, file);
+        GenericRecord row = new GenericData.Record(schema);
+        row.put("patientId", 1);
+        row.put("parentConsent", "/abc/123/");
+        row.put("topmedConsent", "/def/456/");
+        row.put("consents", "phs000001");
+        dataFileWriter.append(row);
+        dataFileWriter.close();
+    }
+
+    public static void main(String[] args) throws IOException {
+        new PFBWriter().write();
+    }
+}
diff --git a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
index 58b81399..4728dbbf 100644
--- a/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
+++ b/service/src/main/java/edu/harvard/hms/dbmi/avillach/hpds/service/PicSureService.java
@@ -42,13 +42,15 @@ public class PicSureService implements IResourceRS {
     @Autowired
     public PicSureService(QueryService queryService, TimelineProcessor timelineProcessor, CountProcessor countProcessor,
-            VariantListProcessor variantListProcessor, AbstractProcessor abstractProcessor, Paginator paginator) {
+            VariantListProcessor variantListProcessor, AbstractProcessor abstractProcessor, Paginator paginator,
+            AvroQueryProcessor avroQueryProcessor) {
         this.queryService = queryService;
         this.timelineProcessor = timelineProcessor;
         this.countProcessor = countProcessor;
         this.variantListProcessor = variantListProcessor;
         this.abstractProcessor = abstractProcessor;
         this.paginator = paginator;
+        this.avroQueryProcessor = avroQueryProcessor;
         Crypto.loadDefaultKey();
     }
@@ -68,6 +70,8 @@ public PicSureService(QueryService queryService, TimelineProcessor timelineProce
     private final Paginator paginator;
 
+    private final AvroQueryProcessor avroQueryProcessor;
+
     private static final String QUERY_METADATA_FIELD = "queryMetadata";
 
     private static final int RESPONSE_CACHE_SIZE = 50;
@@ -338,7 +342,9 @@ private Response _querySync(QueryRequest resultRequest) throws IOException {
                 return queryOkResponse(result.stream, incomingQuery).build();
             }
             return Response.status(400).entity("Status : " + result.status.name()).build();
-
+        case DATAFRAME_AVRO:
+            return queryOkResponse(avroQueryProcessor.runQuery(incomingQuery), incomingQuery)
+                    .header(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON).build();
         case CROSS_COUNT:
             return queryOkResponse(countProcessor.runCrossCounts(incomingQuery), incomingQuery)
                     .header(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON).build();