diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java index 958fdf04ca7..a9a7409037e 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java @@ -46,6 +46,7 @@ import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.process.ProcessInstancesFactory; import org.kie.kogito.process.Processes; import org.kie.kogito.process.impl.CachedWorkItemHandlerConfig; import org.kie.kogito.process.impl.DefaultProcessEventListenerConfig; @@ -90,6 +91,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto private Iterable processRegisters; private final Collection closeables = new ArrayList<>(); private final Map> queues; + private ProcessInstancesFactory processInstancesFactory; private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener { @@ -171,6 +173,11 @@ public JsonNodeModel execute(Workflow workflow, JsonNode data) { return execute(findOrCreate(workflow), data); } + public StaticWorkflowApplication withProcessInstances(ProcessInstancesFactory processInstanceFactory) { + this.processInstancesFactory = processInstanceFactory; + return this; + } + private Process findOrCreate(Workflow workflow) { return findProcessById(workflow.getId()).orElseGet(() -> process(workflow)); } @@ -264,7 +271,7 @@ public Optional waitForFinish(String id, Duration duration) throw private Process createProcess(Workflow workflow) { workflowRegisters.forEach(r -> r.register(this, workflow)); - StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, ServerlessWorkflowParser + StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, processInstancesFactory, ServerlessWorkflowParser .of(workflow, JavaKogitoBuildContext.builder().withApplicationProperties(System.getProperties()).build()).getProcessInfo().info()); processRegisters.forEach(r -> r.register(this, workflow, process)); WorkflowProcessImpl workflowProcess = (WorkflowProcessImpl) process.get(); diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java index 2c98a4586bc..4658f0b2814 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java @@ -25,6 +25,7 @@ import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler; import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess; import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.process.ProcessInstancesFactory; import org.kie.kogito.process.impl.AbstractProcess; import org.kie.kogito.serverless.workflow.models.JsonNodeModel; @@ -32,9 +33,10 @@ class StaticWorkflowProcess extends AbstractProcess { private final KogitoWorkflowProcess process; - public StaticWorkflowProcess(Application app, Collection handlers, KogitoWorkflowProcess process) { - super(app, handlers, null); + public StaticWorkflowProcess(Application app, Collection handlers, ProcessInstancesFactory processInstanceFactory, KogitoWorkflowProcess process) { + super(app, handlers, null, processInstanceFactory); this.process = process; + activate(); } @Override @@ -53,6 +55,11 @@ public ProcessInstance createInstance(Model m) { return createInstance((JsonNodeModel) m); } + @Override + public JsonNodeModel createModel() { + return new JsonNodeModel(); + } + @Override public ProcessInstance createInstance(WorkflowProcessInstance wpi) { return new StaticWorkflowProcessInstance(this, this.createModel(), this.createProcessRuntime(), wpi); diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml index 9be2961fe88..a995b5db496 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml @@ -26,6 +26,11 @@ io.cloudevents cloudevents-kafka + + org.kie.kogito + kogito-addons-persistence-rocksdb + test + org.junit.jupiter junit-jupiter-api diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java index 0bfbc3a3b60..7362f7ca5b1 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java @@ -16,6 +16,7 @@ package org.kie.kogito.serverless.workflow.executor; import java.net.URI; +import java.nio.file.Path; import java.time.Duration; import java.time.OffsetDateTime; import java.util.Collections; @@ -26,6 +27,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory; +import org.rocksdb.Options; +import org.rocksdb.RocksDBException; + +import com.fasterxml.jackson.databind.node.TextNode; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -60,6 +67,25 @@ void testCallbackSubscriber() throws InterruptedException, TimeoutException { } } + @Test + void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException { + final String eventType = "testSubscribe"; + final String additionalData = "This has been injected by the event"; + Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build(); + try (StaticWorkflowApplication application = + StaticWorkflowApplication.create().withProcessInstances(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) { + String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId(); + assertThat(application.variables(id).orElseThrow().getWorkflowdata().get("slogan").equals(new TextNode("Viva "))); + publish(eventType, buildCloudEvent(eventType, id) + .withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData))) + .build()); + assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata()) + .isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti")); + Thread.sleep(10); + assertThat(application.variables(id)).isEmpty(); + } + } + @Test void testEventSubscriber() throws InterruptedException, TimeoutException { final String eventType = "testSubscribe";