Skip to content

Commit

Permalink
More control
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre-yves-monnet committed Jul 17, 2024
1 parent 44033de commit f87d352
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,20 @@ public static class BpmnServerDefinition {
public Integer workerExecutionThreads = Integer.valueOf(DEFAULT_VALUE_EXECUTION_THREADS);
public Integer workerMaxJobsActive = Integer.valueOf(DEFAULT_VALUE_MAX_JOBS_ACTIVE);


public String getSynthesis() {
String synthesis= serverType.name();
if (serverType.equals(CamundaEngine.CAMUNDA_7)) {
synthesis+=" url["+camunda7ServerUrl+"] userName["+camunda7UserName+"]";
}
if (serverType.equals(CamundaEngine.CAMUNDA_8) ) {
synthesis+=" address["+zeebeGatewayAddress+"] workerThread["+workerExecutionThreads+"] MaxJobActive["+workerMaxJobsActive+"]";
}
if (serverType.equals(CamundaEngine.CAMUNDA_8_SAAS) ) {
synthesis+=" clientId["+zeebeSaasClientId+"] workerThread["+workerExecutionThreads+"] MaxJobActive["+workerMaxJobsActive+"]";
}
return synthesis;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ public MODEEXECUTION getModeExecution() {
*/
public String getId() {
return getType() + " " + switch (getType()) {
case STARTEVENT -> getProcessId() + "(" + getTaskId() + ")-" + Thread.currentThread().getId();
case SERVICETASK -> getTopic() + "-" + Thread.currentThread().getId();
case STARTEVENT -> getProcessId() + "(" + getTaskId() + ")";
case SERVICETASK -> getTopic();
default -> "";
};
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/camunda/automator/engine/RunResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,9 @@ public void add(RecordCreationPI record) {
nbCreated += record.nbCreated;
nbFailed += record.nbFailed;
}
public String toString() {
return "Created["+nbCreated+"] Failed["+nbFailed+"]";
}
}

public class VerificationStatus {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public List<String> getListProcessInstances() {
return listStartProcess.stream().flatMap(t -> t.listProcessInstances.stream()).collect(Collectors.toList());
}

public int getNumberOfRunningThreads() {
return (int) listStartProcess.stream().filter(t->t.isRunning()).count();
}
public int getTotalCreation() {
return listStartProcess.stream().mapToInt(t -> t.nbCreation).sum();
}
Expand Down Expand Up @@ -112,6 +115,7 @@ private class StartProcess implements Runnable {
* the batch number
*/
boolean isOverload = false;
boolean isRunning = false;
Duration durationToCreateProcessInstances;

/**
Expand Down Expand Up @@ -145,6 +149,7 @@ public StartProcess(int executionBatchNumber,
*/
@Override
public void run() {
isRunning=true;
boolean alreadyLoggedError = false;
isOverload = false;
long begin = System.currentTimeMillis();
Expand Down Expand Up @@ -192,6 +197,10 @@ public void run() {
break;
}
}
isRunning=false;
}
public boolean isRunning() {
return isRunning;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,10 @@ private ObjectiveResult checkObjectiveCreated(ScenarioFlowControl.Objective obje
int percent = (int) (100.0 * objectiveResult.recordedSuccessValue / (objective.value == 0 ? 1 : objective.value));

objectiveResult.analysis +=
"Objective " + objective.getInformation() + ": ObjectiveCreation[" + objective.value // objective
"Objective " + objective.getInformation() // informatin
+ ": Goal[" + objective.value // objective
+ "] Created(zeebeAPI)[" + processInstancesCreatedAPI // Value by the API, not really accurate
+ "] Create(AutomatorRecord)[" + objectiveResult.recordedSuccessValue // value recorded by automator
+ "] Created(AutomatorRecord)[" + objectiveResult.recordedSuccessValue // value recorded by automator
+ " (" + percent + " % )" // percent based on the recorded value
+ " CreateFail(AutomatorRecord)[" + objectiveResult.recordedFailValue + "]";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ private void registerWorker() {
Duration durationSleep = getScenarioStep().getWaitingTimeDuration(Duration.ZERO);
durationSleep = durationSleep.plusSeconds(10);

if (getRunScenario().getRunParameters().showLevelMonitoring()) {
logger.info("Start service TaskId[{}] Topic[{}] StreamEnable:{} DurationSleep[{} ms]",
getScenarioStep().getTaskId(),
getScenarioStep().getTopic(),
getScenarioStep().isStreamEnable(),
durationSleep.toMillis());
}

registeredTask = bpmnEngine.registerServiceTask(getId(), // workerId
getScenarioStep().getTopic(), // topic
getScenarioStep().isStreamEnable(), // stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ public RunScenarioFlowStartEvent(TaskScheduler scheduler,
super(scenarioStep, runScenario, runResult);
this.scheduler = scheduler;
}

@Override
public String getTopic() {
return getScenarioStep().getTaskId();
}

StartEventRunnable startEventRunnable;

@Override
public void execute() {
stopping = false;
isRunning = true;
StartEventRunnable startEventRunnable = new StartEventRunnable(scheduler, getScenarioStep(), getRunScenario(), this,
runResult);
startEventRunnable = new StartEventRunnable(scheduler, getScenarioStep(), getRunScenario(), this, runResult);
startEventRunnable.start();
}

Expand All @@ -66,7 +67,13 @@ public RunScenarioFlowBasic.STATUS getStatus() {

@Override
public int getCurrentNumberOfThreads() {
return 0;
try {
return startEventRunnable == null ? 0 : startEventRunnable.getNumberOfRunningThreads();
} catch (Exception e) {
// do nothing
logger.error("During getCurrentNumberOfThreads : {}", e);
return 0;
}
}

@Override
Expand All @@ -91,6 +98,8 @@ class StartEventRunnable implements Runnable {
private int totalCreation = 0;
private int totalFailed = 0;

private CreateProcessInstanceThread createProcessInstanceThread = null;

public StartEventRunnable(TaskScheduler scheduler,
ScenarioStep scenarioStep,
RunScenario runScenario,
Expand All @@ -108,7 +117,6 @@ public StartEventRunnable(TaskScheduler scheduler,
*/
public void start() {
scheduler.schedule(this, Instant.now());

}

@Override
Expand All @@ -118,9 +126,9 @@ public void run() {
logger.info("Stop now [" + getId() + "]");
if (nbOverloaded > 0)
runResult.addError(scenarioStep,
"Overloaded " + nbOverloaded
+ " TotalCreation: " + totalCreation // total creation we see
+ " TheoricNumberExpectred: " + (scenarioStep.getNumberOfExecutions()*executionBatchNumber) // expected
"Overloaded:" + "" + nbOverloaded + " TotalCreation:" + totalCreation // total creation we see
+ " TheoricNumberExpectred:" + (scenarioStep.getNumberOfExecutions() * executionBatchNumber)
// expected
+ " Process[" + scenarioStep.getProcessId() + "] Can't create PI at the required frequency");
if (totalFailed > 0)
runResult.addError(scenarioStep,
Expand All @@ -139,8 +147,8 @@ public void run() {
boolean isOverloadSection = false;

// generate process instance in multiple threads
CreateProcessInstanceThread createProcessInstanceThread = new CreateProcessInstanceThread(executionBatchNumber,
scenarioStep, runScenario, runResult);
createProcessInstanceThread = new CreateProcessInstanceThread(executionBatchNumber, scenarioStep, runScenario,
runResult);

// creates all process instances, return when finish OR when duration is reach
createProcessInstanceThread.createProcessInstances(durationToCreateProcessInstances);
Expand All @@ -166,20 +174,26 @@ public void run() {

// report now
if (runScenario.getRunParameters().showLevelMonitoring() || createProcessInstanceThread.isOverload()) {
logger.info("Step #" + executionBatchNumber + "-" + getId() // id
+ "] Create (real/scenario)[" + createProcessInstanceThread.getTotalCreation() + "/"
+ scenarioStep.getNumberOfExecutions() // creation/target
+ (isOverloadSection ? "OVERLOAD" : "") // Overload marker
+ "] Failed[" + createProcessInstanceThread.getTotalFailed() // failed
+ "] in " + (end - begin) + " ms " // time of operation
+ " Sleep[" + durationToWait.getSeconds() + " s] listPI(first20): " + listProcessInstances.stream()
.collect(Collectors.joining(",")));
logger.info("Step #{}-{}" + " Create (real/scenario)[{}/{} {}]" // Overload marker
+ " Failed[{}] in {} ms " // time of operation
+ " Sleep[{} s] ", // end message
executionBatchNumber, getId(), createProcessInstanceThread.getTotalCreation(),
scenarioStep.getNumberOfExecutions(), (isOverloadSection ? "OVERLOAD" : ""),
createProcessInstanceThread.getTotalFailed(), (end - begin), durationToWait.getSeconds());
}
if (runScenario.getRunParameters().showLevelInfo()) {
logger.info(" listPI(first20): " + listProcessInstances.stream().collect(Collectors.joining(",")));

}

// Wait to restart
scheduler.schedule(this, Instant.now().plusMillis(durationToWait.toMillis()));

}

public int getNumberOfRunningThreads() {
return createProcessInstanceThread == null ? 0 : createProcessInstanceThread.getNumberOfRunningThreads();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public void execute(RunResult runResult) {
logger.info("ScenarioFlow: ------ TheEnd");
}



/**
* Start execution
*
Expand Down Expand Up @@ -120,7 +118,8 @@ private List<RunScenarioFlowBasic> startExecution(List<RunScenarioFlowBasic> lis
runServiceTaskOp.get().pleaseStop();
} else {
if (runServiceTaskOp.isEmpty()) {
RunScenarioFlowServiceTask runServiceTask = new RunScenarioFlowServiceTask(serviceAccess.getTaskScheduler("serviceTask"), scenarioStep, runScenario, new RunResult(runScenario));
RunScenarioFlowServiceTask runServiceTask = new RunScenarioFlowServiceTask(
serviceAccess.getTaskScheduler("serviceTask"), scenarioStep, runScenario, new RunResult(runScenario));
runServiceTask.execute();
listFlows.add(runServiceTask);
} else {
Expand All @@ -138,8 +137,8 @@ private List<RunScenarioFlowBasic> startExecution(List<RunScenarioFlowBasic> lis
runUserTaskOpt.get().pleaseStop();
} else {
if (runUserTaskOpt.isEmpty()) {
RunScenarioFlowUserTask runUserTask = new RunScenarioFlowUserTask(serviceAccess.getTaskScheduler("userTask"), scenarioStep, 0,
runScenario, new RunResult(runScenario));
RunScenarioFlowUserTask runUserTask = new RunScenarioFlowUserTask(
serviceAccess.getTaskScheduler("userTask"), scenarioStep, 0, runScenario, new RunResult(runScenario));
runUserTask.execute();
listFlows.add(runUserTask);
} else {
Expand All @@ -152,10 +151,10 @@ private List<RunScenarioFlowBasic> startExecution(List<RunScenarioFlowBasic> lis
return listFlows;
}


private Optional<RunScenarioFlowBasic> getFromList(List<RunScenarioFlowBasic> listTasks, String topic) {
return listTasks.stream().filter(t->t.getTopic().equals(topic)).findFirst();
return listTasks.stream().filter(t -> t.getTopic().equals(topic)).findFirst();
}

/**
* Wait end of execution. according to the time in the scenario, wait this time
*
Expand Down Expand Up @@ -260,25 +259,27 @@ private void collectInformation(List<RunScenarioFlowBasic> listFlows,
private void checkObjectives(RunObjectives runObjectives, Date startTestDate, Date endTestDate, RunResult runResult) {

// Objectives ask Operate, which get the result with a delay. So, wait 1 mn
logger.info("CollectingData...");
logger.info("CollectingData... (sleep 30s)");
try {
Thread.sleep(1000 * 60);
Thread.sleep(1000 * 30);
} catch (InterruptedException e) {
// do nothing
}

List<RunObjectives.ObjectiveResult> listCheckResult = runObjectives.check();
for (RunObjectives.ObjectiveResult checkResult : listCheckResult) {
if (checkResult.success) {
logger.info("Objective: SUCCESS type {} label [{}} processId[{}] reach {} (objective is {} ) analysis [{}}",
logger.info("Objective: SUCCESS type[{}] label[{}} processId[{}] reach/objective {}/{} analysis [{}}",
checkResult.objective.type, checkResult.objective.label, checkResult.objective.processId,
checkResult.recordedSuccessValue, checkResult.objective.value, checkResult.analysis);
// do not need to log the error, already done

} else {
// do not need to log the error, already done
runResult.addError(null,
"Objective: FAIL " + checkResult.objective.getInformation() + " type " + checkResult.objective.type
+ " processId [" + checkResult.objective.processId + "] " + checkResult.analysis);
"Objective: FAIL " + checkResult.objective.getInformation() + " type[" + checkResult.objective.type
+ "] processId[" + checkResult.objective.processId // ProcessID
+ "] reach/objective " + checkResult.recordedSuccessValue // Reach
+ "/" + checkResult.objective.value // Objective
+ " " + checkResult.analysis);
}
}
}
Expand All @@ -304,7 +305,7 @@ private void logRealTime(List<RunScenarioFlowBasic> listFlows, long timeToFinish
long previousValue = previousValueMap.getOrDefault(flowBasic.getId(), 0L);

ScenarioStep scenarioStep = flowBasic.getScenarioStep();
String key = "[" + flowBasic.getId() + "] " + flowBasic.getStatus().toString() + " " + " currentNbThreads["
String key = "[" + flowBasic.getId() + "] " + flowBasic.getStatus().toString() + " currentNbThreads["
+ currentNumberOfThreads + "] ";
key += switch (scenarioStep.getType()) {
case STARTEVENT -> "PI[" + runResultFlow.getRecordCreationPI() + "] delta[" + (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,28 @@ public void warmingUp(RunResult runResult) {
listWarmingUpUserTask.clear();
List<StartEventWarmingUpRunnable> listWarmingUpStartEvent = new ArrayList<>();
List<ScenarioStep> listOperationWarmingUp = warmingUp.getOperations();
if (warmingUp.useServiceTasks) {
if (warmingUp.useServiceTasks && runScenario.getRunParameters().isServiceTask()) {
listOperationWarmingUp.addAll(runScenario.getScenario()
.getFlows()
.stream()
.filter(t -> t.getType().equals(ScenarioStep.Step.SERVICETASK))
.toList());
}
if (warmingUp.useUserTasks) {
if (warmingUp.useUserTasks && runScenario.getRunParameters().isUserTask()) {
listOperationWarmingUp.addAll(runScenario.getScenario()
.getFlows()
.stream()
.filter(t -> t.getType().equals(ScenarioStep.Step.USERTASK))
.toList());
}
logger.info("WarmingUp: Start ---- {} operations (useServiceTask {} useUserTask {})", listOperationWarmingUp.size(),
warmingUp.useServiceTasks, warmingUp.useUserTasks);

logger.info("WarmingUp: Start ---- {} operations (Scenario/Policy: serviceTask:{}/{} userTask: {}/{})",
listOperationWarmingUp.size(), // size of operations
warmingUp.useServiceTasks, // scenario allow service task?
runScenario.getRunParameters().isServiceTask(), // pod can run service task?
warmingUp.useUserTasks,
runScenario.getRunParameters().isUserTask() // pod can run User Task?
);

for (ScenarioStep scenarioStep : listOperationWarmingUp) {
switch (scenarioStep.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ else if (scenarioObject instanceof Resource scenarioResource) {
throw new AutomatorException(
"Server [" + runParameters.getServerName() + "] does not exist in the list");

if (runParameters.showLevelMonitoring())
{
logger.info("Run scenario with Server {}", serverDefinition.getSynthesis());
}
bpmnEngine = automatorAPI.getBpmnEngine(serverDefinition, true);
}
if (runParameters.showLevelDashboard()) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/banner.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ __________ __
| | | | \( <_> ) \__\ ___/ \___ \ \___ \ /_____/ / __ \| | /| | ( <_> ) Y Y \/ __ \| | ( <_> ) | \/
|____| |__| \____/ \___ >___ >____ >____ > (____ /____/ |__| \____/|__|_| (____ /__| \____/|__|
\/ \/ \/ \/ \/ \/ \/
(v1.4.0)
(v1.4.0.c)

0 comments on commit f87d352

Please sign in to comment.