diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 11bcf2d..fc18d8f 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -24,6 +24,7 @@ import org.swisspush.redisques.action.QueueAction; import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.handler.RedisquesHttpRequestHandler; +import org.swisspush.redisques.metrics.PeriodicMetricsCollector; import org.swisspush.redisques.performance.UpperBoundParallel; import org.swisspush.redisques.scheduling.PeriodicSkipScheduler; import org.swisspush.redisques.util.*; @@ -331,6 +332,18 @@ private void handleRegistrationRequest(Message msg) { }); } + private void initMicrometerMetrics(RedisquesConfiguration modConfig) { + if(meterRegistry == null) { + meterRegistry = BackendRegistries.getDefaultNow(); + } + dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) + .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); + + String address = modConfig.getAddress(); + int metricRefreshPeriod = modConfig.getMetricRefreshPeriod(); + new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod); + } + @Override public void start(Promise promise) { log.info("Started with UID {}", uid); @@ -347,11 +360,7 @@ public void start(Promise promise) { log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); if(configurationProvider.configuration().getMicrometerMetricsEnabled()) { - if(meterRegistry == null) { - meterRegistry = BackendRegistries.getDefaultNow(); - } - dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) - .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); + initMicrometerMetrics(modConfig); } int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec(); diff --git a/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java new file mode 100644 index 0000000..1a8555e --- /dev/null +++ b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java @@ -0,0 +1,54 @@ +package org.swisspush.redisques.metrics; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.redisques.util.MetricMeter; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; +import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation; + + +public class PeriodicMetricsCollector { + + private static final Logger log = LoggerFactory.getLogger(PeriodicMetricsCollector.class); + private final Vertx vertx; + private final String redisquesAddress; + + private final AtomicLong activeQueuesCount = new AtomicLong(0); + + public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) { + this.vertx = vertx; + this.redisquesAddress = redisquesAddress; + + Gauge.builder(MetricMeter.ACTIVE_QUEUES.getId(), activeQueuesCount, AtomicLong::get). + description(MetricMeter.ACTIVE_QUEUES.getDescription()). + register(meterRegistry); + + vertx.setPeriodic(metricCollectIntervalSec * 1000, event -> updateActiveQueuesCount()); + } + + private void updateActiveQueuesCount() { + vertx.eventBus().request(redisquesAddress, buildGetQueuesCountOperation(), (Handler>>) reply -> { + if(reply.failed()) { + log.warn("TODO error handling", reply.cause()); + } else if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) { + activeQueuesCount.set(reply.result().body().getLong(VALUE)); + } else { + log.warn("Error gathering count of active queues. Cause: {}", reply.result().body().getString(MESSAGE)); + } + }); + } + +} diff --git a/src/main/java/org/swisspush/redisques/util/MetricMeter.java b/src/main/java/org/swisspush/redisques/util/MetricMeter.java index 3f186b6..66f0b20 100644 --- a/src/main/java/org/swisspush/redisques/util/MetricMeter.java +++ b/src/main/java/org/swisspush/redisques/util/MetricMeter.java @@ -2,9 +2,10 @@ public enum MetricMeter { - ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued sucessfully"), + ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued successfully"), ENQUEUE_FAIL("redisques.enqueue.fail", "Overall count of queue items which could not be enqueued"), - DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"); + DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"), + ACTIVE_QUEUES("redisques.active.queues", "Count of active queues"); private final String id; private final String description;