Skip to content

Commit

Permalink
Moving persistence test to test project to avoid product dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Jul 31, 2023
1 parent 859978a commit 06f4262
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,11 @@
<groupId>io.cloudevents</groupId>
<artifactId>cloudevents-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package org.kie.kogito.serverless.workflow.executor;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
Expand All @@ -27,12 +26,6 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;

import com.fasterxml.jackson.databind.node.TextNode;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
Expand All @@ -41,7 +34,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.awaitility.Awaitility.await;
import static org.kie.kogito.serverless.workflow.fluent.ActionBuilder.call;
import static org.kie.kogito.serverless.workflow.fluent.EventDefBuilder.eventDef;
import static org.kie.kogito.serverless.workflow.fluent.FunctionBuilder.expr;
Expand All @@ -68,24 +60,6 @@ void testCallbackSubscriber() throws InterruptedException, TimeoutException {
}
}

@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty());
}
}

@Test
void testEventSubscriber() throws InterruptedException, TimeoutException {
final String eventType = "testSubscribe";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-serverless-workflow-executor-rest</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-serverless-workflow-executor-kafka</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
Expand All @@ -49,6 +54,21 @@
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock-jre8</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-persistence-rocksdb</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-serverless-workflow-fluent</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<!-- necessary for Java 9+ -->
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.serverless.workflow.executor;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventSerializer;

public class MockKafkaEventEmitterFactory extends KafkaEventEmitterFactory {

public static MockProducer<byte[], CloudEvent> producer = new MockProducer<>(true, new ByteArraySerializer(), new CloudEventSerializer() {
@Override
public byte[] serialize(String topic, CloudEvent data) {
return super.serialize(topic, new RecordHeaders(), data);
}
});

@Override
public int ordinal() {
return 1;
}

@Override
protected Producer<byte[], CloudEvent> createKafkaProducer() {
return producer;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.serverless.workflow.executor;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

import io.cloudevents.CloudEvent;

public class MockKafkaEventReceiverFactory extends KafkaEventReceiverFactory {

public static MockConsumer<byte[], CloudEvent> consumer;

@Override
public int ordinal() {
return 1;
}

@Override
protected Consumer<byte[], CloudEvent> createKafkaConsumer() {
return consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST) {
@Override
public void subscribe(Collection<String> topics) {
super.subscribe(topics);
List<TopicPartition> partitions = topics.stream().map(topic -> new TopicPartition(topic, 0)).collect(Collectors.toList());
Map<TopicPartition, Long> partitionsBeginningMap = new HashMap<>();
Map<TopicPartition, Long> partitionsEndMap = new HashMap<>();
for (TopicPartition partition : partitions) {
partitionsBeginningMap.put(partition, 0L);
partitionsEndMap.put(partition, 10L);
}
rebalance(partitions);
updateBeginningOffsets(partitionsBeginningMap);
updateEndOffsets(partitionsEndMap);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2023 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kie.kogito.serverless.workflow.executor;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.kie.kogito.persistence.rocksdb.RocksDBProcessInstancesFactory;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;

import com.fasterxml.jackson.databind.node.TextNode;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonCloudEventData;
import io.serverlessworkflow.api.Workflow;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.kie.kogito.serverless.workflow.fluent.ActionBuilder.call;
import static org.kie.kogito.serverless.workflow.fluent.EventDefBuilder.eventDef;
import static org.kie.kogito.serverless.workflow.fluent.FunctionBuilder.expr;
import static org.kie.kogito.serverless.workflow.fluent.StateBuilder.callback;
import static org.kie.kogito.serverless.workflow.fluent.WorkflowBuilder.jsonObject;
import static org.kie.kogito.serverless.workflow.fluent.WorkflowBuilder.workflow;

public class PersistentApplicationTest {
@Test
void testCallbackSubscriberWithPersistence(@TempDir Path tempDir) throws InterruptedException, TimeoutException, RocksDBException {
final String eventType = "testSubscribe";
final String additionalData = "This has been injected by the event";
Workflow workflow = workflow("testCallback").start(callback(call(expr("concat", "{slogan:.slogan+\"er Beti\"}")), eventDef(eventType))).end().build();
try (StaticWorkflowApplication application =
StaticWorkflowApplication.create().processInstancesFactory(new RocksDBProcessInstancesFactory(new Options().setCreateIfMissing(true), tempDir.toString()))) {
String id = application.execute(workflow, jsonObject().put("slogan", "Viva ")).getId();
assertThat(application.variables(id).orElseThrow().getWorkflowdata()).doesNotContain(new TextNode(additionalData));
publish(eventType, buildCloudEvent(eventType, id)
.withData(JsonCloudEventData.wrap(jsonObject().put("additionalData", additionalData)))
.build());
assertThat(application.waitForFinish(id, Duration.ofSeconds(20)).orElseThrow().getWorkflowdata())
.isEqualTo(jsonObject().put("additionalData", additionalData).put("slogan", "Viva er Beti"));
await().atMost(Duration.ofSeconds(1)).pollInterval(Duration.ofMillis(50)).until(() -> application.variables(id).isEmpty());
}
}

private CloudEventBuilder buildCloudEvent(String eventType, String id) {
return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(""))
.withType(eventType)
.withTime(OffsetDateTime.now())
.withExtension("kogitoprocrefid", id);
}

private void publish(String topic, CloudEvent event) {
MockConsumer<byte[], CloudEvent> mockConsumer = MockKafkaEventReceiverFactory.consumer;
Set<String> topics = mockConsumer.subscription();
assertThat(topics).contains(topic);
mockConsumer.addRecord(new ConsumerRecord<>(topic, 0, 0, new byte[0], event));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.kie.kogito.serverless.workflow.executor.MockKafkaEventEmitterFactory
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.kie.kogito.serverless.workflow.executor.MockKafkaEventReceiverFactory

0 comments on commit 06f4262

Please sign in to comment.