Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup consumers that are no longer alive #230

Merged
merged 5 commits into from
Dec 12, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 54 additions & 14 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ private enum QueueState {

private MessageConsumer<String> consumersMessageConsumer;

private MessageConsumer<String> consumersAliveMessageConsumer;

// Configuration

private RedisProvider redisProvider;
Expand All @@ -234,6 +236,7 @@ private enum QueueState {
private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();
private Map<String, Long> aliveConsumers = new ConcurrentHashMap<>();
private boolean dequeueStatisticEnabled = false;
private final RedisQuesExceptionFactory exceptionFactory;
private PeriodicSkipScheduler periodicSkipScheduler;
Expand Down Expand Up @@ -453,6 +456,8 @@ private void initialize() {
// Handles registration requests
consumersMessageConsumer = vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest);

consumersAliveMessageConsumer = vertx.eventBus().consumer(address + "-consumer-alive", this::handleConsumerAlive);

// Handles notifications
uidMessageConsumer = vertx.eventBus().consumer(uid, event -> {
final String queue = event.body();
Expand All @@ -468,8 +473,34 @@ private void initialize() {
registerQueueCheck();
registerMetricsGathering(configuration);
registerNotExpiredQueueCheck();
registerKeepConsumerAlive();
}

private void registerKeepConsumerAlive() {
final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L;
final String address = configurationProvider.configuration().getAddress();
vertx.setPeriodic(10, periodMs, event -> {
Iterator<Map.Entry<String, Long>> iterator = aliveConsumers.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, Long> entry = iterator.next();
if (currentTimeMillis() > entry.getValue()) {
log.info("RedisQues consumer with id {}' has expired", entry.getKey());
iterator.remove();
}
}
vertx.eventBus().publish(address + "-consumer-alive", uid);
log.debug("RedisQues consumer {} keep alive published", uid);
});
}

private void handleConsumerAlive(Message<String> msg) {
final String consumerId = msg.body();
final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L;
aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4));
log.debug("RedisQues consumer {} keep alive renewed", consumerId);
}


private void registerNotExpiredQueueCheck() {
vertx.setPeriodic(20 * 1000, event -> {
if (!log.isDebugEnabled()) {
Expand Down Expand Up @@ -784,16 +815,20 @@ public void stop() {
private void gracefulStop(final Handler<Void> doneHandler) {
consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> {
if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException(
"unregister(" + event + ") failed", event.cause()));
unregisterConsumers(false).onComplete(unregisterConsumersEvent -> {
if( unregisterEvent.failed() ) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers() failed", unregisterEvent.cause()));
}
stoppedHandler = doneHandler;
if (myQueues.keySet().isEmpty()) {
doneHandler.handle(null);
}
"unregister(" + event + ") failed", event.cause()));
consumersAliveMessageConsumer.unregister(event1 -> {
if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException(
"unregister(" + event1 + ") failed", event1.cause()));
unregisterConsumers(false).onComplete(unregisterConsumersEvent -> {
if (unregisterEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers() failed", unregisterEvent.cause()));
}
stoppedHandler = doneHandler;
if (myQueues.keySet().isEmpty()) {
doneHandler.handle(null);
}
});
dominik-cnx marked this conversation as resolved.
Show resolved Hide resolved
dominik-cnx marked this conversation as resolved.
Show resolved Hide resolved
});
}));
}
Expand Down Expand Up @@ -1166,10 +1201,15 @@ private Future<Void> notifyConsumer(final String queueName) {
eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName);
promise.complete();
} else {
// Notify the registered consumer
log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName);
eb.send(consumer, queueName);
promise.complete();
if (!aliveConsumers.containsKey(consumer)) {
log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName);
redisAPI.del(Collections.singletonList(key), event1 -> promise.complete());
dominik-cnx marked this conversation as resolved.
Show resolved Hide resolved
} else {
// Notify the registered consumer
log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName);
eb.send(consumer, queueName);
promise.complete();
}
dominik-cnx marked this conversation as resolved.
Show resolved Hide resolved
}
}))
.onFailure(throwable -> {
Expand Down
Loading