From 0f805be7f76e850a6cce840ae2abf34cdaec6f3b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Wed, 27 Nov 2024 11:15:08 +0100 Subject: [PATCH 1/5] #224 Added metrics for enqueue and dequeue --- README.md | 40 ++++++++++++++++- pom.xml | 11 +++++ .../org/swisspush/redisques/RedisQues.java | 45 +++++++++++++++++-- .../redisques/action/EnqueueAction.java | 32 ++++++++++--- .../redisques/action/LockedEnqueueAction.java | 12 +++-- .../swisspush/redisques/util/MetricMeter.java | 24 ++++++++++ .../redisques/util/QueueActionFactory.java | 10 +++-- .../util/RedisquesConfiguration.java | 40 ++++++++++++----- .../swisspush/redisques/RedisQuesTest.java | 31 ++++++++++++- .../redisques/action/EnqueueActionTest.java | 25 ++++++++++- .../action/LockedEnqueueActionTest.java | 17 ++++++- .../util/RedisquesConfigurationTest.java | 9 ++++ 12 files changed, 263 insertions(+), 33 deletions(-) create mode 100644 src/main/java/org/swisspush/redisques/util/MetricMeter.java diff --git a/README.md b/README.md index cb66cf44..9e8913ee 100644 --- a/README.md +++ b/README.md @@ -66,6 +66,7 @@ The following configuration values are available: | redisReconnectAttempts | 0 | The amount of attempts to reconnect when redis connection is lost. Use **0** to not reconnect at all or **-1** to reconnect indefinitely. | | redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. | | redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. | +| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer | | httpRequestHandlerEnabled | false | Enable / disable the HTTP API | | httpRequestHandlerAuthenticationEnabled | false | Enable / disable authentication for the HTTP API | | httpRequestHandlerUsername | | The username for the HTTP API authentication | @@ -630,7 +631,6 @@ Response Data } ``` - #### getQueuesSpeed Request Data @@ -651,7 +651,6 @@ Response Data } ``` - ## RedisQues HTTP API RedisQues provides a HTTP API to modify queues, queue items and get information about queue counts and queue item counts. @@ -1040,6 +1039,43 @@ SemaphoreConfig semaphoreConfig = new SemaphoreConfig().setInitialPermits(1).set hazelcastConfig.getCPSubsystemConfig().addSemaphoreConfig(semaphoreConfig); ``` +## Metric collection +Besides the API, redisques provides some key metrics collected by [micrometer.io](https://micrometer.io/). + +The collected metrics include: + +| Metric name | Description | +|:--------------------------------|:------------------------------------------------------------| +| redisques_enqueue_success_total | Overall count of queue items to be enqueued | +| redisques_enqueue_fail_total | Overall count of queue items to be enqueued | +| redisques_dequeue_total | Overall count of queue items to be dequeued from the queues | + +### Testing locally +When you include redisques in you project, you probably already have the configuration for publishing the metrics. + +To export the metrics locally you have to add this dependency to the `pom.xml` + +``` + + io.micrometer + micrometer-registry-prometheus + ${micrometer.version} + +``` +Also add the micrometer configuration to `RedisQuesRunner` class like this: + +```java +MicrometerMetricsOptions options = new MicrometerMetricsOptions() + .setPrometheusOptions(new VertxPrometheusOptions() + .setStartEmbeddedServer(true) + .setEmbeddedServerOptions(new HttpServerOptions().setPort(9101)) + .setEnabled(true)) + .setEnabled(true); +Vertx vertx = Vertx.vertx(new VertxOptions().setMetricsOptions(options)); +``` +Using the configuration above, the metrics can be accessed with + +> GET http://localhost:9101/metrics ## Dependencies diff --git a/pom.xml b/pom.xml index 8dccbc5e..ed427166 100644 --- a/pom.xml +++ b/pom.xml @@ -65,6 +65,16 @@ vertx-web ${vertx.version} + + io.vertx + vertx-micrometer-metrics + ${vertx.version} + + + io.micrometer + micrometer-core + ${micrometer.version} + org.slf4j slf4j-api @@ -390,6 +400,7 @@ 4.5.2 + 1.12.13 2.0.10 5.8.0 4.13.2 diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index fe375ebf..11bcf2d5 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1,6 +1,8 @@ package org.swisspush.redisques; import com.google.common.base.Strings; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.AbstractVerticle; import io.vertx.core.AsyncResult; import io.vertx.core.CompositeFuture; @@ -13,6 +15,7 @@ import io.vertx.core.eventbus.MessageConsumer; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; +import io.vertx.micrometer.backends.BackendRegistries; import io.vertx.redis.client.Command; import io.vertx.redis.client.RedisAPI; import io.vertx.redis.client.Response; @@ -84,6 +87,7 @@ public static class RedisQuesBuilder { private RedisquesConfigurationProvider configurationProvider; private RedisProvider redisProvider; private RedisQuesExceptionFactory exceptionFactory; + private MeterRegistry meterRegistry; private Semaphore redisMonitoringReqQuota; private Semaphore checkQueueRequestsQuota; private Semaphore queueStatsRequestQuota; @@ -113,6 +117,11 @@ public RedisQuesBuilder withExceptionFactory(RedisQuesExceptionFactory exception return this; } + public RedisQuesBuilder withMeterRegistry(MeterRegistry meterRegistry) { + this.meterRegistry = meterRegistry; + return this; + } + /** * How many redis requests monitoring related component will trigger * simultaneously. One of those components for example is @@ -172,7 +181,7 @@ public RedisQues build() { } return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, exceptionFactory, redisMonitoringReqQuota, checkQueueRequestsQuota, queueStatsRequestQuota, - getQueuesItemsCountRedisRequestQuota); + getQueuesItemsCountRedisRequestQuota, meterRegistry); } } @@ -218,6 +227,9 @@ private enum QueueState { private RedisquesConfigurationProvider configurationProvider; private RedisMonitor redisMonitor; + private MeterRegistry meterRegistry = null; + private Counter dequeueCounter; + private Map queueActions = new HashMap<>(); private Map dequeueStatistic = new ConcurrentHashMap<>(); @@ -235,6 +247,20 @@ public RedisQues() { log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); } + public RedisQues( + MemoryUsageProvider memoryUsageProvider, + RedisquesConfigurationProvider configurationProvider, + RedisProvider redisProvider, + RedisQuesExceptionFactory exceptionFactory, + Semaphore redisMonitoringReqQuota, + Semaphore checkQueueRequestsQuota, + Semaphore queueStatsRequestQuota, + Semaphore getQueuesItemsCountRedisRequestQuota + ) { + this(memoryUsageProvider, configurationProvider, redisProvider, exceptionFactory, redisMonitoringReqQuota, + checkQueueRequestsQuota, queueStatsRequestQuota, getQueuesItemsCountRedisRequestQuota, null); + } + public RedisQues( MemoryUsageProvider memoryUsageProvider, RedisquesConfigurationProvider configurationProvider, @@ -243,7 +269,8 @@ public RedisQues( Semaphore redisMonitoringReqQuota, Semaphore checkQueueRequestsQuota, Semaphore queueStatsRequestQuota, - Semaphore getQueuesItemsCountRedisRequestQuota + Semaphore getQueuesItemsCountRedisRequestQuota, + MeterRegistry meterRegistry ) { this.memoryUsageProvider = memoryUsageProvider; this.configurationProvider = configurationProvider; @@ -253,6 +280,7 @@ public RedisQues( this.checkQueueRequestsQuota = checkQueueRequestsQuota; this.queueStatsRequestQuota = queueStatsRequestQuota; this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota; + this.meterRegistry = meterRegistry; } public static RedisQuesBuilder builder() { @@ -318,6 +346,14 @@ public void start(Promise promise) { RedisquesConfiguration modConfig = configurationProvider.configuration(); log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); + if(configurationProvider.configuration().getMicrometerMetricsEnabled()) { + if(meterRegistry == null) { + meterRegistry = BackendRegistries.getDefaultNow(); + } + dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) + .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); + } + int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec(); if (modConfig.isDequeueStatsEnabled()) { dequeueStatisticEnabled = true; @@ -371,7 +407,7 @@ private void initialize() { queueActionFactory = new QueueActionFactory( redisProvider, vertx, log, queuesKey, queuesPrefix, consumersPrefix, locksKey, memoryUsageProvider, queueStatisticsCollector, exceptionFactory, - configurationProvider, getQueuesItemsCountRedisRequestQuota); + configurationProvider, getQueuesItemsCountRedisRequestQuota, meterRegistry); queueActions.put(addQueueItem, queueActionFactory.buildQueueAction(addQueueItem)); queueActions.put(deleteQueueItem, queueActionFactory.buildQueueAction(deleteQueueItem)); @@ -950,6 +986,9 @@ private Future readQueue(final String queueName) { log.error("Failed to pop from queue '{}'", queueName, jsonAnswer.cause()); // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" } + if(dequeueCounter != null) { + dequeueCounter.increment(); + } log.debug("RedisQues Message removed, queue {} is ready again", queueName); myQueues.put(queueName, QueueState.READY); diff --git a/src/main/java/org/swisspush/redisques/action/EnqueueAction.java b/src/main/java/org/swisspush/redisques/action/EnqueueAction.java index 0fd33978..f2278657 100644 --- a/src/main/java/org/swisspush/redisques/action/EnqueueAction.java +++ b/src/main/java/org/swisspush/redisques/action/EnqueueAction.java @@ -1,14 +1,13 @@ package org.swisspush.redisques.action; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import org.slf4j.Logger; import org.swisspush.redisques.exception.RedisQuesExceptionFactory; -import org.swisspush.redisques.util.MemoryUsageProvider; -import org.swisspush.redisques.util.QueueConfiguration; -import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisProvider; +import org.swisspush.redisques.util.*; import java.util.Arrays; import java.util.List; @@ -19,17 +18,24 @@ public class EnqueueAction extends AbstractQueueAction { private final MemoryUsageProvider memoryUsageProvider; private final int memoryUsageLimitPercent; + private Counter enqueueCounterSuccess; + private Counter enqueueCounterFail; public EnqueueAction( Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, String consumersPrefix, String locksKey, List queueConfigurations, RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log, - MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent + MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry ) { super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log); this.memoryUsageProvider = memoryUsageProvider; this.memoryUsageLimitPercent = memoryUsageLimitPercent; + + if(meterRegistry != null) { + enqueueCounterSuccess = Counter.builder(MetricMeter.ENQUEUE_SUCCESS.getId()).description(MetricMeter.ENQUEUE_SUCCESS.getDescription()).register(meterRegistry); + enqueueCounterFail = Counter.builder(MetricMeter.ENQUEUE_FAIL.getId()).description(MetricMeter.ENQUEUE_FAIL.getDescription()).register(meterRegistry); + } } @Override @@ -38,6 +44,7 @@ public void execute(Message event) { if (isMemoryUsageLimitReached()) { log.warn("Failed to enqueue into queue {} because the memory usage limit is reached", queueName); + incrEnqueueFailCount(); event.reply(createErrorReply().put(MESSAGE, MEMORY_FULL)); return; } @@ -61,6 +68,8 @@ public void execute(Message event) { reply.put(STATUS, OK); reply.put(MESSAGE, "enqueued"); + incrEnqueueSuccessCount(); + // feature EN-queue slow-down (the larger the queue the longer we delay "OK" response) long delayReplyMillis = 0; QueueConfiguration queueConfiguration = findQueueConfiguration(queueName); @@ -88,7 +97,20 @@ public void execute(Message event) { }); } + private void incrEnqueueSuccessCount() { + if(enqueueCounterSuccess != null) { + enqueueCounterSuccess.increment(); + } + } + + protected void incrEnqueueFailCount() { + if(enqueueCounterFail != null) { + enqueueCounterFail.increment(); + } + } + private void replyError(Message event, String queueName, Throwable ex) { + incrEnqueueFailCount(); String message = "RedisQues QUEUE_ERROR: Error while enqueueing message into queue " + queueName; log.error(message, new Exception(ex)); JsonObject reply = new JsonObject(); diff --git a/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java b/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java index bc7412f1..bc510e3a 100644 --- a/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java +++ b/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java @@ -1,5 +1,6 @@ package org.swisspush.redisques.action; +import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; @@ -22,10 +23,10 @@ public LockedEnqueueAction(Vertx vertx, RedisProvider redisProvider, String consumersPrefix, String locksKey, List queueConfigurations, RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log, - MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent) { + MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry) { super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log, memoryUsageProvider, - memoryUsageLimitPercent); + memoryUsageLimitPercent, meterRegistry); } @Override @@ -34,6 +35,7 @@ public void execute(Message event) { String queueName = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); if (isMemoryUsageLimitReached()) { log.warn("Failed to lockedEnqueue into queue {} because the memory usage limit is reached", queueName); + incrEnqueueFailCount(); event.reply(createErrorReply().put(MESSAGE, MEMORY_FULL)); return; } @@ -47,16 +49,18 @@ public void execute(Message event) { } else { log.warn("RedisQues lockedEnqueue locking failed. Skip enqueue", new Exception(putLockResult.cause())); + incrEnqueueFailCount(); event.reply(createErrorReply()); } })); p.onFailure(ex -> { - log.warn("Redis: RedisQues lockedEnqueue locking failed. Skip enqueue", - new Exception(ex)); + log.warn("Redis: RedisQues lockedEnqueue locking failed. Skip enqueue", new Exception(ex)); + incrEnqueueFailCount(); event.reply(createErrorReply()); }); } else { log.warn("RedisQues lockedEnqueue failed because property '{}' was missing", REQUESTED_BY); + incrEnqueueFailCount(); event.reply(createErrorReply().put(MESSAGE, "Property '" + REQUESTED_BY + "' missing")); } } diff --git a/src/main/java/org/swisspush/redisques/util/MetricMeter.java b/src/main/java/org/swisspush/redisques/util/MetricMeter.java new file mode 100644 index 00000000..3f186b6c --- /dev/null +++ b/src/main/java/org/swisspush/redisques/util/MetricMeter.java @@ -0,0 +1,24 @@ +package org.swisspush.redisques.util; + +public enum MetricMeter { + + ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued sucessfully"), + ENQUEUE_FAIL("redisques.enqueue.fail", "Overall count of queue items which could not be enqueued"), + DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"); + + private final String id; + private final String description; + + MetricMeter(String id, String description) { + this.id = id; + this.description = description; + } + + public String getId() { + return id; + } + + public String getDescription() { + return description; + } +} diff --git a/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java b/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java index 930c8511..145e4da6 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java +++ b/src/main/java/org/swisspush/redisques/util/QueueActionFactory.java @@ -1,5 +1,6 @@ package org.swisspush.redisques.util; +import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; import org.slf4j.Logger; import org.swisspush.redisques.action.*; @@ -21,6 +22,7 @@ public class QueueActionFactory { private final List queueConfigurations; private final QueueStatisticsCollector queueStatisticsCollector; private final int memoryUsageLimitPercent; + private final MeterRegistry meterRegistry; private final MemoryUsageProvider memoryUsageProvider; private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore getQueuesItemsCountRedisRequestQuota; @@ -39,7 +41,8 @@ public QueueActionFactory( QueueStatisticsCollector queueStatisticsCollector, RedisQuesExceptionFactory exceptionFactory, RedisquesConfigurationProvider configurationProvider, - Semaphore getQueuesItemsCountRedisRequestQuota + Semaphore getQueuesItemsCountRedisRequestQuota, + MeterRegistry meterRegistry ) { this.redisProvider = redisProvider; this.vertx = vertx; @@ -56,6 +59,7 @@ public QueueActionFactory( this.queueConfigurations = configurationProvider.configuration().getQueueConfigurations(); this.memoryUsageLimitPercent = configurationProvider.configuration().getMemoryUsageLimitPercent(); this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota; + this.meterRegistry = meterRegistry; } public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){ @@ -97,11 +101,11 @@ public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){ case enqueue: return new EnqueueAction(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log, - memoryUsageProvider, memoryUsageLimitPercent); + memoryUsageProvider, memoryUsageLimitPercent, meterRegistry); case lockedEnqueue: return new LockedEnqueueAction(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log, - memoryUsageProvider, memoryUsageLimitPercent); + memoryUsageProvider, memoryUsageLimitPercent, meterRegistry); case getLock: return new GetLockAction(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log); diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 00074def..368eee33 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -39,6 +39,7 @@ public class RedisquesConfiguration { private final long processorDelayMax; private final boolean httpRequestHandlerEnabled; private final boolean httpRequestHandlerAuthenticationEnabled; + private final boolean micrometerMetricsEnabled; private final String httpRequestHandlerPrefix; private final String httpRequestHandlerUsername; private final String httpRequestHandlerPassword; @@ -112,6 +113,7 @@ public class RedisquesConfiguration { public static final String PROP_PROCESSOR_DELAY_MAX = "processorDelayMax"; public static final String PROP_HTTP_REQUEST_HANDLER_ENABLED = "httpRequestHandlerEnabled"; public static final String PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED = "httpRequestHandlerAuthenticationEnabled"; + public static final String PROP_MICROMETER_METRICS_ENABLED = "micrometerMetricsEnabled"; public static final String PROP_HTTP_REQUEST_HANDLER_PREFIX = "httpRequestHandlerPrefix"; public static final String PROP_HTTP_REQUEST_HANDLER_USERNAME = "httpRequestHandlerUsername"; public static final String PROP_HTTP_REQUEST_HANDLER_PASSWORD = "httpRequestHandlerPassword"; @@ -146,7 +148,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, String redisHost, int redisPort, String redisAuth, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, - boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + boolean httpRequestHandlerAuthenticationEnabled, boolean micrometerMetricsEnabled, String httpRequestHandlerPrefix, String httpRequestHandlerUsername, String httpRequestHandlerPassword, Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { @@ -154,7 +156,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, - httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + httpRequestHandlerAuthenticationEnabled, micrometerMetricsEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, @@ -171,7 +173,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, String redisHost, int redisPort, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, - boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + boolean httpRequestHandlerAuthenticationEnabled, boolean micrometerMetricsEnabled, String httpRequestHandlerPrefix, String httpRequestHandlerUsername, String httpRequestHandlerPassword, Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { @@ -179,7 +181,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, - httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + httpRequestHandlerAuthenticationEnabled, micrometerMetricsEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, @@ -196,15 +198,14 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, - boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + boolean httpRequestHandlerAuthenticationEnabled, boolean micrometerMetricsEnabled, String httpRequestHandlerPrefix, String httpRequestHandlerUsername, String httpRequestHandlerPassword, Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, - processorDelayMax, httpRequestHandlerEnabled, - httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, micrometerMetricsEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, @@ -221,7 +222,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, - boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + boolean httpRequestHandlerAuthenticationEnabled, boolean micrometerMetricsEnabled, String httpRequestHandlerPrefix, String httpRequestHandlerUsername, String httpRequestHandlerPassword, Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { @@ -229,7 +230,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort), redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, - httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + httpRequestHandlerAuthenticationEnabled, micrometerMetricsEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, @@ -243,8 +244,8 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres int consumerLockMultiplier, List redisHosts, List redisPorts, RedisClientType redisClientType, String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, - boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, - String httpRequestHandlerUsername, String httpRequestHandlerPassword, + boolean httpRequestHandlerAuthenticationEnabled, boolean micrometerMetricsEnabled, + String httpRequestHandlerPrefix, String httpRequestHandlerUsername, String httpRequestHandlerPassword, Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding, int maxPoolSize, int maxPoolWaitSize, int maxPipelineWaitSize, @@ -311,6 +312,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres this.httpRequestHandlerEnabled = httpRequestHandlerEnabled; this.httpRequestHandlerAuthenticationEnabled = httpRequestHandlerAuthenticationEnabled; + this.micrometerMetricsEnabled = micrometerMetricsEnabled; this.httpRequestHandlerPrefix = httpRequestHandlerPrefix; this.httpRequestHandlerUsername = httpRequestHandlerUsername; this.httpRequestHandlerPassword = httpRequestHandlerPassword; @@ -358,7 +360,8 @@ private RedisquesConfiguration(RedisquesConfigurationBuilder builder) { builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval, builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled, - builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix, + builder.httpRequestHandlerAuthenticationEnabled, + builder.micrometerMetricsEnabled, builder.httpRequestHandlerPrefix, builder.httpRequestHandlerUsername, builder.httpRequestHandlerPassword, builder.httpRequestHandlerPort, builder.httpRequestHandlerUserHeader, builder.queueConfigurations, builder.enableQueueNameDecoding, @@ -401,6 +404,7 @@ public JsonObject asJsonObject() { obj.put(PROP_PROCESSOR_DELAY_MAX, getProcessorDelayMax()); obj.put(PROP_HTTP_REQUEST_HANDLER_ENABLED, getHttpRequestHandlerEnabled()); obj.put(PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED, getHttpRequestHandlerAuthenticationEnabled()); + obj.put(PROP_MICROMETER_METRICS_ENABLED, getMicrometerMetricsEnabled()); obj.put(PROP_HTTP_REQUEST_HANDLER_PREFIX, getHttpRequestHandlerPrefix()); obj.put(PROP_HTTP_REQUEST_HANDLER_USERNAME, getHttpRequestHandlerUsername()); obj.put(PROP_HTTP_REQUEST_HANDLER_PASSWORD, getHttpRequestHandlerPassword()); @@ -499,6 +503,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) { if (json.containsKey(PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED)) { builder.httpRequestHandlerAuthenticationEnabled(json.getBoolean(PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED)); } + if (json.containsKey(PROP_MICROMETER_METRICS_ENABLED)) { + builder.micrometerMetricsEnabled(json.getBoolean(PROP_MICROMETER_METRICS_ENABLED)); + } if (json.containsKey(PROP_HTTP_REQUEST_HANDLER_PREFIX)) { builder.httpRequestHandlerPrefix(json.getString(PROP_HTTP_REQUEST_HANDLER_PREFIX)); } @@ -656,6 +663,8 @@ public boolean getHttpRequestHandlerAuthenticationEnabled() { return httpRequestHandlerAuthenticationEnabled; } + public boolean getMicrometerMetricsEnabled() { return micrometerMetricsEnabled; } + public String getHttpRequestHandlerPrefix() { return httpRequestHandlerPrefix; } @@ -780,6 +789,7 @@ public static class RedisquesConfigurationBuilder { private int checkInterval; private int processorTimeout; private long processorDelayMax; + private boolean micrometerMetricsEnabled; private boolean httpRequestHandlerEnabled; private boolean httpRequestHandlerAuthenticationEnabled; private String httpRequestHandlerPrefix; @@ -818,6 +828,7 @@ public RedisquesConfigurationBuilder() { this.processorDelayMax = 0; this.httpRequestHandlerEnabled = false; this.httpRequestHandlerAuthenticationEnabled = false; + this.micrometerMetricsEnabled = false; this.httpRequestHandlerPrefix = "/queuing"; this.httpRequestHandlerUsername = null; this.httpRequestHandlerPassword = null; @@ -960,6 +971,11 @@ public RedisquesConfigurationBuilder httpRequestHandlerEnabled(boolean httpReque return this; } + public RedisquesConfigurationBuilder micrometerMetricsEnabled(boolean micrometerMetricsEnabled) { + this.micrometerMetricsEnabled = micrometerMetricsEnabled; + return this; + } + public RedisquesConfigurationBuilder httpRequestHandlerAuthenticationEnabled(boolean httpRequestHandlerAuthenticationEnabled) { this.httpRequestHandlerAuthenticationEnabled = httpRequestHandlerAuthenticationEnabled; return this; diff --git a/src/test/java/org/swisspush/redisques/RedisQuesTest.java b/src/test/java/org/swisspush/redisques/RedisQuesTest.java index daff4830..a3eab6c3 100644 --- a/src/test/java/org/swisspush/redisques/RedisQuesTest.java +++ b/src/test/java/org/swisspush/redisques/RedisQuesTest.java @@ -1,5 +1,8 @@ package org.swisspush.redisques; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.vertx.core.DeploymentOptions; import io.vertx.core.Handler; import io.vertx.core.Vertx; @@ -29,9 +32,11 @@ public class RedisQuesTest extends AbstractTestCase { private RedisQues redisQues; - private TestMemoryUsageProvider memoryUsageProvider; - + private Counter enqueueCounterSuccess; + private Counter enqueueCounterFail; + private Counter dequeueCounter; + @Rule public Timeout rule = Timeout.seconds(50); @@ -41,6 +46,7 @@ public void deployRedisques(TestContext context) { JsonObject config = RedisquesConfiguration.with() .processorAddress(PROCESSOR_ADDRESS) + .micrometerMetricsEnabled(true) .refreshPeriod(2) .publishMetricsAddress("my-metrics-eb-address") .metricStorageName("foobar") @@ -54,10 +60,16 @@ public void deployRedisques(TestContext context) { .build() .asJsonObject(); + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + enqueueCounterSuccess = meterRegistry.counter(MetricMeter.ENQUEUE_SUCCESS.getId()); + enqueueCounterFail = meterRegistry.counter(MetricMeter.ENQUEUE_FAIL.getId()); + dequeueCounter = meterRegistry.counter(MetricMeter.DEQUEUE.getId()); + memoryUsageProvider = new TestMemoryUsageProvider(Optional.of(50)); redisQues = RedisQues.builder() .withMemoryUsageProvider(memoryUsageProvider) .withRedisquesRedisquesConfigurationProvider(new DefaultRedisquesConfigurationProvider(vertx, config)) + .withMeterRegistry(meterRegistry) .build(); vertx.deployVerticle(redisQues, new DeploymentOptions().setConfig(config), context.asyncAssertSuccess(event -> { deploymentId = event; @@ -189,6 +201,9 @@ public void enqueue(TestContext context) { context.assertEquals(OK, message.result().body().getString(STATUS)); context.assertEquals("helloEnqueue", jedis.lindex(getQueuesRedisKeyPrefix() + "queueEnqueue", 0)); assertKeyCount(context, getQueuesRedisKeyPrefix(), 1); + + assertEnqueueCounts(context, 1.0, 0.0); + async.complete(); }); } @@ -204,6 +219,8 @@ public void enqueueWithReachedMemoryUsageLimit(TestContext context) { context.assertEquals("memory usage limit reached", message.result().body().getString(MESSAGE)); assertKeyCount(context, getQueuesRedisKeyPrefix(), 0); + assertEnqueueCounts(context, 0.0, 1.0); + //reduce current memory usage below the limit memoryUsageProvider.setCurrentMemoryUsage(Optional.of(50)); @@ -211,6 +228,7 @@ public void enqueueWithReachedMemoryUsageLimit(TestContext context) { context.assertEquals(OK, message2.result().body().getString(STATUS)); context.assertEquals("helloEnqueue", jedis.lindex(getQueuesRedisKeyPrefix() + "queueEnqueue", 0)); assertKeyCount(context, getQueuesRedisKeyPrefix(), 1); + context.assertEquals(1.0, enqueueCounterSuccess.count()); async.complete(); }); }); @@ -228,6 +246,7 @@ public void lockedEnqueue(TestContext context) { assertLockContent(context, "queueEnqueue", "someuser"); assertKeyCount(context, getQueuesRedisKeyPrefix(), 1); assertKeyCount(context, getLocksRedisKey(), 1); + assertEnqueueCounts(context, 1.0, 0.0); async.complete(); }); } @@ -244,6 +263,7 @@ public void lockedEnqueueWithReachedMemoryUsageLimit(TestContext context) { assertLockDoesNotExist(context, "queueEnqueue"); assertKeyCount(context, getQueuesRedisKeyPrefix(), 0); assertKeyCount(context, getLocksRedisKey(), 0); + assertEnqueueCounts(context, 0.0, 1.0); //reduce current memory usage below the limit memoryUsageProvider.setCurrentMemoryUsage(Optional.of(50)); @@ -255,6 +275,7 @@ public void lockedEnqueueWithReachedMemoryUsageLimit(TestContext context) { assertLockContent(context, "queueEnqueue", "someuser"); assertKeyCount(context, getQueuesRedisKeyPrefix(), 1); assertKeyCount(context, getLocksRedisKey(), 1); + assertEnqueueCounts(context, 1.0, 1.0); async.complete(); }); }); @@ -277,6 +298,7 @@ public void lockedEnqueueMissingRequestedBy(TestContext context) { assertKeyCount(context, getLocksRedisKey(), 0); assertKeyCount(context, getQueuesRedisKeyPrefix(), 0); assertLockDoesNotExist(context, "queue1"); + assertEnqueueCounts(context, 0.0, 1.0); async.complete(); }); } @@ -1216,4 +1238,9 @@ public void getRescheduleRefreshPeriodOfUnknownQueue(TestContext context) { // still use the default refresh period context.assertEquals(2, redisQues.updateQueueFailureCountAndGetRetryInterval(queue, false), "The retry interval is wrong"); } + + private void assertEnqueueCounts(TestContext context, double successCount, double failCount){ + context.assertEquals(successCount, enqueueCounterSuccess.count(), "Success enqueue count is wrong"); + context.assertEquals(failCount, enqueueCounterFail.count(), "Failed enqueue count is wrong"); + } } diff --git a/src/test/java/org/swisspush/redisques/action/EnqueueActionTest.java b/src/test/java/org/swisspush/redisques/action/EnqueueActionTest.java index cda5c0fd..6c1aee91 100644 --- a/src/test/java/org/swisspush/redisques/action/EnqueueActionTest.java +++ b/src/test/java/org/swisspush/redisques/action/EnqueueActionTest.java @@ -1,5 +1,8 @@ package org.swisspush.redisques.action; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; @@ -11,6 +14,7 @@ import org.junit.runner.RunWith; import org.mockito.Mockito; import org.slf4j.Logger; +import org.swisspush.redisques.util.MetricMeter; import org.swisspush.redisques.util.QueueStatisticsCollector; import java.util.ArrayList; @@ -28,14 +32,20 @@ @RunWith(VertxUnitRunner.class) public class EnqueueActionTest extends AbstractQueueActionTest { + private Counter enqueueCounterSuccess; + private Counter enqueueCounterFail; + @Before @Override public void setup() { super.setup(); + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + enqueueCounterSuccess = meterRegistry.counter(MetricMeter.ENQUEUE_SUCCESS.getId()); + enqueueCounterFail = meterRegistry.counter(MetricMeter.ENQUEUE_FAIL.getId()); action = new EnqueueAction(vertx, redisProvider, "addr", "q-", "prefix-", "c-", "l-", new ArrayList<>(), exceptionFactory, Mockito.mock(QueueStatisticsCollector.class), - Mockito.mock(Logger.class), memoryUsageProvider, 80); + Mockito.mock(Logger.class), memoryUsageProvider, 80, meterRegistry); } @Test @@ -47,6 +57,8 @@ public void testEnqueueWhenRedisIsNotReady(TestContext context){ verify(message, times(1)).reply(eq(new JsonObject(Buffer.buffer("{\"status\":\"error\",\"message\":\"RedisQues QUEUE_ERROR: Error while enqueueing message into queue someQueue\"}")))); verifyNoInteractions(redisAPI); + + assertEnqueueCounts(context,0.0, 1.0); } @Test @@ -58,6 +70,8 @@ public void testDontEnqueueWhenMemoryUsageLimitIsReached(TestContext context){ verify(message, times(1)).reply(eq(new JsonObject(Buffer.buffer("{\"status\":\"error\",\"message\":\"memory usage limit reached\"}")))); verifyNoInteractions(redisAPI); + + assertEnqueueCounts(context,0.0, 1.0); } @Test @@ -73,6 +87,8 @@ public void testDontEnqueueWhenUpdateTimestampFails(TestContext context){ "\"error\",\"message\":\"RedisQues QUEUE_ERROR: Error while enqueueing message into " + "queue updateTimestampFail\"}")))); verify(redisAPI, never()).rpush(anyList()); + + assertEnqueueCounts(context,0.0, 1.0); } @Test @@ -87,5 +103,12 @@ public void testEnqueueWhenUpdateTimestampSucceeds(TestContext context){ verify(message, times(1)).reply(eq(new JsonObject(Buffer.buffer("{\"status\":\"ok\",\"message\":\"enqueued\"}")))); verify(redisAPI, times(1)).rpush(eq(Arrays.asList("prefix-someQueue", "hello"))); + + assertEnqueueCounts(context,1.0, 0.0); + } + + private void assertEnqueueCounts(TestContext context, double successCount, double failCount){ + context.assertEquals(successCount, enqueueCounterSuccess.count(), "Success enqueue count is wrong"); + context.assertEquals(failCount, enqueueCounterFail.count(), "Failed enqueue count is wrong"); } } diff --git a/src/test/java/org/swisspush/redisques/action/LockedEnqueueActionTest.java b/src/test/java/org/swisspush/redisques/action/LockedEnqueueActionTest.java index 471581b2..ba22d868 100644 --- a/src/test/java/org/swisspush/redisques/action/LockedEnqueueActionTest.java +++ b/src/test/java/org/swisspush/redisques/action/LockedEnqueueActionTest.java @@ -1,5 +1,8 @@ package org.swisspush.redisques.action; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.MeterRegistry; +import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.core.json.JsonObject; @@ -10,6 +13,7 @@ import org.junit.runner.RunWith; import org.mockito.Mockito; import org.slf4j.Logger; +import org.swisspush.redisques.util.MetricMeter; import org.swisspush.redisques.util.QueueStatisticsCollector; import java.util.ArrayList; @@ -25,14 +29,20 @@ @RunWith(VertxUnitRunner.class) public class LockedEnqueueActionTest extends AbstractQueueActionTest { + private Counter enqueueCounterSuccess; + private Counter enqueueCounterFail; + @Before @Override public void setup() { super.setup(); + MeterRegistry meterRegistry = new SimpleMeterRegistry(); + enqueueCounterSuccess = meterRegistry.counter(MetricMeter.ENQUEUE_SUCCESS.getId()); + enqueueCounterFail = meterRegistry.counter(MetricMeter.ENQUEUE_FAIL.getId()); action = new LockedEnqueueAction(vertx, redisProvider, "addr", "q-", "prefix-", "c-", "l-", new ArrayList<>(), exceptionFactory, Mockito.mock(QueueStatisticsCollector.class), - Mockito.mock(Logger.class), memoryUsageProvider, 80); + Mockito.mock(Logger.class), memoryUsageProvider, 80, meterRegistry); } @Test @@ -42,8 +52,13 @@ public void testLockedEnqueueWhenRedisIsNotReady(TestContext context){ action.execute(message); + assertEnqueueCounts(context, 0.0, 1.0); verify(message, times(1)).reply(eq(new JsonObject(Buffer.buffer("{\"status\":\"error\"}")))); verifyNoInteractions(redisAPI); } + private void assertEnqueueCounts(TestContext context, double successCount, double failCount){ + context.assertEquals(successCount, enqueueCounterSuccess.count(), "Success enqueue count is wrong"); + context.assertEquals(failCount, enqueueCounterFail.count(), "Failed enqueue count is wrong"); + } } diff --git a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java index b82e2a70..41590b0b 100644 --- a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java +++ b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java @@ -41,6 +41,7 @@ public void testDefaultConfiguration(TestContext testContext) { testContext.assertEquals(config.getProcessorDelayMax(), 0L); testContext.assertFalse(config.getHttpRequestHandlerEnabled()); testContext.assertFalse(config.getHttpRequestHandlerAuthenticationEnabled()); + testContext.assertFalse(config.getMicrometerMetricsEnabled()); testContext.assertEquals(config.getHttpRequestHandlerPrefix(), "/queuing"); testContext.assertNull(config.getHttpRequestHandlerUsername()); testContext.assertNull(config.getHttpRequestHandlerPassword()); @@ -69,6 +70,7 @@ public void testOverrideConfiguration(TestContext testContext) { .processorDelayMax(50) .httpRequestHandlerEnabled(true) .httpRequestHandlerAuthenticationEnabled(true) + .micrometerMetricsEnabled(true) .httpRequestHandlerPrefix("/queuing/test") .httpRequestHandlerUsername("foo") .httpRequestHandlerPassword("bar") @@ -103,6 +105,7 @@ public void testOverrideConfiguration(TestContext testContext) { testContext.assertEquals(config.getProcessorDelayMax(), 50L); testContext.assertTrue(config.getHttpRequestHandlerEnabled()); testContext.assertTrue(config.getHttpRequestHandlerAuthenticationEnabled()); + testContext.assertTrue(config.getMicrometerMetricsEnabled()); testContext.assertEquals(config.getHttpRequestHandlerPrefix(), "/queuing/test"); testContext.assertEquals(config.getHttpRequestHandlerUsername(), "foo"); testContext.assertEquals(config.getHttpRequestHandlerPassword(), "bar"); @@ -142,6 +145,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getInteger(PROP_PROCESSOR_DELAY_MAX), 0); testContext.assertFalse(json.getBoolean(PROP_HTTP_REQUEST_HANDLER_ENABLED)); testContext.assertFalse(json.getBoolean(PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED)); + testContext.assertFalse(json.getBoolean(PROP_MICROMETER_METRICS_ENABLED)); testContext.assertEquals(json.getString(PROP_HTTP_REQUEST_HANDLER_PREFIX), "/queuing"); testContext.assertNull(json.getString(PROP_HTTP_REQUEST_HANDLER_USERNAME)); testContext.assertNull(json.getString(PROP_HTTP_REQUEST_HANDLER_PASSWORD)); @@ -172,6 +176,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { .processorDelayMax(50) .httpRequestHandlerPort(7171) .httpRequestHandlerAuthenticationEnabled(true) + .micrometerMetricsEnabled(true) .httpRequestHandlerUsername("foo") .httpRequestHandlerPassword("bar") .httpRequestHandlerUserHeader("x-custom-user-header") @@ -212,6 +217,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getString(PROP_HTTP_REQUEST_HANDLER_USERNAME), "foo"); testContext.assertEquals(json.getString(PROP_HTTP_REQUEST_HANDLER_PASSWORD), "bar"); testContext.assertTrue(json.getBoolean(PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED)); + testContext.assertTrue(json.getBoolean(PROP_MICROMETER_METRICS_ENABLED)); testContext.assertEquals(json.getString(PROP_HTTP_REQUEST_HANDLER_USER_HEADER), "x-custom-user-header"); testContext.assertEquals(json.getInteger(PROP_QUEUE_SPEED_INTERVAL_SEC), 1); testContext.assertEquals(json.getInteger(PROP_MEMORY_USAGE_LIMIT_PCT), 55); @@ -251,6 +257,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getProcessorDelayMax(), 0L); testContext.assertFalse(config.getHttpRequestHandlerEnabled()); testContext.assertFalse(config.getHttpRequestHandlerAuthenticationEnabled()); + testContext.assertFalse(config.getMicrometerMetricsEnabled()); testContext.assertEquals(config.getHttpRequestHandlerPrefix(), "/queuing"); testContext.assertNull(config.getHttpRequestHandlerUsername()); testContext.assertNull(config.getHttpRequestHandlerPassword()); @@ -285,6 +292,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { json.put(PROP_PROCESSOR_DELAY_MAX, 99); json.put(PROP_HTTP_REQUEST_HANDLER_ENABLED, Boolean.TRUE); json.put(PROP_HTTP_REQUEST_HANDLER_AUTH_ENABLED, Boolean.TRUE); + json.put(PROP_MICROMETER_METRICS_ENABLED, Boolean.TRUE); json.put(PROP_HTTP_REQUEST_HANDLER_PREFIX, "/queuing/test123"); json.put(PROP_HTTP_REQUEST_HANDLER_USERNAME, "foo"); json.put(PROP_HTTP_REQUEST_HANDLER_PASSWORD, "bar"); @@ -319,6 +327,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getProcessorDelayMax(), 99L); testContext.assertTrue(config.getHttpRequestHandlerEnabled()); testContext.assertTrue(config.getHttpRequestHandlerAuthenticationEnabled()); + testContext.assertTrue(config.getMicrometerMetricsEnabled()); testContext.assertEquals(config.getHttpRequestHandlerPort(), 7171); testContext.assertEquals(config.getHttpRequestHandlerPrefix(), "/queuing/test123"); testContext.assertEquals(config.getHttpRequestHandlerUsername(), "foo"); From a1f49c49e5ce2e8ee10c370b7bc28269f423fec1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Wed, 27 Nov 2024 14:40:41 +0100 Subject: [PATCH 2/5] #224 Added metric of active queue count --- .../org/swisspush/redisques/RedisQues.java | 19 +++++-- .../metrics/PeriodicMetricsCollector.java | 54 +++++++++++++++++++ .../swisspush/redisques/util/MetricMeter.java | 5 +- 3 files changed, 71 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 11bcf2d5..fc18d8f6 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -24,6 +24,7 @@ import org.swisspush.redisques.action.QueueAction; import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.handler.RedisquesHttpRequestHandler; +import org.swisspush.redisques.metrics.PeriodicMetricsCollector; import org.swisspush.redisques.performance.UpperBoundParallel; import org.swisspush.redisques.scheduling.PeriodicSkipScheduler; import org.swisspush.redisques.util.*; @@ -331,6 +332,18 @@ private void handleRegistrationRequest(Message msg) { }); } + private void initMicrometerMetrics(RedisquesConfiguration modConfig) { + if(meterRegistry == null) { + meterRegistry = BackendRegistries.getDefaultNow(); + } + dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) + .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); + + String address = modConfig.getAddress(); + int metricRefreshPeriod = modConfig.getMetricRefreshPeriod(); + new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod); + } + @Override public void start(Promise promise) { log.info("Started with UID {}", uid); @@ -347,11 +360,7 @@ public void start(Promise promise) { log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); if(configurationProvider.configuration().getMicrometerMetricsEnabled()) { - if(meterRegistry == null) { - meterRegistry = BackendRegistries.getDefaultNow(); - } - dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) - .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); + initMicrometerMetrics(modConfig); } int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec(); diff --git a/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java new file mode 100644 index 00000000..1a8555e7 --- /dev/null +++ b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java @@ -0,0 +1,54 @@ +package org.swisspush.redisques.metrics; + +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.redisques.util.MetricMeter; + +import java.util.concurrent.atomic.AtomicLong; + +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; +import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation; + + +public class PeriodicMetricsCollector { + + private static final Logger log = LoggerFactory.getLogger(PeriodicMetricsCollector.class); + private final Vertx vertx; + private final String redisquesAddress; + + private final AtomicLong activeQueuesCount = new AtomicLong(0); + + public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) { + this.vertx = vertx; + this.redisquesAddress = redisquesAddress; + + Gauge.builder(MetricMeter.ACTIVE_QUEUES.getId(), activeQueuesCount, AtomicLong::get). + description(MetricMeter.ACTIVE_QUEUES.getDescription()). + register(meterRegistry); + + vertx.setPeriodic(metricCollectIntervalSec * 1000, event -> updateActiveQueuesCount()); + } + + private void updateActiveQueuesCount() { + vertx.eventBus().request(redisquesAddress, buildGetQueuesCountOperation(), (Handler>>) reply -> { + if(reply.failed()) { + log.warn("TODO error handling", reply.cause()); + } else if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) { + activeQueuesCount.set(reply.result().body().getLong(VALUE)); + } else { + log.warn("Error gathering count of active queues. Cause: {}", reply.result().body().getString(MESSAGE)); + } + }); + } + +} diff --git a/src/main/java/org/swisspush/redisques/util/MetricMeter.java b/src/main/java/org/swisspush/redisques/util/MetricMeter.java index 3f186b6c..66f0b205 100644 --- a/src/main/java/org/swisspush/redisques/util/MetricMeter.java +++ b/src/main/java/org/swisspush/redisques/util/MetricMeter.java @@ -2,9 +2,10 @@ public enum MetricMeter { - ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued sucessfully"), + ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued successfully"), ENQUEUE_FAIL("redisques.enqueue.fail", "Overall count of queue items which could not be enqueued"), - DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"); + DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"), + ACTIVE_QUEUES("redisques.active.queues", "Count of active queues"); private final String id; private final String description; From bb94eadcf0212e9945aa19506876822f7c6d860f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Wed, 27 Nov 2024 14:42:56 +0100 Subject: [PATCH 3/5] #224 Added metric of active queue count --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 9e8913ee..ae5cbec7 100644 --- a/README.md +++ b/README.md @@ -1049,6 +1049,7 @@ The collected metrics include: | redisques_enqueue_success_total | Overall count of queue items to be enqueued | | redisques_enqueue_fail_total | Overall count of queue items to be enqueued | | redisques_dequeue_total | Overall count of queue items to be dequeued from the queues | +| redisques_active_queues | Overall count of active queues | ### Testing locally When you include redisques in you project, you probably already have the configuration for publishing the metrics. From 0942eac4395ca94b619bc7ed3795c2a6c9fe6340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Wed, 27 Nov 2024 16:30:38 +0100 Subject: [PATCH 4/5] #224 Added metric of max queue size --- README.md | 1 + .../redisques/QueueStatsService.java | 27 +++++++++++++++--- .../org/swisspush/redisques/RedisQues.java | 28 +++++++++---------- .../handler/RedisquesHttpRequestHandler.java | 10 ++++--- .../swisspush/redisques/util/MetricMeter.java | 3 +- 5 files changed, 46 insertions(+), 23 deletions(-) diff --git a/README.md b/README.md index ae5cbec7..f37a4808 100644 --- a/README.md +++ b/README.md @@ -1050,6 +1050,7 @@ The collected metrics include: | redisques_enqueue_fail_total | Overall count of queue items to be enqueued | | redisques_dequeue_total | Overall count of queue items to be dequeued from the queues | | redisques_active_queues | Overall count of active queues | +| redisques_max_queue_size | Amount of queue items of the biggest queue | ### Testing locally When you include redisques in you project, you probably already have the configuration for publishing the metrics. diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index f16cc811..f3c66a22 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -1,5 +1,7 @@ package org.swisspush.redisques; +import io.micrometer.core.instrument.Gauge; +import io.micrometer.core.instrument.MeterRegistry; import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.core.json.JsonArray; @@ -8,15 +10,14 @@ import org.swisspush.redisques.exception.RedisQuesExceptionFactory; import org.swisspush.redisques.util.DequeueStatistic; import org.swisspush.redisques.util.DequeueStatisticCollector; +import org.swisspush.redisques.util.MetricMeter; import org.swisspush.redisques.util.QueueStatisticsCollector; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiConsumer; import static java.lang.Long.compare; @@ -51,6 +52,8 @@ public class QueueStatsService { private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore incomingRequestQuota; + private final AtomicLong maxQueueSize = new AtomicLong(0); + public QueueStatsService( Vertx vertx, EventBus eventBus, @@ -58,7 +61,8 @@ public QueueStatsService( QueueStatisticsCollector queueStatisticsCollector, DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, - Semaphore incomingRequestQuota + Semaphore incomingRequestQuota, + MeterRegistry meterRegistry ) { this.vertx = vertx; this.eventBus = eventBus; @@ -67,6 +71,12 @@ public QueueStatsService( this.dequeueStatisticCollector = dequeueStatisticCollector; this.exceptionFactory = exceptionFactory; this.incomingRequestQuota = incomingRequestQuota; + + if(meterRegistry != null) { + Gauge.builder(MetricMeter.MAX_QUEUE_SIZE.getId(), maxQueueSize, AtomicLong::get). + description(MetricMeter.MAX_QUEUE_SIZE.getDescription()). + register(meterRegistry); + } } public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { @@ -156,10 +166,19 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu int limit = req.mentor.limit(req.mCtx); if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit); req.queues = queues; + collectMaxQueueSize(queues); onDone.accept(null, req); }); } + private void collectMaxQueueSize(List queues) { + if(queues.isEmpty()) { + maxQueueSize.set(0); + } else { + maxQueueSize.set(queues.get(0).getSize()); + } + } + private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { long begGetQueueStatsMs = currentTimeMillis(); assert req.queueNames != null; diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index fc18d8f6..c8cd546e 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -228,7 +228,7 @@ private enum QueueState { private RedisquesConfigurationProvider configurationProvider; private RedisMonitor redisMonitor; - private MeterRegistry meterRegistry = null; + private MeterRegistry meterRegistry; private Counter dequeueCounter; private Map queueActions = new HashMap<>(); @@ -332,18 +332,6 @@ private void handleRegistrationRequest(Message msg) { }); } - private void initMicrometerMetrics(RedisquesConfiguration modConfig) { - if(meterRegistry == null) { - meterRegistry = BackendRegistries.getDefaultNow(); - } - dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) - .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); - - String address = modConfig.getAddress(); - int metricRefreshPeriod = modConfig.getMetricRefreshPeriod(); - new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod); - } - @Override public void start(Promise promise) { log.info("Started with UID {}", uid); @@ -396,6 +384,18 @@ public void start(Promise promise) { }); } + private void initMicrometerMetrics(RedisquesConfiguration modConfig) { + if(meterRegistry == null) { + meterRegistry = BackendRegistries.getDefaultNow(); + } + dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId()) + .description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry); + + String address = modConfig.getAddress(); + int metricRefreshPeriod = modConfig.getMetricRefreshPeriod(); + new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod); + } + private void initialize() { RedisquesConfiguration configuration = configurationProvider.configuration(); this.queueStatisticsCollector = new QueueStatisticsCollector( @@ -404,7 +404,7 @@ private void initialize() { RedisquesHttpRequestHandler.init( vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota); + exceptionFactory, queueStatsRequestQuota, meterRegistry); // only initialize memoryUsageProvider when not provided in the constructor if (memoryUsageProvider == null) { diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index a5af7a1f..6921b232 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -1,5 +1,6 @@ package org.swisspush.redisques.handler; +import io.micrometer.core.instrument.MeterRegistry; import io.netty.util.internal.StringUtil; import io.vertx.core.AsyncResult; import io.vertx.core.Handler; @@ -83,14 +84,14 @@ public class RedisquesHttpRequestHandler implements Handler { public static void init( Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector, DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, - Semaphore queueStatsRequestQuota + Semaphore queueStatsRequestQuota, MeterRegistry meterRegistry ) { log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { var handler = new RedisquesHttpRequestHandler( vertx, modConfig, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota); + exceptionFactory, queueStatsRequestQuota, meterRegistry); // in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default. HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true); vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> { @@ -125,7 +126,8 @@ private RedisquesHttpRequestHandler( QueueStatisticsCollector queueStatisticsCollector, DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, - Semaphore queueStatsRequestQuota + Semaphore queueStatsRequestQuota, + MeterRegistry meterRegistry ) { this.vertx = vertx; this.router = Router.router(vertx); @@ -138,7 +140,7 @@ private RedisquesHttpRequestHandler( this.exceptionFactory = exceptionFactory; this.queueStatsService = new QueueStatsService( vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota); + exceptionFactory, queueStatsRequestQuota, meterRegistry); final String prefix = modConfig.getHttpRequestHandlerPrefix(); diff --git a/src/main/java/org/swisspush/redisques/util/MetricMeter.java b/src/main/java/org/swisspush/redisques/util/MetricMeter.java index 66f0b205..43227838 100644 --- a/src/main/java/org/swisspush/redisques/util/MetricMeter.java +++ b/src/main/java/org/swisspush/redisques/util/MetricMeter.java @@ -5,7 +5,8 @@ public enum MetricMeter { ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued successfully"), ENQUEUE_FAIL("redisques.enqueue.fail", "Overall count of queue items which could not be enqueued"), DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues"), - ACTIVE_QUEUES("redisques.active.queues", "Count of active queues"); + ACTIVE_QUEUES("redisques.active.queues", "Count of active queues"), + MAX_QUEUE_SIZE("redisques.max.queue.size", "Amount of queue items of the biggest queue"); private final String id; private final String description; From bc2700e56032f7cfb339e0f4070c9f2ee6fd0f1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Thu, 28 Nov 2024 16:07:37 +0100 Subject: [PATCH 5/5] #224 Adopted feedback from code review --- README.md | 4 ++-- src/main/java/org/swisspush/redisques/RedisQues.java | 2 +- .../redisques/metrics/PeriodicMetricsCollector.java | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index f37a4808..3689c042 100644 --- a/README.md +++ b/README.md @@ -1020,7 +1020,7 @@ Available url parameters are: * _filter=_: Filter the queues for which the cumulated speed is evaluated The result will be a json object with the speed of the last measurement period calculated -over all queues matching the given filter regex. Additionally the used measurement time in seconds +over all queues matching the given filter regex. Additionally, the used measurement time in seconds is returned (eg. 60 seconds by default) ```json @@ -1047,7 +1047,7 @@ The collected metrics include: | Metric name | Description | |:--------------------------------|:------------------------------------------------------------| | redisques_enqueue_success_total | Overall count of queue items to be enqueued | -| redisques_enqueue_fail_total | Overall count of queue items to be enqueued | +| redisques_enqueue_fail_total | Overall count of failing enqueues | | redisques_dequeue_total | Overall count of queue items to be dequeued from the queues | | redisques_active_queues | Overall count of active queues | | redisques_max_queue_size | Amount of queue items of the biggest queue | diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index c8cd546e..2667e0a0 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -393,7 +393,7 @@ private void initMicrometerMetrics(RedisquesConfiguration modConfig) { String address = modConfig.getAddress(); int metricRefreshPeriod = modConfig.getMetricRefreshPeriod(); - new PeriodicMetricsCollector(vertx, address, meterRegistry, metricRefreshPeriod); + new PeriodicMetricsCollector(vertx, periodicSkipScheduler, address, meterRegistry, metricRefreshPeriod); } private void initialize() { diff --git a/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java index 1a8555e7..224a1000 100644 --- a/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java +++ b/src/main/java/org/swisspush/redisques/metrics/PeriodicMetricsCollector.java @@ -9,6 +9,7 @@ import io.vertx.core.json.JsonObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.redisques.scheduling.PeriodicSkipScheduler; import org.swisspush.redisques.util.MetricMeter; import java.util.concurrent.atomic.AtomicLong; @@ -28,7 +29,7 @@ public class PeriodicMetricsCollector { private final AtomicLong activeQueuesCount = new AtomicLong(0); - public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) { + public PeriodicMetricsCollector(Vertx vertx, PeriodicSkipScheduler periodicSkipScheduler, String redisquesAddress, MeterRegistry meterRegistry, long metricCollectIntervalSec) { this.vertx = vertx; this.redisquesAddress = redisquesAddress; @@ -36,10 +37,11 @@ public PeriodicMetricsCollector(Vertx vertx, String redisquesAddress, MeterRegis description(MetricMeter.ACTIVE_QUEUES.getDescription()). register(meterRegistry); - vertx.setPeriodic(metricCollectIntervalSec * 1000, event -> updateActiveQueuesCount()); + periodicSkipScheduler.setPeriodic(metricCollectIntervalSec * 1000, "metricCollectRefresh", + this::updateActiveQueuesCount); } - private void updateActiveQueuesCount() { + private void updateActiveQueuesCount(Runnable onPeriodicDone) { vertx.eventBus().request(redisquesAddress, buildGetQueuesCountOperation(), (Handler>>) reply -> { if(reply.failed()) { log.warn("TODO error handling", reply.cause()); @@ -48,6 +50,7 @@ private void updateActiveQueuesCount() { } else { log.warn("Error gathering count of active queues. Cause: {}", reply.result().body().getString(MESSAGE)); } + onPeriodicDone.run(); }); }