Skip to content

Commit

Permalink
ISSUE-5366 Allow executing some listeners synchronously (#1438)
Browse files Browse the repository at this point in the history
  • Loading branch information
chibenwa authored Jan 6, 2025
1 parent 68e6715 commit d75031c
Show file tree
Hide file tree
Showing 8 changed files with 283 additions and 18 deletions.
11 changes: 10 additions & 1 deletion docs/modules/ROOT/pages/tmail-backend/configure/redis.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,13 @@ Specified to TMail backend, we can configure the following configurations in the
| eventBus.redis.timeout
| Timeout for Redis event bus operations. Optional. Duration. Default to `10seconds`.

|===
|===

Additionally, Twake mail allows executing some listeners in a synchronous fashion upon dispatching. They are then
run synchronously as part of the dispatch process. If execution fails then retries happens asynchronously (best effort).

This is controlled by the following setting in `jvm.properties`:

....
tmail.eventbus.synchronous.listener.groups=PopulateEmailQueryViewListenerGroup,MailboxChangeListenerGroup
....
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.james.events;

import java.util.Collection;
import java.util.List;
import java.util.Set;

import jakarta.annotation.PreDestroy;
Expand All @@ -14,6 +15,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

Expand All @@ -23,8 +25,15 @@
import reactor.rabbitmq.Sender;

public class RabbitMQAndRedisEventBus implements EventBus, Startable {

static final List<String> listenersToExecuteSynchronously = Splitter.on(',').omitEmptyStrings().splitToList(System.getProperty("tmail.eventbus.synchronous.listener.groups", ""));
private static final Set<RegistrationKey> NO_KEY = ImmutableSet.of();
private static final String NOT_RUNNING_ERROR_MESSAGE = "Event Bus is not running";

public static boolean shouldBeExecutedSynchronously(Group listener) {
return listenersToExecuteSynchronously.contains(listener.getClass().getSimpleName());
}

static final String EVENT_BUS_ID = "eventBusId";

private final NamingStrategy namingStrategy;
Expand All @@ -45,7 +54,7 @@ public class RabbitMQAndRedisEventBus implements EventBus, Startable {

private volatile boolean isRunning;
private volatile boolean isStopping;
private GroupRegistrationHandler groupRegistrationHandler;
private TmailGroupRegistrationHandler groupRegistrationHandler;
private RedisKeyRegistrationHandler keyRegistrationHandler;
private TMailEventDispatcher eventDispatcher;
private final RedisEventBusConfiguration redisEventBusConfiguration;
Expand Down Expand Up @@ -84,9 +93,9 @@ public void start() {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new RedisKeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, routingKeyConverter,
localListenerRegistry, listenerExecutor, retryBackoff, metricFactory, redisEventBusClientFactory, redisSetReactiveCommands, redisEventBusConfiguration);
groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId, configuration);
groupRegistrationHandler = new TmailGroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId, configuration);
eventDispatcher = new TMailEventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configuration,
redisPublisher, redisSetReactiveCommands, redisEventBusConfiguration);
redisPublisher, redisSetReactiveCommands, redisEventBusConfiguration, groupRegistrationHandler);

eventDispatcher.start();
keyRegistrationHandler.start();
Expand All @@ -101,9 +110,9 @@ void startWithoutStartingKeyRegistrationHandler() {
LocalListenerRegistry localListenerRegistry = new LocalListenerRegistry();
keyRegistrationHandler = new RedisKeyRegistrationHandler(namingStrategy, eventBusId, eventSerializer, routingKeyConverter,
localListenerRegistry, listenerExecutor, retryBackoff, metricFactory, redisEventBusClientFactory, redisSetReactiveCommands, redisEventBusConfiguration);
groupRegistrationHandler = new GroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId, configuration);
groupRegistrationHandler = new TmailGroupRegistrationHandler(namingStrategy, eventSerializer, channelPool, sender, receiverProvider, retryBackoff, eventDeadLetters, listenerExecutor, eventBusId, configuration);
eventDispatcher = new TMailEventDispatcher(namingStrategy, eventBusId, eventSerializer, sender, localListenerRegistry, listenerExecutor, eventDeadLetters, configuration,
redisPublisher, redisSetReactiveCommands, redisEventBusConfiguration);
redisPublisher, redisSetReactiveCommands, redisEventBusConfiguration, groupRegistrationHandler);

keyRegistrationHandler.declarePubSubChannel();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;
import static org.apache.james.events.RabbitMQAndRedisEventBus.EVENT_BUS_ID;

import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -62,13 +63,14 @@ public class TMailEventDispatcher {
private final RedisSetReactiveCommands<String, String> redisSetReactiveCommands;
private final EventBusId eventBusId;
private final RedisEventBusConfiguration redisEventBusConfiguration;
private final TmailGroupRegistrationHandler groupRegistrationHandler;

TMailEventDispatcher(NamingStrategy namingStrategy, EventBusId eventBusId, EventSerializer eventSerializer, Sender sender,
LocalListenerRegistry localListenerRegistry,
ListenerExecutor listenerExecutor,
EventDeadLetters deadLetters, RabbitMQConfiguration configuration,
RedisPubSubReactiveCommands<String, String> redisPubSubReactiveCommands,
RedisSetReactiveCommands<String, String> redisSetReactiveCommands, RedisEventBusConfiguration redisEventBusConfiguration) {
RedisSetReactiveCommands<String, String> redisSetReactiveCommands, RedisEventBusConfiguration redisEventBusConfiguration, TmailGroupRegistrationHandler groupRegistrationHandler) {
this.namingStrategy = namingStrategy;
this.eventSerializer = eventSerializer;
this.sender = sender;
Expand All @@ -87,6 +89,7 @@ public class TMailEventDispatcher {
this.redisSetReactiveCommands = redisSetReactiveCommands;
this.eventBusId = eventBusId;
this.redisEventBusConfiguration = redisEventBusConfiguration;
this.groupRegistrationHandler = groupRegistrationHandler;
}

void start() {
Expand Down Expand Up @@ -114,6 +117,7 @@ void start() {
Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {
return Flux
.concat(
executeLocalSynchronousListeners(ImmutableList.of(new EventBus.EventWithRegistrationKey(event, keys))),
dispatchToLocalListeners(event, keys),
dispatchToRemoteListeners(event, keys))
.doOnError(throwable -> LOGGER.error("error while dispatching event", throwable))
Expand All @@ -122,14 +126,31 @@ Mono<Void> dispatch(Event event, Set<RegistrationKey> keys) {

Mono<Void> dispatch(Collection<EventBus.EventWithRegistrationKey> events) {
return Flux
.concat(Flux.fromIterable(events)
.concatMap(e -> dispatchToLocalListeners(e.event(), e.keys()))
.then(),
.concat(
executeLocalSynchronousListeners(events),
dispatchToLocalListeners(events),
dispatchToRemoteListeners(events))
.doOnError(throwable -> LOGGER.error("error while dispatching event", throwable))
.then();
}

private Mono<Void> executeLocalSynchronousListeners(Collection<EventBus.EventWithRegistrationKey> events) {
if (RabbitMQAndRedisEventBus.listenersToExecuteSynchronously.isEmpty()) {
return Mono.empty();
}
return Flux.fromStream(groupRegistrationHandler.synchronousGroupRegistrations())
.flatMap(registration -> registration.runListenerReliably(DEFAULT_RETRY_COUNT, events.stream()
.map(EventBus.EventWithRegistrationKey::event)
.collect(ImmutableList.toImmutableList())))
.then();
}

private Mono<Void> dispatchToLocalListeners(Collection<EventBus.EventWithRegistrationKey> events) {
return Flux.fromIterable(events)
.concatMap(e -> dispatchToLocalListeners(e.event(), e.keys()))
.then();
}

private Mono<Void> dispatchToLocalListeners(Event event, Set<RegistrationKey> keys) {
return Flux.fromIterable(keys)
.flatMap(key -> Flux.fromIterable(localListenerRegistry.getLocalListeners(key))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
* regarding copyright ownership. The ASF licenses this file *
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
* http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/

package org.apache.james.events;

import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
import static org.apache.james.backends.rabbitmq.Constants.EMPTY_ROUTING_KEY;
import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
import static org.apache.james.backends.rabbitmq.Constants.REQUEUE;
import static org.apache.james.backends.rabbitmq.Constants.evaluateAutoDelete;
import static org.apache.james.backends.rabbitmq.Constants.evaluateDurable;
import static org.apache.james.backends.rabbitmq.Constants.evaluateExclusive;
import static org.apache.james.events.GroupRegistration.DEFAULT_RETRY_COUNT;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.util.ReactorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.ImmutableList;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

class TmailGroupRegistrationHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TmailGroupRegistrationHandler.class);

public static class GroupRegistrationHandlerGroup extends Group {

}

static final Group GROUP = new GroupRegistrationHandlerGroup();

private final NamingStrategy namingStrategy;
private final Map<Group, GroupRegistration> groupRegistrations;
private final EventSerializer eventSerializer;
private final ReactorRabbitMQChannelPool channelPool;
private final Sender sender;
private final ReceiverProvider receiverProvider;
private final RetryBackoffConfiguration retryBackoff;
private final EventDeadLetters eventDeadLetters;
private final ListenerExecutor listenerExecutor;
private final RabbitMQConfiguration configuration;
private final GroupRegistration.WorkQueueName queueName;
private final Scheduler scheduler;
private Optional<Disposable> consumer;

TmailGroupRegistrationHandler(NamingStrategy namingStrategy, EventSerializer eventSerializer, ReactorRabbitMQChannelPool channelPool, Sender sender, ReceiverProvider receiverProvider,
RetryBackoffConfiguration retryBackoff,
EventDeadLetters eventDeadLetters, ListenerExecutor listenerExecutor, EventBusId eventBusId, RabbitMQConfiguration configuration) {
this.namingStrategy = namingStrategy;
this.eventSerializer = eventSerializer;
this.channelPool = channelPool;
this.sender = sender;
this.receiverProvider = receiverProvider;
this.retryBackoff = retryBackoff;
this.eventDeadLetters = eventDeadLetters;
this.listenerExecutor = listenerExecutor;
this.configuration = configuration;
this.groupRegistrations = new ConcurrentHashMap<>();
this.queueName = namingStrategy.workQueue(GROUP);
this.scheduler = Schedulers.newBoundedElastic(EventBus.EXECUTION_RATE, ReactorUtils.DEFAULT_BOUNDED_ELASTIC_QUEUESIZE, "groups-handler");
this.consumer = Optional.empty();
}

Stream<GroupRegistration> synchronousGroupRegistrations() {
return groupRegistrations.entrySet()
.stream()
.filter(registration -> RabbitMQAndRedisEventBus.shouldBeExecutedSynchronously(registration.getKey()))
.map(Map.Entry::getValue);
}

Stream<GroupRegistration> asynchronousGroupRegistrations() {
return groupRegistrations.entrySet()
.stream()
.filter(registration -> !RabbitMQAndRedisEventBus.shouldBeExecutedSynchronously(registration.getKey()))
.map(Map.Entry::getValue);
}

GroupRegistration retrieveGroupRegistration(Group group) {
return Optional.ofNullable(groupRegistrations.get(group))
.orElseThrow(() -> new GroupRegistrationNotFound(group));
}

public void start() {
channelPool.createWorkQueue(
QueueSpecification.queue(queueName.asString())
.durable(evaluateDurable(DURABLE, configuration.isQuorumQueuesUsed()))
.exclusive(evaluateExclusive(!EXCLUSIVE, configuration.isQuorumQueuesUsed()))
.autoDelete(evaluateAutoDelete(!AUTO_DELETE, configuration.isQuorumQueuesUsed()))
.arguments(configuration.workQueueArgumentsBuilder()
.deadLetter(namingStrategy.deadLetterExchange())
.build()),
BindingSpecification.binding()
.exchange(namingStrategy.exchange())
.queue(queueName.asString())
.routingKey(EMPTY_ROUTING_KEY))
.retryWhen(Retry.backoff(retryBackoff.getMaxRetries(), retryBackoff.getFirstBackoff()).jitter(retryBackoff.getJitterFactor()).scheduler(Schedulers.boundedElastic()))
.block();

this.consumer = Optional.of(consumeWorkQueue());
}

private Disposable consumeWorkQueue() {
return Flux.using(
receiverProvider::createReceiver,
receiver -> receiver.consumeManualAck(queueName.asString(), new ConsumeOptions().qos(EventBus.EXECUTION_RATE)),
Receiver::close)
.filter(delivery -> Objects.nonNull(delivery.getBody()))
.flatMap(this::deliver, EventBus.EXECUTION_RATE)
.subscribeOn(scheduler)
.subscribe();
}

private Mono<Void> deliver(AcknowledgableDelivery acknowledgableDelivery) {
byte[] eventAsBytes = acknowledgableDelivery.getBody();

return deserializeEvents(eventAsBytes)
.flatMapIterable(events -> asynchronousGroupRegistrations()
.map(group -> Pair.of(group, events))
.collect(ImmutableList.toImmutableList()))
.flatMap(event -> event.getLeft().runListenerReliably(DEFAULT_RETRY_COUNT, event.getRight()))
.then(Mono.<Void>fromRunnable(acknowledgableDelivery::ack).subscribeOn(Schedulers.boundedElastic()))
.then()
.onErrorResume(e -> {
LOGGER.error("Unable to process delivery for group {}", GROUP, e);
return Mono.fromRunnable(() -> acknowledgableDelivery.nack(!REQUEUE))
.subscribeOn(Schedulers.boundedElastic())
.then();
});
}

private Mono<List<Event>> deserializeEvents(byte[] eventAsBytes) {
return Mono.fromCallable(() -> eventSerializer.asEventsFromBytes(eventAsBytes));
}

void stop() {
groupRegistrations.values().forEach(groupRegistration -> Mono.from(groupRegistration.unregister()).block());
consumer.ifPresent(Disposable::dispose);
Optional.ofNullable(scheduler).ifPresent(Scheduler::dispose);
}

void restart() {
Optional<Disposable> previousConsumer = consumer;
consumer = Optional.of(consumeWorkQueue());
previousConsumer
.filter(Predicate.not(Disposable::isDisposed))
.ifPresent(Disposable::dispose);

groupRegistrations.values()
.forEach(GroupRegistration::restart);
}

Registration register(EventListener.ReactiveEventListener listener, Group group) {
if (groupRegistrations.isEmpty()) {
start();
}
return groupRegistrations
.compute(group, (groupToRegister, oldGroupRegistration) -> {
if (oldGroupRegistration != null) {
throw new GroupAlreadyRegistered(group);
}
return newGroupRegistration(listener, groupToRegister);
})
.start();
}

private GroupRegistration newGroupRegistration(EventListener.ReactiveEventListener listener, Group group) {
return new GroupRegistration(
namingStrategy, channelPool, sender,
receiverProvider,
eventSerializer,
listener,
group,
retryBackoff,
eventDeadLetters,
() -> groupRegistrations.remove(group),
listenerExecutor, configuration);
}

Collection<Group> registeredGroups() {
return groupRegistrations.keySet();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class RabbitMQAndRedisEventBusTest implements GroupContract.SingleEventBusGroupC

@RegisterExtension
static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
.isolationPolicy(RabbitMQExtension.IsolationPolicy.WEAK);
.isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);

@RegisterExtension
static RedisExtension redisExtension = new RedisExtension();
Expand Down
Loading

0 comments on commit d75031c

Please sign in to comment.