Skip to content

Commit

Permalink
#224 Added metric of active queue count
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Nov 27, 2024
1 parent 0f805be commit a1f49c4
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 7 deletions.
19 changes: 14 additions & 5 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.metrics.PeriodicMetricsCollector;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.scheduling.PeriodicSkipScheduler;
import org.swisspush.redisques.util.*;
Expand Down Expand Up @@ -331,6 +332,18 @@ private void handleRegistrationRequest(Message<String> msg) {
});
}

private void initMicrometerMetrics(RedisquesConfiguration modConfig) {
if(meterRegistry == null) {
meterRegistry = BackendRegistries.getDefaultNow();
}
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry);

String address = modConfig.getAddress();
int metricRefreshPeriod = modConfig.getMetricRefreshPeriod();
new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod);
}

@Override
public void start(Promise<Void> promise) {
log.info("Started with UID {}", uid);
Expand All @@ -347,11 +360,7 @@ public void start(Promise<Void> promise) {
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

if(configurationProvider.configuration().getMicrometerMetricsEnabled()) {
if(meterRegistry == null) {
meterRegistry = BackendRegistries.getDefaultNow();
}
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry);
initMicrometerMetrics(modConfig);
}

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package org.swisspush.redisques.metrics;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.util.MetricMeter;

import java.util.concurrent.atomic.AtomicLong;

import static org.swisspush.redisques.util.RedisquesAPI.STATUS;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.VALUE;
import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation;


public class PeriodicMetricsCollector {

private static final Logger log = LoggerFactory.getLogger(PeriodicMetricsCollector.class);
private final Vertx vertx;
private final String redisquesAddress;

private final AtomicLong activeQueuesCount = new AtomicLong(0);

public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) {
this.vertx = vertx;
this.redisquesAddress = redisquesAddress;

Gauge.builder(MetricMeter.ACTIVE_QUEUES.getId(), activeQueuesCount, AtomicLong::get).
description(MetricMeter.ACTIVE_QUEUES.getDescription()).
register(meterRegistry);

vertx.setPeriodic(metricCollectIntervalSec * 1000, event -> updateActiveQueuesCount());
}

private void updateActiveQueuesCount() {
vertx.eventBus().request(redisquesAddress, buildGetQueuesCountOperation(), (Handler<AsyncResult<Message<JsonObject>>>) reply -> {
if(reply.failed()) {
log.warn("TODO error handling", reply.cause());
} else if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) {
activeQueuesCount.set(reply.result().body().getLong(VALUE));
} else {
log.warn("Error gathering count of active queues. Cause: {}", reply.result().body().getString(MESSAGE));
}
});
}

}
5 changes: 3 additions & 2 deletions src/main/java/org/swisspush/redisques/util/MetricMeter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@

public enum MetricMeter {

ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued sucessfully"),
ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued successfully"),
ENQUEUE_FAIL("redisques.enqueue.fail", "Overall count of queue items which could not be enqueued"),
DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues");
DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"),
ACTIVE_QUEUES("redisques.active.queues", "Count of active queues");

private final String id;
private final String description;
Expand Down

0 comments on commit a1f49c4

Please sign in to comment.