From cb9f9219c026f75bfce78db97b0526ecc6828267 Mon Sep 17 00:00:00 2001 From: nmirasch Date: Thu, 14 Sep 2023 11:26:40 +0200 Subject: [PATCH] KOGITO-9522: add process instance operations to Data index GatewayAPI (#1844) --- .../graphql/AbstractGraphQLSchemaManager.java | 183 ++++++++++++++++ .../src/main/resources/basic.schema.graphqls | 21 ++ .../graphql/GraphQLSchemaManagerImpl.java | 147 ------------- .../src/main/resources/domain.schema.graphqls | 22 -- .../api/KogitoAddonRuntimeClientImpl.java | 100 ++++++++- .../api/KogitoAddonRuntimeClientImplTest.java | 207 ++++++++++++++++++ .../GraphQLAddonSchemaManagerImpl.java | 12 + 7 files changed, 512 insertions(+), 180 deletions(-) create mode 100644 data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java diff --git a/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java b/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java index 0ed221d5de..d41fbae9d9 100644 --- a/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java +++ b/data-index/data-index-graphql/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java @@ -36,6 +36,7 @@ import org.kie.kogito.index.model.ProcessDefinition; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; +import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.index.storage.DataIndexStorageService; import org.kie.kogito.persistence.api.Storage; import org.kie.kogito.persistence.api.query.Query; @@ -50,11 +51,21 @@ import graphql.schema.idl.SchemaParser; import graphql.schema.idl.TypeDefinitionRegistry; +import static java.lang.String.format; import static java.util.Collections.singletonList; import static org.kie.kogito.persistence.api.query.QueryFilterFactory.equalTo; public abstract class AbstractGraphQLSchemaManager implements GraphQLSchemaManager { + private static final String ID = "id"; + private static final String USER = "user"; + private static final String GROUPS = "groups"; + private static final String TASK_ID = "taskId"; + private static final String COMMENT_ID = "commentId"; + private static final String ATTACHMENT_ID = "attachmentId"; + + private static final String UNABLE_TO_FIND_ERROR_MSG = "Unable to find the instance with %s %s"; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGraphQLSchemaManager.class); @Inject @@ -262,4 +273,176 @@ public void transform(Consumer builder) { schema = schema.transform(builder); } + public CompletableFuture abortProcessInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().abortProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture retryProcessInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().retryProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture skipProcessInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().skipProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture updateProcessInstanceVariables(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().updateProcessInstanceVariables(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance, + env.getArgument("variables")); + + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture triggerNodeInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().triggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), + processInstance, + env.getArgument("nodeId")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture retriggerNodeInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().retriggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), + processInstance, + env.getArgument("nodeInstanceId")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture cancelNodeInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().cancelNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), + processInstance, + env.getArgument("nodeInstanceId")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture cancelJob(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + Job job = getCacheService().getJobsCache().get(id); + if (job != null) { + return getDataIndexApiExecutor().cancelJob(job.getEndpoint(), job); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture rescheduleJob(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + Job job = getCacheService().getJobsCache().get(id); + if (job != null) { + return getDataIndexApiExecutor().rescheduleJob(job.getEndpoint(), job, env.getArgument("data")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + protected CompletableFuture getUserTaskInstanceSchema(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = env.getSource(); + return getDataIndexApiExecutor().getUserTaskSchema(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS)); + } + + protected CompletableFuture updateUserTaskInstance(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); + return getDataIndexApiExecutor().updateUserTaskInstance(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArguments()); + } + + protected CompletableFuture createTaskInstanceComment(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); + return getDataIndexApiExecutor().createUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument("comment")); + } + + protected CompletableFuture createTaskInstanceAttachment(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); + return getDataIndexApiExecutor().createUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument("name"), + env.getArgument("uri")); + } + + protected CompletableFuture updateUserTaskComment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().updateUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(COMMENT_ID), + env.getArgument("comment")); + } + + protected CompletableFuture deleteUserTaskComment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().deleteUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(COMMENT_ID)); + } + + protected CompletableFuture updateUserTaskAttachment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().updateUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(ATTACHMENT_ID), + env.getArgument("name"), + env.getArgument("uri")); + } + + protected CompletableFuture deleteUserTaskAttachment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().deleteUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(ATTACHMENT_ID)); + } + } diff --git a/data-index/data-index-graphql/src/main/resources/basic.schema.graphqls b/data-index/data-index-graphql/src/main/resources/basic.schema.graphqls index 566a439fc7..74d90e2545 100644 --- a/data-index/data-index-graphql/src/main/resources/basic.schema.graphqls +++ b/data-index/data-index-graphql/src/main/resources/basic.schema.graphqls @@ -5,6 +5,7 @@ scalar JSON schema { query: Query + mutation: Mutation } type Query { @@ -13,6 +14,26 @@ type Query { UserTaskInstances(where: UserTaskInstanceArgument, orderBy: UserTaskInstanceOrderBy, pagination: Pagination): [UserTaskInstance] Jobs(where: JobArgument, orderBy: JobOrderBy, pagination: Pagination): [Job] } +type Mutation { + ProcessInstanceAbort(id: String): String + ProcessInstanceRetry(id: String): String + ProcessInstanceSkip(id: String): String + ProcessInstanceUpdateVariables(id: String, variables: String): String + NodeInstanceTrigger(id: String, nodeId: String): String + NodeInstanceRetrigger(id: String, nodeInstanceId: String): String + NodeInstanceCancel(id: String, nodeInstanceId: String): String + JobCancel(id: String): String + JobReschedule(id: String, data: String): String + UserTaskInstanceUpdate(taskId: String, user: String, groups: [String], description: String, priority: String, + actualOwner: String, adminGroups: [String!], adminUsers: [String!], excludedUsers: [String!], + potentialGroups: [String!], potentialUsers: [String!], inputParams: String): String + UserTaskInstanceCommentCreate(taskId: String, user: String, groups: [String], comment: String): String + UserTaskInstanceAttachmentCreate(taskId: String, user: String, groups: [String], name: String, uri: String): String + UserTaskInstanceCommentUpdate(user:String, groups:[String],commentId: String, comment: String): String + UserTaskInstanceCommentDelete(user:String, groups:[String],commentId: String): String + UserTaskInstanceAttachmentUpdate(user:String, groups:[String],attachmentId: String, name: String, uri: String): String + UserTaskInstanceAttachmentDelete(user:String, groups:[String], attachmentId: String): String +} type ProcessDefinition { id: String! diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java index fac7f06214..d0e389e084 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import javax.annotation.PostConstruct; @@ -28,13 +27,9 @@ import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager; import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry; import org.kie.kogito.index.json.DataIndexParsingException; -import org.kie.kogito.index.model.Job; -import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.ProcessInstanceState; -import org.kie.kogito.index.model.UserTaskInstance; import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.persistence.api.Storage; -import org.kie.kogito.persistence.api.query.Query; import org.reactivestreams.Publisher; import com.fasterxml.jackson.databind.JsonNode; @@ -50,25 +45,17 @@ import graphql.schema.idl.TypeDefinitionRegistry; import static java.lang.String.format; -import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.kie.kogito.index.json.JsonUtils.getObjectMapper; -import static org.kie.kogito.persistence.api.query.QueryFilterFactory.equalTo; @ApplicationScoped public class GraphQLSchemaManagerImpl extends AbstractGraphQLSchemaManager { - private static final String PROCESS_INSTANCE_ADDED = "ProcessInstanceAdded"; private static final String PROCESS_INSTANCE_UPDATED = "ProcessInstanceUpdated"; private static final String USER_TASK_INSTANCE_ADDED = "UserTaskInstanceAdded"; private static final String USER_TASK_INSTANCE_UPDATED = "UserTaskInstanceUpdated"; private static final String JOB_UPDATED = "JobUpdated"; private static final String JOB_ADDED = "JobAdded"; - private static final String USER = "user"; - private static final String GROUPS = "groups"; - private static final String TASK_ID = "taskId"; - private static final String COMMENT_ID = "commentId"; - private static final String ATTACHMENT_ID = "attachmentId"; @Override @PostConstruct @@ -159,57 +146,6 @@ public GraphQLSchema createSchema() { return schemaGenerator.makeExecutableSchema(typeDefinitionRegistry, runtimeWiring); } - public CompletableFuture abortProcessInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().abortProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); - } - - public CompletableFuture retryProcessInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().retryProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); - } - - public CompletableFuture skipProcessInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().skipProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); - } - - public CompletableFuture updateProcessInstanceVariables(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().updateProcessInstanceVariables(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance, env.getArgument("variables")); - } - - public CompletableFuture triggerNodeInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().triggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), - processInstance, - env.getArgument("nodeId")); - } - - public CompletableFuture retriggerNodeInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().retriggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), - processInstance, - env.getArgument("nodeInstanceId")); - } - - public CompletableFuture cancelNodeInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().cancelNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), - processInstance, - env.getArgument("nodeInstanceId")); - } - - public CompletableFuture cancelJob(DataFetchingEnvironment env) { - Job job = getCacheService().getJobsCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().cancelJob(job.getEndpoint(), job); - } - - public CompletableFuture rescheduleJob(DataFetchingEnvironment env) { - Job job = getCacheService().getJobsCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().rescheduleJob(job.getEndpoint(), job, env.getArgument("data")); - } - protected String getProcessInstanceJsonServiceUrl(DataFetchingEnvironment env) { Object source = env.getSource(); if (source instanceof JsonNode) { @@ -220,89 +156,6 @@ protected String getProcessInstanceJsonServiceUrl(DataFetchingEnvironment env) { return null; } - private CompletableFuture getUserTaskInstanceSchema(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = env.getSource(); - return getDataIndexApiExecutor().getUserTaskSchema(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS)); - } - - private CompletableFuture updateUserTaskInstance(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); - return getDataIndexApiExecutor().updateUserTaskInstance(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArguments()); - } - - private CompletableFuture createTaskInstanceComment(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); - return getDataIndexApiExecutor().createUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument("comment")); - } - - private CompletableFuture createTaskInstanceAttachment(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); - return getDataIndexApiExecutor().createUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument("name"), - env.getArgument("uri")); - } - - private CompletableFuture updateUserTaskComment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().updateUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(COMMENT_ID), - env.getArgument("comment")); - } - - private CompletableFuture deleteUserTaskComment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().deleteUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(COMMENT_ID)); - } - - private CompletableFuture updateUserTaskAttachment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().updateUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(ATTACHMENT_ID), - env.getArgument("name"), - env.getArgument("uri")); - } - - private CompletableFuture deleteUserTaskAttachment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().deleteUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(ATTACHMENT_ID)); - } - private DataFetcher> getProcessInstanceAddedDataFetcher() { return objectCreatedPublisher(() -> getCacheService().getProcessInstancesCache()); } diff --git a/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls b/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls index 708b44593a..b6f3887da9 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls +++ b/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls @@ -1,27 +1,5 @@ extend schema { subscription: Subscription - mutation: Mutation -} - -type Mutation { - ProcessInstanceAbort(id: String): String - ProcessInstanceRetry(id: String): String - ProcessInstanceSkip(id: String): String - ProcessInstanceUpdateVariables(id: String, variables: String): String - NodeInstanceTrigger(id: String, nodeId: String): String - NodeInstanceRetrigger(id: String, nodeInstanceId: String): String - NodeInstanceCancel(id: String, nodeInstanceId: String): String - JobCancel(id: String): String - JobReschedule(id: String, data: String): String - UserTaskInstanceUpdate(taskId: String, user: String, groups: [String], description: String, priority: String, - actualOwner: String, adminGroups: [String!], adminUsers: [String!], excludedUsers: [String!], - potentialGroups: [String!], potentialUsers: [String!], inputParams: String): String - UserTaskInstanceCommentCreate(taskId: String, user: String, groups: [String], comment: String): String - UserTaskInstanceAttachmentCreate(taskId: String, user: String, groups: [String], name: String, uri: String): String - UserTaskInstanceCommentUpdate(user:String, groups:[String],commentId: String, comment: String): String - UserTaskInstanceCommentDelete(user:String, groups:[String],commentId: String): String - UserTaskInstanceAttachmentUpdate(user:String, groups:[String],attachmentId: String, name: String, uri: String): String - UserTaskInstanceAttachmentDelete(user:String, groups:[String], attachmentId: String): String } type KogitoMetadata { diff --git a/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java b/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java index e7f175c7a0..654b577cdb 100644 --- a/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java +++ b/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java @@ -20,7 +20,9 @@ import java.io.UncheckedIOException; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import javax.enterprise.context.ApplicationScoped; @@ -28,6 +30,7 @@ import javax.inject.Inject; import org.eclipse.microprofile.context.ManagedExecutor; +import org.kie.kogito.Application; import org.kie.kogito.addon.source.files.SourceFilesProvider; import org.kie.kogito.index.api.KogitoRuntimeClient; import org.kie.kogito.index.model.Job; @@ -37,8 +40,10 @@ import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess; import org.kie.kogito.process.Process; +import org.kie.kogito.process.ProcessInstanceExecutionException; import org.kie.kogito.process.Processes; import org.kie.kogito.process.impl.AbstractProcess; +import org.kie.kogito.services.uow.UnitOfWorkExecutor; import org.kie.kogito.svg.ProcessSvgService; import static java.util.stream.Collectors.toMap; @@ -46,41 +51,71 @@ @ApplicationScoped public class KogitoAddonRuntimeClientImpl implements KogitoRuntimeClient { + private static String SUCCESSFULLY_OPERATION_MESSAGE = "Successfully performed: %s"; + private ProcessSvgService processSvgService; private SourceFilesProvider sourceFilesProvider; private Processes processes; + private Application application; + @Inject public KogitoAddonRuntimeClientImpl(Instance processSvgService, SourceFilesProvider sourceFilesProvider, - Instance processesInstance) { + Instance processesInstance, + Instance application) { this.processSvgService = processSvgService.isResolvable() ? processSvgService.get() : null; this.sourceFilesProvider = sourceFilesProvider; this.processes = processesInstance.isResolvable() ? processesInstance.get() : null; + this.application = application.isResolvable() ? application.get() : null; } @Inject ManagedExecutor managedExecutor; static CompletableFuture throwUnsupportedException() { - return CompletableFuture.failedFuture(new UnsupportedOperationException()); + return CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported operation using Data Index addon")); } @Override public CompletableFuture abortProcessInstance(String serviceURL, ProcessInstance processInstance) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.abort(); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, "ABORT ProcessInstance with id: " + processInstance.getId()); + } + })); } @Override public CompletableFuture retryProcessInstance(String serviceURL, ProcessInstance processInstance) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.error().get().retrigger(); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, "RETRY ProcessInstance in error with id: " + processInstance.getId()); + } + })); } @Override public CompletableFuture skipProcessInstance(String serviceURL, ProcessInstance processInstance) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.error().get().skip(); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, "SKIP ProcessInstance in error with id: " + processInstance.getId()); + } + })); } @Override @@ -137,26 +172,51 @@ private static Map mapMetadata(org.kie.api.definition.process.No @Override public CompletableFuture triggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeDefinitionId) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.triggerNode(nodeDefinitionId); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "TRIGGER Node " + nodeDefinitionId + "from ProcessInstance with id: " + processInstance.getId()); + } + })); } @Override public CompletableFuture retriggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.retriggerNodeInstance(nodeInstanceId); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "RETRIGGER Node instance " + nodeInstanceId + "from ProcessInstance with id: " + processInstance.getId()); + } + })); } @Override public CompletableFuture cancelNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.cancelNodeInstance(nodeInstanceId); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "CANCEL Node instance " + nodeInstanceId + "from ProcessInstance with id: " + processInstance.getId()); + } + })); } - @Override public CompletableFuture cancelJob(String serviceURL, Job job) { return throwUnsupportedException(); } - @Override - public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) { + public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) throws UncheckedIOException { return throwUnsupportedException(); } @@ -200,4 +260,22 @@ public CompletableFuture updateUserTaskInstanceAttachment(String service public CompletableFuture deleteUserTaskInstanceAttachment(String serviceURL, UserTaskInstance userTaskInstance, String user, List groups, String attachmentId) { return throwUnsupportedException(); } + + private String executeOnProcessInstance(String processId, String processInstanceId, Function, String> supplier) { + + Process process = processes != null ? processes.processById(processId) : null; + + if (process == null) { + throw new DataIndexServiceException(String.format("Unable to find Process instance with id %s to perform the operation requested", processInstanceId)); + } + return UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> { + Optional> processInstanceFound = process.instances().findById(processInstanceId); + if (processInstanceFound.isPresent()) { + org.kie.kogito.process.ProcessInstance processInstance = processInstanceFound.get(); + return supplier.apply(processInstance); + } else { + throw new DataIndexServiceException(String.format("Process instance with id %s doesn't allow the operation requested", processInstanceId)); + } + }); + } } diff --git a/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java b/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java new file mode 100644 index 0000000000..d041cc37ea --- /dev/null +++ b/data-index/kogito-addons-quarkus-data-index-persistence/kogito-addons-quarkus-data-index-persistence-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java @@ -0,0 +1,207 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.index.addon.api; + +import java.util.Optional; + +import javax.enterprise.inject.Instance; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.kie.kogito.Application; +import org.kie.kogito.addon.source.files.SourceFilesProvider; +import org.kie.kogito.index.model.ProcessInstance; +import org.kie.kogito.index.test.TestUtils; +import org.kie.kogito.process.ProcessError; +import org.kie.kogito.process.ProcessInstanceExecutionException; +import org.kie.kogito.process.ProcessInstances; +import org.kie.kogito.process.Processes; +import org.kie.kogito.process.impl.AbstractProcess; +import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; +import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; +import org.kie.kogito.svg.ProcessSvgService; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class KogitoAddonRuntimeClientImplTest { + + private static int ACTIVE = 1; + private static int ERROR = 5; + private static String SERVICE_URL = "http://runtimeURL.com"; + private static String PROCESS_INSTANCE_ID = "pId"; + private static final String NODE_ID = "nodeId"; + private static String TASK_ID = "taskId"; + private static String JOB_ID = "jobId"; + + public static final String NODE_ID_ERROR = "processInstanceIdError"; + + private KogitoAddonRuntimeClientImpl client; + + @Mock + Instance processSvgServiceInstance; + + @Mock + private ProcessSvgService processSvgService; + + @Mock + private SourceFilesProvider sourceFilesProvider; + + @Mock + Instance processesInstance; + + @Mock + private Processes processes; + + @Mock + private AbstractProcess process; + + @Mock + private ProcessInstances instances; + + @Mock + private org.kie.kogito.process.ProcessInstance processInstance; + + @Mock + private ProcessError error; + + @Mock + private Object variables; + + @Mock + Instance applicationInstance; + + @Mock + private Application application; + + @BeforeEach + public void setup() { + lenient().when(processSvgServiceInstance.isResolvable()).thenReturn(true); + lenient().when(processSvgServiceInstance.get()).thenReturn(processSvgService); + lenient().when(processesInstance.isResolvable()).thenReturn(true); + lenient().when(processesInstance.get()).thenReturn(processes); + lenient().when(processes.processById(anyString())).thenReturn(process); + lenient().when(process.instances()).thenReturn(instances); + lenient().when(instances.findById(anyString())).thenReturn(Optional.of(processInstance)); + lenient().when(processInstance.error()).thenReturn(Optional.of(error)); + lenient().when(processInstance.variables()).thenReturn(variables); + lenient().when(processInstance.id()).thenReturn(PROCESS_INSTANCE_ID); + lenient().when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ERROR); + lenient().when(error.failedNodeId()).thenReturn(NODE_ID_ERROR); + lenient().when(error.errorMessage()).thenReturn("Test error message"); + lenient().when(application.unitOfWorkManager()).thenReturn(new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())); + lenient().when(applicationInstance.isResolvable()).thenReturn(true); + lenient().when(applicationInstance.get()).thenReturn(application); + + client = spy(new KogitoAddonRuntimeClientImpl(processSvgServiceInstance, sourceFilesProvider, processesInstance, applicationInstance)); + } + + private org.kie.kogito.process.ProcessInstance mockProcessInstanceStatusActive() { + return doAnswer((v) -> { + when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ACTIVE); + return null; + }).when(processInstance); + } + + private org.kie.kogito.process.ProcessInstance mockProcessInstanceStatusError() { + return doAnswer((v) -> { + when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ERROR); + return null; + }).when(processInstance); + } + + private ProcessError mockProcessInstanceStatusActiveOnError() { + return doAnswer((v) -> { + when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ACTIVE); + return null; + }).when(error); + } + + @Test + void testAbortProcessInstanceSuccess() { + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + mockProcessInstanceStatusActive().abort(); + client.abortProcessInstance(SERVICE_URL, pI); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).abort(); + } + + @Test + void testAbortProcessInstanceError() { + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + mockProcessInstanceStatusError().abort(); + assertThrows(ProcessInstanceExecutionException.class, + () -> client.abortProcessInstance(SERVICE_URL, pI)); + verify(processInstance, times(2)).error(); + verify(processInstance, times(1)).abort(); + } + + @Test + void testRetryProcessInstance() { + mockProcessInstanceStatusActiveOnError().retrigger(); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.retryProcessInstance(SERVICE_URL, pI); + verify(processInstance, times(1)).error(); + verify(error, times(1)).retrigger(); + verify(error, times(0)).skip(); + } + + @Test + void testSkipProcessInstance() { + mockProcessInstanceStatusActiveOnError().skip(); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.skipProcessInstance(SERVICE_URL, pI); + verify(processInstance, times(1)).error(); + verify(error, times(0)).retrigger(); + verify(error, times(1)).skip(); + } + + @Test + void testTriggerNodeInstance() { + mockProcessInstanceStatusActive().triggerNode(NODE_ID); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.triggerNodeInstance(SERVICE_URL, pI, NODE_ID); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).triggerNode(NODE_ID); + } + + @Test + void testRetriggerNodeInstance() { + mockProcessInstanceStatusActive().retriggerNodeInstance(NODE_ID); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.retriggerNodeInstance(SERVICE_URL, pI, NODE_ID); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).retriggerNodeInstance(NODE_ID); + } + + @Test + void testCancelNodeInstance() { + mockProcessInstanceStatusActive().cancelNodeInstance(NODE_ID); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.cancelNodeInstance(SERVICE_URL, pI, NODE_ID); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).cancelNodeInstance(NODE_ID); + } + + private ProcessInstance createProcessInstance(String processInstanceId, int status) { + return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null); + } + +} diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java index 9971514dfb..8ec4f67ed6 100644 --- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java +++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java @@ -41,6 +41,18 @@ public GraphQLSchema createSchema() { builder.dataFetcher("Jobs", this::getJobsValues); return builder; }) + .type("Mutation", builder -> { + builder.dataFetcher("ProcessInstanceAbort", this::abortProcessInstance); + builder.dataFetcher("ProcessInstanceRetry", this::retryProcessInstance); + builder.dataFetcher("ProcessInstanceSkip", this::skipProcessInstance); + builder.dataFetcher("ProcessInstanceUpdateVariables", this::updateProcessInstanceVariables); + builder.dataFetcher("NodeInstanceTrigger", this::triggerNodeInstance); + builder.dataFetcher("NodeInstanceRetrigger", this::retriggerNodeInstance); + builder.dataFetcher("NodeInstanceCancel", this::cancelNodeInstance); + builder.dataFetcher("JobCancel", this::cancelJob); + builder.dataFetcher("JobReschedule", this::rescheduleJob); + return builder; + }) .type("ProcessDefinition", builder -> { builder.dataFetcher("source", e -> getProcessDefinitionSource(e.getSource())); builder.dataFetcher("nodes", e -> getProcessDefinitionNodes(e.getSource()));