From 7f61e229c0ab2f59ded11bc3df798c586b786c7d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Tue, 10 Dec 2024 14:03:26 +0700 Subject: [PATCH 1/5] remove consumer if it is not exist --- .../org/swisspush/redisques/RedisQues.java | 72 +++++++++++++++---- 1 file changed, 58 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 0091990..c2fb8f3 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -209,6 +209,8 @@ private enum QueueState { private MessageConsumer consumersMessageConsumer; + private MessageConsumer consumersAliveMessageConsumer; + // Configuration private RedisProvider redisProvider; @@ -234,6 +236,7 @@ private enum QueueState { private Map queueActions = new HashMap<>(); private Map dequeueStatistic = new ConcurrentHashMap<>(); + private Map aliveConsumer = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; private final RedisQuesExceptionFactory exceptionFactory; private PeriodicSkipScheduler periodicSkipScheduler; @@ -453,6 +456,8 @@ private void initialize() { // Handles registration requests consumersMessageConsumer = vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest); + consumersAliveMessageConsumer = vertx.eventBus().consumer(address + "-consumer-alive", this::handleConsumerAlive); + // Handles notifications uidMessageConsumer = vertx.eventBus().consumer(uid, event -> { final String queue = event.body(); @@ -468,8 +473,38 @@ private void initialize() { registerQueueCheck(); registerMetricsGathering(configuration); registerNotExpiredQueueCheck(); + registerKeepConsumerAlive(); + } + + private void registerKeepConsumerAlive() { + final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + final String address = configurationProvider.configuration().getAddress(); + vertx.setPeriodic(10, periodMs, event -> { + Iterator> iterator = aliveConsumer.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (currentTimeMillis() > entry.getValue()) { + log.info("RedisQues consumer with id {}' has expired", entry.getKey()); + iterator.remove(); + } + } + log.debug("RedisQues consumer {} keep alive published", uid); + vertx.eventBus().publish(address + "-consumer-alive", uid); + }); + } + + private void handleConsumerAlive(Message msg) { + final String consumerId = msg.body(); + final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + if (!UUID.fromString(consumerId).toString().equals(consumerId)) { + log.warn("invalid RedisQues consumer id {}", consumerId); + return; + } + log.debug("RedisQues consumer {} keep alive renewed", consumerId); + aliveConsumer.put(consumerId, currentTimeMillis() + (periodMs * 4)); } + private void registerNotExpiredQueueCheck() { vertx.setPeriodic(20 * 1000, event -> { if (!log.isDebugEnabled()) { @@ -784,16 +819,20 @@ public void stop() { private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( - "unregister(" + event + ") failed", event.cause())); - unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if( unregisterEvent.failed() ) { - log.warn("TODO error handling", exceptionFactory.newException( - "unregisterConsumers() failed", unregisterEvent.cause())); - } - stoppedHandler = doneHandler; - if (myQueues.keySet().isEmpty()) { - doneHandler.handle(null); - } + "unregister(" + event + ") failed", event.cause())); + consumersAliveMessageConsumer.unregister(event1 -> { + if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException( + "unregister(" + event1 + ") failed", event1.cause())); + unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { + if (unregisterEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } + stoppedHandler = doneHandler; + if (myQueues.keySet().isEmpty()) { + doneHandler.handle(null); + } + }); }); })); } @@ -1166,10 +1205,15 @@ private Future notifyConsumer(final String queueName) { eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); } else { - // Notify the registered consumer - log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); - eb.send(consumer, queueName); - promise.complete(); + if (!aliveConsumer.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} is not exist.", consumer, queueName); + redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); + } else { + // Notify the registered consumer + log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); + eb.send(consumer, queueName); + promise.complete(); + } } })) .onFailure(throwable -> { From 8bcd92cb5ce741d126641f01b6c0212694e4f33a Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Tue, 10 Dec 2024 15:31:41 +0700 Subject: [PATCH 2/5] renamed --- src/main/java/org/swisspush/redisques/RedisQues.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index c2fb8f3..9d76771 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -236,7 +236,7 @@ private enum QueueState { private Map queueActions = new HashMap<>(); private Map dequeueStatistic = new ConcurrentHashMap<>(); - private Map aliveConsumer = new ConcurrentHashMap<>(); + private Map aliveConsumers = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; private final RedisQuesExceptionFactory exceptionFactory; private PeriodicSkipScheduler periodicSkipScheduler; @@ -480,7 +480,7 @@ private void registerKeepConsumerAlive() { final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; final String address = configurationProvider.configuration().getAddress(); vertx.setPeriodic(10, periodMs, event -> { - Iterator> iterator = aliveConsumer.entrySet().iterator(); + Iterator> iterator = aliveConsumers.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next(); if (currentTimeMillis() > entry.getValue()) { @@ -488,8 +488,8 @@ private void registerKeepConsumerAlive() { iterator.remove(); } } - log.debug("RedisQues consumer {} keep alive published", uid); vertx.eventBus().publish(address + "-consumer-alive", uid); + log.debug("RedisQues consumer {} keep alive published", uid); }); } @@ -501,7 +501,7 @@ private void handleConsumerAlive(Message msg) { return; } log.debug("RedisQues consumer {} keep alive renewed", consumerId); - aliveConsumer.put(consumerId, currentTimeMillis() + (periodMs * 4)); + aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4)); } @@ -1205,8 +1205,8 @@ private Future notifyConsumer(final String queueName) { eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); } else { - if (!aliveConsumer.containsKey(consumer)) { - log.warn("RedisQues consumer {} of queue {} is not exist.", consumer, queueName); + if (!aliveConsumers.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); } else { // Notify the registered consumer From 71aa2ea77822d95b52f558e1d2ede50a869ad478 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Tue, 10 Dec 2024 16:37:49 +0700 Subject: [PATCH 3/5] removed unused check --- src/main/java/org/swisspush/redisques/RedisQues.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 9d76771..8d7388a 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -496,12 +496,8 @@ private void registerKeepConsumerAlive() { private void handleConsumerAlive(Message msg) { final String consumerId = msg.body(); final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; - if (!UUID.fromString(consumerId).toString().equals(consumerId)) { - log.warn("invalid RedisQues consumer id {}", consumerId); - return; - } - log.debug("RedisQues consumer {} keep alive renewed", consumerId); aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4)); + log.debug("RedisQues consumer {} keep alive renewed", consumerId); } From fe3b03a45cf55e46745e5fd88041d8c3ecef1c2d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Wed, 11 Dec 2024 15:09:47 +0700 Subject: [PATCH 4/5] code improved from feedback --- .../org/swisspush/redisques/RedisQues.java | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 8d7388a..c46be73 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -816,13 +816,17 @@ private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( "unregister(" + event + ") failed", event.cause())); + if (unregisterEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } consumersAliveMessageConsumer.unregister(event1 -> { if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException( "unregister(" + event1 + ") failed", event1.cause())); unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if (unregisterEvent.failed()) { + if (unregisterConsumersEvent.failed()) { log.warn("TODO error handling", exceptionFactory.newException( - "unregisterConsumers() failed", unregisterEvent.cause())); + "unregisterConsumers(false) failed", unregisterConsumersEvent.cause())); } stoppedHandler = doneHandler; if (myQueues.keySet().isEmpty()) { @@ -1200,16 +1204,21 @@ private Future notifyConsumer(final String queueName) { log.debug("RedisQues Sending registration request for queue {}", queueName); eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); - } else { - if (!aliveConsumers.containsKey(consumer)) { - log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); - redisAPI.del(Collections.singletonList(key), event1 -> promise.complete()); - } else { - // Notify the registered consumer - log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); - eb.send(consumer, queueName); + } else if (!aliveConsumers.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); + redisAPI.del(Collections.singletonList(key), result -> { + if (result.failed()) { + log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause())); + } else { + log.debug("{} consumer key removed", result.result().toLong()); + } promise.complete(); - } + }); + } else { + // Notify the registered consumer + log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName); + eb.send(consumer, queueName); + promise.complete(); } })) .onFailure(throwable -> { From 9ae03c8537a118dea2c4e9721749a913655511ab Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Thu, 12 Dec 2024 13:48:54 +0700 Subject: [PATCH 5/5] fixed wrong cause has been logged --- src/main/java/org/swisspush/redisques/RedisQues.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index c46be73..26d75f1 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1208,7 +1208,7 @@ private Future notifyConsumer(final String queueName) { log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); redisAPI.del(Collections.singletonList(key), result -> { if (result.failed()) { - log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(event.cause())); + log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(result.cause())); } else { log.debug("{} consumer key removed", result.result().toLong()); }