Skip to content

Commit

Permalink
Create tid (#68)
Browse files Browse the repository at this point in the history
* Refactoring code, more multi-threading. Hight creation throughput
  • Loading branch information
pierre-yves-monnet authored Jul 17, 2024
1 parent 0a85bc9 commit c1a2cf0
Show file tree
Hide file tree
Showing 41 changed files with 775 additions and 348 deletions.
13 changes: 12 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ The Unit Test section covers these goals.

The Load Test section covers these goals.


Process-Automator executes scenario. One scenario pilot a process.

It is possible to execute multiple at the same time to handle a use case like
Expand Down Expand Up @@ -313,3 +312,15 @@ automator.servers:
audience: ""
secret: "4BPUva1U4lDtoG2-torvAtx6w5RbHULUFhGZ-bBXOMWwZJG3d3VDlfPHjVO3Kz-N"
````
# Build
Rebuilt the image via
````
mvn clean install
mvn springboot:build-image
````
The docker image is build using the Dockerfile present on the root level.
Push the image to ghcr.io/camunda-community-hub/process-execution-automator:
4 changes: 2 additions & 2 deletions doc/loadtestscenario/resources/C8CrawlUrlScn.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"processId": "CrawlUrl",
"frequency": "PT30S",
"numberOfExecutions": "200",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": false},
"variablesOperation": {
"loopcrawl": "generaterandomlist(10)"
Expand All @@ -70,7 +70,7 @@
"processId": "CrawlUrl",
"frequency": "PT1M",
"numberOfExecutions": "1",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": true},
"variablesOperation": {
"loopcrawl": "generaterandomlist(5)"
Expand Down
4 changes: 2 additions & 2 deletions doc/loadtestscenario/resources/C8CrawlUrlScn250.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"processId": "CrawlUrl",
"frequency": "PT30S",
"numberOfExecutions": "250",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": false},
"variablesOperation": {
"loopcrawl": "generaterandomlist(10)"
Expand All @@ -70,7 +70,7 @@
"processId": "CrawlUrl",
"frequency": "PT1M",
"numberOfExecutions": "1",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": true},
"variablesOperation": {
"loopcrawl": "generaterandomlist(5)"
Expand Down
10 changes: 10 additions & 0 deletions doc/scenarioreference/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,16 @@ Example:
the variable `loopcrawl` will be a list of 500 random string.


**generateuniqueid(<Prefix>)**
Generate a unique sequential number.
The prefix is used to allo wmultiple counter
Example:
````
"tidblue": "generateuniqueid(blue)"
"tidred": "generateuniqueid(red)"
````
Variables `tidblue` and `tidred` got a unique id, each following a different counter.


## Verification

Expand Down
5 changes: 3 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@

<groupId>org.camunda.community.automator</groupId>
<artifactId>process-execution-automator</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
<!-- Change the banner.txt version -->

<properties>
<java.version>17</java.version>
Expand Down Expand Up @@ -222,7 +223,7 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>3.0.6</version>
<!-- <version>3.0.6</version> -->
<configuration>
<mainClass>org.camunda.automator.AutomatorApplication</mainClass>
<classifier>exec</classifier>
Expand Down
13 changes: 7 additions & 6 deletions src/main/java/org/camunda/automator/AutomatorAPI.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Scenario loadFromInputStream(InputStream scenarioInputStream, String orig
/**
* Search the engine from the scenario
*
* @param scenario scenario
* @param scenario scenario
* @param bpmnEngineList different engine configuration
* @return the engine, null if no engine exist, an exception if the connection is not possible
*/
Expand All @@ -76,13 +76,13 @@ public BpmnEngine getBpmnEngineFromScenario(Scenario scenario, BpmnEngineList bp
try {

if (scenario.getServerName() != null) {
return getBpmnEngine( bpmnEngineList.getByServerName(scenario.getServerName()), true);
return getBpmnEngine(bpmnEngineList.getByServerName(scenario.getServerName()), true);
}

return null;
} catch (AutomatorException e) {
logger.error("Can't connect the engine for the scenario [{}] serverName[{}]: {}",
scenario.getName(), scenario.getServerName(), e.getMessage());
logger.error("Can't connect the engine for the scenario [{}] serverName[{}]: {}", scenario.getName(),
scenario.getServerName(), e.getMessage());
throw e;
}

Expand Down Expand Up @@ -121,8 +121,9 @@ public RunResult executeScenario(BpmnEngine bpmnEngine, RunParameters runParamet
/* Deploy a process in the server */
/* ******************************************************************** */

public BpmnEngine getBpmnEngine(BpmnEngineList.BpmnServerDefinition serverDefinition,boolean logDebug) throws AutomatorException {
return BpmnEngineFactory.getInstance().getEngineFromConfiguration( serverDefinition, logDebug);
public BpmnEngine getBpmnEngine(BpmnEngineList.BpmnServerDefinition serverDefinition, boolean logDebug)
throws AutomatorException {
return BpmnEngineFactory.getInstance().getEngineFromConfiguration(serverDefinition, logDebug);
}

/**
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/org/camunda/automator/AutomatorCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,15 @@ private static List<File> detectRecursiveScenario(File folderRecursive) {
return listFiles;
}

/**
* To reduce the number of warning
*
* @param message message to log out
*/
private static void logOutLn(String message) {
System.out.println(message);
}

public void run(String[] args) {
if (!isRunningCLI)
return;
Expand Down Expand Up @@ -223,13 +232,4 @@ public void run(String[] args) {
}

public enum ACTION {RUN, RECURSIVE, VERIFY, RUNVERIFY, RECURSIVVERIFY}

/**
* To reduce the number of warning
*
* @param message message to log out
*/
private static void logOutLn(String message) {
System.out.println(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,15 @@ List<String> searchUserTasksByProcessInstance(String processInstanceId, String u
/**
* @param workerId workerId
* @param topic topic to register
* @param streamEnable true if the stream enable is open
* @param lockTime lock time for the job
* @param jobHandler C7: must implement ExternalTaskHandler. C8: must implement JobHandler
* @param backoffSupplier backOffStrategy
* @return list of Service Task
*/
RegisteredTask registerServiceTask(String workerId,
String topic,
boolean streamEnable,
Duration lockTime,
Object jobHandler,
FixedBackoffSupplier backoffSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public static BpmnEngineFactory getInstance() {
return bpmnEngineFactory;
}

public BpmnEngine getEngineFromConfiguration(BpmnEngineList.BpmnServerDefinition serverDefinition,
boolean logDebug)
public BpmnEngine getEngineFromConfiguration(BpmnEngineList.BpmnServerDefinition serverDefinition, boolean logDebug)
throws AutomatorException {
BpmnEngine engine = cacheEngine.get(serverDefinition.serverType);
if (engine != null)
Expand All @@ -45,7 +44,7 @@ public BpmnEngine getEngineFromConfiguration(BpmnEngineList.BpmnServerDefinition

case CAMUNDA_8 -> BpmnEngineCamunda8.getFromServerDefinition(serverDefinition, logDebug);

case CAMUNDA_8_SAAS -> BpmnEngineCamunda8.getFromServerDefinition( serverDefinition, logDebug);
case CAMUNDA_8_SAAS -> BpmnEngineCamunda8.getFromServerDefinition(serverDefinition, logDebug);

case DUMMY -> new BpmnEngineDummy(serverDefinition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void disconnection() throws AutomatorException {
/**
* Engine is ready. If not, a connection() method must be call
*
* @return
* @return true if the engine is ready
*/
public boolean isReady() {
if (count > 2)
Expand Down Expand Up @@ -274,6 +274,7 @@ public void executeUserTask(String userTaskId, String userId, Map<String, Object
@Override
public RegisteredTask registerServiceTask(String workerId,
String topic,
boolean streamEnable,
Duration lockTime,
Object jobHandler,
FixedBackoffSupplier backoffSupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.camunda.tasklist.exception.TaskListException;
import io.camunda.zeebe.client.ZeebeClient;
import io.camunda.zeebe.client.ZeebeClientBuilder;
import io.camunda.zeebe.client.api.command.ClientStatusException;
import io.camunda.zeebe.client.api.command.FinalCommandStep;
import io.camunda.zeebe.client.api.response.ActivateJobsResponse;
import io.camunda.zeebe.client.api.response.ActivatedJob;
Expand Down Expand Up @@ -63,15 +62,14 @@ public class BpmnEngineCamunda8 implements BpmnEngine {
public static final String THIS_IS_A_COMPLETE_IMPOSSIBLE_VARIABLE_NAME = "ThisIsACompleteImpossibleVariableName";
public static final int SEARCH_MAX_SIZE = 100;
private final Logger logger = LoggerFactory.getLogger(BpmnEngineCamunda8.class);

private BpmnEngineList.BpmnServerDefinition serverDefinition;
boolean hightFlowMode = false;
/**
* It is not possible to search user task for a specfic processInstance. So, to realize this, a marker is created in each process instance. Retrieving the user task,
* the process instance can be found and correction can be done
*/
Map<String, Long> cacheProcessInstanceMarker = new HashMap<>();
Random random = new Random(System.currentTimeMillis());
private BpmnEngineList.BpmnServerDefinition serverDefinition;
private ZeebeClient zeebeClient;
private CamundaOperateClient operateClient;
private CamundaTaskListClient taskClient;
Expand Down Expand Up @@ -195,6 +193,8 @@ public void connection() throws AutomatorException {

// connection is critical, so let build the analysis
StringBuilder analysis = new StringBuilder();


boolean isOk = true;

isOk = stillOk(serverDefinition.name, "ZeebeConnection", analysis, false, isOk);
Expand Down Expand Up @@ -234,7 +234,7 @@ public void connection() throws AutomatorException {
} catch (Exception e) {
zeebeClient = null;
throw new AutomatorException(
"Bad credential [" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + e.getMessage());
"BadCredential[" + serverDefinition.name + "] Analysis:" + analysis + " : " + e.getMessage());
}

saOperate = new io.camunda.operate.auth.SaasAuthentication(serverDefinition.zeebeSaasClientId,
Expand Down Expand Up @@ -262,6 +262,9 @@ public void connection() throws AutomatorException {
throw new AutomatorException("Invalid configuration");

// ---------------- connection
boolean zeebeOk = false;
boolean operateOk = false;
boolean tasklistOk = false;
try {
isOk = stillOk(serverDefinition.workerExecutionThreads, "ExecutionThread", analysis, false, isOk);

Expand All @@ -281,7 +284,7 @@ public void connection() throws AutomatorException {
}

if (!isOk)
throw new AutomatorException("Invalid configuration " + analysis.toString());
throw new AutomatorException("Invalid configuration " + analysis);

clientBuilder.numJobWorkerExecutionThreads(serverDefinition.workerExecutionThreads);
clientBuilder.defaultJobWorkerMaxJobsActive(serverDefinition.workerMaxJobsActive);
Expand All @@ -294,6 +297,7 @@ public void connection() throws AutomatorException {

// Actually, if an error arrived, an exception is thrown
analysis.append(join != null ? "successfully," : "error");
zeebeOk = join != null;

isOk = stillOk(serverDefinition.operateUrl, "operateUrl", analysis, false, isOk);

Expand All @@ -302,6 +306,7 @@ public void connection() throws AutomatorException {
.authentication(saOperate)
.build();
analysis.append("successfully,");
operateOk = true;

// TaskList is not mandatory
if (serverDefinition.taskListUrl != null && !serverDefinition.taskListUrl.isEmpty()) {
Expand All @@ -312,14 +317,18 @@ public void connection() throws AutomatorException {
.authentication(saTaskList)
.build();
analysis.append("successfully,");
tasklistOk = true;
}
//get tasks assigned to demo
logger.info(analysis.toString());
logger.info("Zeebe: OK, Operate: OK, TaskList:OK " + analysis.toString());

} catch (Exception e) {
zeebeClient = null;
throw new AutomatorException(
"Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + e.getMessage());
throw new AutomatorException("NoConnection[" + serverDefinition.name // server name
+ "] Zeebe:" + (zeebeOk ? "OK" : "FAIL") // zeebe status
+ ", Operate:" + (operateOk ? "OK" : "FAIL") // Operate status
+ ", Tasklist:" + (tasklistOk ? "OK" : "FAIL") // taskList status
+ ", Analysis:" + analysis + " fail : " + e.getMessage());
}
}

Expand All @@ -330,7 +339,7 @@ public void disconnection() throws AutomatorException {
/**
* Engine is ready. If not, a connection() method must be call
*
* @return
* @return true if the engine is ready
*/
public boolean isReady() {
return zeebeClient != null;
Expand Down Expand Up @@ -501,6 +510,7 @@ public List<String> searchUserTasks(String userTaskId, int maxResult) throws Aut
@Override
public RegisteredTask registerServiceTask(String workerId,
String topic,
boolean streamEnable,
Duration lockTime,
Object jobHandler,
FixedBackoffSupplier backoffSupplier) {
Expand All @@ -520,6 +530,7 @@ public RegisteredTask registerServiceTask(String workerId,
.jobType(topic)
.handler((JobHandler) jobHandler)
.timeout(lockTime)
.streamEnabled(streamEnable) // according the parameter
.name(workerId);

if (backoffSupplier != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

public class BpmnEngineDummy implements BpmnEngine {

private final BpmnEngineList.BpmnServerDefinition serverDefinition;
private final Logger logger = LoggerFactory.getLogger(BpmnEngineDummy.class);

public BpmnEngineDummy(BpmnEngineList.BpmnServerDefinition serverDefinition) {
this.serverDefinition = serverDefinition;
}

@Override
Expand All @@ -30,9 +28,11 @@ public void init() {
}

public void connection() throws AutomatorException {
// nothing to do here
}

public void disconnection() throws AutomatorException {
// nothing to do here
}

/**
Expand Down Expand Up @@ -78,6 +78,7 @@ public void executeUserTask(String userTaskId, String userId, Map<String, Object
@Override
public RegisteredTask registerServiceTask(String workerId,
String topic,
boolean streamEnable,
Duration lockTime,
Object jobHandler,
FixedBackoffSupplier backoffSupplier) {
Expand Down
Loading

0 comments on commit c1a2cf0

Please sign in to comment.