Skip to content

Commit

Permalink
Merge pull request #226 from swisspost/develop
Browse files Browse the repository at this point in the history
PR for release
  • Loading branch information
mcweba authored Nov 28, 2024
2 parents 9c282a1 + c682ac5 commit 04dee96
Show file tree
Hide file tree
Showing 15 changed files with 365 additions and 44 deletions.
44 changes: 41 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ The following configuration values are available:
| redisReconnectAttempts | 0 | The amount of attempts to reconnect when redis connection is lost. Use **0** to not reconnect at all or **-1** to reconnect indefinitely. |
| redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. |
| redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. |
| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer |
| httpRequestHandlerEnabled | false | Enable / disable the HTTP API |
| httpRequestHandlerAuthenticationEnabled | false | Enable / disable authentication for the HTTP API |
| httpRequestHandlerUsername | | The username for the HTTP API authentication |
Expand Down Expand Up @@ -630,7 +631,6 @@ Response Data
}
```


#### getQueuesSpeed

Request Data
Expand All @@ -651,7 +651,6 @@ Response Data
}
```


## RedisQues HTTP API
RedisQues provides a HTTP API to modify queues, queue items and get information about queue counts and queue item counts.

Expand Down Expand Up @@ -1021,7 +1020,7 @@ Available url parameters are:
* _filter=<regex pattern>_: Filter the queues for which the cumulated speed is evaluated

The result will be a json object with the speed of the last measurement period calculated
over all queues matching the given filter regex. Additionally the used measurement time in seconds
over all queues matching the given filter regex. Additionally, the used measurement time in seconds
is returned (eg. 60 seconds by default)

```json
Expand All @@ -1040,6 +1039,45 @@ SemaphoreConfig semaphoreConfig = new SemaphoreConfig().setInitialPermits(1).set
hazelcastConfig.getCPSubsystemConfig().addSemaphoreConfig(semaphoreConfig);
```

## Metric collection
Besides the API, redisques provides some key metrics collected by [micrometer.io](https://micrometer.io/).

The collected metrics include:

| Metric name | Description |
|:--------------------------------|:------------------------------------------------------------|
| redisques_enqueue_success_total | Overall count of queue items to be enqueued |
| redisques_enqueue_fail_total | Overall count of failing enqueues |
| redisques_dequeue_total | Overall count of queue items to be dequeued from the queues |
| redisques_active_queues | Overall count of active queues |
| redisques_max_queue_size | Amount of queue items of the biggest queue |

### Testing locally
When you include redisques in you project, you probably already have the configuration for publishing the metrics.

To export the metrics locally you have to add this dependency to the `pom.xml`

```
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>${micrometer.version}</version>
</dependency>
```
Also add the micrometer configuration to `RedisQuesRunner` class like this:

```java
MicrometerMetricsOptions options = new MicrometerMetricsOptions()
.setPrometheusOptions(new VertxPrometheusOptions()
.setStartEmbeddedServer(true)
.setEmbeddedServerOptions(new HttpServerOptions().setPort(9101))
.setEnabled(true))
.setEnabled(true);
Vertx vertx = Vertx.vertx(new VertxOptions().setMetricsOptions(options));
```
Using the configuration above, the metrics can be accessed with

> GET http://localhost:9101/metrics

## Dependencies
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>4.1.6-SNAPSHOT</version>
<version>4.1.7-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down Expand Up @@ -65,6 +65,16 @@
<artifactId>vertx-web</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-micrometer-metrics</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-core</artifactId>
<version>${micrometer.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down Expand Up @@ -390,6 +400,7 @@
</profiles>
<properties>
<vertx.version>4.5.2</vertx.version>
<micrometer.version>1.12.13</micrometer.version>
<slf4j.version>2.0.10</slf4j.version>
<mockito.version>5.8.0</mockito.version>
<junit.version>4.13.2</junit.version>
Expand Down
27 changes: 23 additions & 4 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.swisspush.redisques;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonArray;
Expand All @@ -8,15 +10,14 @@
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.MetricMeter;
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import static java.lang.Long.compare;
Expand Down Expand Up @@ -51,14 +52,17 @@ public class QueueStatsService {
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;

private final AtomicLong maxQueueSize = new AtomicLong(0);

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota
Semaphore incomingRequestQuota,
MeterRegistry meterRegistry
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -67,6 +71,12 @@ public QueueStatsService(
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;

if(meterRegistry != null) {
Gauge.builder(MetricMeter.MAX_QUEUE_SIZE.getId(), maxQueueSize, AtomicLong::get).
description(MetricMeter.MAX_QUEUE_SIZE.getDescription()).
register(meterRegistry);
}
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
Expand Down Expand Up @@ -156,10 +166,19 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
int limit = req.mentor.limit(req.mCtx);
if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit);
req.queues = queues;
collectMaxQueueSize(queues);
onDone.accept(null, req);
});
}

private void collectMaxQueueSize(List<Queue> queues) {
if(queues.isEmpty()) {
maxQueueSize.set(0);
} else {
maxQueueSize.set(queues.get(0).getSize());
}
}

private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
Expand Down
56 changes: 52 additions & 4 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.swisspush.redisques;

import com.google.common.base.Strings;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
Expand All @@ -13,6 +15,7 @@
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.backends.BackendRegistries;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Response;
Expand All @@ -21,6 +24,7 @@
import org.swisspush.redisques.action.QueueAction;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.RedisquesHttpRequestHandler;
import org.swisspush.redisques.metrics.PeriodicMetricsCollector;
import org.swisspush.redisques.performance.UpperBoundParallel;
import org.swisspush.redisques.scheduling.PeriodicSkipScheduler;
import org.swisspush.redisques.util.*;
Expand Down Expand Up @@ -84,6 +88,7 @@ public static class RedisQuesBuilder {
private RedisquesConfigurationProvider configurationProvider;
private RedisProvider redisProvider;
private RedisQuesExceptionFactory exceptionFactory;
private MeterRegistry meterRegistry;
private Semaphore redisMonitoringReqQuota;
private Semaphore checkQueueRequestsQuota;
private Semaphore queueStatsRequestQuota;
Expand Down Expand Up @@ -113,6 +118,11 @@ public RedisQuesBuilder withExceptionFactory(RedisQuesExceptionFactory exception
return this;
}

public RedisQuesBuilder withMeterRegistry(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
return this;
}

/**
* How many redis requests monitoring related component will trigger
* simultaneously. One of those components for example is
Expand Down Expand Up @@ -172,7 +182,7 @@ public RedisQues build() {
}
return new RedisQues(memoryUsageProvider, configurationProvider, redisProvider, exceptionFactory,
redisMonitoringReqQuota, checkQueueRequestsQuota, queueStatsRequestQuota,
getQueuesItemsCountRedisRequestQuota);
getQueuesItemsCountRedisRequestQuota, meterRegistry);
}
}

Expand Down Expand Up @@ -218,6 +228,9 @@ private enum QueueState {
private RedisquesConfigurationProvider configurationProvider;
private RedisMonitor redisMonitor;

private MeterRegistry meterRegistry;
private Counter dequeueCounter;

private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();
Expand All @@ -235,6 +248,20 @@ public RedisQues() {
log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE);
}

public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
RedisProvider redisProvider,
RedisQuesExceptionFactory exceptionFactory,
Semaphore redisMonitoringReqQuota,
Semaphore checkQueueRequestsQuota,
Semaphore queueStatsRequestQuota,
Semaphore getQueuesItemsCountRedisRequestQuota
) {
this(memoryUsageProvider, configurationProvider, redisProvider, exceptionFactory, redisMonitoringReqQuota,
checkQueueRequestsQuota, queueStatsRequestQuota, getQueuesItemsCountRedisRequestQuota, null);
}

public RedisQues(
MemoryUsageProvider memoryUsageProvider,
RedisquesConfigurationProvider configurationProvider,
Expand All @@ -243,7 +270,8 @@ public RedisQues(
Semaphore redisMonitoringReqQuota,
Semaphore checkQueueRequestsQuota,
Semaphore queueStatsRequestQuota,
Semaphore getQueuesItemsCountRedisRequestQuota
Semaphore getQueuesItemsCountRedisRequestQuota,
MeterRegistry meterRegistry
) {
this.memoryUsageProvider = memoryUsageProvider;
this.configurationProvider = configurationProvider;
Expand All @@ -253,6 +281,7 @@ public RedisQues(
this.checkQueueRequestsQuota = checkQueueRequestsQuota;
this.queueStatsRequestQuota = queueStatsRequestQuota;
this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota;
this.meterRegistry = meterRegistry;
}

public static RedisQuesBuilder builder() {
Expand Down Expand Up @@ -318,6 +347,10 @@ public void start(Promise<Void> promise) {
RedisquesConfiguration modConfig = configurationProvider.configuration();
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

if(configurationProvider.configuration().getMicrometerMetricsEnabled()) {
initMicrometerMetrics(modConfig);
}

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
if (modConfig.isDequeueStatsEnabled()) {
dequeueStatisticEnabled = true;
Expand Down Expand Up @@ -351,6 +384,18 @@ public void start(Promise<Void> promise) {
});
}

private void initMicrometerMetrics(RedisquesConfiguration modConfig) {
if(meterRegistry == null) {
meterRegistry = BackendRegistries.getDefaultNow();
}
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry);

String address = modConfig.getAddress();
int metricRefreshPeriod = modConfig.getMetricRefreshPeriod();
new PeriodicMetricsCollector(vertx, periodicSkipScheduler, address, meterRegistry, metricRefreshPeriod);
}

private void initialize() {
RedisquesConfiguration configuration = configurationProvider.configuration();
this.queueStatisticsCollector = new QueueStatisticsCollector(
Expand All @@ -359,7 +404,7 @@ private void initialize() {

RedisquesHttpRequestHandler.init(
vertx, configuration, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota);
exceptionFactory, queueStatsRequestQuota, meterRegistry);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
Expand All @@ -371,7 +416,7 @@ private void initialize() {
queueActionFactory = new QueueActionFactory(
redisProvider, vertx, log, queuesKey, queuesPrefix, consumersPrefix, locksKey,
memoryUsageProvider, queueStatisticsCollector, exceptionFactory,
configurationProvider, getQueuesItemsCountRedisRequestQuota);
configurationProvider, getQueuesItemsCountRedisRequestQuota, meterRegistry);

queueActions.put(addQueueItem, queueActionFactory.buildQueueAction(addQueueItem));
queueActions.put(deleteQueueItem, queueActionFactory.buildQueueAction(deleteQueueItem));
Expand Down Expand Up @@ -950,6 +995,9 @@ private Future<Void> readQueue(final String queueName) {
log.error("Failed to pop from queue '{}'", queueName, jsonAnswer.cause());
// We should return here. See: "https://softwareengineering.stackexchange.com/a/190535"
}
if(dequeueCounter != null) {
dequeueCounter.increment();
}
log.debug("RedisQues Message removed, queue {} is ready again", queueName);
myQueues.put(queueName, QueueState.READY);

Expand Down
Loading

0 comments on commit 04dee96

Please sign in to comment.