Skip to content

Commit

Permalink
[Fix apache/incubator-kie-issues#1457] Grouping of event serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
fjtirado committed Sep 26, 2024
1 parent c9f017a commit 3c80c57
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
Expand Down Expand Up @@ -57,6 +58,8 @@ public ProcessInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCon
String type = node.get("type").asText();

switch (type) {
case "MultipleProcessInstanceDataEvent":
return jp.getCodec().treeToValue(node, MultipleProcessInstanceDataEvent.class);
case "ProcessInstanceErrorDataEvent":
return (ProcessInstanceDataEvent<?>) jp.getCodec().treeToValue(node, ProcessInstanceErrorDataEvent.class);
case "ProcessInstanceNodeDataEvent":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.IOException;

import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -58,6 +59,8 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo
String type = node.get("type").asText();

switch (type) {
case "MultipleUserTaskInstanceDataEvent":
return jp.getCodec().treeToValue(node, MultipleUserTaskInstanceDataEvent.class);
case "UserTaskInstanceAssignmentDataEvent":
return (UserTaskInstanceDataEvent<?>) jp.getCodec().treeToValue(node, UserTaskInstanceAssignmentDataEvent.class);
case "UserTaskInstanceAttachmentDataEvent":
Expand All @@ -76,4 +79,4 @@ public UserTaskInstanceDataEvent<?> deserialize(JsonParser jp, DeserializationCo

}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,15 @@
import java.util.Optional;

import org.eclipse.microprofile.faulttolerance.Retry;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessInstanceErrorDataEvent;
import org.kie.kogito.event.process.ProcessInstanceNodeDataEvent;
import org.kie.kogito.event.process.ProcessInstanceSLADataEvent;
import org.kie.kogito.event.process.ProcessInstanceStateDataEvent;
import org.kie.kogito.event.process.ProcessInstanceVariableDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAssignmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceAttachmentDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceCommentDataEvent;
Expand Down Expand Up @@ -76,6 +78,15 @@ public class IndexingService {
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public void indexProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
ProcessInstanceStorage storage = manager.getProcessInstanceStorage();
if (event instanceof MultipleProcessInstanceDataEvent) {
for (ProcessInstanceDataEvent<?> item : ((MultipleProcessInstanceDataEvent) event).getData())
indexProccessInstanceEvent(storage, item);
} else {
indexProccessInstanceEvent(storage, event);
}
}

private void indexProccessInstanceEvent(ProcessInstanceStorage storage, ProcessInstanceDataEvent<?> event) {
if (event instanceof ProcessInstanceErrorDataEvent) {
storage.indexError((ProcessInstanceErrorDataEvent) event);
} else if (event instanceof ProcessInstanceNodeDataEvent) {
Expand All @@ -100,6 +111,16 @@ public void indexProcessDefinition(ProcessDefinitionDataEvent definitionDataEven
@Retry(maxRetries = 3, delay = 300, jitter = 100, retryOn = ConcurrentModificationException.class)
public <T> void indexUserTaskInstanceEvent(UserTaskInstanceDataEvent<T> event) {
UserTaskInstanceStorage storage = manager.getUserTaskInstanceStorage();
if (event instanceof MultipleUserTaskInstanceDataEvent) {
for (UserTaskInstanceDataEvent<?> item : ((MultipleUserTaskInstanceDataEvent) event).getData()) {
indexUserTaskInstanceEvent(storage, item);
}
} else {
indexUserTaskInstanceEvent(storage, event);
}
}

private void indexUserTaskInstanceEvent(UserTaskInstanceStorage storage, UserTaskInstanceDataEvent<?> event) {
if (event instanceof UserTaskInstanceAssignmentDataEvent) {
storage.indexAssignment((UserTaskInstanceAssignmentDataEvent) event);
} else if (event instanceof UserTaskInstanceAttachmentDataEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
*/
package org.kie.kogito.index.service.messaging;

import java.util.Collection;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
Expand All @@ -31,7 +29,6 @@
import org.slf4j.LoggerFactory;

import io.quarkus.arc.properties.IfBuildProperty;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.annotations.Blocking;

import jakarta.enterprise.context.ApplicationScoped;
Expand Down Expand Up @@ -60,101 +57,34 @@ public class BlockingMessagingEventConsumer {
@Incoming(KOGITO_PROCESSINSTANCES_EVENTS)
@Blocking
@Transactional
public Uni<Void> onProcessInstanceEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<ProcessInstanceDataEvent<?>> events = (Collection<ProcessInstanceDataEvent<?>>) input;
LOGGER.debug("Process instance consumer received grouped ProcessInstanceDataEvents: \n{}", events);
for (ProcessInstanceDataEvent<?> event : events) {
handleProcessInstanceEvent(event);
}
} else if (input instanceof ProcessInstanceDataEvent) {
ProcessInstanceDataEvent<?> event = (ProcessInstanceDataEvent<?>) input;
LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
handleProcessInstanceEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
public void onProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
LOGGER.debug("Process instance consumer received ProcessInstanceDataEvent: \n{}", event);
indexingService.indexProcessInstanceEvent(event);
eventPublisher.fire(event);
}

@Incoming(KOGITO_USERTASKINSTANCES_EVENTS)
@Blocking
@Transactional
public Uni<Void> onUserTaskInstanceEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<UserTaskInstanceDataEvent<?>> events = (Collection<UserTaskInstanceDataEvent<?>>) input;
LOGGER.debug("UserTask instance consumer received grouped UserTaskInstanceDataEvent: \n{}", events);
for (UserTaskInstanceDataEvent<?> event : events) {
handleUserTaskInstanceEvent(event);
}
} else if (input instanceof UserTaskInstanceDataEvent) {
UserTaskInstanceDataEvent<?> event = (UserTaskInstanceDataEvent<?>) input;
LOGGER.debug("Process instance consumer received UserTaskInstanceDataEvent: \n{}", event);
handleUserTaskInstanceEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
public void onUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
LOGGER.debug("Task instance received UserTaskInstanceDataEvent \n{}", event);
indexingService.indexUserTaskInstanceEvent(event);
eventPublisher.fire(event);
}

@Incoming(KOGITO_JOBS_EVENTS)
@Blocking
@Transactional
public Uni<Void> onJobEvent(KogitoJobCloudEvent event) {
public void onJobEvent(KogitoJobCloudEvent event) {
LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
return Uni.createFrom().item(event)
.onItem().invoke(e -> indexingService.indexJob(e.getData()))
.onFailure().invoke(t -> LOGGER.error("Error processing job KogitoJobCloudEvent: {}", t.getMessage(), t))
.onItem().ignore().andContinueWithNull();
indexingService.indexJob(event.getData());
}

@Incoming(KOGITO_PROCESS_DEFINITIONS_EVENTS)
@Blocking
@Transactional
public Uni<Void> onProcessDefinitionDataEvent(Object input) {
if (input instanceof Collection) {
@SuppressWarnings("unchecked")
Collection<ProcessDefinitionDataEvent> events = (Collection<ProcessDefinitionDataEvent>) input;
LOGGER.debug("Process definition instance consumer received grouped ProcessDefinitionDataEvent: \n{}", events);
for (ProcessDefinitionDataEvent event : events) {
handleProcessDefinitionEvent(event);
}
} else if (input instanceof ProcessDefinitionDataEvent) {
ProcessDefinitionDataEvent event = (ProcessDefinitionDataEvent) input;
LOGGER.debug("Process definition consumer received ProcessDefinitionDataEvent: \n{}", event);
handleProcessDefinitionEvent(event);
} else {
LOGGER.error("Unknown event type received: {}", input.getClass());
}
return Uni.createFrom().voidItem();
}

private void handleProcessInstanceEvent(ProcessInstanceDataEvent<?> event) {
try {
indexingService.indexProcessInstanceEvent(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing process instance event: {}", event, ex);
}
}

private void handleUserTaskInstanceEvent(UserTaskInstanceDataEvent<?> event) {
try {
indexingService.indexUserTaskInstanceEvent(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing userTask instance event: {}", event, ex);
}
}

private void handleProcessDefinitionEvent(ProcessDefinitionDataEvent event) {
try {
indexingService.indexProcessDefinition(event);
eventPublisher.fire(event);
} catch (Exception ex) {
LOGGER.error("Error processing process definition event: {}", event, ex);
}
public void onProcessDefinitionDataEvent(ProcessDefinitionDataEvent event) {
LOGGER.debug("Job received KogitoJobCloudEvent \n{}", event);
indexingService.indexProcessDefinition(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

package org.kie.kogito.index.service.messaging;

import java.net.URI;
import java.util.Arrays;
import java.util.Collection;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.process.MultipleProcessInstanceDataEvent;
import org.kie.kogito.event.process.ProcessDefinitionDataEvent;
import org.kie.kogito.event.process.ProcessInstanceDataEvent;
import org.kie.kogito.event.usertask.MultipleUserTaskInstanceDataEvent;
import org.kie.kogito.event.usertask.UserTaskInstanceDataEvent;
import org.kie.kogito.index.event.KogitoJobCloudEvent;
import org.kie.kogito.index.model.Job;
Expand All @@ -38,13 +42,12 @@
import org.mockito.MockitoAnnotations;
import org.mockito.junit.jupiter.MockitoExtension;

import io.smallrye.mutiny.Uni;

import jakarta.enterprise.event.Event;

import static org.mockito.Mockito.*;

@ExtendWith(MockitoExtension.class)
@Disabled
class BlockingMessagingEventConsumerTest {

@Mock
Expand All @@ -70,16 +73,14 @@ void testOnProcessInstanceEvent() {
ProcessInstanceDataEvent<?> event1 = mock(ProcessInstanceDataEvent.class);
ProcessInstanceDataEvent<?> event2 = mock(ProcessInstanceDataEvent.class);
Collection<ProcessInstanceDataEvent<?>> events = Arrays.asList(event1, event2);
MultipleProcessInstanceDataEvent event = new MultipleProcessInstanceDataEvent(URI.create("dummy"), events);

// Act
Uni<Void> result = consumer.onProcessInstanceEvent(events);
consumer.onProcessInstanceEvent(event);

// Assert
verify(indexingService, times(1)).indexProcessInstanceEvent(event1);
verify(indexingService, times(1)).indexProcessInstanceEvent(event2);
verify(eventPublisher, times(1)).fire(event1);
verify(eventPublisher, times(1)).fire(event2);
result.await().indefinitely(); // Ensure Uni completes without error
verify(indexingService, times(1)).indexProcessInstanceEvent(event);
verify(eventPublisher, times(1)).fire(event);
}

@Test
Expand All @@ -88,16 +89,14 @@ void testOnUserTaskInstanceEvent() {
UserTaskInstanceDataEvent<?> event1 = mock(UserTaskInstanceDataEvent.class);
UserTaskInstanceDataEvent<?> event2 = mock(UserTaskInstanceDataEvent.class);
Collection<UserTaskInstanceDataEvent<?>> events = Arrays.asList(event1, event2);
MultipleUserTaskInstanceDataEvent event = new MultipleUserTaskInstanceDataEvent(URI.create("dummy"), events);

// Act
Uni<Void> result = consumer.onUserTaskInstanceEvent(events);
consumer.onUserTaskInstanceEvent(event);

// Assert
verify(indexingService, times(1)).indexUserTaskInstanceEvent(event1);
verify(indexingService, times(1)).indexUserTaskInstanceEvent(event2);
verify(eventPublisher, times(1)).fire(event1);
verify(eventPublisher, times(1)).fire(event2);
result.await().indefinitely(); // Ensure Uni completes without error
verify(indexingService, times(1)).indexUserTaskInstanceEvent(event);
verify(eventPublisher, times(1)).fire(event);
}

@Test
Expand All @@ -108,10 +107,7 @@ void testOnJobEvent() {
when(event.getData()).thenReturn(mockJob);

// Act
Uni<Void> result = consumer.onJobEvent(event);

// Await the result before assertions
result.await().indefinitely(); // Ensure Uni completes without error
consumer.onJobEvent(event);

// Assert
verify(indexingService, times(1)).indexJob(mockJob); // Perform the verification after Uni completes
Expand All @@ -121,18 +117,13 @@ void testOnJobEvent() {
void testOnProcessDefinitionDataEvent() {
// Arrange
ProcessDefinitionDataEvent event1 = mock(ProcessDefinitionDataEvent.class);
ProcessDefinitionDataEvent event2 = mock(ProcessDefinitionDataEvent.class);
Collection<ProcessDefinitionDataEvent> events = Arrays.asList(event1, event2);

// Act
Uni<Void> result = consumer.onProcessDefinitionDataEvent(events);
consumer.onProcessDefinitionDataEvent(event1);

// Assert
verify(indexingService, times(1)).indexProcessDefinition(event1);
verify(indexingService, times(1)).indexProcessDefinition(event2);
verify(eventPublisher, times(1)).fire(event1);
verify(eventPublisher, times(1)).fire(event2);
result.await().indefinitely(); // Ensure Uni completes without error
}

@Test
Expand All @@ -143,12 +134,10 @@ void testErrorHandlingInOnProcessInstanceEvent() {
doThrow(new RuntimeException("On purpose! Indexing failed")).when(indexingService).indexProcessInstanceEvent(event);

// Act
Uni<Void> result = consumer.onProcessInstanceEvent(events);
consumer.onProcessInstanceEvent(new MultipleProcessInstanceDataEvent(URI.create("dummy"), events));

// Assert
verify(indexingService, times(1)).indexProcessInstanceEvent(event);
verify(eventPublisher, never()).fire(event); // Event should not be published if indexing fails
result.await().indefinitely(); // Ensure Uni completes without error
}

@Test
Expand All @@ -159,27 +148,24 @@ void testErrorHanlingInOnUserTaskInstanceEvent() {
doThrow(new RuntimeException("On purpose! Indexing failed")).when(indexingService).indexUserTaskInstanceEvent(event);

// Act
Uni<Void> result = consumer.onUserTaskInstanceEvent(events);
consumer.onUserTaskInstanceEvent(new MultipleUserTaskInstanceDataEvent(URI.create("dummy"), events));

// Assert
verify(indexingService, times(1)).indexUserTaskInstanceEvent(event);
verify(eventPublisher, never()).fire(event); // Event should not be published if indexing fails
result.await().indefinitely(); // Ensure Uni completes without error

}

@Test
void testErrorHandlingInOnProcessDefinitionDataEvent() {
// Arrange
ProcessDefinitionDataEvent event = mock(ProcessDefinitionDataEvent.class);
Collection<ProcessDefinitionDataEvent> events = Arrays.asList(event);
doThrow(new RuntimeException("On purpose! Indexing failed")).when(indexingService).indexProcessDefinition(event);

// Act
Uni<Void> result = consumer.onProcessDefinitionDataEvent(events);
consumer.onProcessDefinitionDataEvent(event);

// Assert
verify(indexingService, times(1)).indexProcessDefinition(event);
verify(eventPublisher, never()).fire(event);
result.await().indefinitely(); // Ensure Uni completes without error
}
}

0 comments on commit 3c80c57

Please sign in to comment.