Skip to content

Commit

Permalink
analysis: Synchronize to catalog annotation and secondary annotation …
Browse files Browse the repository at this point in the history
…index status #TASK-6219
  • Loading branch information
j-coll committed Aug 14, 2024
1 parent 9616d1e commit 0a765b7
Show file tree
Hide file tree
Showing 9 changed files with 204 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public VariantSearchLoadResult secondaryAnnotationIndex(String project, String r
Query inputQuery = new Query();
inputQuery.putIfNotEmpty(VariantQueryParam.REGION.key(), region);
VariantSearchLoadResult result = engine.secondaryIndex(inputQuery, new QueryOptions(params), overwrite);
getSynchronizer(engine).synchronizeCatalogFromStorage(token);
getSynchronizer(engine).synchronizeCatalogFromStorage(project, token);
return result;
});
}
Expand Down Expand Up @@ -282,8 +282,7 @@ public void annotate(String projectStr, List<String> studies, String region, boo
public void saveAnnotation(String project, String annotationName, ObjectMap params, String token)
throws CatalogException, StorageEngineException {
secureOperationByProject(VariantAnnotationSaveOperationTool.ID, project, params, token, engine -> {
CatalogStorageMetadataSynchronizer
.updateProjectMetadata(catalogManager, engine.getMetadataManager(), project, token);
getSynchronizer(engine).synchronizeProjectMetadataFromCatalog(project, token);
engine.saveAnnotation(annotationName, params);
return null;
});
Expand All @@ -292,8 +291,7 @@ public void saveAnnotation(String project, String annotationName, ObjectMap para
public void deleteAnnotation(String project, String annotationName, ObjectMap params, String token)
throws CatalogException, StorageEngineException {
secureOperationByProject(VariantAnnotationDeleteOperationTool.ID, project, params, token, engine -> {
CatalogStorageMetadataSynchronizer
.updateProjectMetadata(catalogManager, engine.getMetadataManager(), project, token);
getSynchronizer(engine).synchronizeProjectMetadataFromCatalog(project, token);
engine.deleteAnnotation(annotationName, params);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,27 +49,21 @@ public OperationManager(VariantStorageManager variantStorageManager, VariantStor
this.variantStorageEngine = variantStorageEngine;
}

public final StudyMetadata synchronizeCatalogStudyFromStorage(String study, String token)
throws CatalogException, StorageEngineException {
return synchronizeCatalogStudyFromStorage(study, token, false);
}

public final StudyMetadata synchronizeCatalogStudyFromStorage(String study, String token, boolean failIfNotExist)
throws CatalogException, StorageEngineException {
VariantStorageMetadataManager metadataManager = variantStorageEngine.getMetadataManager();
CatalogStorageMetadataSynchronizer metadataSynchronizer
= new CatalogStorageMetadataSynchronizer(catalogManager, metadataManager);

StudyMetadata studyMetadata = metadataManager.getStudyMetadata(study);
if (studyMetadata == null) {
if (!metadataManager.studyExists(study)) {
if (failIfNotExist) {
throw new CatalogException("Study '" + study + "' does not exist on the VariantStorage");
}
} else {
// Update Catalog file and cohort status.
metadataSynchronizer.synchronizeCatalogStudyFromStorage(studyMetadata, token);
metadataSynchronizer.synchronizeCatalogStudyFromStorage(study, token);
}
return studyMetadata;
return metadataManager.getStudyMetadata(study);
}

protected final String getStudyFqn(String study, String token) throws CatalogException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.models.file.File;
import org.opencb.opencga.core.models.project.Project;
import org.opencb.opencga.core.models.project.ProjectOrganism;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.variant.VariantStorageEngine;
import org.opencb.opencga.storage.core.variant.adaptors.VariantQueryParam;
Expand Down Expand Up @@ -96,7 +94,7 @@ private void annotate(String projectStr, List<String> studies, String loadFileSt
if (StringUtils.isEmpty(loadFileStr)) {
variantStorageEngine.annotate(outdir.toUri(), annotationQuery, annotationOptions);
new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager())
.synchronizeCatalogFromStorage(token);
.synchronizeCatalogFromStorage(projectStr, token);
} else {
Path loadFilePath = Paths.get(loadFileStr);
boolean fileExists = Files.exists(loadFilePath);
Expand All @@ -123,11 +121,8 @@ private void annotate(String projectStr, List<String> studies, String loadFileSt
}

private void synchronizeProjectMetadata(String projectStr, String token) throws CatalogException, StorageEngineException {
Project project = catalogManager.getProjectManager().get(projectStr, QueryOptions.empty(), token).first();
ProjectOrganism organism = project.getOrganism();
int currentRelease = project.getCurrentRelease();
CatalogStorageMetadataSynchronizer.updateProjectMetadata(variantStorageEngine.getMetadataManager(), organism, currentRelease,
project.getCellbase());
new CatalogStorageMetadataSynchronizer(catalogManager, variantStorageEngine.getMetadataManager())
.synchronizeProjectMetadataFromCatalog(projectStr, token);
}

private String buildOutputFileName(String alias, String region) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public List<StoragePipelineResult> index(String study, List<String> files, URI o
updateProject(studyFqn, token);

List<URI> fileUris = findFilesToIndex(params, token);
if (fileUris.size() == 0) {
if (fileUris.isEmpty()) {
logger.warn("Nothing to do.");
return Collections.emptyList();
}
Expand All @@ -130,6 +130,14 @@ public List<StoragePipelineResult> index(String study, List<String> files, URI o

private void check(String study, ObjectMap params, String token) throws Exception {
studyFqn = getStudyFqn(study, token);
String projectFqn = catalogManager.getStudyManager().getProjectFqn(studyFqn);

Project project = catalogManager
.getProjectManager()
.get(projectFqn,
new QueryOptions(QueryOptions.INCLUDE, Collections.singletonList(CURRENT_RELEASE.key())),
token).first();
release = project.getCurrentRelease();

JwtPayload jwtPayload = new JwtPayload(token);
CatalogFqn catalogFqn = CatalogFqn.extractFqnFromStudy(studyFqn, jwtPayload);
Expand Down Expand Up @@ -169,16 +177,9 @@ private void check(String study, ObjectMap params, String token) throws Exceptio

private void updateProject(String studyFqn, String token) throws CatalogException, StorageEngineException {
String projectFqn = catalogManager.getStudyManager().getProjectFqn(studyFqn);
Project project = catalogManager
.getProjectManager()
.get(projectFqn,
new QueryOptions(QueryOptions.INCLUDE, Arrays.asList(CURRENT_RELEASE.key(), ORGANISM.key(), CELLBASE.key())),
token).first();
release = project.getCurrentRelease();

// Add species, assembly and release
CatalogStorageMetadataSynchronizer.updateProjectMetadata(variantStorageEngine.getMetadataManager(), project.getOrganism(), release,
project.getCellbase());
synchronizer.synchronizeProjectMetadataFromCatalog(projectFqn, token);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.opencb.opencga.core.models.project.Project;
import org.opencb.opencga.core.models.project.ProjectOrganism;
import org.opencb.opencga.core.models.sample.*;
import org.opencb.opencga.core.models.variant.InternalVariantOperationIndex;
import org.opencb.opencga.core.models.variant.OperationIndexStatus;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.opencb.opencga.storage.core.exceptions.StorageEngineException;
import org.opencb.opencga.storage.core.metadata.VariantStorageMetadataManager;
Expand Down Expand Up @@ -111,23 +113,21 @@ public CatalogStorageMetadataSynchronizer(CatalogManager catalogManager, Variant
this.metadataManager = metadataManager;
}

public static void updateProjectMetadata(CatalogManager catalog, VariantStorageMetadataManager scm, String project, String sessionId)
public void synchronizeProjectMetadataFromCatalog(String projectFqn, String token)
throws CatalogException, StorageEngineException {
final Project p = catalog.getProjectManager().get(project,
final Project project = catalogManager.getProjectManager().get(projectFqn,
new QueryOptions(QueryOptions.INCLUDE, Arrays.asList(
ProjectDBAdaptor.QueryParams.ORGANISM.key(), ProjectDBAdaptor.QueryParams.CURRENT_RELEASE.key(),
ProjectDBAdaptor.QueryParams.CELLBASE.key())),
sessionId)
token)
.first();

updateProjectMetadata(scm, p.getOrganism(), p.getCurrentRelease(), p.getCellbase());
}

public static void updateProjectMetadata(VariantStorageMetadataManager scm, ProjectOrganism organism, int release, CellBaseConfiguration cellbase)
throws StorageEngineException {
int release = project.getCurrentRelease();
CellBaseConfiguration cellbase = project.getCellbase();
ProjectOrganism organism = project.getOrganism();
String scientificName = CellBaseUtils.toCellBaseSpeciesName(organism.getScientificName());

scm.updateProjectMetadata(projectMetadata -> {
metadataManager.updateProjectMetadata(projectMetadata -> {
if (projectMetadata == null) {
projectMetadata = new ProjectMetadata();
}
Expand All @@ -139,8 +139,84 @@ public static void updateProjectMetadata(VariantStorageMetadataManager scm, Proj
});
}

public StudyMetadata getStudyMetadata(String study) throws CatalogException {
return metadataManager.getStudyMetadata(study);
private void synchronizeCatalogProjectFromStorageByStudy(String studyFqn, String token)
throws CatalogException {
String projectFqn = catalogManager.getStudyManager().getProjectFqn(studyFqn);
synchronizeCatalogProjectFromStorage(projectFqn, token);
}

private void synchronizeCatalogProjectFromStorage(String projectFqn, String token)
throws CatalogException {

if (!metadataManager.exists()) {
return;
}
logger.info("Synchronize project '{}' from Storage", projectFqn);
ProjectMetadata projectMetadata = metadataManager.getProjectMetadata();

Project project = catalogManager.getProjectManager().get(projectFqn,
new QueryOptions(QueryOptions.INCLUDE, Arrays.asList(
ProjectDBAdaptor.QueryParams.FQN.key(),
ProjectDBAdaptor.QueryParams.ORGANISM.key(),
ProjectDBAdaptor.QueryParams.INTERNAL.key(),
ProjectDBAdaptor.QueryParams.CELLBASE.key())),
token)
.first();
projectFqn = project.getFqn();

String annotationIndexStatus = secureGet(() -> project.getInternal().getVariant().getAnnotationIndex().getStatus().getId(), null);
TaskMetadata.Status storageAnnotationIndexStatus = projectMetadata.getAnnotationIndexStatus();
if (!storageAnnotationIndexStatus.name().equals(annotationIndexStatus)) {
logger.info("Update project '{}' annotation index status to {}",
projectFqn, storageAnnotationIndexStatus);

OperationIndexStatus operationIndexStatus;
switch (storageAnnotationIndexStatus) {
case NONE:
operationIndexStatus = new OperationIndexStatus(OperationIndexStatus.PENDING,
"Variant annotation index operation pending. "
+ " variantIndexTs = " + projectMetadata.getVariantIndexLastTimestamp()
+ ", variantAnnotationIndexTs = " + projectMetadata.getAnnotationIndexLastTimestamp()
);
break;
case READY:
operationIndexStatus = new OperationIndexStatus(storageAnnotationIndexStatus.name(), "");
break;
default:
throw new IllegalStateException("Unexpected value: " + storageAnnotationIndexStatus);
}
catalogManager.getProjectManager().setProjectInternalVariantAnnotationIndex(projectFqn,
new InternalVariantOperationIndex(operationIndexStatus),
new QueryOptions(), token);
}

String secondaryAnnotationIndexStatus = secureGet(() -> project.getInternal().getVariant().getSecondaryAnnotationIndex().getStatus().getId(), null);
TaskMetadata.Status storageSecondaryAnnotationIndexStatus = projectMetadata.getSecondaryAnnotationIndexStatus();
if (!storageSecondaryAnnotationIndexStatus.name().equals(secondaryAnnotationIndexStatus)) {
logger.info("Update project '{}' secondary annotation index status to {}",
projectFqn, storageSecondaryAnnotationIndexStatus);

OperationIndexStatus operationIndexStatus;
switch (storageSecondaryAnnotationIndexStatus) {
case NONE:
operationIndexStatus = new OperationIndexStatus(OperationIndexStatus.PENDING,
"Variant secondary annotation index operation pending. "
+ " variantIndexTs = " + projectMetadata.getVariantIndexLastTimestamp()
+ ", variantSecondaryAnnotationIndexTs = " + projectMetadata.getSecondaryAnnotationIndexLastTimestamp()
+ ", variantAnnotationIndexTs = " + projectMetadata.getAnnotationIndexLastTimestamp()
+ ", variantIndexStatsTs = " + projectMetadata.getStatsLastTimestamp()
);
break;
case READY:
operationIndexStatus = new OperationIndexStatus(storageAnnotationIndexStatus.name(), "");
break;
default:
throw new IllegalStateException("Unexpected value: " + storageSecondaryAnnotationIndexStatus);
}
catalogManager.getProjectManager().setProjectInternalVariantSecondaryAnnotationIndex(projectFqn,
new InternalVariantOperationIndex(operationIndexStatus),
new QueryOptions(), token);
}
}

/**
Expand Down Expand Up @@ -177,6 +253,8 @@ public boolean synchronizeCatalogFilesFromStorage(String study, List<File> files
*/
public boolean synchronizeCatalogFilesFromStorage(String studyFqn, List<File> files, String sessionId)
throws CatalogException {
synchronizeCatalogProjectFromStorageByStudy(studyFqn, sessionId);

StudyMetadata study = metadataManager.getStudyMetadata(studyFqn);
if (study == null) {
return false;
Expand Down Expand Up @@ -206,6 +284,8 @@ public boolean synchronizeCatalogSamplesFromStorage(String studyFqn, List<String
if (study == null) {
return false;
}
synchronizeCatalogProjectFromStorageByStudy(studyFqn, sessionId);

logger.info("Synchronizing samples from study " + study.getName());

List<Integer> sampleIds;
Expand All @@ -232,10 +312,12 @@ public boolean synchronizeCatalogSamplesFromStorage(String studyFqn, List<String
* @return if anything was modified
* @throws CatalogException on error
*/
public boolean synchronizeCatalogFromStorage(String token) throws CatalogException {
public boolean synchronizeCatalogFromStorage(String project, String token) throws CatalogException {
boolean modified = false;
synchronizeCatalogProjectFromStorage(project, token);
for (String study : metadataManager.getStudyNames()) {
modified |= synchronizeCatalogStudyFromStorage(study, token);
StudyMetadata studyMetadata = metadataManager.getStudyMetadata(study);
modified |= synchronizeCatalogStudyFromStorage(studyMetadata, token);
}
return modified;
}
Expand All @@ -244,6 +326,7 @@ public boolean synchronizeCatalogStudyFromStorage(String study, String sessionId
throws CatalogException {
StudyMetadata studyMetadata = metadataManager.getStudyMetadata(study);
if (studyMetadata != null) {
synchronizeCatalogProjectFromStorageByStudy(studyMetadata.getName(), sessionId);
// Update Catalog file and cohort status.
return synchronizeCatalogStudyFromStorage(studyMetadata, sessionId);
} else {
Expand All @@ -263,7 +346,7 @@ public boolean synchronizeCatalogStudyFromStorage(String study, String sessionId
* @return if there were modifications in catalog
* @throws CatalogException if there is an error with catalog
*/
public boolean synchronizeCatalogStudyFromStorage(StudyMetadata study, String sessionId)
private boolean synchronizeCatalogStudyFromStorage(StudyMetadata study, String sessionId)
throws CatalogException {
logger.info("Synchronizing study " + study.getName());

Expand All @@ -275,10 +358,11 @@ public boolean synchronizeCatalogStudyFromStorage(StudyMetadata study, String se
}

public boolean synchronizeCohorts(String study, String sessionId) throws CatalogException {
StudyMetadata studyMetadata = getStudyMetadata(study);
StudyMetadata studyMetadata = metadataManager.getStudyMetadata(study);
if (studyMetadata == null) {
return false;
} else {
synchronizeCatalogProjectFromStorageByStudy(studyMetadata.getName(), sessionId);
return synchronizeCohorts(studyMetadata, sessionId);
}
}
Expand Down
Loading

0 comments on commit 0a765b7

Please sign in to comment.