diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 7cc1021..5acfe11 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -420,6 +420,33 @@ private void initialize() { registerActiveQueueRegistrationRefresh(); registerQueueCheck(); registerMetricsGathering(configuration); + registerNotexpiredQueueCheck(); + } + + private void registerNotexpiredQueueCheck() { + vertx.setPeriodic(20 * 1000, event -> { + if (!log.isDebugEnabled()) { + return; + } + String keysPattern = consumersPrefix + "*"; + log.debug("RedisQues list not expired consumers keys:"); + redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> { + if (keysResult.failed() || keysResult.result() == null) { + log.error("Unable to get redis keys of consumers", keysResult.cause()); + return; + } + Response keys = keysResult.result(); + if (keys == null || keys.size() == 0) { + log.debug("No consumers found to reset"); + return; + } + for (Response response : keys) { + log.debug(response.toString()); + } + log.debug("Key list end"); + })) + .onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable)); + }); } private void registerMetricsGathering(RedisquesConfiguration configuration){ @@ -485,7 +512,7 @@ public void run() { startEpochMs = currentTimeMillis(); if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size); else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size); - else log.debug("Going to report {} dequeue statistics towards collector", size); + else log.trace("Going to report {} dequeue statistics towards collector", size); } catch (Throwable ex) { isRunning.set(false); throw ex; @@ -519,7 +546,9 @@ void resume() { if (event.failed()) { log.error("publishing dequeue statistics not complete, just continue", event.cause()); } - log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + if (log.isTraceEnabled()){ + log.trace("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + } isRunning.set(false); }); } catch (Throwable ex) { @@ -828,7 +857,7 @@ private Future consume(final String queueName) { // consumer was restarted log.warn("Received request to consume from a queue I did not know about: {}", queueName); } - log.debug("RedisQues Starting to consume queue {}", queueName); + log.trace("RedisQues Starting to consume queue {}", queueName); readQueue(queueName).onComplete(readQueueEvent -> { if (readQueueEvent.failed()) { log.warn("TODO error handling", exceptionFactory.newException( @@ -837,12 +866,12 @@ private Future consume(final String queueName) { promise.complete(); }); } else { - log.debug("RedisQues Queue {} is already being consumed", queueName); + log.trace("RedisQues Queue {} is already being consumed", queueName); promise.complete(); } } else { // Somehow registration changed. Let's renotify. - log.debug("Registration for queue {} has changed to {}", queueName, consumer); + log.trace("Registration for queue {} has changed to {}", queueName, consumer); myQueues.remove(queueName); notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { if (notifyConsumerEvent.failed()) { @@ -961,7 +990,7 @@ private Future readQueue(final String queueName) { // Failed. Message will be kept in queue and retried later log.debug("RedisQues Processing failed for queue {}", queueName); // reschedule - log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); + log.trace("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); rescheduleSendMessageAfterFailure(queueName, retryInterval); promise.complete(); } @@ -1159,7 +1188,9 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { - log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); + if (log.isDebugEnabled()) { + log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); + } assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1201,7 +1232,7 @@ private Future checkQueues() { return; } if (event.result().toLong() == 1) { - log.debug("Updating queue timestamp for queue '{}'", queueName); + log.trace("Updating queue timestamp for queue '{}'", queueName); // If not empty, update the queue timestamp to keep it in the sorted set. updateTimestamp(queueName, upTsResult -> { if (upTsResult.failed()) {