diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java index 958fdf04ca7..0f77e351413 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java @@ -46,6 +46,7 @@ import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler; import org.kie.kogito.process.Process; import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.process.ProcessInstancesFactory; import org.kie.kogito.process.Processes; import org.kie.kogito.process.impl.CachedWorkItemHandlerConfig; import org.kie.kogito.process.impl.DefaultProcessEventListenerConfig; @@ -90,6 +91,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto private Iterable processRegisters; private final Collection closeables = new ArrayList<>(); private final Map> queues; + private ProcessInstancesFactory processInstancesFactory; private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener { @@ -105,7 +107,9 @@ public void afterProcessCompleted(ProcessCompletedEvent event) { SynchronousQueue queue = queues.remove(instance.getId()); if (queue != null) { try { - queue.offer(new JsonNodeModel(instance.getId(), instance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR)), 1000L, TimeUnit.SECONDS); + if (queue.offer(new JsonNodeModel(instance.getId(), instance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR)), 1L, TimeUnit.SECONDS)) { + logger.debug("waiting process instance {} has been notified about its completion", instance.getId()); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -171,6 +175,11 @@ public JsonNodeModel execute(Workflow workflow, JsonNode data) { return execute(findOrCreate(workflow), data); } + public StaticWorkflowApplication processInstancesFactory(ProcessInstancesFactory processInstanceFactory) { + this.processInstancesFactory = processInstanceFactory; + return this; + } + private Process findOrCreate(Workflow workflow) { return findProcessById(workflow.getId()).orElseGet(() -> process(workflow)); } @@ -264,7 +273,7 @@ public Optional waitForFinish(String id, Duration duration) throw private Process createProcess(Workflow workflow) { workflowRegisters.forEach(r -> r.register(this, workflow)); - StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, ServerlessWorkflowParser + StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, processInstancesFactory, ServerlessWorkflowParser .of(workflow, JavaKogitoBuildContext.builder().withApplicationProperties(System.getProperties()).build()).getProcessInfo().info()); processRegisters.forEach(r -> r.register(this, workflow, process)); WorkflowProcessImpl workflowProcess = (WorkflowProcessImpl) process.get(); diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java index 2c98a4586bc..4658f0b2814 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowProcess.java @@ -25,6 +25,7 @@ import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler; import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess; import org.kie.kogito.process.ProcessInstance; +import org.kie.kogito.process.ProcessInstancesFactory; import org.kie.kogito.process.impl.AbstractProcess; import org.kie.kogito.serverless.workflow.models.JsonNodeModel; @@ -32,9 +33,10 @@ class StaticWorkflowProcess extends AbstractProcess { private final KogitoWorkflowProcess process; - public StaticWorkflowProcess(Application app, Collection handlers, KogitoWorkflowProcess process) { - super(app, handlers, null); + public StaticWorkflowProcess(Application app, Collection handlers, ProcessInstancesFactory processInstanceFactory, KogitoWorkflowProcess process) { + super(app, handlers, null, processInstanceFactory); this.process = process; + activate(); } @Override @@ -53,6 +55,11 @@ public ProcessInstance createInstance(Model m) { return createInstance((JsonNodeModel) m); } + @Override + public JsonNodeModel createModel() { + return new JsonNodeModel(); + } + @Override public ProcessInstance createInstance(WorkflowProcessInstance wpi) { return new StaticWorkflowProcessInstance(this, this.createModel(), this.createProcessRuntime(), wpi); diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml index 9be2961fe88..33ca3f8f4d3 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml @@ -26,11 +26,21 @@ io.cloudevents cloudevents-kafka + + org.kie.kogito + kogito-addons-persistence-rocksdb + test + org.junit.jupiter junit-jupiter-api test + + org.awaitility + awaitility + test + ch.qos.logback logback-classic diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java index 0bfbc3a3b60..58abdf42d77 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java @@ -16,6 +16,7 @@ package org.kie.kogito.serverless.workflow.executor; import java.net.URI; +import java.nio.file.Path; import java.time.Duration; import java.time.OffsetDateTime; import java.util.Collections; @@ -26,6 +27,12 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory; +import org.rocksdb.Options; +import org.rocksdb.RocksDBException; + +import com.fasterxml.jackson.databind.node.TextNode; import io.cloudevents.CloudEvent; import io.cloudevents.core.builder.CloudEventBuilder; @@ -34,6 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.awaitility.Awaitility.await; import static org.kie.kogito.serverless.workflow.fluent.ActionBuilder.call; import static org.kie.kogito.serverless.workflow.fluent.EventDefBuilder.eventDef; import static org.kie.kogito.serverless.workflow.fluent.FunctionBuilder.expr; @@ -60,6 +68,24 @@ void testCallbackSubscriber() throws InterruptedException, TimeoutException { } } + @Test + void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException { + final String eventType = "testSubscribe"; + final String additionalData = "This has been injected by the event"; + Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build(); + try (StaticWorkflowApplication application = + StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) { + String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId(); + assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData)); + publish(eventType, buildCloudEvent(eventType, id) + .withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData))) + .build()); + assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata()) + .isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti")); + await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty()); + } + } + @Test void testEventSubscriber() throws InterruptedException, TimeoutException { final String eventType = "testSubscribe"; diff --git a/quarkus/addons/process-definitions/runtime/pom.xml b/quarkus/addons/process-definitions/runtime/pom.xml index aeead3cd7ee..deef3ce872d 100644 --- a/quarkus/addons/process-definitions/runtime/pom.xml +++ b/quarkus/addons/process-definitions/runtime/pom.xml @@ -35,6 +35,11 @@ quarkus-junit5 test + + org.kie.kogito + kogito-addons-quarkus-persistence-rocksdb-deployment + test + io.rest-assured rest-assured diff --git a/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java b/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java index c27ce320ffb..2b7c9d08799 100644 --- a/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java +++ b/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java @@ -20,6 +20,8 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -28,6 +30,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.kie.kogito.process.ProcessInstancesFactory; import org.kie.kogito.serverless.workflow.executor.StaticWorkflowApplication; import org.kie.kogito.serverless.workflow.models.JsonNodeModel; import org.kie.kogito.serverless.workflow.models.JsonNodeModelInput; @@ -41,9 +44,13 @@ public class ProcessDefinitionsResource { private StaticWorkflowApplication application; + @Inject + Instance factories; + @PostConstruct void init() { application = StaticWorkflowApplication.create(); + factories.stream().findFirst().ifPresent(application::processInstancesFactory); } @PreDestroy