Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KOGITO-9613] Adding persistence support to executor #3133

Merged
merged 4 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.Processes;
import org.kie.kogito.process.impl.CachedWorkItemHandlerConfig;
import org.kie.kogito.process.impl.DefaultProcessEventListenerConfig;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto
private Iterable<StaticProcessRegister> processRegisters;
private final Collection<AutoCloseable> closeables = new ArrayList<>();
private final Map<String, SynchronousQueue<JsonNodeModel>> queues;
private ProcessInstancesFactory processInstancesFactory;

private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener {

Expand Down Expand Up @@ -171,6 +173,11 @@ public JsonNodeModel execute(Workflow workflow, JsonNode data) {
return execute(findOrCreate(workflow), data);
}

public StaticWorkflowApplication withProcessInstances(ProcessInstancesFactory processInstanceFactory) {
this.processInstancesFactory = processInstanceFactory;
return this;
}

private Process<JsonNodeModel> findOrCreate(Workflow workflow) {
return findProcessById(workflow.getId()).orElseGet(() -> process(workflow));
}
Expand Down Expand Up @@ -264,7 +271,7 @@ public Optional<JsonNodeModel> waitForFinish(String id, Duration duration) throw

private Process<JsonNodeModel> createProcess(Workflow workflow) {
workflowRegisters.forEach(r -> r.register(this, workflow));
StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, ServerlessWorkflowParser
StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, processInstancesFactory, ServerlessWorkflowParser
.of(workflow, JavaKogitoBuildContext.builder().withApplicationProperties(System.getProperties()).build()).getProcessInfo().info());
processRegisters.forEach(r -> r.register(this, workflow, process));
WorkflowProcessImpl workflowProcess = (WorkflowProcessImpl) process.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@
import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.impl.AbstractProcess;
import org.kie.kogito.serverless.workflow.models.JsonNodeModel;

class StaticWorkflowProcess extends AbstractProcess<JsonNodeModel> {

private final KogitoWorkflowProcess process;

public StaticWorkflowProcess(Application app, Collection<KogitoWorkItemHandler> handlers, KogitoWorkflowProcess process) {
super(app, handlers, null);
public StaticWorkflowProcess(Application app, Collection<KogitoWorkItemHandler> handlers, ProcessInstancesFactory processInstanceFactory, KogitoWorkflowProcess process) {
super(app, handlers, null, processInstanceFactory);
this.process = process;
activate();
}

@Override
Expand All @@ -53,6 +55,11 @@ public ProcessInstance<? extends Model> createInstance(Model m) {
return createInstance((JsonNodeModel) m);
}

@Override
public JsonNodeModel createModel() {
return new JsonNodeModel();
}

@Override
public ProcessInstance<JsonNodeModel> createInstance(WorkflowProcessInstance wpi) {
return new StaticWorkflowProcessInstance(this, this.createModel(), this.createProcessRuntime(), wpi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.kie.kogito.serverless.workflow.executor;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
Expand All @@ -26,6 +27,12 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;

import com.fasterxml.jackson.databind.node.TextNode;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
Expand Down Expand Up @@ -60,6 +67,25 @@ void testCallbackSubscriber() throws InterruptedException, TimeoutException {
}
}

@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().withProcessInstances(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata().get("slogan").equals(new TextNode("Viva ")));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
Thread.sleep(10);
fjtirado marked this conversation as resolved.
Show resolved Hide resolved
assertThat(application.variables(id)).isEmpty();
}
}

@Test
void testEventSubscriber() throws InterruptedException, TimeoutException {
final String eventType = "testSubscribe";
Expand Down