diff --git a/data-index/data-index-common/pom.xml b/data-index/data-index-common/pom.xml index 6a52279bf0..58231a1fed 100644 --- a/data-index/data-index-common/pom.xml +++ b/data-index/data-index-common/pom.xml @@ -29,6 +29,10 @@ io.quarkus quarkus-vertx + + io.vertx + vertx-web-client + io.quarkus quarkus-reactive-routes diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeCommonClient.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeCommonClient.java new file mode 100644 index 0000000000..0d4b20f07c --- /dev/null +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeCommonClient.java @@ -0,0 +1,159 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.index.api; + +import java.net.MalformedURLException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.kie.kogito.index.model.Job; +import org.kie.kogito.index.service.DataIndexServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.quarkus.security.credential.TokenCredential; +import io.quarkus.security.identity.SecurityIdentity; +import io.vertx.core.AsyncResult; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import static java.lang.String.format; + +@ApplicationScoped +public class KogitoRuntimeCommonClient { + + public static final String CANCEL_JOB_PATH = "/jobs/%s"; + public static final String RESCHEDULE_JOB_PATH = "/jobs/%s"; + + public static final String FROM_PROCESS_INSTANCE_WITH_ID = "from ProcessInstance with id: "; + + private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeCommonClient.class); + + protected Vertx vertx; + + protected SecurityIdentity identity; + + protected Map serviceWebClientMap = new HashMap<>(); + + @ConfigProperty(name = "kogito.dataindex.gateway.url") + protected Optional gatewayTargetUrl; + + public void setGatewayTargetUrl(Optional gatewayTargetUrl) { + this.gatewayTargetUrl = gatewayTargetUrl; + } + + public void addServiceWebClient(String serviceUrl, WebClient webClient) { + serviceWebClientMap.put(serviceUrl, webClient); + } + + protected WebClient getWebClient(String runtimeServiceUrl) { + if (runtimeServiceUrl == null) { + throw new DataIndexServiceException("Runtime service URL not defined, please review the kogito.service.url system property to point the public URL for this runtime."); + } else { + return serviceWebClientMap.computeIfAbsent(runtimeServiceUrl, url -> WebClient.create(vertx, getWebClientToURLOptions(runtimeServiceUrl))); + } + } + + public WebClientOptions getWebClientToURLOptions(String targetHttpURL) { + try { + URL dataIndexURL = new URL(targetHttpURL); + return new WebClientOptions() + .setDefaultHost(gatewayTargetUrl.orElse(dataIndexURL.getHost())) + .setDefaultPort((dataIndexURL.getPort() != -1 ? dataIndexURL.getPort() : dataIndexURL.getDefaultPort())) + .setSsl(dataIndexURL.getProtocol().compareToIgnoreCase("https") == 0); + } catch (MalformedURLException ex) { + LOGGER.error(String.format("Invalid runtime service URL: %s", targetHttpURL), ex); + return null; + } + } + + public CompletableFuture cancelJob(String serviceURL, Job job) { + String requestURI = format(CANCEL_JOB_PATH, job.getId()); + LOGGER.debug("Sending DELETE to URI {}", requestURI); + return sendDeleteClientRequest(getWebClient(serviceURL), requestURI, "CANCEL Job with id: " + job.getId()); + } + + public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) { + String requestURI = format(RESCHEDULE_JOB_PATH, job.getId()); + LOGGER.debug("Sending body: {} PATCH to URI {}", newJobData, requestURI); + return sendPatchClientRequest(getWebClient(serviceURL), requestURI, + "RESCHEDULED JOB with id: " + job.getId(), new JsonObject(newJobData)); + } + + public CompletableFuture sendDeleteClientRequest(WebClient webClient, String requestURI, String logMessage) { + CompletableFuture future = new CompletableFuture<>(); + webClient.delete(requestURI) + .putHeader("Authorization", getAuthHeader()) + .send(res -> asyncHttpResponseTreatment(res, future, logMessage)); + LOGGER.debug("Sending DELETE to URI {}", requestURI); + return future; + } + + protected void asyncHttpResponseTreatment(AsyncResult> res, CompletableFuture future, String logMessage) { + if (res.succeeded() && (res.result().statusCode() == 200 || res.result().statusCode() == 201)) { + future.complete(res.result().bodyAsString() != null ? res.result().bodyAsString() : "Successfully performed: " + logMessage); + } else { + future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result()))); + } + } + + public CompletableFuture sendPatchClientRequest(WebClient webClient, String requestURI, String logMessage, JsonObject jsonBody) { + CompletableFuture future = new CompletableFuture<>(); + webClient.patch(requestURI) + .putHeader("Authorization", getAuthHeader()) + .sendJson(jsonBody, res -> asyncHttpResponseTreatment(res, future, logMessage)); + return future; + } + + protected String getErrorMessage(String logMessage, HttpResponse result) { + String errorMessage = "FAILED: " + logMessage; + if (result != null) { + errorMessage += " errorCode:" + result.statusCode() + + " errorStatus:" + result.statusMessage() + + " errorMessage:" + (result.body() != null ? result.body().toString() : "-"); + } + return errorMessage; + } + + public String getAuthHeader() { + if (identity != null && identity.getCredential(TokenCredential.class) != null) { + return "Bearer " + identity.getCredential(TokenCredential.class).getToken(); + } + return ""; + } + + @Inject + public void setIdentity(SecurityIdentity identity) { + this.identity = identity; + } + + @Inject + public void setVertx(Vertx vertx) { + this.vertx = vertx; + } + +} diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java index 412f35bd64..4f4f1add75 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/graphql/AbstractGraphQLSchemaManager.java @@ -35,6 +35,7 @@ import org.kie.kogito.index.model.Node; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; +import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.index.storage.DataIndexStorageService; import org.kie.kogito.persistence.api.Storage; import org.kie.kogito.persistence.api.query.Query; @@ -49,11 +50,21 @@ import graphql.schema.idl.SchemaParser; import graphql.schema.idl.TypeDefinitionRegistry; +import static java.lang.String.format; import static java.util.Collections.singletonList; import static org.kie.kogito.persistence.api.query.QueryFilterFactory.equalTo; public abstract class AbstractGraphQLSchemaManager implements GraphQLSchemaManager { + private static final String ID = "id"; + private static final String USER = "user"; + private static final String GROUPS = "groups"; + private static final String TASK_ID = "taskId"; + private static final String COMMENT_ID = "commentId"; + private static final String ATTACHMENT_ID = "attachmentId"; + + private static final String UNABLE_TO_FIND_ERROR_MSG = "Unable to find the instance with %s %s"; + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractGraphQLSchemaManager.class); @Inject @@ -213,4 +224,176 @@ public void transform(Consumer builder) { schema = schema.transform(builder); } + public CompletableFuture abortProcessInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().abortProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture retryProcessInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().retryProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture skipProcessInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().skipProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture updateProcessInstanceVariables(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().updateProcessInstanceVariables(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance, + env.getArgument("variables")); + + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture triggerNodeInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().triggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), + processInstance, + env.getArgument("nodeId")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture retriggerNodeInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().retriggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), + processInstance, + env.getArgument("nodeInstanceId")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture cancelNodeInstance(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(id); + if (processInstance != null) { + return getDataIndexApiExecutor().cancelNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), + processInstance, + env.getArgument("nodeInstanceId")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture cancelJob(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + Job job = getCacheService().getJobsCache().get(id); + if (job != null) { + return getDataIndexApiExecutor().cancelJob(job.getEndpoint(), job); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + public CompletableFuture rescheduleJob(DataFetchingEnvironment env) { + String id = env.getArgument("id"); + Job job = getCacheService().getJobsCache().get(id); + if (job != null) { + return getDataIndexApiExecutor().rescheduleJob(job.getEndpoint(), job, env.getArgument("data")); + } + return CompletableFuture.failedFuture(new DataIndexServiceException(format(UNABLE_TO_FIND_ERROR_MSG, ID, id))); + } + + protected CompletableFuture getUserTaskInstanceSchema(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = env.getSource(); + return getDataIndexApiExecutor().getUserTaskSchema(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS)); + } + + protected CompletableFuture updateUserTaskInstance(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); + return getDataIndexApiExecutor().updateUserTaskInstance(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArguments()); + } + + protected CompletableFuture createTaskInstanceComment(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); + return getDataIndexApiExecutor().createUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument("comment")); + } + + protected CompletableFuture createTaskInstanceAttachment(DataFetchingEnvironment env) { + UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); + return getDataIndexApiExecutor().createUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument("name"), + env.getArgument("uri")); + } + + protected CompletableFuture updateUserTaskComment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().updateUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(COMMENT_ID), + env.getArgument("comment")); + } + + protected CompletableFuture deleteUserTaskComment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().deleteUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(COMMENT_ID)); + } + + protected CompletableFuture updateUserTaskAttachment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().updateUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(ATTACHMENT_ID), + env.getArgument("name"), + env.getArgument("uri")); + } + + protected CompletableFuture deleteUserTaskAttachment(DataFetchingEnvironment env) { + Query query = getCacheService().getUserTaskInstancesCache().query(); + query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); + UserTaskInstance userTaskInstance = query.execute().get(0); + return getDataIndexApiExecutor().deleteUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), + userTaskInstance, + env.getArgument(USER), + env.getArgument(GROUPS), + env.getArgument(ATTACHMENT_ID)); + } + } diff --git a/data-index/data-index-common/src/main/resources/basic.schema.graphqls b/data-index/data-index-common/src/main/resources/basic.schema.graphqls index f6d22aa649..c4cc47d1b1 100644 --- a/data-index/data-index-common/src/main/resources/basic.schema.graphqls +++ b/data-index/data-index-common/src/main/resources/basic.schema.graphqls @@ -4,6 +4,7 @@ scalar Long schema { query: Query + mutation: Mutation } type Query { @@ -12,6 +13,27 @@ type Query { Jobs(where: JobArgument, orderBy: JobOrderBy, pagination: Pagination): [Job] } +type Mutation { + ProcessInstanceAbort(id: String): String + ProcessInstanceRetry(id: String): String + ProcessInstanceSkip(id: String): String + ProcessInstanceUpdateVariables(id: String, variables: String): String + NodeInstanceTrigger(id: String, nodeId: String): String + NodeInstanceRetrigger(id: String, nodeInstanceId: String): String + NodeInstanceCancel(id: String, nodeInstanceId: String): String + JobCancel(id: String): String + JobReschedule(id: String, data: String): String + UserTaskInstanceUpdate(taskId: String, user: String, groups: [String], description: String, priority: String, + actualOwner: String, adminGroups: [String!], adminUsers: [String!], excludedUsers: [String!], + potentialGroups: [String!], potentialUsers: [String!], inputParams: String): String + UserTaskInstanceCommentCreate(taskId: String, user: String, groups: [String], comment: String): String + UserTaskInstanceAttachmentCreate(taskId: String, user: String, groups: [String], name: String, uri: String): String + UserTaskInstanceCommentUpdate(user:String, groups:[String],commentId: String, comment: String): String + UserTaskInstanceCommentDelete(user:String, groups:[String],commentId: String): String + UserTaskInstanceAttachmentUpdate(user:String, groups:[String],attachmentId: String, name: String, uri: String): String + UserTaskInstanceAttachmentDelete(user:String, groups:[String], attachmentId: String): String +} + type ProcessInstance { id: String! processId: String! diff --git a/data-index/data-index-common/src/test/java/org/kie/kogito/index/api/KogitoRuntimeCommonClientTest.java b/data-index/data-index-common/src/test/java/org/kie/kogito/index/api/KogitoRuntimeCommonClientTest.java new file mode 100644 index 0000000000..86799c27ec --- /dev/null +++ b/data-index/data-index-common/src/test/java/org/kie/kogito/index/api/KogitoRuntimeCommonClientTest.java @@ -0,0 +1,190 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.index.api; + +import java.nio.Buffer; +import java.util.*; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.kie.kogito.index.TestUtils; +import org.kie.kogito.index.model.Job; +import org.kie.kogito.index.model.ProcessInstance; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.quarkus.security.credential.TokenCredential; +import io.quarkus.security.identity.SecurityIdentity; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.HttpRequest; +import io.vertx.ext.web.client.HttpResponse; +import io.vertx.ext.web.client.WebClient; +import io.vertx.ext.web.client.WebClientOptions; + +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public abstract class KogitoRuntimeCommonClientTest { + + protected static int ACTIVE = 1; + protected static String SERVICE_URL = "http://runtimeURL.com"; + protected static String PROCESS_INSTANCE_ID = "pId"; + protected static String JOB_ID = "jobId"; + protected static String AUTHORIZED_TOKEN = "authToken"; + + @Mock + protected Vertx vertx; + + @Mock + protected SecurityIdentity identityMock; + + protected TokenCredential tokenCredential; + + @Mock + protected WebClient webClientMock; + + @Mock + protected HttpRequest httpRequestMock; + + protected abstract KogitoRuntimeCommonClient getClient(); + + protected void testCancelJobRest() { + setupIdentityMock(); + when(webClientMock.delete(anyString())).thenReturn(httpRequestMock); + Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED"); + getClient().cancelJob(SERVICE_URL, job); + + verify(getClient()).sendDeleteClientRequest(webClientMock, + format(KogitoRuntimeCommonClient.CANCEL_JOB_PATH, job.getId()), + "CANCEL Job with id: " + JOB_ID); + ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); + verify(httpRequestMock).send(handlerCaptor.capture()); + checkResponseHandling(handlerCaptor.getValue()); + } + + @Test + protected void testRescheduleWithoutJobServiceInstance() { + String newJobData = "{\"expirationTime\": \"2023-08-27T04:35:54.631Z\",\"retries\": 2}"; + setupIdentityMock(); + when(webClientMock.patch(anyString())).thenReturn(httpRequestMock); + + Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED"); + + getClient().rescheduleJob(SERVICE_URL, job, newJobData); + ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(JsonObject.class); + + verify(getClient()).sendPatchClientRequest(eq(webClientMock), + eq(format(KogitoRuntimeCommonClient.RESCHEDULE_JOB_PATH, JOB_ID)), + eq("RESCHEDULED JOB with id: " + job.getId()), + jsonCaptor.capture()); + + assertThat(jsonCaptor.getValue().getString("expirationTime")).isEqualTo("2023-08-27T04:35:54.631Z"); + assertThat(jsonCaptor.getValue().getString("retries")).isEqualTo("2"); + + ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); + JsonObject jsonOject = new JsonObject(newJobData); + verify(httpRequestMock).sendJson(eq(jsonOject), handlerCaptor.capture()); + verify(httpRequestMock).putHeader("Authorization", "Bearer " + AUTHORIZED_TOKEN); + checkResponseHandling(handlerCaptor.getValue()); + } + + @Test + void testWebClientToURLOptions() { + String defaultHost = "localhost"; + int defaultPort = 8180; + WebClientOptions webClientOptions = getClient().getWebClientToURLOptions("http://" + defaultHost + ":" + defaultPort); + assertThat(webClientOptions.getDefaultHost()).isEqualTo(defaultHost); + assertThat(webClientOptions.getDefaultPort()).isEqualTo(defaultPort); + } + + @Test + void testWebClientToURLOptionsWithoutPort() { + String dataIndexUrl = "http://service.com"; + WebClientOptions webClientOptions = getClient().getWebClientToURLOptions(dataIndexUrl); + assertThat(webClientOptions.getDefaultPort()).isEqualTo(80); + assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com"); + assertFalse(webClientOptions.isSsl()); + } + + @Test + void testWebClientToURLOptionsWithoutPortSSL() { + String dataIndexurl = "https://service.com"; + WebClientOptions webClientOptions = getClient().getWebClientToURLOptions(dataIndexurl); + assertThat(webClientOptions.getDefaultPort()).isEqualTo(443); + assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com"); + assertTrue(webClientOptions.isSsl()); + } + + @Test + void testMalformedURL() { + assertThat(getClient().getWebClientToURLOptions("malformedURL")).isNull(); + } + + @Test + void testOverrideURL() { + String host = "host.testcontainers.internal"; + getClient().setGatewayTargetUrl(Optional.of(host)); + WebClientOptions webClientOptions = getClient().getWebClientToURLOptions("http://service.com"); + assertThat(webClientOptions.getDefaultHost()).isEqualTo(host); + } + + protected AsyncResult createResponseMocks(HttpResponse response, boolean succeed, int statusCode) { + AsyncResult asyncResultMock = mock(AsyncResult.class); + when(asyncResultMock.succeeded()).thenReturn(succeed); + when(asyncResultMock.result()).thenReturn(response); + when(response.statusCode()).thenReturn(statusCode); + return asyncResultMock; + } + + protected Job createJob(String jobId, String processInstanceId, String status) { + return TestUtils.getJob(jobId, "travels", processInstanceId, null, null, status); + } + + protected void checkResponseHandling(Handler>> handler) { + HttpResponse response = mock(HttpResponse.class); + HttpResponse responseWithoutError = mock(HttpResponse.class); + + handler.handle(createResponseMocks(response, false, 404)); + verify(response).statusMessage(); + verify(response).body(); + verify(response, never()).bodyAsString(); + + handler.handle(createResponseMocks(responseWithoutError, true, 200)); + verify(responseWithoutError, never()).statusMessage(); + verify(responseWithoutError, never()).body(); + verify(responseWithoutError).bodyAsString(); + } + + protected void setupIdentityMock() { + tokenCredential = mock(TokenCredential.class); + when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential); + when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN); + when(httpRequestMock.putHeader("Authorization", "Bearer " + AUTHORIZED_TOKEN)).thenReturn(httpRequestMock); + } + + protected ProcessInstance createProcessInstance(String processInstanceId, int status) { + return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null); + } + +} diff --git a/data-index/data-index-service/data-index-service-common/pom.xml b/data-index/data-index-service/data-index-service-common/pom.xml index 144a0fa16c..4a1eff5009 100644 --- a/data-index/data-index-service/data-index-service-common/pom.xml +++ b/data-index/data-index-service/data-index-service-common/pom.xml @@ -33,10 +33,6 @@ org.kie.kogito persistence-commons-protobuf - - io.vertx - vertx-web-client - io.smallrye.reactive smallrye-reactive-messaging-api diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java index 3c74b1e286..960092c794 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java @@ -15,21 +15,15 @@ */ package org.kie.kogito.index.service.api; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; import javax.ws.rs.core.MediaType; -import org.eclipse.microprofile.config.inject.ConfigProperty; import org.kie.kogito.index.api.KogitoRuntimeClient; -import org.kie.kogito.index.model.Job; +import org.kie.kogito.index.api.KogitoRuntimeCommonClient; import org.kie.kogito.index.model.Node; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; @@ -37,21 +31,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.quarkus.security.credential.TokenCredential; -import io.quarkus.security.identity.SecurityIdentity; import io.vertx.core.AsyncResult; -import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.client.HttpRequest; import io.vertx.ext.web.client.HttpResponse; import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; import static java.lang.String.format; @ApplicationScoped -public class KogitoRuntimeClientImpl implements KogitoRuntimeClient { +class KogitoRuntimeClientImpl extends KogitoRuntimeCommonClient implements KogitoRuntimeClient { public static final String ABORT_PROCESS_INSTANCE_PATH = "/management/processes/%s/instances/%s"; public static final String RETRY_PROCESS_INSTANCE_PATH = "/management/processes/%s/instances/%s/retrigger"; @@ -64,9 +54,6 @@ public class KogitoRuntimeClientImpl implements KogitoRuntimeClient { public static final String RETRIGGER_NODE_INSTANCE_PATH = "/management/processes/%s/instances/%s/nodeInstances/%s"; // nodeInstance Id public static final String CANCEL_NODE_INSTANCE_PATH = "/management/processes/%s/instances/%s/nodeInstances/%s"; // nodeInstance Id - public static final String CANCEL_JOB_PATH = "/%s"; - public static final String RESCHEDULE_JOB_PATH = "/%s"; - public static final String GET_TASK_SCHEMA_PATH = "/%s/%s/%s/%s/schema"; public static final String UPDATE_USER_TASK_INSTANCE_PATH = "/management/processes/%s/instances/%s/tasks/%s"; @@ -79,39 +66,6 @@ public class KogitoRuntimeClientImpl implements KogitoRuntimeClient { public static final String DELETE_USER_TASK_INSTANCE_ATTACHMENT_PATH = "/%s/%s/%s/%s/attachments/%s"; private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeClientImpl.class); - private Vertx vertx; - private SecurityIdentity identity; - protected Map serviceWebClientMap = new HashMap<>(); - - @ConfigProperty(name = "kogito.dataindex.gateway.url") - protected Optional gatewayTargetUrl; - - @Inject - public KogitoRuntimeClientImpl(Vertx vertx, SecurityIdentity identity) { - this.vertx = vertx; - this.identity = identity; - } - - protected WebClient getWebClient(String runtimeServiceUrl) { - if (runtimeServiceUrl == null) { - throw new DataIndexServiceException("Runtime service URL not defined, please review the kogito.service.url system property to point the public URL for this runtime."); - } else { - return serviceWebClientMap.computeIfAbsent(runtimeServiceUrl, url -> WebClient.create(vertx, getWebClientToURLOptions(runtimeServiceUrl))); - } - } - - protected WebClientOptions getWebClientToURLOptions(String targetHttpURL) { - try { - URL dataIndexURL = new URL(targetHttpURL); - return new WebClientOptions() - .setDefaultHost(gatewayTargetUrl.orElse(dataIndexURL.getHost())) - .setDefaultPort((dataIndexURL.getPort() != -1 ? dataIndexURL.getPort() : dataIndexURL.getDefaultPort())) - .setSsl(dataIndexURL.getProtocol().compareToIgnoreCase("https") == 0); - } catch (MalformedURLException ex) { - LOGGER.error("Invalid runtime service URL: " + targetHttpURL, ex); - return null; - } - } @Override public CompletableFuture abortProcessInstance(String serviceURL, ProcessInstance processInstance) { @@ -160,37 +114,21 @@ public CompletableFuture> getProcessInstanceNodeDefinitions(String se public CompletableFuture triggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeDefinitionId) { String requestURI = format(TRIGGER_NODE_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId(), nodeDefinitionId); return sendPostClientRequest(getWebClient(serviceURL), requestURI, - "Trigger Node " + nodeDefinitionId + - "from ProcessInstance with id: " + processInstance.getId()); + "Trigger Node " + nodeDefinitionId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId()); } @Override public CompletableFuture retriggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) { String requestURI = format(RETRIGGER_NODE_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId(), nodeInstanceId); return sendPostClientRequest(getWebClient(serviceURL), requestURI, - "Retrigger NodeInstance " + nodeInstanceId + - "from ProcessInstance with id: " + processInstance.getId()); + "Retrigger NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId()); } @Override public CompletableFuture cancelNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) { String requestURI = format(CANCEL_NODE_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId(), nodeInstanceId); return sendDeleteClientRequest(getWebClient(serviceURL), requestURI, - "Cancel NodeInstance " + nodeInstanceId + - "from ProcessInstance with id: " + processInstance.getId()); - } - - @Override - public CompletableFuture cancelJob(String serviceURL, Job job) { - String requestURI = format(CANCEL_JOB_PATH, job.getId()); - return sendDeleteClientRequest(getWebClient(serviceURL), requestURI, "CANCEL Job with id: " + job.getId()); - } - - @Override - public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) { - String requestURI = format(RESCHEDULE_JOB_PATH, job.getId()); - return sendJSONPutClientRequest(getWebClient(serviceURL), requestURI, - "RESCHEDULED JOB with id: " + job.getId(), newJobData); + "Cancel NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId()); } @Override @@ -277,40 +215,28 @@ private String getUserGroupsURIParameter(String user, List groups) { return builder.toString(); } - protected CompletableFuture sendDeleteClientRequest(WebClient webClient, String requestURI, String logMessage) { - CompletableFuture future = new CompletableFuture<>(); - webClient.delete(requestURI) - .putHeader("Authorization", getAuthHeader()) - .send(res -> asyncHttpResponseTreatment(res, future, logMessage)); - return future; - } - protected CompletableFuture sendPostWithBodyClientRequest(WebClient webClient, String requestURI, String logMessage, String body, String contentType) { CompletableFuture future = new CompletableFuture<>(); + HttpRequest request = webClient.post(requestURI) .putHeader("Authorization", getAuthHeader()) .putHeader("Content-Type", contentType); if (MediaType.APPLICATION_JSON.equals(contentType)) { + LOGGER.debug("Sending Json Body: {} POST to URI {}", body, requestURI); request.sendJson(new JsonObject(body), res -> asyncHttpResponseTreatment(res, future, logMessage)); } else { + LOGGER.debug("Sending Buffer(Body): {} POST to URI {}", body, requestURI); request.sendBuffer(Buffer.buffer(body), res -> asyncHttpResponseTreatment(res, future, logMessage)); } return future; } - private void asyncHttpResponseTreatment(AsyncResult> res, CompletableFuture future, String logMessage) { - if (res.succeeded() && ((res.result().statusCode() == 200 || res.result().statusCode() == 201))) { - future.complete(res.result().bodyAsString() != null ? res.result().bodyAsString() : "Successfully performed: " + logMessage); - } else { - future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result()))); - } - } - protected CompletableFuture sendPostClientRequest(WebClient webClient, String requestURI, String logMessage) { CompletableFuture future = new CompletableFuture<>(); webClient.post(requestURI) .putHeader("Authorization", getAuthHeader()) .send(res -> asyncHttpResponseTreatment(res, future, logMessage)); + LOGGER.debug("Sending post to URI {}", requestURI); return future; } @@ -324,27 +250,22 @@ protected CompletableFuture sendPutClientRequest(WebClient webClient, String req .putHeader("Authorization", getAuthHeader()) .putHeader("Content-Type", contentType); if (MediaType.APPLICATION_JSON.equals(contentType)) { + LOGGER.debug("Sending Json Body: {} PUT to URI {}", body, requestURI); request.sendJson(new JsonObject(body), res -> asyncHttpResponseTreatment(res, future, logMessage)); } else { + LOGGER.debug("Sending Buffer(Body): {} PUT to URI {}", body, requestURI); request.sendBuffer(Buffer.buffer(body), res -> asyncHttpResponseTreatment(res, future, logMessage)); } return future; } - protected CompletableFuture sendPatchClientRequest(WebClient webClient, String requestURI, String logMessage, JsonObject jsonBody) { - CompletableFuture future = new CompletableFuture<>(); - webClient.patch(requestURI) - .putHeader("Authorization", getAuthHeader()) - .sendJson(jsonBody, res -> asyncHttpResponseTreatment(res, future, logMessage)); - return future; - } - protected CompletableFuture sendGetClientRequest(WebClient webClient, String requestURI, String logMessage, Class type) { CompletableFuture future = new CompletableFuture<>(); webClient.get(requestURI) .putHeader("Authorization", getAuthHeader()) .send(res -> send(logMessage, type, future, res)); + LOGGER.debug("Sending GET to URI {}", requestURI); return future; } @@ -362,20 +283,4 @@ protected void send(String logMessage, Class type, CompletableFuture future, Asy } } - private String getErrorMessage(String logMessage, HttpResponse result) { - String errorMessage = "FAILED: " + logMessage; - if (result != null) { - errorMessage += " errorCode:" + result.statusCode() + - " errorStatus:" + result.statusMessage() + - " errorMessage:" + (result.body() != null ? result.body().toString() : "-"); - } - return errorMessage; - } - - protected String getAuthHeader() { - if (identity != null && identity.getCredential(TokenCredential.class) != null) { - return "Bearer " + identity.getCredential(TokenCredential.class).getToken(); - } - return ""; - } } diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java index 5518de4942..f897fcf63c 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/graphql/GraphQLSchemaManagerImpl.java @@ -19,7 +19,6 @@ import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import javax.annotation.PostConstruct; @@ -28,13 +27,9 @@ import org.kie.kogito.index.graphql.AbstractGraphQLSchemaManager; import org.kie.kogito.index.graphql.query.GraphQLQueryParserRegistry; import org.kie.kogito.index.json.DataIndexParsingException; -import org.kie.kogito.index.model.Job; -import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.ProcessInstanceState; -import org.kie.kogito.index.model.UserTaskInstance; import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.persistence.api.Storage; -import org.kie.kogito.persistence.api.query.Query; import org.reactivestreams.Publisher; import com.fasterxml.jackson.databind.JsonNode; @@ -50,25 +45,17 @@ import graphql.schema.idl.TypeDefinitionRegistry; import static java.lang.String.format; -import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.kie.kogito.index.json.JsonUtils.getObjectMapper; -import static org.kie.kogito.persistence.api.query.QueryFilterFactory.equalTo; @ApplicationScoped public class GraphQLSchemaManagerImpl extends AbstractGraphQLSchemaManager { - private static final String PROCESS_INSTANCE_ADDED = "ProcessInstanceAdded"; private static final String PROCESS_INSTANCE_UPDATED = "ProcessInstanceUpdated"; private static final String USER_TASK_INSTANCE_ADDED = "UserTaskInstanceAdded"; private static final String USER_TASK_INSTANCE_UPDATED = "UserTaskInstanceUpdated"; private static final String JOB_UPDATED = "JobUpdated"; private static final String JOB_ADDED = "JobAdded"; - private static final String USER = "user"; - private static final String GROUPS = "groups"; - private static final String TASK_ID = "taskId"; - private static final String COMMENT_ID = "commentId"; - private static final String ATTACHMENT_ID = "attachmentId"; @Override @PostConstruct @@ -150,57 +137,6 @@ public GraphQLSchema createSchema() { return schemaGenerator.makeExecutableSchema(typeDefinitionRegistry, runtimeWiring); } - public CompletableFuture abortProcessInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().abortProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); - } - - public CompletableFuture retryProcessInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().retryProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); - } - - public CompletableFuture skipProcessInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().skipProcessInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance); - } - - public CompletableFuture updateProcessInstanceVariables(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().updateProcessInstanceVariables(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), processInstance, env.getArgument("variables")); - } - - public CompletableFuture triggerNodeInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().triggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), - processInstance, - env.getArgument("nodeId")); - } - - public CompletableFuture retriggerNodeInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().retriggerNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), - processInstance, - env.getArgument("nodeInstanceId")); - } - - public CompletableFuture cancelNodeInstance(DataFetchingEnvironment env) { - ProcessInstance processInstance = getCacheService().getProcessInstancesCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().cancelNodeInstance(getServiceUrl(processInstance.getEndpoint(), processInstance.getProcessId()), - processInstance, - env.getArgument("nodeInstanceId")); - } - - public CompletableFuture cancelJob(DataFetchingEnvironment env) { - Job job = getCacheService().getJobsCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().cancelJob(job.getEndpoint(), job); - } - - public CompletableFuture rescheduleJob(DataFetchingEnvironment env) { - Job job = getCacheService().getJobsCache().get(env.getArgument("id")); - return getDataIndexApiExecutor().rescheduleJob(job.getEndpoint(), job, env.getArgument("data")); - } - protected String getProcessInstanceJsonServiceUrl(DataFetchingEnvironment env) { Object source = env.getSource(); if (source instanceof JsonNode) { @@ -211,89 +147,6 @@ protected String getProcessInstanceJsonServiceUrl(DataFetchingEnvironment env) { return null; } - private CompletableFuture getUserTaskInstanceSchema(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = env.getSource(); - return getDataIndexApiExecutor().getUserTaskSchema(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS)); - } - - private CompletableFuture updateUserTaskInstance(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); - return getDataIndexApiExecutor().updateUserTaskInstance(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArguments()); - } - - private CompletableFuture createTaskInstanceComment(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); - return getDataIndexApiExecutor().createUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument("comment")); - } - - private CompletableFuture createTaskInstanceAttachment(DataFetchingEnvironment env) { - UserTaskInstance userTaskInstance = getCacheService().getUserTaskInstancesCache().get(env.getArgument(TASK_ID)); - return getDataIndexApiExecutor().createUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument("name"), - env.getArgument("uri")); - } - - private CompletableFuture updateUserTaskComment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().updateUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(COMMENT_ID), - env.getArgument("comment")); - } - - private CompletableFuture deleteUserTaskComment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("comments.id", env.getArgument(COMMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().deleteUserTaskInstanceComment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(COMMENT_ID)); - } - - private CompletableFuture updateUserTaskAttachment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().updateUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(ATTACHMENT_ID), - env.getArgument("name"), - env.getArgument("uri")); - } - - private CompletableFuture deleteUserTaskAttachment(DataFetchingEnvironment env) { - Query query = getCacheService().getUserTaskInstancesCache().query(); - query.filter(singletonList(equalTo("attachments.id", env.getArgument(ATTACHMENT_ID)))); - UserTaskInstance userTaskInstance = query.execute().get(0); - return getDataIndexApiExecutor().deleteUserTaskInstanceAttachment(getServiceUrl(userTaskInstance.getEndpoint(), userTaskInstance.getProcessId()), - userTaskInstance, - env.getArgument(USER), - env.getArgument(GROUPS), - env.getArgument(ATTACHMENT_ID)); - } - private DataFetcher> getProcessInstanceAddedDataFetcher() { return objectCreatedPublisher(() -> getCacheService().getProcessInstancesCache()); } diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java index e75147bab8..66046b639f 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java @@ -72,7 +72,9 @@ public Message convert(Message message, Type type) { } else if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) { KogitoJobCloudEvent event = objectMapper.readValue(message.getPayload().toString(), KogitoJobCloudEvent.class); if (event.getData() == null) { - event.setData(objectMapper.readValue(message.getPayload().toString(), Job.class)); + Job job = objectMapper.readValue(message.getPayload().toString(), Job.class); + job.setEndpoint(event.getSource() != null ? event.getSource().toString() : "none"); + event.setData(job); } return message.withPayload(event); } else if (type.getTypeName().equals(UserTaskInstanceDataEvent.class.getTypeName())) { diff --git a/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls b/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls index e50c8dc121..f3a41fcd0a 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls +++ b/data-index/data-index-service/data-index-service-common/src/main/resources/domain.schema.graphqls @@ -1,27 +1,5 @@ extend schema { subscription: Subscription - mutation: Mutation -} - -type Mutation { - ProcessInstanceAbort(id: String): String - ProcessInstanceRetry(id: String): String - ProcessInstanceSkip(id: String): String - ProcessInstanceUpdateVariables(id: String, variables: String): String - NodeInstanceTrigger(id: String, nodeId: String): String - NodeInstanceRetrigger(id: String, nodeInstanceId: String): String - NodeInstanceCancel(id: String, nodeInstanceId: String): String - JobCancel(id: String): String - JobReschedule(id: String, data: String): String - UserTaskInstanceUpdate(taskId: String, user: String, groups: [String], description: String, priority: String, - actualOwner: String, adminGroups: [String!], adminUsers: [String!], excludedUsers: [String!], - potentialGroups: [String!], potentialUsers: [String!], inputParams: String): String - UserTaskInstanceCommentCreate(taskId: String, user: String, groups: [String], comment: String): String - UserTaskInstanceAttachmentCreate(taskId: String, user: String, groups: [String], name: String, uri: String): String - UserTaskInstanceCommentUpdate(user:String, groups:[String],commentId: String, comment: String): String - UserTaskInstanceCommentDelete(user:String, groups:[String],commentId: String): String - UserTaskInstanceAttachmentUpdate(user:String, groups:[String],attachmentId: String, name: String, uri: String): String - UserTaskInstanceAttachmentDelete(user:String, groups:[String], attachmentId: String): String } type KogitoMetadata { diff --git a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java index 6907d6ac85..1ccf292a12 100644 --- a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java +++ b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java @@ -15,7 +15,6 @@ */ package org.kie.kogito.index.service.api; -import java.nio.Buffer; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -27,91 +26,54 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.kie.kogito.index.TestUtils; -import org.kie.kogito.index.model.Job; +import org.kie.kogito.index.api.KogitoRuntimeCommonClient; +import org.kie.kogito.index.api.KogitoRuntimeCommonClientTest; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; import org.kie.kogito.index.service.DataIndexServiceException; import org.mockito.ArgumentCaptor; -import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import io.quarkus.security.credential.TokenCredential; -import io.quarkus.security.identity.SecurityIdentity; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; -import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; -import io.vertx.ext.web.client.HttpRequest; import io.vertx.ext.web.client.HttpResponse; -import io.vertx.ext.web.client.WebClient; -import io.vertx.ext.web.client.WebClientOptions; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.ABORT_PROCESS_INSTANCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.CANCEL_JOB_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.CANCEL_NODE_INSTANCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.GET_PROCESS_INSTANCE_DIAGRAM_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.GET_PROCESS_INSTANCE_NODE_DEFINITIONS_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.GET_PROCESS_INSTANCE_SOURCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.RESCHEDULE_JOB_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.RETRIGGER_NODE_INSTANCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.RETRY_PROCESS_INSTANCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.SKIP_PROCESS_INSTANCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.TRIGGER_NODE_INSTANCE_PATH; -import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.UPDATE_VARIABLES_PROCESS_INSTANCE_PATH; +import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @ExtendWith(MockitoExtension.class) -public class KogitoRuntimeClientTest { +class KogitoRuntimeClientTest extends KogitoRuntimeCommonClientTest { - private static int ACTIVE = 1; private static int ERROR = 5; - private static String SERVICE_URL = "http://runtimeURL.com"; - private static String PROCESS_INSTANCE_ID = "pId"; private static String TASK_ID = "taskId"; - private static String JOB_ID = "jobId"; - - private static String AUTHORIZED_TOKEN = "authToken"; - - @Mock - public Vertx vertx; - - @Mock - private SecurityIdentity identityMock; - - private TokenCredential tokenCredential; private KogitoRuntimeClientImpl client; - @Mock - private WebClient webClientMock; - - @Mock - private HttpRequest httpRequestMock; - @BeforeEach public void setup() { - client = spy(new KogitoRuntimeClientImpl(vertx, identityMock)); - client.gatewayTargetUrl = Optional.empty(); - client.serviceWebClientMap.put(SERVICE_URL, webClientMock); + client = spy(new KogitoRuntimeClientImpl()); + client.setGatewayTargetUrl(Optional.empty()); + client.addServiceWebClient(SERVICE_URL, webClientMock); + client.setVertx(vertx); + client.setIdentity(identityMock); + } + + @Override + protected KogitoRuntimeCommonClient getClient() { + return client; } @Test - public void testAbortProcessInstance() { + void testAbortProcessInstance() { setupIdentityMock(); when(webClientMock.delete(anyString())).thenReturn(httpRequestMock); @@ -128,7 +90,7 @@ public void testAbortProcessInstance() { } @Test - public void testRetryProcessInstance() { + void testRetryProcessInstance() { setupIdentityMock(); when(webClientMock.post(anyString())).thenReturn(httpRequestMock); ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ERROR); @@ -144,7 +106,7 @@ public void testRetryProcessInstance() { } @Test - public void testSkipProcessInstance() { + void testSkipProcessInstance() { setupIdentityMock(); when(webClientMock.post(anyString())).thenReturn(httpRequestMock); @@ -161,10 +123,10 @@ public void testSkipProcessInstance() { } @Test - public void testUpdateProcessInstanceVariables() { - setupIdentityMock(); + void testUpdateProcessInstanceVariables() { when(webClientMock.put(anyString())).thenReturn(httpRequestMock); when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock); + when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock); ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ERROR); @@ -179,7 +141,7 @@ public void testUpdateProcessInstanceVariables() { } @Test - public void testTriggerNodeInstance() { + void testTriggerNodeInstance() { String nodeDefId = "nodeDefId"; setupIdentityMock(); when(webClientMock.post(anyString())).thenReturn(httpRequestMock); @@ -189,14 +151,14 @@ public void testTriggerNodeInstance() { client.triggerNodeInstance(SERVICE_URL, pI, nodeDefId); verify(client).sendPostClientRequest(webClientMock, format(TRIGGER_NODE_INSTANCE_PATH, pI.getProcessId(), pI.getId(), nodeDefId), - "Trigger Node " + nodeDefId + "from ProcessInstance with id: " + pI.getId()); + "Trigger Node " + nodeDefId + FROM_PROCESS_INSTANCE_WITH_ID + pI.getId()); ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); verify(httpRequestMock).send(handlerCaptor.capture()); checkResponseHandling(handlerCaptor.getValue()); } @Test - public void testRetriggerNodeInstance() { + void testRetriggerNodeInstance() { String nodeInstanceId = "nodeInstanceId"; setupIdentityMock(); when(webClientMock.post(anyString())).thenReturn(httpRequestMock); @@ -206,15 +168,14 @@ public void testRetriggerNodeInstance() { client.retriggerNodeInstance(SERVICE_URL, pI, nodeInstanceId); verify(client).sendPostClientRequest(webClientMock, format(RETRIGGER_NODE_INSTANCE_PATH, pI.getProcessId(), pI.getId(), nodeInstanceId), - "Retrigger NodeInstance " + nodeInstanceId + - "from ProcessInstance with id: " + pI.getId()); + "Retrigger NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + pI.getId()); ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); verify(httpRequestMock).send(handlerCaptor.capture()); checkResponseHandling(handlerCaptor.getValue()); } @Test - public void testCancelNodeInstance() { + void testCancelNodeInstance() { String nodeInstanceId = "nodeInstanceId"; setupIdentityMock(); when(webClientMock.delete(anyString())).thenReturn(httpRequestMock); @@ -224,50 +185,14 @@ public void testCancelNodeInstance() { client.cancelNodeInstance(SERVICE_URL, pI, nodeInstanceId); verify(client).sendDeleteClientRequest(webClientMock, format(CANCEL_NODE_INSTANCE_PATH, pI.getProcessId(), pI.getId(), nodeInstanceId), - "Cancel NodeInstance " + nodeInstanceId + - "from ProcessInstance with id: " + pI.getId()); + "Cancel NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + pI.getId()); ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); verify(httpRequestMock).send(handlerCaptor.capture()); checkResponseHandling(handlerCaptor.getValue()); } @Test - public void testCancelJob() { - setupIdentityMock(); - when(webClientMock.delete(anyString())).thenReturn(httpRequestMock); - - Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED"); - client.cancelJob(SERVICE_URL, job); - - verify(client).sendDeleteClientRequest(webClientMock, - format(CANCEL_JOB_PATH, job.getId()), - "CANCEL Job with id: " + JOB_ID); - ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); - verify(httpRequestMock).send(handlerCaptor.capture()); - checkResponseHandling(handlerCaptor.getValue()); - } - - @Test - public void testRescheduleJob() { - String newJobData = "{ }"; - setupIdentityMock(); - when(webClientMock.put(anyString())).thenReturn(httpRequestMock); - when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock); - - Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED"); - - client.rescheduleJob(SERVICE_URL, job, newJobData); - verify(client).sendJSONPutClientRequest(webClientMock, - format(RESCHEDULE_JOB_PATH, JOB_ID), - "RESCHEDULED JOB with id: " + job.getId(), newJobData); - ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class); - JsonObject jsonOject = new JsonObject(newJobData); - verify(httpRequestMock).sendJson(eq(jsonOject), handlerCaptor.capture()); - checkResponseHandling(handlerCaptor.getValue()); - } - - @Test - public void testGetProcessInstanceDiagram() { + void testGetProcessInstanceDiagram() { setupIdentityMock(); when(webClientMock.get(anyString())).thenReturn(httpRequestMock); @@ -285,7 +210,7 @@ public void testGetProcessInstanceDiagram() { } @Test - public void testGetProcessInstanceNodeDefinitions() { + void testGetProcessInstanceNodeDefinitions() { setupIdentityMock(); when(webClientMock.get(anyString())).thenReturn(httpRequestMock); @@ -314,7 +239,7 @@ public void testGetProcessInstanceNodeDefinitions() { } @Test - public void testGetProcessInstanceSource() { + void testGetProcessInstanceSource() { setupIdentityMock(); when(webClientMock.get(anyString())).thenReturn(httpRequestMock); @@ -332,7 +257,7 @@ public void testGetProcessInstanceSource() { } @Test - public void testSendOk() throws Exception { + void testSendOk() throws Exception { AsyncResult result = mock(AsyncResult.class); when(result.succeeded()).thenReturn(true); HttpResponse response = mock(HttpResponse.class); @@ -348,7 +273,7 @@ public void testSendOk() throws Exception { } @Test - public void testSendNotFound() throws Exception { + void testSendNotFound() throws Exception { AsyncResult result = mock(AsyncResult.class); when(result.succeeded()).thenReturn(true); HttpResponse response = mock(HttpResponse.class); @@ -363,7 +288,7 @@ public void testSendNotFound() throws Exception { } @Test - public void testSendException() throws Exception { + void testSendException() throws Exception { AsyncResult result = mock(AsyncResult.class); when(result.succeeded()).thenReturn(false); when(result.cause()).thenReturn(new RuntimeException()); @@ -382,7 +307,7 @@ public void testSendException() throws Exception { } @Test - public void testGetUserTaskSchema() { + void testGetUserTaskSchema() { setupIdentityMock(); when(webClientMock.get(anyString())).thenReturn(httpRequestMock); @@ -398,7 +323,7 @@ public void testGetUserTaskSchema() { } @Test - public void testUpdateUserTaskInstance() { + void testUpdateUserTaskInstance() { setupIdentityMock(); when(webClientMock.patch(anyString())).thenReturn(httpRequestMock); @@ -421,10 +346,10 @@ public void testUpdateUserTaskInstance() { } @Test - public void testCreateUserTaskInstanceComment() { + void testCreateUserTaskInstanceComment() { String commentInfo = "newComment"; - setupIdentityMock(); when(webClientMock.post(anyString())).thenReturn(httpRequestMock); + when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock); when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock); UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress"); @@ -439,11 +364,11 @@ public void testCreateUserTaskInstanceComment() { } @Test - public void testCreateUserTaskInstanceAttachment() { + void testCreateUserTaskInstanceAttachment() { String attachmentUri = "nhttps://drive.google.com/file/d/AttachmentUri"; String attachmentName = "newAttachmentName"; - setupIdentityMock(); when(webClientMock.post(anyString())).thenReturn(httpRequestMock); + when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock); when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock); UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress"); @@ -461,11 +386,11 @@ public void testCreateUserTaskInstanceAttachment() { } @Test - public void testUpdateUserTaskInstanceComment() { + void testUpdateUserTaskInstanceComment() { String commentInfo = "NewCommentContent"; String commentId = "commentId"; - setupIdentityMock(); when(webClientMock.put(anyString())).thenReturn(httpRequestMock); + when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock); when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock); UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress"); @@ -482,7 +407,7 @@ public void testUpdateUserTaskInstanceComment() { } @Test - public void testDeleteTaskInstanceComment() { + void testDeleteTaskInstanceComment() { String commentId = "commentId"; setupIdentityMock(); when(webClientMock.delete(anyString())).thenReturn(httpRequestMock); @@ -499,12 +424,13 @@ public void testDeleteTaskInstanceComment() { } @Test - public void testUpdateUserTaskInstanceAttachment() { + void testUpdateUserTaskInstanceAttachment() { String attachmentName = "NewAttachmentName"; String attachmentContent = "NewAttachmentContent"; String attachmentId = "attachmentId"; - setupIdentityMock(); + when(webClientMock.put(anyString())).thenReturn(httpRequestMock); + when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock); when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock); UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress"); @@ -525,7 +451,7 @@ public void testUpdateUserTaskInstanceAttachment() { } @Test - public void testDeleteTaskInstanceAttachment() { + void testDeleteTaskInstanceAttachment() { String attachmentId = "attachmentId"; setupIdentityMock(); when(webClientMock.delete(anyString())).thenReturn(httpRequestMock); @@ -542,100 +468,12 @@ public void testDeleteTaskInstanceAttachment() { } @Test - public void testWebClientToURLOptions() { - String defaultHost = "localhost"; - int defaultPort = 8180; - WebClientOptions webClientOptions = client.getWebClientToURLOptions("http://" + defaultHost + ":" + defaultPort); - assertThat(webClientOptions.getDefaultHost()).isEqualTo(defaultHost); - assertThat(webClientOptions.getDefaultPort()).isEqualTo(defaultPort); - } - - @Test - public void testWebClientToURLOptionsWithoutPort() { - String dataIndexUrl = "http://service.com"; - WebClientOptions webClientOptions = client.getWebClientToURLOptions(dataIndexUrl); - assertThat(webClientOptions.getDefaultPort()).isEqualTo(80); - assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com"); - assertFalse(webClientOptions.isSsl()); - } - - @Test - public void testWebClientToURLOptionsWithoutPortSSL() { - String dataIndexurl = "https://service.com"; - WebClientOptions webClientOptions = client.getWebClientToURLOptions(dataIndexurl); - assertThat(webClientOptions.getDefaultPort()).isEqualTo(443); - assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com"); - assertTrue(webClientOptions.isSsl()); - } - - @Test - public void testMalformedURL() { - assertThat(client.getWebClientToURLOptions("malformedURL")).isNull(); - } - - @Test - void testOverrideURL() { - String host = "host.testcontainers.internal"; - client.gatewayTargetUrl = Optional.of(host); - WebClientOptions webClientOptions = client.getWebClientToURLOptions("http://service.com"); - assertThat(webClientOptions.getDefaultHost()).isEqualTo(host); - } - - @Test - public void testGetAuthHeader() { - tokenCredential = mock(TokenCredential.class); - when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential); - when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN); - - String token = client.getAuthHeader(); - verify(identityMock, times(2)).getCredential(TokenCredential.class); - assertThat(token).isEqualTo("Bearer " + AUTHORIZED_TOKEN); - - when(identityMock.getCredential(TokenCredential.class)).thenReturn(null); - token = client.getAuthHeader(); - assertThat(token).isEqualTo(""); - } - - private AsyncResult createResponseMocks(HttpResponse response, boolean succeed, int statusCode) { - AsyncResult asyncResultMock = mock(AsyncResult.class); - when(asyncResultMock.succeeded()).thenReturn(succeed); - when(asyncResultMock.result()).thenReturn(response); - when(response.statusCode()).thenReturn(statusCode); - return asyncResultMock; - } - - private ProcessInstance createProcessInstance(String processInstanceId, int status) { - return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null); + protected void testCancelJob() { + testCancelJobRest(); } private UserTaskInstance createUserTaskInstance(String processInstanceId, String userTaskId, String state) { return TestUtils.getUserTaskInstance(userTaskId, "travels", processInstanceId, null, null, state, "jdoe"); } - private Job createJob(String jobId, String processInstanceId, String status) { - return TestUtils.getJob(jobId, "travels", processInstanceId, null, null, status); - } - - protected void checkResponseHandling(Handler>> handler) { - HttpResponse response = mock(HttpResponse.class); - HttpResponse responseWithoutError = mock(HttpResponse.class); - - handler.handle(createResponseMocks(response, false, 404)); - verify(response).statusMessage(); - verify(response).body(); - verify(response, never()).bodyAsString(); - - handler.handle(createResponseMocks(responseWithoutError, true, 200)); - verify(responseWithoutError, never()).statusMessage(); - verify(responseWithoutError, never()).body(); - verify(responseWithoutError).bodyAsString(); - } - - protected void setupIdentityMock() { - tokenCredential = mock(TokenCredential.class); - when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential); - when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN); - when(httpRequestMock.putHeader(eq("Authorization"), eq("Bearer " + AUTHORIZED_TOKEN))).thenReturn(httpRequestMock); - } - } diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/DataIndexEventPublisher.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/DataIndexEventPublisher.java index 0d53d229f0..5ce4b6a66e 100644 --- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/DataIndexEventPublisher.java +++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/DataIndexEventPublisher.java @@ -56,7 +56,9 @@ public void publish(DataEvent event) { break; case "JobEvent": try { - indexingService.indexJob(getObjectMapper().readValue(new String((byte[]) event.getData()), Job.class)); + Job job = getObjectMapper().readValue(new String((byte[]) event.getData()), Job.class); + job.setEndpoint(event.getSource().toString()); + indexingService.indexJob(job); } catch (IOException e) { throw new UncheckedIOException(e); } diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java index 4508553b7a..7e2918f5ba 100644 --- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java +++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java @@ -18,9 +18,13 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.time.ZonedDateTime; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.stream.Collectors; import javax.enterprise.context.ApplicationScoped; @@ -28,23 +32,32 @@ import javax.inject.Inject; import org.eclipse.microprofile.context.ManagedExecutor; +import org.kie.kogito.Application; import org.kie.kogito.addon.source.files.SourceFilesProvider; import org.kie.kogito.index.api.KogitoRuntimeClient; +import org.kie.kogito.index.api.KogitoRuntimeCommonClient; import org.kie.kogito.index.model.Job; import org.kie.kogito.index.model.Node; import org.kie.kogito.index.model.ProcessInstance; import org.kie.kogito.index.model.UserTaskInstance; import org.kie.kogito.index.service.DataIndexServiceException; import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess; +import org.kie.kogito.jobs.*; import org.kie.kogito.process.Process; +import org.kie.kogito.process.ProcessInstanceExecutionException; import org.kie.kogito.process.Processes; import org.kie.kogito.process.impl.AbstractProcess; +import org.kie.kogito.services.uow.UnitOfWorkExecutor; import org.kie.kogito.svg.ProcessSvgService; +import org.kie.kogito.timer.TimerInstance; import static org.jbpm.ruleflow.core.Metadata.UNIQUE_ID; +import static org.kie.kogito.index.json.JsonUtils.getObjectMapper; @ApplicationScoped -public class KogitoAddonRuntimeClientImpl implements KogitoRuntimeClient { +public class KogitoAddonRuntimeClientImpl extends KogitoRuntimeCommonClient implements KogitoRuntimeClient { + + private static String SUCCESSFULLY_OPERATION_MESSAGE = "Successfully performed: %s"; private ProcessSvgService processSvgService; @@ -52,35 +65,71 @@ public class KogitoAddonRuntimeClientImpl implements KogitoRuntimeClient { private Processes processes; + private Application application; + + private JobsService jobsService; + @Inject public KogitoAddonRuntimeClientImpl(Instance processSvgService, SourceFilesProvider sourceFilesProvider, - Instance processesInstance) { + Instance processesInstance, + Instance application, + Instance jobsService) { this.processSvgService = processSvgService.isResolvable() ? processSvgService.get() : null; this.sourceFilesProvider = sourceFilesProvider; this.processes = processesInstance.isResolvable() ? processesInstance.get() : null; + this.application = application.isResolvable() ? application.get() : null; + this.jobsService = jobsService.isResolvable() ? jobsService.get() : null; } @Inject ManagedExecutor managedExecutor; + public void setJobsService(JobsService jobsService) { + this.jobsService = jobsService; + } + static CompletableFuture throwUnsupportedException() { - return CompletableFuture.failedFuture(new UnsupportedOperationException()); + return CompletableFuture.failedFuture(new UnsupportedOperationException("Unsupported operation using Data Index addon")); } @Override public CompletableFuture abortProcessInstance(String serviceURL, ProcessInstance processInstance) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.abort(); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, "ABORT ProcessInstance with id: " + processInstance.getId()); + } + })); } @Override public CompletableFuture retryProcessInstance(String serviceURL, ProcessInstance processInstance) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.error().get().retrigger(); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, "RETRY ProcessInstance in error with id: " + processInstance.getId()); + } + })); } @Override public CompletableFuture skipProcessInstance(String serviceURL, ProcessInstance processInstance) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.error().get().skip(); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, "SKIP ProcessInstance in error with id: " + processInstance.getId()); + } + })); } @Override @@ -133,27 +182,44 @@ public CompletableFuture> getProcessInstanceNodeDefinitions(String se @Override public CompletableFuture triggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeDefinitionId) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.triggerNode(nodeDefinitionId); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "TRIGGER Node " + nodeDefinitionId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId()); + } + })); } @Override public CompletableFuture retriggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.retriggerNodeInstance(nodeInstanceId); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "RETRIGGER Node instance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId()); + } + })); } @Override public CompletableFuture cancelNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) { - return throwUnsupportedException(); - } - - @Override - public CompletableFuture cancelJob(String serviceURL, Job job) { - return throwUnsupportedException(); - } - - @Override - public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) { - return throwUnsupportedException(); + return CompletableFuture.completedFuture(executeOnProcessInstance(processInstance.getProcessId(), processInstance.getId(), pInstance -> { + pInstance.cancelNodeInstance(nodeInstanceId); + + if (pInstance.status() == org.kie.kogito.process.ProcessInstance.STATE_ERROR) { + throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage()); + } else { + return String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "CANCEL Node instance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId()); + } + })); } @Override @@ -196,4 +262,117 @@ public CompletableFuture updateUserTaskInstanceAttachment(String service public CompletableFuture deleteUserTaskInstanceAttachment(String serviceURL, UserTaskInstance userTaskInstance, String user, List groups, String attachmentId) { return throwUnsupportedException(); } + + private String executeOnProcessInstance(String processId, String processInstanceId, Function, String> supplier) { + + Process process = processes != null ? processes.processById(processId) : null; + + if (process == null) { + throw new DataIndexServiceException(String.format("Unable to find Process instance with id %s to perform the operation requested", processInstanceId)); + } + return UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> { + Optional> processInstanceFound = process.instances().findById(processInstanceId); + if (processInstanceFound.isPresent()) { + org.kie.kogito.process.ProcessInstance processInstance = processInstanceFound.get(); + return supplier.apply(processInstance); + } else { + throw new DataIndexServiceException(String.format("Process instance with id %s doesn't allow the operation requested", processInstanceId)); + } + }); + } + + @Override + public CompletableFuture cancelJob(String serviceURL, Job job) { + if (jobsService == null) { + return super.cancelJob(serviceURL, job); + } + if (jobsService.cancelJob(job.getId())) { + return CompletableFuture.completedFuture(String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "CANCEL Job with id" + job.getId() + FROM_PROCESS_INSTANCE_WITH_ID + job.getProcessInstanceId())); + } + return CompletableFuture.failedFuture(new DataIndexServiceException("Unable to CANCEL Job with id" + job.getId())); + } + + @Override + public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) { + if (jobsService == null) { + return super.rescheduleJob(serviceURL, job, newJobData); + } + try { + jobsService.cancelJob(job.getId()); + + //TODO : necessary create createDurationTimer and register?? + Job newJob = getObjectMapper().readValue(newJobData, Job.class); + if (newJob.getExpirationTime() != null) { + job.setExpirationTime(newJob.getExpirationTime()); + } + if (newJob.getPriority() != null) { + job.setPriority(newJob.getPriority()); + } + if (newJob.getRepeatInterval() != null) { + job.setRepeatInterval(newJob.getRepeatInterval()); + } + if (newJob.getRepeatLimit() != null) { + job.setRepeatLimit(newJob.getRepeatLimit()); + } + if (newJob.getRetries() != null) { + job.setRetries(newJob.getRetries()); + } + ProcessInstanceJobDescription description = + ProcessInstanceJobDescription.builder() + .id(job.getId()) + .timerId("-1") + .expirationTime(new ExpirationTime() { + @Override + public ZonedDateTime get() { + return job.getExpirationTime(); + } + + @Override + public Long repeatInterval() { + return job.getRepeatInterval(); + } + + @Override + public Integer repeatLimit() { + return job.getRepeatLimit(); + } + }) + .processInstanceId(job.getProcessInstanceId()) + .processId(job.getProcessId()) + .priority(job.getPriority()) + .build(); + // TODO: how about passing retries? + if (jobsService.scheduleProcessInstanceJob(description) != null) { + return CompletableFuture.completedFuture(String.format(SUCCESSFULLY_OPERATION_MESSAGE, + "RESCHEDULE Job with id" + job.getId() + FROM_PROCESS_INSTANCE_WITH_ID + job.getProcessInstanceId())); + } + return CompletableFuture.failedFuture(new DataIndexServiceException("Unable to RESCHEDULE Job with id" + job.getId())); + } catch (IOException e) { + return CompletableFuture.failedFuture(new DataIndexServiceException("Unable to RESCHEDULE Job with id" + job.getId(), e)); + } + } + + private TimerInstance createDurationTimer(long duration) { + TimerInstance timerInstance = new TimerInstance(); + timerInstance.setId(UUID.randomUUID().toString()); + timerInstance.setTimerId("-1"); + timerInstance.setDelay(duration); + timerInstance.setPeriod(0); + return timerInstance; + } + + private TimerInstance registerTimer(TimerInstance timerInstance, Job job) { + ProcessInstanceJobDescription description = + ProcessInstanceJobDescription.builder() + .id(timerInstance.getId()) + .timerId(timerInstance.getTimerId()) + .expirationTime(DurationExpirationTime.after(timerInstance.getDelay())) + .processInstanceId(job.getProcessInstanceId()) + .processId(job.getProcessId()) + .build(); + jobsService.scheduleProcessInstanceJob(description); + return timerInstance; + } + } diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java index d3f4c10f29..afa4308dcb 100644 --- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java +++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/graphql/GraphQLAddonSchemaManagerImpl.java @@ -40,6 +40,18 @@ public GraphQLSchema createSchema() { builder.dataFetcher("Jobs", this::getJobsValues); return builder; }) + .type("Mutation", builder -> { + builder.dataFetcher("ProcessInstanceAbort", this::abortProcessInstance); + builder.dataFetcher("ProcessInstanceRetry", this::retryProcessInstance); + builder.dataFetcher("ProcessInstanceSkip", this::skipProcessInstance); + builder.dataFetcher("ProcessInstanceUpdateVariables", this::updateProcessInstanceVariables); + builder.dataFetcher("NodeInstanceTrigger", this::triggerNodeInstance); + builder.dataFetcher("NodeInstanceRetrigger", this::retriggerNodeInstance); + builder.dataFetcher("NodeInstanceCancel", this::cancelNodeInstance); + builder.dataFetcher("JobCancel", this::cancelJob); + builder.dataFetcher("JobReschedule", this::rescheduleJob); + return builder; + }) .type("ProcessInstance", builder -> { builder.dataFetcher("parentProcessInstance", this::getParentProcessInstanceValue); builder.dataFetcher("childProcessInstances", this::getChildProcessInstancesValues); diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java new file mode 100644 index 0000000000..ba8999401c --- /dev/null +++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java @@ -0,0 +1,272 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.index.addon.api; + +import java.util.Optional; + +import javax.enterprise.inject.Instance; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.kie.kogito.Application; +import org.kie.kogito.addon.source.files.SourceFilesProvider; +import org.kie.kogito.index.api.KogitoRuntimeCommonClient; +import org.kie.kogito.index.api.KogitoRuntimeCommonClientTest; +import org.kie.kogito.index.model.Job; +import org.kie.kogito.index.model.ProcessInstance; +import org.kie.kogito.jobs.JobsService; +import org.kie.kogito.jobs.ProcessInstanceJobDescription; +import org.kie.kogito.process.ProcessError; +import org.kie.kogito.process.ProcessInstanceExecutionException; +import org.kie.kogito.process.ProcessInstances; +import org.kie.kogito.process.Processes; +import org.kie.kogito.process.impl.AbstractProcess; +import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; +import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; +import org.kie.kogito.svg.ProcessSvgService; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import io.quarkus.security.credential.TokenCredential; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +public class KogitoAddonRuntimeClientImplTest extends KogitoRuntimeCommonClientTest { + + private static final String NODE_ID = "nodeId"; + + public static final String NODE_ID_ERROR = "processInstanceIdError"; + + private KogitoAddonRuntimeClientImpl client; + + @Mock + Instance processSvgServiceInstance; + + @Mock + private ProcessSvgService processSvgService; + + @Mock + private SourceFilesProvider sourceFilesProvider; + + @Mock + Instance processesInstance; + + @Mock + private Processes processes; + + @Mock + private AbstractProcess process; + + @Mock + private ProcessInstances instances; + + @Mock + private org.kie.kogito.process.ProcessInstance processInstance; + + @Mock + private ProcessError error; + + @Mock + private Object variables; + + @Mock + Instance applicationInstance; + + @Mock + private Application application; + + @Mock + Instance jobsServiceInstance; + + @Mock + private JobsService jobsService; + + @BeforeEach + public void setup() { + lenient().when(processSvgServiceInstance.isResolvable()).thenReturn(true); + lenient().when(processSvgServiceInstance.get()).thenReturn(processSvgService); + lenient().when(processesInstance.isResolvable()).thenReturn(true); + lenient().when(processesInstance.get()).thenReturn(processes); + lenient().when(processes.processById(anyString())).thenReturn(process); + lenient().when(process.instances()).thenReturn(instances); + lenient().when(instances.findById(anyString())).thenReturn(Optional.of(processInstance)); + lenient().when(processInstance.error()).thenReturn(Optional.of(error)); + lenient().when(processInstance.variables()).thenReturn(variables); + lenient().when(processInstance.id()).thenReturn(PROCESS_INSTANCE_ID); + lenient().when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ERROR); + lenient().when(error.failedNodeId()).thenReturn(NODE_ID_ERROR); + lenient().when(error.errorMessage()).thenReturn("Test error message"); + lenient().when(application.unitOfWorkManager()).thenReturn(new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory())); + lenient().when(applicationInstance.isResolvable()).thenReturn(true); + lenient().when(applicationInstance.get()).thenReturn(application); + lenient().when(jobsService.cancelJob(anyString())).thenReturn(true); + lenient().when(jobsService.scheduleProcessInstanceJob(any(ProcessInstanceJobDescription.class))).thenReturn("OK"); + lenient().when(jobsServiceInstance.isResolvable()).thenReturn(true); + lenient().when(jobsServiceInstance.get()).thenReturn(jobsService); + + client = spy(new KogitoAddonRuntimeClientImpl(processSvgServiceInstance, sourceFilesProvider, processesInstance, applicationInstance, jobsServiceInstance)); + client.setGatewayTargetUrl(Optional.empty()); + client.addServiceWebClient(SERVICE_URL, webClientMock); + client.setVertx(vertx); + client.setIdentity(identityMock); + } + + @Override + public KogitoRuntimeCommonClient getClient() { + return client; + } + + private org.kie.kogito.process.ProcessInstance mockProcessInstanceStatusActive() { + return doAnswer((v) -> { + when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ACTIVE); + return null; + }).when(processInstance); + } + + private org.kie.kogito.process.ProcessInstance mockProcessInstanceStatusError() { + return doAnswer((v) -> { + when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ERROR); + return null; + }).when(processInstance); + } + + private ProcessError mockProcessInstanceStatusActiveOnError() { + return doAnswer((v) -> { + when(processInstance.status()).thenReturn(org.kie.kogito.process.ProcessInstance.STATE_ACTIVE); + return null; + }).when(error); + } + + @Test + void testAbortProcessInstanceSuccess() { + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + mockProcessInstanceStatusActive().abort(); + client.abortProcessInstance(SERVICE_URL, pI); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).abort(); + } + + @Test + void testAbortProcessInstanceError() { + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + mockProcessInstanceStatusError().abort(); + assertThrows(ProcessInstanceExecutionException.class, + () -> client.abortProcessInstance(SERVICE_URL, pI)); + verify(processInstance, times(2)).error(); + verify(processInstance, times(1)).abort(); + } + + @Test + void testRetryProcessInstance() { + mockProcessInstanceStatusActiveOnError().retrigger(); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.retryProcessInstance(SERVICE_URL, pI); + verify(processInstance, times(1)).error(); + verify(error, times(1)).retrigger(); + verify(error, times(0)).skip(); + } + + @Test + void testSkipProcessInstance() { + mockProcessInstanceStatusActiveOnError().skip(); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.skipProcessInstance(SERVICE_URL, pI); + verify(processInstance, times(1)).error(); + verify(error, times(0)).retrigger(); + verify(error, times(1)).skip(); + } + + @Test + void testTriggerNodeInstance() { + mockProcessInstanceStatusActive().triggerNode(NODE_ID); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.triggerNodeInstance(SERVICE_URL, pI, NODE_ID); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).triggerNode(NODE_ID); + } + + @Test + void testRetriggerNodeInstance() { + mockProcessInstanceStatusActive().retriggerNodeInstance(NODE_ID); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.retriggerNodeInstance(SERVICE_URL, pI, NODE_ID); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).retriggerNodeInstance(NODE_ID); + } + + @Test + void testCancelNodeInstance() { + mockProcessInstanceStatusActive().cancelNodeInstance(NODE_ID); + ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE); + client.cancelNodeInstance(SERVICE_URL, pI, NODE_ID); + verify(processInstance, times(0)).error(); + verify(processInstance, times(1)).cancelNodeInstance(NODE_ID); + } + + @Test + void testCancelJobWithRest() { + client.setJobsService(null); + testCancelJobRest(); + } + + @Test + void testCancelJobWithInternalJobServiceInstance() { + client.setJobsService(jobsService); + Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED"); + + client.cancelJob(SERVICE_URL, job); + verify(jobsService).cancelJob(JOB_ID); + } + + @Test + protected void testRescheduleWithoutJobServiceInstance() { + client.setJobsService(null); + super.testRescheduleWithoutJobServiceInstance(); + } + + @Test + void testRescheduleWithJobServiceInstance() { + client.setJobsService(jobsService); + String newJobData = "{\"expirationTime\": \"2023-08-27T04:35:54.631Z\",\"repeatInterval\": 2}"; + + Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED"); + job.setRepeatLimit(3); + job.setRepeatInterval(3L); + + getClient().rescheduleJob(SERVICE_URL, job, newJobData); + ArgumentCaptor processInstanceJobDescriptionArgumentCaptor = ArgumentCaptor.forClass(ProcessInstanceJobDescription.class); + + verify(jobsService).cancelJob(JOB_ID); + verify(jobsService).scheduleProcessInstanceJob(processInstanceJobDescriptionArgumentCaptor.capture()); + + assertThat(processInstanceJobDescriptionArgumentCaptor.getValue().expirationTime().get()).isEqualTo("2023-08-27T04:35:54.631Z"); + assertThat(processInstanceJobDescriptionArgumentCaptor.getValue().expirationTime().repeatInterval()).isEqualTo(2); + assertThat(processInstanceJobDescriptionArgumentCaptor.getValue().expirationTime().repeatLimit()).isEqualTo(3); + } + + protected void setupIdentityMock() { + tokenCredential = mock(TokenCredential.class); + when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential); + when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN); + when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock); + } + +}