diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonProcessInstanceDataEventDeserializer.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonProcessInstanceDataEventDeserializer.java index e60b3e4681..9fec2e2ca9 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonProcessInstanceDataEventDeserializer.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonProcessInstanceDataEventDeserializer.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent; @@ -57,6 +58,8 @@ public ProcessInstanceDataEvent deserialize(JsonParser jp, DeserializationCon String type = node.get("type").asText(); switch (type) { + case "MultipleProcessInstanceDataEvent": + return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class); case "ProcessInstanceErrorDataEvent": return (ProcessInstanceDataEvent) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class); case "ProcessInstanceNodeDataEvent": diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonUserTaskInstanceDataEventDeserializer.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonUserTaskInstanceDataEventDeserializer.java index 7aa0437f55..5c340a9873 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonUserTaskInstanceDataEventDeserializer.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/json/JsonUserTaskInstanceDataEventDeserializer.java @@ -20,6 +20,7 @@ import java.io.IOException; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent; @@ -58,6 +59,8 @@ public UserTaskInstanceDataEvent deserialize(JsonParser jp, DeserializationCo String type = node.get("type").asText(); switch (type) { + case "MultipleUserTaskInstanceDataEvent": + return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class); case "UserTaskInstanceAssignmentDataEvent": return (UserTaskInstanceDataEvent) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class); case "UserTaskInstanceAttachmentDataEvent": @@ -76,4 +79,4 @@ public UserTaskInstanceDataEvent deserialize(JsonParser jp, DeserializationCo } } -} \ No newline at end of file +} diff --git a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java index 2802fe8f94..f3cd153d5b 100644 --- a/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java +++ b/data-index/data-index-common/src/main/java/org/kie/kogito/index/service/IndexingService.java @@ -26,6 +26,7 @@ import java.util.Optional; import org.eclipse.microprofile.faulttolerance.Retry; +import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessDefinitionDataEvent; import org.kie.kogito.event.process.ProcessInstanceDataEvent; import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent; @@ -33,6 +34,7 @@ import org.kie.kogito.event.process.ProcessInstanceSLADataEvent; import org.kie.kogito.event.process.ProcessInstanceStateDataEvent; import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent; +import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent; import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent; @@ -76,6 +78,15 @@ public class IndexingService { @Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class) public void indexProcessInstanceEvent(ProcessInstanceDataEvent event) { ProcessInstanceStorage storage = manager.getProcessInstanceStorage(); + if (event instanceof MultipleProcessInstanceDataEvent) { + for (ProcessInstanceDataEvent item : ((MultipleProcessInstanceDataEvent) event).getData()) + indexProccessInstanceEvent(storage, item); + } else { + indexProccessInstanceEvent(storage, event); + } + } + + private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent event) { if (event instanceof ProcessInstanceErrorDataEvent) { storage.indexError((ProcessInstanceErrorDataEvent) event); } else if (event instanceof ProcessInstanceNodeDataEvent) { @@ -100,6 +111,16 @@ public void indexProcessDefinition(ProcessDefinitionDataEvent definitionDataEven @Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class) public void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent event) { UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage(); + if (event instanceof MultipleUserTaskInstanceDataEvent) { + for (UserTaskInstanceDataEvent item : ((MultipleUserTaskInstanceDataEvent) event).getData()) { + indexUserTaskInstanceEvent(storage, item); + } + } else { + indexUserTaskInstanceEvent(storage, event); + } + } + + private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, UserTaskInstanceDataEvent event) { if (event instanceof UserTaskInstanceAssignmentDataEvent) { storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) event); } else if (event instanceof UserTaskInstanceAttachmentDataEvent) { diff --git a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java index e9dcbbe2e5..a646d3f68c 100644 --- a/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java +++ b/data-index/data-index-service/data-index-service-common/src/main/java/org/kie/kogito/index/service/messaging/BlockingMessagingEventConsumer.java @@ -18,8 +18,6 @@ */ package org.kie.kogito.index.service.messaging; -import java.util.Collection; - import org.eclipse.microprofile.reactive.messaging.Incoming; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.process.ProcessDefinitionDataEvent; @@ -31,7 +29,6 @@ import org.slf4j.LoggerFactory; import io.quarkus.arc.properties.IfBuildProperty; -import io.smallrye.mutiny.Uni; import io.smallrye.reactive.messaging.annotations.Blocking; import jakarta.enterprise.context.ApplicationScoped; @@ -60,101 +57,34 @@ public class BlockingMessagingEventConsumer { @Incoming(KOGITO_PROCESSINSTANCES_EVENTS) @Blocking @Transactional - public Uni onProcessInstanceEvent(Object input) { - if (input instanceof Collection) { - @SuppressWarnings("unchecked") - Collection> events = (Collection>) input; - LOGGER.debug("Process instance consumer received grouped ProcessInstanceDataEvents: \n{}", events); - for (ProcessInstanceDataEvent event : events) { - handleProcessInstanceEvent(event); - } - } else if (input instanceof ProcessInstanceDataEvent) { - ProcessInstanceDataEvent event = (ProcessInstanceDataEvent) input; - LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event); - handleProcessInstanceEvent(event); - } else { - LOGGER.error("Unknown event type received: {}", input.getClass()); - } - return Uni.createFrom().voidItem(); + public void onProcessInstanceEvent(ProcessInstanceDataEvent event) { + LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event); + indexingService.indexProcessInstanceEvent(event); + eventPublisher.fire(event); } @Incoming(KOGITO_USERTASKINSTANCES_EVENTS) @Blocking @Transactional - public Uni onUserTaskInstanceEvent(Object input) { - if (input instanceof Collection) { - @SuppressWarnings("unchecked") - Collection> events = (Collection>) input; - LOGGER.debug("UserTask instance consumer received grouped UserTaskInstanceDataEvent: \n{}", events); - for (UserTaskInstanceDataEvent event : events) { - handleUserTaskInstanceEvent(event); - } - } else if (input instanceof UserTaskInstanceDataEvent) { - UserTaskInstanceDataEvent event = (UserTaskInstanceDataEvent) input; - LOGGER.debug("Process instance consumer received UserTaskInstanceDataEvent: \n{}", event); - handleUserTaskInstanceEvent(event); - } else { - LOGGER.error("Unknown event type received: {}", input.getClass()); - } - return Uni.createFrom().voidItem(); + public void onUserTaskInstanceEvent(UserTaskInstanceDataEvent event) { + LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", event); + indexingService.indexUserTaskInstanceEvent(event); + eventPublisher.fire(event); } @Incoming(KOGITO_JOBS_EVENTS) @Blocking @Transactional - public Uni onJobEvent(KogitoJobCloudEvent event) { + public void onJobEvent(KogitoJobCloudEvent event) { LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event); - return Uni.createFrom().item(event) - .onItem().invoke(e -> indexingService.indexJob(e.getData())) - .onFailure().invoke(t -> LOGGER.error("Error processing job KogitoJobCloudEvent: {}", t.getMessage(), t)) - .onItem().ignore().andContinueWithNull(); + indexingService.indexJob(event.getData()); } @Incoming(KOGITO_PROCESS_DEFINITIONS_EVENTS) @Blocking @Transactional - public Uni onProcessDefinitionDataEvent(Object input) { - if (input instanceof Collection) { - @SuppressWarnings("unchecked") - Collection events = (Collection) input; - LOGGER.debug("Process definition instance consumer received grouped ProcessDefinitionDataEvent: \n{}", events); - for (ProcessDefinitionDataEvent event : events) { - handleProcessDefinitionEvent(event); - } - } else if (input instanceof ProcessDefinitionDataEvent) { - ProcessDefinitionDataEvent event = (ProcessDefinitionDataEvent) input; - LOGGER.debug("Process definition consumer received ProcessDefinitionDataEvent: \n{}", event); - handleProcessDefinitionEvent(event); - } else { - LOGGER.error("Unknown event type received: {}", input.getClass()); - } - return Uni.createFrom().voidItem(); - } - - private void handleProcessInstanceEvent(ProcessInstanceDataEvent event) { - try { - indexingService.indexProcessInstanceEvent(event); - eventPublisher.fire(event); - } catch (Exception ex) { - LOGGER.error("Error processing process instance event: {}", event, ex); - } - } - - private void handleUserTaskInstanceEvent(UserTaskInstanceDataEvent event) { - try { - indexingService.indexUserTaskInstanceEvent(event); - eventPublisher.fire(event); - } catch (Exception ex) { - LOGGER.error("Error processing userTask instance event: {}", event, ex); - } - } - - private void handleProcessDefinitionEvent(ProcessDefinitionDataEvent event) { - try { - indexingService.indexProcessDefinition(event); - eventPublisher.fire(event); - } catch (Exception ex) { - LOGGER.error("Error processing process definition event: {}", event, ex); - } + public void onProcessDefinitionDataEvent(ProcessDefinitionDataEvent event) { + LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event); + indexingService.indexProcessDefinition(event); } }