Skip to content

Commit

Permalink
[KOGITO-9909] Fix concurrency on incoming data-index events adding lo…
Browse files Browse the repository at this point in the history
…ck and merge + fix for duplicate key on process definition indexing (#1906)

* KOGITO-9909 Fix concurrency on incoming data-index events adding lock and merge + fix for duplicate key on process definition indexing
  • Loading branch information
tiagodolphine authored Nov 21, 2023
1 parent af44858 commit d17609f
Show file tree
Hide file tree
Showing 35 changed files with 209 additions and 139 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -586,18 +587,23 @@ public void testProcessGatewayAPIAttachments(String taskId, String processInstan
.body("$.size()", is(1))
.body("[0].name", is(attachmentName)));

Map<String, String> attachmentMap = given().spec(dataIndexSpec()).contentType(ContentType.JSON)
.body("{ \"query\" : \"{ UserTaskInstances (where: { processInstanceId: {equal: \\\"" + processInstanceId + "\\\"}}) { " +
"id description priority potentialGroups attachments {id name content updatedBy updatedAt} } }\"}")
.when().post("/graphql")
.then()
.statusCode(200)
.body("data.UserTaskInstances[0].description", equalTo("NewDescription"))
.body("data.UserTaskInstances[0].priority", equalTo("low"))
.body("data.UserTaskInstances[0].potentialGroups[0]", equalTo("managers"))
.body("data.UserTaskInstances[0].attachments.size()", is(1))
.body("data.UserTaskInstances[0].attachments[0].name", equalTo(attachmentName))
.extract().jsonPath().getMap("data.UserTaskInstances[0].attachments[0]");
AtomicReference<Map<String, String>> attachmentMapRef = new AtomicReference<>();
await()
.atMost(TIMEOUT)
.untilAsserted(() -> attachmentMapRef.set(given().spec(dataIndexSpec()).contentType(ContentType.JSON)
.body("{ \"query\" : \"{ UserTaskInstances (where: { processInstanceId: {equal: \\\"" + processInstanceId + "\\\"}}) { " +
"id description priority potentialGroups attachments {id name content updatedBy updatedAt} } }\"}")
.when().post("/graphql")
.then()
.statusCode(200)
.body("data.UserTaskInstances[0].description", equalTo("NewDescription"))
.body("data.UserTaskInstances[0].priority", equalTo("low"))
.body("data.UserTaskInstances[0].potentialGroups[0]", equalTo("managers"))
.body("data.UserTaskInstances[0].attachments.size()", is(1))
.body("data.UserTaskInstances[0].attachments[0].name", equalTo(attachmentName))
.extract().jsonPath().getMap("data.UserTaskInstances[0].attachments[0]")));

Map<String, String> attachmentMap = attachmentMapRef.get();

checkExpectedCreatedItemData(attachmentCreationResult, attachmentMap);

Expand Down
4 changes: 4 additions & 0 deletions data-index/data-index-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-fault-tolerance</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -29,6 +30,7 @@
import javax.enterprise.inject.Instance;
import javax.inject.Inject;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.index.event.mapper.ProcessInstanceEventMerger;
Expand Down Expand Up @@ -68,7 +70,25 @@ public class IndexingService {
@Inject
Instance<UserTaskInstanceEventMerger> userTaskInstanceMergers;

//retry in case of rare but possible race condition during the insert for the first registry
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
ProcessInstance pi = handleProcessInstanceEvent(event);

ProcessDefinition definition = pi.getDefinition();

handleProcessDefinition(definition);
}

@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void handleProcessDefinition(ProcessDefinition definition) {
if (definition != null && !manager.getProcessDefinitionsCache().containsKey(definition.getKey())) {
manager.getProcessDefinitionsCache().put(definition.getKey(), definition);
LOGGER.debug("Stored Process Definition: {}", definition);
}
}

private ProcessInstance handleProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
Optional<ProcessInstance> found = Optional.ofNullable(manager.getProcessInstancesCache().get(event.getKogitoProcessInstanceId()));
ProcessInstance pi;
if (found.isEmpty()) {
Expand All @@ -81,21 +101,16 @@ public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
pi = found.get();

}

processInstanceMergers.stream().filter(e -> e.accept(event)).findAny().ifPresent(e -> e.merge(pi, event));

manager.getProcessInstancesCache().put(pi.getId(), pi);
LOGGER.debug("Stored Process Instance: {}", pi);

ProcessDefinition definition = pi.getDefinition();

if (definition != null) {
manager.getProcessDefinitionsCache().put(definition.getKey(), definition);
LOGGER.debug("Stored Process Definitioin: {}", definition);
}

LOGGER.debug("Stored Process Instance: {}", pi);
return pi;
}

//retry in case of rare but possible race condition during the insert for the first registry
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
Optional<UserTaskInstance> found = Optional.ofNullable(manager.getUserTaskInstancesCache().get(event.getKogitoUserTaskInstanceId()));
UserTaskInstance ut;
Expand All @@ -112,7 +127,6 @@ public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
LOGGER.debug("Stored User Task Instance: {}", ut);

manager.getUserTaskInstancesCache().put(ut.getId(), ut);

}

public void indexJob(Job job) {
Expand All @@ -139,7 +153,7 @@ public void indexModel(ObjectNode updateData) {
cache.put(processInstanceId, newModel);
}

public ObjectNode merge(String processId, String type, String processInstanceId, ObjectNode persistedModel, ObjectNode updateData) {
private ObjectNode merge(String processId, String type, String processInstanceId, ObjectNode persistedModel, ObjectNode updateData) {
ObjectNode newModel = getObjectMapper().createObjectNode();
newModel.put("_type", type);
newModel.setAll(persistedModel);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ProcessDefinition {
addons: [String!]
roles: [String!]
type: String
endpoint: String!
endpoint: String
serviceUrl: String
}

Expand Down Expand Up @@ -72,13 +72,13 @@ type ProcessInstance {
rootProcessInstanceId: String
rootProcessId: String
roles: [String!]
state: ProcessInstanceState!
state: ProcessInstanceState
endpoint: String!
serviceUrl: String
nodes: [NodeInstance!]!
nodes: [NodeInstance!]
milestones: [Milestone!]
variables: JSON
start: DateTime!
start: DateTime
end: DateTime
parentProcessInstance: ProcessInstance
childProcessInstances: [ProcessInstance!]
Expand Down Expand Up @@ -345,15 +345,15 @@ type UserTaskInstance {
name: String
priority: String
processInstanceId: String!
processId: String!
processId: String
rootProcessInstanceId: String
rootProcessId: String
state: String!
state: String
actualOwner: String
adminGroups: [String!]
adminUsers: [String!]
completed: DateTime
started: DateTime!
started: DateTime
excludedUsers: [String!]
potentialGroups: [String!]
potentialUsers: [String!]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import javax.inject.Inject;
Expand All @@ -36,6 +41,7 @@
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.model.ProcessInstanceState;
import org.kie.kogito.index.storage.DataIndexStorageService;
import org.kie.kogito.index.test.TestUtils;
import org.slf4j.Logger;
Expand All @@ -58,6 +64,7 @@
import static org.kie.kogito.index.DateTimeUtils.formatZonedDateTime;
import static org.kie.kogito.index.model.ProcessInstanceState.ACTIVE;
import static org.kie.kogito.index.model.ProcessInstanceState.COMPLETED;
import static org.kie.kogito.index.model.ProcessInstanceState.PENDING;
import static org.kie.kogito.index.service.GraphQLUtils.getJobById;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessDefinitionByIdAndVersion;
import static org.kie.kogito.index.service.GraphQLUtils.getProcessInstanceByBusinessKey;
Expand All @@ -80,13 +87,15 @@
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdAndStarted;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdAndState;
import static org.kie.kogito.index.service.GraphQLUtils.getUserTaskInstanceByIdNoActualOwner;
import static org.kie.kogito.index.test.TestUtils.PROCESS_VERSION;
import static org.kie.kogito.index.test.TestUtils.getJobCloudEvent;
import static org.kie.kogito.index.test.TestUtils.getProcessCloudEvent;
import static org.kie.kogito.index.test.TestUtils.getUserTaskCloudEvent;

public abstract class AbstractIndexingServiceIT extends AbstractIndexingIT {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractIndexingServiceIT.class);
public static final String CURRENT_USER = "currentUser";

Duration timeout = Duration.ofSeconds(30);

Expand Down Expand Up @@ -154,6 +163,7 @@ protected void validateProcessInstance(String query, ProcessInstanceStateDataEve
.body("data.ProcessInstances[0].processId", is(event.getData().getProcessId()))
.body("data.ProcessInstances[0].processName", is(event.getData().getProcessName()))
.body("data.ProcessInstances[0].version", is(event.getData().getProcessVersion()))
.body("data.ProcessInstances[0].state", is(ProcessInstanceState.fromStatus(event.getData().getState()).name()))
.body("data.ProcessInstances[0].rootProcessId", is(event.getData().getRootProcessId()))
.body("data.ProcessInstances[0].rootProcessInstanceId", is(event.getData().getRootProcessInstanceId()))
.body("data.ProcessInstances[0].parentProcessInstanceId", is(event.getData().getParentInstanceId()))
Expand Down Expand Up @@ -182,7 +192,7 @@ void testProcessInstancePagination() {
IntStream.range(0, 100).forEach(i -> {
String pId = UUID.randomUUID().toString();

ProcessInstanceDataEvent<?> startEvent = getProcessCloudEvent(processId, pId, ACTIVE, null, null, null, "currentUser");
ProcessInstanceDataEvent<?> startEvent = getProcessCloudEvent(processId, pId, ACTIVE, null, null, null, CURRENT_USER);

indexProcessCloudEvent(startEvent);
pIds.add(pId);
Expand Down Expand Up @@ -269,14 +279,45 @@ void testUserTaskInstancePagination() {
.body("data.UserTaskInstances[99].id", is(taskIds.get(99))));
}

@Test
void testConcurrentProcessInstanceIndex() throws Exception {
String processId = "travels";
ExecutorService executorService = new ScheduledThreadPoolExecutor(8);
int max_instance_events = 20;
List<CompletableFuture<Void>> futures = new ArrayList<>();
String processInstanceId = UUID.randomUUID().toString();
//indexing multiple events in parallel to the same process instance id
for (int i = 0; i < max_instance_events; i++) {
addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService, false);
addFutureEvent(futures, processId, processInstanceId, PENDING, executorService, false);
addFutureEvent(futures, processId, processInstanceId, ACTIVE, executorService, false);
addFutureEvent(futures, processId, processInstanceId, COMPLETED, executorService, true);
}
//wait for all futures to complete
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).get(10, TimeUnit.SECONDS);
ProcessInstanceStateDataEvent event = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, CURRENT_USER);
validateProcessDefinition(getProcessDefinitionByIdAndVersion(processId, PROCESS_VERSION), event);
validateProcessInstance(getProcessInstanceById(processInstanceId), event);
}

private void addFutureEvent(List<CompletableFuture<Void>> futures, String processId, String processInstanceId, ProcessInstanceState state, ExecutorService executorService, boolean delay) {
futures.add(CompletableFuture.runAsync(() -> {
if (delay) {
await().atLeast(5, TimeUnit.MILLISECONDS).untilTrue(new AtomicBoolean(true));
}
ProcessInstanceStateDataEvent event = getProcessCloudEvent(processId, processInstanceId, state, null, null, null, CURRENT_USER);
indexProcessCloudEvent(event);
}, executorService));
}

@Test
void testProcessInstanceIndex() throws Exception {
String processId = "travels";
String processInstanceId = UUID.randomUUID().toString();
String subProcessId = processId + "_sub";
String subProcessInstanceId = UUID.randomUUID().toString();

ProcessInstanceStateDataEvent startEvent = (ProcessInstanceStateDataEvent) getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, "currentUser");
ProcessInstanceStateDataEvent startEvent = getProcessCloudEvent(processId, processInstanceId, ACTIVE, null, null, null, CURRENT_USER);

indexProcessCloudEvent(startEvent);

Expand All @@ -292,14 +333,14 @@ void testProcessInstanceIndex() throws Exception {
validateProcessInstance(getProcessInstanceByCreatedBy(startEvent.getData().getEventUser()), startEvent);
validateProcessInstance(getProcessInstanceByUpdatedBy(startEvent.getData().getEventUser()), startEvent);

ProcessInstanceStateDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, "currentUser");
ProcessInstanceStateDataEvent endEvent = getProcessCloudEvent(processId, processInstanceId, COMPLETED, null, null, null, CURRENT_USER);

indexProcessCloudEvent(endEvent);

validateProcessInstance(getProcessInstanceByIdAndState(processInstanceId, COMPLETED), endEvent);

ProcessInstanceStateDataEvent event = getProcessCloudEvent(subProcessId, subProcessInstanceId, ACTIVE, processInstanceId,
processId, processInstanceId, "currentUser");
processId, processInstanceId, CURRENT_USER);

indexProcessCloudEvent(event);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

public class ProcessDefinition {

Expand Down Expand Up @@ -112,7 +114,11 @@ public String getKey() {
}

public static String toKey(String processId, String version) {
return String.format("%s-%s", processId, version);
return processId + "$v:" + version;
}

public static String[] fromKey(String key) {
return Optional.ofNullable(key).map(k -> k.split(Pattern.quote("$v:"))).orElse(new String[0]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.function.BiConsumer;

import org.junit.jupiter.api.AfterEach;
import org.kie.kogito.persistence.api.Storage;
import org.kie.kogito.persistence.api.query.AttributeFilter;
import org.kie.kogito.persistence.api.query.AttributeSort;
Expand All @@ -37,11 +36,6 @@ protected Boolean isDateTimeAsLong() {
return true;
}

@AfterEach
void tearDown() {
getStorage().clear();
}

public void queryAndAssert(BiConsumer<List<V>, String[]> assertConsumer, Storage<K, V> storage, List<AttributeFilter<?>> filters, List<AttributeSort> sort, Integer offset, Integer limit,
String... ids) {
assertConsumer.accept(storage.query().filter(filters).sort(sort).offset(offset).limit(limit).execute(), ids);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import javax.inject.Inject;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -61,7 +62,7 @@ void tearDown() {

@Test
void testCache() {
String processId = "travels";
String processId = RandomStringUtils.randomAlphabetic(10);
String version = "1.0";
ProcessDefinition pdv1 = TestUtils.createProcessDefinition(processId, version, Set.of("admin", "kogito"));
ProcessDefinition pdv2 = TestUtils.createProcessDefinition(processId, version, Set.of("kogito"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import javax.inject.Inject;

import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -57,7 +58,7 @@ void tearDown() {

@Test
void testCache() {
String processId = "travels";
String processId = RandomStringUtils.randomAlphabetic(10);
String type1 = "org.acme.travels.travels";
String type2 = "org.acme.travels";
testStorage(storage, processId, type1, type2);
Expand Down
Loading

0 comments on commit d17609f

Please sign in to comment.