Skip to content

Commit

Permalink
new example: async-retry-counter
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanlukas committed Jun 6, 2024
1 parent 22d2832 commit 2a615d0
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 0 deletions.
60 changes: 60 additions & 0 deletions async-retry-counter/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.camunda.consulting</groupId>
<artifactId>async-retry-counter</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

<version.spring-boot-starter-camunda>8.5.4</version.spring-boot-starter-camunda>
</properties>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>3.3.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>io.camunda.spring</groupId>
<artifactId>spring-boot-starter-camunda</artifactId>
<version>${version.spring-boot-starter-camunda}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>io.camunda.spring</groupId>
<artifactId>spring-boot-starter-camunda-test</artifactId>
<version>${version.spring-boot-starter-camunda}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.camunda.consulting;

import io.camunda.zeebe.spring.client.annotation.Deployment;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
@Deployment(resources = "classpath*:*.bpmn")
public class ExampleApp {
public static void main(String[] args) {
SpringApplication.run(ExampleApp.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.camunda.consulting;

public class ResponseController {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.camunda.consulting;

import io.camunda.zeebe.client.api.response.ActivatedJob;
import io.camunda.zeebe.client.api.worker.JobClient;
import io.camunda.zeebe.spring.client.annotation.JobWorker;
import io.camunda.zeebe.spring.client.annotation.Variable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

@Component
public class RetryableJobWorker {
private static final Logger LOG = LoggerFactory.getLogger(RetryableJobWorker.class);

@JobWorker(autoComplete = false)
public void sendMessage(
@Variable Integer retryCounter, @Variable String callbackId, JobClient jobClient, ActivatedJob job
) {
LOG.info("retryCounter is {}", retryCounter);
if (retryCounter != null && retryCounter < 1) {
jobClient
.newFailCommand(job)
.retries(0)
.errorMessage("No retries left")
.variable("retryCounter", 1)
.send()
.join();
return;
}
if (callbackId == null) {
callbackId = UUID
.randomUUID()
.toString();
}
LOG.info("Setting callbackId {}", callbackId);
sendMessage();
int nextRetryCounter = Optional
.ofNullable(retryCounter)
.map(i -> i - 1)
.orElse(2);
LOG.info("Setting retryCounter {}", nextRetryCounter);
jobClient
.newCompleteCommand(job.getKey())
.variables(Map.of("retryCounter", nextRetryCounter, "callbackId", callbackId))
.send()
.join();
}

private void sendMessage() {
LOG.info("message sent");
}

}
3 changes: 3 additions & 0 deletions async-retry-counter/src/main/resources/application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
camunda:
client:
mode: simple
142 changes: 142 additions & 0 deletions async-retry-counter/src/main/resources/asnyc-retry-counter.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0n9tswy" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.23.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.5.0">
<bpmn:process id="AsyncRetryCounterProcess" name="Async Retry Counter" isExecutable="true">
<bpmn:startEvent id="AsyncMessagingRequiredStartEvent" name="Async messaging required">
<bpmn:outgoing>Flow_1g8wwcp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:exclusiveGateway id="Gateway_1uhx8ty">
<bpmn:incoming>Flow_1g8wwcp</bpmn:incoming>
<bpmn:incoming>Flow_0n8m323</bpmn:incoming>
<bpmn:outgoing>Flow_0uebumt</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="Flow_1g8wwcp" sourceRef="AsyncMessagingRequiredStartEvent" targetRef="Gateway_1uhx8ty" />
<bpmn:sequenceFlow id="Flow_0uebumt" sourceRef="Gateway_1uhx8ty" targetRef="SendMessageTask" />
<bpmn:sendTask id="SendMessageTask" name="Send message">
<bpmn:extensionElements>
<zeebe:taskDefinition type="sendMessage" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_0uebumt</bpmn:incoming>
<bpmn:outgoing>Flow_0ga8sdi</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_0ga8sdi" sourceRef="SendMessageTask" targetRef="Gateway_0lo9ey2" />
<bpmn:eventBasedGateway id="Gateway_0lo9ey2">
<bpmn:incoming>Flow_0ga8sdi</bpmn:incoming>
<bpmn:outgoing>Flow_0fybakl</bpmn:outgoing>
<bpmn:outgoing>Flow_17w47wv</bpmn:outgoing>
</bpmn:eventBasedGateway>
<bpmn:intermediateCatchEvent id="ResponseReceivedEvent" name="Response received">
<bpmn:incoming>Flow_0fybakl</bpmn:incoming>
<bpmn:outgoing>Flow_0w6kefz</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_1fddpep" messageRef="Message_0s95vg4" />
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_0fybakl" sourceRef="Gateway_0lo9ey2" targetRef="ResponseReceivedEvent" />
<bpmn:intermediateCatchEvent id="ErrorReceivedEvent" name="Error received">
<bpmn:incoming>Flow_17w47wv</bpmn:incoming>
<bpmn:outgoing>Flow_0t44whk</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_10g3cex" messageRef="Message_058d1q8" />
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_17w47wv" sourceRef="Gateway_0lo9ey2" targetRef="ErrorReceivedEvent" />
<bpmn:sequenceFlow id="Flow_0t44whk" sourceRef="ErrorReceivedEvent" targetRef="DeferRetryEvent" />
<bpmn:intermediateCatchEvent id="DeferRetryEvent" name="Defer retry">
<bpmn:incoming>Flow_0t44whk</bpmn:incoming>
<bpmn:outgoing>Flow_0n8m323</bpmn:outgoing>
<bpmn:timerEventDefinition id="TimerEventDefinition_0dz87wj">
<bpmn:timeDuration xsi:type="bpmn:tFormalExpression">PT5S</bpmn:timeDuration>
</bpmn:timerEventDefinition>
</bpmn:intermediateCatchEvent>
<bpmn:sequenceFlow id="Flow_0n8m323" sourceRef="DeferRetryEvent" targetRef="Gateway_1uhx8ty" />
<bpmn:endEvent id="AsyncMessagingDoneEndEvent" name="Async messaging done">
<bpmn:incoming>Flow_0w6kefz</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0w6kefz" sourceRef="ResponseReceivedEvent" targetRef="AsyncMessagingDoneEndEvent" />
</bpmn:process>
<bpmn:message id="Message_0s95vg4" name="success">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=callbackId" />
</bpmn:extensionElements>
</bpmn:message>
<bpmn:message id="Message_058d1q8" name="fail">
<bpmn:extensionElements>
<zeebe:subscription correlationKey="=callbackId" />
</bpmn:extensionElements>
</bpmn:message>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="AsyncRetryCounterProcess">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="AsyncMessagingRequiredStartEvent">
<dc:Bounds x="179" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="154" y="142" width="87" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_1uhx8ty_di" bpmnElement="Gateway_1uhx8ty" isMarkerVisible="true">
<dc:Bounds x="265" y="92" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0p4e2cq_di" bpmnElement="SendMessageTask">
<dc:Bounds x="370" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_0093v8l_di" bpmnElement="Gateway_0lo9ey2">
<dc:Bounds x="525" y="92" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0m0k2pk_di" bpmnElement="ResponseReceivedEvent">
<dc:Bounds x="632" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="625" y="142" width="50" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0r7jt4i_di" bpmnElement="ErrorReceivedEvent">
<dc:Bounds x="632" y="212" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="615" y="255" width="70" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1dxbo7e_di" bpmnElement="DeferRetryEvent">
<dc:Bounds x="452" y="302" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="444" y="345" width="52" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1wyqlll_di" bpmnElement="AsyncMessagingDoneEndEvent">
<dc:Bounds x="732" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="707" y="142" width="87" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1g8wwcp_di" bpmnElement="Flow_1g8wwcp">
<di:waypoint x="215" y="117" />
<di:waypoint x="265" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0n8m323_di" bpmnElement="Flow_0n8m323">
<di:waypoint x="452" y="320" />
<di:waypoint x="290" y="320" />
<di:waypoint x="290" y="142" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0uebumt_di" bpmnElement="Flow_0uebumt">
<di:waypoint x="315" y="117" />
<di:waypoint x="370" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0ga8sdi_di" bpmnElement="Flow_0ga8sdi">
<di:waypoint x="470" y="117" />
<di:waypoint x="525" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0fybakl_di" bpmnElement="Flow_0fybakl">
<di:waypoint x="575" y="117" />
<di:waypoint x="632" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_17w47wv_di" bpmnElement="Flow_17w47wv">
<di:waypoint x="550" y="142" />
<di:waypoint x="550" y="230" />
<di:waypoint x="632" y="230" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0w6kefz_di" bpmnElement="Flow_0w6kefz">
<di:waypoint x="668" y="117" />
<di:waypoint x="732" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0t44whk_di" bpmnElement="Flow_0t44whk">
<di:waypoint x="668" y="230" />
<di:waypoint x="730" y="230" />
<di:waypoint x="730" y="320" />
<di:waypoint x="488" y="320" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.camunda.consulting;

import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.api.response.ProcessInstanceEvent;
import io.camunda.zeebe.process.test.api.ZeebeTestEngine;
import io.camunda.zeebe.process.test.filters.RecordStream;
import io.camunda.zeebe.process.test.filters.StreamFilter;
import io.camunda.zeebe.process.test.inspections.model.InspectedProcessInstance;
import io.camunda.zeebe.protocol.record.intent.JobIntent;
import io.camunda.zeebe.spring.test.ZeebeSpringTest;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

import static io.camunda.zeebe.process.test.assertions.BpmnAssert.*;
import static io.camunda.zeebe.spring.test.ZeebeTestThreadSupport.*;

@ZeebeSpringTest
@SpringBootTest
public class ProcessTest {
@Autowired
ZeebeClient zeebeClient;
@Autowired
ZeebeTestEngine zeebeTestEngine;

@Test
void shouldRetryThreeTimes() throws InterruptedException, TimeoutException {
ProcessInstanceEvent process = zeebeClient
.newCreateInstanceCommand()
.bpmnProcessId("AsyncRetryCounterProcess")
.latestVersion()
.variable("callbackId", "123")
.send()
.join();
waitForProcessInstanceHasPassedElement(process, "SendMessageTask");
assertThat(process).hasVariableWithValue("retryCounter", 2);
zeebeClient
.newPublishMessageCommand()
.messageName("fail")
.correlationKey("123")
.send()
.join();
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
zeebeTestEngine.increaseTime(Duration.ofSeconds(5));
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(process.getProcessInstanceKey()),
"SendMessageTask",
Duration.ofSeconds(10),
2
);
assertThat(process).hasVariableWithValue("retryCounter", 1);
zeebeClient
.newPublishMessageCommand()
.messageName("fail")
.correlationKey("123")
.send()
.join();
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
zeebeTestEngine.increaseTime(Duration.ofSeconds(5));
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(process.getProcessInstanceKey()),
"SendMessageTask",
Duration.ofSeconds(10),
3
);
assertThat(process).hasVariableWithValue("retryCounter", 0);
zeebeClient
.newPublishMessageCommand()
.messageName("fail")
.correlationKey("123")
.send()
.join();
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
zeebeTestEngine.increaseTime(Duration.ofSeconds(5));
waitForProcessInstanceHasPassedElement(new InspectedProcessInstance(process.getProcessInstanceKey()),
"Gateway_1uhx8ty",
Duration.ofSeconds(10),
4
);
zeebeTestEngine.waitForIdleState(Duration.ofSeconds(10));
long incidentKey = assertThat(process)
.hasAnyIncidents()
.extractingLatestIncident()
.getIncidentKey();
long jobKey = StreamFilter
.jobRecords(RecordStream.of(zeebeTestEngine.getRecordStreamSource()))
.withIntent(JobIntent.FAILED)
.stream()
.findFirst()
.get()
.getKey();
zeebeClient
.newUpdateRetriesCommand(jobKey)
.retries(1)
.send()
.join();
zeebeClient
.newResolveIncidentCommand(incidentKey)
.send()
.join();
zeebeClient
.newPublishMessageCommand()
.messageName("success")
.correlationKey("123")
.send()
.join();
waitForProcessInstanceCompleted(process);
}
}

0 comments on commit 2a615d0

Please sign in to comment.