
Commit

ALS-4979: Initial commit for avro proof of concept. In progress
ramari16 committed Sep 13, 2023
1 parent a67d8fe commit 0047619
Showing 6 changed files with 358 additions and 3 deletions.
@@ -94,5 +94,9 @@ public enum ResultType {
* is suitable for time series analysis and/or loading into another
* instance of HPDS.
*/
DATAFRAME_TIMESERIES
DATAFRAME_TIMESERIES,
/**
* This exports a dataframe in PFB Avro format
*/
DATAFRAME_AVRO
}
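For context, a minimal usage sketch (not part of this diff) of how a caller might request the new export type. The concept paths are made up, and the setFields/setExpectedResultType setters on Query are assumptions; the actual Query API may differ:

import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
// Assumption: ResultType lives in the same data.query package.
import edu.harvard.hms.dbmi.avillach.hpds.data.query.ResultType;

import java.util.List;

public class AvroExportRequestExample {
    public static void main(String[] args) {
        Query query = new Query();
        // Hypothetical concept paths; setFields/setExpectedResultType are assumed setters.
        query.setFields(List.of("\\demographics\\SEX\\", "\\demographics\\AGE\\"));
        query.setExpectedResultType(ResultType.DATAFRAME_AVRO);
    }
}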
5 changes: 5 additions & 0 deletions pom.xml
@@ -320,6 +320,11 @@
<artifactId>spring-test</artifactId>
<version>4.3.30.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.11.2</version>
</dependency>

</dependencies>
</dependencyManagement>
4 changes: 4 additions & 0 deletions processing/pom.xml
@@ -35,5 +35,9 @@
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</dependency>
</dependencies>
</project>
@@ -0,0 +1,291 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.VariantMasks;
import edu.harvard.hms.dbmi.avillach.hpds.data.genotype.caching.VariantBucketHolder;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
* This class handles Avro export queries for HPDS.
* @author ramari16
*
*/
@Component
public class AvroQueryProcessor {

private static final byte[] EMPTY_STRING_BYTES = "".getBytes();
private Logger log = LoggerFactory.getLogger(AvroQueryProcessor.class);

private final String ID_CUBE_NAME;
private final int ID_BATCH_SIZE;

private final AbstractProcessor abstractProcessor;

@Autowired
public AvroQueryProcessor(AbstractProcessor abstractProcessor) {
this.abstractProcessor = abstractProcessor;
ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
ID_CUBE_NAME = System.getProperty("ID_CUBE_NAME", "NONE");
}

public String[] getHeaderRow(Query query) {
String[] header = new String[query.getFields().size()+1];
header[0] = "Patient ID";
System.arraycopy(query.getFields().toArray(), 0, header, 1, query.getFields().size());
return header;
}

public Schema generateSchema(Query query) {
SchemaBuilder.FieldAssembler<Schema> record = SchemaBuilder.record("pfb")
.namespace("edu.harvard.hms.dbmi.avillach")
.fields();
query.getFields().stream()
.map(abstractProcessor.getDictionary()::get)
.filter(Objects::nonNull)
.forEach(columnMeta -> {
if (columnMeta.isCategorical()) {
record.optionalString(getValidAvroName(columnMeta.getName()));
} else {
record.optionalDouble(getValidAvroName(columnMeta.getName()));
}
});
record.requiredInt("patientId");
return record.endRecord();
}

private String getValidAvroName(String string) {
return string.replaceAll("\\\\", "_").replaceAll(" ", "_");
}

public String runQuery(Query query) throws IOException {
TreeSet<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
log.info("Processing " + idList.size() + " rows for result");
Schema schema = generateSchema(query);
GenericRecord[] genericRecords = buildResult(query, idList, schema);


DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);

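// Proof-of-concept output: write to a fixed temp path, then return the raw Avro bytes as an
// ISO-8859-1 string so the byte values survive the round trip unchanged.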
File file = new File("/tmp/test.avro");
dataFileWriter.create(schema, file);
for (GenericRecord record : genericRecords) {
dataFileWriter.append(record);
}
dataFileWriter.close();
return Files.readString(Path.of("/tmp/test.avro"), StandardCharsets.ISO_8859_1);
}


private GenericRecord[] buildResult(Query query, TreeSet<Integer> ids, Schema schema) {
List<ColumnMeta> columns = query.getFields().stream()
.map(abstractProcessor.getDictionary()::get)
.filter(Objects::nonNull)
.collect(Collectors.toList());
List<String> paths = columns.stream()
.map(ColumnMeta::getName)
.collect(Collectors.toList());
int columnCount = paths.size() + 1;

ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);

GenericRecord[] allRecords = ids.stream().map(id -> {
GenericData.Record record = new GenericData.Record(schema);
record.put("patientId", id);
return record;
}).toArray(GenericRecord[]::new);

Integer[] patientIds = ids.toArray(new Integer[]{});

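// Fill columns in parallel: for each column, advance a pointer through the cube's key-sorted
// values in lock-step with the sorted patient ids, writing a value wherever the keys match.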
columnIndex.parallelStream().forEach((column)->{
// todo: this is only pheno fields right now
PhenoCube<?> cube = abstractProcessor.getCube(paths.get(column - 1));

KeyAndValue<?>[] cubeValues = cube.sortedByKey();

int cubeIdPointer = 0;
for (int patientRow = 0; patientRow < ids.size(); patientRow++) {
int patientId = patientIds[patientRow];
while(cubeIdPointer < cubeValues.length) {
KeyAndValue<?> cubeKeyValue = cubeValues[cubeIdPointer];
int key = cubeKeyValue.getKey();
if(key < patientId) {
cubeIdPointer++;
} else if(key == patientId){
Comparable<?> value = cubeKeyValue.getValue();
GenericRecord patientRecord = allRecords[patientRow];
if(cube.isStringType()) {
patientRecord.put(getValidAvroName(columns.get(column - 1).getName()), value.toString());
}else {
patientRecord.put(getValidAvroName(columns.get(column - 1).getName()), (Double)value);
}
cubeIdPointer++;
break;
} else {
// no value found
break;
}
}
}
});

return allRecords;
}

private void clearColumn(List<String> paths, TreeSet<Integer> ids, ResultStore results, Integer x) {
String path = paths.get(x-1);
if(VariantUtils.pathIsVariantSpec(path)) {
ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
int idInSubsetPointer = 0;
for(int id : ids) {
writeVariantNullResultField(results, x, doubleBuffer, idInSubsetPointer);
idInSubsetPointer++;
}
} else {
PhenoCube<?> cube = abstractProcessor.getCube(path);
ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
int idInSubsetPointer = 0;
for(int id : ids) {
writeNullResultField(results, x, cube, doubleBuffer, idInSubsetPointer);
idInSubsetPointer++;
}
}
}

private void processColumn(List<String> paths, TreeSet<Integer> ids, ResultStore results,
Integer x) {
String path = paths.get(x-1);
if(VariantUtils.pathIsVariantSpec(path)) {
VariantMasks masks = abstractProcessor.getMasks(path, new VariantBucketHolder<VariantMasks>());
String[] patientIds = abstractProcessor.getPatientIds();
int idPointer = 0;

ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
int idInSubsetPointer = 0;
for(int id : ids) {
while(idPointer < patientIds.length) {
int key = Integer.parseInt(patientIds[idPointer]);
if(key < id) {
idPointer++;
} else if(key == id){
idPointer = writeVariantResultField(results, x, masks, idPointer, doubleBuffer,
idInSubsetPointer);
break;
} else {
writeVariantNullResultField(results, x, doubleBuffer, idInSubsetPointer);
break;
}
}
idInSubsetPointer++;
}
}else {
PhenoCube<?> cube = abstractProcessor.getCube(path);

KeyAndValue<?>[] cubeValues = cube.sortedByKey();

int idPointer = 0;

ByteBuffer doubleBuffer = ByteBuffer.allocate(Double.BYTES);
int idInSubsetPointer = 0;
for(int id : ids) {
while(idPointer < cubeValues.length) {
int key = cubeValues[idPointer].getKey();
if(key < id) {
idPointer++;
} else if(key == id){
idPointer = writeResultField(results, x, cube, cubeValues, idPointer, doubleBuffer,
idInSubsetPointer);
break;
} else {
writeNullResultField(results, x, cube, doubleBuffer, idInSubsetPointer);
break;
}
}
idInSubsetPointer++;
}
}
}

private void writeVariantNullResultField(ResultStore results, Integer x, ByteBuffer doubleBuffer,
int idInSubsetPointer) {
byte[] valueBuffer = EMPTY_STRING_BYTES;
results.writeField(x,idInSubsetPointer, valueBuffer);
}

private int writeVariantResultField(ResultStore results, Integer x, VariantMasks masks, int idPointer,
ByteBuffer doubleBuffer, int idInSubsetPointer) {
byte[] valueBuffer;
if(masks.heterozygousMask != null && masks.heterozygousMask.testBit(idPointer)) {
valueBuffer = "0/1".getBytes();
}else if(masks.homozygousMask != null && masks.homozygousMask.testBit(idPointer)) {
valueBuffer = "1/1".getBytes();
}else {
valueBuffer = "0/0".getBytes();
}
results.writeField(x,idInSubsetPointer, valueBuffer);
return idPointer;
}

private int writeResultField(ResultStore results, Integer x, PhenoCube<?> cube, KeyAndValue<?>[] cubeValues,
int idPointer, ByteBuffer doubleBuffer, int idInSubsetPointer) {
byte[] valueBuffer;
Comparable<?> value = cubeValues[idPointer++].getValue();
if(cube.isStringType()) {
valueBuffer = value.toString().getBytes();
}else {
valueBuffer = doubleBuffer.putDouble((Double)value).array();
doubleBuffer.clear();
}
results.writeField(x,idInSubsetPointer, valueBuffer);
return idPointer;
}

/**
* Correctly handle null records. A numerical value should be NaN if it is missing, to distinguish it from zero. A
* String-based (categorical) value should be empty rather than null, because the word "null" might be a valid value.
*
* @param results the result store being written
* @param x the column index in the result store
* @param cube the PhenoCube for the column, used to check whether values are strings or doubles
* @param doubleBuffer reusable buffer for encoding double values
* @param idInSubsetPointer the row index of the patient within the query subset
*/
private void writeNullResultField(ResultStore results, Integer x, PhenoCube<?> cube, ByteBuffer doubleBuffer, int idInSubsetPointer) {
byte[] valueBuffer = null;
if(cube.isStringType()) {
valueBuffer = EMPTY_STRING_BYTES;
}else {
Double nullDouble = Double.NaN;
valueBuffer = doubleBuffer.putDouble(nullDouble).array();
doubleBuffer.clear();
}
results.writeField(x,idInSubsetPointer, valueBuffer);
}
}
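To make the shape of the export concrete, here is a standalone sketch (not part of the commit) that mirrors what generateSchema produces for one categorical and one numeric concept. The concept names are hypothetical and shown after getValidAvroName has already rewritten backslashes and spaces to underscores; the optional fields come out as nullable unions with a null default:

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class AvroSchemaShapeExample {
    public static void main(String[] args) {
        // Mirrors generateSchema() for two hypothetical, already-sanitized concept names.
        Schema schema = SchemaBuilder.record("pfb")
                .namespace("edu.harvard.hms.dbmi.avillach")
                .fields()
                .optionalString("_demographics_SEX_") // categorical -> ["null","string"], default null
                .optionalDouble("_demographics_AGE_") // numeric -> ["null","double"], default null
                .requiredInt("patientId")
                .endRecord();
        // Print the pretty-printed Avro schema JSON for inspection.
        System.out.println(schema.toString(true));
    }
}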
@@ -0,0 +1,45 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;

import java.io.File;
import java.io.IOException;

public class PFBWriter {

public Schema generateSchema() {
SchemaBuilder.FieldAssembler<Schema> record = SchemaBuilder.record("pfb")
.namespace("edu.harvard.hms.dbmi.avillach")
.fields();
record.requiredInt("patientId");
record.requiredString("parentConsent");
record.requiredString("topmedConsent");
record.requiredString("consents");
return record.endRecord();
}

public void write() throws IOException {
Schema schema = generateSchema();
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter);
File file = new File("/tmp/test.avro");
dataFileWriter.create(schema, file);
GenericRecord row = new GenericData.Record(schema);
row.put("patientId", 1);
row.put("parentConsent", "/abc/123/");
row.put("topmedConsent", "/def/456/");
row.put("consents", "phs000001");
dataFileWriter.append(row);
dataFileWriter.close();
}

public static void main(String[] args) throws IOException {
new PFBWriter().write();
}
}
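A small companion sketch (not part of the commit) for reading back the file PFBWriter writes, useful for eyeballing the record it appends. DataFileReader picks the schema up from the file header, so none needs to be supplied:

import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.File;
import java.io.IOException;

public class PFBReaderCheck {
    public static void main(String[] args) throws IOException {
        // Reads the proof-of-concept output written by PFBWriter.write().
        try (DataFileReader<GenericRecord> reader =
                     new DataFileReader<>(new File("/tmp/test.avro"), new GenericDatumReader<>())) {
            for (GenericRecord record : reader) {
                System.out.println(record); // e.g. {"patientId": 1, "parentConsent": "/abc/123/", ...}
            }
        }
    }
}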
