Skip to content

Commit

Permalink
ALS-4978: Initial commit of multiple genomic processor. Some tests st…
Browse files Browse the repository at this point in the history
…ill failing
  • Loading branch information
ramari16 committed Oct 23, 2023
1 parent ce53d37 commit 9139708
Show file tree
Hide file tree
Showing 7 changed files with 428 additions and 357 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ public class AbstractProcessor {

private static Logger log = LoggerFactory.getLogger(AbstractProcessor.class);

private final String HOMOZYGOUS_VARIANT = "1/1";
private final String HETEROZYGOUS_VARIANT = "0/1";
private final String HOMOZYGOUS_REFERENCE = "0/0";

private final String ID_CUBE_NAME;
private final int ID_BATCH_SIZE;
private final int CACHE_SIZE;
Expand Down Expand Up @@ -78,20 +74,20 @@ public List<String> load(String conceptPath) {
@Autowired
public AbstractProcessor(
PhenotypeMetaStore phenotypeMetaStore,
VariantService variantService,
GenomicProcessor genomicProcessor
) throws ClassNotFoundException, IOException, InterruptedException {
hpdsDataDirectory = System.getProperty("HPDS_DATA_DIRECTORY", "/opt/local/hpds/");
genomicDataDirectory = System.getProperty("HPDS_GENOMIC_DATA_DIRECTORY", "/opt/local/hpds/all/");

this.phenotypeMetaStore = phenotypeMetaStore;
this.variantService = variantService;
// todo: get rid of this
this.variantService = new VariantService(genomicDataDirectory);
this.genomicProcessor = genomicProcessor;

CACHE_SIZE = Integer.parseInt(System.getProperty("CACHE_SIZE", "100"));
ID_BATCH_SIZE = Integer.parseInt(System.getProperty("ID_BATCH_SIZE", "0"));
ID_CUBE_NAME = System.getProperty("ID_CUBE_NAME", "NONE");

hpdsDataDirectory = System.getProperty("HPDS_DATA_DIRECTORY", "/opt/local/hpds/");
genomicDataDirectory = System.getProperty("HPDS_GENOMIC_DATA_DIRECTORY", "/opt/local/hpds/all/");

store = initializeCache();

if(Crypto.hasKey(Crypto.DEFAULT_KEY_NAME)) {
Expand Down Expand Up @@ -214,7 +210,7 @@ protected Set<Integer> idSetsForEachFilter(Query query) {
} else {
// if there are no patient filters, use all patients.
// todo: we should not have to send these
phenotypicPatientSet = Arrays.stream(variantService.getPatientIds())
phenotypicPatientSet = Arrays.stream(genomicProcessor.getPatientIds())
.map(String::trim)
.map(Integer::parseInt)
.collect(Collectors.toSet());
Expand Down Expand Up @@ -275,11 +271,6 @@ private List<Set<Integer>> getIdSetsForRequiredFields(Query query, Distributable
if(!query.getRequiredFields().isEmpty()) {
return query.getRequiredFields().stream().map(path -> {
if (VariantUtils.pathIsVariantSpec(path)) {
// todo: implement this logic in the genomic nodes
//VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
//TreeSet<Integer> patientsInScope = new TreeSet<>();
//addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
//return patientsInScope;
distributableQuery.addRequiredVariantField(path);
return null;
} else {
Expand All @@ -295,11 +286,6 @@ private Optional<Set<Integer>> getPatientIdsForAnyRecordOf(List<String> anyRecor
// This is an OR aggregation of anyRecordOf filters
Set<Integer> anyRecordOfPatientSet = anyRecordOfFilters.parallelStream().flatMap(path -> {
if (VariantUtils.pathIsVariantSpec(path)) {
// todo: implement this logic in the genomic nodes
/*TreeSet<Integer> patientsInScope = new TreeSet<>();
addIdSetsForVariantSpecCategoryFilters(new String[]{"0/1", "1/1"}, path, patientsInScope, bucketCache);
return patientsInScope.stream();*/
distributableQuery.addRequiredVariantField(path);
throw new IllegalArgumentException("Variant paths not allowed for anyRecordOf queries");
}
return (Stream<Integer>) getCube(path).keyBasedIndex().stream();
Expand All @@ -324,9 +310,6 @@ private List<Set<Integer>> getIdSetsForCategoryFilters(Query query, Distributabl
return query.getCategoryFilters().entrySet().stream().map((entry) -> {
Set<Integer> ids = new TreeSet<>();
if (VariantUtils.pathIsVariantSpec(entry.getKey())) {
// todo: implement this logic in the genomic nodes
//VariantBucketHolder<VariantMasks> bucketCache = new VariantBucketHolder<>();
//addIdSetsForVariantSpecCategoryFilters(categoryFilters, key, ids, bucketCache);
distributableQuery.addVariantSpecCategoryFilter(entry.getKey(), entry.getValue());
} else {
for (String category : entry.getValue()) {
Expand All @@ -339,141 +322,8 @@ private List<Set<Integer>> getIdSetsForCategoryFilters(Query query, Distributabl
return List.of();
}

/**
 * Collects patient ids whose genotype for the variant spec {@code key} matches any of the
 * requested zygosities, adding them into {@code ids}.
 *
 * The per-zygosity bitmasks are OR'd together (a patient matching any requested zygosity is
 * included), then the combined mask is walked bit-by-bit via its binary string form.
 *
 * @param zygosities genotype strings to match (e.g. "0/1", "1/1"); "" matches any populated mask
 * @param key variant spec path, possibly suffixed with ",&lt;zygosity&gt;" (stripped downstream)
 * @param ids output set; matching patient ids are added in place
 * @param bucketCache reusable bucket holder to avoid re-reading the same variant bucket
 */
private void addIdSetsForVariantSpecCategoryFilters(String[] zygosities, String key, Set<Integer> ids, VariantBucketHolder<VariantMasks> bucketCache) {
ArrayList<BigInteger> variantBitmasks = getBitmasksForVariantSpecCategoryFilter(zygosities, key, bucketCache);
if( ! variantBitmasks.isEmpty()) {
// OR all per-zygosity masks so any match qualifies the patient
BigInteger bitmask = variantBitmasks.get(0);
if(variantBitmasks.size()>1) {
for(int x = 1;x<variantBitmasks.size();x++) {
bitmask = bitmask.or(variantBitmasks.get(x));
}
}
String bitmaskString = bitmask.toString(2);
log.debug("or'd masks : " + bitmaskString);
// TODO : This is much less efficient than using bitmask.testBit(x)
// NOTE(review): the loop skips the first 2 and last 2 characters — these appear to be
// framing/padding bits around the per-patient bits, and patient index is offset by 2
// (patientIds[x-2]). Confirm against the mask layout before changing this.
for(int x = 2;x < bitmaskString.length()-2;x++) {
if('1'==bitmaskString.charAt(x)) {
String patientId = variantService.getPatientIds()[x-2];
try{
ids.add(Integer.parseInt(patientId));
}catch(NullPointerException | NoSuchElementException e) {
// NOTE(review): parseInt throws NumberFormatException, which is not caught here;
// only NPE/NoSuchElement are logged. Confirm whether NFE should also be handled.
log.error(ID_CUBE_NAME + " has no value for patientId : " + patientId);
}
}
}
}
}

/**
 * Looks up the patient bitmask(s) for a variant, one mask per requested zygosity.
 *
 * @param zygosities zygosity strings to resolve; "" selects the indiscriminate (any-call) mask,
 *                   "0/0" selects the complement of the indiscriminate mask
 * @param variantName variant spec, optionally suffixed with ",&lt;d&gt;/&lt;d&gt;" which is stripped
 * @param bucketCache reusable bucket holder passed through to the variant service
 * @return masks for each matched zygosity; a single empty bitmask when the variant is unknown
 */
private ArrayList<BigInteger> getBitmasksForVariantSpecCategoryFilter(String[] zygosities, String variantName, VariantBucketHolder<VariantMasks> bucketCache) {
ArrayList<BigInteger> variantBitmasks = new ArrayList<>();
// strip a trailing ",<zygosity>" qualifier (e.g. ",0/1") from the spec before lookup
variantName = variantName.replaceAll(",\\d/\\d$", "");
log.debug("looking up mask for : " + variantName);
VariantMasks masks;
masks = variantService.getMasks(variantName, bucketCache);
Arrays.stream(zygosities).forEach((zygosity) -> {
if(masks!=null) {
if(zygosity.equals(HOMOZYGOUS_REFERENCE)) {
// homozygous reference = everyone NOT carrying the variant: flip the any-call mask,
// skipping the 2 framing bits at each end (see the bit layout used elsewhere)
BigInteger homozygousReferenceBitmask = calculateIndiscriminateBitmask(masks);
for(int x = 2;x<homozygousReferenceBitmask.bitLength()-2;x++) {
homozygousReferenceBitmask = homozygousReferenceBitmask.flipBit(x);
}
variantBitmasks.add(homozygousReferenceBitmask);
} else if(masks.heterozygousMask != null && zygosity.equals(HETEROZYGOUS_VARIANT)) {
variantBitmasks.add(masks.heterozygousMask);
}else if(masks.homozygousMask != null && zygosity.equals(HOMOZYGOUS_VARIANT)) {
variantBitmasks.add(masks.homozygousMask);
}else if(zygosity.equals("")) {
// empty zygosity means "any call": OR of whichever masks are populated
variantBitmasks.add(calculateIndiscriminateBitmask(masks));
}
} else {
// unknown variant: contribute an empty mask (note: added once per zygosity entry)
variantBitmasks.add(variantService.emptyBitmask());
}

});
return variantBitmasks;
}

/**
 * Calculate a bitmask which is a bitwise OR of any populated masks in the
 * {@link VariantMasks} passed in. When neither the heterozygous nor the
 * homozygous mask is populated, the service's empty bitmask is returned.
 *
 * @param masks the variant masks to combine; must not be null
 * @return OR of the populated zygosity masks, or an empty bitmask if both are null
 */
private BigInteger calculateIndiscriminateBitmask(VariantMasks masks) {
    BigInteger het = masks.heterozygousMask;
    BigInteger hom = masks.homozygousMask;
    if (het == null && hom == null) {
        // no call data for this variant at all
        return variantService.emptyBitmask();
    }
    if (het == null) {
        return hom;
    }
    if (hom == null) {
        return het;
    }
    return het.or(hom);
}

/**
 * Resolves the collection of variant specs in scope for the given query.
 * Thin protected entry point that delegates to {@link #processVariantList(Query)}.
 *
 * @param query the query whose variant info filters drive the lookup
 * @return variant specs matching the query's filters (empty when none apply)
 * @throws IOException declared for callers; the delegate currently does not throw it
 */
protected Collection<String> getVariantList(Query query) throws IOException {
    Collection<String> variants = processVariantList(query);
    return variants;
}

/**
 * Computes the variant specs selected by the query's variant info filters, narrowed to
 * variants actually carried by the query's patient subset.
 *
 * NOTE(review): this span is a diff rendering — the long body below was REMOVED by the
 * commit and replaced by the single genomicProcessor delegation at the bottom. Both are
 * shown here; only one exists in the committed file.
 */
private Collection<String> processVariantList(Query query) {
// a filter is "real" only if it has at least one category or numeric constraint
boolean queryContainsVariantInfoFilters = query.getVariantInfoFilters().stream().anyMatch(variantInfoFilter ->
!variantInfoFilter.categoryVariantInfoFilters.isEmpty() || !variantInfoFilter.numericVariantInfoFilters.isEmpty()
);
if(queryContainsVariantInfoFilters) {
VariantIndex unionOfInfoFilters = new SparseVariantIndex(Set.of());

// todo: are these not the same thing?
// NOTE(review): both branches iterate the same accumulation; the size()>1 split looks
// redundant — the loop alone would cover the single-filter case.
if(query.getVariantInfoFilters().size()>1) {
for(VariantInfoFilter filter : query.getVariantInfoFilters()){
unionOfInfoFilters = genomicProcessor.addVariantsForInfoFilter(unionOfInfoFilters, filter);
//log.info("filter " + filter + " sets: " + Arrays.deepToString(unionOfInfoFilters.toArray()));
}
} else {
unionOfInfoFilters = genomicProcessor.addVariantsForInfoFilter(unionOfInfoFilters, query.getVariantInfoFilters().get(0));
}

// intersect the query's patient set with all known genomic patient ids
Set<Integer> patientSubsetForQuery = getPatientSubsetForQuery(query);
HashSet<Integer> allPatients = new HashSet<>(
Arrays.stream(variantService.getPatientIds())
.map((id) -> {
return Integer.parseInt(id.trim());
})
.collect(Collectors.toList()));
Set<Integer> patientSubset = Sets.intersection(patientSubsetForQuery, allPatients);
// log.debug("Patient subset " + Arrays.deepToString(patientSubset.toArray()));

// If we have all patients then no variants would be filtered, so no need to do further processing
if(patientSubset.size()==variantService.getPatientIds().length) {
log.info("query selects all patient IDs, returning....");
return unionOfInfoFilters.mapToVariantSpec(variantService.getVariantIndex());
}

BigInteger patientMasks = createMaskForPatientSet(patientSubset);

Set<String> unionOfInfoFiltersVariantSpecs = unionOfInfoFilters.mapToVariantSpec(variantService.getVariantIndex());
Collection<String> variantsInScope = variantService.filterVariantSetForPatientSet(unionOfInfoFiltersVariantSpecs, new ArrayList<>(patientSubset));

//NC - this is the original variant filtering, which checks the patient mask from each variant against the patient mask from the query
// NOTE(review): the per-variant mask check is only applied below a 100k-variant threshold;
// above it, the unfiltered spec set is returned. The >4 bitCount threshold presumably
// accounts for the framing bits plus a minimum overlap — confirm.
if(variantsInScope.size()<100000) {
ConcurrentSkipListSet<String> variantsWithPatients = new ConcurrentSkipListSet<String>();
variantsInScope.parallelStream().forEach((String variantKey)->{
VariantMasks masks = variantService.getMasks(variantKey, new VariantBucketHolder<VariantMasks>());
if ( masks.heterozygousMask != null && masks.heterozygousMask.and(patientMasks).bitCount()>4) {
variantsWithPatients.add(variantKey);
} else if ( masks.homozygousMask != null && masks.homozygousMask.and(patientMasks).bitCount()>4) {
variantsWithPatients.add(variantKey);
} else if ( masks.heterozygousNoCallMask != null && masks.heterozygousNoCallMask.and(patientMasks).bitCount()>4) {
//so heterozygous no calls we want, homozygous no calls we don't
variantsWithPatients.add(variantKey);
}
});
return variantsWithPatients;
}else {
return unionOfInfoFiltersVariantSpecs;
}
}
// NOTE(review): old fallback (removed) followed by the new delegation (added) — the
// committed file contains only the genomicProcessor call.
return new ArrayList<>();
return genomicProcessor.processVariantList(getPatientSubsetForQuery(query), query);
}

public FileBackedByteIndexedInfoStore getInfoStore(String column) {
Expand Down Expand Up @@ -574,7 +424,7 @@ public TreeMap<String, ColumnMeta> getDictionary() {
}

// Returns all genomic patient ids.
// NOTE(review): diff rendering shows both the removed variantService call and the added
// genomicProcessor delegation; the committed file keeps only the latter.
public String[] getPatientIds() {
return variantService.getPatientIds();
return genomicProcessor.getPatientIds();
}

public VariantMasks getMasks(String path, VariantBucketHolder<VariantMasks> variantMasksVariantBucketHolder) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,20 @@ public class DistributableQuery {

private List<Query.VariantInfoFilter> variantInfoFilters = new ArrayList<>();

private Map<String, String[]> categoryFilters = new HashMap<>();
private final Map<String, String[]> categoryFilters = new HashMap<>();

private List<String> requiredFields = new ArrayList<>();

private List<String> anyRecordOfFields = new ArrayList<>();
private final Set<String> requiredFields = new HashSet<>();

private Set<Integer> patientIds;


/**
 * Registers a variant-spec path as a required field of this distributable query.
 *
 * @param path the variant spec path that matching records must contain
 */
public void addRequiredVariantField(String path) {
    this.requiredFields.add(path);
}
// Accessor for the required variant fields accumulated on this query.
// NOTE(review): diff rendering shows both the removed List-typed signature and the added
// Set-typed signature; the committed file keeps only the Set variant.
public List<String> getRequiredFields() {
public Set<String> getRequiredFields() {
return requiredFields;
}

/**
 * Records a path for the query's any-record-of (OR-aggregated) filter group.
 *
 * @param path the concept path to add to the any-record-of set
 */
public void addAnyRecordOfField(String path) {
    this.anyRecordOfFields.add(path);
}

/**
 * Accessor for the any-record-of filter paths accumulated on this query.
 *
 * @return the live backing list of any-record-of paths (not a defensive copy)
 */
public List<String> getAnyRecordOfFields() {
    return this.anyRecordOfFields;
}

/**
 * Stores a category filter for a variant-spec path, replacing any prior entry for the key.
 *
 * @param key the variant spec path being filtered
 * @param values the accepted category values (e.g. zygosity strings) for that path
 */
public void addVariantSpecCategoryFilter(String key, String[] values) {
    this.categoryFilters.put(key, values);
}
Expand Down
Loading

0 comments on commit 9139708

Please sign in to comment.