Skip to content

Commit

Permalink
4.1.0 Hight creation throughput
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre-yves-monnet committed May 30, 2024
1 parent 27e46ac commit 44033de
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 50 deletions.
4 changes: 2 additions & 2 deletions doc/loadtestscenario/resources/C8CrawlUrlScn.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"processId": "CrawlUrl",
"frequency": "PT30S",
"numberOfExecutions": "200",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": false},
"variablesOperation": {
"loopcrawl": "generaterandomlist(10)"
Expand All @@ -70,7 +70,7 @@
"processId": "CrawlUrl",
"frequency": "PT1M",
"numberOfExecutions": "1",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": true},
"variablesOperation": {
"loopcrawl": "generaterandomlist(5)"
Expand Down
4 changes: 2 additions & 2 deletions doc/loadtestscenario/resources/C8CrawlUrlScn250.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
"processId": "CrawlUrl",
"frequency": "PT30S",
"numberOfExecutions": "250",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": false},
"variablesOperation": {
"loopcrawl": "generaterandomlist(10)"
Expand All @@ -70,7 +70,7 @@
"processId": "CrawlUrl",
"frequency": "PT1M",
"numberOfExecutions": "1",
"nbWorkers": "1",
"numberOfWorkers": "1",
"variables": {"urlNotFound": true},
"variablesOperation": {
"loopcrawl": "generaterandomlist(5)"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ public void connection() throws AutomatorException {

// connection is critical, so let build the analysis
StringBuilder analysis = new StringBuilder();


boolean isOk = true;

isOk = stillOk(serverDefinition.name, "ZeebeConnection", analysis, false, isOk);
Expand Down Expand Up @@ -232,7 +234,7 @@ public void connection() throws AutomatorException {
} catch (Exception e) {
zeebeClient = null;
throw new AutomatorException(
"Bad credential [" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + e.getMessage());
"BadCredential[" + serverDefinition.name + "] Analysis:" + analysis + " : " + e.getMessage());
}

saOperate = new io.camunda.operate.auth.SaasAuthentication(serverDefinition.zeebeSaasClientId,
Expand Down Expand Up @@ -260,6 +262,9 @@ public void connection() throws AutomatorException {
throw new AutomatorException("Invalid configuration");

// ---------------- connection
boolean zeebeOk = false;
boolean operateOk = false;
boolean tasklistOk = false;
try {
isOk = stillOk(serverDefinition.workerExecutionThreads, "ExecutionThread", analysis, false, isOk);

Expand Down Expand Up @@ -292,6 +297,7 @@ public void connection() throws AutomatorException {

// Actually, if an error arrived, an exception is thrown
analysis.append(join != null ? "successfully," : "error");
zeebeOk = join != null;

isOk = stillOk(serverDefinition.operateUrl, "operateUrl", analysis, false, isOk);

Expand All @@ -300,6 +306,7 @@ public void connection() throws AutomatorException {
.authentication(saOperate)
.build();
analysis.append("successfully,");
operateOk = true;

// TaskList is not mandatory
if (serverDefinition.taskListUrl != null && !serverDefinition.taskListUrl.isEmpty()) {
Expand All @@ -310,14 +317,18 @@ public void connection() throws AutomatorException {
.authentication(saTaskList)
.build();
analysis.append("successfully,");
tasklistOk = true;
}
//get tasks assigned to demo
logger.info(analysis.toString());
logger.info("Zeebe: OK, Operate: OK, TaskList:OK " + analysis.toString());

} catch (Exception e) {
zeebeClient = null;
throw new AutomatorException(
"Can't connect to Server[" + serverDefinition.name + "] Analysis:" + analysis + " fail : " + e.getMessage());
throw new AutomatorException("NoConnection[" + serverDefinition.name // server name
+ "] Zeebe:" + (zeebeOk ? "OK" : "FAIL") // zeebe status
+ ", Operate:" + (operateOk ? "OK" : "FAIL") // Operate status
+ ", Tasklist:" + (tasklistOk ? "OK" : "FAIL") // taskList status
+ ", Analysis:" + analysis + " fail : " + e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,8 @@ public MODEEXECUTION getModeExecution() {
*/
public String getId() {
return getType() + " " + switch (getType()) {
case STARTEVENT -> getProcessId() + "-" + getTaskId() + "-" + Thread.currentThread().getName();
case SERVICETASK -> getTopic() + "-" + Thread.currentThread().getName();
case STARTEVENT -> getProcessId() + "(" + getTaskId() + ")-" + Thread.currentThread().getId();
case SERVICETASK -> getTopic() + "-" + Thread.currentThread().getId();
default -> "";
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public CreateProcessInstanceThread(int executionBatchNumber,
*
* @param durationToCreateProcessInstances maximum duration to produce all PI
*/
public void startProcessInstance(Duration durationToCreateProcessInstances) {
public void createProcessInstances(Duration durationToCreateProcessInstances) {

int numberOfThreads = scenarioStep.getNumberOfWorkers() == 0 ? 1 : scenarioStep.getNumberOfWorkers();

Expand Down Expand Up @@ -95,7 +95,8 @@ public boolean isOverload() {
}

/**
* This subclass start a range of process instances
* This subclass start numberOfProcessInstanceToStart of process instances.
* Multiple threads doing the same operation are running at the same time.
*/
private class StartProcess implements Runnable {
private final ScenarioStep scenarioStep;
Expand Down Expand Up @@ -138,6 +139,10 @@ public StartProcess(int executionBatchNumber,
this.scenarioStep = scenarioStep;
}

/**
* This thread will create numberOfProcessInstanceToStart, but it monitor the time, and if the time is over
* the Duration, it stop
*/
@Override
public void run() {
boolean alreadyLoggedError = false;
Expand Down Expand Up @@ -174,10 +179,15 @@ public void run() {
long currentTimeMillis = System.currentTimeMillis();
Duration durationCurrent = durationToCreateProcessInstances.minusMillis(currentTimeMillis - begin);
if (durationCurrent.isNegative()) {
// take too long to create the required process instance, so stop now.
logger.info("batch_#" + executionBatchNumber + "-" + scenarioStep.getId()
+ " Take too long to created ProcessInstances: created {} when expected {} in {} ms", nbCreation,
scenarioStep.getNumberOfExecutions(), currentTimeMillis - begin);
// log only at the debug mode (thread per thread), in monitoring log only at batch level
if (runScenario.getRunParameters().showLevelDebug()) {
// take too long to create the required process instance, so stop now.
logger.info("batch_#{} {} Over the duration. Created {} when expected {} in {} ms",
executionBatchNumber,
scenarioStep.getId(),
nbCreation,
numberOfProcessInstanceToStart, currentTimeMillis - begin);
}
isOverload = true;
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ public String getId() {
return scenarioStep.getId();
}

/**
* the task return the topic to address:
* - topic for a service task
* - taskId for a user task
*/
public abstract String getTopic();

public RunScenario getRunScenario() {
return runScenario;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ public RunScenarioFlowServiceTask(TaskScheduler scheduler,

}

@Override
public String getTopic() {
return getScenarioStep().getTopic();
}
@Override
public void execute() {
registerWorker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public RunScenarioFlowStartEvent(TaskScheduler scheduler,
super(scenarioStep, runScenario, runResult);
this.scheduler = scheduler;
}
@Override
public String getTopic() {
return getScenarioStep().getTaskId();
}


@Override
public void execute() {
Expand Down Expand Up @@ -84,7 +89,6 @@ class StartEventRunnable implements Runnable {

private int nbOverloaded = 0;
private int totalCreation = 0;
private int totalCreationGoal = 0;
private int totalFailed = 0;

public StartEventRunnable(TaskScheduler scheduler,
Expand All @@ -109,13 +113,14 @@ public void start() {

@Override
public void run() {
executionBatchNumber++;
if (flowStartEvent.stopping) {
if (runScenario.getRunParameters().showLevelMonitoring()) {
logger.info("Stop now [" + getId() + "]");
if (nbOverloaded > 0)
runResult.addError(scenarioStep,
"Overloaded " + nbOverloaded + " TotalCreation " + totalCreation + " Goal " + totalCreationGoal
"Overloaded " + nbOverloaded
+ " TotalCreation: " + totalCreation // total creation we see
+ " TheoricNumberExpectred: " + (scenarioStep.getNumberOfExecutions()*executionBatchNumber) // expected
+ " Process[" + scenarioStep.getProcessId() + "] Can't create PI at the required frequency");
if (totalFailed > 0)
runResult.addError(scenarioStep,
Expand All @@ -126,19 +131,23 @@ public void run() {
flowStartEvent.isRunning = false;
return;
}
executionBatchNumber++;

Duration durationToCreateProcessInstances = Duration.parse(scenarioStep.getFrequency());

long begin = System.currentTimeMillis();
boolean isOverloadSection = false;

totalCreationGoal += scenarioStep.getNumberOfExecutions();

// generate process instance
// generate process instance in multiple threads
CreateProcessInstanceThread createProcessInstanceThread = new CreateProcessInstanceThread(executionBatchNumber,
scenarioStep, runScenario, runResult);
createProcessInstanceThread.startProcessInstance(durationToCreateProcessInstances);

// creates all process instances, return when finish OR when duration is reach
createProcessInstanceThread.createProcessInstances(durationToCreateProcessInstances);

// Now collect data for the running time
totalCreation += createProcessInstanceThread.getTotalCreation();
totalFailed += createProcessInstanceThread.getTotalCreation();
totalFailed += createProcessInstanceThread.getTotalFailed();
List<String> listProcessInstances = createProcessInstanceThread.getListProcessInstances();
long end = System.currentTimeMillis();

Expand All @@ -161,9 +170,9 @@ public void run() {
+ "] Create (real/scenario)[" + createProcessInstanceThread.getTotalCreation() + "/"
+ scenarioStep.getNumberOfExecutions() // creation/target
+ (isOverloadSection ? "OVERLOAD" : "") // Overload marker
+ "] Failed[" + createProcessInstanceThread.getTotalCreation() // failed
+ "] Failed[" + createProcessInstanceThread.getTotalFailed() // failed
+ "] in " + (end - begin) + " ms " // time of operation
+ " Sleep[" + durationToWait.getSeconds() + " s] listPI(max20): " + listProcessInstances.stream()
+ " Sleep[" + durationToWait.getSeconds() + " s] listPI(first20): " + listProcessInstances.stream()
.collect(Collectors.joining(",")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public RunScenarioFlowUserTask(TaskScheduler scheduler,

}

@Override
public String getTopic() {
return getScenarioStep().getTaskId();
}

@Override
public void execute() {
RunScenarioFlowUserTask.UserTaskRunnable startEventRunnable = new RunScenarioFlowUserTask.UserTaskRunnable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RunScenarioFlows {
private final ServiceAccess serviceAccess;
Expand Down Expand Up @@ -62,7 +63,7 @@ public void execute(RunResult runResult) {
runObjectives.setStartDate(startTestDate);

logger.info("ScenarioFlow: ------ Start");
List<RunScenarioFlowBasic> listFlows = startExecution();
List<RunScenarioFlowBasic> listFlows = startExecution(runScenarioWarmingUp.getListWarmingUpTask());

waitEndExecution(runObjectives, startTestDate, listFlows);

Expand All @@ -82,12 +83,14 @@ public void execute(RunResult runResult) {
logger.info("ScenarioFlow: ------ TheEnd");
}



/**
* Start execution
*
* @return list of Flow started
*/
private List<RunScenarioFlowBasic> startExecution() {
private List<RunScenarioFlowBasic> startExecution(List<RunScenarioFlowBasic> listWarmingTask) {
List<RunScenarioFlowBasic> listFlows = new ArrayList<>();
for (ScenarioStep scenarioStep : runScenario.getScenario().getFlows()) {
switch (scenarioStep.getType()) {
Expand All @@ -104,34 +107,55 @@ private List<RunScenarioFlowBasic> startExecution() {
}

case SERVICETASK -> {
Optional<RunScenarioFlowBasic> runServiceTaskOp = getFromList(listWarmingTask, scenarioStep.getTopic());

if (!runScenario.getRunParameters().isServiceTask()) {
logger.info("According configuration, SERVICETASK[{}] is fully disabled", scenarioStep.getTopic());
if (runServiceTaskOp.isPresent())
runServiceTaskOp.get().pleaseStop();
} else if (runScenario.getRunParameters().blockExecutionServiceTask(scenarioStep.getTopic())) {
logger.info("According configuration, SERVICETASK[{}] is disabled (only acceptable {})",
scenarioStep.getTopic(), runScenario.getRunParameters().getFilterServiceTask());
if (runServiceTaskOp.isPresent())
runServiceTaskOp.get().pleaseStop();
} else {
RunScenarioFlowServiceTask runServiceTask = new RunScenarioFlowServiceTask(
serviceAccess.getTaskScheduler("serviceTask"), scenarioStep, runScenario, new RunResult(runScenario));
runServiceTask.execute();
listFlows.add(runServiceTask);
if (runServiceTaskOp.isEmpty()) {
RunScenarioFlowServiceTask runServiceTask = new RunScenarioFlowServiceTask(serviceAccess.getTaskScheduler("serviceTask"), scenarioStep, runScenario, new RunResult(runScenario));
runServiceTask.execute();
listFlows.add(runServiceTask);
} else {
listFlows.add(runServiceTaskOp.get());
}
}
}

case USERTASK -> {
Optional<RunScenarioFlowBasic> runUserTaskOpt = getFromList(listWarmingTask, scenarioStep.getTaskId());

if (!runScenario.getRunParameters().isUserTask()) {
logger.info("According configuration, USERTASK[{}] is fully disabled", scenarioStep.getTaskId());
if (runUserTaskOpt.isPresent())
runUserTaskOpt.get().pleaseStop();
} else {
RunScenarioFlowUserTask runUserTask = new RunScenarioFlowUserTask(serviceAccess.getTaskScheduler("userTask"),
scenarioStep, 0, runScenario, new RunResult(runScenario));
runUserTask.execute();
listFlows.add(runUserTask);
if (runUserTaskOpt.isEmpty()) {
RunScenarioFlowUserTask runUserTask = new RunScenarioFlowUserTask(serviceAccess.getTaskScheduler("userTask"), scenarioStep, 0,
runScenario, new RunResult(runScenario));
runUserTask.execute();
listFlows.add(runUserTask);
} else {
listFlows.add(runUserTaskOpt.get());
}
}
}
}
}
return listFlows;
}


private Optional<RunScenarioFlowBasic> getFromList(List<RunScenarioFlowBasic> listTasks, String topic) {
return listTasks.stream().filter(t->t.getTopic().equals(topic)).findFirst();
}
/**
* Wait end of execution. according to the time in the scenario, wait this time
*
Expand Down
Loading

0 comments on commit 44033de

Please sign in to comment.