Skip to content

Commit

Permalink
Merge pull request #2445 from opencb/TASK-6201
Browse files Browse the repository at this point in the history
TASK-6201 - Outdated variant stats of running operations are not returned
  • Loading branch information
j-coll authored May 9, 2024
2 parents 240be48 + faffa3f commit 9cd6177
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opencb.cellbase.core.config.SpeciesProperties;
import org.opencb.cellbase.core.models.DataRelease;
import org.opencb.cellbase.core.result.CellBaseDataResponse;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.utils.VersionUtils;

Expand Down Expand Up @@ -307,16 +308,30 @@ private static String majorMinor(String version) {
/**
 * Returns the version string reported by the CellBase server.
 *
 * <p>The value is requested on first use through {@link #retryMetaAbout(int)} (three retries) and stored
 * in the {@code serverVersion} field; while that field is non-null, later calls return it without
 * contacting the server again.
 *
 * @return the value of the {@code "Version"} entry of the "about" response, or of the {@code "Version: "}
 *         entry when the former is empty
 * @throws IOException if no "about" result could be obtained after the retries, or if
 *                     {@code retryMetaAbout} itself propagates one
 */
public String getVersionFromServer() throws IOException {
    if (serverVersion == null) {
        synchronized (this) {
            // Re-check under the lock: a thread that waited here while another one was fetching the
            // version must not repeat the remote call.
            if (serverVersion == null) {
                ObjectMap result = retryMetaAbout(3);
                if (result == null) {
                    throw new IOException("Unable to get version from server for cellbase " + toString());
                }
                // Named "version" so the local does not shadow the serverVersion field.
                String version = result.getString("Version");
                if (StringUtils.isEmpty(version)) {
                    // NOTE(review): presumably some servers publish the key with a trailing colon and
                    // space -- confirm against the server's "about" payload.
                    version = result.getString("Version: ");
                }
                this.serverVersion = version;
            }
        }
    }
    return serverVersion;
}

/**
 * Requests the server "about" information, asking again while the first result is {@code null}.
 *
 * <p>One request is always made; up to {@code retries} further requests follow, each preceded by a
 * warning in the log.
 *
 * @param retries maximum number of additional requests after the first one
 * @return the first non-null result, or {@code null} if every request returned {@code null}
 * @throws IOException declared for the underlying client call
 */
private ObjectMap retryMetaAbout(int retries) throws IOException {
    ObjectMap about = cellBaseClient.getMetaClient().about().firstResult();
    int remaining = retries;
    while (about == null && remaining > 0) {
        // Retry
        logger.warn("Unable to get version from server for cellbase " + this + ". Retrying...");
        about = cellBaseClient.getMetaClient().about().firstResult();
        remaining--;
    }
    return about;
}

public boolean isMinVersion(String minVersion) throws IOException {
String serverVersion = getVersionFromServer();
return VersionUtils.isMinVersion(minVersion, serverVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,9 +1135,14 @@ public Iterable<CohortMetadata> getInvalidCohorts(int studyId) {
return () -> Iterators.filter(cohortIterator(studyId), CohortMetadata::isInvalid);
}

public Iterable<CohortMetadata> getCalculatedOrInvalidCohorts(int studyId) {
public Iterable<CohortMetadata> getCalculatedOrPartialCohorts(int studyId) {
return () -> Iterators.filter(cohortIterator(studyId),
cohortMetadata -> cohortMetadata.isStatsReady() || cohortMetadata.isInvalid());
cohortMetadata -> {
TaskMetadata.Status status = cohortMetadata.getStatsStatus();
return status == TaskMetadata.Status.READY
|| status == TaskMetadata.Status.RUNNING
|| status == TaskMetadata.Status.ERROR;
});
}

public CohortMetadata setSamplesToCohort(int studyId, String cohortName, Collection<Integer> samples) throws StorageEngineException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.FileMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.adaptors.VariantField;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryException;
Expand Down Expand Up @@ -134,9 +135,10 @@ public VariantQueryProjection parseVariantQueryProjection(Query query, QueryOpti
for (VariantQueryProjection.StudyVariantQueryProjection study : studies.values()) {
int studyId = study.getId();
List<Integer> cohorts = new LinkedList<>();
for (CohortMetadata cohort : metadataManager.getCalculatedOrInvalidCohorts(studyId)) {
for (CohortMetadata cohort : metadataManager.getCalculatedOrPartialCohorts(studyId)) {
cohorts.add(cohort.getId());
if (cohort.isInvalid()) {
TaskMetadata.Status status = cohort.getStatsStatus();
if (status == TaskMetadata.Status.ERROR) {
String message = "Please note that the Cohort Stats for "
+ "'" + study.getName() + ":" + cohort.getName() + "' are currently outdated.";
int numSampmles = cohort.getSamples().size();
Expand All @@ -147,6 +149,10 @@ public VariantQueryProjection parseVariantQueryProjection(Query query, QueryOpti
}
message += " To display updated statistics, please execute variant-stats-index.";
events.add(new Event(Event.Type.WARNING, message));
} else if (status == TaskMetadata.Status.RUNNING) {
String message = "Please note that the Cohort Stats for "
+ "'" + study.getName() + ":" + cohort.getName() + "' are currently being calculated.";
events.add(new Event(Event.Type.WARNING, message));
}
}
study.setCohorts(cohorts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ protected VariantStatsDBWriter newVariantStatisticsDBWriter(VariantDBAdaptor dbA
//
// }

@Deprecated
void checkAndUpdateCalculatedCohorts(StudyMetadata studyMetadata, URI uri, boolean updateStats)
throws IOException, StorageEngineException {
Set<String> cohortNames = readCohortsFromStatsFile(uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void postCalculateStats(
}
}

@Deprecated
public static void checkAndUpdateCalculatedCohorts(
VariantStorageMetadataManager metadataManager, StudyMetadata studyMetadata, Collection<String> cohorts, boolean updateStats)
throws StorageEngineException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.junit.*;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.opencb.biodata.models.variant.Genotype;
import org.opencb.biodata.models.variant.StudyEntry;
import org.opencb.biodata.models.variant.Variant;
Expand All @@ -33,7 +34,9 @@
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.SampleMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.VariantStorageEngineTest;
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;
import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor;
Expand All @@ -49,8 +52,7 @@
import java.util.*;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

/**
* Created by hpccoll1 on 01/06/15.
Expand Down Expand Up @@ -123,32 +125,76 @@ public void queryInvalidStats() throws Exception {
Iterator<SampleMetadata> iterator = metadataManager.sampleMetadataIterator(studyMetadata.getId());

/** Create cohorts **/
HashSet<String> cohort1 = new HashSet<>();
cohort1.add(iterator.next().getName());
cohort1.add(iterator.next().getName());
HashSet<String> cohort1Samples = new HashSet<>();
cohort1Samples.add(iterator.next().getName());
cohort1Samples.add(iterator.next().getName());

HashSet<String> cohort2 = new HashSet<>();
cohort2.add(iterator.next().getName());
cohort2.add(iterator.next().getName());
HashSet<String> cohort2Samples = new HashSet<>();
cohort2Samples.add(iterator.next().getName());
cohort2Samples.add(iterator.next().getName());

Map<String, Set<String>> cohorts = new HashMap<>();
cohorts.put("cohort1", cohort1);
cohorts.put("cohort2", cohort2);
cohorts.put("cohort1", cohort1Samples);
cohorts.put("cohort2", cohort2Samples);

// Just cohort ALL is expected
VariantQueryResult<Variant> result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(1, result.first().getStudies().get(0).getStats().size());
assertEquals(0, result.getEvents().size());

metadataManager.registerCohort(studyMetadata.getName(), "cohort1", cohort1Samples);

// Still just cohort ALL is expected, as cohort1 is neither ready nor partial
result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(1, result.first().getStudies().get(0).getStats().size());
assertEquals(0, result.getEvents().size());

//Calculate stats
stats(options, studyMetadata.getName(), cohorts, outputUri.resolve("cohort1.cohort2.stats"));

checkCohorts(dbAdaptor, studyMetadata);

List<Integer> cohort1Samples = metadataManager.getCohortMetadata(studyMetadata.getId(), "cohort1").getSamples();
CohortMetadata cohort = metadataManager.addSamplesToCohort(studyMetadata.getId(), "cohort2", cohort1Samples);
assertTrue(cohort.isInvalid());
// All 3 cohorts are ready and expected
result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(3, result.first().getStudies().get(0).getStats().size());
assertEquals(0, result.getEvents().size());

VariantQueryResult<Variant> result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
List<Integer> cohort1SampleIds = metadataManager.getCohortMetadata(studyMetadata.getId(), "cohort1").getSamples();
CohortMetadata cohort2 = metadataManager.addSamplesToCohort(studyMetadata.getId(), "cohort2", cohort1SampleIds);
assertTrue(cohort2.isInvalid());

// Cohort2 is invalid, but all cohorts are still expected, now with a warning event
result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(3, result.first().getStudies().get(0).getStats().size());
assertEquals(1, result.getEvents().size());
assertEquals("Please note that the Cohort Stats for '1000g:cohort2' are currently outdated." +
" The statistics have been calculated with 2 samples, while the total number of samples in the cohort is 4." +
" To display updated statistics, please execute variant-stats-index.", result.getEvents().get(0).getMessage());

VariantStorageEngine engineMock = Mockito.spy(variantStorageEngine);
VariantStatisticsManager statsManagerMock = Mockito.spy(variantStorageEngine.newVariantStatisticsManager());
Mockito.doReturn(statsManagerMock).when(engineMock).newVariantStatisticsManager();
Mockito.doAnswer(invocation -> {
invocation.callRealMethod();
throw new StorageEngineException("Mock error calculating stats");
}).when(statsManagerMock).preCalculateStats(Mockito.any(), Mockito.any(), Mockito.anyList(), Mockito.anyBoolean(), Mockito.any());

options.put(DefaultVariantStatisticsManager.OUTPUT, outputUri.resolve("stats_mock_fail").toString());
try {
engineMock.calculateStats(studyMetadata.getName(), Collections.singletonList(cohort2.getName()), options);
fail("Expected to fail mock");
} catch (Exception e) {
assertEquals("Mock error calculating stats", e.getMessage());
}

cohort2 = metadataManager.getCohortMetadata(studyMetadata.getId(), cohort2.getName());
assertEquals(TaskMetadata.Status.RUNNING, cohort2.getStatsStatus());

result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(3, result.first().getStudies().get(0).getStats().size());
assertEquals(1, result.getEvents().size());
assertEquals("Please note that the Cohort Stats for '1000g:cohort2' are currently being calculated.",
result.getEvents().get(0).getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void registerStudyColumns(int studyId) throws StorageEngineException {
registerNewFiles(studyId, new ArrayList<>(metadataManager.getIndexedFiles(studyId)));

List<Integer> cohortIds = new LinkedList<>();
for (CohortMetadata cohort : metadataManager.getCalculatedOrInvalidCohorts(studyId)) {
for (CohortMetadata cohort : metadataManager.getCalculatedOrPartialCohorts(studyId)) {
cohortIds.add(cohort.getId());
}
registerNewCohorts(studyId, cohortIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Scan configureScan(Scan scan, VariantStorageMetadataManager metadataManag
scan.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, INDEX_STUDIES.bytes());
for (Integer studyId : metadataManager.getStudyIds()) {
scan.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStudyColumn(studyId).bytes());
for (CohortMetadata cohort : metadataManager.getCalculatedOrInvalidCohorts(studyId)) {
for (CohortMetadata cohort : metadataManager.getCalculatedOrPartialCohorts(studyId)) {
scan.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStatsColumn(studyId, cohort.getId()).bytes());
}
}
Expand Down

0 comments on commit 9cd6177

Please sign in to comment.