Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-6201 - Outdated variant stats of running operations are not returned #2445

Merged
merged 1 commit into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.opencb.cellbase.core.config.SpeciesProperties;
import org.opencb.cellbase.core.models.DataRelease;
import org.opencb.cellbase.core.result.CellBaseDataResponse;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.utils.VersionUtils;

Expand Down Expand Up @@ -307,16 +308,30 @@ private static String majorMinor(String version) {
/**
 * Returns the server version, reading it from the server "about" metadata the first time
 * it is needed and storing it in {@code this.serverVersion} for subsequent calls.
 *
 * The version is read from the "Version" key, falling back to the "Version: " key when the
 * first one is empty.
 *
 * NOTE(review): this span is rendered from a diff without +/- markers, so superseded and
 * replacement lines appear side by side (see the inline notes) — confirm against the diff.
 * NOTE(review): the null check is not repeated inside the synchronized block, so concurrent
 * first callers may each query the server — confirm this is acceptable.
 *
 * @return the value held in the {@code serverVersion} field
 * @throws IOException if {@code retryMetaAbout} returns no result
 */
public String getVersionFromServer() throws IOException {
if (serverVersion == null) {
synchronized (this) {
// NOTE(review): looks like the pre-change line, superseded by the retryMetaAbout call below — confirm.
String serverVersion = cellBaseClient.getMetaClient().about().firstResult().getString("Version");
// Query the "about" endpoint, allowing up to 3 retries while the result is null.
ObjectMap result = retryMetaAbout(3);
if (result == null) {
throw new IOException("Unable to get version from server for cellbase " + toString());
}
// Local variable with the same name as the field; the field is only assigned at the end of the block.
String serverVersion = result.getString("Version");
if (StringUtils.isEmpty(serverVersion)) {
// NOTE(review): looks like the pre-change line, superseded by the line that follows it — confirm.
serverVersion = cellBaseClient.getMetaClient().about().firstResult().getString("Version: ");
// Fall back to the alternative key "Version: ".
serverVersion = result.getString("Version: ");
}
this.serverVersion = serverVersion;
}
}
return serverVersion;
}

/**
 * Requests the first result of the meta client's "about" call, repeating the request while
 * the result is {@code null} and retries remain.
 *
 * A warning is logged before every repeated request. At most {@code retries + 1} requests
 * are made in total.
 *
 * @param retries number of additional attempts allowed after the first request
 * @return the first non-null result obtained, or {@code null} if every attempt returned null
 * @throws IOException declared for the underlying meta client request
 */
private ObjectMap retryMetaAbout(int retries) throws IOException {
    int attemptsLeft = retries;
    ObjectMap about = cellBaseClient.getMetaClient().about().firstResult();
    while (about == null && attemptsLeft > 0) {
        // Nothing came back: warn and ask again, consuming one retry.
        logger.warn("Unable to get version from server for cellbase " + toString() + ". Retrying...");
        attemptsLeft--;
        about = cellBaseClient.getMetaClient().about().firstResult();
    }
    return about;
}

public boolean isMinVersion(String minVersion) throws IOException {
String serverVersion = getVersionFromServer();
return VersionUtils.isMinVersion(minVersion, serverVersion);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,9 +1135,14 @@ public Iterable<CohortMetadata> getInvalidCohorts(int studyId) {
return () -> Iterators.filter(cohortIterator(studyId), CohortMetadata::isInvalid);
}

/**
 * Returns the cohorts of a study whose stats status is READY, RUNNING or ERROR.
 *
 * The returned Iterable is lazy: each iteration builds a new filtered iterator over
 * {@code cohortIterator(studyId)}.
 *
 * NOTE(review): this span is rendered from a diff without +/- markers; the
 * getCalculatedOrInvalidCohorts signature and the single-line predicate look like the
 * superseded lines, replaced by the lines that follow each of them — confirm against the diff.
 *
 * @param studyId id of the study whose cohorts are iterated
 * @return an Iterable over the cohorts accepted by the status filter
 */
public Iterable<CohortMetadata> getCalculatedOrInvalidCohorts(int studyId) {
public Iterable<CohortMetadata> getCalculatedOrPartialCohorts(int studyId) {
return () -> Iterators.filter(cohortIterator(studyId),
// NOTE(review): apparently the previous predicate (stats ready or invalid) — confirm.
cohortMetadata -> cohortMetadata.isStatsReady() || cohortMetadata.isInvalid());
cohortMetadata -> {
// Keep the cohort when its stats status is one of READY, RUNNING or ERROR.
TaskMetadata.Status status = cohortMetadata.getStatsStatus();
return status == TaskMetadata.Status.READY
|| status == TaskMetadata.Status.RUNNING
|| status == TaskMetadata.Status.ERROR;
});
}

public CohortMetadata setSamplesToCohort(int studyId, String cohortName, Collection<Integer> samples) throws StorageEngineException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.FileMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.adaptors.VariantField;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryException;
Expand Down Expand Up @@ -134,9 +135,10 @@ public VariantQueryProjection parseVariantQueryProjection(Query query, QueryOpti
for (VariantQueryProjection.StudyVariantQueryProjection study : studies.values()) {
int studyId = study.getId();
List<Integer> cohorts = new LinkedList<>();
for (CohortMetadata cohort : metadataManager.getCalculatedOrInvalidCohorts(studyId)) {
for (CohortMetadata cohort : metadataManager.getCalculatedOrPartialCohorts(studyId)) {
cohorts.add(cohort.getId());
if (cohort.isInvalid()) {
TaskMetadata.Status status = cohort.getStatsStatus();
if (status == TaskMetadata.Status.ERROR) {
String message = "Please note that the Cohort Stats for "
+ "'" + study.getName() + ":" + cohort.getName() + "' are currently outdated.";
int numSampmles = cohort.getSamples().size();
Expand All @@ -147,6 +149,10 @@ public VariantQueryProjection parseVariantQueryProjection(Query query, QueryOpti
}
message += " To display updated statistics, please execute variant-stats-index.";
events.add(new Event(Event.Type.WARNING, message));
} else if (status == TaskMetadata.Status.RUNNING) {
String message = "Please note that the Cohort Stats for "
+ "'" + study.getName() + ":" + cohort.getName() + "' are currently being calculated.";
events.add(new Event(Event.Type.WARNING, message));
}
}
study.setCohorts(cohorts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ protected VariantStatsDBWriter newVariantStatisticsDBWriter(VariantDBAdaptor dbA
//
// }

@Deprecated
void checkAndUpdateCalculatedCohorts(StudyMetadata studyMetadata, URI uri, boolean updateStats)
throws IOException, StorageEngineException {
Set<String> cohortNames = readCohortsFromStatsFile(uri);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public void postCalculateStats(
}
}

@Deprecated
public static void checkAndUpdateCalculatedCohorts(
VariantStorageMetadataManager metadataManager, StudyMetadata studyMetadata, Collection<String> cohorts, boolean updateStats)
throws StorageEngineException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.junit.*;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.opencb.biodata.models.variant.Genotype;
import org.opencb.biodata.models.variant.StudyEntry;
import org.opencb.biodata.models.variant.Variant;
Expand All @@ -33,7 +34,9 @@
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.SampleMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.variant.VariantStorageBaseTest;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.VariantStorageEngineTest;
import org.opencb.opencga.storage.core.variant.VariantStorageOptions;
import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor;
Expand All @@ -49,8 +52,7 @@
import java.util.*;
import java.util.stream.Collectors;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

/**
* Created by hpccoll1 on 01/06/15.
Expand Down Expand Up @@ -123,32 +125,76 @@ public void queryInvalidStats() throws Exception {
Iterator<SampleMetadata> iterator = metadataManager.sampleMetadataIterator(studyMetadata.getId());

/** Create cohorts **/
HashSet<String> cohort1 = new HashSet<>();
cohort1.add(iterator.next().getName());
cohort1.add(iterator.next().getName());
HashSet<String> cohort1Samples = new HashSet<>();
cohort1Samples.add(iterator.next().getName());
cohort1Samples.add(iterator.next().getName());

HashSet<String> cohort2 = new HashSet<>();
cohort2.add(iterator.next().getName());
cohort2.add(iterator.next().getName());
HashSet<String> cohort2Samples = new HashSet<>();
cohort2Samples.add(iterator.next().getName());
cohort2Samples.add(iterator.next().getName());

Map<String, Set<String>> cohorts = new HashMap<>();
cohorts.put("cohort1", cohort1);
cohorts.put("cohort2", cohort2);
cohorts.put("cohort1", cohort1Samples);
cohorts.put("cohort2", cohort2Samples);

// Just cohort ALL is expected
VariantQueryResult<Variant> result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(1, result.first().getStudies().get(0).getStats().size());
assertEquals(0, result.getEvents().size());

metadataManager.registerCohort(studyMetadata.getName(), "cohort1", cohort1Samples);

// Still just cohort ALL is expected, as cohort1 is neither ready nor partial
result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(1, result.first().getStudies().get(0).getStats().size());
assertEquals(0, result.getEvents().size());

//Calculate stats
stats(options, studyMetadata.getName(), cohorts, outputUri.resolve("cohort1.cohort2.stats"));

checkCohorts(dbAdaptor, studyMetadata);

List<Integer> cohort1Samples = metadataManager.getCohortMetadata(studyMetadata.getId(), "cohort1").getSamples();
CohortMetadata cohort = metadataManager.addSamplesToCohort(studyMetadata.getId(), "cohort2", cohort1Samples);
assertTrue(cohort.isInvalid());
// All 3 cohorts are ready and expected
result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(3, result.first().getStudies().get(0).getStats().size());
assertEquals(0, result.getEvents().size());

VariantQueryResult<Variant> result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
List<Integer> cohort1SampleIds = metadataManager.getCohortMetadata(studyMetadata.getId(), "cohort1").getSamples();
CohortMetadata cohort2 = metadataManager.addSamplesToCohort(studyMetadata.getId(), "cohort2", cohort1SampleIds);
assertTrue(cohort2.isInvalid());

// Cohort2 is invalid; all cohorts are still expected, but with a warning event
result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(3, result.first().getStudies().get(0).getStats().size());
assertEquals(1, result.getEvents().size());
assertEquals("Please note that the Cohort Stats for '1000g:cohort2' are currently outdated." +
" The statistics have been calculated with 2 samples, while the total number of samples in the cohort is 4." +
" To display updated statistics, please execute variant-stats-index.", result.getEvents().get(0).getMessage());

VariantStorageEngine engineMock = Mockito.spy(variantStorageEngine);
VariantStatisticsManager statsManagerMock = Mockito.spy(variantStorageEngine.newVariantStatisticsManager());
Mockito.doReturn(statsManagerMock).when(engineMock).newVariantStatisticsManager();
Mockito.doAnswer(invocation -> {
invocation.callRealMethod();
throw new StorageEngineException("Mock error calculating stats");
}).when(statsManagerMock).preCalculateStats(Mockito.any(), Mockito.any(), Mockito.anyList(), Mockito.anyBoolean(), Mockito.any());

options.put(DefaultVariantStatisticsManager.OUTPUT, outputUri.resolve("stats_mock_fail").toString());
try {
engineMock.calculateStats(studyMetadata.getName(), Collections.singletonList(cohort2.getName()), options);
fail("Expected to fail mock");
} catch (Exception e) {
assertEquals("Mock error calculating stats", e.getMessage());
}

cohort2 = metadataManager.getCohortMetadata(studyMetadata.getId(), cohort2.getName());
assertEquals(TaskMetadata.Status.RUNNING, cohort2.getStatsStatus());

result = variantStorageEngine.get(new Query(), new QueryOptions(QueryOptions.LIMIT, 1));
assertEquals(3, result.first().getStudies().get(0).getStats().size());
assertEquals(1, result.getEvents().size());
assertEquals("Please note that the Cohort Stats for '1000g:cohort2' are currently being calculated.",
result.getEvents().get(0).getMessage());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void registerStudyColumns(int studyId) throws StorageEngineException {
registerNewFiles(studyId, new ArrayList<>(metadataManager.getIndexedFiles(studyId)));

List<Integer> cohortIds = new LinkedList<>();
for (CohortMetadata cohort : metadataManager.getCalculatedOrInvalidCohorts(studyId)) {
for (CohortMetadata cohort : metadataManager.getCalculatedOrPartialCohorts(studyId)) {
cohortIds.add(cohort.getId());
}
registerNewCohorts(studyId, cohortIds);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Scan configureScan(Scan scan, VariantStorageMetadataManager metadataManag
scan.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, INDEX_STUDIES.bytes());
for (Integer studyId : metadataManager.getStudyIds()) {
scan.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStudyColumn(studyId).bytes());
for (CohortMetadata cohort : metadataManager.getCalculatedOrInvalidCohorts(studyId)) {
for (CohortMetadata cohort : metadataManager.getCalculatedOrPartialCohorts(studyId)) {
scan.addColumn(GenomeHelper.COLUMN_FAMILY_BYTES, VariantPhoenixSchema.getStatsColumn(studyId, cohort.getId()).bytes());
}
}
Expand Down
Loading