Skip to content

Commit

Permalink
#224 Added metrics for enqueue and dequeue
Browse files Browse the repository at this point in the history
  • Loading branch information
mcweba committed Nov 27, 2024
1 parent 0b84b9d commit 0f805be
Show file tree
Hide file tree
Showing 12 changed files with 263 additions and 33 deletions.
40 changes: 38 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following configuration values are available:
| redisReconnectAttempts | 0 | The amount of attempts to reconnect when redis connection is lost. Use **0** to not reconnect at all or **-1** to reconnect indefinitely. |
| redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. |
| redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. |
| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer |
| httpRequestHandlerEnabled | false | Enable / disable the HTTP API |
| httpRequestHandlerAuthenticationEnabled | false | Enable / disable authentication for the HTTP API |
| httpRequestHandlerUsername | | The username for the HTTP API authentication |
Expand Down Expand Up @@ -630,7 +631,6 @@ Response Data
}
```


#### getQueuesSpeed

Request Data
Expand All @@ -651,7 +651,6 @@ Response Data
}
```


## RedisQues HTTP API
RedisQues provides a HTTP API to modify queues, queue items and get information about queue counts and queue item counts.

Expand Down Expand Up @@ -1040,6 +1039,43 @@ SemaphoreConfig semaphoreConfig = new SemaphoreConfig().setInitialPermits(1).set
hazelcastConfig.getCPSubsystemConfig().addSemaphoreConfig(semaphoreConfig);
```

## Metric collection
Besides the API, redisques provides some key metrics collected by [micrometer.io](https://micrometer.io/).

The collected metrics include:

| Metric name | Description |
|:--------------------------------|:------------------------------------------------------------|
| redisques_enqueue_success_total | Overall count of queue items to be enqueued |
| redisques_enqueue_fail_total | Overall count of queue items to be enqueued |
| redisques_dequeue_total | Overall count of queue items to be dequeued from the queues |

### Testing locally
When you include redisques in you project, you probably already have the configuration for publishing the metrics.

To export the metrics locally you have to add this dependency to the `pom.xml`

```
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer.version}</version>
</dependency>
```
Also add the micrometer configuration to `RedisQuesRunner` class like this:

```java
MicrometerMetricsOptions options = new MicrometerMetricsOptions()
.setPrometheusOptions(new VertxPrometheusOptions()
.setStartEmbeddedServer(true)
.setEmbeddedServerOptions(new HttpServerOptions().setPort(9101))
.setEnabled(true))
.setEnabled(true);
Vertx vertx = Vertx.vertx(new VertxOptions().setMetricsOptions(options));
```
Using the configuration above, the metrics can be accessed with

> GET http://localhost:9101/metrics

## Dependencies
Expand Down
11 changes: 11 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@
<artifactId>vertx-web</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-micrometer-metrics</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -390,6 +400,7 @@
</profiles>
<properties>
<vertx.version>4.5.2</vertx.version>
<micrometer.version>1.12.13</micrometer.version>
<slf4j.version>2.0.10</slf4j.version>
<mockito.version>5.8.0</mockito.version>
<junit.version>4.13.2</junit.version>
Expand Down
45 changes: 42 additions & 3 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.swisspush.redisques;

import com.google.common.base.Strings;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
Expand All @@ -13,6 +15,7 @@
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.backends.BackendRegistries;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Response;
Expand Down Expand Up @@ -84,6 +87,7 @@ public static class RedisQuesBuilder {
private RedisquesConfigurationProvider configurationProvider;
private RedisProvider redisProvider;
private RedisQuesExceptionFactory exceptionFactory;
private MeterRegistry meterRegistry;
private Semaphore redisMonitoringReqQuota;
private Semaphore checkQueueRequestsQuota;
private Semaphore queueStatsRequestQuota;
Expand Down Expand Up @@ -113,6 +117,11 @@ public RedisQuesBuilder withExceptionFactory(RedisQuesExceptionFactory exception
return this;
}

public RedisQuesBuilder withMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
return this;
}

/**
* How many redis requests monitoring related component will trigger
* simultaneously. One of those components for example is
Expand Down Expand Up @@ -172,7 +181,7 @@ public RedisQues build() {
}
return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, exceptionFactory,
redisMonitoringReqQuota, checkQueueRequestsQuota, queueStatsRequestQuota,
getQueuesItemsCountRedisRequestQuota);
getQueuesItemsCountRedisRequestQuota, meterRegistry);
}
}

Expand Down Expand Up @@ -218,6 +227,9 @@ private enum QueueState {
private RedisquesConfigurationProvider configurationProvider;
private RedisMonitor redisMonitor;

private MeterRegistry meterRegistry = null;
private Counter dequeueCounter;

private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();
Expand All @@ -235,6 +247,20 @@ public RedisQues() {
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
}

public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
RedisProvider redisProvider,
RedisQuesExceptionFactory exceptionFactory,
Semaphore redisMonitoringReqQuota,
Semaphore checkQueueRequestsQuota,
Semaphore queueStatsRequestQuota,
Semaphore getQueuesItemsCountRedisRequestQuota
) {
this(memoryUsageProvider, configurationProvider, redisProvider, exceptionFactory, redisMonitoringReqQuota,
checkQueueRequestsQuota, queueStatsRequestQuota, getQueuesItemsCountRedisRequestQuota, null);
}

public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
Expand All @@ -243,7 +269,8 @@ public RedisQues(
Semaphore redisMonitoringReqQuota,
Semaphore checkQueueRequestsQuota,
Semaphore queueStatsRequestQuota,
Semaphore getQueuesItemsCountRedisRequestQuota
Semaphore getQueuesItemsCountRedisRequestQuota,
MeterRegistry meterRegistry
) {
this.memoryUsageProvider = memoryUsageProvider;
this.configurationProvider = configurationProvider;
Expand All @@ -253,6 +280,7 @@ public RedisQues(
this.checkQueueRequestsQuota = checkQueueRequestsQuota;
this.queueStatsRequestQuota = queueStatsRequestQuota;
this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota;
this.meterRegistry = meterRegistry;
}

public static RedisQuesBuilder builder() {
Expand Down Expand Up @@ -318,6 +346,14 @@ public void start(Promise<Void> promise) {
RedisquesConfiguration modConfig = configurationProvider.configuration();
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

if(configurationProvider.configuration().getMicrometerMetricsEnabled()) {
if(meterRegistry == null) {
meterRegistry = BackendRegistries.getDefaultNow();
}
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry);
}

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
if (modConfig.isDequeueStatsEnabled()) {
dequeueStatisticEnabled = true;
Expand Down Expand Up @@ -371,7 +407,7 @@ private void initialize() {
queueActionFactory = new QueueActionFactory(
redisProvider, vertx, log, queuesKey, queuesPrefix, consumersPrefix, locksKey,
memoryUsageProvider, queueStatisticsCollector, exceptionFactory,
configurationProvider, getQueuesItemsCountRedisRequestQuota);
configurationProvider, getQueuesItemsCountRedisRequestQuota, meterRegistry);

queueActions.put(addQueueItem, queueActionFactory.buildQueueAction(addQueueItem));
queueActions.put(deleteQueueItem, queueActionFactory.buildQueueAction(deleteQueueItem));
Expand Down Expand Up @@ -950,6 +986,9 @@ private Future<Void> readQueue(final String queueName) {
log.error("Failed to pop from queue '{}'", queueName, jsonAnswer.cause());
// We should return here. See: "https://softwareengineering.stackexchange.com/a/190535"
}
if(dequeueCounter != null) {
dequeueCounter.increment();
}
log.debug("RedisQues Message removed, queue {} is ready again", queueName);
myQueues.put(queueName, QueueState.READY);

Expand Down
32 changes: 27 additions & 5 deletions src/main/java/org/swisspush/redisques/action/EnqueueAction.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package org.swisspush.redisques.action;

import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.MemoryUsageProvider;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
import org.swisspush.redisques.util.*;

import java.util.Arrays;
import java.util.List;
Expand All @@ -19,17 +18,24 @@ public class EnqueueAction extends AbstractQueueAction {

private final MemoryUsageProvider memoryUsageProvider;
private final int memoryUsageLimitPercent;
private Counter enqueueCounterSuccess;
private Counter enqueueCounterFail;

public EnqueueAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log,
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
this.memoryUsageProvider = memoryUsageProvider;
this.memoryUsageLimitPercent = memoryUsageLimitPercent;

if(meterRegistry != null) {
enqueueCounterSuccess = Counter.builder(MetricMeter.ENQUEUE_SUCCESS.getId()).description(MetricMeter.ENQUEUE_SUCCESS.getDescription()).register(meterRegistry);
enqueueCounterFail = Counter.builder(MetricMeter.ENQUEUE_FAIL.getId()).description(MetricMeter.ENQUEUE_FAIL.getDescription()).register(meterRegistry);
}
}

@Override
Expand All @@ -38,6 +44,7 @@ public void execute(Message<JsonObject> event) {

if (isMemoryUsageLimitReached()) {
log.warn("Failed to enqueue into queue {} because the memory usage limit is reached", queueName);
incrEnqueueFailCount();
event.reply(createErrorReply().put(MESSAGE, MEMORY_FULL));
return;
}
Expand All @@ -61,6 +68,8 @@ public void execute(Message<JsonObject> event) {
reply.put(STATUS, OK);
reply.put(MESSAGE, "enqueued");

incrEnqueueSuccessCount();

// feature EN-queue slow-down (the larger the queue the longer we delay "OK" response)
long delayReplyMillis = 0;
QueueConfiguration queueConfiguration = findQueueConfiguration(queueName);
Expand Down Expand Up @@ -88,7 +97,20 @@ public void execute(Message<JsonObject> event) {
});
}

private void incrEnqueueSuccessCount() {
if(enqueueCounterSuccess != null) {
enqueueCounterSuccess.increment();
}
}

protected void incrEnqueueFailCount() {
if(enqueueCounterFail != null) {
enqueueCounterFail.increment();
}
}

private void replyError(Message<JsonObject> event, String queueName, Throwable ex) {
incrEnqueueFailCount();
String message = "RedisQues QUEUE_ERROR: Error while enqueueing message into queue " + queueName;
log.error(message, new Exception(ex));
JsonObject reply = new JsonObject();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.swisspush.redisques.action;

import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
Expand All @@ -22,10 +23,10 @@ public LockedEnqueueAction(Vertx vertx, RedisProvider redisProvider,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
RedisQuesExceptionFactory exceptionFactory,
QueueStatisticsCollector queueStatisticsCollector, Logger log,
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent) {
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix,
locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log, memoryUsageProvider,
memoryUsageLimitPercent);
memoryUsageLimitPercent, meterRegistry);
}

@Override
Expand All @@ -34,6 +35,7 @@ public void execute(Message<JsonObject> event) {
String queueName = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME);
if (isMemoryUsageLimitReached()) {
log.warn("Failed to lockedEnqueue into queue {} because the memory usage limit is reached", queueName);
incrEnqueueFailCount();
event.reply(createErrorReply().put(MESSAGE, MEMORY_FULL));
return;
}
Expand All @@ -47,16 +49,18 @@ public void execute(Message<JsonObject> event) {
} else {
log.warn("RedisQues lockedEnqueue locking failed. Skip enqueue",
new Exception(putLockResult.cause()));
incrEnqueueFailCount();
event.reply(createErrorReply());
}
}));
p.onFailure(ex -> {
log.warn("Redis: RedisQues lockedEnqueue locking failed. Skip enqueue",
new Exception(ex));
log.warn("Redis: RedisQues lockedEnqueue locking failed. Skip enqueue", new Exception(ex));
incrEnqueueFailCount();
event.reply(createErrorReply());
});
} else {
log.warn("RedisQues lockedEnqueue failed because property '{}' was missing", REQUESTED_BY);
incrEnqueueFailCount();
event.reply(createErrorReply().put(MESSAGE, "Property '" + REQUESTED_BY + "' missing"));
}
}
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/org/swisspush/redisques/util/MetricMeter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.swisspush.redisques.util;

public enum MetricMeter {

ENQUEUE_SUCCESS("redisques.enqueue.success", "Overall count of queue items to be enqueued sucessfully"),
ENQUEUE_FAIL("redisques.enqueue.fail", "Overall count of queue items which could not be enqueued"),
DEQUEUE("redisques.dequeue", "Overall count of queue items to be dequeued from the queues");

private final String id;
private final String description;

MetricMeter(String id, String description) {
this.id = id;
this.description = description;
}

public String getId() {
return id;
}

public String getDescription() {
return description;
}
}
Loading

0 comments on commit 0f805be

Please sign in to comment.