Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(engine, engine-rest-core) Add a query criteria to retrieve all executing jobs #4494

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<#elseif requestMethod == "POST">
<#assign listSeparator = "">
</#if>

<#assign params = {
"jobDefinitionId": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,5 +163,11 @@
"type": "boolean",
"desc": "Include jobs which belong to no tenant. Can be used in combination with `tenantIdIn`.
Value may only be `true`, as `false` is the default behavior."
},
"acquired": {
"type": "boolean",
"desc": "Only select jobs which are acquired, i.e., lock expiration date is not null, lock expiration
date is in future and suspension state is 1. Value may only be `true`, as `false` is the default
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
date is in future and suspension state is 1. Value may only be `true`, as `false` is the default
date is in the future. Value may only be `true`, as `false` is the default

To check if suspension state is 1 one can use the existing active filter

behavior."
}
}>
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class JobQueryDto extends AbstractQueryDto<JobQuery> {
protected List<String> tenantIds;
protected Boolean withoutTenantId;
protected Boolean includeJobsWithoutTenantId;
protected Boolean acquired;

protected List<ConditionQueryParameterDto> dueDates;
protected List<ConditionQueryParameterDto> createTimes;
Expand Down Expand Up @@ -234,6 +235,11 @@ public void setIncludeJobsWithoutTenantId(Boolean includeJobsWithoutTenantId) {
this.includeJobsWithoutTenantId = includeJobsWithoutTenantId;
}

@CamundaQueryParam(value="acquired", converter = BooleanConverter.class)
public void setAcquired(Boolean acquired) {
this.acquired = acquired;
}

@Override
protected boolean isValidSortByValue(String value) {
return VALID_SORT_BY_VALUES.contains(value);
Expand Down Expand Up @@ -418,6 +424,9 @@ String fieldName() {
if (TRUE.equals(includeJobsWithoutTenantId)) {
query.includeJobsWithoutTenantId();
}
if (TRUE.equals(acquired)) {
query.acquired();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,17 @@ public void testMessagesParameter() {
verify(mockQuery).list();
}

@Test
public void testAcquiredParameter() {
Map<String, Object> parameters = new HashMap<>();
parameters.put("acquired", MockProvider.EXAMPLE_ACQUIRED);

given().queryParams(parameters).then().expect().statusCode(Status.OK.getStatusCode()).when().get(JOBS_RESOURCE_URL);

verify(mockQuery).acquired();
verify(mockQuery).list();
}

@Test
public void testMessagesTimersParameter() {
Map<String, Object> parameters = new HashMap<>();
Expand Down Expand Up @@ -419,6 +430,7 @@ private Map<String, Object> getCompleteParameters() {
parameters.put("priorityLowerThanOrEquals", JOB_QUERY_MAX_PRIORITY);
parameters.put("priorityHigherThanOrEquals", JOB_QUERY_MIN_PRIORITY);
parameters.put("jobDefinitionId", MockProvider.EXAMPLE_JOB_DEFINITION_ID);
parameters.put("acquired", MockProvider.EXAMPLE_ACQUIRED);
return parameters;
}

Expand Down Expand Up @@ -456,6 +468,7 @@ private void verifyParameterQueryInvocations() {
verify(mockQuery).priorityLowerThanOrEquals(JOB_QUERY_MAX_PRIORITY);
verify(mockQuery).priorityHigherThanOrEquals(JOB_QUERY_MIN_PRIORITY);
verify(mockQuery).jobDefinitionId(MockProvider.EXAMPLE_JOB_DEFINITION_ID);
verify(mockQuery).acquired();
}

private void testDateParameters(DateParameters parameters) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,8 @@ public abstract class MockProvider {
public static final String EXAMPLE_AUTHORIZATION_ID = "someAuthorizationId";
public static final int EXAMPLE_AUTHORIZATION_TYPE = 0;
public static final String EXAMPLE_AUTHORIZATION_TYPE_STRING = "0";
public static final Boolean EXAMPLE_ACQUIRED = true;


// process applications
public static final String EXAMPLE_PROCESS_APPLICATION_NAME = "aProcessApplication";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public class JobQueryImpl extends AbstractQuery<JobQuery, Job> implements JobQue
protected String failedActivityId;
protected boolean noRetriesLeft;
protected SuspensionState suspensionState;
protected boolean acquired;

protected boolean isTenantIdSet = false;
protected String[] tenantIds;
Expand Down Expand Up @@ -257,6 +258,11 @@ public JobQuery suspended() {
return this;
}

public JobQuery acquired() {
acquired = true;
return this;
}

@Override
protected boolean hasExcludingConditions() {
return super.hasExcludingConditions()
Expand Down Expand Up @@ -402,5 +408,8 @@ public boolean isWithException() {
public String getExceptionMessage() {
return exceptionMessage;
}
public boolean getAcquired() {
return acquired;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,10 @@ public interface JobQuery extends Query<JobQuery, Job> {
*/
JobQuery orderByTenantId();

/**
* Only select jobs that are currently being acquired,
* ie. lock expiration time is not null, lock expiration is in future and suspension state is 1
*/
JobQuery acquired();

}
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,14 @@
<if test="suspensionState != null">
and RES.SUSPENSION_STATE_ = #{suspensionState.stateCode}
</if>
<if test="acquired">
AND
RES.SUSPENSION_STATE_ = 1
AND
RES.LOCK_EXP_TIME_ IS NOT NULL
AND
RES.LOCK_EXP_TIME_ > #{now, jdbcType=TIMESTAMP}
Copy link
Member

@venetrius venetrius Jan 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<if test="acquired">
AND
RES.SUSPENSION_STATE_ = 1
AND
RES.LOCK_EXP_TIME_ IS NOT NULL
AND
RES.LOCK_EXP_TIME_ > #{now, jdbcType=TIMESTAMP}
<if test="acquired">
and
RES.LOCK_EXP_TIME_ is not null
and
RES.LOCK_EXP_TIME_ > #{now, jdbcType=TIMESTAMP}
  • In the code base SQL keywords or operators are written in lower case.
  • RES.SUSPENSION_STATE_ = 1 can be tested by the existing active filter

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@venetrius Thanks for the clarification. I will update the suggestions. Thank you again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@venetrius can you help me understanding the reason of splitting the query? If we split and user only set the query as 'acquired' (without setting the active flag, i.e false by default), wouldn't the query will also include suspended jobs along with active? Isnt the definition of 'acquired' means the job has to be 'active' and excludes all 'suspended' jobs?

I have also noticed the similar logic in 'executable' flag. It excludes the suspended process instances but does not exclude suspended jobs.
Here is the query:
image

and here is what document about it:
image

I am wondering if its intended. Can you please give me little bit a clarity on it?

</if>
<if test="isTenantIdSet">
<if test="tenantIds != null &amp;&amp; tenantIds.length > 0">
and ( RES.TENANT_ID_ in
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,31 @@ public void testQueryInvalidSortingUsage() {
}
}

@Test
public void testQueryByAcquired() {
Calendar lockExpDate = Calendar.getInstance();
//given - lock expiration date in future
lockExpDate.add(Calendar.MILLISECOND, 30000000);

createJobWithLockExpiration(lockExpDate.getTime());

Job job = managementService.createJobQuery().jobId(timerEntity.getId()).singleResult();
assertNotNull(job);

List<Job> list = managementService.createJobQuery().acquired().list();
assertEquals(list.size(), 1);
deleteJobInDatabase();

//given - lock expiration date in the past
lockExpDate.add(Calendar.MILLISECOND, -60000000);
createJobWithLockExpiration(lockExpDate.getTime());

list = managementService.createJobQuery().acquired().list();
assertEquals(list.size(), 0);

deleteJobInDatabase();
}

//helper ////////////////////////////////////////////////////////////

private void setRetries(final String processInstanceId, final int retries) {
Expand All @@ -886,6 +911,25 @@ public Void execute(CommandContext commandContext) {
});
}

private void createJobWithLockExpiration(Date lockDate) {
CommandExecutor commandExecutor = processEngineConfiguration.getCommandExecutorTxRequired();
commandExecutor.execute((Command<Void>) commandContext -> {
JobManager jobManager = commandContext.getJobManager();
timerEntity = new TimerEntity();
timerEntity.setLockOwner(UUID.randomUUID().toString());
timerEntity.setDuedate(new Date());
timerEntity.setRetries(0);
timerEntity.setLockExpirationTime(lockDate);

jobManager.insert(timerEntity);

assertNotNull(timerEntity.getId());

return null;

});
}

private ProcessInstance startProcessInstanceWithFailingJob() {
// start a process with a failing job
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("exceptionInJobExecution");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ Job.get = function(id, done) {
* @param {String} params.sorting.sortOrder Sort the results in a given order. Values may be asc for ascending order or desc for descending order.
* @param {String} [params.firstResult] Pagination of results. Specifies the index of the first result to return.
* @param {String} [params.maxResults] Pagination of results. Specifies the maximum number of results to return. Will return less results if there are no more results left.
* @param {Bool} [params.acquired] Select jobs which are acquired, ie. lock expiration time is not
* null,lock expiration is in future and suspension state is 1
* @param {Function} done
*/
Job.list = function(params, done) {
Expand Down
Loading