Skip to content

Commit

Permalink
Merge pull request #2472 from opencb/TASK-6345-3.2.0
Browse files Browse the repository at this point in the history
TASK-6345 - Cohort and sample double association is missing
  • Loading branch information
pfurio authored Jul 11, 2024
2 parents ab5d880 + 916c735 commit bb6689e
Show file tree
Hide file tree
Showing 8 changed files with 182 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package org.opencb.opencga.app.migrations.v2.v2_12_6;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Projections;
import com.mongodb.client.model.Updates;
import org.apache.commons.collections4.CollectionUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.opencb.opencga.catalog.db.api.CohortDBAdaptor;
import org.opencb.opencga.catalog.db.api.SampleDBAdaptor;
import org.opencb.opencga.catalog.db.mongodb.MongoDBAdaptor;
import org.opencb.opencga.catalog.db.mongodb.OrganizationMongoDBAdaptorFactory;
import org.opencb.opencga.catalog.migration.Migration;
import org.opencb.opencga.catalog.migration.MigrationTool;

import java.util.List;
import java.util.stream.Collectors;

/**
 * Catalog migration that re-synchronises the two sides of the cohort/sample association:
 * the {@code samples} array stored in each cohort document and the {@code cohortIds} array
 * stored in each sample document.
 *
 * <p>For every cohort document it:
 * <ol>
 *   <li>adds the cohort id to the {@code cohortIds} of every sample listed in the cohort, and</li>
 *   <li>removes the cohort id from samples of the same study that reference the cohort
 *       but are not listed in it.</li>
 * </ol>
 * Both updates are applied to the sample collection and to the sample archive collection,
 * always restricted to documents flagged as last of version.
 */
@Migration(id = "syncCohortsAndSamplesMigration",
        description = "Sync array of samples from cohort with array of cohortIds from Sample",
        version = "2.12.6",
        domain = Migration.MigrationDomain.CATALOG,
        language = Migration.MigrationLanguage.JAVA,
        date = 20240621
)
public class SyncCohortsAndSamplesMigration extends MigrationTool {

    /**
     * Iterates over all cohort documents and fixes the sample-side references of each one.
     *
     * @throws Exception if any of the underlying catalog/Mongo operations fails.
     */
    @Override
    protected void run() throws Exception {
        MongoCollection<Document> sampleCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_COLLECTION);
        MongoCollection<Document> sampleArchiveCollection = getMongoCollection(OrganizationMongoDBAdaptorFactory.SAMPLE_ARCHIVE_COLLECTION);

        queryMongo(OrganizationMongoDBAdaptorFactory.COHORT_COLLECTION, new Document(),
                Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.STUDY_UID.key(),
                        CohortDBAdaptor.QueryParams.SAMPLES.key()),
                cohortDoc -> {
                    String cohortId = cohortDoc.getString(CohortDBAdaptor.QueryParams.ID.key());
                    Number studyUid = cohortDoc.get(CohortDBAdaptor.QueryParams.STUDY_UID.key(), Number.class);
                    List<Document> samples = cohortDoc.getList(CohortDBAdaptor.QueryParams.SAMPLES.key(), Document.class);
                    boolean hasSamples = CollectionUtils.isNotEmpty(samples);

                    Bson lastOfVersion = Filters.eq(MongoDBAdaptor.LAST_OF_VERSION, true);

                    // Ensure all the samples listed in the cohort have a reference to the cohortId
                    long addedMissingCohort = 0;
                    List<Long> sampleUids = null;
                    if (hasSamples) {
                        sampleUids = samples
                                .stream()
                                .map(s -> s.get(SampleDBAdaptor.QueryParams.UID.key(), Number.class).longValue())
                                .collect(Collectors.toList());
                        Bson missingQuery = Filters.and(
                                Filters.in(SampleDBAdaptor.QueryParams.UID.key(), sampleUids),
                                lastOfVersion
                        );
                        Bson addCohort = Updates.addToSet(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId);
                        addedMissingCohort = sampleCollection.updateMany(missingQuery, addCohort).getModifiedCount();
                        sampleArchiveCollection.updateMany(missingQuery, addCohort);
                    }

                    // Ensure there aren't any samples pointing to this cohort that are not in the samples array.
                    // The lookup is by cohort id, so it must be limited to the cohort's own study: without that
                    // restriction, samples of a same-named cohort in a different study would lose their reference.
                    // NOTE(review): assumes cohort ids are only unique within a study - confirm.
                    long removedNonAssociatedCohort = 0;
                    if (studyUid == null) {
                        // Without a study we cannot scope the clean-up safely, so only the additive step is applied.
                        logger.warn("Cohort '{}' has no '{}' field. Skipping removal of non-associated references.",
                                cohortId, CohortDBAdaptor.QueryParams.STUDY_UID.key());
                    } else {
                        Bson sameStudy = Filters.eq(SampleDBAdaptor.QueryParams.STUDY_UID.key(), studyUid.longValue());
                        Bson pointsToCohort = Filters.eq(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId);
                        // A cohort without samples makes every sample referencing it stale, so no uid exclusion is needed.
                        Bson staleQuery = hasSamples
                                ? Filters.and(
                                        Filters.nin(SampleDBAdaptor.QueryParams.UID.key(), sampleUids),
                                        pointsToCohort,
                                        sameStudy,
                                        lastOfVersion)
                                : Filters.and(pointsToCohort, sameStudy, lastOfVersion);
                        Bson removeCohort = Updates.pull(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId);
                        removedNonAssociatedCohort = sampleCollection.updateMany(staleQuery, removeCohort).getModifiedCount();
                        sampleArchiveCollection.updateMany(staleQuery, removeCohort);
                    }

                    if (addedMissingCohort > 0 || removedNonAssociatedCohort > 0) {
                        logger.info("Fixed cohort '{}' references. "
                                        + "Added missing reference to {} samples. "
                                        + "Removed non-associated reference from {} samples.",
                                cohortId, addedMissingCohort, removedNonAssociatedCohort);
                    }
                });
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -235,22 +235,21 @@ public OpenCGAResult update(long cohortId, ObjectMap parameters, QueryOptions qu
@Override
public OpenCGAResult update(long cohortUid, ObjectMap parameters, List<VariableSet> variableSetList, QueryOptions queryOptions)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException {
Query query = new Query(QueryParams.UID.key(), cohortUid);
QueryOptions options = new QueryOptions(QueryOptions.INCLUDE,
Arrays.asList(QueryParams.ID.key(), QueryParams.UID.key(), QueryParams.STUDY_UID.key(),
QueryParams.SAMPLES.key() + "." + QueryParams.ID.key()));
OpenCGAResult<Cohort> documentResult = get(query, options);
if (documentResult.getNumResults() == 0) {
throw new CatalogDBException("Could not update cohort. Cohort uid '" + cohortUid + "' not found.");
}
String cohortId = documentResult.first().getId();

try {
return runTransaction(clientSession -> transactionalUpdate(clientSession, documentResult.first(), parameters, variableSetList,
queryOptions));
} catch (CatalogDBException e) {
logger.error("Could not update cohort {}: {}", cohortId, e.getMessage(), e);
throw new CatalogDBException("Could not update cohort " + cohortId + ": " + e.getMessage(), e.getCause());
return runTransaction(clientSession -> {
Query query = new Query(QueryParams.UID.key(), cohortUid);
QueryOptions options = new QueryOptions(QueryOptions.INCLUDE,
Arrays.asList(QueryParams.ID.key(), QueryParams.UID.key(), QueryParams.STUDY_UID.key(),
QueryParams.SAMPLES.key() + "." + QueryParams.ID.key()));
OpenCGAResult<Cohort> documentResult = get(clientSession, query, options);
if (documentResult.getNumResults() == 0) {
throw new CatalogDBException("Could not update cohort. Cohort uid '" + cohortUid + "' not found.");
}
return transactionalUpdate(clientSession, documentResult.first(), parameters, variableSetList, queryOptions);
});
} catch (Exception e) {
logger.error("Could not update cohort {}: {}", cohortUid, e.getMessage(), e);
throw new CatalogDBException("Could not update cohort " + cohortUid + ": " + e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,8 @@ private void validateProjectForCreation(String organizationId, Project project)
}
try {
CellBaseConfiguration cellBaseConfiguration = ParamUtils.defaultObject(project.getCellbase(),
new CellBaseConfiguration(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION));
new CellBaseConfiguration(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION,
ParamConstants.CELLBASE_DATA_RELEASE, ParamConstants.CELLBASE_APIKEY));
cellBaseConfiguration = CellBaseValidator.validate(cellBaseConfiguration, project.getOrganism().getScientificName(),
project.getOrganism().getAssembly(), true);
project.setCellbase(cellBaseConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.opencb.opencga.catalog.managers;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand Down Expand Up @@ -66,6 +68,9 @@
import javax.naming.NamingException;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.allOf;
Expand Down Expand Up @@ -1770,6 +1775,88 @@ public void updateSampleCohortTest() throws Exception {
}
}

/**
 * Concurrency test for the cohort/sample double association.
 *
 * Creates 250 samples through a 10-thread pool, then attaches them to "MyCohort1" in batches of 50
 * using several concurrent cohort updates (SET action on the samples list). Finally checks that the
 * samples whose cohortIds contain "MyCohort1" are consistent with the samples listed in the cohort.
 *
 * Note: exceptions thrown inside the submitted tasks are captured by the Futures returned by
 * submit(), which are not inspected here, so they do not fail the test by themselves.
 */
@Test
public void updateSampleCohortWithThreadsTest() throws Exception {
    Sample sampleId1 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_1"), INCLUDE_RESULT, ownerToken).first();
    Sample sampleId2 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_2"), INCLUDE_RESULT, ownerToken).first();
    Sample sampleId3 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_3"), INCLUDE_RESULT, ownerToken).first();
    catalogManager.getCohortManager().create(studyFqn, new Cohort().setId("MyCohort1")
            .setSamples(Arrays.asList(sampleId1)), null, ownerToken).first();
    catalogManager.getCohortManager().create(studyFqn, new Cohort().setId("MyCohort2")
            .setSamples(Arrays.asList(sampleId2, sampleId3)), null, ownerToken).first();

    ExecutorService executorService = Executors.newFixedThreadPool(10,
            new ThreadFactoryBuilder()
                    .setNameFormat("executor-service-%d")
                    .build());

    StopWatch stopWatch = StopWatch.createStarted();
    // Sample ids grouped in batches of 50; each batch is attached to the cohort by one update call below
    List<List<String>> sampleIds = new ArrayList<>(5);
    List<String> innerArray = new ArrayList<>(50);
    for (int i = 0; i < 250; i++) {
        if (i % 50 == 0) {
            System.out.println("i = " + i);
        }

        String sampleId = "SAMPLE_AUTO_" + i;
        executorService.submit(() -> {
            try {
                catalogManager.getSampleManager().create(studyFqn, new Sample().setId(sampleId), QueryOptions.empty(), ownerToken);
            } catch (CatalogException e) {
                throw new RuntimeException(e);
            }
        });
        // Flush the current batch once it is full, before adding the new id
        if (innerArray.size() == 50) {
            sampleIds.add(new ArrayList<>(innerArray));
            innerArray.clear();
        }
        innerArray.add(sampleId);
    }
    sampleIds.add(new ArrayList<>(innerArray));
    executorService.shutdown();
    // Fail fast if sample creation is still running: the updates below reference those samples
    assertTrue(executorService.awaitTermination(1, TimeUnit.MINUTES));

    System.out.println("Creating 250 samples took " + stopWatch.getTime(TimeUnit.SECONDS) + " seconds");

    stopWatch.stop();
    stopWatch.reset();
    stopWatch.start();
    executorService = Executors.newFixedThreadPool(3);
    int execution = 0;
    Map<String, Object> actionMap = new HashMap<>();
    actionMap.put(CohortDBAdaptor.QueryParams.SAMPLES.key(), ParamUtils.BasicUpdateAction.SET);
    QueryOptions queryOptions = new QueryOptions();
    queryOptions.put(Constants.ACTIONS, actionMap);
    for (List<String> innerSampleIds : sampleIds) {
        // Each update sets the cohort samples to the samples seen at submission time plus one new batch
        Cohort myCohort1 = catalogManager.getCohortManager().get(studyFqn, "MyCohort1", null, ownerToken).first();
        List<SampleReferenceParam> sampleReferenceParamList = new ArrayList<>(myCohort1.getNumSamples() + innerSampleIds.size());
        sampleReferenceParamList.addAll(myCohort1.getSamples().stream().map(s -> new SampleReferenceParam().setId(s.getId())).collect(Collectors.toList()));
        sampleReferenceParamList.addAll(innerSampleIds.stream().map(s -> new SampleReferenceParam().setId(s)).collect(Collectors.toList()));
        int executionId = execution++;
        executorService.submit(() -> {
            try {
                catalogManager.getCohortManager().update(studyFqn, "MyCohort1",
                        new CohortUpdateParams().setSamples(sampleReferenceParamList),
                        queryOptions, ownerToken);
                System.out.println("Execution: " + executionId);
            } catch (CatalogException e) {
                throw new RuntimeException(e);
            }
        });
    }
    executorService.shutdown();
    // The consistency checks below are only meaningful once every concurrent update has finished
    assertTrue(executorService.awaitTermination(1, TimeUnit.MINUTES));
    System.out.println("Attaching 250 samples took " + stopWatch.getTime(TimeUnit.SECONDS) + " seconds");

    // Ensure persistence
    Query sampleQuery = new Query(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), "MyCohort1");
    OpenCGAResult<Sample> search = catalogManager.getSampleManager().search(studyFqn, sampleQuery, SampleManager.INCLUDE_SAMPLE_IDS, ownerToken);
    Cohort myCohort1 = catalogManager.getCohortManager().get(studyFqn, "MyCohort1", null, ownerToken).first();
    assertEquals(search.getNumResults(), myCohort1.getNumSamples());
    Set<String> sampleIdSet = search.getResults().stream().map(Sample::getId).collect(Collectors.toSet());
    assertTrue(myCohort1.getSamples().stream().map(Sample::getId).collect(Collectors.toSet()).containsAll(sampleIdSet));
}

@Test
public void deleteSampleCohortTest() throws Exception {
Sample sampleId1 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_1"), INCLUDE_RESULT, ownerToken).first();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public class ParamConstants {
private static final String UP_TO_100 = " up to a maximum of 100";

public static final String CELLBASE_URL = "https://ws.zettagenomics.com/cellbase";
public static final String CELLBASE_VERSION = "v5.2";
public static final String CELLBASE_DATA_RELEASE = "3";
public static final String CELLBASE_VERSION = "v5.8";
public static final String CELLBASE_DATA_RELEASE = "7";
public static final String CELLBASE_APIKEY = "";

public static final String POP_FREQ_1000G_CB_V4 = "1kG_phase3";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public class CellBaseConfiguration {
private String apiKey;

public CellBaseConfiguration() {
this(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION);
this(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION, ParamConstants.CELLBASE_DATA_RELEASE,
ParamConstants.CELLBASE_APIKEY);
}

public CellBaseConfiguration(String url, String version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ cellbase:
## URL host to annotate variants, for example: https://uk.ws.zettagenomics.com/cellbase/
url: "${OPENCGA.CELLBASE.REST.HOST}"
version: "${OPENCGA.CELLBASE.VERSION}"
dataRelease: "2"
dataRelease: "7"

## Storage Query Server configuration. When the CLI is launched in 'server' mode, a RESTful web server
## is launched on the specified port.
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1429,7 +1429,7 @@

<!-- cellbase -->
<OPENCGA.CELLBASE.REST.HOST>https://uk.ws.zettagenomics.com/cellbase/</OPENCGA.CELLBASE.REST.HOST>
<OPENCGA.CELLBASE.VERSION>v5.2</OPENCGA.CELLBASE.VERSION>
<OPENCGA.CELLBASE.VERSION>v5.8</OPENCGA.CELLBASE.VERSION>
</properties>
</profile>

Expand Down

0 comments on commit bb6689e

Please sign in to comment.