Skip to content

Commit

Permalink
catalog: add new update operation, #TASK-6445
Browse files Browse the repository at this point in the history
  • Loading branch information
pfurio committed Aug 7, 2024
1 parent c97f75e commit d776092
Show file tree
Hide file tree
Showing 14 changed files with 515 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.nextflow.NextFlowRunParams;
import org.opencb.opencga.core.models.nextflow.Workflow;
import org.opencb.opencga.core.models.workflow.NextFlowRunParams;
import org.opencb.opencga.core.models.workflow.Workflow;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.opencb.opencga.core.tools.annotations.Tool;
import org.opencb.opencga.core.tools.annotations.ToolParams;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
import org.opencb.opencga.catalog.managers.AbstractManagerTest;
import org.opencb.opencga.core.config.storage.StorageConfiguration;
import org.opencb.opencga.core.exceptions.ToolException;
import org.opencb.opencga.core.models.nextflow.NextFlowRunParams;
import org.opencb.opencga.core.models.nextflow.Workflow;
import org.opencb.opencga.core.models.workflow.NextFlowRunParams;
import org.opencb.opencga.core.models.workflow.Workflow;
import org.opencb.opencga.core.models.workflow.WorkflowCreateParams;
import org.opencb.opencga.storage.core.StorageEngineFactory;

import java.io.IOException;
Expand All @@ -26,7 +27,7 @@ public void nextflowScriptTest() throws ToolException, CatalogException, IOExcep
InputStream inputStream = StorageManager.class.getClassLoader().getResourceAsStream("storage-configuration.yml");
StorageConfiguration storageConfiguration = StorageConfiguration.load(inputStream, "yml");

Workflow workflow = getDummyWorkflow();
WorkflowCreateParams workflow = createDummyWorkflow();
catalogManager.getWorkflowManager().create(workflow, QueryOptions.empty(), ownerToken);

Path outDir = Paths.get(catalogManagerResource.createTmpOutdir("_nextflow"));
Expand All @@ -45,10 +46,9 @@ public void nextflowScriptTest() throws ToolException, CatalogException, IOExcep
public void nextflowDockerTest() throws ToolException, CatalogException, IOException {
InputStream inputStream = StorageManager.class.getClassLoader().getResourceAsStream("storage-configuration.yml");
StorageConfiguration storageConfiguration = StorageConfiguration.load(inputStream, "yml");
Workflow workflow = new Workflow()
WorkflowCreateParams workflow = new WorkflowCreateParams()
.setId("workflow")
.setCommandLine("run nextflow-io/rnaseq-nf -with-docker")
.setType(Workflow.Type.NEXTFLOW);
.setCommandLine("run nextflow-io/rnaseq-nf -with-docker");
catalogManager.getWorkflowManager().create(workflow, QueryOptions.empty(), ownerToken);

Path outDir = Paths.get(catalogManagerResource.createTmpOutdir("_nextflow"));
Expand All @@ -62,8 +62,7 @@ public void nextflowDockerTest() throws ToolException, CatalogException, IOExcep
System.out.println(stopWatch.getTime(TimeUnit.MILLISECONDS));
}


private Workflow getDummyWorkflow() {
private WorkflowCreateParams createDummyWorkflow() {
String scriptContent = "params.str = 'Hello world!'\n" +
"\n" +
"process splitLetters {\n" +
Expand Down Expand Up @@ -99,10 +98,9 @@ private Workflow getDummyWorkflow() {
"workflow {\n" +
" splitLetters | flatten | convertToUpper | view { it.trim() } | sleep\n" +
"}";
return new Workflow()
return new WorkflowCreateParams()
.setId("workflow")
.setCommandLine("run pipeline.nf")
.setType(Workflow.Type.NEXTFLOW)
.setScripts(Collections.singletonList(new Workflow.Script("pipeline.nf", scriptContent)));
}
}
2 changes: 1 addition & 1 deletion opencga-app/app/cloud/docker/opencga-base/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Based on Ubuntu 22.04 (jammy)
## We are now using OpenJDK 8u372 to support "cgroup v2", see https://developers.redhat.com/articles/2023/04/19/openjdk-8u372-feature-cgroup-v2-support#
FROM eclipse-temurin:11.0.23_9-jre-jammy
FROM eclipse-temurin:11.0.24_8-jre-noble

ARG BUILD_PATH="."

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException;
import org.opencb.opencga.catalog.exceptions.CatalogDBException;
import org.opencb.opencga.catalog.exceptions.CatalogParameterException;
import org.opencb.opencga.core.models.nextflow.Workflow;
import org.opencb.opencga.core.models.workflow.Workflow;
import org.opencb.opencga.core.response.OpenCGAResult;

import java.util.Map;
Expand All @@ -22,6 +22,7 @@ enum QueryParams implements QueryParam {
ID("id", TEXT, ""),
UID("uid", LONG, ""),
UUID("uuid", TEXT, ""),
DESCRIPTION("description", TEXT, ""),
TYPE("type", TEXT, ""),
COMMAND_LINE("commandLine", TEXT, ""),
SCRIPTS("scripts", OBJECT, ""),
Expand All @@ -30,6 +31,7 @@ enum QueryParams implements QueryParam {
VERSION("version", INTEGER, ""), // Version of the sample
CREATION_DATE("creationDate", DATE, ""),
MODIFICATION_DATE("modificationDate", DATE, ""),
ATTRIBUTES("attributes", OBJECT, ""),
STUDY_UID("studyUid", INTEGER_ARRAY, ""),
STUDY("study", INTEGER_ARRAY, ""); // Alias to studyId in the database. Only for the webservices.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,10 @@ protected <E> OpenCGAResult<E> update(ClientSession session, Bson sourceQuery, L
}
}

if (entryList.isEmpty()) {
throw new CatalogDBException("No documents could be found to be updated");
}

// 2. Execute main update
OpenCGAResult<E> executionResult = update.execute(entryList);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import org.apache.commons.lang3.time.StopWatch;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.opencb.commons.datastore.core.DataResult;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.datastore.core.*;
import org.opencb.commons.datastore.mongodb.MongoDBCollection;
import org.opencb.commons.datastore.mongodb.MongoDBIterator;
import org.opencb.opencga.catalog.db.api.DBIterator;
Expand All @@ -20,17 +17,20 @@
import org.opencb.opencga.catalog.exceptions.CatalogDBException;
import org.opencb.opencga.catalog.exceptions.CatalogParameterException;
import org.opencb.opencga.catalog.utils.Constants;
import org.opencb.opencga.catalog.utils.ParamUtils;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.config.Configuration;
import org.opencb.opencga.core.models.nextflow.Workflow;
import org.opencb.opencga.core.models.workflow.Workflow;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.opencb.opencga.catalog.db.mongodb.MongoDBUtils.fixAclProjection;
import static org.opencb.opencga.catalog.db.api.ClinicalAnalysisDBAdaptor.QueryParams.MODIFICATION_DATE;
import static org.opencb.opencga.catalog.db.mongodb.MongoDBUtils.*;

public class WorkflowMongoDBAdaptor extends CatalogMongoDBAdaptor implements WorkflowDBAdaptor {

Expand Down Expand Up @@ -231,15 +231,121 @@ public OpenCGAResult<Workflow> stats(Query query) {
}

@Override
public OpenCGAResult<Workflow> update(long id, ObjectMap parameters, QueryOptions queryOptions)
public OpenCGAResult<Workflow> update(long uid, ObjectMap parameters, QueryOptions queryOptions)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException {
return null;
Query query = new Query(QueryParams.UID.key(), uid);
return update(query, parameters, queryOptions);
}

@Override
public OpenCGAResult<Workflow> update(Query query, ObjectMap parameters, QueryOptions queryOptions)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException {
return null;
try {
return runTransaction(clientSession -> privateUpdate(clientSession, query, parameters, queryOptions));
} catch (CatalogDBException e) {
logger.error("Could not update workflows for query {}", query.toJson(), e);
throw new CatalogDBException("Could not update workflows based on query " + query.toJson(), e);
}
}

OpenCGAResult<Workflow> privateUpdate(ClientSession clientSession, Query query, ObjectMap parameters, QueryOptions queryOptions)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException {
long tmpStartTime = startQuery();
Bson bsonQuery = parseQuery(query);
return versionedMongoDBAdaptor.update(clientSession, bsonQuery, (entrylist) -> {
String workflowIds = entrylist.stream().map(w -> w.getString(QueryParams.ID.key())).collect(Collectors.joining(", "));

UpdateDocument updateParams = parseAndValidateUpdateParams(parameters, queryOptions);
Document workflowUpdate = updateParams.toFinalUpdateDocument();

if (workflowUpdate.isEmpty()) {
if (!parameters.isEmpty()) {
logger.error("Non-processed update parameters: {}", parameters.keySet());
}
throw new CatalogDBException("Nothing to be updated");
}

List<Event> events = new ArrayList<>();

logger.debug("Workflow update: query : {}, update: {}", bsonQuery.toBsonDocument(), workflowUpdate.toBsonDocument());
DataResult<?> result = workflowCollection.update(clientSession, bsonQuery, workflowUpdate,
new QueryOptions(MongoDBCollection.MULTI, true));

if (result.getNumMatches() == 0) {
throw new CatalogDBException("Workflow(s) '" + workflowIds + "' not found");
}
if (result.getNumUpdated() == 0) {
events.add(new Event(Event.Type.WARNING, workflowIds, "Workflow(s) already updated"));
}
logger.debug("Workflow(s) '{}' successfully updated", workflowIds);


return endWrite(tmpStartTime, result.getNumMatches(), result.getNumUpdated(), events);
}, null, null);
}

private UpdateDocument parseAndValidateUpdateParams(ObjectMap parameters, QueryOptions queryOptions) throws CatalogDBException {
if (parameters.containsKey(QueryParams.ID.key())) {
throw new CatalogDBException("It is not allowed to update the 'id' of a workflow");
}

UpdateDocument document = new UpdateDocument();

if (StringUtils.isNotEmpty(parameters.getString(QueryParams.CREATION_DATE.key()))) {
String time = parameters.getString(QueryParams.CREATION_DATE.key());
Date date = TimeUtils.toDate(time);
document.getSet().put(QueryParams.CREATION_DATE.key(), time);
document.getSet().put(PRIVATE_CREATION_DATE, date);
}
if (StringUtils.isNotEmpty(parameters.getString(MODIFICATION_DATE.key()))) {
String time = parameters.getString(QueryParams.MODIFICATION_DATE.key());
Date date = TimeUtils.toDate(time);
document.getSet().put(QueryParams.MODIFICATION_DATE.key(), time);
document.getSet().put(PRIVATE_MODIFICATION_DATE, date);
}

final String[] acceptedParams = {QueryParams.DESCRIPTION.key(), QueryParams.COMMAND_LINE.key()};
filterStringParams(parameters, document.getSet(), acceptedParams);

final String[] acceptedMapParams = {QueryParams.ATTRIBUTES.key()};
filterMapParams(parameters, document.getSet(), acceptedMapParams);

// Check if the tags exist.
if (parameters.containsKey(QueryParams.SCRIPTS.key())) {
List<Workflow.Script> scriptList = parameters.getAsList(QueryParams.SCRIPTS.key(), Workflow.Script.class);

if (!scriptList.isEmpty()) {
Map<String, Object> actionMap = queryOptions.getMap(Constants.ACTIONS, new HashMap<>());
ParamUtils.BasicUpdateAction operation =
ParamUtils.BasicUpdateAction.from(actionMap, QueryParams.SCRIPTS.key(), ParamUtils.BasicUpdateAction.ADD);
switch (operation) {
case SET:
document.getSet().put(QueryParams.SCRIPTS.key(), scriptList);
break;
case REMOVE:
document.getPullAll().put(QueryParams.SCRIPTS.key(), scriptList);
break;
case ADD:
document.getAddToSet().put(QueryParams.SCRIPTS.key(), scriptList);
break;
default:
throw new IllegalArgumentException("Unknown update action " + operation);
}
}
}

if (!document.toFinalUpdateDocument().isEmpty()) {
String time = TimeUtils.getTime();
if (StringUtils.isEmpty(parameters.getString(QueryParams.MODIFICATION_DATE.key()))) {
// Update modificationDate param
Date date = TimeUtils.toDate(time);
document.getSet().put(QueryParams.MODIFICATION_DATE.key(), time);
document.getSet().put(PRIVATE_MODIFICATION_DATE, date);
}
document.getSet().put(INTERNAL_LAST_MODIFIED, time);
}

return document;
}

@Override
Expand Down Expand Up @@ -355,6 +461,7 @@ private Bson parseQuery(Query query, String user) throws CatalogDBException, Cat
case UUID:
case RELEASE:
case VERSION:
case TYPE:
addAutoOrQuery(queryParam.key(), queryParam.key(), queryCopy, queryParam.type(), andBsonList);
break;
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.opencb.opencga.catalog.db.mongodb.converters;

import org.opencb.opencga.core.models.nextflow.Workflow;
import org.opencb.opencga.core.models.workflow.Workflow;

public class WorkflowConverter extends OpenCgaMongoConverter<Workflow> {

Expand Down
Loading

0 comments on commit d776092

Please sign in to comment.