diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java index a9a7409037e..ea5edc1e4d6 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-core/src/main/java/org/kie/kogito/serverless/workflow/executor/StaticWorkflowApplication.java @@ -107,7 +107,7 @@ public void afterProcessCompleted(ProcessCompletedEvent event) { SynchronousQueue queue = queues.remove(instance.getId()); if (queue != null) { try { - queue.offer(new JsonNodeModel(instance.getId(), instance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR)), 1000L, TimeUnit.SECONDS); + queue.offer(new JsonNodeModel(instance.getId(), instance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR)), 1L, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -173,7 +173,7 @@ public JsonNodeModel execute(Workflow workflow, JsonNode data) { return execute(findOrCreate(workflow), data); } - public StaticWorkflowApplication withProcessInstances(ProcessInstancesFactory processInstanceFactory) { + public StaticWorkflowApplication processInstancesFactory(ProcessInstancesFactory processInstanceFactory) { this.processInstancesFactory = processInstanceFactory; return this; } diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml index a995b5db496..33ca3f8f4d3 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/pom.xml @@ -36,6 +36,11 @@ junit-jupiter-api test + + org.awaitility + awaitility + test + ch.qos.logback logback-classic diff --git a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java index 7362f7ca5b1..4035672db0f 100644 --- a/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java +++ b/kogito-serverless-workflow/kogito-serverless-workflow-executor-kafka/src/test/java/org/kie/kogito/serverless/workflow/executor/WorkflowEventSubscriberTest.java @@ -41,6 +41,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.awaitility.Awaitility.await; import static org.kie.kogito.serverless.workflow.fluent.ActionBuilder.call; import static org.kie.kogito.serverless.workflow.fluent.EventDefBuilder.eventDef; import static org.kie.kogito.serverless.workflow.fluent.FunctionBuilder.expr; @@ -73,7 +74,7 @@ void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws Interru final String additionalData = "This has been injected by the event"; Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build(); try (StaticWorkflowApplication application = - StaticWorkflowApplication.create().withProcessInstances(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) { + StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) { String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId(); assertThat(application.variables(id).orElseThrow().getWorkflowdata().get("slogan").equals(new TextNode("Viva "))); publish(eventType, buildCloudEvent(eventType, id) @@ -81,8 +82,7 @@ void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws Interru .build()); assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata()) .isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti")); - Thread.sleep(10); - assertThat(application.variables(id)).isEmpty(); + await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty()); } } diff --git a/quarkus/addons/process-definitions/runtime/pom.xml b/quarkus/addons/process-definitions/runtime/pom.xml index aeead3cd7ee..34cfb825430 100644 --- a/quarkus/addons/process-definitions/runtime/pom.xml +++ b/quarkus/addons/process-definitions/runtime/pom.xml @@ -35,6 +35,11 @@ quarkus-junit5 test + + org.kie.kogito + kogito-addons-quarkus-persistence-rocksdb + test + io.rest-assured rest-assured diff --git a/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java b/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java index c27ce320ffb..2b7c9d08799 100644 --- a/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java +++ b/quarkus/addons/process-definitions/runtime/src/main/java/org/kie/kogito/process/definitions/ProcessDefinitionsResource.java @@ -20,6 +20,8 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import javax.enterprise.inject.Instance; +import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.POST; import javax.ws.rs.Path; @@ -28,6 +30,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.kie.kogito.process.ProcessInstancesFactory; import org.kie.kogito.serverless.workflow.executor.StaticWorkflowApplication; import org.kie.kogito.serverless.workflow.models.JsonNodeModel; import org.kie.kogito.serverless.workflow.models.JsonNodeModelInput; @@ -41,9 +44,13 @@ public class ProcessDefinitionsResource { private StaticWorkflowApplication application; + @Inject + Instance factories; + @PostConstruct void init() { application = StaticWorkflowApplication.create(); + factories.stream().findFirst().ifPresent(application::processInstancesFactory); } @PreDestroy