diff --git a/data-index/data-index-common/pom.xml b/data-index/data-index-common/pom.xml
index 6a52279bf0..58231a1fed 100644
--- a/data-index/data-index-common/pom.xml
+++ b/data-index/data-index-common/pom.xml
@@ -29,6 +29,10 @@
io.quarkus
quarkus-vertx
+
+ io.vertx
+ vertx-web-client
+
io.quarkus
quarkus-reactive-routes
diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeCommonClient.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeCommonClient.java
new file mode 100644
index 0000000000..de45d64be6
--- /dev/null
+++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/api/KogitoRuntimeCommonClient.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright 2023 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kie.kogito.index.api;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+import javax.enterprise.context.ApplicationScoped;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.kie.kogito.index.model.Job;
+import org.kie.kogito.index.service.DataIndexServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.quarkus.security.credential.TokenCredential;
+import io.quarkus.security.identity.SecurityIdentity;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import static java.lang.String.format;
+
+@ApplicationScoped
+public class KogitoRuntimeCommonClient {
+
+ public static final String CANCEL_JOB_PATH = "/jobs/%s";
+ public static final String RESCHEDULE_JOB_PATH = "/jobs/%s";
+
+ public static final String FROM_PROCESS_INSTANCE_WITH_ID = "from ProcessInstance with id: ";
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeCommonClient.class);
+
+ protected Vertx vertx;
+
+ protected SecurityIdentity identity;
+
+ protected Map serviceWebClientMap = new HashMap<>();
+
+ @ConfigProperty(name = "kogito.dataindex.gateway.url")
+ protected Optional gatewayTargetUrl;
+
+ public KogitoRuntimeCommonClient() {
+ }
+
+ public void setGatewayTargetUrl(Optional gatewayTargetUrl) {
+ this.gatewayTargetUrl = gatewayTargetUrl;
+ }
+
+ public void addServiceWebClient(String serviceUrl, WebClient webClient) {
+ serviceWebClientMap.put(serviceUrl, webClient);
+ }
+
+ protected WebClient getWebClient(String runtimeServiceUrl) {
+ if (runtimeServiceUrl == null) {
+ throw new DataIndexServiceException("Runtime service URL not defined, please review the kogito.service.url system property to point the public URL for this runtime.");
+ } else {
+ return serviceWebClientMap.computeIfAbsent(runtimeServiceUrl, url -> WebClient.create(vertx, getWebClientToURLOptions(runtimeServiceUrl)));
+ }
+ }
+
+ public WebClientOptions getWebClientToURLOptions(String targetHttpURL) {
+ try {
+ URL dataIndexURL = new URL(targetHttpURL);
+ return new WebClientOptions()
+ .setDefaultHost(gatewayTargetUrl.orElse(dataIndexURL.getHost()))
+ .setDefaultPort((dataIndexURL.getPort() != -1 ? dataIndexURL.getPort() : dataIndexURL.getDefaultPort()))
+ .setSsl(dataIndexURL.getProtocol().compareToIgnoreCase("https") == 0);
+ } catch (MalformedURLException ex) {
+ LOGGER.error(String.format("Invalid runtime service URL: %s", targetHttpURL), ex);
+ return null;
+ }
+ }
+
+ public CompletableFuture cancelJob(String serviceURL, Job job) {
+ String requestURI = format(CANCEL_JOB_PATH, job.getId());
+ LOGGER.debug("Sending DELETE to URI {}", requestURI);
+ return sendDeleteClientRequest(getWebClient(serviceURL), requestURI, "CANCEL Job with id: " + job.getId());
+ }
+
+ public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) {
+ String requestURI = format(RESCHEDULE_JOB_PATH, job.getId());
+ LOGGER.debug("Sending body: {} PATCH to URI {}", newJobData, requestURI);
+ return sendPatchClientRequest(getWebClient(serviceURL), requestURI,
+ "RESCHEDULED JOB with id: " + job.getId(), new JsonObject(newJobData));
+ }
+
+ public CompletableFuture sendDeleteClientRequest(WebClient webClient, String requestURI, String logMessage) {
+ CompletableFuture future = new CompletableFuture<>();
+ webClient.delete(requestURI)
+ .putHeader("Authorization", getAuthHeader())
+ .send(res -> asyncHttpResponseTreatment(res, future, logMessage));
+ LOGGER.debug("Sending DELETE to URI {}", requestURI);
+ return future;
+ }
+
+ protected void asyncHttpResponseTreatment(AsyncResult> res, CompletableFuture future, String logMessage) {
+ if (res.succeeded() && (res.result().statusCode() == 200 || res.result().statusCode() == 201)) {
+ future.complete(res.result().bodyAsString() != null ? res.result().bodyAsString() : "Successfully performed: " + logMessage);
+ } else {
+ future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result())));
+ }
+ }
+
+ public CompletableFuture sendPatchClientRequest(WebClient webClient, String requestURI, String logMessage, JsonObject jsonBody) {
+ CompletableFuture future = new CompletableFuture<>();
+ webClient.patch(requestURI)
+ .putHeader("Authorization", getAuthHeader())
+ .sendJson(jsonBody, res -> asyncHttpResponseTreatment(res, future, logMessage));
+ return future;
+ }
+
+ protected String getErrorMessage(String logMessage, HttpResponse result) {
+ String errorMessage = "FAILED: " + logMessage;
+ if (result != null) {
+ errorMessage += " errorCode:" + result.statusCode() +
+ " errorStatus:" + result.statusMessage() +
+ " errorMessage:" + (result.body() != null ? result.body().toString() : "-");
+ }
+ return errorMessage;
+ }
+
+ public String getAuthHeader() {
+ if (identity != null && identity.getCredential(TokenCredential.class) != null) {
+ return "Bearer " + identity.getCredential(TokenCredential.class).getToken();
+ }
+ return "";
+ }
+
+ @Inject
+ public void setIdentity(SecurityIdentity identity) {
+ this.identity = identity;
+ }
+
+ @Inject
+ public void setVertx(Vertx vertx) {
+ this.vertx = vertx;
+ }
+
+}
diff --git a/data-index/data-index-common/src/test/java/org/kie/kogito/index/api/KogitoRuntimeCommonClientTest.java b/data-index/data-index-common/src/test/java/org/kie/kogito/index/api/KogitoRuntimeCommonClientTest.java
new file mode 100644
index 0000000000..dfb78dc975
--- /dev/null
+++ b/data-index/data-index-common/src/test/java/org/kie/kogito/index/api/KogitoRuntimeCommonClientTest.java
@@ -0,0 +1,192 @@
+/*
+ * Copyright 2023 Red Hat, Inc. and/or its affiliates.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kie.kogito.index.api;
+
+import java.nio.Buffer;
+import java.util.*;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.kie.kogito.index.TestUtils;
+import org.kie.kogito.index.model.Job;
+import org.kie.kogito.index.model.ProcessInstance;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import io.quarkus.security.credential.TokenCredential;
+import io.quarkus.security.identity.SecurityIdentity;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
+
+import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+@ExtendWith(MockitoExtension.class)
+public abstract class KogitoRuntimeCommonClientTest {
+
+ protected static int ACTIVE = 1;
+ protected static String SERVICE_URL = "http://runtimeURL.com";
+ protected static String PROCESS_INSTANCE_ID = "pId";
+ protected static String JOB_ID = "jobId";
+ protected static String AUTHORIZED_TOKEN = "authToken";
+
+ @Mock
+ protected Vertx vertx;
+
+ @Mock
+ protected SecurityIdentity identityMock;
+
+ protected TokenCredential tokenCredential;
+
+ @Mock
+ protected WebClient webClientMock;
+
+ @Mock
+ protected HttpRequest httpRequestMock;
+
+ protected abstract KogitoRuntimeCommonClient getClient();
+
+ @Test
+ void testCancelJob() {
+ setupIdentityMock();
+ when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);
+
+ Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED");
+ getClient().cancelJob(SERVICE_URL, job);
+
+ verify(getClient()).sendDeleteClientRequest(webClientMock,
+ format(KogitoRuntimeCommonClient.CANCEL_JOB_PATH, job.getId()),
+ "CANCEL Job with id: " + JOB_ID);
+ ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
+ verify(httpRequestMock).send(handlerCaptor.capture());
+ checkResponseHandling(handlerCaptor.getValue());
+ }
+
+ @Test
+ void testRescheduleJob() {
+ String newJobData = "{\"expirationTime\": \"2023-08-27T04:35:54.631Z\",\"retries\": 2}";
+ setupIdentityMock();
+ when(webClientMock.patch(anyString())).thenReturn(httpRequestMock);
+
+ Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED");
+
+ getClient().rescheduleJob(SERVICE_URL, job, newJobData);
+ ArgumentCaptor jsonCaptor = ArgumentCaptor.forClass(JsonObject.class);
+
+ verify(getClient()).sendPatchClientRequest(eq(webClientMock),
+ eq(format(KogitoRuntimeCommonClient.RESCHEDULE_JOB_PATH, JOB_ID)),
+ eq("RESCHEDULED JOB with id: " + job.getId()),
+ jsonCaptor.capture());
+
+ assertThat(jsonCaptor.getValue().getString("expirationTime")).isEqualTo("2023-08-27T04:35:54.631Z");
+ assertThat(jsonCaptor.getValue().getString("retries")).isEqualTo("2");
+
+ ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
+ JsonObject jsonOject = new JsonObject(newJobData);
+ verify(httpRequestMock).sendJson(eq(jsonOject), handlerCaptor.capture());
+ verify(httpRequestMock).putHeader("Authorization", "Bearer " + AUTHORIZED_TOKEN);
+ checkResponseHandling(handlerCaptor.getValue());
+ }
+
+ @Test
+ void testWebClientToURLOptions() {
+ String defaultHost = "localhost";
+ int defaultPort = 8180;
+ WebClientOptions webClientOptions = getClient().getWebClientToURLOptions("http://" + defaultHost + ":" + defaultPort);
+ assertThat(webClientOptions.getDefaultHost()).isEqualTo(defaultHost);
+ assertThat(webClientOptions.getDefaultPort()).isEqualTo(defaultPort);
+ }
+
+ @Test
+ void testWebClientToURLOptionsWithoutPort() {
+ String dataIndexUrl = "http://service.com";
+ WebClientOptions webClientOptions = getClient().getWebClientToURLOptions(dataIndexUrl);
+ assertThat(webClientOptions.getDefaultPort()).isEqualTo(80);
+ assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com");
+ assertFalse(webClientOptions.isSsl());
+ }
+
+ @Test
+ void testWebClientToURLOptionsWithoutPortSSL() {
+ String dataIndexurl = "https://service.com";
+ WebClientOptions webClientOptions = getClient().getWebClientToURLOptions(dataIndexurl);
+ assertThat(webClientOptions.getDefaultPort()).isEqualTo(443);
+ assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com");
+ assertTrue(webClientOptions.isSsl());
+ }
+
+ @Test
+ void testMalformedURL() {
+ assertThat(getClient().getWebClientToURLOptions("malformedURL")).isNull();
+ }
+
+ @Test
+ void testOverrideURL() {
+ String host = "host.testcontainers.internal";
+ getClient().setGatewayTargetUrl(Optional.of(host));
+ WebClientOptions webClientOptions = getClient().getWebClientToURLOptions("http://service.com");
+ assertThat(webClientOptions.getDefaultHost()).isEqualTo(host);
+ }
+
+ protected AsyncResult createResponseMocks(HttpResponse response, boolean succeed, int statusCode) {
+ AsyncResult asyncResultMock = mock(AsyncResult.class);
+ when(asyncResultMock.succeeded()).thenReturn(succeed);
+ when(asyncResultMock.result()).thenReturn(response);
+ when(response.statusCode()).thenReturn(statusCode);
+ return asyncResultMock;
+ }
+
+ private Job createJob(String jobId, String processInstanceId, String status) {
+ return TestUtils.getJob(jobId, "travels", processInstanceId, null, null, status);
+ }
+
+ protected void checkResponseHandling(Handler>> handler) {
+ HttpResponse response = mock(HttpResponse.class);
+ HttpResponse responseWithoutError = mock(HttpResponse.class);
+
+ handler.handle(createResponseMocks(response, false, 404));
+ verify(response).statusMessage();
+ verify(response).body();
+ verify(response, never()).bodyAsString();
+
+ handler.handle(createResponseMocks(responseWithoutError, true, 200));
+ verify(responseWithoutError, never()).statusMessage();
+ verify(responseWithoutError, never()).body();
+ verify(responseWithoutError).bodyAsString();
+ }
+
+ protected void setupIdentityMock() {
+ tokenCredential = mock(TokenCredential.class);
+ when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential);
+ when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN);
+ when(httpRequestMock.putHeader("Authorization", "Bearer " + AUTHORIZED_TOKEN)).thenReturn(httpRequestMock);
+ }
+
+ protected ProcessInstance createProcessInstance(String processInstanceId, int status) {
+ return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null);
+ }
+
+}
diff --git a/data-index/data-index-service/data-index-service-common/pom.xml b/data-index/data-index-service/data-index-service-common/pom.xml
index 144a0fa16c..4a1eff5009 100644
--- a/data-index/data-index-service/data-index-service-common/pom.xml
+++ b/data-index/data-index-service/data-index-service-common/pom.xml
@@ -33,10 +33,6 @@
org.kie.kogito
persistence-commons-protobuf
-
- io.vertx
- vertx-web-client
-
io.smallrye.reactive
smallrye-reactive-messaging-api
diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java
index 3c74b1e286..993523eedc 100644
--- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java
+++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/api/KogitoRuntimeClientImpl.java
@@ -15,21 +15,15 @@
*/
package org.kie.kogito.index.service.api;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.enterprise.context.ApplicationScoped;
-import javax.inject.Inject;
import javax.ws.rs.core.MediaType;
-import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.index.api.KogitoRuntimeClient;
-import org.kie.kogito.index.model.Job;
+import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
@@ -37,21 +31,17 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.quarkus.security.credential.TokenCredential;
-import io.quarkus.security.identity.SecurityIdentity;
import io.vertx.core.AsyncResult;
-import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.WebClientOptions;
import static java.lang.String.format;
@ApplicationScoped
-public class KogitoRuntimeClientImpl implements KogitoRuntimeClient {
+class KogitoRuntimeClientImpl extends KogitoRuntimeCommonClient implements KogitoRuntimeClient {
public static final String ABORT_PROCESS_INSTANCE_PATH = "/management/processes/%s/instances/%s";
public static final String RETRY_PROCESS_INSTANCE_PATH = "/management/processes/%s/instances/%s/retrigger";
@@ -64,9 +54,6 @@ public class KogitoRuntimeClientImpl implements KogitoRuntimeClient {
public static final String RETRIGGER_NODE_INSTANCE_PATH = "/management/processes/%s/instances/%s/nodeInstances/%s"; // nodeInstance Id
public static final String CANCEL_NODE_INSTANCE_PATH = "/management/processes/%s/instances/%s/nodeInstances/%s"; // nodeInstance Id
- public static final String CANCEL_JOB_PATH = "/%s";
- public static final String RESCHEDULE_JOB_PATH = "/%s";
-
public static final String GET_TASK_SCHEMA_PATH = "/%s/%s/%s/%s/schema";
public static final String UPDATE_USER_TASK_INSTANCE_PATH = "/management/processes/%s/instances/%s/tasks/%s";
@@ -79,38 +66,9 @@ public class KogitoRuntimeClientImpl implements KogitoRuntimeClient {
public static final String DELETE_USER_TASK_INSTANCE_ATTACHMENT_PATH = "/%s/%s/%s/%s/attachments/%s";
private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeClientImpl.class);
- private Vertx vertx;
- private SecurityIdentity identity;
- protected Map serviceWebClientMap = new HashMap<>();
-
- @ConfigProperty(name = "kogito.dataindex.gateway.url")
- protected Optional gatewayTargetUrl;
- @Inject
- public KogitoRuntimeClientImpl(Vertx vertx, SecurityIdentity identity) {
- this.vertx = vertx;
- this.identity = identity;
- }
-
- protected WebClient getWebClient(String runtimeServiceUrl) {
- if (runtimeServiceUrl == null) {
- throw new DataIndexServiceException("Runtime service URL not defined, please review the kogito.service.url system property to point the public URL for this runtime.");
- } else {
- return serviceWebClientMap.computeIfAbsent(runtimeServiceUrl, url -> WebClient.create(vertx, getWebClientToURLOptions(runtimeServiceUrl)));
- }
- }
-
- protected WebClientOptions getWebClientToURLOptions(String targetHttpURL) {
- try {
- URL dataIndexURL = new URL(targetHttpURL);
- return new WebClientOptions()
- .setDefaultHost(gatewayTargetUrl.orElse(dataIndexURL.getHost()))
- .setDefaultPort((dataIndexURL.getPort() != -1 ? dataIndexURL.getPort() : dataIndexURL.getDefaultPort()))
- .setSsl(dataIndexURL.getProtocol().compareToIgnoreCase("https") == 0);
- } catch (MalformedURLException ex) {
- LOGGER.error("Invalid runtime service URL: " + targetHttpURL, ex);
- return null;
- }
+ public KogitoRuntimeClientImpl() {
+ super();
}
@Override
@@ -160,37 +118,21 @@ public CompletableFuture> getProcessInstanceNodeDefinitions(String se
public CompletableFuture triggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeDefinitionId) {
String requestURI = format(TRIGGER_NODE_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId(), nodeDefinitionId);
return sendPostClientRequest(getWebClient(serviceURL), requestURI,
- "Trigger Node " + nodeDefinitionId +
- "from ProcessInstance with id: " + processInstance.getId());
+ "Trigger Node " + nodeDefinitionId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId());
}
@Override
public CompletableFuture retriggerNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) {
String requestURI = format(RETRIGGER_NODE_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId(), nodeInstanceId);
return sendPostClientRequest(getWebClient(serviceURL), requestURI,
- "Retrigger NodeInstance " + nodeInstanceId +
- "from ProcessInstance with id: " + processInstance.getId());
+ "Retrigger NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId());
}
@Override
public CompletableFuture cancelNodeInstance(String serviceURL, ProcessInstance processInstance, String nodeInstanceId) {
String requestURI = format(CANCEL_NODE_INSTANCE_PATH, processInstance.getProcessId(), processInstance.getId(), nodeInstanceId);
return sendDeleteClientRequest(getWebClient(serviceURL), requestURI,
- "Cancel NodeInstance " + nodeInstanceId +
- "from ProcessInstance with id: " + processInstance.getId());
- }
-
- @Override
- public CompletableFuture cancelJob(String serviceURL, Job job) {
- String requestURI = format(CANCEL_JOB_PATH, job.getId());
- return sendDeleteClientRequest(getWebClient(serviceURL), requestURI, "CANCEL Job with id: " + job.getId());
- }
-
- @Override
- public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) {
- String requestURI = format(RESCHEDULE_JOB_PATH, job.getId());
- return sendJSONPutClientRequest(getWebClient(serviceURL), requestURI,
- "RESCHEDULED JOB with id: " + job.getId(), newJobData);
+ "Cancel NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId());
}
@Override
@@ -277,40 +219,28 @@ private String getUserGroupsURIParameter(String user, List groups) {
return builder.toString();
}
- protected CompletableFuture sendDeleteClientRequest(WebClient webClient, String requestURI, String logMessage) {
- CompletableFuture future = new CompletableFuture<>();
- webClient.delete(requestURI)
- .putHeader("Authorization", getAuthHeader())
- .send(res -> asyncHttpResponseTreatment(res, future, logMessage));
- return future;
- }
-
protected CompletableFuture sendPostWithBodyClientRequest(WebClient webClient, String requestURI, String logMessage, String body, String contentType) {
CompletableFuture future = new CompletableFuture<>();
+
HttpRequest request = webClient.post(requestURI)
.putHeader("Authorization", getAuthHeader())
.putHeader("Content-Type", contentType);
if (MediaType.APPLICATION_JSON.equals(contentType)) {
+ LOGGER.debug("Sending Json Body: {} POST to URI {}", body, requestURI);
request.sendJson(new JsonObject(body), res -> asyncHttpResponseTreatment(res, future, logMessage));
} else {
+ LOGGER.debug("Sending Buffer(Body): {} POST to URI {}", body, requestURI);
request.sendBuffer(Buffer.buffer(body), res -> asyncHttpResponseTreatment(res, future, logMessage));
}
return future;
}
- private void asyncHttpResponseTreatment(AsyncResult> res, CompletableFuture future, String logMessage) {
- if (res.succeeded() && ((res.result().statusCode() == 200 || res.result().statusCode() == 201))) {
- future.complete(res.result().bodyAsString() != null ? res.result().bodyAsString() : "Successfully performed: " + logMessage);
- } else {
- future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result())));
- }
- }
-
protected CompletableFuture sendPostClientRequest(WebClient webClient, String requestURI, String logMessage) {
CompletableFuture future = new CompletableFuture<>();
webClient.post(requestURI)
.putHeader("Authorization", getAuthHeader())
.send(res -> asyncHttpResponseTreatment(res, future, logMessage));
+ LOGGER.debug("Sending post to URI {}", requestURI);
return future;
}
@@ -324,27 +254,22 @@ protected CompletableFuture sendPutClientRequest(WebClient webClient, String req
.putHeader("Authorization", getAuthHeader())
.putHeader("Content-Type", contentType);
if (MediaType.APPLICATION_JSON.equals(contentType)) {
+ LOGGER.debug("Sending Json Body: {} PUT to URI {}", body, requestURI);
request.sendJson(new JsonObject(body), res -> asyncHttpResponseTreatment(res, future, logMessage));
} else {
+ LOGGER.debug("Sending Buffer(Body): {} PUT to URI {}", body, requestURI);
request.sendBuffer(Buffer.buffer(body), res -> asyncHttpResponseTreatment(res, future, logMessage));
}
return future;
}
- protected CompletableFuture sendPatchClientRequest(WebClient webClient, String requestURI, String logMessage, JsonObject jsonBody) {
- CompletableFuture future = new CompletableFuture<>();
- webClient.patch(requestURI)
- .putHeader("Authorization", getAuthHeader())
- .sendJson(jsonBody, res -> asyncHttpResponseTreatment(res, future, logMessage));
- return future;
- }
-
protected CompletableFuture sendGetClientRequest(WebClient webClient, String requestURI, String logMessage, Class type) {
CompletableFuture future = new CompletableFuture<>();
webClient.get(requestURI)
.putHeader("Authorization", getAuthHeader())
.send(res -> send(logMessage, type, future, res));
+ LOGGER.debug("Sending GET to URI {}", requestURI);
return future;
}
@@ -362,20 +287,4 @@ protected void send(String logMessage, Class type, CompletableFuture future, Asy
}
}
- private String getErrorMessage(String logMessage, HttpResponse result) {
- String errorMessage = "FAILED: " + logMessage;
- if (result != null) {
- errorMessage += " errorCode:" + result.statusCode() +
- " errorStatus:" + result.statusMessage() +
- " errorMessage:" + (result.body() != null ? result.body().toString() : "-");
- }
- return errorMessage;
- }
-
- protected String getAuthHeader() {
- if (identity != null && identity.getCredential(TokenCredential.class) != null) {
- return "Bearer " + identity.getCredential(TokenCredential.class).getToken();
- }
- return "";
- }
}
diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
index 0ce95881fa..66046b639f 100644
--- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
+++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/KogitoIndexEventConverter.java
@@ -68,13 +68,11 @@ public Message> convert(Message> message, Type type) {
if (processInstanceDataEvent.getData() == null) {
processInstanceDataEvent.setData(objectMapper.readValue(message.getPayload().toString(), ProcessInstanceEventBody.class));
}
- LOGGER.error("---------------------------- Source: processInstanceEvent:" + processInstanceDataEvent.getSource());
return message.withPayload(processInstanceDataEvent);
} else if (type.getTypeName().equals(KogitoJobCloudEvent.class.getTypeName())) {
KogitoJobCloudEvent event = objectMapper.readValue(message.getPayload().toString(), KogitoJobCloudEvent.class);
if (event.getData() == null) {
Job job = objectMapper.readValue(message.getPayload().toString(), Job.class);
- LOGGER.error("---------------------------- Source: jobEvent:" + event.getSource());
job.setEndpoint(event.getSource() != null ? event.getSource().toString() : "none");
event.setData(job);
}
diff --git a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java
index 6907d6ac85..c4b9161672 100644
--- a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java
+++ b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/api/KogitoRuntimeClientTest.java
@@ -15,7 +15,6 @@
*/
package org.kie.kogito.index.service.api;
-import java.nio.Buffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -27,91 +26,54 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.index.TestUtils;
-import org.kie.kogito.index.model.Job;
+import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
+import org.kie.kogito.index.api.KogitoRuntimeCommonClientTest;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
import org.kie.kogito.index.service.DataIndexServiceException;
import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
-import io.quarkus.security.credential.TokenCredential;
-import io.quarkus.security.identity.SecurityIdentity;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
-import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
-import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.WebClientOptions;
import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.ABORT_PROCESS_INSTANCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.CANCEL_JOB_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.CANCEL_NODE_INSTANCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.GET_PROCESS_INSTANCE_DIAGRAM_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.GET_PROCESS_INSTANCE_NODE_DEFINITIONS_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.GET_PROCESS_INSTANCE_SOURCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.RESCHEDULE_JOB_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.RETRIGGER_NODE_INSTANCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.RETRY_PROCESS_INSTANCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.SKIP_PROCESS_INSTANCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.TRIGGER_NODE_INSTANCE_PATH;
-import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.UPDATE_VARIABLES_PROCESS_INSTANCE_PATH;
+import static org.kie.kogito.index.service.api.KogitoRuntimeClientImpl.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
-public class KogitoRuntimeClientTest {
+class KogitoRuntimeClientTest extends KogitoRuntimeCommonClientTest {
- private static int ACTIVE = 1;
private static int ERROR = 5;
- private static String SERVICE_URL = "http://runtimeURL.com";
- private static String PROCESS_INSTANCE_ID = "pId";
private static String TASK_ID = "taskId";
- private static String JOB_ID = "jobId";
-
- private static String AUTHORIZED_TOKEN = "authToken";
-
- @Mock
- public Vertx vertx;
-
- @Mock
- private SecurityIdentity identityMock;
-
- private TokenCredential tokenCredential;
private KogitoRuntimeClientImpl client;
- @Mock
- private WebClient webClientMock;
-
- @Mock
- private HttpRequest httpRequestMock;
-
@BeforeEach
public void setup() {
- client = spy(new KogitoRuntimeClientImpl(vertx, identityMock));
- client.gatewayTargetUrl = Optional.empty();
- client.serviceWebClientMap.put(SERVICE_URL, webClientMock);
+ client = spy(new KogitoRuntimeClientImpl());
+ client.setGatewayTargetUrl(Optional.empty());
+ client.addServiceWebClient(SERVICE_URL, webClientMock);
+ client.setVertx(vertx);
+ client.setIdentity(identityMock);
+ }
+
+ @Override
+ protected KogitoRuntimeCommonClient getClient() {
+ return client;
}
@Test
- public void testAbortProcessInstance() {
+ void testAbortProcessInstance() {
setupIdentityMock();
when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);
@@ -128,7 +90,7 @@ public void testAbortProcessInstance() {
}
@Test
- public void testRetryProcessInstance() {
+ void testRetryProcessInstance() {
setupIdentityMock();
when(webClientMock.post(anyString())).thenReturn(httpRequestMock);
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ERROR);
@@ -144,7 +106,7 @@ public void testRetryProcessInstance() {
}
@Test
- public void testSkipProcessInstance() {
+ void testSkipProcessInstance() {
setupIdentityMock();
when(webClientMock.post(anyString())).thenReturn(httpRequestMock);
@@ -161,10 +123,10 @@ public void testSkipProcessInstance() {
}
@Test
- public void testUpdateProcessInstanceVariables() {
- setupIdentityMock();
+ void testUpdateProcessInstanceVariables() {
when(webClientMock.put(anyString())).thenReturn(httpRequestMock);
when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock);
+ when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock);
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ERROR);
@@ -179,7 +141,7 @@ public void testUpdateProcessInstanceVariables() {
}
@Test
- public void testTriggerNodeInstance() {
+ void testTriggerNodeInstance() {
String nodeDefId = "nodeDefId";
setupIdentityMock();
when(webClientMock.post(anyString())).thenReturn(httpRequestMock);
@@ -189,14 +151,14 @@ public void testTriggerNodeInstance() {
client.triggerNodeInstance(SERVICE_URL, pI, nodeDefId);
verify(client).sendPostClientRequest(webClientMock,
format(TRIGGER_NODE_INSTANCE_PATH, pI.getProcessId(), pI.getId(), nodeDefId),
- "Trigger Node " + nodeDefId + "from ProcessInstance with id: " + pI.getId());
+ "Trigger Node " + nodeDefId + FROM_PROCESS_INSTANCE_WITH_ID + pI.getId());
ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
verify(httpRequestMock).send(handlerCaptor.capture());
checkResponseHandling(handlerCaptor.getValue());
}
@Test
- public void testRetriggerNodeInstance() {
+ void testRetriggerNodeInstance() {
String nodeInstanceId = "nodeInstanceId";
setupIdentityMock();
when(webClientMock.post(anyString())).thenReturn(httpRequestMock);
@@ -206,15 +168,14 @@ public void testRetriggerNodeInstance() {
client.retriggerNodeInstance(SERVICE_URL, pI, nodeInstanceId);
verify(client).sendPostClientRequest(webClientMock,
format(RETRIGGER_NODE_INSTANCE_PATH, pI.getProcessId(), pI.getId(), nodeInstanceId),
- "Retrigger NodeInstance " + nodeInstanceId +
- "from ProcessInstance with id: " + pI.getId());
+ "Retrigger NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + pI.getId());
ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
verify(httpRequestMock).send(handlerCaptor.capture());
checkResponseHandling(handlerCaptor.getValue());
}
@Test
- public void testCancelNodeInstance() {
+ void testCancelNodeInstance() {
String nodeInstanceId = "nodeInstanceId";
setupIdentityMock();
when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);
@@ -224,50 +185,14 @@ public void testCancelNodeInstance() {
client.cancelNodeInstance(SERVICE_URL, pI, nodeInstanceId);
verify(client).sendDeleteClientRequest(webClientMock,
format(CANCEL_NODE_INSTANCE_PATH, pI.getProcessId(), pI.getId(), nodeInstanceId),
- "Cancel NodeInstance " + nodeInstanceId +
- "from ProcessInstance with id: " + pI.getId());
+ "Cancel NodeInstance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + pI.getId());
ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
verify(httpRequestMock).send(handlerCaptor.capture());
checkResponseHandling(handlerCaptor.getValue());
}
@Test
- public void testCancelJob() {
- setupIdentityMock();
- when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);
-
- Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED");
- client.cancelJob(SERVICE_URL, job);
-
- verify(client).sendDeleteClientRequest(webClientMock,
- format(CANCEL_JOB_PATH, job.getId()),
- "CANCEL Job with id: " + JOB_ID);
- ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
- verify(httpRequestMock).send(handlerCaptor.capture());
- checkResponseHandling(handlerCaptor.getValue());
- }
-
- @Test
- public void testRescheduleJob() {
- String newJobData = "{ }";
- setupIdentityMock();
- when(webClientMock.put(anyString())).thenReturn(httpRequestMock);
- when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock);
-
- Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED");
-
- client.rescheduleJob(SERVICE_URL, job, newJobData);
- verify(client).sendJSONPutClientRequest(webClientMock,
- format(RESCHEDULE_JOB_PATH, JOB_ID),
- "RESCHEDULED JOB with id: " + job.getId(), newJobData);
- ArgumentCaptor handlerCaptor = ArgumentCaptor.forClass(Handler.class);
- JsonObject jsonOject = new JsonObject(newJobData);
- verify(httpRequestMock).sendJson(eq(jsonOject), handlerCaptor.capture());
- checkResponseHandling(handlerCaptor.getValue());
- }
-
- @Test
- public void testGetProcessInstanceDiagram() {
+ void testGetProcessInstanceDiagram() {
setupIdentityMock();
when(webClientMock.get(anyString())).thenReturn(httpRequestMock);
@@ -285,7 +210,7 @@ public void testGetProcessInstanceDiagram() {
}
@Test
- public void testGetProcessInstanceNodeDefinitions() {
+ void testGetProcessInstanceNodeDefinitions() {
setupIdentityMock();
when(webClientMock.get(anyString())).thenReturn(httpRequestMock);
@@ -314,7 +239,7 @@ public void testGetProcessInstanceNodeDefinitions() {
}
@Test
- public void testGetProcessInstanceSource() {
+ void testGetProcessInstanceSource() {
setupIdentityMock();
when(webClientMock.get(anyString())).thenReturn(httpRequestMock);
@@ -332,7 +257,7 @@ public void testGetProcessInstanceSource() {
}
@Test
- public void testSendOk() throws Exception {
+ void testSendOk() throws Exception {
AsyncResult result = mock(AsyncResult.class);
when(result.succeeded()).thenReturn(true);
HttpResponse response = mock(HttpResponse.class);
@@ -348,7 +273,7 @@ public void testSendOk() throws Exception {
}
@Test
- public void testSendNotFound() throws Exception {
+ void testSendNotFound() throws Exception {
AsyncResult result = mock(AsyncResult.class);
when(result.succeeded()).thenReturn(true);
HttpResponse response = mock(HttpResponse.class);
@@ -363,7 +288,7 @@ public void testSendNotFound() throws Exception {
}
@Test
- public void testSendException() throws Exception {
+ void testSendException() throws Exception {
AsyncResult result = mock(AsyncResult.class);
when(result.succeeded()).thenReturn(false);
when(result.cause()).thenReturn(new RuntimeException());
@@ -382,7 +307,7 @@ public void testSendException() throws Exception {
}
@Test
- public void testGetUserTaskSchema() {
+ void testGetUserTaskSchema() {
setupIdentityMock();
when(webClientMock.get(anyString())).thenReturn(httpRequestMock);
@@ -398,7 +323,7 @@ public void testGetUserTaskSchema() {
}
@Test
- public void testUpdateUserTaskInstance() {
+ void testUpdateUserTaskInstance() {
setupIdentityMock();
when(webClientMock.patch(anyString())).thenReturn(httpRequestMock);
@@ -421,10 +346,10 @@ public void testUpdateUserTaskInstance() {
}
@Test
- public void testCreateUserTaskInstanceComment() {
+ void testCreateUserTaskInstanceComment() {
String commentInfo = "newComment";
- setupIdentityMock();
when(webClientMock.post(anyString())).thenReturn(httpRequestMock);
+ when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock);
when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock);
UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress");
@@ -439,11 +364,11 @@ public void testCreateUserTaskInstanceComment() {
}
@Test
- public void testCreateUserTaskInstanceAttachment() {
+ void testCreateUserTaskInstanceAttachment() {
String attachmentUri = "nhttps://drive.google.com/file/d/AttachmentUri";
String attachmentName = "newAttachmentName";
- setupIdentityMock();
when(webClientMock.post(anyString())).thenReturn(httpRequestMock);
+ when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock);
when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock);
UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress");
@@ -461,11 +386,11 @@ public void testCreateUserTaskInstanceAttachment() {
}
@Test
- public void testUpdateUserTaskInstanceComment() {
+ void testUpdateUserTaskInstanceComment() {
String commentInfo = "NewCommentContent";
String commentId = "commentId";
- setupIdentityMock();
when(webClientMock.put(anyString())).thenReturn(httpRequestMock);
+ when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock);
when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock);
UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress");
@@ -482,7 +407,7 @@ public void testUpdateUserTaskInstanceComment() {
}
@Test
- public void testDeleteTaskInstanceComment() {
+ void testDeleteTaskInstanceComment() {
String commentId = "commentId";
setupIdentityMock();
when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);
@@ -499,12 +424,13 @@ public void testDeleteTaskInstanceComment() {
}
@Test
- public void testUpdateUserTaskInstanceAttachment() {
+ void testUpdateUserTaskInstanceAttachment() {
String attachmentName = "NewAttachmentName";
String attachmentContent = "NewAttachmentContent";
String attachmentId = "attachmentId";
- setupIdentityMock();
+
when(webClientMock.put(anyString())).thenReturn(httpRequestMock);
+ when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock);
when(httpRequestMock.putHeader(eq("Content-Type"), anyString())).thenReturn(httpRequestMock);
UserTaskInstance taskInstance = createUserTaskInstance(PROCESS_INSTANCE_ID, TASK_ID, "InProgress");
@@ -525,7 +451,7 @@ public void testUpdateUserTaskInstanceAttachment() {
}
@Test
- public void testDeleteTaskInstanceAttachment() {
+ void testDeleteTaskInstanceAttachment() {
String attachmentId = "attachmentId";
setupIdentityMock();
when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);
@@ -541,101 +467,8 @@ public void testDeleteTaskInstanceAttachment() {
checkResponseHandling(handlerCaptor.getValue());
}
- @Test
- public void testWebClientToURLOptions() {
- String defaultHost = "localhost";
- int defaultPort = 8180;
- WebClientOptions webClientOptions = client.getWebClientToURLOptions("http://" + defaultHost + ":" + defaultPort);
- assertThat(webClientOptions.getDefaultHost()).isEqualTo(defaultHost);
- assertThat(webClientOptions.getDefaultPort()).isEqualTo(defaultPort);
- }
-
- @Test
- public void testWebClientToURLOptionsWithoutPort() {
- String dataIndexUrl = "http://service.com";
- WebClientOptions webClientOptions = client.getWebClientToURLOptions(dataIndexUrl);
- assertThat(webClientOptions.getDefaultPort()).isEqualTo(80);
- assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com");
- assertFalse(webClientOptions.isSsl());
- }
-
- @Test
- public void testWebClientToURLOptionsWithoutPortSSL() {
- String dataIndexurl = "https://service.com";
- WebClientOptions webClientOptions = client.getWebClientToURLOptions(dataIndexurl);
- assertThat(webClientOptions.getDefaultPort()).isEqualTo(443);
- assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com");
- assertTrue(webClientOptions.isSsl());
- }
-
- @Test
- public void testMalformedURL() {
- assertThat(client.getWebClientToURLOptions("malformedURL")).isNull();
- }
-
- @Test
- void testOverrideURL() {
- String host = "host.testcontainers.internal";
- client.gatewayTargetUrl = Optional.of(host);
- WebClientOptions webClientOptions = client.getWebClientToURLOptions("http://service.com");
- assertThat(webClientOptions.getDefaultHost()).isEqualTo(host);
- }
-
- @Test
- public void testGetAuthHeader() {
- tokenCredential = mock(TokenCredential.class);
- when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential);
- when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN);
-
- String token = client.getAuthHeader();
- verify(identityMock, times(2)).getCredential(TokenCredential.class);
- assertThat(token).isEqualTo("Bearer " + AUTHORIZED_TOKEN);
-
- when(identityMock.getCredential(TokenCredential.class)).thenReturn(null);
- token = client.getAuthHeader();
- assertThat(token).isEqualTo("");
- }
-
- private AsyncResult createResponseMocks(HttpResponse response, boolean succeed, int statusCode) {
- AsyncResult asyncResultMock = mock(AsyncResult.class);
- when(asyncResultMock.succeeded()).thenReturn(succeed);
- when(asyncResultMock.result()).thenReturn(response);
- when(response.statusCode()).thenReturn(statusCode);
- return asyncResultMock;
- }
-
- private ProcessInstance createProcessInstance(String processInstanceId, int status) {
- return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null);
- }
-
private UserTaskInstance createUserTaskInstance(String processInstanceId, String userTaskId, String state) {
return TestUtils.getUserTaskInstance(userTaskId, "travels", processInstanceId, null, null, state, "jdoe");
}
- private Job createJob(String jobId, String processInstanceId, String status) {
- return TestUtils.getJob(jobId, "travels", processInstanceId, null, null, status);
- }
-
- protected void checkResponseHandling(Handler>> handler) {
- HttpResponse response = mock(HttpResponse.class);
- HttpResponse responseWithoutError = mock(HttpResponse.class);
-
- handler.handle(createResponseMocks(response, false, 404));
- verify(response).statusMessage();
- verify(response).body();
- verify(response, never()).bodyAsString();
-
- handler.handle(createResponseMocks(responseWithoutError, true, 200));
- verify(responseWithoutError, never()).statusMessage();
- verify(responseWithoutError, never()).body();
- verify(responseWithoutError).bodyAsString();
- }
-
- protected void setupIdentityMock() {
- tokenCredential = mock(TokenCredential.class);
- when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential);
- when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN);
- when(httpRequestMock.putHeader(eq("Authorization"), eq("Bearer " + AUTHORIZED_TOKEN))).thenReturn(httpRequestMock);
- }
-
}
diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java
index 431cbae78e..3951d096dd 100644
--- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java
+++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/main/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImpl.java
@@ -33,7 +33,7 @@
import org.kie.kogito.Application;
import org.kie.kogito.addon.source.files.SourceFilesProvider;
import org.kie.kogito.index.api.KogitoRuntimeClient;
-import org.kie.kogito.index.model.Job;
+import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
import org.kie.kogito.index.model.Node;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.index.model.UserTaskInstance;
@@ -49,7 +49,7 @@
import static org.jbpm.ruleflow.core.Metadata.UNIQUE_ID;
@ApplicationScoped
-public class KogitoAddonRuntimeClientImpl implements KogitoRuntimeClient {
+public class KogitoAddonRuntimeClientImpl extends KogitoRuntimeCommonClient implements KogitoRuntimeClient {
private static String SUCCESSFULLY_OPERATION_MESSAGE = "Successfully performed: %s";
@@ -175,7 +175,7 @@ public CompletableFuture triggerNodeInstance(String serviceURL, ProcessI
throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage());
} else {
return String.format(SUCCESSFULLY_OPERATION_MESSAGE,
- "TRIGGER Node " + nodeDefinitionId + "from ProcessInstance with id: " + processInstance.getId());
+ "TRIGGER Node " + nodeDefinitionId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId());
}
}));
}
@@ -189,7 +189,7 @@ public CompletableFuture retriggerNodeInstance(String serviceURL, Proces
throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage());
} else {
return String.format(SUCCESSFULLY_OPERATION_MESSAGE,
- "RETRIGGER Node instance " + nodeInstanceId + "from ProcessInstance with id: " + processInstance.getId());
+ "RETRIGGER Node instance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId());
}
}));
}
@@ -203,42 +203,11 @@ public CompletableFuture cancelNodeInstance(String serviceURL, ProcessIn
throw new ProcessInstanceExecutionException(pInstance.id(), pInstance.error().get().failedNodeId(), pInstance.error().get().errorMessage());
} else {
return String.format(SUCCESSFULLY_OPERATION_MESSAGE,
- "CANCEL Node instance " + nodeInstanceId + "from ProcessInstance with id: " + processInstance.getId());
+ "CANCEL Node instance " + nodeInstanceId + FROM_PROCESS_INSTANCE_WITH_ID + processInstance.getId());
}
}));
}
- public CompletableFuture cancelJob(String serviceURL, Job job) {
- return throwUnsupportedException();
- /*
- * if (jobResource == null) {
- * return CompletableFuture.completedFuture(null);
- * } else {
- * return CompletableFuture.supplyAsync(() -> jobResource.delete(job.getId()).toString(),
- * managedExecutor);
- * }
- */
- }
-
- public CompletableFuture rescheduleJob(String serviceURL, Job job, String newJobData) throws UncheckedIOException {
- return throwUnsupportedException();
- /*
- * if (jobResource == null) {
- * return CompletableFuture.completedFuture(null);
- * } else {
- * return CompletableFuture.supplyAsync(() -> {
- * try {
- * return jobResource.patch(job.getId(), getObjectMapper().readValue(newJobData, org.kie.kogito.jobs.api.Job.class)).toString();
- * } catch (JsonMappingException exception) {
- * throw new UncheckedIOException(exception);
- * } catch (JsonProcessingException e) {
- * throw new RuntimeException(e);
- * }
- * }, managedExecutor);
- * }
- */
- }
-
@Override
public CompletableFuture getUserTaskSchema(String serviceURL, UserTaskInstance userTaskInstance, String user, List groups) {
return throwUnsupportedException();
@@ -280,24 +249,6 @@ public CompletableFuture deleteUserTaskInstanceAttachment(String service
return throwUnsupportedException();
}
- private CompletableFuture executeOnProcessInstanceOld(String processId, String processInstanceId, Function, String> supplier) {
-
- Process> process = processes != null ? processes.processById(processId) : null;
-
- if (process == null) {
- return CompletableFuture.completedFuture(null);
- }
- return CompletableFuture.supplyAsync(() -> UnitOfWorkExecutor.executeInUnitOfWork(application.unitOfWorkManager(), () -> {
- Optional extends org.kie.kogito.process.ProcessInstance>> processInstanceFound = process.instances().findById(processInstanceId);
- if (processInstanceFound.isPresent()) {
- org.kie.kogito.process.ProcessInstance> processInstance = processInstanceFound.get();
- return supplier.apply(processInstance);
- } else {
- return String.format("Process instance with id %s not allow the operation requested", processInstanceId);
- }
- }), managedExecutor);
- }
-
private String executeOnProcessInstance(String processId, String processInstanceId, Function, String> supplier) {
Process> process = processes != null ? processes.processById(processId) : null;
diff --git a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java
index 16af3c00d3..32ce43e42b 100644
--- a/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java
+++ b/data-index/kogito-addons-quarkus-data-index/kogito-addons-quarkus-data-index-common/runtime/src/test/java/org/kie/kogito/index/addon/api/KogitoAddonRuntimeClientImplTest.java
@@ -24,8 +24,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.Application;
import org.kie.kogito.addon.source.files.SourceFilesProvider;
-import org.kie.kogito.index.TestUtils;
-import org.kie.kogito.index.model.Job;
+import org.kie.kogito.index.api.KogitoRuntimeCommonClient;
+import org.kie.kogito.index.api.KogitoRuntimeCommonClientTest;
import org.kie.kogito.index.model.ProcessInstance;
import org.kie.kogito.process.ProcessError;
import org.kie.kogito.process.ProcessInstanceExecutionException;
@@ -38,19 +38,15 @@
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
+import io.quarkus.security.credential.TokenCredential;
+
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
-public class KogitoAddonRuntimeClientImplTest {
+public class KogitoAddonRuntimeClientImplTest extends KogitoRuntimeCommonClientTest {
- private static int ACTIVE = 1;
- private static int ERROR = 5;
- private static String SERVICE_URL = "http://runtimeURL.com";
- private static String PROCESS_INSTANCE_ID = "pId";
private static final String NODE_ID = "nodeId";
- private static String TASK_ID = "taskId";
- private static String JOB_ID = "jobId";
public static final String NODE_ID_ERROR = "processInstanceIdError";
@@ -112,6 +108,15 @@ public void setup() {
lenient().when(applicationInstance.get()).thenReturn(application);
client = spy(new KogitoAddonRuntimeClientImpl(processSvgServiceInstance, sourceFilesProvider, processesInstance, applicationInstance));
+ client.setGatewayTargetUrl(Optional.empty());
+ client.addServiceWebClient(SERVICE_URL, webClientMock);
+ client.setVertx(vertx);
+ client.setIdentity(identityMock);
+ }
+
+ @Override
+ public KogitoRuntimeCommonClient getClient() {
+ return client;
}
private org.kie.kogito.process.ProcessInstance mockProcessInstanceStatusActive() {
@@ -136,7 +141,7 @@ private ProcessError mockProcessInstanceStatusActiveOnError() {
}
@Test
- public void testAbortProcessInstanceSuccess() {
+ void testAbortProcessInstanceSuccess() {
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
mockProcessInstanceStatusActive().abort();
client.abortProcessInstance(SERVICE_URL, pI);
@@ -145,7 +150,7 @@ public void testAbortProcessInstanceSuccess() {
}
@Test
- public void testAbortProcessInstanceError() {
+ void testAbortProcessInstanceError() {
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
mockProcessInstanceStatusError().abort();
assertThrows(ProcessInstanceExecutionException.class,
@@ -155,7 +160,7 @@ public void testAbortProcessInstanceError() {
}
@Test
- public void testRetryProcessInstance() {
+ void testRetryProcessInstance() {
mockProcessInstanceStatusActiveOnError().retrigger();
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
client.retryProcessInstance(SERVICE_URL, pI);
@@ -165,7 +170,7 @@ public void testRetryProcessInstance() {
}
@Test
- public void testSkipProcessInstance() {
+ void testSkipProcessInstance() {
mockProcessInstanceStatusActiveOnError().skip();
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
client.skipProcessInstance(SERVICE_URL, pI);
@@ -175,12 +180,7 @@ public void testSkipProcessInstance() {
}
@Test
- public void testUpdateProcessInstanceVariables() {
-
- }
-
- @Test
- public void testTriggerNodeInstance() {
+ void testTriggerNodeInstance() {
mockProcessInstanceStatusActive().triggerNode(NODE_ID);
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
client.triggerNodeInstance(SERVICE_URL, pI, NODE_ID);
@@ -189,7 +189,7 @@ public void testTriggerNodeInstance() {
}
@Test
- public void testRetriggerNodeInstance() {
+ void testRetriggerNodeInstance() {
mockProcessInstanceStatusActive().retriggerNodeInstance(NODE_ID);
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
client.retriggerNodeInstance(SERVICE_URL, pI, NODE_ID);
@@ -198,7 +198,7 @@ public void testRetriggerNodeInstance() {
}
@Test
- public void testCancelNodeInstance() {
+ void testCancelNodeInstance() {
mockProcessInstanceStatusActive().cancelNodeInstance(NODE_ID);
ProcessInstance pI = createProcessInstance(PROCESS_INSTANCE_ID, ACTIVE);
client.cancelNodeInstance(SERVICE_URL, pI, NODE_ID);
@@ -206,32 +206,11 @@ public void testCancelNodeInstance() {
verify(processInstance, times(1)).cancelNodeInstance(NODE_ID);
}
- @Test
- public void testCancelJob() {
-
- }
-
- @Test
- public void testRescheduleJob() {
-
- }
-
- @Test
- public void testGetProcessInstanceNodeDefinitions() {
-
- }
-
- @Test
- public void testGetProcessInstanceSource() {
-
- }
-
- private ProcessInstance createProcessInstance(String processInstanceId, int status) {
- return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null);
- }
-
- private Job createJob(String jobId, String processInstanceId, String status) {
- return TestUtils.getJob(jobId, "travels", processInstanceId, null, null, status);
+ protected void setupIdentityMock() {
+ tokenCredential = mock(TokenCredential.class);
+ when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential);
+ when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN);
+ when(httpRequestMock.putHeader(eq("Authorization"), anyString())).thenReturn(httpRequestMock);
}
}