Skip to content

Commit

Permalink
[Fix #3571] Adding onErrors support for starting event state (#3578)
Browse files Browse the repository at this point in the history
* [Fix #3571] Adding onErrors support for starting event state

* [Fix #3571] Adding IT test
  • Loading branch information
fjtirado authored Jul 16, 2024
1 parent bdf1715 commit d167ae1
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ public MakeNodeResult makeNode(RuleFlowNodeContainerFactory<?, ?> factory) {
handleErrors(factory, embeddedContainer);
return new MakeNodeResult(embeddedContainer);
}

}

private MakeNodeResult processOnEvent(RuleFlowNodeContainerFactory<?, ?> factory, OnEvents onEvent) {
MakeNodeResult result = joinNodes(factory,
onEvent.getEventRefs(), (fact, onEventRef) -> filterAndMergeNode(fact, onEvent.getEventDataFilter(), getVarName(),
(f, inputVar, outputVar) -> buildEventNode(f, onEventRef, inputVar, outputVar)));
CompositeContextNodeFactory<?> embeddedSubProcess = handleActions(makeCompositeNode(factory), onEvent.getActions());
if (isStartState) {
handleErrors(factory, embeddedSubProcess);
}
connect(result.getOutgoingNode(), embeddedSubProcess);
return new MakeNodeResult(result.getIncomingNode(), embeddedSubProcess);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ quarkus.rest-client.enum-parameter_yaml.url=${enum-echo-service-mock.url}
# Error handling properties
kogito.sw.functions.publishPerfectSquare.host=localhost


mp.messaging.incoming.start.connector=quarkus-http
mp.messaging.incoming.start.path=/startWithError

mp.messaging.incoming.move.connector=quarkus-http
mp.messaging.incoming.move.path=/move

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
{
"id": "startEventError",
"version": "1.0",
"name": "Workflow event test",
"description": "An test of a starting event with error on action",
"start": "waitForEvent",
"events": [
{
"name": "startEvent",
"source": "",
"type": "start"
}
],
"errors": [
{
"name": "odd number",
"code": "Odd situation"
}
],
"functions": [
{
"name": "publishEvenError",
"type": "asyncapi",
"operation": "specs/callbackResults.yaml#sendEvenError"
},
{
"name": "isEven",
"type": "custom",
"operation": "service:java:org.kie.kogito.workflows.services.EvenService::isEven"
}
]
,
"states": [
{
"name": "waitForEvent",
"type": "event",
"onEvents": [
{
"eventRefs": [
"startEvent"
],
"actions": [
{
"name": "actionWithError",
"functionRef": {
"refName": "isEven",
"arguments": {
"number": ".number"
}
}
}
]

}
],
"onErrors": [
{
"errorRef": "odd number",
"transition": "PublishError"
}
],
"end":true
},
{
"name": "PublishError",
"type": "operation",
"actions": [
{
"name": "publishEvenError",
"functionRef": "publishEvenError"
}
],
"end": "true"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,13 @@ channels:
summary: Timeout Expired
message:
$ref: '#/components/messages/message'
sendEvenError:
description: A message channel for publishing errors
publish:
operationId: sendEvenError
summary: Reporting error
message:
$ref: '#/components/messages/message'
error:
description: A message channel for failed executions
publish:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@ static void init() {
}

@Test
void testStartingEventWithToStateFilter() {
void testStartingEventWithToStateFilter() throws IOException {
given()
.contentType(ContentType.JSON)
.when()
.body(CloudEventBuilder.v1()
.body(defaultMarshaller.marshall(CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("customer-arrival-event-source"))
.withType("customer-arrival-type")
.withTime(OffsetDateTime.now())
.withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build())
.withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("customer", Map.of("name", "pepe")))).build()))
.post("/eventWithToStateFilter")
.then()
.statusCode(202);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@
package org.kie.kogito.quarkus.workflows;

import java.io.IOException;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -28,8 +32,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.kie.kogito.event.CloudEventMarshaller;
import org.kie.kogito.event.Converter;
import org.kie.kogito.event.cloudevents.CloudEventExtensionConstants;
import org.kie.kogito.event.impl.ByteArrayCloudEventMarshaller;
import org.kie.kogito.event.impl.ByteArrayCloudEventUnmarshallerFactory;
import org.kie.kogito.test.quarkus.QuarkusTestProperty;
import org.kie.kogito.test.quarkus.kafka.KafkaTypedTestClient;
Expand All @@ -41,10 +47,14 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonFormat;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusIntegrationTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;

import static io.restassured.RestAssured.given;
import static org.assertj.core.api.Assertions.assertThat;
import static org.kie.kogito.quarkus.workflows.AssuredTestUtils.startProcess;

Expand All @@ -59,13 +69,17 @@ public class EventTimedoutIT {
private ObjectMapper objectMapper;
private KafkaTypedTestClient<byte[], ByteArraySerializer, ByteArrayDeserializer> kafkaClient;

private static CloudEventMarshaller<byte[]> defaultMarshaller;

@BeforeEach
void setup() {
RestAssured.enableLoggingOfRequestAndResponseIfValidationFails();
kafkaClient = new KafkaTypedTestClient<>(kafkaBootstrapServers, ByteArraySerializer.class, ByteArrayDeserializer.class);
objectMapper = new ObjectMapper()
.registerModule(new JavaTimeModule())
.registerModule(JsonFormat.getCloudEventJacksonModule())
.disable(com.fasterxml.jackson.databind.SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
defaultMarshaller = new ByteArrayCloudEventMarshaller(objectMapper);
}

@AfterEach
Expand Down Expand Up @@ -93,4 +107,26 @@ void testTimedout() throws InterruptedException {
countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(countDownLatch.getCount()).isZero();
}

@Test
void testStartEventWithError() throws InterruptedException, IOException {
given()
.contentType(ContentType.JSON)
.when()
.body(defaultMarshaller.marshall(CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create("source"))
.withType("start")
.withTime(OffsetDateTime.now())
.withData(defaultMarshaller.cloudEventDataFactory().apply(Collections.singletonMap("number", 3))).build()))
.post("/startWithError")
.then()
.statusCode(202);
final CountDownLatch countDownLatch = new CountDownLatch(1);
kafkaClient.consume("sendEvenError", v -> {
countDownLatch.countDown();
});
countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(countDownLatch.getCount()).isZero();
}
}

0 comments on commit d167ae1

Please sign in to comment.