Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Grouping of event serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Sep 26, 2024
1 parent c9f017a commit 1c13a37
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -57,6 +58,8 @@ public ProcessInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCon
String type = node.get("type").asText();

switch (type) {
case "MultipleProcessInstanceDataEvent":
return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class);
case "ProcessInstanceErrorDataEvent":
return (ProcessInstanceDataEvent<?>) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
case "ProcessInstanceNodeDataEvent":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -58,6 +59,8 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo
String type = node.get("type").asText();

switch (type) {
case "MultipleUserTaskInstanceDataEvent":
return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class);
case "UserTaskInstanceAssignmentDataEvent":
return (UserTaskInstanceDataEvent<?>) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
case "UserTaskInstanceAttachmentDataEvent":
Expand All @@ -76,4 +79,4 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import java.util.Optional;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -76,6 +78,15 @@ public class IndexingService {
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
ProcessInstanceStorage storage = manager.getProcessInstanceStorage();
if (event instanceof MultipleProcessInstanceDataEvent) {
for (ProcessInstanceDataEvent<?> item : ((MultipleProcessInstanceDataEvent) event).getData())
indexProccessInstanceEvent(storage, item);
} else {
indexProccessInstanceEvent(storage, event);
}
}

private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
storage.indexError((ProcessInstanceErrorDataEvent) event);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
Expand All @@ -100,6 +111,16 @@ public void indexProcessDefinition(ProcessDefinitionDataEvent definitionDataEven
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage();
if (event instanceof MultipleUserTaskInstanceDataEvent) {
for (UserTaskInstanceDataEvent<?> item : ((MultipleUserTaskInstanceDataEvent) event).getData()) {
indexUserTaskInstanceEvent(storage, item);
}
} else {
indexUserTaskInstanceEvent(storage, event);
}
}

private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, UserTaskInstanceDataEvent<?> event) {
if (event instanceof UserTaskInstanceAssignmentDataEvent) {
storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) event);
} else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.kie.kogito.index.service.messaging;

import java.util.Collection;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
Expand All @@ -31,7 +29,6 @@
import org.slf4j.LoggerFactory;

import io.quarkus.arc.properties.IfBuildProperty;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -60,101 +57,34 @@ public class BlockingMessagingEventConsumer {
@Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
@Blocking
@Transactional
public Uni<Void> onProcessInstanceEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<ProcessInstanceDataEvent<?>> events = (Collection<ProcessInstanceDataEvent<?>>) input;
LOGGER.debug("Process instance consumer received grouped ProcessInstanceDataEvents: \n{}", events);
for (ProcessInstanceDataEvent<?> event : events) {
handleProcessInstanceEvent(event);
}
} else if (input instanceof ProcessInstanceDataEvent) {
ProcessInstanceDataEvent<?> event = (ProcessInstanceDataEvent<?>) input;
LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
handleProcessInstanceEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
public void onProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
indexingService.indexProcessInstanceEvent(event);
eventPublisher.fire(event);
}

@Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
@Blocking
@Transactional
public Uni<Void> onUserTaskInstanceEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<UserTaskInstanceDataEvent<?>> events = (Collection<UserTaskInstanceDataEvent<?>>) input;
LOGGER.debug("UserTask instance consumer received grouped UserTaskInstanceDataEvent: \n{}", events);
for (UserTaskInstanceDataEvent<?> event : events) {
handleUserTaskInstanceEvent(event);
}
} else if (input instanceof UserTaskInstanceDataEvent) {
UserTaskInstanceDataEvent<?> event = (UserTaskInstanceDataEvent<?>) input;
LOGGER.debug("Process instance consumer received UserTaskInstanceDataEvent: \n{}", event);
handleUserTaskInstanceEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
public void onUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", event);
indexingService.indexUserTaskInstanceEvent(event);
eventPublisher.fire(event);
}

@Incoming(KOGITO_JOBS_EVENTS)
@Blocking
@Transactional
public Uni<Void> onJobEvent(KogitoJobCloudEvent event) {
public void onJobEvent(KogitoJobCloudEvent event) {
LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
return Uni.createFrom().item(event)
.onItem().invoke(e -> indexingService.indexJob(e.getData()))
.onFailure().invoke(t -> LOGGER.error("Error processing job KogitoJobCloudEvent: {}", t.getMessage(), t))
.onItem().ignore().andContinueWithNull();
indexingService.indexJob(event.getData());
}

@Incoming(KOGITO_PROCESS_DEFINITIONS_EVENTS)
@Blocking
@Transactional
public Uni<Void> onProcessDefinitionDataEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<ProcessDefinitionDataEvent> events = (Collection<ProcessDefinitionDataEvent>) input;
LOGGER.debug("Process definition instance consumer received grouped ProcessDefinitionDataEvent: \n{}", events);
for (ProcessDefinitionDataEvent event : events) {
handleProcessDefinitionEvent(event);
}
} else if (input instanceof ProcessDefinitionDataEvent) {
ProcessDefinitionDataEvent event = (ProcessDefinitionDataEvent) input;
LOGGER.debug("Process definition consumer received ProcessDefinitionDataEvent: \n{}", event);
handleProcessDefinitionEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
}

private void handleProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
try {
indexingService.indexProcessInstanceEvent(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing process instance event: {}", event, ex);
}
}

private void handleUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
try {
indexingService.indexUserTaskInstanceEvent(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing userTask instance event: {}", event, ex);
}
}

private void handleProcessDefinitionEvent(ProcessDefinitionDataEvent event) {
try {
indexingService.indexProcessDefinition(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing process definition event: {}", event, ex);
}
public void onProcessDefinitionDataEvent(ProcessDefinitionDataEvent event) {
LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
indexingService.indexProcessDefinition(event);
}
}

0 comments on commit 1c13a37

Please sign in to comment.