diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 0091990..26d75f1 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -209,6 +209,8 @@ private enum QueueState { private MessageConsumer consumersMessageConsumer; + private MessageConsumer consumersAliveMessageConsumer; + // Configuration private RedisProvider redisProvider; @@ -234,6 +236,7 @@ private enum QueueState { private Map queueActions = new HashMap<>(); private Map dequeueStatistic = new ConcurrentHashMap<>(); + private Map aliveConsumers = new ConcurrentHashMap<>(); private boolean dequeueStatisticEnabled = false; private final RedisQuesExceptionFactory exceptionFactory; private PeriodicSkipScheduler periodicSkipScheduler; @@ -453,6 +456,8 @@ private void initialize() { // Handles registration requests consumersMessageConsumer = vertx.eventBus().consumer(address + "-consumers", this::handleRegistrationRequest); + consumersAliveMessageConsumer = vertx.eventBus().consumer(address + "-consumer-alive", this::handleConsumerAlive); + // Handles notifications uidMessageConsumer = vertx.eventBus().consumer(uid, event -> { final String queue = event.body(); @@ -468,8 +473,34 @@ private void initialize() { registerQueueCheck(); registerMetricsGathering(configuration); registerNotExpiredQueueCheck(); + registerKeepConsumerAlive(); } + private void registerKeepConsumerAlive() { + final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + final String address = configurationProvider.configuration().getAddress(); + vertx.setPeriodic(10, periodMs, event -> { + Iterator> iterator = aliveConsumers.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (currentTimeMillis() > entry.getValue()) { + log.info("RedisQues consumer with id {}' has expired", entry.getKey()); + iterator.remove(); + } + } + vertx.eventBus().publish(address + "-consumer-alive", uid); + log.debug("RedisQues consumer {} keep alive published", uid); + }); + } + + private void handleConsumerAlive(Message msg) { + final String consumerId = msg.body(); + final long periodMs = configurationProvider.configuration().getRefreshPeriod() * 1000L; + aliveConsumers.put(consumerId, currentTimeMillis() + (periodMs * 4)); + log.debug("RedisQues consumer {} keep alive renewed", consumerId); + } + + private void registerNotExpiredQueueCheck() { vertx.setPeriodic(20 * 1000, event -> { if (!log.isDebugEnabled()) { @@ -784,16 +815,24 @@ public void stop() { private void gracefulStop(final Handler doneHandler) { consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { if (event.failed()) log.warn("TODO error handling", exceptionFactory.newException( - "unregister(" + event + ") failed", event.cause())); - unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - if( unregisterEvent.failed() ) { - log.warn("TODO error handling", exceptionFactory.newException( - "unregisterConsumers() failed", unregisterEvent.cause())); - } - stoppedHandler = doneHandler; - if (myQueues.keySet().isEmpty()) { - doneHandler.handle(null); - } + "unregister(" + event + ") failed", event.cause())); + if (unregisterEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers() failed", unregisterEvent.cause())); + } + consumersAliveMessageConsumer.unregister(event1 -> { + if (event1.failed()) log.warn("TODO error handling", exceptionFactory.newException( + "unregister(" + event1 + ") failed", event1.cause())); + unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { + if (unregisterConsumersEvent.failed()) { + log.warn("TODO error handling", exceptionFactory.newException( + "unregisterConsumers(false) failed", unregisterConsumersEvent.cause())); + } + stoppedHandler = doneHandler; + if (myQueues.keySet().isEmpty()) { + doneHandler.handle(null); + } + }); }); })); } @@ -1165,6 +1204,16 @@ private Future notifyConsumer(final String queueName) { log.debug("RedisQues Sending registration request for queue {}", queueName); eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); + } else if (!aliveConsumers.containsKey(consumer)) { + log.warn("RedisQues consumer {} of queue {} does not exist.", consumer, queueName); + redisAPI.del(Collections.singletonList(key), result -> { + if (result.failed()) { + log.warn("Failed to removed consumer '{}'", key, exceptionFactory.newException(result.cause())); + } else { + log.debug("{} consumer key removed", result.result().toLong()); + } + promise.complete(); + }); } else { // Notify the registered consumer log.debug("RedisQues Notifying consumer {} to consume queue {}", consumer, queueName);