Skip to content

Commit

Permalink
ISSUE-5381 Fix RedisKeyRegistrationHandler could not deserialize list…
Browse files Browse the repository at this point in the history
… event (#1457)
  • Loading branch information
vttranlina authored Jan 13, 2025
1 parent fd914e4 commit 2da01c0
Showing 1 changed file with 28 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.util.MDCBuilder;
import org.apache.james.util.MDCStructuredLogger;
Expand Down Expand Up @@ -158,19 +160,19 @@ private Mono<Void> handleChannelMessage(ChannelMessage<String, String> channelMe
return Mono.empty();
}

Event event = toEvent(keyChannelMessage.eventAsJson());
List<Event> events = toEvent(keyChannelMessage.eventAsJson());

return Flux.fromIterable(listenersToCall)
.flatMap(listener -> executeListener(listener, event, registrationKey), EventBus.EXECUTION_RATE)
.flatMap(listener -> executeListener(listener, events, registrationKey), EventBus.EXECUTION_RATE)
.then();
}

private Mono<Void> executeListener(EventListener.ReactiveEventListener listener, Event event, RegistrationKey key) {
private Mono<Void> executeListener(EventListener.ReactiveEventListener listener, List<Event> events, RegistrationKey key) {
MDCBuilder mdcBuilder = MDCBuilder.create()
.addToContext(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString());

return listenerExecutor.execute(listener, mdcBuilder, event)
.doOnError(e -> structuredLogger(event, key)
return listenerExecutor.execute(listener, mdcBuilder, events)
.doOnError(e -> structuredLogger(events, key)
.log(logger -> logger.error("Exception happens when handling event", e)))
.onErrorResume(e -> Mono.empty())
.then();
Expand All @@ -181,15 +183,30 @@ private boolean isLocalSynchronousListeners(EventBusId eventBusId, EventListener
listener.getExecutionMode().equals(EventListener.ExecutionMode.SYNCHRONOUS);
}

private Event toEvent(String eventAsJson) {
return eventSerializer.asEvent(eventAsJson);
private List<Event> toEvent(String eventAsJson) {
// if the json is an array, we have multiple events
if (StringUtils.trim(eventAsJson).startsWith("[")) {
return eventSerializer.asEvents(eventAsJson);
}

try {
return List.of(eventSerializer.asEvent(eventAsJson));
} catch (RuntimeException exception) {
return eventSerializer.asEvents(eventAsJson);
}
}

private StructuredLogger structuredLogger(Event event, RegistrationKey key) {
private StructuredLogger structuredLogger(List<Event> events, RegistrationKey key) {
return MDCStructuredLogger.forLogger(LOGGER)
.field(EventBus.StructuredLoggingFields.EVENT_ID, event.getEventId().getId().toString())
.field(EventBus.StructuredLoggingFields.EVENT_CLASS, event.getClass().getCanonicalName())
.field(EventBus.StructuredLoggingFields.USER, event.getUsername().asString())
.field(EventBus.StructuredLoggingFields.EVENT_ID, events.stream()
.map(e -> e.getEventId().getId().toString())
.collect(Collectors.joining(",")))
.field(EventBus.StructuredLoggingFields.EVENT_CLASS, events.stream()
.map(e -> e.getClass().getCanonicalName())
.collect(Collectors.joining(",")))
.field(EventBus.StructuredLoggingFields.USER, events.stream()
.map(e -> e.getUsername().asString())
.collect(Collectors.joining(",")))
.field(EventBus.StructuredLoggingFields.REGISTRATION_KEY, key.asString());
}
}

0 comments on commit 2da01c0

Please sign in to comment.