From 75e618383d8f1e6b53f0f84aa308b1397749b0b3 Mon Sep 17 00:00:00 2001 From: Matheus Cruz Date: Mon, 12 Aug 2024 14:51:09 -0300 Subject: [PATCH] Add MongoDB correlation on Quarkus example --- README.md | 6 + .../quarkus/GrafanaDockerComposeIT.java | 2 +- serverless-workflow-examples/pom.xml | 1 + .../README.md | 154 ++++++++++++++ .../docker-compose/docker-compose.yml | 85 ++++++++ .../pom.xml | 199 ++++++++++++++++++ .../java/org/kie/kogito/examples/Account.java | 41 ++++ .../kie/kogito/examples/EventsService.java | 106 ++++++++++ .../kie/kogito/examples/WorkflowResource.java | 85 ++++++++ .../src/main/resources/application.properties | 101 +++++++++ .../src/main/resources/correlation.sw.json | 104 +++++++++ .../kie/kogito/examples/CorrelationIT.java | 91 ++++++++ .../src/test/resources/application.properties | 24 +++ 13 files changed, 998 insertions(+), 1 deletion(-) create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/README.md create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/docker-compose/docker-compose.yml create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/pom.xml create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/Account.java create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/EventsService.java create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/WorkflowResource.java create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/application.properties create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/correlation.sw.json create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/java/org/kie/kogito/examples/CorrelationIT.java create mode 100644 serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/resources/application.properties diff --git a/README.md b/README.md index 4cc8f4fb1d..0ac3377164 100644 --- a/README.md +++ b/README.md @@ -141,6 +141,12 @@ A Serverless Workflow service that works as a Github bot application, which reac * [on Quarkus](serverless-workflow-examples/serverless-workflow-github-showcase) +## Serverless Workflow Correlation + +* [on Quarkus (JDBC)](serverless-workflow-examples/serverless-workflow-correlation-quarkus) +* [on Quarkus (MongoDB)](serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus) + + ## Other Misc Examples - Onboarding example combining 1 process and two decision services: see [README.md](kogito-quarkus-examples/onboarding-example/README.md) diff --git a/kogito-quarkus-examples/dmn-drools-quarkus-metrics/src/test/java/org/kie/kogito/examples/quarkus/GrafanaDockerComposeIT.java b/kogito-quarkus-examples/dmn-drools-quarkus-metrics/src/test/java/org/kie/kogito/examples/quarkus/GrafanaDockerComposeIT.java index 8642a55948..9407ebeac5 100644 --- a/kogito-quarkus-examples/dmn-drools-quarkus-metrics/src/test/java/org/kie/kogito/examples/quarkus/GrafanaDockerComposeIT.java +++ b/kogito-quarkus-examples/dmn-drools-quarkus-metrics/src/test/java/org/kie/kogito/examples/quarkus/GrafanaDockerComposeIT.java @@ -69,7 +69,7 @@ public class GrafanaDockerComposeIT { .withStartupTimeout(STARTUP_MINUTES_TIMEOUT)) .withLogConsumer("prometheus-1", new Slf4jLogConsumer(LOGGER)) .withPull(false) - .withLocalCompose(true); + .withLocalCompose(true); } catch (URISyntaxException e) { throw new RuntimeException(e); } diff --git a/serverless-workflow-examples/pom.xml b/serverless-workflow-examples/pom.xml index 962cbd298a..006dae010a 100644 --- a/serverless-workflow-examples/pom.xml +++ b/serverless-workflow-examples/pom.xml @@ -49,6 +49,7 @@ serverless-workflow-compensation-quarkus serverless-workflow-consuming-events-over-http-quarkus serverless-workflow-correlation-quarkus + serverless-workflow-correlation-mongodb-quarkus serverless-workflow-custom-function-knative serverless-workflow-custom-type serverless-workflow-data-index-persistence-addon-quarkus diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/README.md b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/README.md new file mode 100644 index 0000000000..d901964f75 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/README.md @@ -0,0 +1,154 @@ +# Kogito Serverless Workflow - Correlation with Callback Example + +## Description + +This example contains a workflow service to demonstrate correlation feature using callback states and events. +Each callback state withing the workflow publishes an event and wait for a response event, +there is an incoming event, it is matched with the proper workflow instance by using the correlation attribute, in this case it is the `userid`. So for every incoming event the userid is used to properly find and trigger the proper workflow instance. The correlation is defined in the [workflow definition file](src/main/resources/correlation.sw.json) that is described using JSON format as defined in the [CNCF Serverless Workflow specification](https://github.com/serverlessworkflow/specification). + +```json +"correlation": [ + { + "contextAttributeName": "userid" + } +] +``` +Events should be in CloudEvent format and the correlation attribute should be defined as an extension attribute, in this case `userid`. + +The workflow example is started by events as well, so a start event should be published with the same correlation attribute `userid, that will be used to match correlations for the started workflow instance. + +In the example the event broker used to publish/receive the events is Kafka, and the used topics are the same described as the event types in the workflow definition. + + +```json +{ + "name": "newAccountEvent", + "source": "", + "type": "newAccountEventType", + "correlation": [ + { + "contextAttributeName": "userid" + } + ] +} +``` +For simplicity, the events are published and consumed in the same application running the workflow, but in a real use case they should come from different services interacting with the workflow, see [EventsService](src/main/java/org/kie/kogito/examples/EventsService.java). + +To start the workflow as mentioned, it is required an event to be published which is going to be consumed by the workflow service starting a new instance. A helper REST endpoint was recreated to simplify this step, so once a POST request is received it publishes the start event to the broker see [WorkflowResource](src/main/java/org/kie/kogito/examples/WorkflowResource.java). + +All eventing configuration and the broker parameters are in done in the [application.properties](src/main/resources/application.properties). + +## Infrastructure requirements + +### Kafka + +This quickstart requires an Apache Kafka to be available and by default expects it to be on default port and localhost. + +* Install and Startup Kafka Server / Zookeeper + +https://kafka.apache.org/quickstart + +To publish and consume the event, topic "move" is used. + +Optionally and for convenience, a docker-compose [configuration file](docker-compose/docker-compose.yml) is +provided in the path [docker-compose/](docker-compose/), where you can just run the command from there: + +```sh +docker-compose up +``` + +In this way a container for Kafka will be started on port 9092. + +### MongoDB + +Alternatively, you can run this example using persistence with a MongoDB server. + +Configuration for setting up the connection can be found in [applications.properties](src/main/resources/application.properties) file, which +follows the Quarkus MongoDB Client settings, for more information please check [MongoDB Client Configuration Reference](https://quarkus.io/guides/mongodb#configuration-reference). + +Optionally and for convenience, a docker-compose [configuration file](docker-compose/docker-compose.yml) is +provided in the path [docker-compose/](docker-compose/), where you can just run the command from there: + +```sh +docker-compose up +``` + +## Installing and Running + +### Prerequisites + +You will need: + - Java 17+ installed + - Environment variable JAVA_HOME set accordingly + - Maven 3.9.6+ installed + +When using native image compilation, you will also need: + - [GraalVm](https://www.graalvm.org/downloads/) 19.3.1+ installed + - Environment variable GRAALVM_HOME set accordingly + - Note that GraalVM native image compilation typically requires other packages (glibc-devel, zlib-devel and gcc) to be installed too. You also need 'native-image' installed in GraalVM (using 'gu install native-image'). Please refer to [GraalVM installation documentation](https://www.graalvm.org/docs/reference-manual/aot-compilation/#prerequisites) for more details. + +### Compile and Run in Local Dev Mode + +```sh +mvn clean package quarkus:dev +``` + +### Compile and Run in JVM mode + +```sh +mvn clean package +java -jar target/quarkus-app/quarkus-run.jar +``` + +or on Windows + +```sh +mvn clean package +java -jar target\quarkus-app\quarkus-run.jar +``` + +### Compile and Run in JVM mode using PostgreSQL persistence + +To enable persistence, please append `-Ppersistence` to your Maven command. +That will ensure the correct dependencies are in place, and automatically set the required properties to connect +with the PostgreSQL instance from the provided docker compose. + +```sh +mvn clean package -Peristence +``` + +### Compile and Run using Local Native Image +Note that this requires GRAALVM_HOME to point to a valid GraalVM installation + +```sh +mvn clean package -Pnative +``` + +To run the generated native executable, generated in `target/`, execute + +```sh +./target/serverless-workflow-correlation-quarkus-{version}-runner +``` + +### Start a workflow + +The service based on the JSON workflow definition can be access by sending a request to http://localhost:8080/account/{userid} + +Complete curl command can be found below: + +```sh +curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/12345 +``` + +After a while (note that to you need give time for event to be consumed) you should see the log message printed in the console, and the workflow is completed. + +```text +2022-05-12 11:02:15,891 INFO [org.kie.kog.ser.eve.imp.ProcessEventDispatcher] (kogito-event-executor-0) Starting new process instance with signal 'newAccountEventType' +2022-05-12 11:02:18,909 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-9) SRMSG18256: Initialize record store for topic-partition 'validateAccountEmail-0' at position 16. +2022-05-12 11:02:18,919 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Validate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}} +2022-05-12 11:02:19,931 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18256: Initialize record store for topic-partition 'validatedAccountEmail-0' at position 16. +2022-05-12 11:02:20,962 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-8) SRMSG18256: Initialize record store for topic-partition 'activateAccount-0' at position 16. +2022-05-12 11:02:20,971 INFO [org.kie.kog.exa.EventsService] (pool-1-thread-1) Activate Account received. Workflow data JsonCloudEventData{node={"email":"test@test.com","userId":"12345"}} +2022-05-12 11:02:21,994 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-6) SRMSG18256: Initialize record store for topic-partition 'activatedAccount-0' at position 7. +2022-05-12 11:02:22,006 INFO [org.kie.kog.exa.EventsService] (kogito-event-executor-0) Complete Account Creation received. Workflow data {"email":"test@test.com","userId":"12345"}, KogitoProcessInstanceId 0cef0eef-06c8-4433-baea-505fa8d45f68 +``` \ No newline at end of file diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/docker-compose/docker-compose.yml b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/docker-compose/docker-compose.yml new file mode 100644 index 0000000000..24ea2caf84 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/docker-compose/docker-compose.yml @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +version: "3" + +services: + zookeeper: + container_name: zookeeper + image: strimzi/kafka:0.20.1-kafka-2.6.0 + command: [ + "sh", "-c", + "bin/zookeeper-server-start.sh config/zookeeper.properties" + ] + ports: + - "2181:2181" + environment: + LOG_DIR: "/tmp/logs" + + kafka: + image: strimzi/kafka:0.20.1-kafka-2.6.0 + container_name: kafka + command: [ + "sh", "-c", + "bin/kafka-server-start.sh config/server.properties --override inter.broker.listener.name=$${KAFKA_INTER_BROKER_LISTENER_NAME} --override listener.security.protocol.map=$${KAFKA_LISTENER_SECURITY_PROTOCOL_MAP} --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" + ] + depends_on: + - zookeeper + ports: + - "9092:9092" + environment: + KAFKA_BROKER_ID: 0 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://kafka:9092 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + LOG_DIR: "/tmp/logs" + + mongodb: + image: mongo:latest + restart: always + container_name: mongo + ports: + - "27017:27017" + networks: + - mongodb-compose-network + mongo-express: + image: mongo-express:latest + container_name: mongo_express + environment: + ME_CONFIG_MONGODB_ADMINUSERNAME: root + ME_CONFIG_MONGODB_ADMINPASSWORD: example + ME_CONFIG_MONGODB_URL: mongodb://mongo:27017/ + ME_CONFIG_BASICAUTH: false + ports: + - "8081:8081" + depends_on: + - mongodb + networks: + - mongodb-compose-network + +networks: + mongodb-compose-network: + driver: bridge + + +# curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8888/account/mirror && curl -X POST -H 'Content-Type:application/json' -H 'Accept:application/json' http://localhost:8080/account/mirror \ No newline at end of file diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/pom.xml b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/pom.xml new file mode 100644 index 0000000000..de51b9fb4a --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/pom.xml @@ -0,0 +1,199 @@ + + + + 4.0.0 + + + org.kie.kogito.examples + serverless-workflow-examples-parent + 999-SNAPSHOT + ../serverless-workflow-examples-parent/pom.xml + + + org.kie.kogito.examples + serverless-workflow-correlation-quarkus + 1.0-SNAPSHOT + + Kogito Example :: Serverless Workflow Correlation :: Quarkus + Kogito Serverless Workflow Correlation Example - Quarkus + + 3.8.4 + quarkus-bom + io.quarkus + 3.8.4 + org.kie.kogito + kogito-bom + 999-SNAPSHOT + 3.8.1 + 17 + 3.0.0-M7 + + + + + + ${quarkus.platform.group-id} + ${quarkus.platform.artifact-id} + ${quarkus.platform.version} + pom + import + + + ${kogito.bom.group-id} + ${kogito.bom.artifact-id} + ${kogito.bom.version} + pom + import + + + + + + org.apache.kie.sonataflow + sonataflow-quarkus + + + org.kie + kie-addons-quarkus-messaging + + + io.quarkus + quarkus-smallrye-reactive-messaging-kafka + + + io.quarkus + quarkus-resteasy + + + io.quarkus + quarkus-resteasy-jackson + + + io.quarkus + quarkus-smallrye-health + + + org.kie + kie-addons-quarkus-source-files + + + + io.quarkus + quarkus-junit5 + test + + + io.rest-assured + rest-assured + test + + + org.awaitility + awaitility + test + + + + + ${project.artifactId} + + + maven-compiler-plugin + ${version.compiler.plugin} + + ${maven.compiler.release} + + + + ${quarkus.platform.group-id} + quarkus-maven-plugin + ${quarkus-plugin.version} + true + + + + build + generate-code + generate-code-tests + + + + + + maven-failsafe-plugin + ${version.failsafe.plugin} + + + org.jboss.logmanager.LogManager + ${maven.home} + + + + + + integration-test + verify + + + + + + + + + container + + + container + + + + container + + + + io.quarkus + quarkus-container-image-jib + + + + + persistence + + + persistence + + + + + org.kie + kie-addons-quarkus-persistence-mongodb + + + io.quarkus + quarkus-mongodb-client + + + + + diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/Account.java b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/Account.java new file mode 100644 index 0000000000..5da1f57541 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/Account.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.examples; + +public class Account { + + private String email; + private String userId; + + public Account() { + } + + public Account(String email, String userId) { + this.email = email; + this.userId = userId; + } + + public String getEmail() { + return email; + } + + public String getUserId() { + return userId; + } +} diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/EventsService.java b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/EventsService.java new file mode 100644 index 0000000000..004e81d958 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/EventsService.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.examples; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Acknowledgment; +import org.eclipse.microprofile.reactive.messaging.Acknowledgment.Strategy; +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.kie.kogito.event.cloudevents.utils.CloudEventUtils; +import org.kie.kogito.internal.process.runtime.KogitoProcessContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.cloudevents.jackson.JsonCloudEventData; + +@ApplicationScoped +public class EventsService { + + private static final Logger logger = LoggerFactory.getLogger(EventsService.class); + + @Inject + ObjectMapper objectMapper; + + private Map accounts = new ConcurrentHashMap<>(); + + public void complete(JsonNode workflowData, KogitoProcessContext context) { + logger.info("Complete Account Creation received. Workflow data {}, KogitoProcessInstanceId {} ", workflowData, context.getProcessInstance().getStringId()); + } + + @Incoming("validate") + @Outgoing("validated") + @Acknowledgment(Strategy.POST_PROCESSING) + public String onEventValidate(Message message) { + Optional ce = CloudEventUtils.decode(message.getPayload()); + JsonCloudEventData cloudEventData = (JsonCloudEventData) ce.get().getData(); + logger.info("Validate Account received. Workflow data {}", cloudEventData); + String userId = ce.get().getExtension("userid").toString(); + + //just for testing + accounts.put(userId, ce.get().getExtension("kogitoprocinstanceid").toString()); + + return generateCloudEvent(userId, "validatedAccountEmail", null); + } + + @Incoming("activate") + @Outgoing("activated") + @Acknowledgment(Strategy.POST_PROCESSING) + public String onEventActivate(Message message) { + Optional ce = CloudEventUtils.decode(message.getPayload()); + JsonCloudEventData cloudEventData = (JsonCloudEventData) ce.get().getData(); + logger.info("Activate Account received. Workflow data {}", cloudEventData); + return generateCloudEvent(ce.get().getExtension("userid").toString(), "activatedAccount", null); + } + + private String generateCloudEvent(String id, String type, Object data) { + try { + return objectMapper.writeValueAsString(CloudEventBuilder.v1() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("")) + .withType(type) + .withTime(OffsetDateTime.now()) + .withExtension("userid", id) + .withData(objectMapper.writeValueAsBytes(data)) + .build()); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + } + + public final String getAccount(String userId) { + return accounts.get(userId); + } +} diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/WorkflowResource.java b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/WorkflowResource.java new file mode 100644 index 0000000000..de6af279ab --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/java/org/kie/kogito/examples/WorkflowResource.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.examples; + +import java.net.URI; +import java.time.OffsetDateTime; +import java.util.Collections; +import java.util.Map; +import java.util.UUID; + +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.core.Response; + +import org.eclipse.microprofile.reactive.messaging.Channel; +import org.eclipse.microprofile.reactive.messaging.Emitter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.cloudevents.core.builder.CloudEventBuilder; + +/** + * Helper class used to facilitate testing using REST + */ +@Path("/account") +public class WorkflowResource { + + @Inject + ObjectMapper objectMapper; + + @Channel("start") + Emitter emitter; + + @Inject + EventsService eventsService; + + @POST + @Path("/{userId}") + public Response onEvent(@PathParam("userId") String userId) { + String start = generateCloudEvent(userId, "newAccountEventType"); + emitter.send(start); + return Response.status(Response.Status.CREATED).build(); + } + + @GET + @Path("/{userId}") + public Map getProcessInstanceId(@PathParam("userId") String userId) { + return Collections.singletonMap("processInstanceId", eventsService.getAccount(userId)); + } + + private String generateCloudEvent(String id, String type) { + try { + return objectMapper.writeValueAsString(CloudEventBuilder.v03() + .withId(UUID.randomUUID().toString()) + .withSource(URI.create("")) + .withType(type) + .withTime(OffsetDateTime.now()) + .withExtension("userid", id) + .withData(objectMapper.writeValueAsBytes(new Account("test@test.com", id))) + .build()); + } catch (JsonProcessingException e) { + throw new IllegalArgumentException(e); + } + } +} diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/application.properties b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/application.properties new file mode 100644 index 0000000000..2996230c52 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/application.properties @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +%prod.kafka.bootstrap.servers=localhost:9092 + +#start the workflow events +##application channels +mp.messaging.outgoing.start.connector=smallrye-kafka +mp.messaging.outgoing.start.topic=newAccountEventType +mp.messaging.outgoing.start.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.start.group.id=kogito-sw-callback + +##workflow channels +mp.messaging.incoming.newAccountEventType.connector=smallrye-kafka +mp.messaging.incoming.newAccountEventType.topic=newAccountEventType +mp.messaging.incoming.newAccountEventType.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.incoming.newAccountEventType.group.id=kogito-sw-callback +mp.messaging.incoming.newAccountEventType.auto.offset.reset=earliest + +#activate account events +##application channels +mp.messaging.outgoing.activated.connector=smallrye-kafka +mp.messaging.outgoing.activated.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.activated.topic=activatedAccount +mp.messaging.outgoing.activated.group.id=kogito-sw-callback + +mp.messaging.incoming.activate.connector=smallrye-kafka +mp.messaging.incoming.activate.topic=activateAccount +mp.messaging.incoming.activate.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.incoming.activate.group.id=kogito-sw-callback +mp.messaging.incoming.activate.auto.offset.reset=earliest + +##workflow channels +mp.messaging.incoming.activatedAccount.connector=smallrye-kafka +mp.messaging.incoming.activatedAccount.topic=activatedAccount +mp.messaging.incoming.activatedAccount.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.incoming.activatedAccount.group.id=kogito-sw-callback +mp.messaging.incoming.activatedAccount.auto.offset.reset=earliest + +mp.messaging.outgoing.activateAccount.connector=smallrye-kafka +mp.messaging.outgoing.activateAccount.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.activateAccount.topic=activateAccount +mp.messaging.outgoing.activateAccount.group.id=kogito-sw-callback + +#validate email events +##application channels +mp.messaging.incoming.validate.connector=smallrye-kafka +mp.messaging.incoming.validate.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.incoming.validate.topic=validateAccountEmail +mp.messaging.incoming.validate.group.id=kogito-sw-callback +mp.messaging.incoming.validate.auto.offset.reset=earliest + +mp.messaging.outgoing.validated.connector=smallrye-kafka +mp.messaging.outgoing.validated.topic=validatedAccountEmail +mp.messaging.outgoing.validated.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.validated.group.id=kogito-sw-callback + +##workflow channels +mp.messaging.outgoing.validateAccountEmail.connector=smallrye-kafka +mp.messaging.outgoing.validateAccountEmail.topic=validateAccountEmail +mp.messaging.outgoing.validateAccountEmail.value.serializer=org.apache.kafka.common.serialization.StringSerializer +mp.messaging.outgoing.validateAccountEmail.group.id=kogito-sw-callback + +mp.messaging.incoming.validatedAccountEmail.connector=smallrye-kafka +mp.messaging.incoming.validatedAccountEmail.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer +mp.messaging.incoming.validatedAccountEmail.topic=validatedAccountEmail +mp.messaging.incoming.validatedAccountEmail.group.id=kogito-sw-callback +mp.messaging.incoming.validatedAccountEmail.auto.offset.reset=earliest + +#Persistence configuration +kogito.persistence.type=mongodb + +quarkus.mongodb.database=kogito +%prod.quarkus.mongodb.database=kogito + +kogito.persistence.proto.marshaller=true + +quarkus.grpc.dev-mode.force-server-start=false + +# profile to pack this example into a container, to use it execute activate the maven container profile, -Dcontainer +%container.quarkus.container-image.build=true +%container.quarkus.container-image.push=false +%container.quarkus.container-image.group=${USER} +%container.quarkus.container-image.registry=dev.local +%container.quarkus.container-image.tag=1.0-SNAPSHOT diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/correlation.sw.json b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/correlation.sw.json new file mode 100644 index 0000000000..b6a44aeb13 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/main/resources/correlation.sw.json @@ -0,0 +1,104 @@ +{ + "id": "correlation", + "version": "1.0", + "name": "Workflow Correlation example", + "description": "An example of how to use correlation on events", + "start": "New User Account Request", + "events": [ + { + "name": "newAccountEvent", + "source": "", + "type": "newAccountEventType", + "correlation": [ + { + "contextAttributeName": "userid" + } + ] + }, + { + "name": "validateAccountEmailEvent", + "source": "workflow", + "type": "validateAccountEmail" + }, + { + "name": "validatedAccountEmailEvent", + "source": "workflow", + "type": "validatedAccountEmail", + "correlation": [ + { + "contextAttributeName": "userid" + } + ] + }, + { + "name": "activateAccountEvent", + "source": "workflow", + "type": "activateAccount" + }, + { + "name": "activatedAccountEvent", + "source": "workflow", + "type": "activatedAccount", + "correlation": [ + { + "contextAttributeName": "userid" + } + ] + } + ], + "functions": [ + { + "name": "complete", + "type": "custom", + "operation": "service:java:org.kie.kogito.examples.EventsService::complete" + } + ], + "states": [ + { + "name":"New User Account Request", + "type":"event", + "onEvents": [{ + "eventRefs": ["newAccountEvent"] + }], + "transition": "Validate User Email" + }, + { + "name": "Validate User Email", + "type": "callback", + "action": { + "name": "publish validate event", + "eventRef": { + "triggerEventRef": "validateAccountEmailEvent" + } + }, + "eventRef": "validatedAccountEmailEvent", + "transition": "Activate User Account" + }, +{ + "name": "Activate User Account", + "type": "callback", + "action": { + "name": "publish Activate Account event", + "eventRef": { + "triggerEventRef": "activateAccountEvent" + } + }, + "eventRef": "activatedAccountEvent", + "transition": "Account Creation Completed" + }, + + { + "name": "Account Creation Completed", + "type": "operation", + "actions": [ + { + "name": "accountCreationCompleted", + "functionRef": { + "refName": "complete" + } + } + ], + "end": true + } + ] +} diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/java/org/kie/kogito/examples/CorrelationIT.java b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/java/org/kie/kogito/examples/CorrelationIT.java new file mode 100644 index 0000000000..9f8a24fcba --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/java/org/kie/kogito/examples/CorrelationIT.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.kie.kogito.examples; + +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import io.restassured.http.ContentType; + +import static io.restassured.RestAssured.given; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.notNullValue; + +@QuarkusIntegrationTest +class CorrelationIT { + + public static final String HEALTH_URL = "/q/health"; + public static final int TIMEOUT = 2; + + private String userId = "12345"; + + @Test + void testCorrelation() { + //health check - wait to be ready + await() + .atMost(TIMEOUT, MINUTES) + .pollDelay(2, SECONDS) + .pollInterval(1, SECONDS) + .untilAsserted(() -> given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .get(HEALTH_URL) + .then() + .statusCode(200)); + + //start workflow + given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .pathParam("userId", userId) + .post("/account/{userId}") + .then() + .statusCode(201); + + //check instance created + AtomicReference processInstanceId = new AtomicReference<>(); + await().atMost(TIMEOUT, MINUTES) + .pollInterval(1, SECONDS) + .untilAsserted(() -> processInstanceId.set(given() + .accept(ContentType.JSON) + .pathParam("userId", userId) + .get("/account/{userId}") + .then() + .statusCode(200) + .body("processInstanceId", notNullValue()) + .extract() + .body().path("processInstanceId"))); + + //check instance completed + await() + .atMost(TIMEOUT, MINUTES) + .pollInterval(1, SECONDS) + .untilAsserted(() -> given() + .contentType(ContentType.JSON) + .accept(ContentType.JSON) + .pathParam("processInstanceId", processInstanceId.get()) + .get("/correlation/{processInstanceId}") + .then() + .statusCode(404)); + } +} diff --git a/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/resources/application.properties b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/resources/application.properties new file mode 100644 index 0000000000..50bf6c1154 --- /dev/null +++ b/serverless-workflow-examples/serverless-workflow-correlation-mongodb-quarkus/src/test/resources/application.properties @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Quarkus +quarkus.http.test-port=0 + +# Temporary fix for test to pass due to issue in Quarkus classloading resolver +quarkus.class-loading.parent-first-artifacts=org.testcontainers:testcontainers \ No newline at end of file