diff --git a/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_12_6/SyncCohortsAndSamplesMigration.java b/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_12_6/SyncCohortsAndSamplesMigration.java new file mode 100644 index 00000000000..edfdbb7c828 --- /dev/null +++ b/opencga-app/src/main/java/org/opencb/opencga/app/migrations/v2_12_6/SyncCohortsAndSamplesMigration.java @@ -0,0 +1,73 @@ +package org.opencb.opencga.app.migrations.v2_12_6; + +import com.mongodb.client.MongoCollection; +import com.mongodb.client.model.Filters; +import com.mongodb.client.model.Projections; +import com.mongodb.client.model.Updates; +import org.apache.commons.collections4.CollectionUtils; +import org.bson.Document; +import org.bson.conversions.Bson; +import org.opencb.opencga.catalog.db.api.CohortDBAdaptor; +import org.opencb.opencga.catalog.db.api.SampleDBAdaptor; +import org.opencb.opencga.catalog.db.mongodb.MongoDBAdaptor; +import org.opencb.opencga.catalog.db.mongodb.MongoDBAdaptorFactory; +import org.opencb.opencga.catalog.migration.Migration; +import org.opencb.opencga.catalog.migration.MigrationTool; + +import java.util.List; +import java.util.stream.Collectors; + +@Migration(id = "syncCohortsAndSamplesMigration" , + description = "Sync array of samples from cohort with array of cohortIds from Sample", + version = "2.12.6", + domain = Migration.MigrationDomain.CATALOG, + language = Migration.MigrationLanguage.JAVA, + date = 20240621 +) +public class SyncCohortsAndSamplesMigration extends MigrationTool { + + @Override + protected void run() throws Exception { + MongoCollection sampleCollection = getMongoCollection(MongoDBAdaptorFactory.SAMPLE_COLLECTION); + MongoCollection sampleArchiveCollection = getMongoCollection(MongoDBAdaptorFactory.SAMPLE_ARCHIVE_COLLECTION); + + queryMongo(MongoDBAdaptorFactory.COHORT_COLLECTION, new Document(), + Projections.include(CohortDBAdaptor.QueryParams.ID.key(), CohortDBAdaptor.QueryParams.SAMPLES.key()), + cohortDoc -> { + String cohortId = cohortDoc.getString(CohortDBAdaptor.QueryParams.ID.key()); + List samples = cohortDoc.getList(CohortDBAdaptor.QueryParams.SAMPLES.key(), Document.class); + if (CollectionUtils.isNotEmpty(samples)) { + List sampleUids = samples + .stream() + .map(s -> s.get(SampleDBAdaptor.QueryParams.UID.key(), Number.class).longValue()) + .collect(Collectors.toList()); + // Ensure all those samples have a reference to the cohortId + Bson query = Filters.and( + Filters.in(SampleDBAdaptor.QueryParams.UID.key(), sampleUids), + Filters.eq(MongoDBAdaptor.LAST_OF_VERSION, true) + ); + Bson update = Updates.addToSet(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId); + long addedMissingCohort = sampleCollection.updateMany(query, update).getModifiedCount(); + sampleArchiveCollection.updateMany(query, update); + + // Ensure there aren't any samples pointing to this cohort that are not in the samples array + query = Filters.and( + Filters.nin(SampleDBAdaptor.QueryParams.UID.key(), sampleUids), + Filters.eq(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId), + Filters.eq(MongoDBAdaptor.LAST_OF_VERSION, true) + ); + update = Updates.pull(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), cohortId); + long removedNonAssociatedCohort = sampleCollection.updateMany(query, update).getModifiedCount(); + sampleArchiveCollection.updateMany(query, update); + + if (addedMissingCohort > 0 || removedNonAssociatedCohort > 0) { + logger.info("Fixed cohort '{}' references. " + + "Added missing reference to {} samples. " + + "Removed non-associated reference from {} samples.", + cohortId, addedMissingCohort, removedNonAssociatedCohort); + } + } + }); + } + +} diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java index 4a1977e1433..18189eb56d6 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/db/mongodb/CohortMongoDBAdaptor.java @@ -235,22 +235,21 @@ public OpenCGAResult update(long cohortId, ObjectMap parameters, QueryOptions qu @Override public OpenCGAResult update(long cohortUid, ObjectMap parameters, List variableSetList, QueryOptions queryOptions) throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException { - Query query = new Query(QueryParams.UID.key(), cohortUid); - QueryOptions options = new QueryOptions(QueryOptions.INCLUDE, - Arrays.asList(QueryParams.ID.key(), QueryParams.UID.key(), QueryParams.STUDY_UID.key(), - QueryParams.SAMPLES.key() + "." + QueryParams.ID.key())); - OpenCGAResult documentResult = get(query, options); - if (documentResult.getNumResults() == 0) { - throw new CatalogDBException("Could not update cohort. Cohort uid '" + cohortUid + "' not found."); - } - String cohortId = documentResult.first().getId(); - try { - return runTransaction(clientSession -> transactionalUpdate(clientSession, documentResult.first(), parameters, variableSetList, - queryOptions)); - } catch (CatalogDBException e) { - logger.error("Could not update cohort {}: {}", cohortId, e.getMessage(), e); - throw new CatalogDBException("Could not update cohort " + cohortId + ": " + e.getMessage(), e.getCause()); + return runTransaction(clientSession -> { + Query query = new Query(QueryParams.UID.key(), cohortUid); + QueryOptions options = new QueryOptions(QueryOptions.INCLUDE, + Arrays.asList(QueryParams.ID.key(), QueryParams.UID.key(), QueryParams.STUDY_UID.key(), + QueryParams.SAMPLES.key() + "." + QueryParams.ID.key())); + OpenCGAResult documentResult = get(clientSession, query, options); + if (documentResult.getNumResults() == 0) { + throw new CatalogDBException("Could not update cohort. Cohort uid '" + cohortUid + "' not found."); + } + return transactionalUpdate(clientSession, documentResult.first(), parameters, variableSetList, queryOptions); + }); + } catch (Exception e) { + logger.error("Could not update cohort {}: {}", cohortUid, e.getMessage(), e); + throw new CatalogDBException("Could not update cohort " + cohortUid + ": " + e.getMessage(), e); } } diff --git a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java index 7900d958a42..ca0ddeb625c 100644 --- a/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java +++ b/opencga-catalog/src/main/java/org/opencb/opencga/catalog/managers/ProjectManager.java @@ -320,7 +320,8 @@ private void validateProjectForCreation(Project project, User user) throws Catal } try { CellBaseConfiguration cellBaseConfiguration = ParamUtils.defaultObject(project.getCellbase(), - new CellBaseConfiguration(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION)); + new CellBaseConfiguration(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION, + ParamConstants.CELLBASE_DATA_RELEASE, ParamConstants.CELLBASE_APIKEY)); cellBaseConfiguration = CellBaseValidator.validate(cellBaseConfiguration, project.getOrganism().getScientificName(), project.getOrganism().getAssembly(), true); diff --git a/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java b/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java index 2b403314f4a..890c74709de 100644 --- a/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java +++ b/opencga-catalog/src/test/java/org/opencb/opencga/catalog/managers/CatalogManagerTest.java @@ -16,9 +16,11 @@ package org.opencb.opencga.catalog.managers; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.mongodb.BasicDBObject; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.StopWatch; import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -57,6 +59,9 @@ import javax.naming.NamingException; import java.io.IOException; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.hamcrest.CoreMatchers.allOf; @@ -1417,6 +1422,88 @@ public void updateSampleCohortTest() throws Exception { } } + @Test + public void updateSampleCohortWithThreadsTest() throws Exception { + Sample sampleId1 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_1"), INCLUDE_RESULT, token).first(); + Sample sampleId2 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_2"), INCLUDE_RESULT, token).first(); + Sample sampleId3 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_3"), INCLUDE_RESULT, token).first(); + catalogManager.getCohortManager().create(studyFqn, new Cohort().setId("MyCohort1") + .setSamples(Arrays.asList(sampleId1)), null, token).first(); + catalogManager.getCohortManager().create(studyFqn, new Cohort().setId("MyCohort2") + .setSamples(Arrays.asList(sampleId2, sampleId3)), null, token).first(); + + ExecutorService executorService = Executors.newFixedThreadPool(10, + new ThreadFactoryBuilder() + .setNameFormat("executor-service-%d") + .build()); + + StopWatch stopWatch = StopWatch.createStarted(); + List> sampleIds = new ArrayList<>(5); + List innerArray = new ArrayList<>(50); + for (int i = 0; i < 250; i++) { + if (i % 50 == 0) { + System.out.println("i = " + i); + } + + String sampleId = "SAMPLE_AUTO_" + i; + executorService.submit(() -> { + try { + catalogManager.getSampleManager().create(studyFqn, new Sample().setId(sampleId), QueryOptions.empty(), token); + } catch (CatalogException e) { + throw new RuntimeException(e); + } + }); + if (innerArray.size() == 50) { + sampleIds.add(new ArrayList<>(innerArray)); + innerArray.clear(); + } + innerArray.add(sampleId); + } + sampleIds.add(new ArrayList<>(innerArray)); + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.MINUTES); + + System.out.println("Creating 250 samples took " + stopWatch.getTime(TimeUnit.SECONDS) + " seconds"); + + stopWatch.stop(); + stopWatch.reset(); + stopWatch.start(); + executorService = Executors.newFixedThreadPool(3); + int execution = 0; + Map actionMap = new HashMap<>(); + actionMap.put(CohortDBAdaptor.QueryParams.SAMPLES.key(), ParamUtils.BasicUpdateAction.SET); + QueryOptions queryOptions = new QueryOptions(); + queryOptions.put(Constants.ACTIONS, actionMap); + for (List innerSampleIds : sampleIds) { + Cohort myCohort1 = catalogManager.getCohortManager().get(studyFqn, "MyCohort1", null, token).first(); + List sampleReferenceParamList = new ArrayList<>(myCohort1.getNumSamples() + innerSampleIds.size()); + sampleReferenceParamList.addAll(myCohort1.getSamples().stream().map(s -> new SampleReferenceParam().setId(s.getId())).collect(Collectors.toList())); + sampleReferenceParamList.addAll(innerSampleIds.stream().map(s -> new SampleReferenceParam().setId(s)).collect(Collectors.toList())); + int executionId = execution++; + executorService.submit(() -> { + try { + catalogManager.getCohortManager().update(studyFqn, "MyCohort1", + new CohortUpdateParams().setSamples(sampleReferenceParamList), + queryOptions, token); + System.out.println("Execution: " + executionId); + } catch (CatalogException e) { + throw new RuntimeException(e); + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(1, TimeUnit.MINUTES); + System.out.println("Attaching 250 samples took " + stopWatch.getTime(TimeUnit.SECONDS) + " seconds"); + + // Ensure persistence + Query sampleQuery = new Query(SampleDBAdaptor.QueryParams.COHORT_IDS.key(), "MyCohort1"); + OpenCGAResult search = catalogManager.getSampleManager().search(studyFqn, sampleQuery, SampleManager.INCLUDE_SAMPLE_IDS, token); + Cohort myCohort1 = catalogManager.getCohortManager().get(studyFqn, "MyCohort1", null, token).first(); + assertEquals(search.getNumResults(), myCohort1.getNumSamples()); + Set sampleIdSet = search.getResults().stream().map(Sample::getId).collect(Collectors.toSet()); + assertTrue(myCohort1.getSamples().stream().map(Sample::getId).collect(Collectors.toSet()).containsAll(sampleIdSet)); + } + @Test public void deleteSampleCohortTest() throws Exception { Sample sampleId1 = catalogManager.getSampleManager().create(studyFqn, new Sample().setId("SAMPLE_1"), INCLUDE_RESULT, token).first(); diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java b/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java index ba0afa4dfc6..bd177657c23 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/api/ParamConstants.java @@ -76,8 +76,8 @@ public class ParamConstants { public static final String CELLBASE_URL = "https://ws.zettagenomics.com/cellbase"; - public static final String CELLBASE_VERSION = "v5.2"; - public static final String CELLBASE_DATA_RELEASE = "3"; + public static final String CELLBASE_VERSION = "v5.8"; + public static final String CELLBASE_DATA_RELEASE = "7"; public static final String CELLBASE_APIKEY = ""; public static final String POP_FREQ_1000G_CB_V4 = "1kG_phase3"; diff --git a/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java b/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java index 2ef6a77cca4..101a822562c 100644 --- a/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java +++ b/opencga-core/src/main/java/org/opencb/opencga/core/config/storage/CellBaseConfiguration.java @@ -46,7 +46,8 @@ public class CellBaseConfiguration { private String apiKey; public CellBaseConfiguration() { - this(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION); + this(ParamConstants.CELLBASE_URL, ParamConstants.CELLBASE_VERSION, ParamConstants.CELLBASE_DATA_RELEASE, + ParamConstants.CELLBASE_APIKEY); } public CellBaseConfiguration(String url, String version) { diff --git a/opencga-storage/opencga-storage-core/pom.xml b/opencga-storage/opencga-storage-core/pom.xml index d731ec91136..b2b6da5bea7 100644 --- a/opencga-storage/opencga-storage-core/pom.xml +++ b/opencga-storage/opencga-storage-core/pom.xml @@ -191,6 +191,12 @@ com.databricks SnpEff + + + distlib + distlib + + diff --git a/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml b/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml index 4f64a3ad939..a2f894900b3 100644 --- a/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml +++ b/opencga-storage/opencga-storage-core/src/main/resources/storage-configuration.yml @@ -11,7 +11,7 @@ cellbase: ## URL host to annotate variants, for example: https://uk.ws.zettagenomics.com/cellbase/ url: "${OPENCGA.CELLBASE.REST.HOST}" version: "${OPENCGA.CELLBASE.VERSION}" - dataRelease: "2" + dataRelease: "7" ## Storage Query Server configuration. When CLI is launched in 'server' mode a RESTful web server ## is launched in the specified port. diff --git a/pom.xml b/pom.xml index 3b066129ccc..b5564242b70 100644 --- a/pom.xml +++ b/pom.xml @@ -814,6 +814,12 @@ com.databricks SnpEff ${snpeff.version} + + + distlib + distlib + + org.apache.parquet @@ -1394,7 +1400,7 @@ https://uk.ws.zettagenomics.com/cellbase/ - v5.2 + v5.8