Skip to content

Commit

Permalink
catalog: improve event code, #TASK-4178
Browse files Browse the repository at this point in the history
  • Loading branch information
pfurio committed Oct 10, 2024
1 parent 9b82bcb commit d14982f
Show file tree
Hide file tree
Showing 14 changed files with 707 additions and 229 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package org.opencb.opencga.catalog.db.api;

import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.datastore.core.QueryParam;
import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException;
import org.opencb.opencga.catalog.exceptions.CatalogDBException;
import org.opencb.opencga.catalog.exceptions.CatalogParameterException;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.event.CatalogEvent;
import org.opencb.opencga.core.response.OpenCGAResult;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -110,11 +113,16 @@ public static SubscribersQueryParams getParam(String key) {
}
}

OpenCGAResult<CatalogEvent> get(Query query, QueryOptions options)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException;

void insert(CatalogEvent event) throws CatalogParameterException, CatalogDBException, CatalogAuthorizationException;

void updateSubscriber(CatalogEvent event, Enums.Resource resource, boolean successful)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException;

void finishEvent(CatalogEvent opencgaEvent) throws CatalogParameterException, CatalogDBException, CatalogAuthorizationException;

void archiveEvent(CatalogEvent opencgaEvent) throws CatalogParameterException, CatalogDBException, CatalogAuthorizationException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.mongodb.client.ClientSession;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand All @@ -19,7 +20,6 @@
import org.opencb.opencga.catalog.exceptions.CatalogParameterException;
import org.opencb.opencga.core.common.TimeUtils;
import org.opencb.opencga.core.config.Configuration;
import org.opencb.opencga.core.events.IEventHandler;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.event.CatalogEvent;
import org.opencb.opencga.core.models.event.EventSubscriber;
Expand Down Expand Up @@ -47,6 +47,12 @@ public EventMongoDBAdaptor(MongoDBCollection eventCollection, MongoDBCollection
this.eventConverter = new OpenCgaMongoConverter<>(CatalogEvent.class);
}

@Override
public OpenCGAResult<CatalogEvent> get(Query query, QueryOptions options)
throws CatalogDBException, CatalogParameterException, CatalogAuthorizationException {
throw new NotImplementedException("Yet to implement");
}

@Override
public void insert(CatalogEvent event) throws CatalogParameterException, CatalogDBException, CatalogAuthorizationException {
runTransaction(session -> {
Expand Down Expand Up @@ -89,29 +95,40 @@ public void finishEvent(CatalogEvent opencgaEvent) throws CatalogParameterExcept
CatalogEvent catalogEvent = eventConverter.convertToDataModelType(eventDoc);

// Check all subscribers performed successfully their action
boolean archive = true;
boolean isSuccessful = true;
for (EventSubscriber subscriber : catalogEvent.getSubscribers()) {
if (!subscriber.isSuccessful()) {
isSuccessful = false;
if (subscriber.getNumAttempts() < IEventHandler.MAX_NUM_ATTEMPTS) {
archive = false;
}
// One of the subscribers failed, so it won't be archived
return null;
}
}

if (archive) {
// Move to different collection
eventDoc.put(QueryParams.SUCCESSFUL.key(), isSuccessful);
archiveCollection.insert(session, eventDoc, QueryOptions.empty());
Bson bsonQuery = parseQuery(query);
eventCollection.remove(session, bsonQuery, QueryOptions.empty());
}
eventDoc.put(QueryParams.SUCCESSFUL.key(), true);
archiveEvent(session, eventDoc);
return null;
});
}

@Override
public void archiveEvent(CatalogEvent opencgaEvent)
throws CatalogParameterException, CatalogDBException, CatalogAuthorizationException {
runTransaction(session -> {
Query query = new Query(QueryParams.UID.key(), opencgaEvent.getUid());
Document eventDoc = nativeGet(session, query, QueryOptions.empty()).first();
archiveEvent(session, eventDoc);
return null;
});
}

private void archiveEvent(ClientSession session, Document eventDoc) throws CatalogDBException {
// Insert in archive collection
archiveCollection.insert(session, eventDoc, QueryOptions.empty());

// Build query to remove from main collection
Query query = new Query(QueryParams.UID.key(), eventDoc.get(QueryParams.UID.key(), Number.class).longValue());
Bson bsonQuery = parseQuery(query);
eventCollection.remove(session, bsonQuery, QueryOptions.empty());
}

OpenCGAResult<CatalogEvent> get(ClientSession clientSession, Query query, QueryOptions options) throws CatalogDBException {
long startTime = startQuery();
try (DBIterator<CatalogEvent> dbIterator = iterator(clientSession, query, options)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,32 @@
package org.opencb.opencga.catalog.managers;

import org.apache.commons.collections4.CollectionUtils;
import org.opencb.commons.datastore.core.ObjectMap;
import org.opencb.commons.datastore.core.Query;
import org.opencb.commons.datastore.core.QueryOptions;
import org.opencb.commons.datastore.core.QueryParam;
import org.opencb.opencga.catalog.auth.authorization.AuthorizationManager;
import org.opencb.opencga.catalog.db.DBAdaptorFactory;
import org.opencb.opencga.catalog.db.api.*;
import org.opencb.opencga.catalog.exceptions.CatalogAuthorizationException;
import org.opencb.opencga.catalog.exceptions.CatalogDBException;
import org.opencb.opencga.catalog.exceptions.CatalogException;
import org.opencb.opencga.catalog.exceptions.CatalogParameterException;
import org.opencb.opencga.catalog.exceptions.*;
import org.opencb.opencga.catalog.models.InternalGetDataResult;
import org.opencb.opencga.catalog.utils.CatalogFqn;
import org.opencb.opencga.core.api.ParamConstants;
import org.opencb.opencga.core.config.Configuration;
import org.opencb.opencga.core.models.IPrivateStudyUid;
import org.opencb.opencga.core.models.JwtPayload;
import org.opencb.opencga.core.models.common.EntryParam;
import org.opencb.opencga.core.models.common.Enums;
import org.opencb.opencga.core.models.event.CatalogEvent;
import org.opencb.opencga.core.models.study.Group;
import org.opencb.opencga.core.models.study.Study;
import org.opencb.opencga.core.response.OpenCGAResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Created by hpccoll1 on 12/05/15.
Expand Down Expand Up @@ -329,35 +334,182 @@ protected void checkMember(String organization, long studyId, String member)
}
}

protected static Class<?> getTypeClass(QueryParam.Type type) {
switch (type) {
case STRING:
case TEXT:
case TEXT_ARRAY:
return String.class;
case INTEGER_ARRAY:
case INTEGER:
return Integer.class;
case DOUBLE:
case DECIMAL:
case DECIMAL_ARRAY:
return Double.class;
case LONG:
case LONG_ARRAY:
return Long.class;
case BOOLEAN:
case BOOLEAN_ARRAY:
return Boolean.class;
case MAP:
return Map.class;
case OBJECT:
return Object.class;
case DATE:
case TIMESTAMP:
return Date.class;
default:
throw new IllegalArgumentException("Unknown or unrecognised type '" + type + "'");
}
public interface ExecuteOperation<T> {
OpenCGAResult<T> execute(String organizationId, Study study, String userId, QueryOptions queryOptions, JwtPayload tokenPayload)
throws CatalogException;
}
//
// public interface ExecuteBatchOperation<T> {
// T execute(Study study, String userId, QueryOptions queryOptions, String auditOperationUuid) throws CatalogException, IOException;
// }
//
// protected <T> OpenCGAResult<T> run(ObjectMap params, Enums.Action action, Enums.Resource resource, String studyStr, String token,
// QueryOptions options, ExecuteOperation<T> body)
// throws CatalogException {
// return run(params, action, resource, studyStr, token, options, QueryOptions.empty(), body);
// }
//
// protected <T> OpenCGAResult<T> run(ObjectMap params, Enums.Action action, Enums.Resource resource, String studyStr, String token,
// QueryOptions options, QueryOptions studyIncludeList, ExecuteOperation<T> body)
// throws CatalogException {
// return run(params, resource, action, studyStr, null, token, options, studyIncludeList, body);
// }

protected <T> OpenCGAResult<T> run(ObjectMap params, Enums.Resource resource, Enums.Action action, String studyStr,
@Nullable EntryParam entryParam, String token, QueryOptions options, QueryOptions studyIncludeList,
ExecuteOperation<T> body) throws CatalogException {
JwtPayload tokenPayload = catalogManager.getUserManager().validateToken(token);
CatalogFqn studyFqn = CatalogFqn.extractFqnFromStudy(studyStr, tokenPayload);
String organizationId = studyFqn.getOrganizationId();
String userId = tokenPayload.getUserId(organizationId);

String eventId = resource.name().toLowerCase() + "." + action.name().toLowerCase();
Supplier<Study> studySupplier = () -> {
try {
return catalogManager.getStudyManager().resolveId(studyFqn, studyIncludeList, tokenPayload);
} catch (CatalogException e) {
throw new CatalogRuntimeException(e);
}
};
CatalogEvent notify = EventManager.getInstance().notify(eventId, organizationId, studySupplier, entryParam, userId, options, params,
tokenPayload, body);
OpenCGAResult<T> result = (OpenCGAResult<T>) notify.getEvent().getResult();
return result;
}

// protected <T, S extends ObjectMap> T run(ObjectMap params, Enums.Action action, Enums.Resource resource, String operationUuid,
// Study study, String userId, S options, ExecuteOperation<T> body) throws CatalogException {
// StopWatch totalStopWatch = StopWatch.createStarted();
// Exception exception = null;
// ReferenceParam referenceParam = new ReferenceParam();
// try {
// QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions();
// return body.execute(study, userId, referenceParam, queryOptions);
// } catch (Exception e) {
// exception = e;
// throw e;
// } finally {
// try {
// String operationId = UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.AUDIT);
// AuditRecord auditRecord = new AuditRecord(operationId, operationUuid, userId, GitRepositoryState.get().getBuildVersion(),
// action, resource, referenceParam.getId(), referenceParam.getUuid(), study.getId(), study.getUuid(), params,
// new AuditRecord.Status(AuditRecord.Status.Result.SUCCESS), TimeUtils.getDate(),
// new ObjectMap("totalTimeMillis", totalStopWatch.getTime(TimeUnit.MILLISECONDS)));
// if (exception != null) {
// auditRecord.setStatus(new AuditRecord.Status(AuditRecord.Status.Result.ERROR, new Error(1, exception.getMessage(),
// "")));
// auditRecord.getAttributes()
// .append("errorType", exception.getClass())
// .append("errorMessage", exception.getMessage());
// }
// auditManager.audit(auditRecord);
// } catch (Exception e2) {
// if (exception != null) {
// exception.addSuppressed(e2);
// } else {
// throw e2;
// }
// }
// }
// }
//
// protected <T, S extends ObjectMap> T run(ObjectMap params, Enums.Action action, Enums.Resource resource, String operationUuid,
// Study study, String userId, S options, ExecuteOperation<T> body) throws CatalogException {
// StopWatch totalStopWatch = StopWatch.createStarted();
// Exception exception = null;
// ReferenceParam referenceParam = new ReferenceParam();
//
// QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions();
// Supplier<T> supplier = () -> {
// try {
// return body.execute(study, userId, referenceParam, queryOptions);
// } catch (CatalogException e) {
// throw new CatalogRuntimeException(e);
// }
// };
//
//
//
// return body.execute(study, userId, referenceParam, queryOptions);
//
// try {
// QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions();
// return body.execute(study, userId, referenceParam, queryOptions);
// } catch (Exception e) {
// exception = e;
// throw e;
// } finally {
// try {
// String operationId = UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.AUDIT);
// AuditRecord auditRecord = new AuditRecord(operationId, operationUuid, userId, GitRepositoryState.get().getBuildVersion(),
// action, resource, referenceParam.getId(), referenceParam.getUuid(), study.getId(), study.getUuid(), params,
// new AuditRecord.Status(AuditRecord.Status.Result.SUCCESS), TimeUtils.getDate(),
// new ObjectMap("totalTimeMillis", totalStopWatch.getTime(TimeUnit.MILLISECONDS)));
// if (exception != null) {
// auditRecord.setStatus(new AuditRecord.Status(AuditRecord.Status.Result.ERROR, new Error(1, exception.getMessage(),
// "")));
// auditRecord.getAttributes()
// .append("errorType", exception.getClass())
// .append("errorMessage", exception.getMessage());
// }
// auditManager.audit(auditRecord);
// } catch (Exception e2) {
// if (exception != null) {
// exception.addSuppressed(e2);
// } else {
// throw e2;
// }
// }
// }
// }
//
//
// protected <T> T runBatch(ObjectMap params, Enums.Action action, Enums.Resource resource, String studyStr, String token,
// QueryOptions options, ExecuteBatchOperation<T> body) throws CatalogException {
// StopWatch totalStopWatch = StopWatch.createStarted();
// String userId = catalogManager.getUserManager().getUserId(token);
// Study study = catalogManager.getStudyManager().resolveId(studyStr, userId, StudyManager.INCLUDE_BASE);
// String operationUuid = UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.AUDIT);
// auditManager.initAuditBatch(operationUuid);
// Exception exception = null;
// try {
// QueryOptions queryOptions = options != null ? new QueryOptions(options) : new QueryOptions();
// return body.execute(study, userId, queryOptions, operationUuid);
// } catch (IOException e) {
// exception = new CatalogException(e);
// ObjectMap auditAttributes = new ObjectMap()
// .append("totalTimeMillis", totalStopWatch.getTime(TimeUnit.MILLISECONDS))
// .append("errorType", e.getClass())
// .append("errorMessage", e.getMessage());
// AuditRecord.Status status = new AuditRecord.Status(AuditRecord.Status.Result.ERROR, new Error(0, "", e.getMessage()));
// AuditRecord auditRecord = new AuditRecord(UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.AUDIT), operationUuid, userId,
// GitRepositoryState.get().getBuildVersion(), action, resource, "", "", study.getId(), study.getUuid(), params,
// status, TimeUtils.getDate(), auditAttributes);
// auditManager.audit(auditRecord);
// throw (CatalogException) exception;
// } catch (Exception e) {
// exception = e;
// ObjectMap auditAttributes = new ObjectMap()
// .append("totalTimeMillis", totalStopWatch.getTime(TimeUnit.MILLISECONDS))
// .append("errorType", e.getClass())
// .append("errorMessage", e.getMessage());
// AuditRecord.Status status = new AuditRecord.Status(AuditRecord.Status.Result.ERROR, new Error(0, "", e.getMessage()));
// AuditRecord auditRecord = new AuditRecord(UuidUtils.generateOpenCgaUuid(UuidUtils.Entity.AUDIT), operationUuid, userId,
// GitRepositoryState.get().getBuildVersion(), action, resource, "", "", study.getId(), study.getUuid(), params,
// status, TimeUtils.getDate(), auditAttributes);
// auditManager.audit(auditRecord);
// throw e;
// } finally {
// try {
// auditManager.finishAuditBatch(operationUuid);
// } catch (Exception e2) {
// if (exception != null) {
// exception.addSuppressed(e2);
// } else {
// throw e2;
// }
// }
// }
// }

}
Loading

0 comments on commit d14982f

Please sign in to comment.