Skip to content

Commit

Permalink
ALS-4978: Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ramari16 committed Oct 30, 2023
1 parent 16ec41d commit ab4e9a9
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ public class AbstractProcessor {

private final PhenotypeMetaStore phenotypeMetaStore;

private final VariantIndexCache variantIndexCache;

private final GenomicProcessor genomicProcessor;

private final LoadingCache<String, List<String>> infoStoreValuesCache = CacheBuilder.newBuilder().build(new CacheLoader<>() {
Expand Down Expand Up @@ -135,19 +133,16 @@ public AbstractProcessor(
});
}
infoStoreColumns = new ArrayList<>(infoStores.keySet());

variantIndexCache = new VariantIndexCache(variantService.getVariantIndex(), infoStores);
}

public AbstractProcessor(PhenotypeMetaStore phenotypeMetaStore, LoadingCache<String, PhenoCube<?>> store,
Map<String, FileBackedByteIndexedInfoStore> infoStores, List<String> infoStoreColumns,
VariantService variantService, VariantIndexCache variantIndexCache, GenomicProcessor genomicProcessor) {
VariantService variantService, GenomicProcessor genomicProcessor) {
this.phenotypeMetaStore = phenotypeMetaStore;
this.store = store;
this.infoStores = infoStores;
this.infoStoreColumns = infoStoreColumns;
this.variantService = variantService;
this.variantIndexCache = variantIndexCache;
this.genomicProcessor = genomicProcessor;

CACHE_SIZE = Integer.parseInt(System.getProperty("CACHE_SIZE", "100"));
Expand Down Expand Up @@ -188,8 +183,19 @@ protected Set<Integer> applyBooleanLogic(List<Set<Integer>> filteredIdSets) {
* @return
*/
protected Set<Integer> idSetsForEachFilter(Query query) {
DistributableQuery distributableQuery = getDistributableQuery(query);

if (distributableQuery.hasFilters()) {
BigInteger patientMaskForVariantInfoFilters = genomicProcessor.getPatientMaskForVariantInfoFilters(distributableQuery);
return patientMaskToPatientIdSet(patientMaskForVariantInfoFilters);
}

return distributableQuery.getPatientIds();
}

private DistributableQuery getDistributableQuery(Query query) {
DistributableQuery distributableQuery = new DistributableQuery();
List<Set<Integer>> patientIdSets = new ArrayList<>();
List<Set<Integer>> patientIdSets = new ArrayList<>();

try {
query.getAllAnyRecordOf().forEach(anyRecordOfFilterList -> {
Expand All @@ -216,14 +222,8 @@ protected Set<Integer> idSetsForEachFilter(Query query) {
.collect(Collectors.toSet());
}
distributableQuery.setVariantInfoFilters(query.getVariantInfoFilters());
distributableQuery.setPatientIds(phenotypicPatientSet);

if (distributableQuery.hasFilters()) {
BigInteger patientMaskForVariantInfoFilters = genomicProcessor.getPatientMaskForVariantInfoFilters(distributableQuery);
return patientMaskToPatientIdSet(patientMaskForVariantInfoFilters);
}

return phenotypicPatientSet;
distributableQuery.setPatientIds(phenotypicPatientSet);
return distributableQuery;
}

public Set<Integer> patientMaskToPatientIdSet(BigInteger patientMask) {
Expand Down Expand Up @@ -323,7 +323,8 @@ private List<Set<Integer>> getIdSetsForCategoryFilters(Query query, Distributabl
}

protected Collection<String> getVariantList(Query query) throws IOException {
return genomicProcessor.processVariantList(getPatientSubsetForQuery(query), query);
DistributableQuery distributableQuery = getDistributableQuery(query);
return genomicProcessor.processVariantList(distributableQuery);
}

public FileBackedByteIndexedInfoStore getInfoStore(String column) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public interface GenomicProcessor {

BigInteger createMaskForPatientSet(Set<Integer> patientSubset);

Collection<String> processVariantList(Set<Integer> patientSubsetForQuery, Query query);
Collection<String> processVariantList(DistributableQuery distributableQuery);

String[] getPatientIds();
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public GenomicProcessorNodeImpl(String genomicDataDirectory) {
throw new RuntimeException(e);
}
});
} else {
throw new IllegalArgumentException("Not a valid genomicDataDirectory: " + this.genomicDataDirectory);
}
infoStoreColumns = new ArrayList<>(infoStores.keySet());

Expand Down Expand Up @@ -222,7 +224,7 @@ private VariantIndex addVariantsForInfoFilter(VariantIndex unionOfInfoFilters, Q
}

@Override
public Collection<String> processVariantList(Set<Integer> patientSubsetForQuery, Query query) {
public Collection<String> processVariantList(DistributableQuery query) {
boolean queryContainsVariantInfoFilters = query.getVariantInfoFilters().stream().anyMatch(variantInfoFilter ->
!variantInfoFilter.categoryVariantInfoFilters.isEmpty() || !variantInfoFilter.numericVariantInfoFilters.isEmpty()
);
Expand All @@ -245,7 +247,7 @@ public Collection<String> processVariantList(Set<Integer> patientSubsetForQuery,
return Integer.parseInt(id.trim());
})
.collect(Collectors.toList()));
Set<Integer> patientSubset = Sets.intersection(patientSubsetForQuery, allPatients);
Set<Integer> patientSubset = Sets.intersection(query.getPatientIds(), allPatients);
// log.debug("Patient subset " + Arrays.deepToString(patientSubset.toArray()));

// If we have all patients then no variants would be filtered, so no need to do further processing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ public BigInteger createMaskForPatientSet(Set<Integer> patientSubset) {
}

@Override
public Collection<String> processVariantList(Set<Integer> patientSubsetForQuery, Query query) {
public Collection<String> processVariantList(DistributableQuery distributableQuery) {
return nodes.parallelStream().flatMap(node ->
node.processVariantList(patientSubsetForQuery, query).stream()).collect(Collectors.toList()
node.processVariantList(distributableQuery).stream()).collect(Collectors.toList()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private void processColumn(List<String> paths, TreeSet<Integer> ids, ResultStore
Integer x) {
String path = paths.get(x-1);
if(VariantUtils.pathIsVariantSpec(path)) {
// todo: confirm this entire if block is even used. I don't think it is
Optional<VariantMasks> masks = abstractProcessor.getMasks(path, new VariantBucketHolder<>());
String[] patientIds = abstractProcessor.getPatientIds();
int idPointer = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,6 @@ public class AbstractProcessorTest {
@Mock
private VariantService variantService;

@Mock
private VariantIndexCache variantIndexCache;

@Mock
private GenomicProcessor genomicProcessor;

Expand Down Expand Up @@ -65,14 +62,13 @@ public void setup() {
infoStores,
null,
variantService,
variantIndexCache,
genomicProcessor
);
}

@Test
public void getPatientSubsetForQuery_oneVariantCategoryFilter_indexFound() {
when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(0))).thenReturn(new SparseVariantIndex(Set.of(2, 4, 6)));
//when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(0))).thenReturn(new SparseVariantIndex(Set.of(2, 4, 6)));

ArgumentCaptor<VariantIndex> argumentCaptor = ArgumentCaptor.forClass(VariantIndex.class);
//when(patientVariantJoinHandler.getPatientIdsForIntersectionOfVariantSets(any(), argumentCaptor.capture())).thenReturn(List.of(Set.of(42)));
Expand All @@ -94,8 +90,8 @@ public void getPatientSubsetForQuery_oneVariantCategoryFilter_indexFound() {

@Test
public void getPatientSubsetForQuery_oneVariantCategoryFilterTwoValues_unionFilters() {
when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(0))).thenReturn(new SparseVariantIndex(Set.of(2, 4)));
when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(1))).thenReturn(new SparseVariantIndex(Set.of(6)));
//when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(0))).thenReturn(new SparseVariantIndex(Set.of(2, 4)));
//when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(1))).thenReturn(new SparseVariantIndex(Set.of(6)));

ArgumentCaptor<VariantIndex> argumentCaptor = ArgumentCaptor.forClass(VariantIndex.class);
//when(patientVariantJoinHandler.getPatientIdsForIntersectionOfVariantSets(any(), argumentCaptor.capture())).thenReturn(List.of(Set.of(42)));
Expand All @@ -118,8 +114,8 @@ public void getPatientSubsetForQuery_oneVariantCategoryFilterTwoValues_unionFilt

@Test
public void getPatientSubsetForQuery_twoVariantCategoryFilters_intersectFilters() {
when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(0))).thenReturn(new SparseVariantIndex(Set.of(2, 4, 6)));
when(variantIndexCache.get(VARIANT_SEVERITY_KEY, EXAMPLE_VARIANT_SEVERITIES.get(0))).thenReturn(new SparseVariantIndex(Set.of(4, 5, 6, 7)));
//when(variantIndexCache.get(GENE_WITH_VARIANT_KEY, EXAMPLE_GENES_WITH_VARIANT.get(0))).thenReturn(new SparseVariantIndex(Set.of(2, 4, 6)));
//when(variantIndexCache.get(VARIANT_SEVERITY_KEY, EXAMPLE_VARIANT_SEVERITIES.get(0))).thenReturn(new SparseVariantIndex(Set.of(4, 5, 6, 7)));

ArgumentCaptor<VariantIndex> argumentCaptor = ArgumentCaptor.forClass(VariantIndex.class);
//when(patientVariantJoinHandler.getPatientIdsForIntersectionOfVariantSets(any(), argumentCaptor.capture())).thenReturn(List.of(Set.of(42)));
Expand Down

0 comments on commit ab4e9a9

Please sign in to comment.