From 74ddb7e3d9e21c1dd143cd4a7f10d008796fe5c8 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Thu, 7 Nov 2024 09:23:16 +0700 Subject: [PATCH 1/3] logging optimization and logs not expired keys --- .../org/swisspush/redisques/RedisQues.java | 47 +++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 7cc1021..5acfe11 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -420,6 +420,33 @@ private void initialize() { registerActiveQueueRegistrationRefresh(); registerQueueCheck(); registerMetricsGathering(configuration); + registerNotexpiredQueueCheck(); + } + + private void registerNotexpiredQueueCheck() { + vertx.setPeriodic(20 * 1000, event -> { + if (!log.isDebugEnabled()) { + return; + } + String keysPattern = consumersPrefix + "*"; + log.debug("RedisQues list not expired consumers keys:"); + redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> { + if (keysResult.failed() || keysResult.result() == null) { + log.error("Unable to get redis keys of consumers", keysResult.cause()); + return; + } + Response keys = keysResult.result(); + if (keys == null || keys.size() == 0) { + log.debug("No consumers found to reset"); + return; + } + for (Response response : keys) { + log.debug(response.toString()); + } + log.debug("Key list end"); + })) + .onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable)); + }); } private void registerMetricsGathering(RedisquesConfiguration configuration){ @@ -485,7 +512,7 @@ public void run() { startEpochMs = currentTimeMillis(); if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size); else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size); - else log.debug("Going to report {} dequeue statistics towards collector", size); + else log.trace("Going to report {} dequeue statistics towards collector", size); } catch (Throwable ex) { isRunning.set(false); throw ex; @@ -519,7 +546,9 @@ void resume() { if (event.failed()) { log.error("publishing dequeue statistics not complete, just continue", event.cause()); } - log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + if (log.isTraceEnabled()){ + log.trace("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + } isRunning.set(false); }); } catch (Throwable ex) { @@ -828,7 +857,7 @@ private Future consume(final String queueName) { // consumer was restarted log.warn("Received request to consume from a queue I did not know about: {}", queueName); } - log.debug("RedisQues Starting to consume queue {}", queueName); + log.trace("RedisQues Starting to consume queue {}", queueName); readQueue(queueName).onComplete(readQueueEvent -> { if (readQueueEvent.failed()) { log.warn("TODO error handling", exceptionFactory.newException( @@ -837,12 +866,12 @@ private Future consume(final String queueName) { promise.complete(); }); } else { - log.debug("RedisQues Queue {} is already being consumed", queueName); + log.trace("RedisQues Queue {} is already being consumed", queueName); promise.complete(); } } else { // Somehow registration changed. Let's renotify. - log.debug("Registration for queue {} has changed to {}", queueName, consumer); + log.trace("Registration for queue {} has changed to {}", queueName, consumer); myQueues.remove(queueName); notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { if (notifyConsumerEvent.failed()) { @@ -961,7 +990,7 @@ private Future readQueue(final String queueName) { // Failed. Message will be kept in queue and retried later log.debug("RedisQues Processing failed for queue {}", queueName); // reschedule - log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); + log.trace("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); rescheduleSendMessageAfterFailure(queueName, retryInterval); promise.complete(); } @@ -1159,7 +1188,9 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { - log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); + if (log.isDebugEnabled()) { + log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); + } assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1201,7 +1232,7 @@ private Future checkQueues() { return; } if (event.result().toLong() == 1) { - log.debug("Updating queue timestamp for queue '{}'", queueName); + log.trace("Updating queue timestamp for queue '{}'", queueName); // If not empty, update the queue timestamp to keep it in the sorted set. updateTimestamp(queueName, upTsResult -> { if (upTsResult.failed()) { From 9ad4ad03d9fbb69ee450f896462e35da84061055 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Thu, 7 Nov 2024 15:57:41 +0700 Subject: [PATCH 2/3] logging optimization --- .../java/org/swisspush/redisques/RedisQues.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 5acfe11..4f7da1e 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -420,10 +420,10 @@ private void initialize() { registerActiveQueueRegistrationRefresh(); registerQueueCheck(); registerMetricsGathering(configuration); - registerNotexpiredQueueCheck(); + registerNotExpiredQueueCheck(); } - private void registerNotexpiredQueueCheck() { + private void registerNotExpiredQueueCheck() { vertx.setPeriodic(20 * 1000, event -> { if (!log.isDebugEnabled()) { return; @@ -437,13 +437,16 @@ private void registerNotexpiredQueueCheck() { } Response keys = keysResult.result(); if (keys == null || keys.size() == 0) { - log.debug("No consumers found to reset"); + log.debug("0 not expired consumers keys found"); return; } - for (Response response : keys) { - log.debug(response.toString()); + + if (log.isTraceEnabled()) { + for (Response response : keys) { + log.trace(response.toString()); + } } - log.debug("Key list end"); + log.debug("{} not expired consumers keys found", keys.size()); })) .onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable)); }); From 1bd8e963a87752f277dda4048cd62233b7d92988 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Mon, 11 Nov 2024 09:20:31 +0700 Subject: [PATCH 3/3] use scan to get the keys --- src/main/java/org/swisspush/redisques/RedisQues.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 4f7da1e..1566336 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -430,12 +430,12 @@ private void registerNotExpiredQueueCheck() { } String keysPattern = consumersPrefix + "*"; log.debug("RedisQues list not expired consumers keys:"); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> { - if (keysResult.failed() || keysResult.result() == null) { + redisProvider.redis().onSuccess(redisAPI -> redisAPI.scan(Arrays.asList("0", "MATCH", keysPattern, "COUNT", "1000"), keysResult -> { + if (keysResult.failed() || keysResult.result() == null || keysResult.result().size() != 2) { log.error("Unable to get redis keys of consumers", keysResult.cause()); return; } - Response keys = keysResult.result(); + Response keys = keysResult.result().get(1); if (keys == null || keys.size() == 0) { log.debug("0 not expired consumers keys found"); return; @@ -446,7 +446,7 @@ private void registerNotExpiredQueueCheck() { log.trace(response.toString()); } } - log.debug("{} not expired consumers keys found", keys.size()); + log.debug("{} not expired consumers keys found, {} keys in myQueues list", keys.size(), myQueues.size()); })) .onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable)); });