Commit c489fa2

this will end poorly, but maybe it will be fast af
Luke-Sikina committed Jul 20, 2023
1 parent 39f2320 commit c489fa2
Showing 4 changed files with 75 additions and 13 deletions.
7 changes: 6 additions & 1 deletion processing/pom.xml
@@ -19,7 +19,12 @@
 		<dependency>
 			<groupId>edu.harvard.hms.dbmi.avillach.hpds</groupId>
 			<artifactId>client-api</artifactId>
-		</dependency>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.commons</groupId>
+			<artifactId>commons-pool2</artifactId>
+			<version>2.11.1</version>
+		</dependency>
 		<dependency>
 			<groupId>org.mockito</groupId>
 			<artifactId>mockito-core</artifactId>
ByteArrayFactory.java (new file)
@@ -0,0 +1,39 @@
package edu.harvard.hms.dbmi.avillach.hpds.processing;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class ByteArrayFactory extends BasePooledObjectFactory<byte[]> {

    private final int size;

    public ByteArrayFactory(int size) {
        this.size = size;
    }

    @Override
    public byte[] create() {
        // hand out buffers of the configured size; the pool reuses them across borrows
        return new byte[size];
    }

    /**
     * Use the default PooledObject implementation.
     */
    @Override
    public PooledObject<byte[]> wrap(byte[] buffer) {
        return new DefaultPooledObject<byte[]>(buffer);
    }

    /**
     * When a buffer is returned to the pool, its contents are left in place:
     * clearing is skipped for speed, on the assumption that the next borrower
     * overwrites whatever it reads.
     */
    @Override
    public void passivateObject(PooledObject<byte[]> pooledObject) {
        // intentionally a no-op
    }

    // for all other methods, the no-op implementation
    // in BasePooledObjectFactory will suffice
}
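
Note: a minimal sketch of how GenericObjectPool exercises this factory — borrowObject() goes through create()/wrap() on a cache miss and reuses an idle buffer otherwise, while returnObject() runs passivateObject() before parking the buffer. The class name PoolSketch and the size 1024 are illustrative, not part of this commit:

    import org.apache.commons.pool2.impl.GenericObjectPool;

    public class PoolSketch {
        public static void main(String[] args) throws Exception {
            // illustrative size; the commit sizes buffers as ID_BATCH_SIZE * Integer.BYTES
            GenericObjectPool<byte[]> pool = new GenericObjectPool<>(new ByteArrayFactory(1024));

            byte[] buffer = pool.borrowObject();   // create()/wrap() on first use, reuse after
            try {
                buffer[0] = 42;                    // caller writes into the shared buffer
            } finally {
                pool.returnObject(buffer);         // passivateObject() runs, buffer goes idle
            }
            pool.close();
        }
    }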
@@ -8,6 +8,7 @@
 import java.util.stream.Collectors;
 
 import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.ColumnMeta;
+import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -18,7 +19,6 @@
 import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.KeyAndValue;
 import edu.harvard.hms.dbmi.avillach.hpds.data.phenotype.PhenoCube;
 import edu.harvard.hms.dbmi.avillach.hpds.data.query.Query;
-import edu.harvard.hms.dbmi.avillach.hpds.exception.NotEnoughMemoryException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;

@@ -56,24 +56,41 @@ public String[] getHeaderRow(Query query) {
     public void runQuery(Query query, AsyncResult result) {
         TreeSet<Integer> idList = abstractProcessor.getPatientSubsetForQuery(query);
         log.info("Processing " + idList.size() + " rows for result " + result.id);
-        for(List<Integer> list : Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE)){
-            result.stream.appendResultStore(buildResult(result, query, new TreeSet<Integer>(list)));
-        };
-    }
-
-
-    private ResultStore buildResult(AsyncResult result, Query query, TreeSet<Integer> ids) {
         List<ColumnMeta> columns = query.getFields().stream()
                 .map(abstractProcessor.getDictionary()::get)
                 .filter(Objects::nonNull)
                 .collect(Collectors.toList());
 
-        List<String> paths = columns.stream().map(ColumnMeta::getName).collect(Collectors.toList());
-        int columnCount = paths.size() + 1;
+        List<String> paths = columns.stream()
+                .map(ColumnMeta::getName)
+                .collect(Collectors.toList());
+
+        // one pooled buffer per full batch: ID_BATCH_SIZE ids at Integer.BYTES each
+        GenericObjectPool<byte[]> pool = new GenericObjectPool<>(new ByteArrayFactory(ID_BATCH_SIZE * Integer.BYTES));
+
+        for(List<Integer> list : Lists.partition(new ArrayList<>(idList), ID_BATCH_SIZE)){
+            TreeSet<Integer> tList = new TreeSet<>(list);
+            if (tList.size() == ID_BATCH_SIZE) {
+                // full batch: borrow a reusable buffer instead of allocating a fresh one
+                byte[] bytes;
+                try {
+                    bytes = pool.borrowObject();
+                    result.stream.appendResultStore(buildResult(columns, paths, result, tList, bytes));
+                    pool.returnObject(bytes);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            } else {
+                // final partial batch: pooled buffers are the wrong size, so allocate exactly
+                byte[] bytes = new byte[Integer.BYTES * tList.size()];
+                result.stream.appendResultStore(buildResult(columns, paths, result, tList, bytes));
+            }
+        };
+    }
+
+
+    private ResultStore buildResult(List<ColumnMeta> columns, List<String> paths, AsyncResult result, TreeSet<Integer> ids, byte[] resultArray) {
+        int columnCount = paths.size() + 1;
         ArrayList<Integer> columnIndex = abstractProcessor.useResidentCubesFirst(paths, columnCount);
-        ResultStore results = new ResultStore(result.id, columns, ids);
+        ResultStore results = new ResultStore(result.id, columns, ids, resultArray);
 
         columnIndex.parallelStream().forEach((column)->{
             clearColumn(paths, ids, results, column);
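
Note: the borrow/return pair above is not exception-safe — if appendResultStore throws, the borrowed buffer is never returned, and since GenericObjectPool defaults to maxTotal = 8 with blocking on exhaustion, repeated leaks would eventually make borrowObject() hang. A try/finally shape (a sketch, not what this commit does) would close that hole:

    byte[] bytes = pool.borrowObject();
    try {
        result.stream.appendResultStore(buildResult(columns, paths, result, tList, bytes));
    } finally {
        pool.returnObject(bytes);   // returned even if appendResultStore throws
    }

Note also that the pooled buffers are ID_BATCH_SIZE * Integer.BYTES long, while ResultStore previously allocated rowWidth * numRows; unless rowWidth works out to exactly Integer.BYTES per row, a pooled buffer will be smaller than the array it replaces.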
ResultStore.java
@@ -61,7 +61,7 @@ public class ResultStore {
 	 * @param ids The subject ids in the current batch of the result
 	 * @throws NotEnoughMemoryException If the size of available heap cannot support a byte array of size (rowWidth x numRows)
 	 */
-	public ResultStore(String resultId, List<ColumnMeta> columns, TreeSet<Integer> ids) throws NotEnoughMemoryException {
+	public ResultStore(String resultId, List<ColumnMeta> columns, TreeSet<Integer> ids, byte[] resultArray) throws NotEnoughMemoryException {
 		this.columns = new ArrayList<ColumnMeta>();
 		this.numRows = ids.size();
 		this.getColumns().add(PATIENT_ID_COLUMN_META);
@@ -77,7 +77,8 @@ public ResultStore(String resultId, List<ColumnMeta> columns, TreeSet<Integer> i
 		}
 		try {
 			log.info("Allocating result array : " + resultId);
-			resultArray = new byte[this.rowWidth * this.getNumRows()];
+			// new byte[this.rowWidth * this.getNumRows()];
+			this.resultArray = resultArray;
 		} catch(Error e) {
 			throw new NotEnoughMemoryException();
 		}
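
Note: with the buffer now injected by the caller, the try block wraps a plain field assignment, so the catch(Error) / NotEnoughMemoryException path can no longer be triggered by an allocation here, and the "Allocating result array" log line no longer describes an allocation.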
