diff --git a/README.md b/README.md index 700a9d4..9a4a9e9 100644 --- a/README.md +++ b/README.md @@ -457,8 +457,7 @@ The docker image is build using the Dockerfile present on the root level. Push the image to ```` -docker build -t pierre-yves-monnet/process-execution-automator:1.8.0 . -docker build pycamunda/camunda-community-hub/process-execution-automator:1.8.0 +docker build -t pierre-yves-monnet/process-execution-automator:1.8.1 . ```` @@ -468,8 +467,12 @@ Push the image to the Camunda hub (you must be login first to the docker registr docker tag pierre-yves-monnet/process-execution-automator:1.8.0 ghcr.io/camunda-community-hub/process-execution-automator:1.8.0 docker push ghcr.io/camunda-community-hub/process-execution-automator:1.8.0 ```` -docker tag pierre-yves-monnet/process-execution-automator:1.8.0 pycamunda/camunda-hub:process-execution-automator-1.8.0 -docker push pycamunda/camunda-hub:process-execution-automator-1.8.0 + + +Temporary: +docker build -t pierre-yves-monnet/process-execution-automator:1.8.3 . +docker tag pierre-yves-monnet/process-execution-automator:1.8.3 pycamunda/camunda-hub:process-execution-automator-1.8.3 +docker push pycamunda/camunda-hub:process-execution-automator-1.8.3 diff --git a/doc/scenarioreference/C8CrawlUrl.bpmn b/doc/scenarioreference/C8CrawlUrl.bpmn index 5d946a7..e19d4e0 100644 --- a/doc/scenarioreference/C8CrawlUrl.bpmn +++ b/doc/scenarioreference/C8CrawlUrl.bpmn @@ -1,5 +1,5 @@ - + @@ -429,7 +429,7 @@ - + @@ -470,6 +470,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -526,45 +565,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - diff --git a/doc/unittestscenario/README.md b/doc/unittestscenario/README.md index 44a02c5..9691171 100644 --- a/doc/unittestscenario/README.md +++ b/doc/unittestscenario/README.md @@ -58,28 +58,35 @@ Check the scenario: ## execute -1. First, upload the scenario file in a config map + +1. Deploy the scenario on the cluster, via the Modeler + +2. Create the pod process-execution-automator ``` -kubectl create configmap scoreacceptancescn --from-file=doc/unittestscenario/resources/scoreacceptancescn.json -n camunda +kubectl create -f doc/unittestscenario/resources/UnittestAutomator.yaml -n camunda ``` -2. Deploy the scenario on the cluster, via the Modeler - -3. Create the pod process-execution-automator +3. Port forward ``` -kubectl create -f doc/unittestscenario/resources/UnittestAutomator.yaml -n camunda +kubectl port-forward svc/process-execution-automator 8381:8381 -n camunda ``` -This configuration will upload the scenario +4. Upload the scenario -4. Port forward ``` -kubectl port-forward svc/process-execution-automator 8381:8381 -n camunda +curl -X POST http://localhost:8381/api/files/upload \ + -H "Content-Type: multipart/form-data" \ + -F "file=@doc/unittestscenario/resources/ScoreAcceptanceScn.json" + +curl -X GET "http://localhost:8381/api/content/list" -H "Content-Type: application/json" + ``` + + 6. Check the scenario is uploaded ``` @@ -94,3 +101,11 @@ curl -X POST -F "file=@/path/to/your/file.txt" http://localhost:8080/api/files/u curl -X GET "http://localhost:8381/api/unittest/get?id=1732767184446" -H "Content-Type: application/json" ``` +Option: give the scenario via the configMap + +a. create the configMap +```` +kubectl create configmap scoreacceptancescn --from-file=doc/unittestscenario/resources/scoreacceptancescn.json -n camunda +```` + +b. Chheck the configuration diff --git a/doc/unittestscenario/SendUnitTestCommand.rest b/doc/unittestscenario/SendUnitTestCommand.rest index 92c0bcb..0dca8f6 100644 --- a/doc/unittestscenario/SendUnitTestCommand.rest +++ b/doc/unittestscenario/SendUnitTestCommand.rest @@ -29,11 +29,10 @@ Content-Type: application/json ### Upload file POST http://localhost:8381/api/content/add -Content-Type: multipart/form-data +Content-Type: multipart/form-data; boundary=boundary --boundary -Content-Disposition: form-data; name="File"; filename="file1.txt" -Content-Type: text/plain +Content-Disposition: form-data; name="FileToUpload"; filename="ScoreAcceptanceScn.json" < ./resources/ScoreAcceptanceScn.json diff --git a/doc/unittestscenario/resources/ScoreAcceptance.bpmn b/doc/unittestscenario/resources/ScoreAcceptance.bpmn new file mode 100644 index 0000000..f7f46ec --- /dev/null +++ b/doc/unittestscenario/resources/ScoreAcceptance.bpmn @@ -0,0 +1,127 @@ + + + + + Flow_1wuzgpt + + + + + + + Flow_1wuzgpt + Flow_0gi4xk6 + + + Flow_0gi4xk6 + Flow_16hd1bd + Flow_0a4sjzy + + + + Flow_16hd1bd + Flow_09z898p + + + =accepted + + + Flow_09z898p + + + + Flow_0wya675 + Flow_1pxztl2 + + + + Flow_1pxztl2 + + + + + + Flow_0a4sjzy + Flow_0wya675 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/doc/unittestscenario/resources/ScoreAcceptance.png b/doc/unittestscenario/resources/ScoreAcceptance.png new file mode 100644 index 0000000..7cf28e2 Binary files /dev/null and b/doc/unittestscenario/resources/ScoreAcceptance.png differ diff --git a/doc/unittestscenario/resources/ScoreAcceptanceScn.json b/doc/unittestscenario/resources/ScoreAcceptanceScn.json index cdd80e7..81546bb 100644 --- a/doc/unittestscenario/resources/ScoreAcceptanceScn.json +++ b/doc/unittestscenario/resources/ScoreAcceptanceScn.json @@ -69,6 +69,14 @@ "variables": { "score": 67 } + }, + { + "type": "USERTASK", + "taskId": "CallApplicant", + "processId": "ScoreAcceptance", + "variables": { + "phoneNumber": "(+1) 542 778 2352" + } } ], "verifications": { diff --git a/doc/unittestscenario/resources/UnittestAutomator.yaml b/doc/unittestscenario/resources/UnittestAutomator.yaml new file mode 100644 index 0000000..ec6b9bf --- /dev/null +++ b/doc/unittestscenario/resources/UnittestAutomator.yaml @@ -0,0 +1,68 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: process-execution-automator + labels: + app: process-execution-automator +spec: + selector: + matchLabels: + app: process-execution-automator + replicas: 1 + template: + metadata: + labels: + app: process-execution-automator + annotations: + prometheus.io/scrape: "true" + prometheus.io/port: "8088" + prometheus.io/path: "/actuator/prometheus" + spec: + containers: + - name: process-execution-automator + image: pycamunda/camunda-hub:process-execution-automator-1.8.7 + imagePullPolicy: Always + env: + - name: JAVA_TOOL_OPTIONS + value: >- + -Dautomator.servers.camunda8.zeebeGatewayAddress=camunda-zeebe-gateway:26500 + -Dautomator.servers.camunda8.operateUserName=demo + -Dautomator.servers.camunda8.operateUserPassword=demo + -Dautomator.servers.camunda8.operateUrl=http://camunda-operate:80 + -Dautomator.servers.camunda8.taskListUrl=http://camunda-tasklist:80 + -Dautomator.servers.camunda8.workerExecutionThreads=2000 + -Dautomator.startup.scenarioPath=/scenarii + -Dautomator.startup.contentAtStartup=file:/ScoreAcceptanceScn.json + -Dautomator.startup.logLevel=MAIN + resources: + limits: + cpu: "1" + memory: 2Gi + requests: + cpu: "1" + memory: 1Gi + volumeMounts: + - name: scenarii + mountPath: ScoreAcceptanceScn.json + subPath: ScoreAcceptanceScn.json + readOnly: true + volumes: + - name: scenarii + configMap: + name: scoreacceptancescn + +# Upload your scenario in this config map +# kubectl create configmap ScoreAcceptanceScn --from-file=ScoreAcceptanceScn.json -n camunda +--- +apiVersion: v1 +kind: Service +metadata: + name: process-execution-automator +spec: + selector: + app: process-execution-automator + ports: + - protocol: TCP + port: 8381 + targetPort: 8381 + type: ClusterIP diff --git a/pom.xml b/pom.xml index 3a78b70..4764ae2 100644 --- a/pom.xml +++ b/pom.xml @@ -18,9 +18,19 @@ ${java.version} ${java.version} + 8.5.7 + + 8.5.5 + + 7.19.0 5.9.1 @@ -55,23 +65,65 @@ + + + + + io.camunda + spring-boot-starter-camunda-sdk + 8.6.5 + + + + + io.camunda.spring + java-client-operate + 8.6.2 + + + + + + io.camunda + camunda-tasklist-client-java + 8.6.6 + + + org.camunda.bpm camunda-external-task-client ${camunda7.version} + + org.camunda.community + camunda-engine-rest-client-openapi-java + 7.18.0 + @@ -91,22 +143,12 @@ 2.10.1 - - org.camunda.community - camunda-engine-rest-client-openapi-java - 7.18.0 + org.apache.httpcomponents.client5 + httpclient5 - - - - io.camunda - camunda-tasklist-client-java - 8.5.3.5 - diff --git a/src/main/java/org/camunda/automator/AutomatorCLI.java b/src/main/java/org/camunda/automator/AutomatorCLI.java index 93665bc..39d4d34 100644 --- a/src/main/java/org/camunda/automator/AutomatorCLI.java +++ b/src/main/java/org/camunda/automator/AutomatorCLI.java @@ -16,7 +16,6 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.stereotype.Component; -import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; diff --git a/src/main/java/org/camunda/automator/AutomatorRest.java b/src/main/java/org/camunda/automator/AutomatorRest.java index ce9c260..5a7003d 100644 --- a/src/main/java/org/camunda/automator/AutomatorRest.java +++ b/src/main/java/org/camunda/automator/AutomatorRest.java @@ -29,10 +29,11 @@ public class AutomatorRest { public static final String JSON_STATUS = "status"; public static final String JSON_GENERAL_STATUS = "generalStatus"; public static final String JSON_NAME = "name"; + public static final String JSON_PROCESSINSTANCESID = "processInstancesId"; public static final String JSON_DETAIL = "detail"; public static final String JSON_MESSAGE = "message"; public static final String JSON_INFO = "info"; - private static final Logger logger = LoggerFactory.getLogger(ContentManager.class.getName()); + private static final Logger logger = LoggerFactory.getLogger(AutomatorRest.class.getName()); private final ConfigurationStartup configurationStartup; private final ContentManager contentManager; private final AutomatorAPI automatorAPI; @@ -48,6 +49,7 @@ public AutomatorRest(ConfigurationStartup configurationStartup, ContentManager c @PostMapping(value = "/api/unittest/run", produces = "application/json") public Map runUnitTest(@RequestParam(name = "name") String scenarioName, @RequestParam(name = "server", required = false) String serverName, @RequestParam(name = "wait", required = false) Boolean wait) { + logger.info("AutomatorRest: runUnitTest scenario[{}] Wait? {}", scenarioName, wait != null && wait); Map resultMap = new HashMap<>(); resultMap.put("senarioName", scenarioName); @@ -98,6 +100,7 @@ public List> getListUnitTest() { * Start a test */ private void startTest(String scenarioName, String serverName, String unitTestId) { + Map resultMap = new HashMap<>(); cacheExecution.put(unitTestId, resultMap); @@ -109,8 +112,7 @@ private void startTest(String scenarioName, String serverName, String unitTestId resultMap.put(JSON_SERVER_NAME, runParameters.getServerName()); resultMap.put(JSON_SCENARIO_NAME, scenarioName); - logger.info( - "Unit Test parameters serverName[{}] ", + logger.info("AutomatorRest: Start Test scenario[{}] unitTestId[{}] serverName[{}] ", scenarioName, unitTestId, runParameters.getServerName()); // now proceed the scenario @@ -120,7 +122,7 @@ private void startTest(String scenarioName, String serverName, String unitTestId try { scenario = automatorAPI.loadFromFile(scenarioFile); } catch (Exception e) { - logger.error("Error during accessing InputStream from File [{}]: {}", scenarioFile.toAbsolutePath().toString(), + logger.error("Error during accessing InputStream from File [{}]: {}", scenarioFile.toAbsolutePath(), e.getMessage()); } if (scenario == null) { @@ -171,6 +173,7 @@ private Map resultToJson(RunResult runResult) { } Map recordResult = new HashMap<>(); listVerificationsJson.add(recordResult); + recordResult.put(JSON_PROCESSINSTANCESID, String.join(", ", runResultUnit.getListProcessInstancesId())); recordResult.put(JSON_NAME, runResultUnit.getScnExecution().getName()); recordResult.put(JSON_STATUS, runResultUnit.isSuccess() ? StatusTest.SUCCESS : StatusTest.FAIL); recordResult.put(JSON_DETAIL, runResultUnit.getListVerifications().stream() diff --git a/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java b/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java index f2be874..b0490a2 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java +++ b/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java @@ -128,7 +128,7 @@ RegisteredTask registerServiceTask(String workerId, * @return list of taskId * @throws AutomatorException in case of error */ - List searchServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) + List activateServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) throws AutomatorException; /** diff --git a/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java b/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java index 7f4cb16..2f12df7 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java +++ b/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java @@ -287,7 +287,7 @@ public RegisteredTask registerServiceTask(String workerId, * @throws AutomatorException any error during search */ @Override - public List searchServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) + public List activateServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) throws AutomatorException { if (logDebug) { logger.info("BpmnEngine7.searchForActivity: Process[{}] taskName[{}]", processInstanceId, serviceTaskId); diff --git a/src/main/java/org/camunda/automator/bpmnengine/camunda8/BpmnEngineCamunda8.java b/src/main/java/org/camunda/automator/bpmnengine/camunda8/BpmnEngineCamunda8.java index 4084d77..9240704 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/camunda8/BpmnEngineCamunda8.java +++ b/src/main/java/org/camunda/automator/bpmnengine/camunda8/BpmnEngineCamunda8.java @@ -1,34 +1,19 @@ package org.camunda.automator.bpmnengine.camunda8; -import io.camunda.common.auth.*; -import io.camunda.common.auth.identity.IdentityConfig; -import io.camunda.common.auth.identity.IdentityContainer; -import io.camunda.common.json.SdkObjectMapper; -import io.camunda.identity.sdk.Identity; -import io.camunda.identity.sdk.IdentityConfiguration; -import io.camunda.operate.CamundaOperateClient; -import io.camunda.operate.CamundaOperateClientBuilder; -import io.camunda.operate.exception.OperateException; -import io.camunda.operate.model.*; -import io.camunda.operate.search.*; -import io.camunda.tasklist.CamundaTaskListClient; -import io.camunda.tasklist.CamundaTaskListClientBuilder; + import io.camunda.tasklist.dto.Variable; -import io.camunda.tasklist.dto.*; -import io.camunda.tasklist.exception.TaskListException; import io.camunda.zeebe.client.ZeebeClient; import io.camunda.zeebe.client.ZeebeClientBuilder; -import io.camunda.zeebe.client.api.command.FinalCommandStep; -import io.camunda.zeebe.client.api.response.*; +import io.camunda.zeebe.client.api.response.DeploymentEvent; +import io.camunda.zeebe.client.api.response.ProcessInstanceEvent; +import io.camunda.zeebe.client.api.response.Topology; import io.camunda.zeebe.client.api.worker.JobHandler; import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1; import io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProvider; import io.camunda.zeebe.client.impl.oauth.OAuthCredentialsProviderBuilder; import org.camunda.automator.bpmnengine.BpmnEngine; -import org.camunda.automator.bpmnengine.camunda8.refactoring.RefactoredCommandWrapper; import org.camunda.automator.configuration.BpmnEngineList; import org.camunda.automator.definition.ScenarioDeployment; -import org.camunda.automator.definition.ScenarioStep; import org.camunda.automator.engine.AutomatorException; import org.camunda.automator.engine.flow.FixedBackoffSupplier; import org.slf4j.Logger; @@ -36,18 +21,14 @@ import java.io.File; import java.net.URI; -import java.net.URL; import java.time.Duration; import java.util.*; -import java.util.stream.Collectors; public class BpmnEngineCamunda8 implements BpmnEngine { public static final String THIS_IS_A_COMPLETE_IMPOSSIBLE_VARIABLE_NAME = "ThisIsACompleteImpossibleVariableName"; - public static final int SEARCH_MAX_SIZE = 100; public static final String SAAS_AUTHENTICATE_URL = "https://login.cloud.camunda.io/oauth/token"; private final Logger logger = LoggerFactory.getLogger(BpmnEngineCamunda8.class); - private final BenchmarkStartPiExceptionHandlingStrategy exceptionHandlingStrategy; boolean hightFlowMode = false; /** * It is not possible to search user task for a specific processInstance. So, to realize this, a marker is created in each process instance. Retrieving the user task, @@ -55,15 +36,16 @@ public class BpmnEngineCamunda8 implements BpmnEngine { */ Map cacheProcessInstanceMarker = new HashMap<>(); Random random = new Random(System.currentTimeMillis()); - private BpmnEngineList.BpmnServerDefinition serverDefinition; + private final BpmnEngineList.BpmnServerDefinition serverDefinition; private ZeebeClient zeebeClient; - private CamundaOperateClient operateClient; - private CamundaTaskListClient taskClient; - // Default - private BpmnEngineList.CamundaEngine typeCamundaEngine = BpmnEngineList.CamundaEngine.CAMUNDA_8; + private final TaskListClient taskListClient; + private final OperateClient operateClient; + - private BpmnEngineCamunda8(BenchmarkStartPiExceptionHandlingStrategy exceptionHandlingStrategy) { - this.exceptionHandlingStrategy = exceptionHandlingStrategy; + private BpmnEngineCamunda8(BpmnEngineList.BpmnServerDefinition serverDefinition, BenchmarkStartPiExceptionHandlingStrategy exceptionHandlingStrategy) { + this.serverDefinition = serverDefinition; + this.taskListClient = new TaskListClient(this); + this.operateClient = new OperateClient(this); } /** @@ -75,10 +57,7 @@ private BpmnEngineCamunda8(BenchmarkStartPiExceptionHandlingStrategy exceptionHa public static BpmnEngineCamunda8 getFromServerDefinition(BpmnEngineList.BpmnServerDefinition serverDefinition, BenchmarkStartPiExceptionHandlingStrategy benchmarkStartPiExceptionHandlingStrategy, boolean logDebug) { - BpmnEngineCamunda8 bpmnEngineCamunda8 = new BpmnEngineCamunda8(benchmarkStartPiExceptionHandlingStrategy); - bpmnEngineCamunda8.serverDefinition = serverDefinition; - return bpmnEngineCamunda8; - + return new BpmnEngineCamunda8(serverDefinition, benchmarkStartPiExceptionHandlingStrategy); } /** @@ -100,25 +79,21 @@ public static BpmnEngineCamunda8 getFromCamunda8(String zeebeSelfGatewayAddress, String operateUserPassword, String tasklistUrl, BenchmarkStartPiExceptionHandlingStrategy benchmarkStartPiExceptionHandlingStrategy) { - BpmnEngineCamunda8 bpmnEngineCamunda8 = new BpmnEngineCamunda8(benchmarkStartPiExceptionHandlingStrategy); - bpmnEngineCamunda8.serverDefinition = new BpmnEngineList.BpmnServerDefinition(); - bpmnEngineCamunda8.serverDefinition.serverType = BpmnEngineList.CamundaEngine.CAMUNDA_8; - bpmnEngineCamunda8.serverDefinition = new BpmnEngineList.BpmnServerDefinition(); - bpmnEngineCamunda8.serverDefinition.zeebeGatewayAddress = zeebeSelfGatewayAddress; - bpmnEngineCamunda8.serverDefinition.zeebeGrpcAddress = zeebeGrpcAddress; - bpmnEngineCamunda8.serverDefinition.zeebeRestAddress = zeebeRestAddress; - bpmnEngineCamunda8.serverDefinition.zeebePlainText = zeebePlainText; - - + BpmnEngineList.BpmnServerDefinition serverDefinition = new BpmnEngineList.BpmnServerDefinition(); + serverDefinition.serverType = BpmnEngineList.CamundaEngine.CAMUNDA_8; + serverDefinition.zeebeGatewayAddress = zeebeSelfGatewayAddress; + serverDefinition.zeebeGrpcAddress = zeebeGrpcAddress; + serverDefinition.zeebeRestAddress = zeebeRestAddress; + serverDefinition.zeebePlainText = zeebePlainText; /* * Connection to Operate */ - bpmnEngineCamunda8.serverDefinition.operateUserName = operateUserName; - bpmnEngineCamunda8.serverDefinition.operateUserPassword = operateUserPassword; - bpmnEngineCamunda8.serverDefinition.operateUrl = operateUrl; - bpmnEngineCamunda8.serverDefinition.taskListUrl = tasklistUrl; - return bpmnEngineCamunda8; + serverDefinition.operateUserName = operateUserName; + serverDefinition.operateUserPassword = operateUserPassword; + serverDefinition.operateUrl = operateUrl; + serverDefinition.taskListUrl = tasklistUrl; + return new BpmnEngineCamunda8(serverDefinition, benchmarkStartPiExceptionHandlingStrategy); } /** @@ -144,29 +119,29 @@ public static BpmnEngineCamunda8 getFromCamunda8SaaS(String zeebeSaasCloudRegion String operateUserPassword, String tasklistUrl, BenchmarkStartPiExceptionHandlingStrategy benchmarkStartPiExceptionHandlingStrategy) { - BpmnEngineCamunda8 bpmnEngineCamunda8 = new BpmnEngineCamunda8(benchmarkStartPiExceptionHandlingStrategy); - bpmnEngineCamunda8.serverDefinition = new BpmnEngineList.BpmnServerDefinition(); - bpmnEngineCamunda8.serverDefinition.serverType = BpmnEngineList.CamundaEngine.CAMUNDA_8; + + BpmnEngineList.BpmnServerDefinition serverDefinition = new BpmnEngineList.BpmnServerDefinition(); + serverDefinition.serverType = BpmnEngineList.CamundaEngine.CAMUNDA_8; /* * SaaS Zeebe */ - bpmnEngineCamunda8.serverDefinition.zeebeSaasRegion = zeebeSaasCloudRegion; - bpmnEngineCamunda8.serverDefinition.zeebeSaasClusterId = zeebeSaasCloudClusterId; - bpmnEngineCamunda8.serverDefinition.zeebeClientId = zeebeSaasCloudClientId; - bpmnEngineCamunda8.serverDefinition.zeebeClientSecret = zeebeSaasClientSecret; - bpmnEngineCamunda8.serverDefinition.authenticationUrl = zeebeSaasAuthenticationUrl; - bpmnEngineCamunda8.serverDefinition.zeebeAudience = zeebeSaasAudience; + serverDefinition.zeebeSaasRegion = zeebeSaasCloudRegion; + serverDefinition.zeebeSaasClusterId = zeebeSaasCloudClusterId; + serverDefinition.zeebeClientId = zeebeSaasCloudClientId; + serverDefinition.zeebeClientSecret = zeebeSaasClientSecret; + serverDefinition.authenticationUrl = zeebeSaasAuthenticationUrl; + serverDefinition.zeebeAudience = zeebeSaasAudience; /* * Connection to Operate */ - bpmnEngineCamunda8.serverDefinition.operateUserName = operateUserName; - bpmnEngineCamunda8.serverDefinition.operateUserPassword = operateUserPassword; - bpmnEngineCamunda8.serverDefinition.operateUrl = operateUrl; - bpmnEngineCamunda8.serverDefinition.taskListUrl = tasklistUrl; - return bpmnEngineCamunda8; + serverDefinition.operateUserName = operateUserName; + serverDefinition.operateUserPassword = operateUserPassword; + serverDefinition.operateUrl = operateUrl; + serverDefinition.taskListUrl = tasklistUrl; + return new BpmnEngineCamunda8(serverDefinition, benchmarkStartPiExceptionHandlingStrategy); } @Override @@ -175,13 +150,11 @@ public void init() { } public void connection() throws AutomatorException { - - this.typeCamundaEngine = this.serverDefinition.serverType; StringBuilder analysis = new StringBuilder(); try { connectZeebe(analysis); - connectOperate(analysis); - connectTaskList(analysis); + operateClient.connectOperate(analysis); + taskListClient.connectTaskList(analysis); logger.info("Zeebe: OK, Operate: OK, TaskList:OK {}", analysis); } catch (AutomatorException e) { @@ -218,27 +191,26 @@ public void turnHighFlowMode(boolean highFlowMode) { this.hightFlowMode = highFlowMode; } + public boolean isHightFlowMode() { + return hightFlowMode; + } + @Override public String createProcessInstance(String processId, String starterEventId, Map variables) throws AutomatorException { try { String marker = null; if (!hightFlowMode) { - marker = getUniqueMarker(processId, starterEventId); + marker = getUniqueMarker(processId); variables.put(THIS_IS_A_COMPLETE_IMPOSSIBLE_VARIABLE_NAME, marker); } - FinalCommandStep createCommand = zeebeClient.newCreateInstanceCommand() + ProcessInstanceEvent processInstanceEvent = zeebeClient.newCreateInstanceCommand() .bpmnProcessId(processId) .latestVersion() - .variables(variables); - RefactoredCommandWrapper command = new RefactoredCommandWrapper(createCommand, - System.currentTimeMillis() + 5 * 60 * 1000, - // 5 minutes - "CreatePi" + processId, exceptionHandlingStrategy); - - ProcessInstanceEvent workflowInstanceEvent = (ProcessInstanceEvent) command.executeSync(); - Long processInstanceId = workflowInstanceEvent.getProcessInstanceKey(); + .variables(variables) + .send().join(); + Long processInstanceId = processInstanceEvent.getProcessInstanceKey(); if (!hightFlowMode) { cacheProcessInstanceMarker.put(marker, processInstanceId); } @@ -253,7 +225,7 @@ public String createProcessInstanceDirect(String processId, String starterEventI try { String marker = null; if (!hightFlowMode) { - marker = getUniqueMarker(processId, starterEventId); + marker = getUniqueMarker(processId); variables.put(THIS_IS_A_COMPLETE_IMPOSSIBLE_VARIABLE_NAME, marker); } @@ -295,76 +267,12 @@ public void endProcessInstance(String processInstanceId, boolean cleanAll) throw @Override public List searchUserTasksByProcessInstance(String processInstanceId, String userTaskId, int maxResult) throws AutomatorException { - try { - // impossible to filter by the task name/ task type, so be ready to get a lot of flowNode and search the correct one - Long processInstanceIdLong = Long.valueOf(processInstanceId); - - TaskSearch taskSearch = new TaskSearch(); - taskSearch.setState(TaskState.CREATED); - taskSearch.setAssigned(Boolean.FALSE); - taskSearch.setWithVariables(true); - taskSearch.setPagination(new Pagination().setPageSize(maxResult)); - - TaskList tasksList = taskClient.getTasks(taskSearch); - boolean getAllTasks = tasksList.size() < maxResult; - List listTasksResult = new ArrayList<>(); - do { - if (!hightFlowMode) { - // We check that the task is the one expected - listTasksResult.addAll(tasksList.getItems().stream().filter(t -> { - List listVariables = t.getVariables(); - Optional markerTask = listVariables.stream() - .filter(v -> v.getName().equals(THIS_IS_A_COMPLETE_IMPOSSIBLE_VARIABLE_NAME)) - .findFirst(); - if (markerTask.isEmpty()) - return false; - Long processInstanceIdTask = cacheProcessInstanceMarker.get(markerTask.get().getValue()); - return (processInstanceIdLong.equals(processInstanceIdTask)); - }).map(Task::getId) // Task to ID - .toList()); - } else { - listTasksResult.addAll(tasksList.getItems().stream() - .map(Task::getId) // Task to ID - .toList()); - } - - if (tasksList.size() > 0 && !getAllTasks) - tasksList = taskClient.after(tasksList); - } while (tasksList.size() > 0 && !getAllTasks); - - return listTasksResult; - - } catch (TaskListException e) { - throw new AutomatorException("Can't search users task " + e.getMessage()); - } + return taskListClient.searchUserTasksByProcessInstance(processInstanceId, userTaskId, maxResult); } @Override public List searchUserTasks(String userTaskId, int maxResult) throws AutomatorException { - try { - // impossible to filter by the task name/ task type, so be ready to get a lot of flowNode and search the correct one - - TaskSearch taskSearch = new TaskSearch(); - taskSearch.setState(TaskState.CREATED); - taskSearch.setAssigned(Boolean.FALSE); - taskSearch.setWithVariables(true); - taskSearch.setPagination(new Pagination().setPageSize(maxResult)); - - TaskList tasksList = taskClient.getTasks(taskSearch); - List listTasksResult = new ArrayList<>(); - do { - listTasksResult.addAll(tasksList.getItems().stream().map(Task::getId) // Task to ID - .toList()); - - if (tasksList.size() > 0) - tasksList = taskClient.after(tasksList); - } while (tasksList.size() > 0); - - return listTasksResult; - - } catch (TaskListException e) { - throw new AutomatorException("Can't search users task " + e.getMessage()); - } + return taskListClient.searchUserTasks(userTaskId, maxResult); } /* ******************************************************************** */ @@ -414,57 +322,13 @@ public RegisteredTask registerServiceTask(String workerId, @Override public void executeUserTask(String userTaskId, String userId, Map variables) throws AutomatorException { - try { - taskClient.claim(userTaskId, serverDefinition.operateUserName); - taskClient.completeTask(userTaskId, variables); - } catch (TaskListException e) { - throw new AutomatorException("Can't execute task [" + userTaskId + "]"); - } catch (Exception e) { - throw new AutomatorException("Can't execute task [" + userTaskId + "]"); - } + taskListClient.executeUserTask(userTaskId, userId, variables); } @Override - public List searchServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) + public List activateServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) throws AutomatorException { - try { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - long processInstanceIdLong = Long.parseLong(processInstanceId); - - ProcessInstanceFilter processInstanceFilter = ProcessInstanceFilter.builder() - .parentKey(processInstanceIdLong) - .build(); - - SearchQuery processInstanceQuery = new SearchQuery.Builder().filter(processInstanceFilter).size(100).build(); - - List listProcessInstances = operateClient.searchProcessInstances(processInstanceQuery); - Set setProcessInstances = listProcessInstances.stream() - .map(ProcessInstance::getKey) - .collect(Collectors.toSet()); - setProcessInstances.add(processInstanceIdLong); - - ActivateJobsResponse jobsResponse = zeebeClient.newActivateJobsCommand() - .jobType(topic) - .maxJobsToActivate(10000) - .workerName(Thread.currentThread().getName()) - .send() - .join(); - List listJobsId = new ArrayList<>(); - - for (ActivatedJob job : jobsResponse.getJobs()) { - if (setProcessInstances.contains(job.getProcessInstanceKey())) - listJobsId.add(String.valueOf(job.getKey())); - else { - zeebeClient.newFailCommand(job.getKey()).retries(2).send().join(); - } - } - return listJobsId; - - } catch (Exception e) { - throw new AutomatorException("Can't search service task topic[" + topic + "] : " + e.getMessage()); - } + return operateClient.activateServiceTasks(processInstanceId, serviceTaskId, topic, maxResult); } @@ -486,13 +350,23 @@ public void executeServiceTask(String serviceTaskId, String workerId, Map variables) throws AutomatorException { try { - zeebeClient.newThrowErrorCommand(Long.valueOf(serviceTaskId)) + zeebeClient.newThrowErrorCommand(Long.parseLong(serviceTaskId)) .errorCode(errorCode) .errorMessage(errorMessage) .variables(variables) @@ -513,120 +387,19 @@ public void throwBpmnServiceTask(String serviceTaskId, @Override public List searchTasksByProcessInstanceId(String processInstanceId, String filterTaskId, int maxResult) throws AutomatorException { - try { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - - // impossible to filter by the task name/ task tyoe, so be ready to get a lot of flowNode and search the correct onee - FlowNodeInstanceFilter flownodeFilter = FlowNodeInstanceFilter.builder() - .processInstanceKey(Long.valueOf(processInstanceId)) - .build(); - - SearchQuery flowNodeQuery = new SearchQuery.Builder().filter(flownodeFilter).size(maxResult).build(); - List flowNodes = operateClient.searchFlowNodeInstances(flowNodeQuery); - return flowNodes.stream() - .filter(t -> filterTaskId == null || filterTaskId.equals(t.getFlowNodeId())) // Filter by name - .map(t -> { - TaskDescription taskDescription = new TaskDescription(); - taskDescription.taskId = t.getFlowNodeId(); - taskDescription.processInstanceId = String.valueOf(t.getProcessInstanceKey()); - taskDescription.startDate = t.getStartDate(); - taskDescription.endDate = t.getEndDate(); - taskDescription.type = getTaskType(t.getType()); // to implement - taskDescription.isCompleted = FlowNodeInstanceState.COMPLETED.equals(t.getState()); // to implement - return taskDescription; - }).toList(); - - } catch (OperateException e) { - throw new AutomatorException("Can't search users task " + e.getMessage()); - } + return operateClient.searchTasksByProcessInstanceId(processInstanceId, filterTaskId, maxResult); } public List searchProcessInstanceByVariable(String processId, Map filterVariables, int maxResult) throws AutomatorException { - try { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - - // impossible to filter by the task name/ task tyoe, so be ready to get a lot of flowNode and search the correct onee - ProcessInstanceFilter processInstanceFilter = ProcessInstanceFilter.builder().bpmnProcessId(processId).build(); - - SearchQuery processInstanceQuery = new SearchQuery.Builder().filter(processInstanceFilter) - .size(maxResult) - .build(); - List listProcessInstances = operateClient.searchProcessInstances(processInstanceQuery); - - List listProcessInstanceFind = new ArrayList<>(); - // now, we have to filter based on variableName/value - - for (ProcessInstance processInstance : listProcessInstances) { - Map processVariables = getVariables(processInstance.getKey().toString()); - List> entriesNotFiltered = filterVariables.entrySet() - .stream() - .filter( - t -> processVariables.containsKey(t.getKey()) && processVariables.get(t.getKey()).equals(t.getValue())) - .toList(); - - if (entriesNotFiltered.isEmpty()) { - - ProcessDescription processDescription = new ProcessDescription(); - processDescription.processInstanceId = processInstance.getKey().toString(); - - listProcessInstanceFind.add(processDescription); - } - } - return listProcessInstanceFind; - } catch (OperateException e) { - throw new AutomatorException("Can't search users task " + e.getMessage()); - } + return operateClient.searchProcessInstanceByVariable(processId, filterVariables, maxResult); } - private ScenarioStep.Step getTaskType(String taskTypeC8) { - if (taskTypeC8.equals("SERVICE_TASK")) - return ScenarioStep.Step.SERVICETASK; - else if (taskTypeC8.equals("USER_TASK")) - return ScenarioStep.Step.USERTASK; - else if (taskTypeC8.equals("START_EVENT")) - return ScenarioStep.Step.STARTEVENT; - else if (taskTypeC8.equals("END_EVENT")) - return ScenarioStep.Step.ENDEVENT; - else if (taskTypeC8.equals("EXCLUSIVE_GATEWAY")) - return ScenarioStep.Step.EXCLUSIVEGATEWAY; - else if (taskTypeC8.equals("PARALLEL_GATEWAY")) - return ScenarioStep.Step.PARALLELGATEWAY; - else if (taskTypeC8.equals("TASK")) - return ScenarioStep.Step.TASK; - else if (taskTypeC8.equals("SCRIPT_TASK")) - return ScenarioStep.Step.SCRIPTTASK; - - return null; - } @Override public Map getVariables(String processInstanceId) throws AutomatorException { - try { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - - // impossible to filter by the task name/ task tyoe, so be ready to get a lot of flowNode and search the correct onee - VariableFilter variableFilter = VariableFilter.builder() - .processInstanceKey(Long.valueOf(processInstanceId)) - .build(); - - SearchQuery variableQuery = new SearchQuery.Builder().filter(variableFilter).build(); - List listVariables = operateClient.searchVariables(variableQuery); - - Map variables = new HashMap<>(); - listVariables.forEach(t -> variables.put(t.getName(), t.getValue())); - - return variables; - } catch (OperateException e) { - throw new AutomatorException("Can't search variables task " + e.getMessage()); - } + return operateClient.getVariables(processInstanceId); } /* ******************************************************************** */ @@ -636,69 +409,12 @@ public Map getVariables(String processInstanceId) throws Automat /* ******************************************************************** */ public long countNumberOfProcessInstancesCreated(String processId, Date startDate, Date endDate) throws AutomatorException { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - - SearchQuery.Builder queryBuilder = new SearchQuery.Builder(); - try { - int cumul = 0; - SearchResult searchResult = null; - queryBuilder = queryBuilder.filter(ProcessInstanceFilter.builder().bpmnProcessId(processId).build()); - queryBuilder.sort(new Sort("key", SortOrder.ASC)); - int maxLoop = 0; - do { - maxLoop++; - if (searchResult != null && !searchResult.getItems().isEmpty()) { - queryBuilder.searchAfter(searchResult.getSortValues()); - } - SearchQuery searchQuery = queryBuilder.build(); - searchQuery.setSize(SEARCH_MAX_SIZE); - searchResult = operateClient.searchProcessInstanceResults(searchQuery); - - cumul += searchResult.getItems().stream().filter(t -> t.getStartDate().after(startDate)).count(); - - } while (searchResult.getItems().size() >= SEARCH_MAX_SIZE && maxLoop < 1000); - return cumul; - } catch (Exception e) { - throw new AutomatorException("Search countNumberProcessInstanceCreated " + e.getMessage()); - } + return operateClient.countNumberOfProcessInstancesCreated(processId, startDate, endDate); } public long countNumberOfProcessInstancesEnded(String processId, Date startDate, Date endDate) throws AutomatorException { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - - SearchQuery.Builder queryBuilder = new SearchQuery.Builder(); - try { - int cumul = 0; - SearchResult searchResult = null; - - queryBuilder = queryBuilder.filter(ProcessInstanceFilter.builder().bpmnProcessId(processId) - // .startDate(startDate) - // .endDate(endDate) - .state(ProcessInstanceState.COMPLETED).build()); - - queryBuilder.sort(new Sort("key", SortOrder.ASC)); - int maxLoop = 0; - do { - maxLoop++; - if (searchResult != null && !searchResult.getItems().isEmpty()) { - queryBuilder.searchAfter(searchResult.getSortValues()); - } - SearchQuery searchQuery = queryBuilder.build(); - searchQuery.setSize(SEARCH_MAX_SIZE); - searchResult = operateClient.searchProcessInstanceResults(searchQuery); - cumul += searchResult.getItems().stream().filter(t -> t.getStartDate().after(startDate)).count(); - - } while (searchResult.getItems().size() >= SEARCH_MAX_SIZE && maxLoop < 1000); - return cumul; - - } catch (Exception e) { - throw new AutomatorException("Search countNumberProcessEnded " + e.getMessage()); - } + return operateClient.countNumberOfProcessInstancesEnded(processId, startDate, endDate); } /* ******************************************************************** */ /* */ @@ -707,33 +423,7 @@ public long countNumberOfProcessInstancesEnded(String processId, Date startDate, /* ******************************************************************** */ public long countNumberOfTasks(String processId, String taskId) throws AutomatorException { - if (operateClient == null) { - throw new AutomatorException("No Operate connection was provided"); - } - - try { - - int cumul = 0; - SearchResult searchResult = null; - int maxLoop = 0; - do { - maxLoop++; - - SearchQuery.Builder queryBuilder = new SearchQuery.Builder(); - queryBuilder = queryBuilder.filter(FlowNodeInstanceFilter.builder().flowNodeId(taskId).build()); - queryBuilder.sort(new Sort("key", SortOrder.ASC)); - if (searchResult != null && !searchResult.getItems().isEmpty()) { - queryBuilder.searchAfter(searchResult.getSortValues()); - } - SearchQuery searchQuery = queryBuilder.build(); - searchQuery.setSize(SEARCH_MAX_SIZE); - searchResult = operateClient.searchFlowNodeInstanceResults(searchQuery); - cumul += (long) searchResult.getItems().size(); - } while (searchResult.getItems().size() >= SEARCH_MAX_SIZE && maxLoop < 1000); - return cumul; - } catch (Exception e) { - throw new AutomatorException("Search countNumberProcessEnded " + e.getMessage()); - } + return operateClient.countNumberOfTasks(processId, taskId); } @@ -759,13 +449,13 @@ public String deployBpmn(File processFile, ScenarioDeployment.Policy policy) thr @Override public BpmnEngineList.CamundaEngine getTypeCamundaEngine() { - return typeCamundaEngine; + return serverDefinition.serverType; } @Override public String getSignature() { - String signature = typeCamundaEngine.toString() + " "; - if (typeCamundaEngine.equals(BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS)) + String signature = serverDefinition.serverType.toString() + " "; + if (serverDefinition.serverType.equals(BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS)) signature += "Cloud ClientId[" + serverDefinition.zeebeClientId + "] ClusterId[" + serverDefinition.zeebeSaasClusterId + "]"; @@ -781,7 +471,7 @@ public int getWorkerExecutionThreads() { return serverDefinition != null ? serverDefinition.workerExecutionThreads : 0; } - private String getUniqueMarker(String processId, String starterEventId) { + private String getUniqueMarker(String processId) { return processId + "-" + random.nextInt(1000000); } @@ -789,8 +479,15 @@ public ZeebeClient getZeebeClient() { return zeebeClient; } - - + protected Long getProcessInstanceIdFromMarker(List listVariables) { + Optional markerOptional = listVariables.stream() + .filter(v -> v.getName().equals(THIS_IS_A_COMPLETE_IMPOSSIBLE_VARIABLE_NAME)) + .findFirst(); + if (markerOptional.isEmpty()) + return null; + String marker = (String) markerOptional.get().getValue(); + return cacheProcessInstanceMarker.get(marker); + } /* ******************************************************************** */ /* */ /* Connection to each component */ @@ -804,12 +501,11 @@ private void connectZeebe(StringBuilder analysis) throws AutomatorException { boolean isOk = true; isOk = stillOk(serverDefinition.name, "ZeebeConnection", analysis, false, true, isOk); - this.typeCamundaEngine = this.serverDefinition.serverType; ZeebeClientBuilder clientBuilder; // ---------------------------- Camunda Saas - if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(this.typeCamundaEngine)) { + if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(serverDefinition.serverType)) { analysis.append("SaaS;"); String gatewayAddressCloud = @@ -847,7 +543,7 @@ private void connectZeebe(StringBuilder analysis) throws AutomatorException { } //---------------------------- Camunda 8 Self Manage - else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(this.typeCamundaEngine)) { + else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(serverDefinition.serverType)) { analysis.append("SelfManage;"); isOk = stillOk(serverDefinition.zeebeGatewayAddress, "GatewayAddress", analysis, true, true, isOk); if (serverDefinition.isAuthenticationUrl()) { @@ -948,278 +644,9 @@ else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(this.typeCamundaEngine)) } } - /** - * Connect Operate - * - * @param analysis to cpmplete the analysis - * @throws AutomatorException in case of error - */ - private void connectOperate(StringBuilder analysis) throws AutomatorException { - if (!serverDefinition.isOperate()) { - analysis.append("No operate connection required, "); - return; - } - analysis.append("Operate connection..."); - - boolean isOk = true; - isOk = stillOk(serverDefinition.operateUrl, "operateUrl", analysis, true, true, isOk); - - CamundaOperateClientBuilder camundaOperateClientBuilder = new CamundaOperateClientBuilder(); - // ---------------------------- Camunda Saas - if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(this.typeCamundaEngine)) { - - try { - isOk = stillOk(serverDefinition.zeebeSaasRegion, "zeebeSaasRegion", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.zeebeSaasClusterId, "zeebeSaasClusterId", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.zeebeClientId, "zeebeClientId", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.zeebeClientSecret, "zeebeClientSecret", analysis, true, false, isOk); - - URL operateUrl = URI.create("https://" + serverDefinition.zeebeSaasRegion + ".operate.camunda.io/" - + serverDefinition.zeebeSaasClusterId).toURL(); - - SaaSAuthenticationBuilder saaSAuthenticationBuilder = SaaSAuthentication.builder(); - JwtConfig jwtConfig = new JwtConfig(); - jwtConfig.addProduct(Product.OPERATE, - new JwtCredential(serverDefinition.zeebeClientId, serverDefinition.zeebeClientSecret, - serverDefinition.operateAudience != null ? serverDefinition.operateAudience : "operate.camunda.io", - serverDefinition.authenticationUrl != null ? - serverDefinition.authenticationUrl : - SAAS_AUTHENTICATE_URL)); - - Authentication saasAuthentication = SaaSAuthentication.builder() - .withJwtConfig(jwtConfig) - .withJsonMapper(new SdkObjectMapper()) - .build(); - - camundaOperateClientBuilder.authentication(saasAuthentication) - .operateUrl(serverDefinition.operateUrl) - .setup() - .build(); - - } catch (Exception e) { - zeebeClient = null; - logger.error("Can't connect to SaaS environemnt[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " - + e.getMessage()); - } - - //---------------------------- Camunda 8 Self Manage - } else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(this.typeCamundaEngine)) { - - isOk = stillOk(serverDefinition.zeebeGatewayAddress, "GatewayAddress", analysis, true, true, isOk); - - try { - if (serverDefinition.isAuthenticationUrl()) { - isOk = stillOk(serverDefinition.authenticationUrl, "authenticationUrl", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.operateClientId, "operateClientId", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.operateClientSecret, "operateClientSecret", analysis, true, false, isOk); - - IdentityConfiguration identityConfiguration = new IdentityConfiguration.Builder().withBaseUrl( - serverDefinition.identityUrl) - .withIssuer(serverDefinition.authenticationUrl) - .withIssuerBackendUrl(serverDefinition.authenticationUrl) - .withClientId(serverDefinition.operateClientId) - .withClientSecret(serverDefinition.operateClientSecret) - .withAudience(serverDefinition.operateAudience) - .build(); - Identity identity = new Identity(identityConfiguration); - - IdentityConfig identityConfig = new IdentityConfig(); - identityConfig.addProduct(Product.OPERATE, new IdentityContainer(identity, identityConfiguration)); - - JwtConfig jwtConfig = new JwtConfig(); - jwtConfig.addProduct(Product.OPERATE, new JwtCredential(serverDefinition.operateClientId, // clientId - serverDefinition.operateClientSecret, // clientSecret - "zeebe-api", // audience - serverDefinition.authenticationUrl)); - - io.camunda.common.auth.SelfManagedAuthenticationBuilder identityAuthenticationBuilder = io.camunda.common.auth.SelfManagedAuthentication.builder(); - identityAuthenticationBuilder.withJwtConfig(jwtConfig); - identityAuthenticationBuilder.withIdentityConfig(identityConfig); - - Authentication identityAuthentication = identityAuthenticationBuilder.build(); - camundaOperateClientBuilder.authentication(identityAuthentication) - .operateUrl(serverDefinition.operateUrl) - .setup() - .build(); - - } else { - // Simple authentication - isOk = stillOk(serverDefinition.operateUserName, "operateUserName", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.operateUserPassword, "operateUserPassword", analysis, true, false, isOk); - - SimpleCredential simpleCredential = new SimpleCredential(serverDefinition.operateUrl, - serverDefinition.operateUserName, serverDefinition.operateUserPassword); - - SimpleConfig jwtConfig = new io.camunda.common.auth.SimpleConfig(); - jwtConfig.addProduct(Product.OPERATE, simpleCredential); - - io.camunda.common.auth.SimpleAuthenticationBuilder simpleAuthenticationBuilder = SimpleAuthentication.builder(); - simpleAuthenticationBuilder.withSimpleConfig(jwtConfig); - - Authentication simpleAuthentication = simpleAuthenticationBuilder.build(); - camundaOperateClientBuilder.authentication(simpleAuthentication) - .operateUrl(serverDefinition.operateUrl) - .setup() - .build(); - } - } catch (Exception e) { - logger.error("Can't connect to SaaS environment[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " - + e.getMessage()); - } - - } else - throw new AutomatorException("Invalid configuration"); - - if (!isOk) - throw new AutomatorException("Invalid configuration " + analysis); - - // ---------------- connection - try { - - operateClient = camundaOperateClientBuilder.build(); - - analysis.append("successfully, "); - - } catch (Exception e) { - logger.error("Can't connect to Server[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " Fail : " + e.getMessage()); - } - } - - /** - * Connect to TaskList - * - * @param analysis complete the analysis - * @throws AutomatorException in case of error - */ - private void connectTaskList(StringBuilder analysis) throws AutomatorException { - - if (!serverDefinition.isTaskList()) { - analysis.append("No TaskList connection required, "); - return; - } - analysis.append("Tasklist ..."); - - boolean isOk = true; - isOk = stillOk(serverDefinition.taskListUrl, "taskListUrl", analysis, true, true, isOk); - - CamundaTaskListClientBuilder taskListBuilder = CamundaTaskListClient.builder(); - // ---------------------------- Camunda Saas - if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(this.typeCamundaEngine)) { - try { - isOk = stillOk(serverDefinition.zeebeSaasRegion, "zeebeSaasRegion", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.zeebeSaasClusterId, "zeebeSaasClusterId", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.taskListClientId, "taskListClientId", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.taskListClientSecret, "taskListClientSecret", analysis, true, false, isOk); - - String taskListUrl = "https://" + serverDefinition.zeebeSaasRegion + ".tasklist.camunda.io/" - + serverDefinition.zeebeSaasClusterId; - - taskListBuilder.taskListUrl(taskListUrl) - .saaSAuthentication(serverDefinition.taskListClientId, serverDefinition.taskListClientSecret); - } catch (Exception e) { - logger.error("Can't connect to SaaS environemnt[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " - + e.getMessage()); - } - - //---------------------------- Camunda 8 Self Manage - } else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(this.typeCamundaEngine)) { - - if (serverDefinition.isAuthenticationUrl()) { - isOk = stillOk(serverDefinition.taskListClientId, "taskListClientId", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.taskListClientSecret, "taskListClientSecret", analysis, true, false, isOk); - isOk = stillOk(serverDefinition.authenticationUrl, "authenticationUrl", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.taskListKeycloakUrl, "taskListKeycloakUrl", analysis, true, true, isOk); - - taskListBuilder.taskListUrl(serverDefinition.taskListUrl) - .selfManagedAuthentication(serverDefinition.taskListClientId, serverDefinition.taskListClientSecret, - serverDefinition.taskListKeycloakUrl); - } else { - isOk = stillOk(serverDefinition.taskListUserName, "User", analysis, true, true, isOk); - isOk = stillOk(serverDefinition.taskListUserPassword, "Password", analysis, true, false, isOk); - - SimpleConfig simpleConf = new SimpleConfig(); - simpleConf.addProduct(Product.TASKLIST, - new SimpleCredential(serverDefinition.taskListUrl, serverDefinition.taskListUserName, - serverDefinition.taskListUserPassword)); - Authentication auth = SimpleAuthentication.builder().withSimpleConfig(simpleConf).build(); - - taskListBuilder.taskListUrl(serverDefinition.taskListUrl) - .authentication(auth) - .cookieExpiration(Duration.ofSeconds(5)); - } - } else - throw new AutomatorException("Invalid configuration"); - - if (!isOk) - throw new AutomatorException("Invalid configuration " + analysis); - - // ---------------- connection - try { - taskListBuilder.zeebeClient(zeebeClient); - taskListBuilder.useZeebeUserTasks(); - taskClient = taskListBuilder.build(); - - analysis.append("successfully, "); - - } catch (Exception e) { - logger.error("Can't connect to Server[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " Fail : " + e.getMessage()); - } - - /* 1.6.1 - boolean isOk = true; - io.camunda.tasklist.auth.AuthInterface saTaskList; - - // ---------------------------- Camunda Saas - if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(this.typeCamundaEngine)) { - try { - saTaskList = new io.camunda.tasklist.auth.SaasAuthentication(serverDefinition.zeebeSaasClientId, - serverDefinition.zeebeSaasClientSecret); - } catch (Exception e) { - logger.error("Can't connect to SaaS environment[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " - + e.getMessage()); - } - - //---------------------------- Camunda 8 Self Manage - } else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(this.typeCamundaEngine)) { - saTaskList = new io.camunda.tasklist.auth.SimpleAuthentication(serverDefinition.operateUserName, - serverDefinition.operateUserPassword); - } else - throw new AutomatorException("Invalid configuration"); - - if (!isOk) - throw new AutomatorException("Invalid configuration " + analysis); - - // ---------------- connection - try { - isOk = stillOk(serverDefinition.taskListUrl, "taskListUrl", analysis, false, isOk); - analysis.append("Tasklist ..."); - - taskClient = new CamundaTaskListClient.Builder().taskListUrl(serverDefinition.taskListUrl) - .authentication(saTaskList) - .build(); - analysis.append("successfully, "); - //get tasks assigned to demo - logger.info("Zeebe: OK, Operate: OK, TaskList:OK " + analysis); - - } catch (Exception e) { - logger.error("Can't connect to Server[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); - throw new AutomatorException( - "Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " Fail : " + e.getMessage()); - } - */ + public BpmnEngineList.BpmnServerDefinition getServerDefinition() { + return serverDefinition; } /** @@ -1233,12 +660,12 @@ private void connectTaskList(StringBuilder analysis) throws AutomatorException { * @param wasOkBefore previous value, is returned if this check is Ok * @return previous value is ok false else */ - private boolean stillOk(Object value, - String message, - StringBuilder analysis, - boolean check, - boolean displayValueInAnalysis, - boolean wasOkBefore) { + protected boolean stillOk(Object value, + String message, + StringBuilder analysis, + boolean check, + boolean displayValueInAnalysis, + boolean wasOkBefore) { analysis.append(message); analysis.append("["); analysis.append(getDisplayValue(value, displayValueInAnalysis)); diff --git a/src/main/java/org/camunda/automator/bpmnengine/camunda8/OperateClient.java b/src/main/java/org/camunda/automator/bpmnengine/camunda8/OperateClient.java new file mode 100644 index 0000000..a4011df --- /dev/null +++ b/src/main/java/org/camunda/automator/bpmnengine/camunda8/OperateClient.java @@ -0,0 +1,429 @@ +package org.camunda.automator.bpmnengine.camunda8; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.camunda.operate.CamundaOperateClient; +import io.camunda.operate.CamundaOperateClientConfiguration; +import io.camunda.operate.auth.*; +import io.camunda.operate.auth.TokenResponseMapper.JacksonTokenResponseMapper; +import io.camunda.operate.exception.OperateException; +import io.camunda.operate.model.*; +import io.camunda.operate.search.*; +import io.camunda.zeebe.client.api.response.ActivateJobsResponse; +import io.camunda.zeebe.client.api.response.ActivatedJob; +import org.apache.hc.client5.http.impl.classic.HttpClients; +import org.camunda.automator.bpmnengine.BpmnEngine; +import org.camunda.automator.configuration.BpmnEngineList; +import org.camunda.automator.definition.ScenarioStep; +import org.camunda.automator.engine.AutomatorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.URI; +import java.net.URL; +import java.time.Duration; +import java.util.*; +import java.util.stream.Collectors; + + +public class OperateClient { + + public static final int SEARCH_MAX_SIZE = 100; + private final Logger logger = LoggerFactory.getLogger(OperateClient.class); + BpmnEngineCamunda8 engineCamunda8; + private CamundaOperateClient operateClient; + + protected OperateClient(BpmnEngineCamunda8 engineCamunda8) { + this.engineCamunda8 = engineCamunda8; + } + + /** + * Connect Operate + * + * @param analysis to cpmplete the analysis + * @throws AutomatorException in case of error + */ + protected void connectOperate(StringBuilder analysis) throws AutomatorException { + BpmnEngineList.BpmnServerDefinition serverDefinition = engineCamunda8.getServerDefinition(); + + if (!serverDefinition.isOperate()) { + analysis.append("No operate connection required, "); + return; + } + analysis.append("Operate connection..."); + + boolean isOk = true; + isOk = engineCamunda8.stillOk(serverDefinition.operateUrl, "operateUrl", analysis, true, true, isOk); + + // CamundaOperateClientBuilder camundaOperateClientBuilder = new CamundaOperateClientBuilder(); + CamundaOperateClientConfiguration configuration = null; + // ---------------------------- Camunda Saas + if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(serverDefinition.serverType)) { + + try { + isOk = engineCamunda8.stillOk(serverDefinition.zeebeSaasRegion, "zeebeSaasRegion", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.zeebeSaasClusterId, "zeebeSaasClusterId", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.zeebeClientId, "zeebeClientId", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.zeebeClientSecret, "zeebeClientSecret", analysis, true, false, isOk); + + URL operateUrl = URI.create("https://" + serverDefinition.zeebeSaasRegion + ".operate.camunda.io/" + + serverDefinition.zeebeSaasClusterId).toURL(); + URL authUrl = URI.create("https://login.cloud.camunda.io/oauth/token").toURL(); + JwtCredential credentials = + new JwtCredential(serverDefinition.zeebeClientId, serverDefinition.zeebeClientSecret, "operate.camunda.io", authUrl, null); + ObjectMapper objectMapper = new ObjectMapper(); + TokenResponseMapper tokenResponseMapper = new JacksonTokenResponseMapper(objectMapper); + JwtAuthentication authentication = new JwtAuthentication(credentials, tokenResponseMapper); + configuration = + new CamundaOperateClientConfiguration( + authentication, operateUrl, objectMapper, HttpClients.createDefault()); + + + } catch (Exception e) { + logger.error("Can't connect to SaaS environemnt[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + + e.getMessage()); + } + + //---------------------------- Camunda 8 Self Manage + } else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(serverDefinition.serverType)) { + + isOk = engineCamunda8.stillOk(serverDefinition.zeebeGatewayAddress, "GatewayAddress", analysis, true, true, isOk); + + try { + if (serverDefinition.isAuthenticationUrl()) { + isOk = engineCamunda8.stillOk(serverDefinition.authenticationUrl, "authenticationUrl", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.operateClientId, "operateClientId", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.operateClientSecret, "operateClientSecret", analysis, true, false, isOk); + + String scope = ""; + URL authUrl = + URI.create( + "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token") + .toURL(); + URL operateUrl = URI.create(serverDefinition.operateUrl).toURL(); + // bootstrapping + JwtCredential credentials = + new JwtCredential(serverDefinition.operateClientId, serverDefinition.operateClientSecret, "operate-api", authUrl, scope); + ObjectMapper objectMapper = new ObjectMapper(); + TokenResponseMapper tokenResponseMapper = new JacksonTokenResponseMapper(objectMapper); + JwtAuthentication authentication = new JwtAuthentication(credentials, tokenResponseMapper); + configuration = + new CamundaOperateClientConfiguration( + authentication, operateUrl, objectMapper, HttpClients.createDefault()); + + } else { + // Simple authentication + isOk = engineCamunda8.stillOk(serverDefinition.operateUserName, "operateUserName", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.operateUserPassword, "operateUserPassword", analysis, true, false, isOk); + URL operateUrl = URI.create(serverDefinition.operateUrl).toURL(); + + SimpleCredential credentials = new SimpleCredential(serverDefinition.operateUserName, serverDefinition.operateUserPassword, operateUrl, Duration.ofMinutes(10)); + SimpleAuthentication authentication = new SimpleAuthentication(credentials); + ObjectMapper objectMapper = new ObjectMapper(); + configuration = new CamundaOperateClientConfiguration(authentication, operateUrl, objectMapper, HttpClients.createDefault()); + } + } catch (Exception e) { + logger.error("Can't connect to SaaS environment[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + + e.getMessage()); + } + + } else + throw new AutomatorException("Invalid configuration"); + + if (!isOk) + throw new AutomatorException("Invalid configuration " + analysis); + + // ---------------- connection + try { + + operateClient = new CamundaOperateClient(configuration); + + analysis.append("successfully, "); + + } catch (Exception e) { + logger.error("Can't connect to Server[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " Fail : " + e.getMessage()); + } + } + + public List activateServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) + throws AutomatorException { + try { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + long processInstanceIdLong = Long.parseLong(processInstanceId); + + ProcessInstanceFilter processInstanceFilter = ProcessInstanceFilter.builder() + .parentKey(processInstanceIdLong) + .build(); + + SearchQuery processInstanceQuery = new SearchQuery.Builder().filter(processInstanceFilter).size(100).build(); + + List listProcessInstances = operateClient.searchProcessInstances(processInstanceQuery); + Set setProcessInstances = listProcessInstances.stream() + .map(ProcessInstance::getKey) + .collect(Collectors.toSet()); + setProcessInstances.add(processInstanceIdLong); + + ActivateJobsResponse jobsResponse = engineCamunda8.getZeebeClient().newActivateJobsCommand() + .jobType(topic) + .maxJobsToActivate(10000) + .workerName(Thread.currentThread().getName()) + .send() + .join(); + List listJobsId = new ArrayList<>(); + + for (ActivatedJob job : jobsResponse.getJobs()) { + if (setProcessInstances.contains(job.getProcessInstanceKey())) + listJobsId.add(String.valueOf(job.getKey())); + else { + engineCamunda8.getZeebeClient().newFailCommand(job.getKey()).retries(2).send().join(); + } + } + return listJobsId; + + } catch (Exception e) { + throw new AutomatorException("Can't search service task topic[" + topic + "] : " + e.getMessage()); + } + } + + /** + * @param processInstanceId filter on the processInstanceId. may be null + * @param filterTaskId filter on the taskId + * @param maxResult maximum Result + * @return list of Task + * @throws AutomatorException + */ + public List searchTasksByProcessInstanceId(String processInstanceId, String filterTaskId, int maxResult) + throws AutomatorException { + try { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + + // impossible to filter by the task name/ task tyoe, so be ready to get a lot of flowNode and search the correct onee + FlowNodeInstanceFilter flownodeFilter = FlowNodeInstanceFilter.builder() + .processInstanceKey(Long.valueOf(processInstanceId)) + .build(); + + SearchQuery flowNodeQuery = new SearchQuery.Builder().filter(flownodeFilter).size(maxResult).build(); + List flowNodes = operateClient.searchFlowNodeInstances(flowNodeQuery); + return flowNodes.stream() + .filter(t -> filterTaskId == null || filterTaskId.equals(t.getFlowNodeId())) // Filter by name + .map(t -> { + BpmnEngine.TaskDescription taskDescription = new BpmnEngine.TaskDescription(); + taskDescription.taskId = t.getFlowNodeId(); + taskDescription.processInstanceId = String.valueOf(t.getProcessInstanceKey()); + taskDescription.startDate = t.getStartDate().getDate(); + taskDescription.endDate = t.getEndDate().getDate(); + taskDescription.type = getTaskType(t.getType()); // to implement + taskDescription.isCompleted = FlowNodeInstanceState.COMPLETED.equals(t.getState()); // to implement + return taskDescription; + }).toList(); + + } catch (OperateException e) { + logger.error("Can't search FlowNode: " + e.getMessage()); + throw new AutomatorException("Can't search FlowNode: " + e.getMessage()); + } + // We must not be here + catch (Exception e) { + logger.error("Can't search FlowNode EXCEPTION NOT EXPECTED: " + e.getMessage()); + throw new AutomatorException("Can't search FlowNode: " + e.getMessage()); + } + } + + public List searchProcessInstanceByVariable(String processId, + Map filterVariables, + int maxResult) throws AutomatorException { + try { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + + // impossible to filter by the task name/ task tyoe, so be ready to get a lot of flowNode and search the correct onee + ProcessInstanceFilter processInstanceFilter = ProcessInstanceFilter.builder().bpmnProcessId(processId).build(); + + SearchQuery processInstanceQuery = new SearchQuery.Builder().filter(processInstanceFilter) + .size(maxResult) + .build(); + List listProcessInstances = operateClient.searchProcessInstances(processInstanceQuery); + + List listProcessInstanceFind = new ArrayList<>(); + // now, we have to filter based on variableName/value + + for (ProcessInstance processInstance : listProcessInstances) { + Map processVariables = getVariables(processInstance.getKey().toString()); + List> entriesNotFiltered = filterVariables.entrySet() + .stream() + .filter( + t -> processVariables.containsKey(t.getKey()) && processVariables.get(t.getKey()).equals(t.getValue())) + .toList(); + + if (entriesNotFiltered.isEmpty()) { + + BpmnEngine.ProcessDescription processDescription = new BpmnEngine.ProcessDescription(); + processDescription.processInstanceId = processInstance.getKey().toString(); + + listProcessInstanceFind.add(processDescription); + } + } + return listProcessInstanceFind; + } catch (OperateException e) { + logger.error("Can't search flowNodeByVariable: " + e.getMessage()); + throw new AutomatorException("Can't search flowNodeByVariable " + e.getMessage()); + } + // We must not be here + catch (Exception e) { + logger.error("Can't search flowNodeByVariable EXCEPTION NOT EXPECTED: " + e.getMessage()); + throw new AutomatorException("Can't search FlowNode: " + e.getMessage()); + } + } + + + public Map getVariables(String processInstanceId) throws AutomatorException { + try { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + + // impossible to filter by the task name/ task tyoe, so be ready to get a lot of flowNode and search the correct onee + VariableFilter variableFilter = VariableFilter.builder() + .processInstanceKey(Long.valueOf(processInstanceId)) + .build(); + + SearchQuery variableQuery = new SearchQuery.Builder().filter(variableFilter).build(); + List listVariables = operateClient.searchVariables(variableQuery); + + Map variables = new HashMap<>(); + listVariables.forEach(t -> variables.put(t.getName(), t.getValue())); + + return variables; + } catch (OperateException e) { + logger.error("Can't getVariables: " + e.getMessage()); + throw new AutomatorException("Can't search variables task " + e.getMessage()); + } + // We must not be here + catch (Exception e) { + logger.error("Can't getVariables EXCEPTION NOT EXPECTED: " + e.getMessage()); + throw new AutomatorException("Can't getVariables: " + e.getMessage()); + } + } + + public long countNumberOfProcessInstancesCreated(String processId, Date startDate, Date endDate) + throws AutomatorException { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + + SearchQuery.Builder queryBuilder = new SearchQuery.Builder(); + try { + int cumul = 0; + SearchResult searchResult = null; + queryBuilder = queryBuilder.filter(ProcessInstanceFilter.builder().bpmnProcessId(processId).build()); + queryBuilder.sort(new Sort("key", SortOrder.ASC)); + int maxLoop = 0; + do { + maxLoop++; + if (searchResult != null && !searchResult.getItems().isEmpty()) { + queryBuilder.searchAfter(searchResult.getSortValues()); + } + SearchQuery searchQuery = queryBuilder.build(); + searchQuery.setSize(SEARCH_MAX_SIZE); + searchResult = operateClient.searchProcessInstanceResults(searchQuery); + + cumul += searchResult.getItems().stream().filter(t -> t.getStartDate().getDate().after(startDate)).count(); + + } while (searchResult.getItems().size() >= SEARCH_MAX_SIZE && maxLoop < 1000); + return cumul; + } catch (Exception e) { + throw new AutomatorException("Search countNumberProcessInstanceCreated " + e.getMessage()); + } + } + + public long countNumberOfProcessInstancesEnded(String processId, Date startDate, Date endDate) + throws AutomatorException { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + + SearchQuery.Builder queryBuilder = new SearchQuery.Builder(); + try { + int cumul = 0; + SearchResult searchResult = null; + + queryBuilder = queryBuilder.filter(ProcessInstanceFilter.builder().bpmnProcessId(processId) + // .startDate(startDate) + // .endDate(endDate) + .state(ProcessInstanceState.COMPLETED).build()); + + queryBuilder.sort(new Sort("key", SortOrder.ASC)); + int maxLoop = 0; + do { + maxLoop++; + if (searchResult != null && !searchResult.getItems().isEmpty()) { + queryBuilder.searchAfter(searchResult.getSortValues()); + } + SearchQuery searchQuery = queryBuilder.build(); + searchQuery.setSize(SEARCH_MAX_SIZE); + searchResult = operateClient.searchProcessInstanceResults(searchQuery); + cumul += searchResult.getItems().stream().filter(t -> t.getStartDate().getDate().after(startDate)).count(); + + } while (searchResult.getItems().size() >= SEARCH_MAX_SIZE && maxLoop < 1000); + return cumul; + + } catch (Exception e) { + throw new AutomatorException("Search countNumberProcessEnded " + e.getMessage()); + } + } + + public long countNumberOfTasks(String processId, String taskId) throws AutomatorException { + if (operateClient == null) { + throw new AutomatorException("No Operate connection was provided"); + } + + try { + int cumul = 0; + SearchResult searchResult = null; + int maxLoop = 0; + do { + maxLoop++; + + SearchQuery.Builder queryBuilder = new SearchQuery.Builder(); + queryBuilder = queryBuilder.filter(FlowNodeInstanceFilter.builder().flowNodeId(taskId).build()); + queryBuilder.sort(new Sort("key", SortOrder.ASC)); + if (searchResult != null && !searchResult.getItems().isEmpty()) { + queryBuilder.searchAfter(searchResult.getSortValues()); + } + SearchQuery searchQuery = queryBuilder.build(); + searchQuery.setSize(SEARCH_MAX_SIZE); + searchResult = operateClient.searchFlowNodeInstanceResults(searchQuery); + cumul += (long) searchResult.getItems().size(); + } while (searchResult.getItems().size() >= SEARCH_MAX_SIZE && maxLoop < 1000); + return cumul; + } catch (Exception e) { + throw new AutomatorException("Search countNumberProcessEnded " + e.getMessage()); + } + } + + private ScenarioStep.Step getTaskType(String taskTypeC8) { + return switch (taskTypeC8) { + case "SERVICE_TASK" -> ScenarioStep.Step.SERVICETASK; + case "USER_TASK" -> ScenarioStep.Step.USERTASK; + case "START_EVENT" -> ScenarioStep.Step.STARTEVENT; + case "END_EVENT" -> ScenarioStep.Step.ENDEVENT; + case "EXCLUSIVE_GATEWAY" -> ScenarioStep.Step.EXCLUSIVEGATEWAY; + case "PARALLEL_GATEWAY" -> ScenarioStep.Step.PARALLELGATEWAY; + case "TASK" -> ScenarioStep.Step.TASK; + case "SCRIPT_TASK" -> ScenarioStep.Step.SCRIPTTASK; + default -> null; + }; + + } + +} diff --git a/src/main/java/org/camunda/automator/bpmnengine/camunda8/TaskListClient.java b/src/main/java/org/camunda/automator/bpmnengine/camunda8/TaskListClient.java new file mode 100644 index 0000000..3fe8ea2 --- /dev/null +++ b/src/main/java/org/camunda/automator/bpmnengine/camunda8/TaskListClient.java @@ -0,0 +1,245 @@ +package org.camunda.automator.bpmnengine.camunda8; + +import io.camunda.tasklist.CamundaTaskListClient; +import io.camunda.tasklist.CamundaTaskListClientBuilder; +import io.camunda.tasklist.auth.Authentication; +import io.camunda.tasklist.auth.SimpleAuthentication; +import io.camunda.tasklist.auth.SimpleCredential; +import io.camunda.tasklist.dto.*; +import io.camunda.tasklist.exception.TaskListException; +import org.camunda.automator.configuration.BpmnEngineList; +import org.camunda.automator.engine.AutomatorException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.MalformedURLException; +import java.net.URL; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TaskListClient { + private final Logger logger = LoggerFactory.getLogger(TaskListClient.class); + BpmnEngineCamunda8 engineCamunda8; + private CamundaTaskListClient taskClient; + + protected TaskListClient(BpmnEngineCamunda8 engineCamunda8) { + this.engineCamunda8 = engineCamunda8; + } + + /** + * Connect to TaskList + * + * @param analysis complete the analysis + * @throws AutomatorException in case of error + */ + public void connectTaskList(StringBuilder analysis) throws AutomatorException { + + BpmnEngineList.BpmnServerDefinition serverDefinition = engineCamunda8.getServerDefinition(); + if (!serverDefinition.isTaskList()) { + analysis.append("No TaskList connection required, "); + return; + } + analysis.append("Tasklist ..."); + + boolean isOk = true; + isOk = engineCamunda8.stillOk(serverDefinition.taskListUrl, "taskListUrl", analysis, true, true, isOk); + + CamundaTaskListClientBuilder taskListBuilder = CamundaTaskListClient.builder(); + // ---------------------------- Camunda Saas + if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(serverDefinition.serverType)) { + try { + isOk = engineCamunda8.stillOk(serverDefinition.zeebeSaasRegion, "zeebeSaasRegion", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.zeebeSaasClusterId, "zeebeSaasClusterId", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.taskListClientId, "taskListClientId", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.taskListClientSecret, "taskListClientSecret", analysis, true, false, isOk); + + String taskListUrl = "https://" + serverDefinition.zeebeSaasRegion + ".tasklist.camunda.io/" + + serverDefinition.zeebeSaasClusterId; + + taskListBuilder.taskListUrl(taskListUrl) + .saaSAuthentication(serverDefinition.taskListClientId, serverDefinition.taskListClientSecret); + } catch (Exception e) { + logger.error("Can't connect to SaaS environemnt[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + + e.getMessage()); + } + + //---------------------------- Camunda 8 Self Manage + } else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(serverDefinition.serverType)) { + + if (serverDefinition.isAuthenticationUrl()) { + isOk = engineCamunda8.stillOk(serverDefinition.taskListClientId, "taskListClientId", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.taskListClientSecret, "taskListClientSecret", analysis, true, false, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.authenticationUrl, "authenticationUrl", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.taskListKeycloakUrl, "taskListKeycloakUrl", analysis, true, true, isOk); + + taskListBuilder.taskListUrl(serverDefinition.taskListUrl) + .selfManagedAuthentication(serverDefinition.taskListClientId, serverDefinition.taskListClientSecret, + serverDefinition.taskListKeycloakUrl); + } else { + isOk = engineCamunda8.stillOk(serverDefinition.taskListUserName, "User", analysis, true, true, isOk); + isOk = engineCamunda8.stillOk(serverDefinition.taskListUserPassword, "Password", analysis, true, false, isOk); + try { + SimpleCredential credentials = new SimpleCredential(serverDefinition.taskListUserName, + serverDefinition.taskListUserPassword, new URL(serverDefinition.taskListUrl), Duration.ofHours(1000)); + Authentication authentication = new SimpleAuthentication(credentials); + taskListBuilder.taskListUrl(serverDefinition.taskListUrl).authentication(authentication); + } catch (MalformedURLException e) { + throw new AutomatorException("Invalid Url for TaskList: [" + serverDefinition.taskListUrl + "]"); + + } + } + } else + throw new AutomatorException("Invalid configuration"); + + if (!isOk) + throw new AutomatorException("Invalid configuration " + analysis); + + // ---------------- connection + try { + taskListBuilder.zeebeClient(engineCamunda8.getZeebeClient()); + taskListBuilder.useZeebeUserTasks(); + taskClient = taskListBuilder.build(); + + analysis.append("successfully, "); + + } catch (Exception e) { + logger.error("Can't connect to Server[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " Fail : " + e.getMessage()); + } + + /* 1.6.1 + boolean isOk = true; + io.camunda.tasklist.auth.AuthInterface saTaskList; + + // ---------------------------- Camunda Saas + if (BpmnEngineList.CamundaEngine.CAMUNDA_8_SAAS.equals(this.typeCamundaEngine)) { + try { + saTaskList = new io.camunda.tasklist.auth.SaasAuthentication(serverDefinition.zeebeSaasClientId, + serverDefinition.zeebeSaasClientSecret); + } catch (Exception e) { + logger.error("Can't connect to SaaS environment[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to SaaS environment[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + + e.getMessage()); + } + + //---------------------------- Camunda 8 Self Manage + } else if (BpmnEngineList.CamundaEngine.CAMUNDA_8.equals(this.typeCamundaEngine)) { + saTaskList = new io.camunda.tasklist.auth.SimpleAuthentication(serverDefinition.operateUserName, + serverDefinition.operateUserPassword); + } else + throw new AutomatorException("Invalid configuration"); + + if (!isOk) + throw new AutomatorException("Invalid configuration " + analysis); + + // ---------------- connection + try { + isOk = engineCamunda8.stillOk(serverDefinition.taskListUrl, "taskListUrl", analysis, false, isOk); + analysis.append("Tasklist ..."); + + taskClient = new CamundaTaskListClient.Builder().taskListUrl(serverDefinition.taskListUrl) + .authentication(saTaskList) + .build(); + analysis.append("successfully, "); + //get tasks assigned to demo + logger.info("Zeebe: OK, Operate: OK, TaskList:OK " + analysis); + + } catch (Exception e) { + logger.error("Can't connect to Server[{}] Analysis:{} : {}", serverDefinition.name, analysis, e); + throw new AutomatorException( + "Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " Fail : " + e.getMessage()); + } + */ + + } + + public List searchUserTasksByProcessInstance(String processInstanceId, String userTaskId, int maxResult) + throws AutomatorException { + try { + // impossible to filter by the task name/ task type, so be ready to get a lot of flowNode and search the correct one + Long processInstanceIdLong = Long.valueOf(processInstanceId); + + TaskSearch taskSearch = new TaskSearch(); + taskSearch.setState(TaskState.CREATED); + taskSearch.setAssigned(Boolean.FALSE); + taskSearch.setWithVariables(true); + taskSearch.setPagination(new Pagination().setPageSize(maxResult)); + + TaskList tasksList = taskClient.getTasks(taskSearch); + boolean getAllTasks = tasksList.size() < maxResult; + List listTasksResult = new ArrayList<>(); + do { + if (!engineCamunda8.isHightFlowMode()) { + // We check that the task is the one expected + listTasksResult.addAll(tasksList.getItems().stream().filter(t -> { + List listVariables = t.getVariables(); + Long processInstanceIdTask = engineCamunda8.getProcessInstanceIdFromMarker(listVariables); + if (processInstanceIdTask == null) { + return false; + } + return (processInstanceIdLong.equals(processInstanceIdTask)); + }).map(Task::getId) // Task to ID + .toList()); + } else { + listTasksResult.addAll(tasksList.getItems().stream() + .map(Task::getId) // Task to ID + .toList()); + } + + if (tasksList.size() > 0 && !getAllTasks) + tasksList = taskClient.after(tasksList); + } while (tasksList.size() > 0 && !getAllTasks); + + return listTasksResult; + + } catch (TaskListException e) { + logger.error("TaskListClient: error during search task [{}]", e.getMessage()); + throw new AutomatorException("Can't search users task " + e.getMessage()); + } + } + + public List searchUserTasks(String userTaskId, int maxResult) throws AutomatorException { + try { + // impossible to filter by the task name/ task type, so be ready to get a lot of flowNode and search the correct one + + TaskSearch taskSearch = new TaskSearch(); + taskSearch.setState(TaskState.CREATED); + taskSearch.setAssigned(Boolean.FALSE); + taskSearch.setWithVariables(true); + taskSearch.setPagination(new Pagination().setPageSize(maxResult)); + + TaskList tasksList = taskClient.getTasks(taskSearch); + List listTasksResult = new ArrayList<>(); + do { + listTasksResult.addAll(tasksList.getItems().stream().map(Task::getId) // Task to ID + .toList()); + + if (tasksList.size() > 0) + tasksList = taskClient.after(tasksList); + } while (tasksList.size() > 0); + + return listTasksResult; + + } catch (TaskListException e) { + throw new AutomatorException("Can't search users task " + e.getMessage()); + } + } + + public void executeUserTask(String userTaskId, String userId, Map variables) + throws AutomatorException { + try { + taskClient.claim(userTaskId, engineCamunda8.getServerDefinition().operateUserName); + taskClient.completeTask(userTaskId, variables); + } catch (TaskListException e) { + throw new AutomatorException("Can't execute task [" + userTaskId + "]"); + } catch (Exception e) { + throw new AutomatorException("Can't execute task [" + userTaskId + "]"); + } + } +} diff --git a/src/main/java/org/camunda/automator/bpmnengine/camunda8/refactoring/RefactoredCommandWrapper.java b/src/main/java/org/camunda/automator/bpmnengine/camunda8/refactoring/RefactoredCommandWrapper.java deleted file mode 100644 index ae5c95f..0000000 --- a/src/main/java/org/camunda/automator/bpmnengine/camunda8/refactoring/RefactoredCommandWrapper.java +++ /dev/null @@ -1,89 +0,0 @@ -package org.camunda.automator.bpmnengine.camunda8.refactoring; - -import io.camunda.zeebe.client.api.ZeebeFuture; -import io.camunda.zeebe.client.api.command.FinalCommandStep; -import io.camunda.zeebe.client.api.worker.BackoffSupplier; -import io.camunda.zeebe.spring.client.jobhandling.CommandWrapper; -import io.camunda.zeebe.spring.client.jobhandling.DefaultCommandExceptionHandlingStrategy; - -import java.time.Instant; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * Copied from CommandWrapper from spring-zeebe. Refactor over there to be able to use built-in stuff directly - */ -public class RefactoredCommandWrapper extends CommandWrapper { - - private final FinalCommandStep command; - private final long deadline; - private final String entityLogInfo; - private final DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy; - private final int maxRetries = 20; - private long currentRetryDelay = 50L; - private int invocationCounter = 0; - - public RefactoredCommandWrapper(FinalCommandStep command, - long deadline, - String entityLogInfo, - DefaultCommandExceptionHandlingStrategy commandExceptionHandlingStrategy) { - super(command, null, commandExceptionHandlingStrategy); - this.command = command; - this.deadline = deadline; - this.entityLogInfo = entityLogInfo; - this.commandExceptionHandlingStrategy = commandExceptionHandlingStrategy; - } - - @Override - public void executeAsync() { - ++this.invocationCounter; - ZeebeFuture zeebeFuture = this.command.send(); - if (commandExceptionHandlingStrategy != null) - zeebeFuture.exceptionally(t -> { - this.commandExceptionHandlingStrategy.handleCommandError(this, t); - return null; - }); - } - - public Object executeSync() { - ++this.invocationCounter; - ZeebeFuture zeebeFutur = this.command.send(); - if (commandExceptionHandlingStrategy != null) - zeebeFutur.exceptionally(t -> { - this.commandExceptionHandlingStrategy.handleCommandError(this, t); - return null; - }); - return zeebeFutur.join(); - } - - @Override - public void increaseBackoffUsing(BackoffSupplier backoffSupplier) { - this.currentRetryDelay = backoffSupplier.supplyRetryDelay(this.currentRetryDelay); - } - - @Override - public void scheduleExecutionUsing(ScheduledExecutorService scheduledExecutorService) { - scheduledExecutorService.schedule(this::executeAsync, this.currentRetryDelay, TimeUnit.MILLISECONDS); - } - - @Override - public String toString() { - return "{command=" + this.command.getClass() + ", entity=" + this.entityLogInfo + ", currentRetryDelay=" - + this.currentRetryDelay + '}'; - } - - @Override - public boolean hasMoreRetries() { - if (this.jobDeadlineExceeded()) { - return false; - } else { - return this.invocationCounter < this.maxRetries; - } - } - - @Override - public boolean jobDeadlineExceeded() { - return Instant.now().getEpochSecond() > this.deadline; - } - -} diff --git a/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java b/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java index 5ed76ea..ae31b7d 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java +++ b/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java @@ -86,7 +86,7 @@ public RegisteredTask registerServiceTask(String workerId, } @Override - public List searchServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) + public List activateServiceTasks(String processInstanceId, String serviceTaskId, String topic, int maxResult) throws AutomatorException { return Collections.emptyList(); } diff --git a/src/main/java/org/camunda/automator/configuration/BpmnEngineList.java b/src/main/java/org/camunda/automator/configuration/BpmnEngineList.java index efa90bf..6ecc70c 100644 --- a/src/main/java/org/camunda/automator/configuration/BpmnEngineList.java +++ b/src/main/java/org/camunda/automator/configuration/BpmnEngineList.java @@ -516,7 +516,8 @@ public enum CamundaEngine {CAMUNDA_7, CAMUNDA_8, CAMUNDA_8_SAAS, DUMMY} public static class BpmnServerDefinition { public String name; - public CamundaEngine serverType; + public CamundaEngine serverType = BpmnEngineList.CamundaEngine.CAMUNDA_8; + /** * My Zeebe Address diff --git a/src/main/java/org/camunda/automator/content/ContentManager.java b/src/main/java/org/camunda/automator/content/ContentManager.java index 4b2f866..0df289d 100644 --- a/src/main/java/org/camunda/automator/content/ContentManager.java +++ b/src/main/java/org/camunda/automator/content/ContentManager.java @@ -14,7 +14,7 @@ import org.springframework.web.multipart.MultipartFile; import javax.annotation.PostConstruct; -import java.io.*; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -42,10 +42,10 @@ public Path getFromName(String scenarioName) { } - @PostConstruct public void init() { try { + logger.info("ContentManager: start initialization"); repositoryManager.initializeRepository(repositoryPath); loadUploadPath(); LoadContentResource(); @@ -70,7 +70,7 @@ public List getContentScenario() { List listContent = repositoryManager.getContentRepository(); for (Path path : listContent) { // The content can have multiple files, not only scenario - if (! path.getFileName().toString().endsWith(".json")) + if (!path.getFileName().toString().endsWith(".json")) continue; try { Scenario scenario = Scenario.createFromFile(path); @@ -83,7 +83,6 @@ public List getContentScenario() { } - public Path addFile(Path file) throws IOException { return repositoryManager.addFile(file); } @@ -124,7 +123,7 @@ private void LoadContentResource() { private void loadUploadPath() { try { Path sourceDirectory = Paths.get(uploadPath); - logger.info("ContentManager/Upload: from [{}]", sourceDirectory); + logger.info("ContentManager/Upload: from [{}]", sourceDirectory.toAbsolutePath()); int nbFilesCopied = 0; // Copy all files from source to target List listFiles = Files.walk(sourceDirectory) diff --git a/src/main/java/org/camunda/automator/content/ContentRestController.java b/src/main/java/org/camunda/automator/content/ContentRestController.java index 3f75eba..bef88dd 100644 --- a/src/main/java/org/camunda/automator/content/ContentRestController.java +++ b/src/main/java/org/camunda/automator/content/ContentRestController.java @@ -31,15 +31,16 @@ public class ContentRestController { **/ @PostMapping(value = "/api/content/add", consumes = { MediaType.MULTIPART_FORM_DATA_VALUE}, produces = MediaType.APPLICATION_JSON_VALUE) - public List> upload(@RequestPart("File") List uploadedfiles) { + public List> upload(@RequestPart("FileToUpload") List uploadedfiles) { List> result = new ArrayList<>(); for (MultipartFile file : uploadedfiles) { try { Path fileSaved = contentManager.addFromMultipart(file, file.getOriginalFilename()); result.add(Map.of("filename", fileSaved.getFileName(), "status", "UPLOADED")); + logger.info("ControlRestController: uploaded file[{}] with success", file.getOriginalFilename()); } catch (Exception e) { + logger.info("ControlRestController: Errir upload file [{}] : {}", file.getOriginalFilename(), e.getMessage()); result.add(Map.of("filename", file.getOriginalFilename(), "status", "ERROR", "error", e.getMessage())); - } } return result; @@ -48,11 +49,15 @@ public List> upload(@RequestPart("File") List @GetMapping("/api/content/list") List> getContentScenario() { + logger.debug("ControlRestController/getContentScenario: start"); try { - return contentManager.getContentScenario().stream() + List> listScenario = contentManager.getContentScenario().stream() .map(Scenario::getDescription) .toList(); + logger.info("ControlRestController/getContentScenario: found {} scenario", listScenario.size()); + return listScenario; } catch (Exception e) { + logger.info("ControlRestController/getContentScenario: Error during getContentScenario {} ", e.getMessage()); throw new ResponseStatusException(HttpStatus.BAD_REQUEST, "Error during Content : " + e.getMessage()); } } diff --git a/src/main/java/org/camunda/automator/content/RepositoryManager.java b/src/main/java/org/camunda/automator/content/RepositoryManager.java index 3b424e9..55a4403 100644 --- a/src/main/java/org/camunda/automator/content/RepositoryManager.java +++ b/src/main/java/org/camunda/automator/content/RepositoryManager.java @@ -6,10 +6,7 @@ import org.springframework.core.io.Resource; import java.io.*; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardCopyOption; +import java.nio.file.*; import java.util.Collections; import java.util.List; import java.util.stream.Stream; @@ -20,25 +17,35 @@ public class RepositoryManager { private Path repositoryPath; public void initializeRepository(String repositoryProposition) throws AutomatorException { + logger.info("RepositoryManager: initialisation proposition: {}", repositoryProposition); + + repositoryPath = null; if (repositoryProposition != null && !repositoryProposition.isEmpty()) { Path path = Paths.get(repositoryProposition); if (Files.exists(path) && Files.isDirectory(path)) { repositoryPath = path; + logger.info("RepositoryManager: PathFromConfiguration [{}]", repositoryPath.toAbsolutePath()); } - } else { + } + if (repositoryPath == null) { // Not exist: create a subfolder Path tempDir = Paths.get(System.getProperty("java.io.tmpdir")); + logger.info("RepositoryManager/initialization: TemporaryFolder [{}]", tempDir.toAbsolutePath()); // Create a new folder in the temporary directory try { - repositoryPath = Files.createDirectory(tempDir.resolve("repository")); + repositoryPath = tempDir.resolve("repository"); + Files.createDirectory(repositoryPath); + logger.info("RepositoryManager/initialization: PathFromTemporaryFolder [{}]", repositoryPath.toAbsolutePath()); + } catch (FileAlreadyExistsException e) { + logger.info("RepositoryManager/initialization: File already exists [{}]", repositoryPath.toAbsolutePath()); } catch (Exception e) { - logger.error("Can't create folder [{}]", tempDir.toAbsolutePath() + "/repository"); + logger.error("RepositoryManager/initialization: Can't create folder [{}]", repositoryPath.toAbsolutePath()); + repositoryPath = null; throw new AutomatorException("Can't create folder[" + tempDir.toAbsolutePath() + "/repository]"); } } - logger.info("RepositoryManager: directory under [{}] ", repositoryPath.toAbsolutePath()); - + logger.info("RepositoryManager/initialization: Folder [{}]", repositoryPath.toAbsolutePath()); } public Path addResource(Resource resource) throws IOException { diff --git a/src/main/java/org/camunda/automator/definition/Scenario.java b/src/main/java/org/camunda/automator/definition/Scenario.java index 461abee..660b6c8 100644 --- a/src/main/java/org/camunda/automator/definition/Scenario.java +++ b/src/main/java/org/camunda/automator/definition/Scenario.java @@ -199,11 +199,11 @@ private void afterUnSerialize() { public Map getDescription() { - return Map.of("name", name==null?"":name,// - "server", serverName==null? "": serverName, // - "serverType", serverType==null?"": serverType, // - "processId", processId==null?"":processId, // - "typeScenario", typeScenario==null? "": typeScenario.toString()); + return Map.of("name", name == null ? "" : name,// + "server", serverName == null ? "" : serverName, // + "serverType", serverType == null ? "" : serverType, // + "processId", processId == null ? "" : processId, // + "typeScenario", typeScenario == null ? "" : typeScenario.toString()); } public enum TYPESCENARIO {FLOW, UNIT} diff --git a/src/main/java/org/camunda/automator/definition/ScenarioExecution.java b/src/main/java/org/camunda/automator/definition/ScenarioExecution.java index 0d0af65..e49e5a0 100644 --- a/src/main/java/org/camunda/automator/definition/ScenarioExecution.java +++ b/src/main/java/org/camunda/automator/definition/ScenarioExecution.java @@ -120,6 +120,7 @@ public ScenarioExecution setName(String name) { return this; } + public int getNumberOfThreads() { return (numberOfThreads == null ? 1 : numberOfThreads <= 0 ? 1 : numberOfThreads); } diff --git a/src/main/java/org/camunda/automator/engine/RunResult.java b/src/main/java/org/camunda/automator/engine/RunResult.java index a112d40..54a764e 100644 --- a/src/main/java/org/camunda/automator/engine/RunResult.java +++ b/src/main/java/org/camunda/automator/engine/RunResult.java @@ -203,6 +203,17 @@ public void merge(RunResult result) { } } + /** + * Two execution on the exact same execution: we go for a merge plus one step more, we collect additionnal information, like processinstanceIdList + * @param result + */ + public void mergeDuplicateExecution(RunResult result) { + merge(result); + listProcessInstancesId.addAll(result.listProcessInstancesId); + listDetailsSteps.addAll(result.listDetailsSteps); + + } + public void add(RunResult runResult) { // We keep track of the result in a list listRunResults.add(runResult); @@ -230,6 +241,13 @@ public String getFirstProcessInstanceId() { return listProcessInstancesId.isEmpty() ? null : listProcessInstancesId.get(0); } + public List getListProcessInstancesId() { + return listProcessInstancesId; + } + + public void addListProcessInstancesId(List listProcessInstancesId) { + listProcessInstancesId.addAll(listProcessInstancesId); + } public List getProcessInstanceId() { return this.listProcessInstancesId; } diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java index bb799a8..c4ed9d1 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java @@ -6,15 +6,11 @@ /* ******************************************************************** */ package org.camunda.automator.engine.flow; -import io.camunda.zeebe.client.api.command.CompleteJobCommandStep1; -import io.camunda.zeebe.client.api.command.FinalCommandStep; import io.camunda.zeebe.client.api.response.ActivatedJob; import io.camunda.zeebe.client.api.worker.JobClient; import io.camunda.zeebe.client.api.worker.JobHandler; -import io.camunda.zeebe.spring.client.jobhandling.CommandWrapper; import org.camunda.automator.bpmnengine.BpmnEngine; import org.camunda.automator.bpmnengine.camunda8.BenchmarkCompleteJobExceptionHandlingStrategy; -import org.camunda.automator.bpmnengine.camunda8.refactoring.RefactoredCommandWrapper; import org.camunda.automator.definition.ScenarioStep; import org.camunda.automator.engine.RunResult; import org.camunda.automator.engine.RunScenario; @@ -36,12 +32,11 @@ public class RunScenarioFlowServiceTask extends RunScenarioFlowBasic { private static final TrackActiveWorker trackAsynchronousWorkers = new TrackActiveWorker(); private final TaskScheduler scheduler; private final Semaphore semaphore; + private final BenchmarkCompleteJobExceptionHandlingStrategy exceptionHandlingStrategy = null; Logger logger = LoggerFactory.getLogger(RunScenarioFlowServiceTask.class); private BpmnEngine.RegisteredTask registeredTask; private boolean stopping; - private final BenchmarkCompleteJobExceptionHandlingStrategy exceptionHandlingStrategy = null; - public RunScenarioFlowServiceTask(TaskScheduler scheduler, ScenarioStep scenarioStep, RunScenario runScenario, @@ -173,6 +168,15 @@ public void handle(JobClient jobClient, ActivatedJob activatedJob) throws Except } } + /** + * This method execute the jib and wait for the result of the sending + * + * @param externalTask external task (C7 engine) + * @param externalTaskService service (C7 engine) + * @param jobClient jobClient (C8 engine) + * @param activatedJob activated Job (C8 engine) + * @param waitTimeInMs Wait time to simulate a worker + */ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask externalTask, ExternalTaskService externalTaskService, JobClient jobClient, @@ -200,11 +204,7 @@ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask extern /* C8 */ if (jobClient != null) { currentVariables = activatedJob.getVariablesAsMap(); - CompleteJobCommandStep1 completeCommand = jobClient.newCompleteCommand(activatedJob.getKey()); - CommandWrapper command = new RefactoredCommandWrapper((FinalCommandStep) completeCommand, - activatedJob.getDeadline(), activatedJob.toString(), exceptionHandlingStrategy); - - command.executeAsync(); + jobClient.newCompleteCommand(activatedJob.getKey()).send().join(); } flowServiceTask.runResult.registerAddStepExecution(); @@ -232,6 +232,16 @@ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask extern } + /** + * Run the server in a different thread, so the library does not wait for the answer. Simulate a Reactive Programming + * In the thread, the execution wait for the durationSleep, then it call the manageWaitExecution, with a delay of 0. + * Then, no thread is engaged during the waiting. + * + * @param externalTask external task (C7 engine) + * @param externalTaskService service (C7 engine) + * @param jobClient jobClient (C8 engine) + * @param activatedJob activated Job (C8 engine) + */ private void manageAsynchronousExecution(org.camunda.bpm.client.task.ExternalTask externalTask, ExternalTaskService externalTaskService, JobClient jobClient, @@ -248,6 +258,17 @@ public void run() { }, Instant.now().plusMillis(durationSleep.toMillis())); } + /** + * Simulate a Asynchronous Token implementation. + * A token is get (number of token is limited in the step), and when it gets a token, manage the execution asynchronously. + * With that implementation, we ensure there are only execution at a time, to control the access to an external service + * Because the getToken is in the method, the library waits if there are no more token, and don't acquire new job + * + * @param externalTask external task (C7 engine) + * @param externalTaskService service (C7 engine) + * @param jobClient jobClient (C8 engine) + * @param activatedJob activated Job (C8 engine) + */ private void manageAsynchronousLimitedExecution(org.camunda.bpm.client.task.ExternalTask externalTask, ExternalTaskService externalTaskService, JobClient jobClient, diff --git a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java index d4c20f2..2e6967e 100644 --- a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java +++ b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java @@ -63,8 +63,9 @@ public RunResult runExecution() { try { for (Future f : listFutures) { Object scnRunResult = f.get(); - resultExecution.merge((RunResult) scnRunResult); - + if (scnRunResult instanceof RunResult runResultInstances) { + resultExecution.mergeDuplicateExecution(runResultInstances); + } } } catch (Exception e) { diff --git a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java index 3773877..9e4a310 100644 --- a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java +++ b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java @@ -56,7 +56,7 @@ public RunResult executeServiceTask(ScenarioStep step, RunResult result) { do { listActivities = runScenario.getBpmnEngine() - .searchServiceTasks(result.getFirstProcessInstanceId(), step.getTaskId(), step.getTopic(), 1); + .activateServiceTasks(result.getFirstProcessInstanceId(), step.getTaskId(), step.getTopic(), 1); if (listActivities.isEmpty()) { try { diff --git a/src/main/java/org/camunda/automator/services/AutomatorStartup.java b/src/main/java/org/camunda/automator/services/AutomatorStartup.java index c5eb633..5f48609 100644 --- a/src/main/java/org/camunda/automator/services/AutomatorStartup.java +++ b/src/main/java/org/camunda/automator/services/AutomatorStartup.java @@ -75,6 +75,61 @@ private void runFixedWarmup() { } } + private List loadStartupScenario() { + List scenarioList = new ArrayList<>(); + // File + if (configurationStartup.getScenarioFileAtStartup().isEmpty()) { + logger.info("AutomatorStartup/StartupScenario: no scenario [File] from {} given", configurationStartup.getScenarioFileAtStartupName()); + } else { + logger.info("Detect {} scenario [File] from variable [{}] ScenarioPath[{}]", + configurationStartup.getScenarioFileAtStartup().size(), configurationStartup.getScenarioFileAtStartupName(), + configurationStartup.scenarioPath); + + for (String scenarioFileName : configurationStartup.getScenarioFileAtStartup()) { + logger.info("AutomatorStartup/StartupScenario: Register scenario [File] [{}]", scenarioFileName); + + Path scenarioFile = Paths.get(configurationStartup.scenarioPath + "/" + scenarioFileName); + if (!Files.exists(scenarioFile)) { + scenarioFile = Paths.get(scenarioFileName); + } + if (Files.exists(scenarioFile)) { + try { + contentManager.addFile(scenarioFile); + } catch (IOException e) { + logger.error("AutomatorStartup/StartupScenario: File [{}] Can't add in the repository: {}", scenarioFile.toAbsolutePath(), e.getMessage()); + } + } else { + logger.error("AutomatorStartup/StartupScenario:: Can't find File [{}/{}] or [{}]", configurationStartup.scenarioPath, + scenarioFileName, scenarioFileName); + continue; + } + } + + } + + // Resource + if (configurationStartup.getScenarioResourceAtStartup().isEmpty()) { + logger.info("No scenario [Resource] from variable {} given", + configurationStartup.getScenarioResourceAtStartupName()); + } else { + List scenarioResource = configurationStartup.getScenarioResourceAtStartup().stream() + .filter(t -> t != null) + .collect(Collectors.toList()); + + logger.info("Detect {} scenario [Resource] from variable [{}]", + scenarioResource.size(), + configurationStartup.getScenarioResourceAtStartupName()); + for (Resource resource : scenarioResource) { + try { + scenarioList.add(contentManager.addResource(resource)); + } catch (IOException e) { + logger.error("Error loading resource [{}]", resource.getFilename()); + } + } + } + + return scenarioList; + } /** * AutomatorSetupRunnable - run in parallel @@ -220,61 +275,5 @@ public void run() { } } - private List loadStartupScenario() { - List scenarioList = new ArrayList<>(); - // File - if (configurationStartup.getScenarioFileAtStartup().isEmpty()) { - logger.info("AutomatorStartup/StartupScenario: no scenario [File] from {} given", configurationStartup.getScenarioFileAtStartupName()); - } else { - logger.info("Detect {} scenario [File] from variable [{}] ScenarioPath[{}]", - configurationStartup.getScenarioFileAtStartup().size(), configurationStartup.getScenarioFileAtStartupName(), - configurationStartup.scenarioPath); - - for (String scenarioFileName : configurationStartup.getScenarioFileAtStartup()) { - logger.info("AutomatorStartup/StartupScenario: Register scenario [File] [{}]", scenarioFileName); - - Path scenarioFile = Paths.get(configurationStartup.scenarioPath + "/" + scenarioFileName); - if (!Files.exists(scenarioFile)) { - scenarioFile = Paths.get(scenarioFileName); - } - if (Files.exists(scenarioFile)) { - try { - contentManager.addFile(scenarioFile); - }catch (IOException e) { - logger.error("AutomatorStartup/StartupScenario: File [{}] Can't add in the repository: {}", scenarioFile.toAbsolutePath().toString(), e.getMessage()); - } - } else { - logger.error("AutomatorStartup/StartupScenario:: Can't find File [{}/{}] or [{}]", configurationStartup.scenarioPath, - scenarioFileName, scenarioFileName); - continue; - } - } - - } - - // Resource - if (configurationStartup.getScenarioResourceAtStartup().isEmpty()) { - logger.info("No scenario [Resource] from variable {} given", - configurationStartup.getScenarioResourceAtStartupName()); - } else { - List scenarioResource = configurationStartup.getScenarioResourceAtStartup().stream() - .filter(t -> t != null) - .collect(Collectors.toList()); - - logger.info("Detect {} scenario [Resource] from variable [{}]", - scenarioResource.size(), - configurationStartup.getScenarioResourceAtStartupName()); - for (Resource resource : scenarioResource) { - try { - scenarioList.add(contentManager.addResource(resource)); - } catch (IOException e) { - logger.error("Error loading resource [{}]", resource.getFilename()); - } - } - } - - return scenarioList; - } - } diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index 729dc17..a99b1f0 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -3,8 +3,14 @@ automator: scheduler: content: - repositoryPath: "c:/temp/processautomator" - uploadPath: "C:/temp/upload" + # RepositoryPath is null, so it will use the temporary path of the machine + repositoryPath: + + # for convenience raison, upload all files on this path directly + # Does not start by /, so use the default location of the starter + uploadPath: "src/test/resources/uploadpath" + + # A scenario can be load via this variable scenario: startup: @@ -68,6 +74,21 @@ automator: # -1 means : align the jobsActive to the workerExecutionThreads workerMaxJobsActive: -1 + - type: "camunda8" + description: "Kubernetes, Simple authentication" + name: "Camunda8Topaz" + zeebeGatewayAddress: "camunda-zeebe-gateway:26500" + zeebeRestAddress: "http://camunda-zeebe-gateway:9600" + operateUserName: "demo" + operateUserPassword: "demo" + operateUrl: "http://camunda-operate:80" + taskListUserName: "demo" + taskListUserPassword: "demo" + taskListUrl: "http://camunda-tasklist:80" + workerExecutionThreads: 200 + # -1 means : align the jobsActive to the workerExecutionThreads + workerMaxJobsActive: -1 + - type: "camunda8" name: "Camunda8Lazuli" description: "A Zeebe+Identity server" diff --git a/src/main/resources/banner.txt b/src/main/resources/banner.txt index a349ce2..f1be3d6 100644 --- a/src/main/resources/banner.txt +++ b/src/main/resources/banner.txt @@ -5,4 +5,4 @@ __________ __ |____| |__| \____/ \___ >___ >____ >____ > (____ /____/ |__| \____/|__|_| (____ /__| \____/|__| \/ \/ \/ \/ \/ \/ \/ - (v1.7.0) + (v1.8.7)