Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log number of queues locked on a per RedisQues Verticle instance base #219

Merged
merged 4 commits into from
Nov 12, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,36 @@ private void initialize() {
registerActiveQueueRegistrationRefresh();
registerQueueCheck();
registerMetricsGathering(configuration);
registerNotExpiredQueueCheck();
}

private void registerNotExpiredQueueCheck() {
vertx.setPeriodic(20 * 1000, event -> {
if (!log.isDebugEnabled()) {
return;
}
String keysPattern = consumersPrefix + "*";
log.debug("RedisQues list not expired consumers keys:");
redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> {
dominik-cnx marked this conversation as resolved.
Show resolved Hide resolved
if (keysResult.failed() || keysResult.result() == null) {
log.error("Unable to get redis keys of consumers", keysResult.cause());
return;
}
Response keys = keysResult.result();
if (keys == null || keys.size() == 0) {
log.debug("0 not expired consumers keys found");
return;
}

if (log.isTraceEnabled()) {
for (Response response : keys) {
log.trace(response.toString());
}
}
log.debug("{} not expired consumers keys found", keys.size());
}))
.onFailure(throwable -> log.error("Redis: Unable to get redis keys of consumers", throwable));
});
}

private void registerMetricsGathering(RedisquesConfiguration configuration){
Expand Down Expand Up @@ -485,7 +515,7 @@ public void run() {
startEpochMs = currentTimeMillis();
if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size);
else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size);
else log.debug("Going to report {} dequeue statistics towards collector", size);
else log.trace("Going to report {} dequeue statistics towards collector", size);
} catch (Throwable ex) {
isRunning.set(false);
throw ex;
Expand Down Expand Up @@ -519,7 +549,9 @@ void resume() {
if (event.failed()) {
log.error("publishing dequeue statistics not complete, just continue", event.cause());
}
log.debug("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
if (log.isTraceEnabled()){
log.trace("Done publishing {} dequeue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
}
isRunning.set(false);
});
} catch (Throwable ex) {
Expand Down Expand Up @@ -828,7 +860,7 @@ private Future<Void> consume(final String queueName) {
// consumer was restarted
log.warn("Received request to consume from a queue I did not know about: {}", queueName);
}
log.debug("RedisQues Starting to consume queue {}", queueName);
log.trace("RedisQues Starting to consume queue {}", queueName);
readQueue(queueName).onComplete(readQueueEvent -> {
if (readQueueEvent.failed()) {
log.warn("TODO error handling", exceptionFactory.newException(
Expand All @@ -837,12 +869,12 @@ private Future<Void> consume(final String queueName) {
promise.complete();
});
} else {
log.debug("RedisQues Queue {} is already being consumed", queueName);
log.trace("RedisQues Queue {} is already being consumed", queueName);
promise.complete();
}
} else {
// Somehow registration changed. Let's renotify.
log.debug("Registration for queue {} has changed to {}", queueName, consumer);
log.trace("Registration for queue {} has changed to {}", queueName, consumer);
myQueues.remove(queueName);
notifyConsumer(queueName).onComplete(notifyConsumerEvent -> {
if (notifyConsumerEvent.failed()) {
Expand Down Expand Up @@ -961,7 +993,7 @@ private Future<Void> readQueue(final String queueName) {
// Failed. Message will be kept in queue and retried later
log.debug("RedisQues Processing failed for queue {}", queueName);
// reschedule
log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval);
log.trace("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval);
rescheduleSendMessageAfterFailure(queueName, retryInterval);
promise.complete();
}
Expand Down Expand Up @@ -1159,7 +1191,9 @@ private Future<Void> checkQueues() {
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
return p.future();
}).compose((Response queues) -> {
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
if (log.isDebugEnabled()) {
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
}
assert ctx.counter == null;
assert ctx.iter == null;
ctx.counter = new AtomicInteger(queues.size());
Expand Down Expand Up @@ -1201,7 +1235,7 @@ private Future<Void> checkQueues() {
return;
}
if (event.result().toLong() == 1) {
log.debug("Updating queue timestamp for queue '{}'", queueName);
log.trace("Updating queue timestamp for queue '{}'", queueName);
// If not empty, update the queue timestamp to keep it in the sorted set.
updateTimestamp(queueName, upTsResult -> {
if (upTsResult.failed()) {
Expand Down
Loading