Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[incubator-kie-issues-1529] Avoid concurrency test execution in jbpm-test #3716

Merged
merged 6 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class InMemoryJobService implements JobsService, AutoCloseable {
protected ConcurrentHashMap<String, ScheduledFuture<?>> scheduledJobs = new ConcurrentHashMap<>();
private final Processes processes;

private static final ConcurrentHashMap<Processes, InMemoryJobService> INSTANCE = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<Integer, InMemoryJobService> INSTANCE = new ConcurrentHashMap<>();

protected InMemoryJobService(Processes processes, UnitOfWorkManager unitOfWorkManager) {
this(processes, unitOfWorkManager, new ScheduledThreadPoolExecutor(Integer.parseInt(System.getProperty(IN_MEMORY_JOB_SERVICE_POOL_SIZE_PROPERTY, "10"))));
Expand All @@ -68,14 +68,14 @@ protected InMemoryJobService(Processes processes, UnitOfWorkManager unitOfWorkMa
public static InMemoryJobService get(Processes processes, UnitOfWorkManager unitOfWorkManager) {
Objects.requireNonNull(processes);
Objects.requireNonNull(unitOfWorkManager);
return INSTANCE.computeIfAbsent(processes, k -> new InMemoryJobService(processes, unitOfWorkManager));
return INSTANCE.computeIfAbsent(processes.hashCode() + 7 * unitOfWorkManager.hashCode(), k -> new InMemoryJobService(processes, unitOfWorkManager));
}

public static InMemoryJobService get(Processes processes, UnitOfWorkManager unitOfWorkManager, ScheduledExecutorService scheduler) {
Objects.requireNonNull(processes);
Objects.requireNonNull(unitOfWorkManager);
Objects.requireNonNull(scheduler);
return INSTANCE.computeIfAbsent(processes, k -> new InMemoryJobService(processes, unitOfWorkManager, scheduler));
return INSTANCE.computeIfAbsent(processes.hashCode() + 7 * unitOfWorkManager.hashCode(), k -> new InMemoryJobService(processes, unitOfWorkManager, scheduler));
}

@Override
Expand Down Expand Up @@ -107,8 +107,12 @@ public String scheduleProcessInstanceJob(ProcessInstanceJobDescription descripti
}

public Runnable getSignalProcessInstanceCommand(ProcessInstanceJobDescription description, boolean remove, int limit) {
return new SignalProcessInstanceOnExpiredTimer(description.id(), description.timerId(), description
.processInstanceId(), description.processId(), remove, limit);
return new SignalProcessInstanceOnExpiredTimer(
description.id(),
description.timerId(),
description.processInstanceId(),
remove,
limit);
}

@Override
Expand All @@ -119,7 +123,10 @@ public boolean cancelJob(String id) {
public boolean cancelJob(String id, boolean force) {
LOGGER.debug("Cancel Job: {}", id);
if (scheduledJobs.containsKey(id)) {
return scheduledJobs.remove(id).cancel(force);
ScheduledFuture<?> future = scheduledJobs.remove(id);
if (!future.isDone()) {
return future.cancel(force);
}
}
return false;
}
Expand Down Expand Up @@ -147,15 +154,13 @@ private class SignalProcessInstanceOnExpiredTimer implements Runnable {
private boolean removeAtExecution;
private String processInstanceId;
private Integer limit;
private String processId;

private SignalProcessInstanceOnExpiredTimer(String id, String timerId, String processInstanceId, String processId, boolean removeAtExecution, Integer limit) {
private SignalProcessInstanceOnExpiredTimer(String id, String timerId, String processInstanceId, boolean removeAtExecution, Integer limit) {
this.id = id;
this.timerId = timerId;
this.processInstanceId = processInstanceId;
this.removeAtExecution = removeAtExecution;
this.limit = limit;
this.processId = processId;
}

@Override
Expand Down Expand Up @@ -216,12 +221,12 @@ public void run() {
});
limit--;
if (limit == 0) {
cancelJob(id, false);
cancelJob(id, true);
}
LOGGER.debug("Job {} completed", id);
} finally {
if (removeAtExecution) {
cancelJob(id, true);
cancelJob(id, false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ public boolean waitTillCompleted(long timeOut) {
return latch.await(timeOut, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.debug("Interrputed thread while waiting for all triggers");
logger.error("Interrputed thread while waiting for all triggers", e);
return false;
} catch (Exception e) {
logger.error("Error during waiting state", e);
return false;
}
}
Expand Down
2 changes: 0 additions & 2 deletions jbpm/jbpm-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,6 @@
tests -->
<org.jbpm.variable.strict>true</org.jbpm.variable.strict>
</systemPropertyVariables>
<forkCount>2</forkCount>
<parallel>all</parallel>
</configuration>
</plugin>
</plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ public void testEventBasedSplitAfter() {

@Test
public void testEventBasedSplit2() {
ProcessCompletedCountDownProcessEventListener countDownListener = new ProcessCompletedCountDownProcessEventListener(2);
ProcessCompletedCountDownProcessEventListener countDownListener = new ProcessCompletedCountDownProcessEventListener(1);
Application app = ProcessTestHelper.newApplication();
ProcessTestHelper.registerProcessEventListener(app, countDownListener);
ProcessTestHelper.registerHandler(app, "Email1", new SystemOutWorkItemHandler());
Expand All @@ -506,6 +506,7 @@ public void testEventBasedSplit2() {

instance.send(Sig.of("Yes", "YesValue"));
assertThat(instance.status()).isEqualTo(org.kie.kogito.process.ProcessInstance.STATE_COMPLETED);
countDownListener.reset(1);

instance = processDefinition.createInstance(model);
instance.start();
Expand Down
Loading