From da4cee6ec1ce472bb9f235774159f0eebdcee714 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Mon, 14 Oct 2024 14:12:42 +0200 Subject: [PATCH 1/5] [Fix #2113] Data index group processing --- .../kogito/index/service/IndexingService.java | 8 +- .../index/storage/ProcessInstanceStorage.java | 3 + .../storage/ModelProcessInstanceStorage.java | 18 ++++ .../storage/ProcessInstanceEntityStorage.java | 82 +++++++++++++------ 4 files changed, 77 insertions(+), 34 deletions(-) diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java index f3cd153d5b..947e1972bf 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java @@ -79,14 +79,8 @@ public class IndexingService { public void indexProcessInstanceEvent(ProcessInstanceDataEvent event) { ProcessInstanceStorage storage = manager.getProcessInstanceStorage(); if (event instanceof MultipleProcessInstanceDataEvent) { - for (ProcessInstanceDataEvent item : ((MultipleProcessInstanceDataEvent) event).getData()) - indexProccessInstanceEvent(storage, item); - } else { - indexProccessInstanceEvent(storage, event); + storage.indexGroup(((MultipleProcessInstanceDataEvent) event)); } - } - - private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent event) { if (event instanceof ProcessInstanceErrorDataEvent) { storage.indexError((ProcessInstanceErrorDataEvent) event); } else if (event instanceof ProcessInstanceNodeDataEvent) { diff --git a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java index 753caf66bc..674cc17bd4 100644 --- a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java +++ b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/ProcessInstanceStorage.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.index.storage; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; @@ -28,6 +29,8 @@ public interface ProcessInstanceStorage extends StorageFetcher { + void indexGroup(MultipleProcessInstanceDataEvent event); + void indexError(ProcessInstanceErrorDataEvent event); void indexNode(ProcessInstanceNodeDataEvent event); diff --git a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java index 9f21497fc2..07b645bb70 100644 --- a/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java +++ b/data-index/data-index-storage/data-index-storage-common/src/main/java/org/kie/kogito/index/storage/ModelProcessInstanceStorage.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.index.storage; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -69,6 +70,23 @@ public void indexVariable(ProcessInstanceVariableDataEvent event) { index(event, variableMerger); } + @Override + public void indexGroup(MultipleProcessInstanceDataEvent events) { + for (ProcessInstanceDataEvent event : events.getData()) { + if (event instanceof ProcessInstanceErrorDataEvent) { + index(event, errorMerger); + } else if (event instanceof ProcessInstanceNodeDataEvent) { + index(event, nodeMerger); + } else if (event instanceof ProcessInstanceSLADataEvent) { + index(event, slaMerger); + } else if (event instanceof ProcessInstanceStateDataEvent) { + index(event, stateMerger); + } else if (event instanceof ProcessInstanceVariableDataEvent) { + index(event, variableMerger); + } + } + } + private > void index(T event, ProcessInstanceEventMerger merger) { ProcessInstance processInstance = storage.get(event.getKogitoProcessInstanceId()); if (processInstance == null) { diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java index 386aabcf78..de23853a83 100644 --- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java +++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java @@ -20,9 +20,11 @@ import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Date; +import java.util.Iterator; import java.util.Set; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; +import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorEventBody; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -64,43 +66,60 @@ public ProcessInstanceEntityStorage(ProcessInstanceEntityRepository repository, super(repository, ProcessInstanceEntity.class, mapper::mapToModel); } + @Override + @Transactional + public void indexGroup(MultipleProcessInstanceDataEvent event) { + Iterator> iter = event.getData().iterator(); + ProcessInstanceDataEvent firstEvent = iter.next(); + ProcessInstanceEntity pi = findOrInit(firstEvent); + indexEvent(pi, firstEvent); + while (iter.hasNext()) { + indexEvent(pi, iter.next()); + } + repository.flush(); + } + @Override @Transactional public void indexError(ProcessInstanceErrorDataEvent event) { - indexError(event.getData()); + indexError(findOrInit(event), event.getData()); + repository.flush(); } @Override @Transactional public void indexNode(ProcessInstanceNodeDataEvent event) { - indexNode(event.getData()); + indexNode(findOrInit(event), event.getData()); + repository.flush(); } @Override @Transactional public void indexSLA(ProcessInstanceSLADataEvent event) { - indexSLA(event.getData()); - + indexSla(findOrInit(event), event.getData()); + repository.flush(); } @Override @Transactional public void indexState(ProcessInstanceStateDataEvent event) { - indexState(event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString()); + indexState(findOrInit(event), event); + repository.flush(); } @Override @Transactional public void indexVariable(ProcessInstanceVariableDataEvent event) { - indexVariable(event.getData()); + indexVariable(findOrInit(event), event.getData()); + repository.flush(); } - private ProcessInstanceEntity findOrInit(String processId, String processInstanceId, Date date) { - return repository.findByIdOptional(processInstanceId).orElseGet(() -> { + private ProcessInstanceEntity findOrInit(ProcessInstanceDataEvent event) { + return repository.findByIdOptional(event.getKogitoProcessInstanceId()).orElseGet(() -> { ProcessInstanceEntity pi = new ProcessInstanceEntity(); - pi.setProcessId(processId); - pi.setId(processInstanceId); - pi.setLastUpdate(toZonedDateTime(date)); + pi.setProcessId(event.getKogitoProcessId()); + pi.setId(event.getKogitoProcessInstanceId()); + pi.setLastUpdate(toZonedDateTime(event.getTime())); pi.setNodes(new ArrayList<>()); pi.setMilestones(new ArrayList<>()); repository.persist(pi); @@ -108,8 +127,21 @@ private ProcessInstanceEntity findOrInit(String processId, String processInstanc }); } - private void indexError(ProcessInstanceErrorEventBody error) { - ProcessInstanceEntity pi = findOrInit(error.getProcessId(), error.getProcessInstanceId(), error.getEventDate()); + private void indexEvent(ProcessInstanceEntity pi, ProcessInstanceDataEvent event) { + if (event instanceof ProcessInstanceErrorDataEvent) { + indexError(pi, ((ProcessInstanceErrorDataEvent) event).getData()); + } else if (event instanceof ProcessInstanceNodeDataEvent) { + indexNode(pi, ((ProcessInstanceNodeDataEvent) event).getData()); + } else if (event instanceof ProcessInstanceSLADataEvent) { + indexSla(pi, ((ProcessInstanceSLADataEvent) event).getData()); + } else if (event instanceof ProcessInstanceStateDataEvent) { + indexState(pi, (ProcessInstanceStateDataEvent) event); + } else if (event instanceof ProcessInstanceVariableDataEvent) { + indexVariable(pi, ((ProcessInstanceVariableDataEvent) event).getData()); + } + } + + private void indexError(ProcessInstanceEntity pi, ProcessInstanceErrorEventBody error) { ProcessInstanceErrorEntity errorEntity = pi.getError(); if (errorEntity == null) { errorEntity = new ProcessInstanceErrorEntity(); @@ -118,16 +150,14 @@ private void indexError(ProcessInstanceErrorEventBody error) { errorEntity.setMessage(error.getErrorMessage()); errorEntity.setNodeDefinitionId(error.getNodeDefinitionId()); pi.setState(CommonUtils.ERROR_STATE); - repository.flush(); } - private void indexNode(ProcessInstanceNodeEventBody data) { - ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); + private void indexNode(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) { pi.getNodes().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateNode(n, data), () -> createNode(pi, data)); if ("MilestoneNode".equals(data.getNodeType())) { pi.getMilestones().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateMilestone(n, data), () -> createMilestone(pi, data)); } - repository.flush(); + } private MilestoneEntity createMilestone(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) { @@ -174,13 +204,11 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn return nodeInstance; } - private void indexSLA(ProcessInstanceSLAEventBody data) { - findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); - repository.flush(); + private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateDataEvent event) { + indexState(pi, event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString()); } - private void indexState(ProcessInstanceStateEventBody data, Set addons, String endpoint) { - ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); + private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateEventBody data, Set addons, String endpoint) { pi.setVersion(data.getProcessVersion()); pi.setProcessName(data.getProcessName()); pi.setRootProcessInstanceId(data.getRootProcessInstanceId()); @@ -199,13 +227,13 @@ private void indexState(ProcessInstanceStateEventBody data, Set addons, pi.setLastUpdate(toZonedDateTime(data.getEventDate())); pi.setAddons(addons); pi.setEndpoint(endpoint); - repository.flush(); } - private void indexVariable(ProcessInstanceVariableEventBody data) { - ProcessInstanceEntity pi = findOrInit(data.getProcessId(), data.getProcessInstanceId(), data.getEventDate()); + private void indexVariable(ProcessInstanceEntity pi, ProcessInstanceVariableEventBody data) { pi.setVariables(JsonUtils.mergeVariable(data.getVariableName(), data.getVariableValue(), pi.getVariables())); - repository.flush(); } + private void indexSla(ProcessInstanceEntity orInit, ProcessInstanceSLAEventBody data) { + // SLA does nothing for now + } } From 01b162db1b4879202fdee87678479b59d2e271f4 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Mon, 14 Oct 2024 20:52:04 +0200 Subject: [PATCH 2/5] [Fix #2113] Support group containing different process instance ids --- .../jpa/storage/ProcessInstanceEntityStorage.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java index de23853a83..dc4970b0a3 100644 --- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java +++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java @@ -20,7 +20,8 @@ import java.time.ZonedDateTime; import java.util.ArrayList; -import java.util.Iterator; +import java.util.HashMap; +import java.util.Map; import java.util.Set; import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; @@ -68,13 +69,10 @@ public ProcessInstanceEntityStorage(ProcessInstanceEntityRepository repository, @Override @Transactional - public void indexGroup(MultipleProcessInstanceDataEvent event) { - Iterator> iter = event.getData().iterator(); - ProcessInstanceDataEvent firstEvent = iter.next(); - ProcessInstanceEntity pi = findOrInit(firstEvent); - indexEvent(pi, firstEvent); - while (iter.hasNext()) { - indexEvent(pi, iter.next()); + public void indexGroup(MultipleProcessInstanceDataEvent events) { + Map piMap = new HashMap<>(); + for (ProcessInstanceDataEvent event : events.getData()) { + indexEvent(piMap.computeIfAbsent(event.getKogitoProcessInstanceId(), id -> findOrInit(event)), event); } repository.flush(); } @@ -157,7 +155,6 @@ private void indexNode(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody da if ("MilestoneNode".equals(data.getNodeType())) { pi.getMilestones().stream().filter(n -> n.getId().equals(data.getNodeInstanceId())).findAny().ifPresentOrElse(n -> updateMilestone(n, data), () -> createMilestone(pi, data)); } - } private MilestoneEntity createMilestone(ProcessInstanceEntity pi, ProcessInstanceNodeEventBody data) { From a4f9386b76a23b64d703f03b25b205f54b2f7a20 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Mon, 14 Oct 2024 20:55:34 +0200 Subject: [PATCH 3/5] [Fix #2113] Flush call is probably not needed --- .../index/jpa/storage/ProcessInstanceEntityStorage.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java index dc4970b0a3..1fbfdc312d 100644 --- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java +++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java @@ -74,42 +74,36 @@ public void indexGroup(MultipleProcessInstanceDataEvent events) { for (ProcessInstanceDataEvent event : events.getData()) { indexEvent(piMap.computeIfAbsent(event.getKogitoProcessInstanceId(), id -> findOrInit(event)), event); } - repository.flush(); } @Override @Transactional public void indexError(ProcessInstanceErrorDataEvent event) { indexError(findOrInit(event), event.getData()); - repository.flush(); } @Override @Transactional public void indexNode(ProcessInstanceNodeDataEvent event) { indexNode(findOrInit(event), event.getData()); - repository.flush(); } @Override @Transactional public void indexSLA(ProcessInstanceSLADataEvent event) { indexSla(findOrInit(event), event.getData()); - repository.flush(); } @Override @Transactional public void indexState(ProcessInstanceStateDataEvent event) { indexState(findOrInit(event), event); - repository.flush(); } @Override @Transactional public void indexVariable(ProcessInstanceVariableDataEvent event) { indexVariable(findOrInit(event), event.getData()); - repository.flush(); } private ProcessInstanceEntity findOrInit(ProcessInstanceDataEvent event) { @@ -186,7 +180,6 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn nodeInstance.setType(body.getNodeType()); ZonedDateTime eventDate = toZonedDateTime(body.getEventDate()); switch (body.getEventType()) { - case EVENT_TYPE_ENTER: nodeInstance.setEnter(eventDate); break; From de27a6ed4773bf1970ecdc2afdeb358bab5e796d Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti Date: Tue, 15 Oct 2024 17:41:19 +0200 Subject: [PATCH 4/5] [Fix #2113] Optimization for user tasks --- .../kogito/index/service/IndexingService.java | 15 +-- .../storage/UserTaskInstanceStorage.java | 3 + .../storage/ModelUserTaskInstanceStorage.java | 21 +++- .../UserTaskInstanceEntityStorage.java | 109 +++++++++++++----- 4 files changed, 105 insertions(+), 43 deletions(-) diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java index 947e1972bf..5fbf4d6536 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java @@ -80,8 +80,7 @@ public void indexProcessInstanceEvent(ProcessInstanceDataEvent event) { ProcessInstanceStorage storage = manager.getProcessInstanceStorage(); if (event instanceof MultipleProcessInstanceDataEvent) { storage.indexGroup(((MultipleProcessInstanceDataEvent) event)); - } - if (event instanceof ProcessInstanceErrorDataEvent) { + } else if (event instanceof ProcessInstanceErrorDataEvent) { storage.indexError((ProcessInstanceErrorDataEvent) event); } else if (event instanceof ProcessInstanceNodeDataEvent) { storage.indexNode((ProcessInstanceNodeDataEvent) event); @@ -106,16 +105,8 @@ public void indexProcessDefinition(ProcessDefinitionDataEvent definitionDataEven public void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent event) { UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage(); if (event instanceof MultipleUserTaskInstanceDataEvent) { - for (UserTaskInstanceDataEvent item : ((MultipleUserTaskInstanceDataEvent) event).getData()) { - indexUserTaskInstanceEvent(storage, item); - } - } else { - indexUserTaskInstanceEvent(storage, event); - } - } - - private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, UserTaskInstanceDataEvent event) { - if (event instanceof UserTaskInstanceAssignmentDataEvent) { + storage.indexGroup((MultipleUserTaskInstanceDataEvent) event); + } else if (event instanceof UserTaskInstanceAssignmentDataEvent) { storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) event); } else if (event instanceof UserTaskInstanceAttachmentDataEvent) { storage.indexAttachment((UserTaskInstanceAttachmentDataEvent) event); diff --git a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java index f315ae4f73..58c586fd62 100644 --- a/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java +++ b/data-index/data-index-storage/data-index-storage-api/src/main/java/org/kie/kogito/index/storage/UserTaskInstanceStorage.java @@ -18,6 +18,7 @@ */ package org.kie.kogito.index.storage; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent; @@ -40,4 +41,6 @@ public interface UserTaskInstanceStorage extends StorageFetcher storage) { @Override public void indexAssignment(UserTaskInstanceAssignmentDataEvent event) { index(event, assignmentMerger); - } @Override @@ -76,13 +76,30 @@ public void indexState(UserTaskInstanceStateDataEvent event) { @Override public void indexVariable(UserTaskInstanceVariableDataEvent event) { index(event, variableMerger); - } @Override public void indexComment(UserTaskInstanceCommentDataEvent event) { index(event, commentMerger); + } + @Override + public void indexGroup(MultipleUserTaskInstanceDataEvent events) { + for (UserTaskInstanceDataEvent event : events.getData()) { + if (event instanceof UserTaskInstanceAssignmentDataEvent) { + index((UserTaskInstanceAssignmentDataEvent) event, assignmentMerger); + } else if (event instanceof UserTaskInstanceAttachmentDataEvent) { + index((UserTaskInstanceAttachmentDataEvent) event, attachmentMerger); + } else if (event instanceof UserTaskInstanceDeadlineDataEvent) { + index((UserTaskInstanceDeadlineDataEvent) event, deadlineMerger); + } else if (event instanceof UserTaskInstanceStateDataEvent) { + index((UserTaskInstanceStateDataEvent) event, stateMerger); + } else if (event instanceof UserTaskInstanceCommentDataEvent) { + index((UserTaskInstanceCommentDataEvent) event, commentMerger); + } else if (event instanceof UserTaskInstanceVariableDataEvent) { + index((UserTaskInstanceVariableDataEvent) event, variableMerger); + } + } } private > void index(T event, UserTaskInstanceEventMerger merger) { diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java index 1e2c127a36..cb75b1e31a 100644 --- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java +++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/UserTaskInstanceEntityStorage.java @@ -19,15 +19,19 @@ package org.kie.kogito.index.jpa.storage; import java.net.URI; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentEventBody; import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentEventBody; import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceCommentEventBody; +import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceStateDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceStateEventBody; @@ -66,20 +70,69 @@ public UserTaskInstanceEntityStorage(UserTaskInstanceEntityRepository repository super(repository, UserTaskInstanceEntity.class, mapper::mapToModel); } - private UserTaskInstanceEntity findOrInit(String taskId) { - return repository.findByIdOptional(taskId).orElseGet(() -> { - UserTaskInstanceEntity ut = new UserTaskInstanceEntity(); - ut.setId(taskId); - repository.persist(ut); - return ut; - }); + @Override + @Transactional + public void indexGroup(MultipleUserTaskInstanceDataEvent events) { + Map taskMap = new HashMap<>(); + for (UserTaskInstanceDataEvent event : events.getData()) { + indexEvent(taskMap.computeIfAbsent(event.getKogitoUserTaskInstanceId(), id -> findOrInit(id)), event); + } } @Override @Transactional public void indexAssignment(UserTaskInstanceAssignmentDataEvent event) { + indexAssignment(findOrInit(event), event); + } + + @Override + @Transactional + public void indexAttachment(UserTaskInstanceAttachmentDataEvent event) { + indexAttachment(findOrInit(event), event); + } + + @Override + @Transactional + public void indexDeadline(UserTaskInstanceDeadlineDataEvent event) { + indexDeadline(findOrInit(event), event); + } + + @Override + @Transactional + public void indexState(UserTaskInstanceStateDataEvent event) { + indexState(findOrInit(event), event); + } + + @Override + @Transactional + public void indexComment(UserTaskInstanceCommentDataEvent event) { + indexComment(findOrInit(event), event); + } + + @Override + @Transactional + public void indexVariable(UserTaskInstanceVariableDataEvent event) { + indexVariable(findOrInit(event), event); + } + + private void indexEvent(UserTaskInstanceEntity task, UserTaskInstanceDataEvent event) { + if (event instanceof UserTaskInstanceAssignmentDataEvent) { + indexAssignment(task, (UserTaskInstanceAssignmentDataEvent) event); + } else if (event instanceof UserTaskInstanceAttachmentDataEvent) { + indexAttachment(task, (UserTaskInstanceAttachmentDataEvent) event); + } else if (event instanceof UserTaskInstanceDeadlineDataEvent) { + indexDeadline(task, (UserTaskInstanceDeadlineDataEvent) event); + } else if (event instanceof UserTaskInstanceStateDataEvent) { + indexState(task, (UserTaskInstanceStateDataEvent) event); + } else if (event instanceof UserTaskInstanceCommentDataEvent) { + indexComment(task, (UserTaskInstanceCommentDataEvent) event); + } else if (event instanceof UserTaskInstanceVariableDataEvent) { + indexVariable(task, (UserTaskInstanceVariableDataEvent) event); + } + } + + private void indexAssignment(UserTaskInstanceEntity userTaskInstance, UserTaskInstanceAssignmentDataEvent event) { UserTaskInstanceAssignmentEventBody body = event.getData(); - UserTaskInstanceEntity userTaskInstance = findOrInit(event.getKogitoUserTaskInstanceId()); switch (body.getAssignmentType()) { case "USER_OWNERS": userTaskInstance.setPotentialUsers(new HashSet<>(body.getUsers())); @@ -97,13 +150,9 @@ public void indexAssignment(UserTaskInstanceAssignmentDataEvent event) { userTaskInstance.setAdminUsers(new HashSet<>(body.getUsers())); break; } - repository.flush(); } - @Override - @Transactional - public void indexAttachment(UserTaskInstanceAttachmentDataEvent event) { - UserTaskInstanceEntity userTaskInstance = findOrInit(event.getKogitoUserTaskInstanceId()); + private void indexAttachment(UserTaskInstanceEntity userTaskInstance, UserTaskInstanceAttachmentDataEvent event) { UserTaskInstanceAttachmentEventBody body = event.getData(); List attachments = userTaskInstance.getAttachments(); switch (body.getEventType()) { @@ -127,17 +176,12 @@ public void indexAttachment(UserTaskInstanceAttachmentDataEvent event) { } } - @Override - @Transactional - public void indexDeadline(UserTaskInstanceDeadlineDataEvent event) { - findOrInit(event.getKogitoUserTaskInstanceId()); + private void indexDeadline(UserTaskInstanceEntity userTaskInstance, UserTaskInstanceDeadlineDataEvent event) { + // deadlines ignored for now } - @Override - @Transactional - public void indexState(UserTaskInstanceStateDataEvent event) { + private void indexState(UserTaskInstanceEntity task, UserTaskInstanceStateDataEvent event) { UserTaskInstanceStateEventBody body = event.getData(); - UserTaskInstanceEntity task = findOrInit(event.getKogitoUserTaskInstanceId()); task.setProcessInstanceId(body.getProcessInstanceId()); task.setProcessId(event.getKogitoProcessId()); task.setRootProcessId(event.getKogitoRootProcessId()); @@ -163,11 +207,8 @@ private String getEndpoint(URI source, String pId, String taskName, String taskI return source.toString() + format("/%s/%s/%s", pId, name, taskId); } - @Override - @Transactional - public void indexComment(UserTaskInstanceCommentDataEvent event) { + private void indexComment(UserTaskInstanceEntity userTaskInstance, UserTaskInstanceCommentDataEvent event) { UserTaskInstanceCommentEventBody body = event.getData(); - UserTaskInstanceEntity userTaskInstance = findOrInit(event.getKogitoUserTaskInstanceId()); List comments = userTaskInstance.getComments(); switch (body.getEventType()) { case UserTaskInstanceCommentEventBody.EVENT_TYPE_ADDED: @@ -190,10 +231,7 @@ public void indexComment(UserTaskInstanceCommentDataEvent event) { } } - @Override - @Transactional - public void indexVariable(UserTaskInstanceVariableDataEvent event) { - UserTaskInstanceEntity userTaskInstance = findOrInit(event.getKogitoUserTaskInstanceId()); + private void indexVariable(UserTaskInstanceEntity userTaskInstance, UserTaskInstanceVariableDataEvent event) { UserTaskInstanceVariableEventBody body = event.getData(); if (body.getVariableType().equals("INPUT")) { ObjectNode objectNode = userTaskInstance.getInputs(); @@ -211,4 +249,17 @@ public void indexVariable(UserTaskInstanceVariableDataEvent event) { userTaskInstance.setOutputs(objectNode); } } + + private UserTaskInstanceEntity findOrInit(UserTaskInstanceDataEvent event) { + return findOrInit(event.getKogitoUserTaskInstanceId()); + } + + private UserTaskInstanceEntity findOrInit(String taskId) { + return repository.findByIdOptional(taskId).orElseGet(() -> { + UserTaskInstanceEntity ut = new UserTaskInstanceEntity(); + ut.setId(taskId); + repository.persist(ut); + return ut; + }); + } } From 5906180cd786585d6f0e4c3ff1b4c346ade72e26 Mon Sep 17 00:00:00 2001 From: Francisco Javier Tirado Sarti <65240126+fjtirado@users.noreply.github.com> Date: Wed, 16 Oct 2024 11:37:09 +0200 Subject: [PATCH 5/5] Update data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Gonzalo Muñoz --- .../kogito/index/jpa/storage/ProcessInstanceEntityStorage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java index 1fbfdc312d..b991c1a83f 100644 --- a/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java +++ b/data-index/data-index-storage/data-index-storage-jpa-common/src/main/java/org/kie/kogito/index/jpa/storage/ProcessInstanceEntityStorage.java @@ -195,7 +195,8 @@ private NodeInstanceEntity updateNode(NodeInstanceEntity nodeInstance, ProcessIn } private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateDataEvent event) { - indexState(pi, event.getData(), event.getKogitoAddons() == null ? Set.of() : Set.of(event.getKogitoAddons().split(",")), event.getSource() == null ? null : event.getSource().toString()); + indexState(pi, event.getData(), (event.getKogitoAddons() == null || event.getKogitoAddons().isEmpty()) ? Set.of() : Set.of(event.getKogitoAddons().split(",")), + event.getSource() == null ? null : event.getSource().toString()); } private void indexState(ProcessInstanceEntity pi, ProcessInstanceStateEventBody data, Set addons, String endpoint) {