Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TASK-6515 - Port Patch 2.12.6.1 -> 3.2.1 (XB 1.10.6.1 -> 2.2.1) #2491

Merged
merged 31 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
5f1029a
storage: Fix VariantSearchTest. #TASK-6136
j-coll May 27, 2024
9f46a7c
storage: Fix tests from HadoopVariantStorageEngineSplitDataTest #TASK…
j-coll May 28, 2024
f15978b
storage: Avoid unnecesary sample metadata updates updating cohorts #T…
j-coll May 31, 2024
e968bd2
storage: Improve HBaseLockManager errors and warn messages. #TASK-5895
j-coll May 31, 2024
dd1dcec
storage: Increase metadata lock duration and timeout. #TASK-5895
j-coll May 31, 2024
1c7db62
storage: Improve testing SampleIndexAggregation of intergenic queries…
j-coll May 31, 2024
e38d0fc
Merge branch 'release-2.12.x' into TASK-6136
j-coll Jun 3, 2024
9f2b06d
Prepare next release 2.12.6-SNAPSHOT
juanfeSanahuja Jun 4, 2024
7fcd257
Merge pull request #2457 from opencb/TASK-6136
j-coll Jun 6, 2024
97c09c6
Merge branch 'release-2.12.x' into TASK-5895
j-coll Jun 6, 2024
63dec4c
Merge pull request #2459 from opencb/TASK-5895
j-coll Jun 17, 2024
b6e6e86
storage: Fix compoundHet query in single-study projects. #TASK-6311
j-coll Jun 20, 2024
c626659
Merge pull request #2471 from opencb/TASK-6311
j-coll Jun 25, 2024
f3ac930
storage: Fix numTotalSamples variant result value. #TASK-6436
j-coll Jun 28, 2024
e95bae4
storage: Fix canUseThisExecutor on SampleIndexOnlyVariantQueryExecuto…
j-coll Jun 28, 2024
ccd3da9
Merge pull request #2478 from opencb/TASK-6436
j-coll Jul 1, 2024
54b6676
Prepare release 2.12.6
juanfeSanahuja Jul 5, 2024
f35ed46
Prepare next release 2.12.6.1-SNAPSHOT
j-coll Jul 24, 2024
9a7dd15
storage: Hash large vairants into solr collections. Add unhashed attr…
j-coll Jul 24, 2024
5d3c7f4
storage: Fix some tests. #TASK-6596
j-coll Jul 24, 2024
84af7c1
storage: Fix NPE at some tests. #TASK-6596
j-coll Jul 24, 2024
f7b4b52
storage: Fix VariantDBAdaptorTest. #TASK-6596
j-coll Jul 25, 2024
2dcd910
Merge pull request #2490 from opencb/TASK-6596
j-coll Jul 25, 2024
8cdc46c
Prepare release 2.12.6.1
juanfeSanahuja Jul 26, 2024
9dbf1f8
Prepare Port Patch 1.10.6.1 -> 2.2.1 #TASK-6515
juanfeSanahuja Jul 30, 2024
26e8c2e
Merge branch 'release-3.2.x' into TASK-6515
juanfeSanahuja Jul 30, 2024
69d88bb
Merge branch 'release-3.2.x' into TASK-6515
juanfeSanahuja Aug 9, 2024
00ddc0a
Fix compile error CELLBASE_DATA_RELEASE_GRCH38 #TASK-6515
juanfeSanahuja Aug 9, 2024
d321044
storage: Fix merge issue with ASSEMBLY. #TASK-6515
j-coll Aug 13, 2024
8c1cde1
storage: Embedded solr should be optional in tests. #TASK-6515
j-coll Aug 13, 2024
568244c
Merge branch 'release-3.2.x' into TASK-6515
pfurio Aug 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ private void rebuildSampleFileIds(VariantStorageMetadataManager metadataManager,
for (Map.Entry<Integer, List<Integer>> entry : batch.entrySet()) {
Integer sampleId = entry.getKey();
List<Integer> fileIds = entry.getValue();

List<Integer> actualFiles = metadataManager.getSampleMetadata(studyId, sampleId).getFiles();
if (actualFiles.size() != fileIds.size() || !actualFiles.containsAll(fileIds)) {
fixedSamples++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
Expand Down Expand Up @@ -191,14 +190,7 @@ public ObjectMap getConfiguration() {

public Lock lockGlobal(long lockDuration, long timeout, String lockName)
throws StorageEngineException {
try {
return projectDBAdaptor.lockProject(lockDuration, timeout, lockName);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException("Unable to lock the Project", e);
} catch (TimeoutException e) {
throw new StorageEngineException("Unable to lock the Project", e);
}
return projectDBAdaptor.lockProject(lockDuration, timeout, lockName);
}

public Lock lockStudy(int studyId) throws StorageEngineException {
Expand Down Expand Up @@ -282,17 +274,14 @@ public <E extends Exception> StudyMetadata updateStudyMetadata(Object study, Upd
throws StorageEngineException, E {
int studyId = getStudyId(study);

Lock lock = lockStudy(studyId);
try {
try (Lock lock = lockStudy(studyId)) {
StudyMetadata sm = getStudyMetadata(studyId);

sm = updater.update(sm);

lock.checkLocked();
unsecureUpdateStudyMetadata(sm);
return sm;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -557,16 +546,8 @@ public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateConsume
public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateFunction<ProjectMetadata, E> function)
throws StorageEngineException, E {
Objects.requireNonNull(function);
Lock lock;
try {
lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new StorageEngineException("Unable to lock the Project", e);
} catch (TimeoutException e) {
throw new StorageEngineException("Unable to lock the Project", e);
}
try {

try (Lock lock = projectDBAdaptor.lockProject(lockDuration, lockTimeout)) {
ProjectMetadata projectMetadata = getProjectMetadata();
int countersHash = (projectMetadata == null ? Collections.emptyMap() : projectMetadata.getCounters()).hashCode();

Expand All @@ -579,8 +560,6 @@ public <E extends Exception> ProjectMetadata updateProjectMetadata(UpdateFunctio
lock.checkLocked();
projectDBAdaptor.updateProjectMetadata(projectMetadata, updateCounters);
return projectMetadata;
} finally {
lock.unlock();
}
}

Expand All @@ -594,11 +573,15 @@ public ProjectMetadata getProjectMetadata() {

public ProjectMetadata getAndUpdateProjectMetadata(ObjectMap options) throws StorageEngineException {
ProjectMetadata projectMetadata = getProjectMetadata();

checkSameSpeciesAndAssembly(options, projectMetadata);
if (options != null && (projectMetadata == null
|| StringUtils.isEmpty(projectMetadata.getSpecies()) && options.containsKey(SPECIES.key())
|| StringUtils.isEmpty(projectMetadata.getAssembly()) && options.containsKey(ASSEMBLY.key()))) {

projectMetadata = updateProjectMetadata(pm -> {
// Check again, in case it was updated by another thread
checkSameSpeciesAndAssembly(options, pm);
if (pm == null) {
pm = new ProjectMetadata();
}
Expand All @@ -619,6 +602,25 @@ public ProjectMetadata getAndUpdateProjectMetadata(ObjectMap options) throws Sto
return projectMetadata;
}

private static void checkSameSpeciesAndAssembly(ObjectMap options, ProjectMetadata projectMetadata) throws StorageEngineException {
if (options != null && projectMetadata != null) {
if (options.containsKey(ASSEMBLY.key())) {
if (StringUtils.isNotEmpty(projectMetadata.getAssembly()) && !projectMetadata.getAssembly()
.equalsIgnoreCase(options.getString(ASSEMBLY.key()))) {
throw new StorageEngineException("Incompatible assembly change from '" + projectMetadata.getAssembly() + "' to '"
+ options.getString(ASSEMBLY.key()) + "'");
}
}
if (options.containsKey(SPECIES.key())) {
if (StringUtils.isNotEmpty(projectMetadata.getSpecies()) && !projectMetadata.getSpecies()
.equalsIgnoreCase(toCellBaseSpeciesName(options.getString(SPECIES.key())))) {
throw new StorageEngineException("Incompatible species change from '" + projectMetadata.getSpecies() + "' to '"
+ options.getString(SPECIES.key()) + "'");
}
}
}
}

public DataResult<VariantFileMetadata> getVariantFileMetadata(int studyId, int fileId, QueryOptions options)
throws StorageEngineException {
return fileDBAdaptor.getVariantFileMetadata(studyId, fileId, options);
Expand Down Expand Up @@ -673,16 +675,14 @@ public void unsecureUpdateFileMetadata(int studyId, FileMetadata file) {
public <E extends Exception> FileMetadata updateFileMetadata(int studyId, int fileId, UpdateConsumer<FileMetadata, E> update)
throws E, StorageEngineException {
getFileName(studyId, fileId); // Check file exists
Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout);
try {

try (Lock lock = fileDBAdaptor.lock(studyId, fileId, lockDuration, lockTimeout)) {
FileMetadata fileMetadata = getFileMetadata(studyId, fileId);
update.update(fileMetadata);
lock.checkLocked();
unsecureUpdateFileMetadata(studyId, fileMetadata);
fileIdIndexedCache.put(studyId, fileId, fileMetadata.isIndexed());
return fileMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -863,6 +863,19 @@ public Iterator<FileMetadata> fileMetadataIterator(int studyId) {
return fileDBAdaptor.fileIterator(studyId);
}

public SampleMetadata getSampleMetadata(Integer studyId, Integer sampleId) {
return getSampleMetadata(studyId.intValue(), sampleId.intValue());
}

public SampleMetadata getSampleMetadata(int studyId, Integer sampleId) {
return getSampleMetadata(studyId, sampleId.intValue());
}

public SampleMetadata getSampleMetadata(int studyId, Object sample) {
int sampleId = getSampleIdOrFail(studyId, sample);
return getSampleMetadata(studyId, sampleId);
}

public SampleMetadata getSampleMetadata(int studyId, int sampleId) {
return sampleDBAdaptor.getSampleMetadata(studyId, sampleId, null);
}
Expand All @@ -875,15 +888,13 @@ public void unsecureUpdateSampleMetadata(int studyId, SampleMetadata sample) {
public <E extends Exception> SampleMetadata updateSampleMetadata(int studyId, int sampleId, UpdateConsumer<SampleMetadata, E> consumer)
throws E, StorageEngineException {
getSampleName(studyId, sampleId); // Check sample exists
Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout);
try {

try (Lock lock = sampleDBAdaptor.lock(studyId, sampleId, lockDuration, lockTimeout)) {
SampleMetadata sample = getSampleMetadata(studyId, sampleId);
sample = consumer.toFunction().update(sample);
lock.checkLocked();
unsecureUpdateSampleMetadata(studyId, sample);
return sample;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -1054,15 +1065,12 @@ public void unsecureUpdateCohortMetadata(int studyId, CohortMetadata cohort) {
public <E extends Exception> CohortMetadata updateCohortMetadata(int studyId, int cohortId, UpdateConsumer<CohortMetadata, E> update)
throws E, StorageEngineException {
getCohortName(studyId, cohortId); // Check cohort exists
Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout);
try {
try (Lock lock = cohortDBAdaptor.lock(studyId, cohortId, lockDuration, lockTimeout)) {
CohortMetadata cohortMetadata = getCohortMetadata(studyId, cohortId);
update.update(cohortMetadata);
lock.checkLocked();
unsecureUpdateCohortMetadata(studyId, cohortMetadata);
return cohortMetadata;
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -1190,13 +1198,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle
for (Integer sampleId : sampleIds) {
Integer finalCohortId = cohortId;
if (secondaryIndexCohort) {
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addSecondaryIndexCohort(finalCohortId);
});
if (!getSampleMetadata(studyId, sampleId).getSecondaryIndexCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addSecondaryIndexCohort(finalCohortId);
});
}
} else {
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addCohort(finalCohortId);
});
if (!getSampleMetadata(studyId, sampleId).getCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleId, sampleMetadata -> {
sampleMetadata.addCohort(finalCohortId);
});
}
}
}

Expand All @@ -1209,13 +1223,19 @@ private CohortMetadata updateCohortSamples(int studyId, String cohortName, Colle
Integer finalCohortId = cohortId;
if (!sampleIds.contains(sampleFromCohort)) {
if (secondaryIndexCohort) {
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId);
});
if (getSampleMetadata(studyId, sampleFromCohort).getSecondaryIndexCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getSecondaryIndexCohorts().remove(finalCohortId);
});
}
} else {
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getCohorts().remove(finalCohortId);
});
if (getSampleMetadata(studyId, sampleFromCohort).getCohorts().contains(finalCohortId)) {
// Avoid unnecessary updates
updateSampleMetadata(studyId, sampleFromCohort, sampleMetadata -> {
sampleMetadata.getCohorts().remove(finalCohortId);
});
}
}
}
}
Expand Down Expand Up @@ -1326,15 +1346,12 @@ public void unsecureUpdateTask(int studyId, TaskMetadata task) throws StorageEng
public <E extends Exception> TaskMetadata updateTask(int studyId, int taskId, UpdateConsumer<TaskMetadata, E> consumer)
throws E, StorageEngineException {
getTask(studyId, taskId); // Check task exists
Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout);
try {
try (Lock lock = taskDBAdaptor.lock(studyId, taskId, lockDuration, lockTimeout)) {
TaskMetadata task = getTask(studyId, taskId);
consumer.update(task);
lock.checkLocked();
unsecureUpdateTask(studyId, task);
return task;
} finally {
lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.opencb.opencga.storage.core.metadata.models.ProjectMetadata;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* Created on 02/05/18.
Expand All @@ -17,14 +16,12 @@
public interface ProjectMetadataAdaptor extends AutoCloseable {

default Lock lockProject(long lockDuration, long timeout)
throws InterruptedException, TimeoutException, StorageEngineException {
throws StorageEngineException {
return lockProject(lockDuration, timeout, null);
}

Lock lockProject(long lockDuration, long timeout, String lockName)
throws InterruptedException, TimeoutException, StorageEngineException;

void unLockProject(long lockId) throws StorageEngineException;
throws StorageEngineException;

DataResult<ProjectMetadata> getProjectMetadata();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ public VariantQueryExecutor getVariantQueryExecutor(Query query, QueryOptions op
public VariantQueryExecutor getVariantQueryExecutor(ParsedVariantQuery variantQuery) {
try {
for (VariantQueryExecutor executor : getVariantQueryExecutors()) {
if (executor.canUseThisExecutor(variantQuery.getQuery(), variantQuery.getInputOptions())) {
if (executor.canUseThisExecutor(variantQuery, variantQuery.getInputOptions())) {
logger.info("Using VariantQueryExecutor : " + executor.getClass().getName());
logger.info(" Query : " + VariantQueryUtils.printQuery(variantQuery.getInputQuery()));
logger.info(" Options : " + variantQuery.getInputOptions().toJson());
Expand All @@ -1362,6 +1362,19 @@ public VariantQueryExecutor getVariantQueryExecutor(ParsedVariantQuery variantQu
throw new VariantQueryException("No VariantQueryExecutor found to run the query!");
}

public final VariantQueryExecutor getVariantQueryExecutor(Class<? extends VariantQueryExecutor> clazz)
throws StorageEngineException {
Optional<VariantQueryExecutor> first = getVariantQueryExecutors()
.stream()
.filter(e -> e instanceof SearchIndexVariantQueryExecutor)
.findFirst();
if (first.isPresent()) {
return first.get();
} else {
throw new StorageEngineException("VariantQueryExecutor " + clazz + " not found");
}
}

public Query preProcessQuery(Query originalQuery, QueryOptions options) {
try {
return getVariantQueryParser().preProcessQuery(originalQuery, options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ public enum VariantStorageOptions implements ConfigurationOption {

INDEX_SEARCH("indexSearch", false), // Build secondary indexes using search engine.

METADATA_LOCK_DURATION("metadata.lock.duration", 5000),
METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 60000),
METADATA_LOCK_DURATION("metadata.lock.duration", 60000),
METADATA_LOCK_TIMEOUT("metadata.lock.timeout", 600000),
METADATA_LOAD_BATCH_SIZE("metadata.load.batchSize", 10),
METADATA_LOAD_THREADS("metadata.load.numThreads", 4),

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@
import org.opencb.opencga.storage.core.io.plain.StringDataReader;
import org.opencb.opencga.storage.core.io.plain.StringDataWriter;
import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
import org.opencb.opencga.storage.core.metadata.models.CohortMetadata;
import org.opencb.opencga.storage.core.metadata.models.FileMetadata;
import org.opencb.opencga.storage.core.metadata.models.StudyMetadata;
import org.opencb.opencga.storage.core.metadata.models.TaskMetadata;
import org.opencb.opencga.storage.core.metadata.models.*;
import org.opencb.opencga.storage.core.variant.adaptors.GenotypeClass;
import org.opencb.opencga.storage.core.variant.adaptors.VariantDBAdaptor;
import org.opencb.opencga.storage.core.variant.io.VariantReaderUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,14 +267,6 @@ public VariantStudyQuery setStudies(ParsedQuery<String> studies) {
return this;
}

public String getStudyOrFail() {
if (studies == null || studies.size() != 1) {
throw new VariantQueryException("Require exactly one study");
} else {
return studies.get(0);
}
}

public ParsedQuery<KeyOpValue<SampleMetadata, List<String>>> getGenotypes() {
return genotypes;
}
Expand Down Expand Up @@ -311,6 +303,19 @@ public void setDefaultStudy(StudyMetadata defaultStudy) {
public StudyMetadata getDefaultStudy() {
return defaultStudy;
}

public StudyMetadata getDefaultStudyOrFail() {
if (defaultStudy == null) {
if (studies.size() != 1) {
throw new VariantQueryException("Only one study is allowed. Found " + studies.size() + " studies");
} else {
throw new VariantQueryException("One study required. None provided");
}
} else {
return defaultStudy;
}
}

}

public static class VariantQueryXref {
Expand Down
Loading
Loading