From fe3b03a45cf55e46745e5fd88041d8c3ecef1c2d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Wed, 11 Dec 2024 15:09:47 +0700 Subject: [PATCH] code improved from feedback --- .../org/swisspush/redisques/RedisQues.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 8d7388a..c46be73 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -816,13 +816,17 @@ private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( "unregister(" + event + ") failed", event.cause())); + if (unregisterEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } consumersAliveMessageConsumer.unregister(event1 -> { if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException( "unregister(" + event1 + ") failed", event1.cause())); unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if (unregisterEvent.failed()) { + if (unregisterConsumersEvent.failed()) { log.warn("TODO error handling", exceptionFactory.newException( - "unregisterConsumers() failed", unregisterEvent.cause())); + "unregisterConsumers(false) failed", unregisterConsumersEvent.cause())); } stoppedHandler = doneHandler; if (myQueues.keySet().isEmpty()) { @@ -1200,16 +1204,21 @@ private Future notifyConsumer(final String queueName) { log.debug("RedisQues Sending registration request for queue {}", queueName); eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); - } else { - if (!aliveConsumers.containsKey(consumer)) { - log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); - redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); - } else { - // Notify the registered consumer - log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); - eb.send(consumer, queueName); + } else if (!aliveConsumers.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); + redisAPI.del(Collections.singletonList(key), result -> { + if (result.failed()) { + log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause())); + } else { + log.debug("{} consumer key removed", result.result().toLong()); + } promise.complete(); - } + }); + } else { + // Notify the registered consumer + log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); + eb.send(consumer, queueName); + promise.complete(); } })) .onFailure(throwable -> {