Skip to content

Commit

Permalink
logging optimization and logs not expired keys
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Nov 7, 2024
1 parent c94186c commit 74ddb7e
Showing 1 changed file with 39 additions and 8 deletions.
47 changes: 39 additions & 8 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,33 @@ private void initialize() {
registerActiveQueueRegistrationRefresh();
registerQueueCheck();
registerMetricsGathering(configuration);
registerNotexpiredQueueCheck();
}

private void registerNotexpiredQueueCheck() {
vertx.setPeriodic(20 * 1000, event -> {
if (!log.isDebugEnabled()) {
return;
}
String keysPattern = consumersPrefix + "*";
log.debug("RedisQues list not expired consumers keys:");
redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> {
if (keysResult.failed() || keysResult.result() == null) {
log.error("Unable to get redis keys of consumers", keysResult.cause());
return;
}
Response keys = keysResult.result();
if (keys == null || keys.size() == 0) {
log.debug("No consumers found to reset");
return;
}
for (Response response : keys) {
log.debug(response.toString());
}
log.debug("Key list end");
}))
.onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable));
});
}

private void registerMetricsGathering(RedisquesConfiguration configuration){
Expand Down Expand Up @@ -485,7 +512,7 @@ public void run() {
startEpochMs = currentTimeMillis();
if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size);
else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size);
else log.debug("Going to report {} dequeue statistics towards collector", size);
else log.trace("Going to report {} dequeue statistics towards collector", size);
} catch (Throwable ex) {
isRunning.set(false);
throw ex;
Expand Down Expand Up @@ -519,7 +546,9 @@ void resume() {
if (event.failed()) {
log.error("publishing dequeue statistics not complete, just continue", event.cause());
}
log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
if (log.isTraceEnabled()){
log.trace("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
}
isRunning.set(false);
});
} catch (Throwable ex) {
Expand Down Expand Up @@ -828,7 +857,7 @@ private Future<Void> consume(final String queueName) {
// consumer was restarted
log.warn("Received request to consume from a queue I did not know about: {}", queueName);
}
log.debug("RedisQues Starting to consume queue {}", queueName);
log.trace("RedisQues Starting to consume queue {}", queueName);
readQueue(queueName).onComplete(readQueueEvent -> {
if (readQueueEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
Expand All @@ -837,12 +866,12 @@ private Future<Void> consume(final String queueName) {
promise.complete();
});
} else {
log.debug("RedisQues Queue {} is already being consumed", queueName);
log.trace("RedisQues Queue {} is already being consumed", queueName);
promise.complete();
}
} else {
// Somehow registration changed. Let's renotify.
log.debug("Registration for queue {} has changed to {}", queueName, consumer);
log.trace("Registration for queue {} has changed to {}", queueName, consumer);
myQueues.remove(queueName);
notifyConsumer(queueName).onComplete(notifyConsumerEvent -> {
if (notifyConsumerEvent.failed()) {
Expand Down Expand Up @@ -961,7 +990,7 @@ private Future<Void> readQueue(final String queueName) {
// Failed. Message will be kept in queue and retried later
log.debug("RedisQues Processing failed for queue {}", queueName);
// reschedule
log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval);
log.trace("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval);
rescheduleSendMessageAfterFailure(queueName, retryInterval);
promise.complete();
}
Expand Down Expand Up @@ -1159,7 +1188,9 @@ private Future<Void> checkQueues() {
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
return p.future();
}).compose((Response queues) -> {
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
if (log.isDebugEnabled()) {
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
}
assert ctx.counter == null;
assert ctx.iter == null;
ctx.counter = new AtomicInteger(queues.size());
Expand Down Expand Up @@ -1201,7 +1232,7 @@ private Future<Void> checkQueues() {
return;
}
if (event.result().toLong() == 1) {
log.debug("Updating queue timestamp for queue '{}'", queueName);
log.trace("Updating queue timestamp for queue '{}'", queueName);
// If not empty, update the queue timestamp to keep it in the sorted set.
updateTimestamp(queueName, upTsResult -> {
if (upTsResult.failed()) {
Expand Down

0 comments on commit 74ddb7e

Please sign in to comment.