From dcd424ab259ef0df21f54dd5fb108f54ad0131a6 Mon Sep 17 00:00:00 2001 From: Tiago Dolphine Date: Mon, 4 Dec 2023 10:07:18 -0300 Subject: [PATCH] adjust concurrency IT and removing the definition indexing from the process instance indexing --- .../kogito/index/service/IndexingService.java | 15 +-------------- .../service/AbstractIndexingServiceIT.java | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java index 65306e2755..cafcab8789 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java @@ -78,20 +78,7 @@ public class IndexingService { //retry in case of rare but possible race condition during the insert for the first registry @Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class) public void indexProcessInstanceEvent(ProcessInstanceDataEvent event) { - ProcessInstance pi = handleProcessInstanceEvent(event); - - ProcessDefinition definition = pi.getDefinition(); - - //todo: can be removed - handleProcessDefinition(definition); - } - - @Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class) - public void handleProcessDefinition(ProcessDefinition definition) { - if (definition != null && !manager.getProcessDefinitionsCache().containsKey(definition.getKey())) { - manager.getProcessDefinitionsCache().put(definition.getKey(), definition); - LOGGER.debug("Stored Process Definition: {}", definition); - } + handleProcessInstanceEvent(event); } private ProcessInstance handleProcessInstanceEvent(ProcessInstanceDataEvent event) { diff --git a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/AbstractIndexingServiceIT.java b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/AbstractIndexingServiceIT.java index c6fad9cd07..acceac7a11 100644 --- a/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/AbstractIndexingServiceIT.java +++ b/data-index/data-index-service/data-index-service-common/src/test/java/org/kie/kogito/index/service/AbstractIndexingServiceIT.java @@ -27,14 +27,11 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; import javax.inject.Inject; import javax.transaction.Transactional; -import io.restassured.RestAssured; -import io.restassured.http.ContentType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -50,6 +47,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.restassured.RestAssured; +import io.restassured.http.ContentType; + import static io.restassured.RestAssured.given; import static io.restassured.config.EncoderConfig.encoderConfig; import static org.awaitility.Awaitility.await; @@ -295,11 +295,12 @@ void testConcurrentProcessInstanceIndex() throws Exception { addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService, false); addFutureEvent(futures, processId, processInstanceId, PENDING, executorService, false); addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService, false); - //delay the last event to assert later the state - addFutureEvent(futures, processId, processInstanceId, COMPLETED, executorService, true); + addFutureEvent(futures, processId, processInstanceId, COMPLETED, executorService, false); } + //delay the last event to assert later the state + addFutureEvent(futures, processId, processInstanceId, COMPLETED, executorService, true); //wait for all futures to complete - CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(10, TimeUnit.SECONDS); + CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(20, TimeUnit.SECONDS); ProcessInstanceStateDataEvent event = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, CURRENT_USER); validateProcessInstance(getProcessInstanceById(processInstanceId), event); } @@ -307,7 +308,11 @@ void testConcurrentProcessInstanceIndex() throws Exception { private void addFutureEvent(List> futures, String processId, String processInstanceId, ProcessInstanceState state, ExecutorService executorService, boolean delay) { futures.add(CompletableFuture.runAsync(() -> { if (delay) { - await().atLeast(500, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true)); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } ProcessInstanceStateDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null, CURRENT_USER); indexProcessCloudEvent(event);