From 2da01c091a168dc50e4146d346f9752d37bc6780 Mon Sep 17 00:00:00 2001 From: vttran Date: Mon, 13 Jan 2025 20:18:57 +0700 Subject: [PATCH] ISSUE-5381 Fix RedisKeyRegistrationHandler could not deserialize list event (#1457) --- .../events/RedisKeyRegistrationHandler.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/tmail-backend/event-bus-redis/src/main/java/org/apache/james/events/RedisKeyRegistrationHandler.java b/tmail-backend/event-bus-redis/src/main/java/org/apache/james/events/RedisKeyRegistrationHandler.java index d8be8618d4..8a60edf4d0 100644 --- a/tmail-backend/event-bus-redis/src/main/java/org/apache/james/events/RedisKeyRegistrationHandler.java +++ b/tmail-backend/event-bus-redis/src/main/java/org/apache/james/events/RedisKeyRegistrationHandler.java @@ -5,7 +5,9 @@ import java.util.List; import java.util.Optional; import java.util.function.Predicate; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; import org.apache.james.metrics.api.MetricFactory; import org.apache.james.util.MDCBuilder; import org.apache.james.util.MDCStructuredLogger; @@ -158,19 +160,19 @@ private Mono handleChannelMessage(ChannelMessage channelMe return Mono.empty(); } - Event event = toEvent(keyChannelMessage.eventAsJson()); + List events = toEvent(keyChannelMessage.eventAsJson()); return Flux.fromIterable(listenersToCall) - .flatMap(listener -> executeListener(listener, event, registrationKey), EventBus.EXECUTION_RATE) + .flatMap(listener -> executeListener(listener, events, registrationKey), EventBus.EXECUTION_RATE) .then(); } - private Mono executeListener(EventListener.ReactiveEventListener listener, Event event, RegistrationKey key) { + private Mono executeListener(EventListener.ReactiveEventListener listener, List events, RegistrationKey key) { MDCBuilder mdcBuilder = MDCBuilder.create() .addToContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString()); - return listenerExecutor.execute(listener, mdcBuilder, event) - .doOnError(e -> structuredLogger(event, key) + return listenerExecutor.execute(listener, mdcBuilder, events) + .doOnError(e -> structuredLogger(events, key) .log(logger -> logger.error("Exception happens when handling event", e))) .onErrorResume(e -> Mono.empty()) .then(); @@ -181,15 +183,30 @@ private boolean isLocalSynchronousListeners(EventBusId eventBusId, EventListener listener.getExecutionMode().equals(EventListener.ExecutionMode.SYNCHRONOUS); } - private Event toEvent(String eventAsJson) { - return eventSerializer.asEvent(eventAsJson); + private List toEvent(String eventAsJson) { + // if the json is an array, we have multiple events + if (StringUtils.trim(eventAsJson).startsWith("[")) { + return eventSerializer.asEvents(eventAsJson); + } + + try { + return List.of(eventSerializer.asEvent(eventAsJson)); + } catch (RuntimeException exception) { + return eventSerializer.asEvents(eventAsJson); + } } - private StructuredLogger structuredLogger(Event event, RegistrationKey key) { + private StructuredLogger structuredLogger(List events, RegistrationKey key) { return MDCStructuredLogger.forLogger(LOGGER) - .field(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId().getId().toString()) - .field(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass().getCanonicalName()) - .field(EventBus.StructuredLoggingFields.USER, event.getUsername().asString()) + .field(EventBus.StructuredLoggingFields.EVENT_ID, events.stream() + .map(e -> e.getEventId().getId().toString()) + .collect(Collectors.joining(","))) + .field(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream() + .map(e -> e.getClass().getCanonicalName()) + .collect(Collectors.joining(","))) + .field(EventBus.StructuredLoggingFields.USER, events.stream() + .map(e -> e.getUsername().asString()) + .collect(Collectors.joining(","))) .field(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString()); } }