ALS-6511: Update pfb result to handle multiple values per variable
ramari16 committed Aug 6, 2024
1 parent 741aed8 commit 75e2448
Showing 6 changed files with 61 additions and 17 deletions.
File 1 of 6

@@ -189,6 +189,9 @@ public AsyncResult(Query query, HpdsProcessor processor, ResultWriter writer) {
     public void appendResults(List<String[]> dataEntries) {
         stream.appendResults(dataEntries);
     }
+    public void appendMultiValueResults(List<List<List<String>>> dataEntries) {
+        stream.appendMultiValueResults(dataEntries);
+    }
 
     public void appendResultStore(ResultStore resultStore) {
         stream.appendResultStore(resultStore);
File 2 of 6

@@ -13,6 +13,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 @Component
 public class PfbProcessor implements HpdsProcessor {

@@ -44,23 +45,24 @@ public void runQuery(Query query, AsyncResult result) {
         log.info("Processing " + idList.size() + " rows for result " + result.getId());
         Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE).stream()
             .forEach(patientIds -> {
-                Map<String, Map<Integer, String>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
-                List<String[]> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
-                    return Arrays.stream(getHeaderRow(query)).map(field -> {
+                Map<String, Map<Integer, List<String>>> pathToPatientToValueMap = buildResult(result, query, new TreeSet<>(patientIds));
+                List<List<List<String>>> fieldValuesPerPatient = patientIds.stream().map(patientId -> {
+                    List<List<String>> objectStream = Arrays.stream(getHeaderRow(query)).map(field -> {
                         if (PATIENT_ID_FIELD_NAME.equals(field)) {
-                            return patientId.toString();
+                            return List.of(patientId.toString());
                         } else {
                             return pathToPatientToValueMap.get(field).get(patientId);
                         }
-                    }).toArray(String[]::new);
+                    }).collect(Collectors.toList());
+                    return objectStream;
                 }).collect(Collectors.toList());
-                result.appendResults(fieldValuesPerPatient);
+                result.appendMultiValueResults(fieldValuesPerPatient);
             });
         result.closeWriter();
     }
 
-    private Map<String, Map<Integer, String>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
-        ConcurrentHashMap<String, Map<Integer, String>> pathToPatientToValueMap = new ConcurrentHashMap<>();
+    private Map<String, Map<Integer, List<String>>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
+        ConcurrentHashMap<String, Map<Integer, List<String>>> pathToPatientToValueMap = new ConcurrentHashMap<>();
         List<ColumnMeta> columns = query.getFields().stream()
             .map(abstractProcessor.getDictionary()::get)
             .filter(Objects::nonNull)
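
The result shape threaded through runQuery changes from one String per field to a list of values per field. As a rough, self-contained illustration of the new List<List<List<String>>> nesting (the patient ids and values below are invented, not from this commit): the outer list holds the patients in a batch, the middle list holds one entry per header field, and the inner list holds that patient's values for the field.

import java.util.List;

public class MultiValueShapeExample {
    public static void main(String[] args) {
        // Hypothetical batch: two patients, two header fields.
        // outer = patients, middle = fields, inner = values per field
        List<List<List<String>>> fieldValuesPerPatient = List.of(
                List.of(List.of("42"), List.of("asthma", "hypertension")),
                List.of(List.of("43"), List.of("asthma")));
        System.out.println(fieldValuesPerPatient.get(0).get(1)); // [asthma, hypertension]
    }
}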
@@ -76,16 +78,16 @@ private Map<String, Map<Integer, String>> buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
         // todo: investigate if the parallel stream will thrash the cache if the number of executors is > number of resident cubes
         columnIndex.parallelStream().forEach((columnId)->{
             String columnPath = paths.get(columnId-1);
-            Map<Integer, String> patientIdToValueMap = processColumn(ids, columnPath);
+            Map<Integer, List<String>> patientIdToValueMap = processColumn(ids, columnPath);
             pathToPatientToValueMap.put(columnPath, patientIdToValueMap);
         });
 
         return pathToPatientToValueMap;
     }
 
-    private Map<Integer, String> processColumn(TreeSet<Integer> patientIds, String path) {
+    private Map<Integer, List<String>> processColumn(TreeSet<Integer> patientIds, String path) {
 
-        Map<Integer, String> patientIdToValueMap = new HashMap<>();
+        Map<Integer, List<String>> patientIdToValueMap = new HashMap<>();
         PhenoCube<?> cube = abstractProcessor.getCube(path);
 
         KeyAndValue<?>[] cubeValues = cube.sortedByKey();
@@ -98,9 +100,7 @@ private Map<Integer, String> processColumn(TreeSet<Integer> patientIds, String path) {
                 idPointer++;
             } else if(key == patientId){
                 String value = getResultField(cube, cubeValues, idPointer);
-                patientIdToValueMap.put(patientId, value);
-                idPointer++;
-                break;
+                patientIdToValueMap.computeIfAbsent(patientId, k -> new ArrayList<>()).add(value);
             } else {
                 break;
             }
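
The matching branch no longer breaks out after the first hit, so every cube entry with the same patient key accumulates into that patient's list. A minimal sketch of the computeIfAbsent idiom used here (the id and values are invented):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulationSketch {
    public static void main(String[] args) {
        Map<Integer, List<String>> patientIdToValueMap = new HashMap<>();
        // The first hit creates the list; later hits append to it.
        patientIdToValueMap.computeIfAbsent(42, k -> new ArrayList<>()).add("asthma");
        patientIdToValueMap.computeIfAbsent(42, k -> new ArrayList<>()).add("hypertension");
        System.out.println(patientIdToValueMap); // {42=[asthma, hypertension]}
    }
}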
File 3 of 6

@@ -36,14 +36,21 @@ public void appendResultStore(ResultStore results) {
         }
         writeResultsToTempFile(results, batchSize, entries);
     }
 
     /**
      * A more compact method to append data to the temp file without making assumptions about the composition.
      * @param entries
      */
     public void appendResults(List<String[]> entries) {
         writer.writeEntity(entries);
     }
+    /**
+     * Appends entries that may hold multiple values per field to the temp file, without making assumptions about the composition.
+     * @param entries
+     */
+    public void appendMultiValueResults(List<List<List<String>>> entries) {
+        writer.writeMultiValueEntity(entries);
+    }
 
     private List<String[]> writeResultsToTempFile(ResultStore results, int batchSize,
             List<String[]> entries) {
File 4 of 6

@@ -45,6 +45,11 @@ public void writeEntity(Collection<String[]> data) {
         }
     }
 
+    @Override
+    public void writeMultiValueEntity(Collection<List<List<String>>> data) {
+        throw new RuntimeException("Method not implemented");
+    }
+
     @Override
     public File getFile() {
         return file;
File 5 of 6

@@ -58,7 +58,7 @@ public void writeHeader(String[] data) {
         SchemaBuilder.FieldAssembler<Schema> patientRecords = SchemaBuilder.record("patientData")
                 .fields();
 
-        fields.forEach(field -> patientRecords.nullableString(field, "null"));
+        fields.forEach(field -> patientRecords.name(field).type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType())).noDefault());
         patientDataSchema = patientRecords.endRecord();
 
         Schema objectSchema = Schema.createUnion(metadataSchema, patientDataSchema);
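
Each patient-data field in the Avro schema is now an array of nullable strings rather than a single nullable string. A standalone sketch of the schema the new builder line produces, with invented field names (assumes the Apache Avro library on the classpath):

import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

public class PatientDataSchemaSketch {
    public static void main(String[] args) {
        List<String> fields = List.of("patient_id", "diagnosis"); // illustrative names
        SchemaBuilder.FieldAssembler<Schema> patientRecords =
                SchemaBuilder.record("patientData").fields();
        // Each field becomes array<union<string, null>>, so a variable can
        // carry zero, one, or many values.
        fields.forEach(field -> patientRecords.name(field)
                .type(SchemaBuilder.array().items(SchemaBuilder.nullable().stringType()))
                .noDefault());
        Schema patientDataSchema = patientRecords.endRecord();
        System.out.println(patientDataSchema.toString(true));
    }
}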
@@ -124,7 +124,8 @@ public void writeEntity(Collection<String[]> entities) {
             }
             GenericRecord patientData = new GenericData.Record(patientDataSchema);
             for(int i = 0; i < fields.size(); i++) {
-                patientData.put(fields.get(i), entity[i]);
+                List<String> fieldValue = entity[i] != null ? List.of(entity[i]) : List.of();
+                patientData.put(fields.get(i), fieldValue);
             }
 
             GenericRecord entityRecord = new GenericData.Record(entitySchema);
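
Because every field is now an array, the single-value path wraps each cell in a one-element list; the null guard matters because List.of rejects null elements with a NullPointerException. A tiny standalone illustration of the wrapping (cell values invented):

import java.util.List;

public class SingleValueWrapSketch {
    public static void main(String[] args) {
        String present = "asthma";
        String missing = null;
        List<String> wrapped = present != null ? List.of(present) : List.of();
        List<String> empty = missing != null ? List.of(missing) : List.of();
        System.out.println(wrapped); // [asthma]
        System.out.println(empty);   // []
    }
}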
@@ -140,6 +141,32 @@ public void writeEntity(Collection<String[]> entities) {
         });
     }
 
+    @Override
+    public void writeMultiValueEntity(Collection<List<List<String>>> entities) {
+        entities.forEach(entity -> {
+            if (entity.size() != fields.size()) {
+                throw new IllegalArgumentException("Entity length must match the number of fields in this document");
+            }
+            GenericRecord patientData = new GenericData.Record(patientDataSchema);
+            for(int i = 0; i < fields.size(); i++) {
+                List<String> fieldValue = entity.get(i) != null ? entity.get(i) : List.of();
+                patientData.put(fields.get(i), fieldValue);
+            }
+
+
+            GenericRecord entityRecord = new GenericData.Record(entitySchema);
+            entityRecord.put("object", patientData);
+            entityRecord.put("name", "patientData");
+            entityRecord.put("id", "192035");
+
+            try {
+                dataFileWriter.append(entityRecord);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        });
+    }
+
     @Override
     public void close() {
         try {
File 6 of 6

@@ -3,11 +3,13 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 
 public interface ResultWriter {
     void writeHeader(String[] data);
 
     void writeEntity(Collection<String[]> data);
+    void writeMultiValueEntity(Collection<List<List<String>>> data);
 
     File getFile();
 
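
For context on the widened contract, a hypothetical no-op implementation covering just the methods visible in this diff; the file is truncated here, so the real interface may declare more (the PFB writer above also overrides close(), for instance).

import java.io.File;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in assuming the four methods shown above are the whole
// contract; the real writers (CSV and PFB) live elsewhere in this repository.
public class PrintingResultWriter implements ResultWriter {
    @Override
    public void writeHeader(String[] data) {
        System.out.println("header: " + String.join(", ", data));
    }

    @Override
    public void writeEntity(Collection<String[]> data) {
        data.forEach(row -> System.out.println("row: " + String.join(", ", row)));
    }

    @Override
    public void writeMultiValueEntity(Collection<List<List<String>>> data) {
        data.forEach(row -> System.out.println("multi-value row: " + row));
    }

    @Override
    public File getFile() {
        return null; // no backing file in this sketch
    }
}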
