diff --git a/README.md b/README.md index f37a480..3689c04 100644 --- a/README.md +++ b/README.md @@ -1020,7 +1020,7 @@ Available url parameters are: * _filter=_: Filter the queues for which the cumulated speed is evaluated The result will be a json object with the speed of the last measurement period calculated -over all queues matching the given filter regex. Additionally the used measurement time in seconds +over all queues matching the given filter regex. Additionally, the used measurement time in seconds is returned (eg. 60 seconds by default) ```json @@ -1047,7 +1047,7 @@ The collected metrics include: | Metric name | Description | |:--------------------------------|:------------------------------------------------------------| | redisques_enqueue_success_total | Overall count of queue items to be enqueued | -| redisques_enqueue_fail_total | Overall count of queue items to be enqueued | +| redisques_enqueue_fail_total | Overall count of failing enqueues | | redisques_dequeue_total | Overall count of queue items to be dequeued from the queues | | redisques_active_queues | Overall count of active queues | | redisques_max_queue_size | Amount of queue items of the biggest queue | diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index c8cd546..2667e0a 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -393,7 +393,7 @@ private void initMicrometerMetrics(RedisquesConfiguration modConfig) { String address = modConfig.getAddress(); int metricRefreshPeriod = modConfig.getMetricRefreshPeriod(); - new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod); + new PeriodicMetricsCollector(vertx, periodicSkipScheduler, address, meterRegistry, metricRefreshPeriod); } private void initialize() { diff --git a/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java index 1a8555e..224a100 100644 --- a/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java +++ b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java @@ -9,6 +9,7 @@ import io.vertx.core.json.JsonObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.redisques.scheduling.PeriodicSkipScheduler; import org.swisspush.redisques.util.MetricMeter; import java.util.concurrent.atomic.AtomicLong; @@ -28,7 +29,7 @@ public class PeriodicMetricsCollector { private final AtomicLong activeQueuesCount = new AtomicLong(0); - public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) { + public PeriodicMetricsCollector(Vertx vertx, PeriodicSkipScheduler periodicSkipScheduler, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) { this.vertx = vertx; this.redisquesAddress = redisquesAddress; @@ -36,10 +37,11 @@ public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegis description(MetricMeter.ACTIVE_QUEUES.getDescription()). register(meterRegistry); - vertx.setPeriodic(metricCollectIntervalSec * 1000, event -> updateActiveQueuesCount()); + periodicSkipScheduler.setPeriodic(metricCollectIntervalSec * 1000, "metricCollectRefresh", + this::updateActiveQueuesCount); } - private void updateActiveQueuesCount() { + private void updateActiveQueuesCount(Runnable onPeriodicDone) { vertx.eventBus().request(redisquesAddress, buildGetQueuesCountOperation(), (Handler>>) reply -> { if(reply.failed()) { log.warn("TODO error handling", reply.cause()); @@ -48,6 +50,7 @@ private void updateActiveQueuesCount() { } else { log.warn("Error gathering count of active queues. Cause: {}", reply.result().body().getString(MESSAGE)); } + onPeriodicDone.run(); }); }