Skip to content

Commit

Permalink
Merge branch 'ALS-6511' into ALS-6511-ALS-6330
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Aug 7, 2024
2 parents 2d7fb5b + 93630c3 commit d8997a4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private Map<Integer, List<String>> processColumn(TreeSet<Integer> patientIds, St
} else if(key == patientId){
String value = getResultField(cube, cubeValues, idPointer);
patientIdToValueMap.computeIfAbsent(patientId, k -> new ArrayList<>()).add(value);
idPointer++;
} else {
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class PfbWriter implements ResultWriter {
private Schema entitySchema;
private Schema patientDataSchema;

private static final Set<String> SINGULAR_FIELDS = Set.of("patient_id");

public PfbWriter(File tempFile) {
file = tempFile;
entityFieldAssembler = SchemaBuilder.record("entity")
Expand All @@ -58,7 +60,14 @@ public void writeHeader(String[] data) {
SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record("patientData")
.fields();

fields.forEach(field -> patientRecords.name(field).type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault());
fields.forEach(field -> {
if (isSingularField(field)) {
patientRecords.nullableString(field, "null");
} else {
patientRecords.name(field).type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault();
}

});
patientDataSchema = patientRecords.endRecord();

Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema);
Expand All @@ -81,6 +90,10 @@ public void writeHeader(String[] data) {
writeMetadata();
}

/**
 * Reports whether the given field name is listed in {@code SINGULAR_FIELDS}.
 *
 * @param field the field name to look up
 * @return {@code true} when {@code SINGULAR_FIELDS} contains {@code field}
 */
private boolean isSingularField(String field) {
    final boolean listedAsSingular = SINGULAR_FIELDS.contains(field);
    return listedAsSingular;
}

protected String formatFieldName(String s) {
String formattedFieldName = s.replaceAll("\\W", "_");
if (Character.isDigit(formattedFieldName.charAt(0))) {
Expand Down Expand Up @@ -118,27 +131,7 @@ private void writeMetadata() {

/**
 * Single-value entity writing is not supported by this writer.
 *
 * <p>This method fails immediately without appending anything to the output,
 * so a caller never ends up with partially written records followed by an
 * exception. Use {@code writeMultiValueEntity} instead.
 *
 * @param entities ignored; nothing is written
 * @throws UnsupportedOperationException always
 */
@Override
public void writeEntity(Collection<String[]> entities) {
    // UnsupportedOperationException is a RuntimeException, so existing handlers
    // that catch RuntimeException keep working; the message is unchanged.
    throw new UnsupportedOperationException("Method not supported, use writeMultiValueEntity instead");
}

@Override
Expand All @@ -149,8 +142,13 @@ public void writeMultiValueEntity(Collection<List<List<String>>> entities) {
}
GenericRecord patientData = new GenericData.Record(patientDataSchema);
for(int i = 0; i < fields.size(); i++) {
List<String> fieldValue = entity.get(i) != null ? entity.get(i) : List.of();
patientData.put(fields.get(i), fieldValue);
if (isSingularField(fields.get(i))) {
String entityValue = (entity.get(i) != null && !entity.get(i).isEmpty()) ? entity.get(i).get(0) : "";
patientData.put(fields.get(i), entityValue);
} else {
List<String> fieldValue = entity.get(i) != null ? entity.get(i) : List.of();
patientData.put(fields.get(i), fieldValue);
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ public class PfbWriterTest {
public void writeValidPFB() {
PfbWriter pfbWriter = new PfbWriter(new File("target/test-result.avro"));

pfbWriter.writeHeader(new String[] {"\\demographics\\age\\", "\\phs123\\stroke\\"});
pfbWriter.writeEntity(List.of(new String[]{"80", "Y"},
new String[]{"70", "N"},
new String[]{"75", null}
pfbWriter.writeHeader(new String[] {"patient_id", "\\demographics\\age\\", "\\phs123\\stroke\\"});
pfbWriter.writeMultiValueEntity(List.of(
List.of(List.of("123"), List.of("80"), List.of("Y")),
List.of(List.of("456"), List.of("70"),List.of("N", "Y")),
List.of(List.of("789"), List.of("75"), List.of())
));
pfbWriter.close();
// todo: validate this programmatically
Expand Down

0 comments on commit d8997a4

Please sign in to comment.