diff --git a/pom.xml b/pom.xml index 78bcdce..a29e688 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.4-SNAPSHOT + 4.1.5-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index afc3434..1566336 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -420,6 +420,36 @@ private void initialize() { registerActiveQueueRegistrationRefresh(); registerQueueCheck(); registerMetricsGathering(configuration); + registerNotExpiredQueueCheck(); + } + + private void registerNotExpiredQueueCheck() { + vertx.setPeriodic(20 * 1000, event -> { + if (!log.isDebugEnabled()) { + return; + } + String keysPattern = consumersPrefix + "*"; + log.debug("RedisQues list not expired consumers keys:"); + redisProvider.redis().onSuccess(redisAPI -> redisAPI.scan(Arrays.asList("0", "MATCH", keysPattern, "COUNT", "1000"), keysResult -> { + if (keysResult.failed() || keysResult.result() == null || keysResult.result().size() != 2) { + log.error("Unable to get redis keys of consumers", keysResult.cause()); + return; + } + Response keys = keysResult.result().get(1); + if (keys == null || keys.size() == 0) { + log.debug("0 not expired consumers keys found"); + return; + } + + if (log.isTraceEnabled()) { + for (Response response : keys) { + log.trace(response.toString()); + } + } + log.debug("{} not expired consumers keys found, {} keys in myQueues list", keys.size(), myQueues.size()); + })) + .onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable)); + }); } private void registerMetricsGathering(RedisquesConfiguration configuration){ @@ -485,7 +515,7 @@ public void run() { startEpochMs = currentTimeMillis(); if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size); else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size); - else log.debug("Going to report {} dequeue statistics towards collector", size); + else log.trace("Going to report {} dequeue statistics towards collector", size); } catch (Throwable ex) { isRunning.set(false); throw ex; @@ -519,7 +549,9 @@ void resume() { if (event.failed()) { log.error("publishing dequeue statistics not complete, just continue", event.cause()); } - log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + if (log.isTraceEnabled()){ + log.trace("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + } isRunning.set(false); }); } catch (Throwable ex) { @@ -828,7 +860,7 @@ private Future consume(final String queueName) { // consumer was restarted log.warn("Received request to consume from a queue I did not know about: {}", queueName); } - log.debug("RedisQues Starting to consume queue {}", queueName); + log.trace("RedisQues Starting to consume queue {}", queueName); readQueue(queueName).onComplete(readQueueEvent -> { if (readQueueEvent.failed()) { log.warn("TODO error handling", exceptionFactory.newException( @@ -837,12 +869,12 @@ private Future consume(final String queueName) { promise.complete(); }); } else { - log.debug("RedisQues Queue {} is already being consumed", queueName); + log.trace("RedisQues Queue {} is already being consumed", queueName); promise.complete(); } } else { // Somehow registration changed. Let's renotify. - log.debug("Registration for queue {} has changed to {}", queueName, consumer); + log.trace("Registration for queue {} has changed to {}", queueName, consumer); myQueues.remove(queueName); notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { if (notifyConsumerEvent.failed()) { @@ -961,7 +993,7 @@ private Future readQueue(final String queueName) { // Failed. Message will be kept in queue and retried later log.debug("RedisQues Processing failed for queue {}", queueName); // reschedule - log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); + log.trace("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); rescheduleSendMessageAfterFailure(queueName, retryInterval); promise.complete(); } @@ -1159,7 +1191,9 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { - log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); + if (log.isDebugEnabled()) { + log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); + } assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1201,7 +1235,7 @@ private Future checkQueues() { return; } if (event.result().toLong() == 1) { - log.debug("Updating queue timestamp for queue '{}'", queueName); + log.trace("Updating queue timestamp for queue '{}'", queueName); // If not empty, update the queue timestamp to keep it in the sorted set. updateTimestamp(queueName, upTsResult -> { if (upTsResult.failed()) { @@ -1248,7 +1282,6 @@ private Future checkQueues() { } }); } else { - log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs); onDone.accept(null, null); } return ctx.iter.hasNext(); @@ -1263,6 +1296,7 @@ private Future checkQueues() { ctx.counter = null; ctx.iter = null; // Mark this composition step as completed. + log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs); p.complete(); } });