From db2e6fcb4d2b4d4280d4892c98f9ad2d0e9d70b5 Mon Sep 17 00:00:00 2001 From: Helber Belmiro Date: Fri, 11 Aug 2023 10:25:55 -0300 Subject: [PATCH] KOGITO-9158 Use RestWorkItemHandler as HTTP client in Knative custom function (#2993) * KOGITO-9158 Used RestWorkItemHandler as HTTP client in Knative custom function Signed-off-by: Helber Belmiro * KOGITO-9158 Refactoring KnativeTypeHandler.java Signed-off-by: Helber Belmiro * KOGITO-9158 Changed request timeout Signed-off-by: Helber Belmiro * KOGITO-9158 Changed KnativeWorkItemHandler to extend RestWorkItemHandler Signed-off-by: Helber Belmiro * KOGITO-9158 Fixed KnativeWorkItemHandlerProducer static method Signed-off-by: Helber Belmiro --------- Signed-off-by: Helber Belmiro --- .../workitem/rest/RestWorkItemHandler.java | 29 +- .../KnativeResourceDiscoveryTestUtil.java | 2 +- .../addons/knative/serving/deployment/pom.xml | 4 + .../customfunctions/KnativeTypeHandler.java | 67 ++- .../knative/serving/integration-tests/pom.xml | 136 +++++++ .../resources/arrayKnativeFunction.sw.json | 34 ++ .../cloudEventKnativeFunction.sw.json | 38 ++ ...dEventWithIdAsParamKnativeFunction.sw.json | 38 ++ ...ntWithIdAsPlainJsonKnativeFunction.sw.json | 38 ++ ...dEventWithMissingIdKnativeFunction.sw.json | 37 ++ ...ithoutIdAsPlainJsonKnativeFunction.sw.json | 37 ++ .../emptyParamsKnativeFunction.sw.json | 28 ++ .../invalidCloudEventKnativeFunction.sw.json | 36 ++ .../plainJsonKnativeFunction.sw.json | 31 ++ .../serviceNotFoundKnativeFunction.sw.json | 38 ++ .../resources/timeoutKnativeFunction.sw.json | 31 ++ .../it/KnativeServingAddonIT.java | 326 +++++++++++++++ .../src/test/resources/application.properties | 1 + .../resources/knative/quarkus-greeting.yaml | 54 +++ quarkus/addons/knative/serving/pom.xml | 1 + .../addons/knative/serving/runtime/pom.xml | 13 + .../CloudEventKnativeParamsDecorator.java | 49 +++ ...CloudEventKnativeServiceRequestClient.java | 105 ----- .../KnativeFunctionPayloadSupplier.java | 45 ++ ...ativeServerlessWorkflowCustomFunction.java | 74 ---- .../KnativeServiceRequestClient.java | 91 ----- .../KnativeServiceRequestClientResolver.java | 38 -- .../KnativeWorkItemHandler.java | 59 ++- .../KnativeWorkItemHandlerProducer.java | 48 +++ .../serving/customfunctions/Operation.java | 22 +- .../PlainJsonKnativeParamsDecorator.java | 43 ++ .../PlainJsonKnativeServiceRequestClient.java | 109 ----- ...dEventKnativeServiceRequestClientTest.java | 152 ------- ...eServerlessWorkflowCustomFunctionTest.java | 384 ------------------ .../src/test/resources/application.properties | 1 - 35 files changed, 1253 insertions(+), 986 deletions(-) create mode 100644 quarkus/addons/knative/serving/integration-tests/pom.xml create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/arrayKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsParamKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsPlainJsonKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithMissingIdKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithoutIdAsPlainJsonKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/emptyParamsKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/invalidCloudEventKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/plainJsonKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/serviceNotFoundKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/main/resources/timeoutKnativeFunction.sw.json create mode 100644 quarkus/addons/knative/serving/integration-tests/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/it/KnativeServingAddonIT.java create mode 100644 quarkus/addons/knative/serving/integration-tests/src/test/resources/application.properties create mode 100644 quarkus/addons/knative/serving/integration-tests/src/test/resources/knative/quarkus-greeting.yaml create mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeParamsDecorator.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClient.java create mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeFunctionPayloadSupplier.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunction.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClient.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClientResolver.java create mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandlerProducer.java create mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeParamsDecorator.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeServiceRequestClient.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClientTest.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunctionTest.java delete mode 100644 quarkus/addons/knative/serving/runtime/src/test/resources/application.properties diff --git a/kogito-workitems/kogito-rest-workitem/src/main/java/org/kogito/workitem/rest/RestWorkItemHandler.java b/kogito-workitems/kogito-rest-workitem/src/main/java/org/kogito/workitem/rest/RestWorkItemHandler.java index 1020f79c5d2..f04a52acb3e 100644 --- a/kogito-workitems/kogito-rest-workitem/src/main/java/org/kogito/workitem/rest/RestWorkItemHandler.java +++ b/kogito-workitems/kogito-rest-workitem/src/main/java/org/kogito/workitem/rest/RestWorkItemHandler.java @@ -17,6 +17,7 @@ import java.net.MalformedURLException; import java.net.URL; +import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -82,6 +83,8 @@ public class RestWorkItemHandler implements KogitoWorkItemHandler { public static final String PATH_PARAM_RESOLVER = "PathParamResolver"; public static final String AUTH_METHOD = "AuthMethod"; + public static final String REQUEST_TIMEOUT_IN_MILLIS = "RequestTimeout"; + public static final int DEFAULT_PORT = 80; public static final int DEFAULT_SSL_PORT = 443; @@ -174,7 +177,10 @@ public void executeWorkItem(KogitoWorkItem workItem, KogitoWorkItemManager manag requestDecorators.forEach(d -> d.decorate(workItem, parameters, request)); authDecorators.forEach(d -> d.decorate(workItem, parameters, request)); paramsDecorator.decorate(workItem, parameters, request); - HttpResponse response = method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) ? request.sendJsonAndAwait(bodyBuilder.apply(parameters)) : request.sendAndAwait(); + Duration requestTimeout = getRequestTimeout(parameters); + HttpResponse response = method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) + ? sendJson(request, bodyBuilder.apply(parameters), requestTimeout) + : send(request, requestTimeout); int statusCode = response.statusCode(); if (statusCode < 200 || statusCode >= 300) { throw new WorkItemExecutionException(Integer.toString(statusCode), "Request for endpoint " + endPoint + " failed with message: " + response.statusMessage()); @@ -182,6 +188,27 @@ public void executeWorkItem(KogitoWorkItem workItem, KogitoWorkItemManager manag manager.completeWorkItem(workItem.getStringId(), Collections.singletonMap(RESULT, resultHandler.apply(response, targetInfo))); } + private static HttpResponse sendJson(HttpRequest request, Object body, Duration requestTimeout) { + if (requestTimeout == null) { + return request.sendJsonAndAwait(body); + } else { + return request.sendJson(body).await().atMost(requestTimeout); + } + } + + private static HttpResponse send(HttpRequest request, Duration requestTimeout) { + if (requestTimeout == null) { + return request.sendAndAwait(); + } else { + return request.send().await().atMost(requestTimeout); + } + } + + private static Duration getRequestTimeout(Map parameters) { + Long requestTimeoutInMillis = getParam(parameters, REQUEST_TIMEOUT_IN_MILLIS, Long.class, null); + return requestTimeoutInMillis == null ? null : Duration.ofMillis(requestTimeoutInMillis); + } + private Class getTargetInfo(KogitoWorkItem workItem) { String varName = ((WorkItemNode) ((WorkItemNodeInstance) workItem.getNodeInstance()).getNode()).getIoSpecification().getOutputMappingBySources().get(RESULT); if (varName != null) { diff --git a/quarkus/addons/fabric8-kubernetes-service-catalog/test-utils/src/main/java/org/kie/kogito/addons/quarkus/k8s/test/utils/KnativeResourceDiscoveryTestUtil.java b/quarkus/addons/fabric8-kubernetes-service-catalog/test-utils/src/main/java/org/kie/kogito/addons/quarkus/k8s/test/utils/KnativeResourceDiscoveryTestUtil.java index c156609e027..f2bbbb4c3cc 100644 --- a/quarkus/addons/fabric8-kubernetes-service-catalog/test-utils/src/main/java/org/kie/kogito/addons/quarkus/k8s/test/utils/KnativeResourceDiscoveryTestUtil.java +++ b/quarkus/addons/fabric8-kubernetes-service-catalog/test-utils/src/main/java/org/kie/kogito/addons/quarkus/k8s/test/utils/KnativeResourceDiscoveryTestUtil.java @@ -32,7 +32,7 @@ public static void createServiceIfNotExists(KubernetesServer k8sServer, String k @SuppressWarnings("deprecation") // Quarkus LTS 2.13 compatibility public static void createServiceIfNotExists(KubernetesServer k8sServer, String knativeYaml, String namespace, String serviceName, String remoteServiceUrl) { - if (k8sServer.getClient().services().inNamespace("test").withName(serviceName).get() == null) { + if (k8sServer.getClient().services().inNamespace(namespace).withName(serviceName).get() == null) { KnativeClient knativeClient = k8sServer.getClient().adapt(KnativeClient.class); Service service = knativeClient.services() diff --git a/quarkus/addons/knative/serving/deployment/pom.xml b/quarkus/addons/knative/serving/deployment/pom.xml index ce5d66cc008..8b915548d23 100644 --- a/quarkus/addons/knative/serving/deployment/pom.xml +++ b/quarkus/addons/knative/serving/deployment/pom.xml @@ -29,6 +29,10 @@ io.quarkus quarkus-vertx-deployment + + org.kie.kogito + kogito-serverless-workflow-rest-parser + diff --git a/quarkus/addons/knative/serving/deployment/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/deployment/customfunctions/KnativeTypeHandler.java b/quarkus/addons/knative/serving/deployment/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/deployment/customfunctions/KnativeTypeHandler.java index d86a4fac52e..0319463bc40 100644 --- a/quarkus/addons/knative/serving/deployment/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/deployment/customfunctions/KnativeTypeHandler.java +++ b/quarkus/addons/knative/serving/deployment/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/deployment/customfunctions/KnativeTypeHandler.java @@ -15,29 +15,86 @@ */ package org.kie.kogito.addons.quarkus.knative.serving.deployment.customfunctions; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +import org.jbpm.compiler.canonical.descriptors.TaskDescriptor; import org.jbpm.ruleflow.core.RuleFlowNodeContainerFactory; +import org.jbpm.ruleflow.core.factory.NodeFactory; import org.jbpm.ruleflow.core.factory.WorkItemNodeFactory; +import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.CloudEventKnativeParamsDecorator; import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler; +import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.Operation; +import org.kie.kogito.addons.quarkus.knative.serving.customfunctions.PlainJsonKnativeParamsDecorator; import org.kie.kogito.serverless.workflow.parser.ParserContext; +import org.kie.kogito.serverless.workflow.parser.VariableInfo; import org.kie.kogito.serverless.workflow.parser.types.WorkItemTypeHandler; +import org.kie.kogito.serverless.workflow.suppliers.ParamsRestBodyBuilderSupplier; +import org.kogito.workitem.rest.RestWorkItemHandler; + +import com.github.javaparser.ast.expr.Expression; import io.serverlessworkflow.api.Workflow; import io.serverlessworkflow.api.functions.FunctionDefinition; +import io.serverlessworkflow.api.functions.FunctionRef; +import io.vertx.core.http.HttpMethod; +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_PROPERTY_NAME; import static org.kie.kogito.serverless.workflow.parser.FunctionTypeHandlerFactory.trimCustomOperation; +import static org.kie.kogito.serverless.workflow.utils.ServerlessWorkflowUtils.runtimeRestApi; public class KnativeTypeHandler extends WorkItemTypeHandler { + private static final String DEFAULT_REQUEST_TIMEOUT_VALUE = "10000"; + + @Override + public NodeFactory getActionNode(Workflow workflow, ParserContext context, + RuleFlowNodeContainerFactory embeddedSubProcess, FunctionDefinition functionDef, + FunctionRef functionRef, VariableInfo varInfo) { + validateArgs(functionRef); + + WorkItemNodeFactory node = buildWorkItem(embeddedSubProcess, context, varInfo.getInputVar(), varInfo.getOutputVar()) + .name(functionDef.getName()); + + if (functionRef.getArguments() != null && !functionRef.getArguments().isEmpty()) { + List payloadFields = new ArrayList<>(); + functionRef.getArguments().fieldNames().forEachRemaining(payloadFields::add); + if (!payloadFields.isEmpty()) { + node.workParameter(PAYLOAD_FIELDS_PROPERTY_NAME, String.join(";", payloadFields)); + } + } + + return addFunctionArgs(workflow, + fillWorkItemHandler(workflow, context, node, functionDef), + functionRef); + } + @Override - protected > WorkItemNodeFactory fillWorkItemHandler(Workflow workflow, - ParserContext context, - WorkItemNodeFactory node, - FunctionDefinition functionDef) { + protected > WorkItemNodeFactory fillWorkItemHandler( + Workflow workflow, ParserContext context, WorkItemNodeFactory node, FunctionDefinition functionDef) { if (functionDef.getMetadata() != null) { functionDef.getMetadata().forEach(node::metaData); } - return node.workName(KnativeWorkItemHandler.NAME).metaData(KnativeWorkItemHandler.OPERATION_PROPERTY_NAME, trimCustomOperation(functionDef)); + Operation operation = Operation.parse(trimCustomOperation(functionDef)); + + if (operation.isCloudEvent()) { + node.workParameter(RestWorkItemHandler.PARAMS_DECORATOR, CloudEventKnativeParamsDecorator.class.getName()); + } else { + node.workParameter(RestWorkItemHandler.PARAMS_DECORATOR, PlainJsonKnativeParamsDecorator.class.getName()); + } + + Supplier requestTimeout = runtimeRestApi(functionDef, "timeout", + context.getContext(), String.class, DEFAULT_REQUEST_TIMEOUT_VALUE); + + return node.workParameter(KnativeWorkItemHandler.SERVICE_PROPERTY_NAME, operation.getService()) + .workParameter(KnativeWorkItemHandler.PATH_PROPERTY_NAME, operation.getPath()) + .workParameter(RestWorkItemHandler.BODY_BUILDER, new ParamsRestBodyBuilderSupplier()) + .workParameter(RestWorkItemHandler.METHOD, HttpMethod.POST) + .workParameter(RestWorkItemHandler.REQUEST_TIMEOUT_IN_MILLIS, requestTimeout) + .metaData(TaskDescriptor.KEY_WORKITEM_TYPE, RestWorkItemHandler.REST_TASK_TYPE) + .workName(KnativeWorkItemHandler.NAME); } @Override diff --git a/quarkus/addons/knative/serving/integration-tests/pom.xml b/quarkus/addons/knative/serving/integration-tests/pom.xml new file mode 100644 index 00000000000..f1354419700 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/pom.xml @@ -0,0 +1,136 @@ + + + 4.0.0 + + org.kie.kogito + kogito-addons-quarkus-knative-serving-parent + 2.0.0-SNAPSHOT + + + kogito-addons-quarkus-knative-serving-integration-tests + Kogito Add-On Knative Serving - Integration Tests + Knative Serving Kogito Add-On Integration Tests. + + + + + org.kie.kogito + kogito-quarkus-bom + ${project.version} + pom + import + + + + + + + io.quarkus + quarkus-smallrye-openapi + + + org.kie.kogito + kogito-quarkus-serverless-workflow + + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + org.kie.kogito + kogito-addons-quarkus-knative-serving + + + org.kie.kogito + kogito-addons-quarkus-kubernetes + + + org.kie.kogito + kogito-addons-quarkus-fabric8-kubernetes-service-catalog + + + io.quarkus + quarkus-kubernetes + + + + + org.assertj + assertj-core + test + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + com.github.tomakehurst + wiremock-jre8 + test + + + io.quarkus + quarkus-test-kubernetes-client + test + + + org.kie.kogito + kogito-addons-quarkus-fabric8-kubernetes-service-catalog-test-utils + test + + + + + + + + io.quarkus + quarkus-maven-plugin + + true + ${skipTests} + + + + + + + io.quarkus + quarkus-maven-plugin + + + + build + + + + + + + + + native + + + native + + + + native + + + + \ No newline at end of file diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/arrayKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/arrayKnativeFunction.sw.json new file mode 100644 index 00000000000..f13f39163eb --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/arrayKnativeFunction.sw.json @@ -0,0 +1,34 @@ +{ + "id": "arrayKnativeFunction", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?path=/arrayFunction" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "array": [ + "Javierito", + "Pepito" + ] + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventKnativeFunction.sw.json new file mode 100644 index 00000000000..d40a675d1fd --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventKnativeFunction.sw.json @@ -0,0 +1,38 @@ +{ + "id": "cloudEvent", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?asCloudEvent=true&path=/cloud-event" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "id": "42", + "source": "https://localhost:8080", + "type": "org.kie.kogito.test", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsParamKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsParamKnativeFunction.sw.json new file mode 100644 index 00000000000..544a9ab9d24 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsParamKnativeFunction.sw.json @@ -0,0 +1,38 @@ +{ + "id": "cloudEventWithIdAsParam", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?asCloudEvent=true&path=/cloud-event" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "id": "42", + "source": "https://localhost:8080", + "type": "org.kie.kogito.test", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsPlainJsonKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsPlainJsonKnativeFunction.sw.json new file mode 100644 index 00000000000..81110d98b84 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithIdAsPlainJsonKnativeFunction.sw.json @@ -0,0 +1,38 @@ +{ + "id": "cloudEventWithIdAsPlainJson", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?path=/plainJsonFunction" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "id": "42", + "source": "https://localhost:8080", + "type": "org.kie.kogito.test", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithMissingIdKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithMissingIdKnativeFunction.sw.json new file mode 100644 index 00000000000..b9c23c82dc8 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithMissingIdKnativeFunction.sw.json @@ -0,0 +1,37 @@ +{ + "id": "cloudEventWithMissingId", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?asCloudEvent=true&path=/cloud-event" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "source": "https://localhost:8080", + "type": "org.kie.kogito.test", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithoutIdAsPlainJsonKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithoutIdAsPlainJsonKnativeFunction.sw.json new file mode 100644 index 00000000000..d2bcc94e0df --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/cloudEventWithoutIdAsPlainJsonKnativeFunction.sw.json @@ -0,0 +1,37 @@ +{ + "id": "cloudEventWithoutIdAsPlainJson", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?path=/plainJsonFunction" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "source": "https://localhost:8080", + "type": "org.kie.kogito.test", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/emptyParamsKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/emptyParamsKnativeFunction.sw.json new file mode 100644 index 00000000000..5c1d0300893 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/emptyParamsKnativeFunction.sw.json @@ -0,0 +1,28 @@ +{ + "id": "emptyParamsKnativeFunction", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?path=/emptyParamsKnativeFunction" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet" + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/invalidCloudEventKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/invalidCloudEventKnativeFunction.sw.json new file mode 100644 index 00000000000..ec9e56d1fc4 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/invalidCloudEventKnativeFunction.sw.json @@ -0,0 +1,36 @@ +{ + "id": "invalidCloudEvent", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?asCloudEvent=true&path=/cloud-event" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "source": "https://localhost:8080", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/plainJsonKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/plainJsonKnativeFunction.sw.json new file mode 100644 index 00000000000..9fcb6b0bb12 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/plainJsonKnativeFunction.sw.json @@ -0,0 +1,31 @@ +{ + "id": "plainJsonKnativeFunction", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?path=/plainJsonFunction" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "name": ".name" + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/serviceNotFoundKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/serviceNotFoundKnativeFunction.sw.json new file mode 100644 index 00000000000..bf7e28c7c8f --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/serviceNotFoundKnativeFunction.sw.json @@ -0,0 +1,38 @@ +{ + "id": "serviceNotFound", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?asCloudEvent=true&path=/non_existing_path" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet", + "arguments": { + "specversion": "1.0", + "id": "42", + "source": "https://localhost:8080", + "type": "org.kie.kogito.test", + "data": { + "org": "Acme", + "project": "Kogito" + } + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/main/resources/timeoutKnativeFunction.sw.json b/quarkus/addons/knative/serving/integration-tests/src/main/resources/timeoutKnativeFunction.sw.json new file mode 100644 index 00000000000..98dfc95f9d9 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/main/resources/timeoutKnativeFunction.sw.json @@ -0,0 +1,31 @@ +{ + "id": "timeoutKnativeFunction", + "version": "1.0", + "name": "Test Knative function", + "description": "This workflow tests a Knative function", + "start": "invokeFunction", + "functions": [ + { + "name": "greet_with_timeout", + "type": "custom", + "operation": "knative:services.v1.serving.knative.dev/default/serverless-workflow-greeting-quarkus?path=/timeout" + } + ], + "states": [ + { + "name": "invokeFunction", + "type": "operation", + "actions": [ + { + "functionRef": { + "refName": "greet_with_timeout", + "arguments": { + "name": ".name" + } + } + } + ], + "end": true + } + ] +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/it/KnativeServingAddonIT.java b/quarkus/addons/knative/serving/integration-tests/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/it/KnativeServingAddonIT.java new file mode 100644 index 00000000000..60b92485d7d --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/it/KnativeServingAddonIT.java @@ -0,0 +1,326 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.addons.quarkus.knative.serving.customfunctions.it; + +import java.net.HttpURLConnection; + +import org.eclipse.microprofile.config.inject.ConfigProperty; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import com.github.tomakehurst.wiremock.core.WireMockConfiguration; + +import io.fabric8.kubernetes.client.server.mock.KubernetesServer; +import io.quarkus.test.junit.QuarkusTest; +import io.quarkus.test.kubernetes.client.KubernetesTestServer; +import io.quarkus.test.kubernetes.client.WithKubernetesTestServer; +import io.restassured.http.ContentType; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; +import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson; +import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath; +import static com.github.tomakehurst.wiremock.client.WireMock.post; +import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static io.restassured.RestAssured.given; +import static org.hamcrest.CoreMatchers.is; +import static org.kie.kogito.addons.quarkus.k8s.test.utils.KnativeResourceDiscoveryTestUtil.createServiceIfNotExists; +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8; + +@QuarkusTest +@WithKubernetesTestServer +class KnativeServingAddonIT { + + private static final String NAMESPACE = "default"; + + private static final String SERVICENAME = "serverless-workflow-greeting-quarkus"; + + private static final String CLOUD_EVENT_PATH = "/cloud-event"; + + public static final String AT_LEAST_ONE_NON_WHITE_CHARACTER_REGEX = ".*\\S.*"; + + private static WireMockServer wireMockServer; + + private static String remoteServiceUrl; + + @ConfigProperty(name = "kogito.sw.functions.greet_with_timeout.timeout") + Long requestTimeout; + + @KubernetesTestServer + KubernetesServer mockServer; + + @BeforeAll + static void beforeAll() { + createWiremockServer(); + } + + @BeforeEach + void beforeEach() { + createServiceIfNotExists(mockServer, "knative/quarkus-greeting.yaml", NAMESPACE, SERVICENAME, remoteServiceUrl); + } + + @AfterAll + static void afterAll() { + if (wireMockServer != null) { + wireMockServer.stop(); + } + } + + @Test + void executeWithEmptyParameters() { + mockExecuteWithEmptyParametersEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("{\"workflowdata\":{}}").when() + .post("/emptyParamsKnativeFunction") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.org", is("Acme")) + .body("workflowdata.project", is("Kogito")); + } + + @Test + void executeWithParameters() { + mockExecuteWithParametersEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("{\"name\": \"hbelmiro\" }").when() + .post("/plainJsonKnativeFunction") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.message", is("Hello")); + } + + @Test + void executeWithArray() { + mockExecuteWithArrayEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON).when() + .post("/arrayKnativeFunction") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.message", is(JsonNodeFactory.instance.arrayNode().add(23).add(24).toPrettyString())); + } + + @Test + void executeWithParametersShouldSendOnlyFunctionArgs() { + mockExecuteWithParametersEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("{\"name\": \"hbelmiro\", \"should_not_be_sent\" : \"value\" }").when() + .post("/plainJsonKnativeFunction") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.message", is("Hello")); + } + + @Test + void executeWithCloudEventWithIdAsPlainJson() { + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON).when() + .post("/cloudEventWithIdAsPlainJson") + .then() + .statusCode(HttpURLConnection.HTTP_BAD_REQUEST); + } + + @Test + void executeWithCloudEventWithoutIdAsPlainJson() { + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON).when() + .post("/cloudEventWithoutIdAsPlainJson") + .then() + .statusCode(HttpURLConnection.HTTP_BAD_REQUEST); + } + + @Test + void executeCloudEvent() { + mockExecuteCloudEventWithParametersEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .post("/cloudEvent") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.message", is("CloudEvents are awesome!")) + .body("workflowdata.object", is(JsonNodeFactory.instance.objectNode() + .put("long", 42L) + .put("String", "xpto").toPrettyString())); + + wireMockServer.verify(postRequestedFor(urlEqualTo(CLOUD_EVENT_PATH)) + .withRequestBody(matchingJsonPath("$.id", equalTo("42"))) + .withHeader("Content-Type", equalTo(APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8))); + } + + @Test + void executeCloudEventWithMissingIdShouldNotThrowException() { + mockExecuteCloudEventWithParametersEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .post("/cloudEventWithMissingId") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.message", is("CloudEvents are awesome!")) + .body("workflowdata.object", is(JsonNodeFactory.instance.objectNode() + .put("long", 42L) + .put("String", "xpto").toPrettyString())); + + wireMockServer.verify(postRequestedFor(urlEqualTo(CLOUD_EVENT_PATH)) + .withRequestBody(matchingJsonPath("$.id", WireMock.matching(AT_LEAST_ONE_NON_WHITE_CHARACTER_REGEX))) + .withHeader("Content-Type", equalTo(APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8))); + } + + @Test + void cloudEventWithIdMustBeSentAsIs() { + mockExecuteCloudEventWithParametersEndpoint(); + + String id = "42"; + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("{\"id\": \"" + id + "\" }").when() + .post("/cloudEventWithIdAsParam") + .then() + .statusCode(HttpURLConnection.HTTP_CREATED) + .body("workflowdata.message", is("CloudEvents are awesome!")) + .body("workflowdata.object", is(JsonNodeFactory.instance.objectNode() + .put("long", 42L) + .put("String", "xpto").toPrettyString())); + + wireMockServer.verify(postRequestedFor(urlEqualTo(CLOUD_EVENT_PATH)) + .withRequestBody(matchingJsonPath("$.id", equalTo(id))) + .withHeader("Content-Type", equalTo(APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8))); + } + + @Test + void executeWithInvalidCloudEvent() { + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON).when() + .post("/invalidCloudEvent") + .then() + .statusCode(HttpURLConnection.HTTP_INTERNAL_ERROR); + } + + @Test + void execute404() { + mockExecute404Endpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON).when() + .post("/serviceNotFound") + .then() + .statusCode(HttpURLConnection.HTTP_INTERNAL_ERROR); + } + + @Test + void executeTimeout() { + mockExecuteTimeoutEndpoint(); + + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .body("{\"name\": \"hbelmiro\" }").when() + .post("/timeoutKnativeFunction") + .then() + .statusCode(HttpURLConnection.HTTP_INTERNAL_ERROR); + } + + private void mockExecuteTimeoutEndpoint() { + wireMockServer.stubFor(post(urlEqualTo("/timeout")) + .willReturn(aResponse() + .withFixedDelay(requestTimeout.intValue() + 500) + .withStatus(200))); + } + + private void mockExecute404Endpoint() { + wireMockServer.stubFor(post(urlEqualTo("/non_existing_path")) + .willReturn(aResponse() + .withStatus(404))); + } + + private void mockExecuteCloudEventWithParametersEndpoint() { + wireMockServer.stubFor(post(urlEqualTo(CLOUD_EVENT_PATH)) + .willReturn(aResponse() + .withStatus(200) + .withHeader("Content-Type", APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8) + .withJsonBody(JsonNodeFactory.instance.objectNode() + .put("message", "CloudEvents are awesome!") + .put("object", JsonNodeFactory.instance.objectNode() + .put("long", 42L) + .put("String", "xpto").toPrettyString())))); + } + + private void mockExecuteWithEmptyParametersEndpoint() { + wireMockServer.stubFor(post(urlEqualTo("/emptyParamsKnativeFunction")) + .willReturn(aResponse() + .withStatus(HttpURLConnection.HTTP_OK) + .withHeader("Content-Type", "application/json") + .withJsonBody(JsonNodeFactory.instance.objectNode() + .put("org", "Acme") + .put("project", "Kogito")))); + } + + private void mockExecuteWithParametersEndpoint() { + wireMockServer.stubFor(post(urlEqualTo("/plainJsonFunction")) + .withRequestBody(equalToJson(JsonNodeFactory.instance.objectNode() + .put("name", "hbelmiro") + .toString())) + .willReturn(aResponse() + .withStatus(HttpURLConnection.HTTP_OK) + .withHeader("Content-Type", "application/json") + .withJsonBody(JsonNodeFactory.instance.objectNode() + .put("message", "Hello")))); + } + + private void mockExecuteWithArrayEndpoint() { + wireMockServer.stubFor(post(urlEqualTo("/arrayFunction")) + .withRequestBody(equalToJson(JsonNodeFactory.instance.objectNode() + .set("array", JsonNodeFactory.instance.arrayNode().add("Javierito").add("Pepito")) + .toString())) + .willReturn(aResponse() + .withStatus(HttpURLConnection.HTTP_OK) + .withHeader("Content-Type", "application/json") + .withJsonBody(JsonNodeFactory.instance.objectNode() + .put("message", JsonNodeFactory.instance.arrayNode().add(23).add(24).toPrettyString())))); + } + + private static void createWiremockServer() { + wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort()); + wireMockServer.start(); + remoteServiceUrl = wireMockServer.baseUrl(); + } +} diff --git a/quarkus/addons/knative/serving/integration-tests/src/test/resources/application.properties b/quarkus/addons/knative/serving/integration-tests/src/test/resources/application.properties new file mode 100644 index 00000000000..b1caeb3e053 --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/test/resources/application.properties @@ -0,0 +1 @@ +kogito.sw.functions.greet_with_timeout.timeout=2000 \ No newline at end of file diff --git a/quarkus/addons/knative/serving/integration-tests/src/test/resources/knative/quarkus-greeting.yaml b/quarkus/addons/knative/serving/integration-tests/src/test/resources/knative/quarkus-greeting.yaml new file mode 100644 index 00000000000..712d182b28e --- /dev/null +++ b/quarkus/addons/knative/serving/integration-tests/src/test/resources/knative/quarkus-greeting.yaml @@ -0,0 +1,54 @@ +apiVersion: serving.knative.dev/v1 +kind: Service +metadata: + annotations: + serving.knative.dev/creator: minikube-user + serving.knative.dev/lastModifier: minikube-user + creationTimestamp: '2022-08-17T13:58:53Z' + generation: 1 + name: serverless-workflow-greeting-quarkus + resourceVersion: '43817' + uid: 98530cb6-3274-4d0c-b654-a82645cda058 +spec: + template: + metadata: + annotations: + client.knative.dev/updateTimestamp: '2022-08-17T13:58:53Z' + client.knative.dev/user-image: kiegroup/serverless-workflow-greeting-quarkus:1.0 + creationTimestamp: + spec: + containerConcurrency: 0 + containers: + - image: kiegroup/serverless-workflow-greeting-quarkus:1.0 + name: user-container + readinessProbe: + successThreshold: 1 + tcpSocket: + port: 0 + resources: { } + enableServiceLinks: false + timeoutSeconds: 300 + traffic: + - latestRevision: true + percent: 100 +status: + address: + url: http://serverless-workflow-greeting-quarkus.test.svc.cluster.local + conditions: + - lastTransitionTime: '2022-08-17T13:59:00Z' + status: 'True' + type: ConfigurationsReady + - lastTransitionTime: '2022-08-17T13:59:00Z' + status: 'True' + type: Ready + - lastTransitionTime: '2022-08-17T13:59:00Z' + status: 'True' + type: RoutesReady + latestCreatedRevisionName: serverless-workflow-greeting-quarkus-00001 + latestReadyRevisionName: serverless-workflow-greeting-quarkus-00001 + observedGeneration: 1 + traffic: + - latestRevision: true + percent: 100 + revisionName: serverless-workflow-greeting-quarkus-00001 + url: http://serverless-workflow-greeting-quarkus.test.10.99.154.147.sslip.io diff --git a/quarkus/addons/knative/serving/pom.xml b/quarkus/addons/knative/serving/pom.xml index ea7cd0d019d..97dec1b59d4 100644 --- a/quarkus/addons/knative/serving/pom.xml +++ b/quarkus/addons/knative/serving/pom.xml @@ -14,6 +14,7 @@ deployment runtime + integration-tests pom diff --git a/quarkus/addons/knative/serving/runtime/pom.xml b/quarkus/addons/knative/serving/runtime/pom.xml index f57d7658ba1..d5919b1d466 100644 --- a/quarkus/addons/knative/serving/runtime/pom.xml +++ b/quarkus/addons/knative/serving/runtime/pom.xml @@ -22,6 +22,14 @@ org.kie.kogito kogito-serverless-workflow-runtime + + org.kie.kogito + kogito-serverless-workflow-rest-runtime + + + org.kie.kogito + kogito-rest-workitem + io.quarkus quarkus-vertx @@ -36,6 +44,11 @@ quarkus-junit5 test + + org.mockito + mockito-junit-jupiter + test + com.github.tomakehurst wiremock-jre8 diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeParamsDecorator.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeParamsDecorator.java new file mode 100644 index 00000000000..147629fdcb8 --- /dev/null +++ b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeParamsDecorator.java @@ -0,0 +1,49 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; + +import java.util.Map; + +import org.kie.kogito.event.cloudevents.utils.CloudEventUtils; +import org.kie.kogito.internal.process.runtime.KogitoWorkItem; +import org.kogito.workitem.rest.decorators.ParamsDecorator; + +import io.vertx.mutiny.ext.web.client.HttpRequest; + +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8; +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.ID; + +public final class CloudEventKnativeParamsDecorator implements ParamsDecorator { + + @Override + public void decorate(KogitoWorkItem workItem, Map parameters, HttpRequest request) { + Map cloudEvent = KnativeFunctionPayloadSupplier.getPayload(parameters); + + if (cloudEvent.get(ID) == null) { + String cloudEventId = generateCloudEventId(cloudEvent.hashCode(), workItem.getProcessInstanceStringId()); + cloudEvent.put(ID, cloudEventId); + parameters.put(ID, cloudEventId); + } + + CloudEventUtils.validateCloudEvent(cloudEvent); + + request.putHeader("Content-Type", APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8); + } + + private static String generateCloudEventId(int uniqueIdentifier, String processInstanceId) { + return processInstanceId + "_" + uniqueIdentifier; + } +} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClient.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClient.java deleted file mode 100644 index 09d0df7cdff..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClient.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Copyright 2023 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import java.net.URI; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; - -import javax.annotation.PreDestroy; -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; - -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.kie.kogito.event.cloudevents.utils.CloudEventUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.JsonNode; - -import io.vertx.core.json.JsonObject; -import io.vertx.mutiny.core.Vertx; -import io.vertx.mutiny.core.buffer.Buffer; -import io.vertx.mutiny.ext.web.client.HttpRequest; -import io.vertx.mutiny.ext.web.client.HttpResponse; -import io.vertx.mutiny.ext.web.client.WebClient; - -@ApplicationScoped -class CloudEventKnativeServiceRequestClient extends KnativeServiceRequestClient { - - private static final Logger logger = LoggerFactory.getLogger(CloudEventKnativeServiceRequestClient.class); - - private static final String ID = "id"; - - private final WebClient webClient; - - private final Duration requestTimeout; - - @Inject - CloudEventKnativeServiceRequestClient(Vertx vertx, - @ConfigProperty(name = REQUEST_TIMEOUT_PROPERTY_NAME) Optional requestTimeout) { - this.webClient = WebClient.create(vertx); - this.requestTimeout = Duration.ofMillis(requestTimeout.orElse(DEFAULT_REQUEST_TIMEOUT_VALUE)); - } - - @Override - protected JsonNode sendRequest(String processInstanceId, URI serviceAddress, String path, Map cloudEvent) { - HttpRequest request; - if (serviceAddress.getPort() >= 0) { - request = webClient.post(serviceAddress.getPort(), serviceAddress.getHost(), path); - } else { - request = webClient.post(serviceAddress.getHost(), path); - } - request.putHeader("Content-Type", APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8) - .ssl("https".equals(serviceAddress.getScheme())); - - if (cloudEvent.get(ID) == null) { - cloudEvent = createCloudEventWithGeneratedId(cloudEvent, processInstanceId); - } - - CloudEventUtils.validateCloudEvent(cloudEvent); - - JsonObject body = new JsonObject(cloudEvent); - - logger.debug("Sending request with CloudEvent - host: {}, port: {}, path: {}, CloudEvent: {}", - serviceAddress.getHost(), serviceAddress.getPort(), path, body); - - HttpResponse response = request.sendBuffer(Buffer.buffer(body.encode())).await().atMost(requestTimeout); - - return responseAsJsonObject(response); - } - - private static Map createCloudEventWithGeneratedId(Map cloudEvent, String processInstanceId) { - Map modifiableCloudEvent = ensureModifiable(cloudEvent); - modifiableCloudEvent.put(ID, generateCloudEventId(processInstanceId, cloudEvent)); - return modifiableCloudEvent; - } - - private static Map ensureModifiable(Map map) { - return map instanceof HashMap ? map : new HashMap<>(map); - } - - static String generateCloudEventId(String processInstanceId, Map cloudEvent) { - return processInstanceId + "_" + cloudEvent.hashCode(); - } - - @PreDestroy - void preDestroy() { - webClient.close(); - } -} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeFunctionPayloadSupplier.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeFunctionPayloadSupplier.java new file mode 100644 index 00000000000..a5f2bd5fa9a --- /dev/null +++ b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeFunctionPayloadSupplier.java @@ -0,0 +1,45 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +import static java.util.stream.Collectors.toMap; +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_DELIMITER; +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.PAYLOAD_FIELDS_PROPERTY_NAME; + +final class KnativeFunctionPayloadSupplier { + + private KnativeFunctionPayloadSupplier() { + } + + static Map getPayload(Map parameters) { + return getPayloadFields(parameters).stream() + .collect(toMap(Function.identity(), parameters::get)); + } + + private static List getPayloadFields(Map parameters) { + String payloadFields = (String) parameters.remove(PAYLOAD_FIELDS_PROPERTY_NAME); + if (payloadFields != null) { + return Arrays.asList(payloadFields.split(PAYLOAD_FIELDS_DELIMITER)); + } else { + return List.of(); + } + } +} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunction.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunction.java deleted file mode 100644 index adf00518c61..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunction.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright 2022 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import java.net.URI; -import java.util.Map; - -import javax.enterprise.context.ApplicationScoped; -import javax.enterprise.inject.Instance; -import javax.inject.Inject; - -import org.kie.kogito.addons.k8s.resource.catalog.DefaultKubernetesServiceCatalogFactory; -import org.kie.kogito.addons.k8s.resource.catalog.KubernetesProtocol; -import org.kie.kogito.addons.k8s.resource.catalog.KubernetesServiceCatalog; -import org.kie.kogito.addons.k8s.resource.catalog.KubernetesServiceCatalogKey; -import org.kie.kogito.process.workitem.WorkItemExecutionException; - -import com.fasterxml.jackson.databind.JsonNode; - -/** - * Implementation of a Serverless Workflow custom function to invoke Knative services. - * - * @see Serverless Workflow specification - Defining custom function types - */ -@ApplicationScoped -final class KnativeServerlessWorkflowCustomFunction { - - private final KubernetesServiceCatalog kubernetesServiceCatalog; - - private final KnativeServiceRequestClientResolver knativeServiceRequestClientResolver; - - @Inject - KnativeServerlessWorkflowCustomFunction(Instance kubernetesServiceCatalog, - KnativeServiceRequestClientResolver knativeServiceRequestClientResolver) { - this.kubernetesServiceCatalog = kubernetesServiceCatalog.isUnsatisfied() - ? DefaultKubernetesServiceCatalogFactory.createKubernetesServiceCatalog() - : kubernetesServiceCatalog.get(); - - this.knativeServiceRequestClientResolver = knativeServiceRequestClientResolver; - } - - JsonNode execute(String processInstanceId, String operationString, Map arguments) { - URI serviceAddress = getServiceAddress(operationString); - - Operation operation = Operation.parse(operationString); - - return knativeServiceRequestClientResolver.resolve(operation).execute( - processInstanceId, - serviceAddress, - operation.getPath(), - arguments); - } - - private URI getServiceAddress(String operation) { - String service = operation.split("\\?")[0]; - - return kubernetesServiceCatalog.getServiceAddress(new KubernetesServiceCatalogKey(KubernetesProtocol.KNATIVE, service)) - .orElseThrow(() -> new WorkItemExecutionException("The Knative service '" + service - + "' could not be found.")); - } -} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClient.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClient.java deleted file mode 100644 index c4910bff209..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClient.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Copyright 2023 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import java.net.URI; -import java.util.Map; -import java.util.Objects; - -import org.kie.kogito.jackson.utils.JsonObjectUtils; -import org.kie.kogito.jackson.utils.ObjectMapperFactory; -import org.kie.kogito.process.workitem.WorkItemExecutionException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; - -import io.vertx.core.json.Json; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import io.vertx.mutiny.core.buffer.Buffer; -import io.vertx.mutiny.ext.web.client.HttpResponse; - -abstract class KnativeServiceRequestClient { - - private static final Logger logger = LoggerFactory.getLogger(KnativeServiceRequestClient.class); - - protected static final long DEFAULT_REQUEST_TIMEOUT_VALUE = 10_000L; - - static final String APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8 = "application/cloudevents+json; charset=UTF-8"; - - static final String REQUEST_TIMEOUT_PROPERTY_NAME = "kogito.addon.knative-serving.request-timeout"; - - /** - * Invokes a Knative service using the specified payload. - * - * @param processInstanceId process instance ID. - * @param serviceAddress address of the Knative service - * @param path resource path - * @param payload the payload - * @return a {@link JsonNode} that represents the response payload - */ - JsonNode execute(String processInstanceId, URI serviceAddress, String path, Map payload) { - Objects.requireNonNull(serviceAddress, "serviceAddress is a mandatory parameter"); - Objects.requireNonNull(path, "path is a mandatory parameter"); - - return sendRequest(processInstanceId, serviceAddress, path, payload); - } - - protected final JsonNode responseAsJsonObject(HttpResponse response) { - if (logger.isDebugEnabled()) { - logger.debug("Response - status code: {}, body: {}", response.statusCode(), response.bodyAsString()); - } - - if (response.statusCode() < 200 || response.statusCode() >= 300) { - throw new WorkItemExecutionException(Integer.toString(response.statusCode()), response.statusMessage()); - } else { - return getJsonNode(Json.decodeValue(response.body().getDelegate())); - } - } - - private JsonNode getJsonNode(Object json) { - if (json instanceof JsonObject) { - return JsonObjectUtils.fromValue(((JsonObject) json).getMap()); - } else if (json instanceof JsonArray) { - ArrayNode jsonArray = ObjectMapperFactory.listenerAware().createArrayNode(); - for (Object item : ((JsonArray) json)) { - jsonArray.add(getJsonNode(item)); - } - return jsonArray; - } else { - return JsonObjectUtils.fromValue(json); - } - } - - protected abstract JsonNode sendRequest(String processInstanceId, URI serviceAddress, String path, - Map payload); -} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClientResolver.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClientResolver.java deleted file mode 100644 index 458a914716e..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServiceRequestClientResolver.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Copyright 2023 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; - -@ApplicationScoped -class KnativeServiceRequestClientResolver { - - private final CloudEventKnativeServiceRequestClient cloudEventClient; - - private final PlainJsonKnativeServiceRequestClient plainJsonClient; - - @Inject - KnativeServiceRequestClientResolver(CloudEventKnativeServiceRequestClient cloudEventClient, - PlainJsonKnativeServiceRequestClient plainJsonClient) { - this.cloudEventClient = cloudEventClient; - this.plainJsonClient = plainJsonClient; - } - - KnativeServiceRequestClient resolve(Operation operation) { - return operation.isCloudEvent() ? cloudEventClient : plainJsonClient; - } -} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandler.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandler.java index d1a65e04873..696c2685359 100644 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandler.java +++ b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandler.java @@ -15,32 +15,63 @@ */ package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; +import java.net.URI; import java.util.Map; -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; - +import org.kie.kogito.addons.k8s.resource.catalog.KubernetesServiceCatalog; +import org.kie.kogito.addons.k8s.resource.catalog.KubernetesServiceCatalogKey; import org.kie.kogito.internal.process.runtime.KogitoWorkItem; -import org.kie.kogito.serverless.workflow.WorkflowWorkItemHandler; +import org.kie.kogito.internal.process.runtime.KogitoWorkItemManager; +import org.kie.kogito.process.workitem.WorkItemExecutionException; +import org.kogito.workitem.rest.RestWorkItemHandler; + +import io.vertx.mutiny.ext.web.client.WebClient; + +import static org.kie.kogito.addons.k8s.resource.catalog.KubernetesProtocol.KNATIVE; + +public final class KnativeWorkItemHandler extends RestWorkItemHandler { -@ApplicationScoped -public final class KnativeWorkItemHandler extends WorkflowWorkItemHandler { + public static final String APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8 = "application/cloudevents+json; charset=UTF-8"; public static final String NAME = "knative"; - public static final String OPERATION_PROPERTY_NAME = "operation"; + public static final String ID = "id"; + + public static final String PATH_PROPERTY_NAME = "knative_function_path"; + + public static final String SERVICE_PROPERTY_NAME = "knative_function_service"; + + public static final String PAYLOAD_FIELDS_PROPERTY_NAME = "knative_function_payload_fields"; - private final KnativeServerlessWorkflowCustomFunction customFunction; + public static final String PAYLOAD_FIELDS_DELIMITER = ";"; - @Inject - public KnativeWorkItemHandler(KnativeServerlessWorkflowCustomFunction customFunction) { - this.customFunction = customFunction; + public static final String CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE = "A Knative custom function argument cannot be a CloudEvent when the 'asCloudEvent' property are not set to 'true'"; + + private final KubernetesServiceCatalog kubernetesServiceCatalog; + + public KnativeWorkItemHandler(WebClient httpClient, WebClient httpsClient, KubernetesServiceCatalog kubernetesServiceCatalog) { + super(httpClient, httpsClient); + this.kubernetesServiceCatalog = kubernetesServiceCatalog; } @Override - protected Object internalExecute(KogitoWorkItem workItem, Map arguments) { - var operation = (String) workItem.getNodeInstance().getNode().getMetaData().get(OPERATION_PROPERTY_NAME); - return customFunction.execute(workItem.getProcessInstanceStringId(), operation, arguments); + public void executeWorkItem(KogitoWorkItem workItem, KogitoWorkItemManager manager) { + Map parameters = workItem.getParameters(); + parameters.put(RestWorkItemHandler.URL, getUrl(parameters)); + super.executeWorkItem(workItem, manager); + } + + private String getUrl(Map parameters) { + return getServiceAddress(parameters) + parameters.remove(PATH_PROPERTY_NAME); + } + + private String getServiceAddress(Map parameters) { + String service = (String) parameters.remove(SERVICE_PROPERTY_NAME); + + return kubernetesServiceCatalog.getServiceAddress(new KubernetesServiceCatalogKey(KNATIVE, service)) + .map(URI::toString) + .orElseThrow(() -> new WorkItemExecutionException("The Knative service '" + service + + "' could not be found.")); } @Override diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandlerProducer.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandlerProducer.java new file mode 100644 index 00000000000..3e88d6676ef --- /dev/null +++ b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeWorkItemHandlerProducer.java @@ -0,0 +1,48 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; + +import javax.enterprise.inject.Produces; +import javax.inject.Inject; + +import org.kie.kogito.addons.k8s.resource.catalog.KubernetesServiceCatalog; +import org.kogito.workitem.rest.RestWorkItemHandlerUtils; + +import io.vertx.ext.web.client.WebClientOptions; +import io.vertx.mutiny.core.Vertx; +import io.vertx.mutiny.ext.web.client.WebClient; + +public final class KnativeWorkItemHandlerProducer { + + @Inject + Vertx vertx; + + @Inject + KubernetesServiceCatalog kubernetesServiceCatalog; + + @Produces + KnativeWorkItemHandler createKnativeWorkItemHandler() { + return new KnativeWorkItemHandler(getHttpClient(), getHttpsClient(), kubernetesServiceCatalog); + } + + private WebClient getHttpsClient() { + return WebClient.create(vertx, RestWorkItemHandlerUtils.sslWebClientOptions()); + } + + private WebClient getHttpClient() { + return WebClient.create(vertx, new WebClientOptions()); + } +} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/Operation.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/Operation.java index ec1cad610b8..95b1c995382 100644 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/Operation.java +++ b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/Operation.java @@ -19,7 +19,7 @@ import java.util.Map; import java.util.Objects; -class Operation { +public final class Operation { static final String CLOUD_EVENT_PARAMETER_NAME = "asCloudEvent"; @@ -37,19 +37,19 @@ private Operation(Builder builder) { this.isCloudEvent = builder.isCloudEvent; } - String getService() { + public String getService() { return service; } - String getPath() { + public String getPath() { return path; } - boolean isCloudEvent() { + public boolean isCloudEvent() { return isCloudEvent; } - static Operation parse(String value) { + public static Operation parse(String value) { String[] parts = value.split("\\?", 2); String[] query = parts.length > 1 ? parts[1].split("&") : new String[0]; @@ -66,7 +66,7 @@ static Operation parse(String value) { .build(); } - static Builder builder() { + public static Builder builder() { return new Builder(); } @@ -89,7 +89,7 @@ public int hashCode() { return Objects.hash(service, path, isCloudEvent); } - static class Builder { + public static class Builder { private String service; @@ -100,22 +100,22 @@ static class Builder { private Builder() { } - Builder withService(String service) { + public Builder withService(String service) { this.service = service; return this; } - Builder withPath(String path) { + public Builder withPath(String path) { this.path = path; return this; } - Builder withIsCloudEvent(boolean isCloudEvent) { + public Builder withIsCloudEvent(boolean isCloudEvent) { this.isCloudEvent = isCloudEvent; return this; } - Operation build() { + public Operation build() { return new Operation(this); } } diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeParamsDecorator.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeParamsDecorator.java new file mode 100644 index 00000000000..35f3c77d769 --- /dev/null +++ b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeParamsDecorator.java @@ -0,0 +1,43 @@ +/* + * Copyright 2023 Red Hat, Inc. and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; + +import java.util.List; +import java.util.Map; + +import org.kie.kogito.event.cloudevents.utils.CloudEventUtils; +import org.kie.kogito.internal.process.runtime.KogitoWorkItem; +import org.kogito.workitem.rest.decorators.ParamsDecorator; + +import io.vertx.mutiny.ext.web.client.HttpRequest; + +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE; +import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeWorkItemHandler.ID; + +public final class PlainJsonKnativeParamsDecorator implements ParamsDecorator { + + @Override + public void decorate(KogitoWorkItem workItem, Map parameters, HttpRequest request) { + if (isCloudEvent(KnativeFunctionPayloadSupplier.getPayload(parameters))) { + throw new IllegalArgumentException(CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE); + } + } + + private static boolean isCloudEvent(Map payload) { + List cloudEventMissingAttributes = CloudEventUtils.getMissingAttributes(payload); + return !payload.isEmpty() && (cloudEventMissingAttributes.isEmpty() || (cloudEventMissingAttributes.size() == 1 && cloudEventMissingAttributes.contains(ID))); + } +} diff --git a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeServiceRequestClient.java b/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeServiceRequestClient.java deleted file mode 100644 index 1cd2b07fba0..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/main/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/PlainJsonKnativeServiceRequestClient.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2023 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import java.net.URI; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.Optional; - -import javax.annotation.PreDestroy; -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; - -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.kie.kogito.event.cloudevents.utils.CloudEventUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.fasterxml.jackson.databind.JsonNode; - -import io.vertx.core.json.JsonObject; -import io.vertx.mutiny.core.Vertx; -import io.vertx.mutiny.core.buffer.Buffer; -import io.vertx.mutiny.ext.web.client.HttpRequest; -import io.vertx.mutiny.ext.web.client.HttpResponse; -import io.vertx.mutiny.ext.web.client.WebClient; - -import static org.kie.kogito.serverless.workflow.SWFConstants.CONTENT_DATA; - -@ApplicationScoped -class PlainJsonKnativeServiceRequestClient extends KnativeServiceRequestClient { - - private static final Logger logger = LoggerFactory.getLogger(PlainJsonKnativeServiceRequestClient.class); - - static final String CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE = "A Knative custom function argument cannot be a CloudEvent when the 'asCloudEvent' property are not set to 'true'"; - - private final WebClient webClient; - - private final Duration requestTimeout; - - @Inject - PlainJsonKnativeServiceRequestClient(Vertx vertx, - @ConfigProperty(name = REQUEST_TIMEOUT_PROPERTY_NAME) Optional requestTimeout) { - this.webClient = WebClient.create(vertx); - this.requestTimeout = Duration.ofMillis(requestTimeout.orElse(DEFAULT_REQUEST_TIMEOUT_VALUE)); - } - - @Override - protected JsonNode sendRequest(String processInstanceId, URI serviceAddress, String path, - Map payload) { - HttpRequest request; - if (serviceAddress.getPort() >= 0) { - request = webClient.post(serviceAddress.getPort(), serviceAddress.getHost(), path); - } else { - request = webClient.post(serviceAddress.getHost(), path); - } - request.ssl("https".equals(serviceAddress.getScheme())); - - HttpResponse response; - - if (payload.isEmpty()) { - logger.debug("Sending request with empty body - host: {}, port: {}, path: {}", serviceAddress.getHost(), - serviceAddress.getPort(), path); - - response = request.send().await().atMost(requestTimeout); - } else { - Object body; - if (payload.size() == 1 && payload.containsKey(CONTENT_DATA)) { - body = payload.get(CONTENT_DATA); - } else { - validatePayload(payload); - body = new JsonObject(payload); - } - - logger.debug("Sending request with body - host: {}, port: {}, path: {}, body: {}", serviceAddress.getHost(), - serviceAddress.getPort(), path, body); - - response = request.sendJson(body).await().atMost(requestTimeout); - } - - return responseAsJsonObject(response); - } - - private void validatePayload(Map payload) { - List missingAttributes = CloudEventUtils.getMissingAttributes(payload); - if (missingAttributes.isEmpty() || (missingAttributes.size() == 1 && missingAttributes.contains("id"))) { - throw new IllegalArgumentException(CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE); - } - } - - @PreDestroy - void preDestroy() { - webClient.close(); - } -} diff --git a/quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClientTest.java b/quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClientTest.java deleted file mode 100644 index 98e2c18678a..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/CloudEventKnativeServiceRequestClientTest.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright 2023 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - -import javax.inject.Inject; - -import org.assertj.core.api.Assertions; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.kie.kogito.event.cloudevents.utils.InvalidCloudEventException; - -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.github.tomakehurst.wiremock.WireMockServer; -import com.github.tomakehurst.wiremock.core.WireMockConfiguration; - -import io.cloudevents.SpecVersion; -import io.cloudevents.core.v1.CloudEventV1; -import io.quarkus.test.junit.QuarkusTest; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import static io.cloudevents.core.v1.CloudEventV1.DATACONTENTTYPE; -import static io.cloudevents.core.v1.CloudEventV1.ID; -import static io.cloudevents.core.v1.CloudEventV1.SOURCE; -import static io.cloudevents.core.v1.CloudEventV1.SPECVERSION; -import static io.cloudevents.core.v1.CloudEventV1.TIME; -import static io.cloudevents.core.v1.CloudEventV1.TYPE; -import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeServiceRequestClient.APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8; -import static org.kie.kogito.event.cloudevents.utils.CloudEventUtils.DATA; - -@QuarkusTest -class CloudEventKnativeServiceRequestClientTest { - - private static WireMockServer wireMockServer; - - @Inject - CloudEventKnativeServiceRequestClient client; - - @Test - void cloudEventWithoutIdMustHaveGeneratedId() { - mockServer(); - - String processInstanceId = "process1"; - String source = "https://localhost:8080"; - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, - "source", source, - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - client.sendRequest(processInstanceId, URI.create(wireMockServer.baseUrl()), "/cloud-event", cloudEvent); - - String expectedCloudEventId = CloudEventKnativeServiceRequestClient.generateCloudEventId(processInstanceId, cloudEvent); - - wireMockServer.verify(postRequestedFor(urlEqualTo("/cloud-event")) - .withRequestBody(matchingJsonPath("$.id", equalTo(expectedCloudEventId))) - .withHeader("Content-Type", equalTo(APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8))); - } - - @Test - void cloudEventWithIdMustBeSentAsIs() { - mockServer(); - - String processInstanceId = "process1"; - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, - "id", 42, - "source", "https://localhost:8080", - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - client.sendRequest(processInstanceId, URI.create(wireMockServer.baseUrl()), "/cloud-event", cloudEvent); - - wireMockServer.verify(postRequestedFor(urlEqualTo("/cloud-event")) - .withRequestBody(matchingJsonPath("$.id", equalTo("42"))) - .withHeader("Content-Type", equalTo(APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8))); - } - - @Test - void invalidCloudEventMustThrowException() { - String processInstanceId = "process1"; - - Map cloudEvent = new HashMap<>(); - cloudEvent.put(SPECVERSION, SpecVersion.V1.toString()); - cloudEvent.put(ID, "abc-123"); - cloudEvent.put(SOURCE, "/myapp"); - cloudEvent.put(TYPE, "com.example.someevent"); - cloudEvent.put(DATACONTENTTYPE, "application/json"); - cloudEvent.put(TIME, "invalid"); - cloudEvent.put(DATA, "{\"foo\":\"bar\"}"); - - URI uri = URI.create(wireMockServer.baseUrl()); - - Assertions.assertThatExceptionOfType(InvalidCloudEventException.class) - .isThrownBy(() -> client.sendRequest(processInstanceId, uri, "/cloud-event", cloudEvent)); - } - - private static void mockServer() { - wireMockServer.stubFor(post(urlEqualTo("/cloud-event")) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8) - .withJsonBody(JsonNodeFactory.instance.objectNode() - .put("message", "CloudEvents are awesome!") - .set("object", JsonNodeFactory.instance.objectNode() - .put("long", 42L) - .put("String", "Knowledge is everything"))))); - } - - private static void createWiremockServer() { - wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort()); - wireMockServer.start(); - } - - @BeforeAll - static void beforeAll() { - createWiremockServer(); - } - - @AfterAll - static void afterAll() { - if (wireMockServer != null) { - wireMockServer.stop(); - } - } -} \ No newline at end of file diff --git a/quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunctionTest.java b/quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunctionTest.java deleted file mode 100644 index 45344199c48..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/test/java/org/kie/kogito/addons/quarkus/knative/serving/customfunctions/KnativeServerlessWorkflowCustomFunctionTest.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Copyright 2022 Red Hat, Inc. and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.kie.kogito.addons.quarkus.knative.serving.customfunctions; - -import java.time.Instant; -import java.util.List; -import java.util.Map; - -import javax.inject.Inject; - -import org.eclipse.microprofile.config.inject.ConfigProperty; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.kie.kogito.event.cloudevents.utils.InvalidCloudEventException; -import org.kie.kogito.process.workitem.WorkItemExecutionException; -import org.kie.kogito.serverless.workflow.SWFConstants; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.github.tomakehurst.wiremock.WireMockServer; -import com.github.tomakehurst.wiremock.core.WireMockConfiguration; - -import io.cloudevents.SpecVersion; -import io.cloudevents.core.v1.CloudEventV1; -import io.fabric8.kubernetes.client.server.mock.KubernetesServer; -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.kubernetes.client.KubernetesTestServer; -import io.quarkus.test.kubernetes.client.WithKubernetesTestServer; -import io.smallrye.mutiny.TimeoutException; - -import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; -import static com.github.tomakehurst.wiremock.client.WireMock.equalTo; -import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson; -import static com.github.tomakehurst.wiremock.client.WireMock.matchingJsonPath; -import static com.github.tomakehurst.wiremock.client.WireMock.post; -import static com.github.tomakehurst.wiremock.client.WireMock.postRequestedFor; -import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; -import static org.assertj.core.api.Assertions.assertThatNoException; -import static org.kie.kogito.addons.quarkus.k8s.test.utils.KnativeResourceDiscoveryTestUtil.createServiceIfNotExists; -import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeServiceRequestClient.APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8; -import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.KnativeServiceRequestClient.REQUEST_TIMEOUT_PROPERTY_NAME; -import static org.kie.kogito.addons.quarkus.knative.serving.customfunctions.PlainJsonKnativeServiceRequestClient.CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE; - -@QuarkusTest -@WithKubernetesTestServer -class KnativeServerlessWorkflowCustomFunctionTest { - - private static final String UNUSED = "unused"; - - private static final String NAMESPACE = "test"; - - private static final String SERVICENAME = "serverless-workflow-greeting-quarkus"; - - private static final String CLOUD_EVENT_PATH = "/cloud-event"; - - private static String remoteServiceUrl; - - @KubernetesTestServer - KubernetesServer mockServer; - - @ConfigProperty(name = REQUEST_TIMEOUT_PROPERTY_NAME) - Long requestTimeout; - - @Inject - KnativeServerlessWorkflowCustomFunction knativeServerlessWorkflowCustomFunction; - - private static WireMockServer wireMockServer; - - @BeforeAll - static void beforeAll() { - createWiremockServer(); - } - - @BeforeEach - void beforeEach() { - createServiceIfNotExists(mockServer, "knative/quarkus-greeting.yaml", NAMESPACE, SERVICENAME, remoteServiceUrl); - } - - @AfterAll - static void afterAll() { - if (wireMockServer != null) { - wireMockServer.stop(); - } - } - - private static void createWiremockServer() { - wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort()); - wireMockServer.start(); - remoteServiceUrl = wireMockServer.baseUrl(); - } - - private void mockExecuteTimeoutEndpoint() { - wireMockServer.stubFor(post(urlEqualTo("/timeout")) - .willReturn(aResponse() - .withFixedDelay(requestTimeout.intValue() + 500) - .withStatus(200))); - } - - private void mockExecute404Endpoint() { - wireMockServer.stubFor(post(urlEqualTo("/non_existing_path")) - .willReturn(aResponse() - .withStatus(404))); - } - - private void mockExecuteWithQueryParametersEndpoint() { - wireMockServer.stubFor(post(urlEqualTo("/hello")) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withJsonBody(JsonNodeFactory.instance.objectNode() - .put("message", "Hello Kogito")))); - } - - private void mockExecuteWithParametersEndpoint() { - wireMockServer.stubFor(post(urlEqualTo("/")) - .withRequestBody(equalToJson(JsonNodeFactory.instance.objectNode() - .put("org", "Acme") - .put("project", "Kogito") - .toString())) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withJsonBody(JsonNodeFactory.instance.objectNode() - .put("message", "Kogito is awesome!") - .set("object", JsonNodeFactory.instance.objectNode() - .put("long", 42L) - .put("String", "Knowledge is everything"))))); - } - - private void mockExecuteWithArray() { - wireMockServer.stubFor(post(urlEqualTo("/")) - .withRequestBody(equalToJson(JsonNodeFactory.instance.arrayNode() - .add("Javierito") - .add("Pepito") - .toString())) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withJsonBody(JsonNodeFactory.instance.arrayNode().add(23).add(24)))); - } - - private void mockExecuteCloudEventWithParametersEndpoint() { - wireMockServer.stubFor(post(urlEqualTo(CLOUD_EVENT_PATH)) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8) - .withJsonBody(JsonNodeFactory.instance.objectNode() - .put("message", "CloudEvents are awesome!") - .set("object", JsonNodeFactory.instance.objectNode() - .put("long", 42L) - .put("String", "Knowledge is everything"))))); - } - - private void mockExecuteWithEmptyParametersEndpoint() { - wireMockServer.stubFor(post(urlEqualTo("/")) - .willReturn(aResponse() - .withStatus(200) - .withHeader("Content-Type", "application/json") - .withJsonBody(JsonNodeFactory.instance.objectNode() - .put("org", "Acme") - .put("project", "Kogito")))); - } - - @Test - void executeWithEmptyParameters() { - mockExecuteWithEmptyParametersEndpoint(); - - JsonNode output = knativeServerlessWorkflowCustomFunction.execute("unused", SERVICENAME, Map.of()); - - JsonNode expected = JsonNodeFactory.instance.objectNode() - .put("org", "Acme") - .put("project", "Kogito"); - - assertThat(output).isEqualTo(expected); - } - - @Test - void executeWithParameters() { - mockExecuteWithParametersEndpoint(); - - Map parameters = Map.of( - "org", "Acme", - "project", "Kogito"); - - JsonNode output = knativeServerlessWorkflowCustomFunction.execute(UNUSED, SERVICENAME, parameters); - - JsonNode expected = JsonNodeFactory.instance.objectNode() - .put("message", "Kogito is awesome!") - .set("object", JsonNodeFactory.instance.objectNode() - .put("long", 42L) - .put("String", "Knowledge is everything")); - - assertThat(output).hasToString(expected.toString()); - } - - @Test - void executeWithArray() { - mockExecuteWithArray(); - assertThat(knativeServerlessWorkflowCustomFunction.execute(UNUSED, SERVICENAME, Map.of( - SWFConstants.CONTENT_DATA, List.of("Javierito", "Pepito")))).hasToString(JsonNodeFactory.instance.arrayNode().add(23).add(24).toString()); - } - - @Test - void executeWithCloudEventWithIdAsPlainJson() { - mockExecuteWithParametersEndpoint(); - - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, // KnativeWorkItemHandler receives this attribute as a double - "id", 42, // KnativeWorkItemHandler receivers this attribute as an Integer - "source", "https://localhost:8080", - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - String processInstanceId = Instant.now().toString(); - - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> knativeServerlessWorkflowCustomFunction.execute(processInstanceId, SERVICENAME, cloudEvent)) - .withMessage(CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE); - } - - @Test - void executeWithCloudEventWithoutIdAsPlainJson() { - mockExecuteWithParametersEndpoint(); - - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, // KnativeWorkItemHandler receives this attribute as a double - "source", "https://localhost:8080", - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - String processInstanceId = Instant.now().toString(); - - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> knativeServerlessWorkflowCustomFunction.execute(processInstanceId, SERVICENAME, cloudEvent)) - .withMessage(CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE); - } - - @Test - void executeWithCloudEventThatHasOnlyIdMissingAsPlainJson() { - mockExecuteWithParametersEndpoint(); - - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, // KnativeWorkItemHandler receives this attribute as a double - "source", "https://localhost:8080", - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - String processInstanceId = Instant.now().toString(); - - assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> knativeServerlessWorkflowCustomFunction.execute(processInstanceId, SERVICENAME, cloudEvent)) - .withMessage(CLOUDEVENT_SENT_AS_PLAIN_JSON_ERROR_MESSAGE); - } - - @Test - void executeCloudEvent() { - mockExecuteCloudEventWithParametersEndpoint(); - - String source = "https://localhost:8080"; - - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, // KnativeWorkItemHandler receives this attribute as a double - "id", 42, // KnativeWorkItemHandler receivers this attribute as an Integer - "source", source, - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - String processInstanceId = Instant.now().toString(); - - JsonNode output = knativeServerlessWorkflowCustomFunction.execute( - processInstanceId, SERVICENAME + "?asCloudEvent=true&path=" + CLOUD_EVENT_PATH, cloudEvent); - - JsonNode expected = JsonNodeFactory.instance.objectNode() - .put("message", "CloudEvents are awesome!") - .set("object", JsonNodeFactory.instance.objectNode() - .put("long", 42L) - .put("String", "Knowledge is everything")); - - assertThat(output).hasToString(expected.toString()); - - wireMockServer.verify(postRequestedFor(urlEqualTo(CLOUD_EVENT_PATH)) - .withRequestBody(matchingJsonPath("$.id", equalTo("42"))) - .withHeader("Content-Type", equalTo(APPLICATION_CLOUDEVENTS_JSON_CHARSET_UTF_8))); - } - - @Test - void executeCloudEventWithMissingIdShouldNotThrowException() { - mockExecuteCloudEventWithParametersEndpoint(); - - String source = "https://localhost:8080"; - - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, 1.0, // KnativeWorkItemHandler receives this attribute as a double - "source", source, - "type", "org.kie.kogito.test", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - String operation = SERVICENAME + "?asCloudEvent=true&path=" + CLOUD_EVENT_PATH; - - assertThatNoException() - .isThrownBy(() -> knativeServerlessWorkflowCustomFunction.execute(UNUSED, operation, cloudEvent)); - } - - @Test - void executeWithInvalidCloudEvent() { - Map cloudEvent = Map.of( - CloudEventV1.SPECVERSION, SpecVersion.V1.toString(), - "source", "https://localhost:8080", - "data", Map.of( - "org", "Acme", - "project", "Kogito")); - - String operation = SERVICENAME + "?asCloudEvent=true&path=" + CLOUD_EVENT_PATH; - - assertThatExceptionOfType(InvalidCloudEventException.class) - .isThrownBy(() -> knativeServerlessWorkflowCustomFunction.execute(UNUSED, operation, cloudEvent)); - } - - @Test - void executeWithQueryParameters() { - mockExecuteWithQueryParametersEndpoint(); - - JsonNode output = knativeServerlessWorkflowCustomFunction.execute(UNUSED, SERVICENAME + "?path=/hello", Map.of()); - - JsonNode expected = JsonNodeFactory.instance.objectNode() - .put("message", "Hello Kogito"); - - assertThat(output).isEqualTo(expected); - } - - @Test - void execute404() { - mockExecute404Endpoint(); - - Map parameters = Map.of(); - - String operation = SERVICENAME + "?path=/non_existing_path"; - - assertThatCode(() -> knativeServerlessWorkflowCustomFunction.execute(UNUSED, operation, parameters)) - .isInstanceOf(WorkItemExecutionException.class) - .extracting("errorCode") - .isEqualTo("404"); - } - - @Test - void executeTimeout() { - mockExecuteTimeoutEndpoint(); - - Map payload = Map.of(); - - String operation = SERVICENAME + "?path=/timeout"; - - assertThatExceptionOfType(TimeoutException.class) - .isThrownBy(() -> knativeServerlessWorkflowCustomFunction.execute(UNUSED, operation, payload)); - } -} diff --git a/quarkus/addons/knative/serving/runtime/src/test/resources/application.properties b/quarkus/addons/knative/serving/runtime/src/test/resources/application.properties deleted file mode 100644 index 7eccc749364..00000000000 --- a/quarkus/addons/knative/serving/runtime/src/test/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -kogito.addon.knative-serving.request-timeout=2000 \ No newline at end of file