Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KOGITO-9613] Adding persistence support to executor #3133

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.Processes;
import org.kie.kogito.process.impl.CachedWorkItemHandlerConfig;
import org.kie.kogito.process.impl.DefaultProcessEventListenerConfig;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto
private Iterable<StaticProcessRegister> processRegisters;
private final Collection<AutoCloseable> closeables = new ArrayList<>();
private final Map<String, SynchronousQueue<JsonNodeModel>> queues;
private ProcessInstancesFactory processInstancesFactory;

private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener {

Expand All @@ -105,7 +107,9 @@ public void afterProcessCompleted(ProcessCompletedEvent event) {
SynchronousQueue<JsonNodeModel> queue = queues.remove(instance.getId());
if (queue != null) {
try {
queue.offer(new JsonNodeModel(instance.getId(), instance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR)), 1000L, TimeUnit.SECONDS);
if (queue.offer(new JsonNodeModel(instance.getId(), instance.getVariables().get(SWFConstants.DEFAULT_WORKFLOW_VAR)), 1L, TimeUnit.SECONDS)) {
logger.debug("waiting process instance {} has been notified about its completion", instance.getId());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -171,6 +175,11 @@ public JsonNodeModel execute(Workflow workflow, JsonNode data) {
return execute(findOrCreate(workflow), data);
}

public StaticWorkflowApplication processInstancesFactory(ProcessInstancesFactory processInstanceFactory) {
this.processInstancesFactory = processInstanceFactory;
return this;
}

private Process<JsonNodeModel> findOrCreate(Workflow workflow) {
return findProcessById(workflow.getId()).orElseGet(() -> process(workflow));
}
Expand Down Expand Up @@ -264,7 +273,7 @@ public Optional<JsonNodeModel> waitForFinish(String id, Duration duration) throw

private Process<JsonNodeModel> createProcess(Workflow workflow) {
workflowRegisters.forEach(r -> r.register(this, workflow));
StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, ServerlessWorkflowParser
StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, processInstancesFactory, ServerlessWorkflowParser
.of(workflow, JavaKogitoBuildContext.builder().withApplicationProperties(System.getProperties()).build()).getProcessInfo().info());
processRegisters.forEach(r -> r.register(this, workflow, process));
WorkflowProcessImpl workflowProcess = (WorkflowProcessImpl) process.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@
import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.impl.AbstractProcess;
import org.kie.kogito.serverless.workflow.models.JsonNodeModel;

class StaticWorkflowProcess extends AbstractProcess<JsonNodeModel> {

private final KogitoWorkflowProcess process;

public StaticWorkflowProcess(Application app, Collection<KogitoWorkItemHandler> handlers, KogitoWorkflowProcess process) {
super(app, handlers, null);
public StaticWorkflowProcess(Application app, Collection<KogitoWorkItemHandler> handlers, ProcessInstancesFactory processInstanceFactory, KogitoWorkflowProcess process) {
super(app, handlers, null, processInstanceFactory);
this.process = process;
activate();
}

@Override
Expand All @@ -53,6 +55,11 @@ public ProcessInstance<? extends Model> createInstance(Model m) {
return createInstance((JsonNodeModel) m);
}

@Override
public JsonNodeModel createModel() {
return new JsonNodeModel();
}

@Override
public ProcessInstance<JsonNodeModel> createInstance(WorkflowProcessInstance wpi) {
return new StaticWorkflowProcessInstance(this, this.createModel(), this.createProcessRuntime(), wpi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,21 @@
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.kie.kogito.serverless.workflow.executor;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
Expand All @@ -26,6 +27,12 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;

import com.fasterxml.jackson.databind.node.TextNode;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
Expand All @@ -34,6 +41,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.awaitility.Awaitility.await;
import static org.kie.kogito.serverless.workflow.fluent.ActionBuilder.call;
import static org.kie.kogito.serverless.workflow.fluent.EventDefBuilder.eventDef;
import static org.kie.kogito.serverless.workflow.fluent.FunctionBuilder.expr;
Expand All @@ -60,6 +68,24 @@ void testCallbackSubscriber() throws InterruptedException, TimeoutException {
}
}

@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty());
}
}

@Test
void testEventSubscriber() throws InterruptedException, TimeoutException {
final String eventType = "testSubscribe";
Expand Down
5 changes: 5 additions & 0 deletions quarkus/addons/process-definitions/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-quarkus-persistence-rocksdb-deployment</artifactId>
<scope>test</scope>
</dependency>
Comment on lines +38 to +42
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjtirado So this means we will now always be testing the RocksDB implementation? We should be testing in-memory as well, I think, wdyt?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fjtirado another point, if you use postgresql here, it would be easier to include this new addon as part of the prod build.

<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
Expand All @@ -28,6 +30,7 @@
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.serverless.workflow.executor.StaticWorkflowApplication;
import org.kie.kogito.serverless.workflow.models.JsonNodeModel;
import org.kie.kogito.serverless.workflow.models.JsonNodeModelInput;
Expand All @@ -41,9 +44,13 @@ public class ProcessDefinitionsResource {

private StaticWorkflowApplication application;

@Inject
Instance<ProcessInstancesFactory> factories;

@PostConstruct
void init() {
application = StaticWorkflowApplication.create();
factories.stream().findFirst().ifPresent(application::processInstancesFactory);
}

@PreDestroy
Expand Down
Loading