Skip to content

Commit

Permalink
code improved from feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Dec 11, 2024
1 parent 71aa2ea commit fe3b03a
Showing 1 changed file with 20 additions and 11 deletions.
31 changes: 20 additions & 11 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -816,13 +816,17 @@ private void gracefulStop(final Handler<Void> doneHandler) {
consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> {
if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException(
"unregister(" + event + ") failed", event.cause()));
if (unregisterEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers() failed", unregisterEvent.cause()));
}
consumersAliveMessageConsumer.unregister(event1 -> {
if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException(
"unregister(" + event1 + ") failed", event1.cause()));
unregisterConsumers(false).onComplete(unregisterConsumersEvent -> {
if (unregisterEvent.failed()) {
if (unregisterConsumersEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
"unregisterConsumers() failed", unregisterEvent.cause()));
"unregisterConsumers(false) failed", unregisterConsumersEvent.cause()));
}
stoppedHandler = doneHandler;
if (myQueues.keySet().isEmpty()) {
Expand Down Expand Up @@ -1200,16 +1204,21 @@ private Future<Void> notifyConsumer(final String queueName) {
log.debug("RedisQues Sending registration request for queue {}", queueName);
eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName);
promise.complete();
} else {
if (!aliveConsumers.containsKey(consumer)) {
log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName);
redisAPI.del(Collections.singletonList(key), event1 -> promise.complete());
} else {
// Notify the registered consumer
log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName);
eb.send(consumer, queueName);
} else if (!aliveConsumers.containsKey(consumer)) {
log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName);
redisAPI.del(Collections.singletonList(key), result -> {
if (result.failed()) {
log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause()));
} else {
log.debug("{} consumer key removed", result.result().toLong());
}
promise.complete();
}
});
} else {
// Notify the registered consumer
log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName);
eb.send(consumer, queueName);
promise.complete();
}
}))
.onFailure(throwable -> {
Expand Down

0 comments on commit fe3b03a

Please sign in to comment.