Skip to content

Commit

Permalink
[KOGITO-9613] Adding persistence support to executor
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Jul 24, 2023
1 parent 5415b30 commit 6a0275b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ private class RockSplitIterator extends AbstractSpliterator<ProcessInstance<T>>
private final RocksIterator iterator;

protected RockSplitIterator(RocksIterator iterator) {
super(Integer.MAX_VALUE, 0);
super(Integer.MAX_VALUE, 0);instances
this.iterator = iterator;
iterator.seekToFirst();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.Processes;
import org.kie.kogito.process.impl.CachedWorkItemHandlerConfig;
import org.kie.kogito.process.impl.DefaultProcessEventListenerConfig;
Expand Down Expand Up @@ -90,6 +91,7 @@ public class StaticWorkflowApplication extends StaticApplication implements Auto
private Iterable<StaticProcessRegister> processRegisters;
private final Collection<AutoCloseable> closeables = new ArrayList<>();
private final Map<String, SynchronousQueue<JsonNodeModel>> queues;
private ProcessInstancesFactory processInstancesFactory;

private static class StaticCompletionEventListener extends DefaultKogitoProcessEventListener {

Expand Down Expand Up @@ -171,6 +173,11 @@ public JsonNodeModel execute(Workflow workflow, JsonNode data) {
return execute(findOrCreate(workflow), data);
}

public StaticWorkflowApplication withProcessInstances(ProcessInstancesFactory processInstanceFactory) {
this.processInstancesFactory = processInstanceFactory;
return this;
}

private Process<JsonNodeModel> findOrCreate(Workflow workflow) {
return findProcessById(workflow.getId()).orElseGet(() -> process(workflow));
}
Expand Down Expand Up @@ -264,7 +271,7 @@ public Optional<JsonNodeModel> waitForFinish(String id, Duration duration) throw

private Process<JsonNodeModel> createProcess(Workflow workflow) {
workflowRegisters.forEach(r -> r.register(this, workflow));
StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, ServerlessWorkflowParser
StaticWorkflowProcess process = new StaticWorkflowProcess(this, handlers, processInstancesFactory, ServerlessWorkflowParser
.of(workflow, JavaKogitoBuildContext.builder().withApplicationProperties(System.getProperties()).build()).getProcessInfo().info());
processRegisters.forEach(r -> r.register(this, workflow, process));
WorkflowProcessImpl workflowProcess = (WorkflowProcessImpl) process.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@
import org.kie.kogito.internal.process.runtime.KogitoWorkItemHandler;
import org.kie.kogito.internal.process.runtime.KogitoWorkflowProcess;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstancesFactory;
import org.kie.kogito.process.impl.AbstractProcess;
import org.kie.kogito.serverless.workflow.models.JsonNodeModel;

class StaticWorkflowProcess extends AbstractProcess<JsonNodeModel> {

private final KogitoWorkflowProcess process;

public StaticWorkflowProcess(Application app, Collection<KogitoWorkItemHandler> handlers, KogitoWorkflowProcess process) {
super(app, handlers, null);
public StaticWorkflowProcess(Application app, Collection<KogitoWorkItemHandler> handlers, ProcessInstancesFactory processInstanceFactory, KogitoWorkflowProcess process) {
super(app, handlers, null, processInstanceFactory);
this.process = process;
activate();
}

@Override
Expand All @@ -53,6 +55,11 @@ public ProcessInstance<? extends Model> createInstance(Model m) {
return createInstance((JsonNodeModel) m);
}

@Override
public JsonNodeModel createModel() {
return new JsonNodeModel();
}

@Override
public ProcessInstance<JsonNodeModel> createInstance(WorkflowProcessInstance wpi) {
return new StaticWorkflowProcessInstance(this, this.createModel(), this.createProcessRuntime(), wpi);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.kie.kogito.serverless.workflow.executor;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
Expand All @@ -26,6 +27,12 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;

import com.fasterxml.jackson.databind.node.TextNode;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
Expand Down Expand Up @@ -60,6 +67,25 @@ void testCallbackSubscriber() throws InterruptedException, TimeoutException {
}
}

@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().withProcessInstances(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata().get("slogan").equals(new TextNode("Viva ")));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
Thread.sleep(10);
assertThat(application.variables(id)).isEmpty();
}
}

@Test
void testEventSubscriber() throws InterruptedException, TimeoutException {
final String eventType = "testSubscribe";
Expand Down

0 comments on commit 6a0275b

Please sign in to comment.