Skip to content

Commit

Permalink
KOGITO-8841: Review data-index gateway API to interact with job service
Browse files Browse the repository at this point in the history
  • Loading branch information
nmirasch committed Aug 9, 2023
1 parent 812b3c9 commit 8918d13
Show file tree
Hide file tree
Showing 9 changed files with 448 additions and 424 deletions.
4 changes: 4 additions & 0 deletions data-index/data-index-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-vertx</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-reactive-routes</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.index.api;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.service.DataIndexServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.quarkus.security.credential.TokenCredential;
import io.quarkus.security.identity.SecurityIdentity;
import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;

import static java.lang.String.format;

@ApplicationScoped
public class KogitoRuntimeCommonClient {

public static final String CANCEL_JOB_PATH = "/jobs/%s";
public static final String RESCHEDULE_JOB_PATH = "/jobs/%s";

public static final String FROM_PROCESS_INSTANCE_WITH_ID = "from ProcessInstance with id: ";

private static final Logger LOGGER = LoggerFactory.getLogger(KogitoRuntimeCommonClient.class);

protected Vertx vertx;

protected SecurityIdentity identity;

protected Map<String, WebClient> serviceWebClientMap = new HashMap<>();

@ConfigProperty(name = "kogito.dataindex.gateway.url")
protected Optional<String> gatewayTargetUrl;

public KogitoRuntimeCommonClient() {
}

public void setGatewayTargetUrl(Optional<String> gatewayTargetUrl) {
this.gatewayTargetUrl = gatewayTargetUrl;
}

public void addServiceWebClient(String serviceUrl, WebClient webClient) {
serviceWebClientMap.put(serviceUrl, webClient);
}

protected WebClient getWebClient(String runtimeServiceUrl) {
if (runtimeServiceUrl == null) {
throw new DataIndexServiceException("Runtime service URL not defined, please review the kogito.service.url system property to point the public URL for this runtime.");
} else {
return serviceWebClientMap.computeIfAbsent(runtimeServiceUrl, url -> WebClient.create(vertx, getWebClientToURLOptions(runtimeServiceUrl)));
}
}

public WebClientOptions getWebClientToURLOptions(String targetHttpURL) {
try {
URL dataIndexURL = new URL(targetHttpURL);
return new WebClientOptions()
.setDefaultHost(gatewayTargetUrl.orElse(dataIndexURL.getHost()))
.setDefaultPort((dataIndexURL.getPort() != -1 ? dataIndexURL.getPort() : dataIndexURL.getDefaultPort()))
.setSsl(dataIndexURL.getProtocol().compareToIgnoreCase("https") == 0);
} catch (MalformedURLException ex) {
LOGGER.error(String.format("Invalid runtime service URL: %s", targetHttpURL), ex);
return null;
}
}

public CompletableFuture<String> cancelJob(String serviceURL, Job job) {
String requestURI = format(CANCEL_JOB_PATH, job.getId());
LOGGER.debug("Sending DELETE to URI {}", requestURI);
return sendDeleteClientRequest(getWebClient(serviceURL), requestURI, "CANCEL Job with id: " + job.getId());
}

public CompletableFuture<String> rescheduleJob(String serviceURL, Job job, String newJobData) {
String requestURI = format(RESCHEDULE_JOB_PATH, job.getId());
LOGGER.debug("Sending body: {} PATCH to URI {}", newJobData, requestURI);
return sendPatchClientRequest(getWebClient(serviceURL), requestURI,
"RESCHEDULED JOB with id: " + job.getId(), new JsonObject(newJobData));
}

public CompletableFuture sendDeleteClientRequest(WebClient webClient, String requestURI, String logMessage) {
CompletableFuture future = new CompletableFuture<>();
webClient.delete(requestURI)
.putHeader("Authorization", getAuthHeader())
.send(res -> asyncHttpResponseTreatment(res, future, logMessage));
LOGGER.debug("Sending DELETE to URI {}", requestURI);
return future;
}

protected void asyncHttpResponseTreatment(AsyncResult<HttpResponse<Buffer>> res, CompletableFuture future, String logMessage) {
if (res.succeeded() && (res.result().statusCode() == 200 || res.result().statusCode() == 201)) {
future.complete(res.result().bodyAsString() != null ? res.result().bodyAsString() : "Successfully performed: " + logMessage);
} else {
future.completeExceptionally(new DataIndexServiceException(getErrorMessage(logMessage, res.result())));
}
}

public CompletableFuture sendPatchClientRequest(WebClient webClient, String requestURI, String logMessage, JsonObject jsonBody) {
CompletableFuture future = new CompletableFuture<>();
webClient.patch(requestURI)
.putHeader("Authorization", getAuthHeader())
.sendJson(jsonBody, res -> asyncHttpResponseTreatment(res, future, logMessage));
return future;
}

protected String getErrorMessage(String logMessage, HttpResponse<Buffer> result) {
String errorMessage = "FAILED: " + logMessage;
if (result != null) {
errorMessage += " errorCode:" + result.statusCode() +
" errorStatus:" + result.statusMessage() +
" errorMessage:" + (result.body() != null ? result.body().toString() : "-");
}
return errorMessage;
}

public String getAuthHeader() {
if (identity != null && identity.getCredential(TokenCredential.class) != null) {
return "Bearer " + identity.getCredential(TokenCredential.class).getToken();
}
return "";
}

@Inject
public void setIdentity(SecurityIdentity identity) {
this.identity = identity;
}

@Inject
public void setVertx(Vertx vertx) {
this.vertx = vertx;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.index.api;

import java.nio.Buffer;
import java.util.*;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.index.TestUtils;
import org.kie.kogito.index.model.Job;
import org.kie.kogito.index.model.ProcessInstance;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import io.quarkus.security.credential.TokenCredential;
import io.quarkus.security.identity.SecurityIdentity;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.client.HttpRequest;
import io.vertx.ext.web.client.HttpResponse;
import io.vertx.ext.web.client.WebClient;
import io.vertx.ext.web.client.WebClientOptions;

import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
public abstract class KogitoRuntimeCommonClientTest {

protected static int ACTIVE = 1;
protected static String SERVICE_URL = "http://runtimeURL.com";
protected static String PROCESS_INSTANCE_ID = "pId";
protected static String JOB_ID = "jobId";
protected static String AUTHORIZED_TOKEN = "authToken";

@Mock
protected Vertx vertx;

@Mock
protected SecurityIdentity identityMock;

protected TokenCredential tokenCredential;

@Mock
protected WebClient webClientMock;

@Mock
protected HttpRequest httpRequestMock;

protected abstract KogitoRuntimeCommonClient getClient();

@Test
void testCancelJob() {
setupIdentityMock();
when(webClientMock.delete(anyString())).thenReturn(httpRequestMock);

Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED");
getClient().cancelJob(SERVICE_URL, job);

verify(getClient()).sendDeleteClientRequest(webClientMock,
format(KogitoRuntimeCommonClient.CANCEL_JOB_PATH, job.getId()),
"CANCEL Job with id: " + JOB_ID);
ArgumentCaptor<Handler> handlerCaptor = ArgumentCaptor.forClass(Handler.class);
verify(httpRequestMock).send(handlerCaptor.capture());
checkResponseHandling(handlerCaptor.getValue());
}

@Test
void testRescheduleJob() {
String newJobData = "{\"expirationTime\": \"2023-08-27T04:35:54.631Z\",\"retries\": 2}";
setupIdentityMock();
when(webClientMock.patch(anyString())).thenReturn(httpRequestMock);

Job job = createJob(JOB_ID, PROCESS_INSTANCE_ID, "SCHEDULED");

getClient().rescheduleJob(SERVICE_URL, job, newJobData);
ArgumentCaptor<JsonObject> jsonCaptor = ArgumentCaptor.forClass(JsonObject.class);

verify(getClient()).sendPatchClientRequest(eq(webClientMock),
eq(format(KogitoRuntimeCommonClient.RESCHEDULE_JOB_PATH, JOB_ID)),
eq("RESCHEDULED JOB with id: " + job.getId()),
jsonCaptor.capture());

assertThat(jsonCaptor.getValue().getString("expirationTime")).isEqualTo("2023-08-27T04:35:54.631Z");
assertThat(jsonCaptor.getValue().getString("retries")).isEqualTo("2");

ArgumentCaptor<Handler> handlerCaptor = ArgumentCaptor.forClass(Handler.class);
JsonObject jsonOject = new JsonObject(newJobData);
verify(httpRequestMock).sendJson(eq(jsonOject), handlerCaptor.capture());
verify(httpRequestMock).putHeader("Authorization", "Bearer " + AUTHORIZED_TOKEN);
checkResponseHandling(handlerCaptor.getValue());
}

@Test
void testWebClientToURLOptions() {
String defaultHost = "localhost";
int defaultPort = 8180;
WebClientOptions webClientOptions = getClient().getWebClientToURLOptions("http://" + defaultHost + ":" + defaultPort);
assertThat(webClientOptions.getDefaultHost()).isEqualTo(defaultHost);
assertThat(webClientOptions.getDefaultPort()).isEqualTo(defaultPort);
}

@Test
void testWebClientToURLOptionsWithoutPort() {
String dataIndexUrl = "http://service.com";
WebClientOptions webClientOptions = getClient().getWebClientToURLOptions(dataIndexUrl);
assertThat(webClientOptions.getDefaultPort()).isEqualTo(80);
assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com");
assertFalse(webClientOptions.isSsl());
}

@Test
void testWebClientToURLOptionsWithoutPortSSL() {
String dataIndexurl = "https://service.com";
WebClientOptions webClientOptions = getClient().getWebClientToURLOptions(dataIndexurl);
assertThat(webClientOptions.getDefaultPort()).isEqualTo(443);
assertThat(webClientOptions.getDefaultHost()).isEqualTo("service.com");
assertTrue(webClientOptions.isSsl());
}

@Test
void testMalformedURL() {
assertThat(getClient().getWebClientToURLOptions("malformedURL")).isNull();
}

@Test
void testOverrideURL() {
String host = "host.testcontainers.internal";
getClient().setGatewayTargetUrl(Optional.of(host));
WebClientOptions webClientOptions = getClient().getWebClientToURLOptions("http://service.com");
assertThat(webClientOptions.getDefaultHost()).isEqualTo(host);
}

protected AsyncResult createResponseMocks(HttpResponse response, boolean succeed, int statusCode) {
AsyncResult asyncResultMock = mock(AsyncResult.class);
when(asyncResultMock.succeeded()).thenReturn(succeed);
when(asyncResultMock.result()).thenReturn(response);
when(response.statusCode()).thenReturn(statusCode);
return asyncResultMock;
}

private Job createJob(String jobId, String processInstanceId, String status) {
return TestUtils.getJob(jobId, "travels", processInstanceId, null, null, status);
}

protected void checkResponseHandling(Handler<AsyncResult<HttpResponse<Buffer>>> handler) {
HttpResponse response = mock(HttpResponse.class);
HttpResponse responseWithoutError = mock(HttpResponse.class);

handler.handle(createResponseMocks(response, false, 404));
verify(response).statusMessage();
verify(response).body();
verify(response, never()).bodyAsString();

handler.handle(createResponseMocks(responseWithoutError, true, 200));
verify(responseWithoutError, never()).statusMessage();
verify(responseWithoutError, never()).body();
verify(responseWithoutError).bodyAsString();
}

protected void setupIdentityMock() {
tokenCredential = mock(TokenCredential.class);
when(identityMock.getCredential(TokenCredential.class)).thenReturn(tokenCredential);
when(tokenCredential.getToken()).thenReturn(AUTHORIZED_TOKEN);
when(httpRequestMock.putHeader("Authorization", "Bearer " + AUTHORIZED_TOKEN)).thenReturn(httpRequestMock);
}

protected ProcessInstance createProcessInstance(String processInstanceId, int status) {
return TestUtils.getProcessInstance("travels", processInstanceId, status, null, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@
<groupId>org.kie.kogito</groupId>
<artifactId>persistence-commons-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-api</artifactId>
Expand Down
Loading

0 comments on commit 8918d13

Please sign in to comment.