Skip to content

Commit

Permalink
adding smalrye fault tolerance
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagodolphine committed Nov 17, 2023
1 parent c9ecdf4 commit 521ea9e
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 38 deletions.
4 changes: 4 additions & 0 deletions data-index/data-index-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.index.event.mapper.ProcessInstanceEventMerger;
Expand Down Expand Up @@ -70,32 +70,22 @@ public class IndexingService {
@Inject
Instance<UserTaskInstanceEventMerger> userTaskInstanceMergers;

//retry in case of rare but possible race condition during the insert for the first registry
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
//retry in case of rare but possible race condition during the insert for the first registry
ProcessInstance pi = executeWithRetry(event, this::handleProcessInstanceEvent, "indexing process instance");
ProcessInstance pi = handleProcessInstanceEvent(event);

ProcessDefinition definition = pi.getDefinition();

if (definition != null && !manager.getProcessDefinitionsCache().containsKey(definition.getKey())) {
//retry in case of rare but possible race condition during the insert for the first registry
executeWithRetry(definition, this::handleProcessDefinition, "indexing process definition");
LOGGER.debug("Stored Process Definition: {}", definition);
}
}

private ProcessDefinition handleProcessDefinition(ProcessDefinition definition) {
return manager.getProcessDefinitionsCache().put(definition.getKey(), definition);
handleProcessDefinition(definition);
}

private <R, T> R executeWithRetry(T input, Function<T, R> toExecute, String logMessage) {
R response;
try {
response = toExecute.apply(input);
} catch (ConcurrentModificationException e) {
LOGGER.warn("Retrying {} for {} {}", logMessage, input, e.getMessage());
response = toExecute.apply(input);
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void handleProcessDefinition(ProcessDefinition definition) {
if (definition != null && !manager.getProcessDefinitionsCache().containsKey(definition.getKey())) {
manager.getProcessDefinitionsCache().put(definition.getKey(), definition);
LOGGER.debug("Stored Process Definition: {}", definition);
}
return response;
}

private ProcessInstance handleProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
Expand All @@ -119,11 +109,9 @@ private ProcessInstance handleProcessInstanceEvent(ProcessInstanceDataEvent<?> e
return pi;
}

//retry in case of rare but possible race condition during the insert for the first registry
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
executeWithRetry(event, this::handleUserTaskEvent, "indexing user task");
}

private <T> UserTaskInstance handleUserTaskEvent(UserTaskInstanceDataEvent<T> event) {
Optional<UserTaskInstance> found = Optional.ofNullable(manager.getUserTaskInstancesCache().get(event.getKogitoUserTaskInstanceId()));
UserTaskInstance ut;
if (found.isEmpty()) {
Expand All @@ -138,7 +126,7 @@ private <T> UserTaskInstance handleUserTaskEvent(UserTaskInstanceDataEvent<T> ev
userTaskInstanceMergers.stream().filter(e -> e.accept(event)).findAny().ifPresent(e -> e.merge(ut, event));
LOGGER.debug("Stored User Task Instance: {}", ut);

return manager.getUserTaskInstancesCache().put(ut.getId(), ut);
manager.getUserTaskInstancesCache().put(ut.getId(), ut);
}

public void indexJob(Job job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import javax.inject.Inject;
import javax.transaction.Transactional;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand All @@ -46,9 +49,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.restassured.RestAssured;
import io.restassured.http.ContentType;

import static io.restassured.RestAssured.given;
import static io.restassured.config.EncoderConfig.encoderConfig;
import static org.awaitility.Awaitility.await;
Expand Down Expand Up @@ -86,6 +86,7 @@
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdAndStarted;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdAndState;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdNoActualOwner;
import static org.kie.kogito.index.test.TestUtils.PROCESS_VERSION;
import static org.kie.kogito.index.test.TestUtils.getJobCloudEvent;
import static org.kie.kogito.index.test.TestUtils.getProcessCloudEvent;
import static org.kie.kogito.index.test.TestUtils.getUserTaskCloudEvent;
Expand All @@ -100,9 +101,6 @@ public abstract class AbstractIndexingServiceIT extends AbstractIndexingIT {
@Inject
public DataIndexStorageService cacheService;

@Inject
IndexingService indexingService;

@BeforeAll
static void setup() {
RestAssured.config = RestAssured.config()
Expand Down Expand Up @@ -289,18 +287,25 @@ void testConcurrentProcessInstanceIndex() throws Exception {
String processInstanceId = UUID.randomUUID().toString();
//indexing multiple events in parallel to the same process instance id
for (int i = 0; i < max_instance_events; i++) {
addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService);
addFutureEvent(futures, processId, processInstanceId, PENDING, executorService);
addFutureEvent(futures, processId, processInstanceId, COMPLETED, executorService);
addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService, false);
addFutureEvent(futures, processId, processInstanceId, PENDING, executorService, false);
addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService, false);
addFutureEvent(futures, processId, processInstanceId, COMPLETED, executorService, true);
}
//wait for all futures to complete
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(30, TimeUnit.SECONDS);
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(10, TimeUnit.SECONDS);
ProcessInstanceStateDataEvent event = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, CURRENT_USER);
validateProcessDefinition(getProcessDefinitionByIdAndVersion(processId, PROCESS_VERSION), event);
validateProcessInstance(getProcessInstanceById(processInstanceId), event);
}

private void addFutureEvent(List<CompletableFuture<Void>> futures, String processId, String processInstanceId, ProcessInstanceState state, ExecutorService executorService) {
private void addFutureEvent(List<CompletableFuture<Void>> futures, String processId, String processInstanceId, ProcessInstanceState state, ExecutorService executorService, boolean delay) {
futures.add(CompletableFuture.runAsync(() -> {
if (delay) {
await().atLeast(5, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true));
}
ProcessInstanceStateDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null, CURRENT_USER);
indexingService.indexProcessInstanceEvent(event);
indexProcessCloudEvent(event);
}, executorService));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.inject.Inject;
import javax.persistence.EntityManager;
import javax.persistence.LockModeType;
import javax.transaction.Transactional;

import org.kie.kogito.index.model.ProcessDefinition;
import org.kie.kogito.index.postgresql.mapper.ProcessDefinitionEntityMapper;
Expand All @@ -45,6 +46,7 @@ public ProcessDefinitionEntityStorage(ProcessDefinitionEntityRepository reposito
e.getVersion()).getKey());
}

@Transactional
@Override
public boolean containsKey(String key) {
ProcessDefinitionEntityId id = new ProcessDefinitionEntityId(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public static ProcessInstanceStateDataEvent getProcessCloudEvent(String processI
.processName(getProcessName(processId))
.eventDate(new Date())
.state(status.ordinal())
.businessKey(getProcessName(processId))
.businessKey(processInstanceId)
.roles("admin")
.eventUser(identity)
.eventType(eventType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,9 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-oidc-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance-deployment</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-quarkus-data-index-persistence-common-deployment</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance-deployment</artifactId>
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-addons-quarkus-data-index-common-runtime</artifactId>
Expand Down

0 comments on commit 521ea9e

Please sign in to comment.