Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR for release #229

Merged
merged 8 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ The following configuration values are available:
| redisReconnectDelaySec | 30 | The interval [s] to attempt to reconnect when redis connection is lost. |
| redisPoolRecycleTimeoutMs | 180000 | The timeout [ms] when the connection pool is recycled. Use **-1** when having reconnect feature enabled. |
| micrometerMetricsEnabled | false | Enable / disable collection of metrics using micrometer |
| micrometerMetricsIdentifier | default | Identifier to track values from multiple redisques instances |
| httpRequestHandlerEnabled | false | Enable / disable the HTTP API |
| httpRequestHandlerAuthenticationEnabled | false | Enable / disable authentication for the HTTP API |
| httpRequestHandlerUsername | | The username for the HTTP API authentication |
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>4.1.8-SNAPSHOT</version>
<version>4.1.9-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
94 changes: 60 additions & 34 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.MetricMeter;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.*;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -55,14 +52,15 @@ public class QueueStatsService {
private final AtomicLong maxQueueSize = new AtomicLong(0);

public QueueStatsService(
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
MeterRegistry meterRegistry
Vertx vertx,
EventBus eventBus,
String redisquesAddress,
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
MeterRegistry meterRegistry,
String metricsIdentifier
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -72,9 +70,10 @@ public QueueStatsService(
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;

if(meterRegistry != null) {
if (meterRegistry != null) {
Gauge.builder(MetricMeter.MAX_QUEUE_SIZE.getId(), maxQueueSize, AtomicLong::get).
description(MetricMeter.MAX_QUEUE_SIZE.getDescription()).
description(MetricMeter.MAX_QUEUE_SIZE.getDescription())
.tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).
register(meterRegistry);
}
}
Expand All @@ -101,14 +100,23 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
req0.mCtx = mCtx;
req0.mentor = mentor;
fetchQueueNamesAndSize(req0, (ex1, req1) -> {
if (ex1 != null) { onDone.accept(ex1, null); return; }
if (ex1 != null) {
onDone.accept(ex1, null);
return;
}
// Prepare a list of queue names as it is needed to fetch retryDetails.
req1.queueNames = new ArrayList<>(req1.queues.size());
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { onDone.accept(ex2, null); return; }
if (ex2 != null) {
onDone.accept(ex2, null);
return;
}
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
if (ex3 != null) {
onDone.accept(ex3, null);
return;
}
onDone.accept(null, req3.queues);
});
});
Expand Down Expand Up @@ -172,7 +180,7 @@ private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsu
}

private void collectMaxQueueSize(List<Queue> queues) {
if(queues.isEmpty()) {
if (queues.isEmpty()) {
maxQueueSize.set(0);
} else {
maxQueueSize.set(queues.get(0).getSize());
Expand All @@ -182,7 +190,7 @@ private void collectMaxQueueSize(List<Queue> queues) {
private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> {
queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete(ev -> {
req.queueNames = null; // <- no longer needed
long durGetQueueStatsMs = currentTimeMillis() - begGetQueueStatsMs;
log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", durGetQueueStatsMs);
Expand Down Expand Up @@ -244,44 +252,62 @@ public static class Queue {
private Long lastDequeueAttemptEpochMs;
private Long lastDequeueSuccessEpochMs;
private Long nextDequeueDueTimestampEpochMs;
private Queue(String name, long size){

private Queue(String name, long size) {
assert name != null;
this.name = name;
this.size = size;
}

public String getName() { return name; }
public long getSize() { return size; }
public Long getLastDequeueAttemptEpochMs() { return lastDequeueAttemptEpochMs; }
public Long getLastDequeueSuccessEpochMs() { return lastDequeueSuccessEpochMs; }
public Long getNextDequeueDueTimestampEpochMs() { return nextDequeueDueTimestampEpochMs; }
public String getName() {
return name;
}

public long getSize() {
return size;
}

public Long getLastDequeueAttemptEpochMs() {
return lastDequeueAttemptEpochMs;
}

public Long getLastDequeueSuccessEpochMs() {
return lastDequeueSuccessEpochMs;
}

public Long getNextDequeueDueTimestampEpochMs() {
return nextDequeueDueTimestampEpochMs;
}
}


/**
* <p>Mentors fetching operations and so provides the fetcher the required
* information. Finally it also receives the operations result.</p>
*
* @param <CTX>
* The context object of choice handled back to each callback so the mentor
* knows about what request the fetcher is talking.
* @param <CTX> The context object of choice handled back to each callback so the mentor
* knows about what request the fetcher is talking.
*/
public static interface GetQueueStatsMentor<CTX> {

/**
* <p>Returning true means that all queues will be present in the result. If
* false, empty queues won't show up the result.</p>
*
* @param ctx See {@link GetQueueStatsMentor}.
* @param ctx See {@link GetQueueStatsMentor}.
*/
public boolean includeEmptyQueues( CTX ctx );
public boolean includeEmptyQueues(CTX ctx);

/** <p>Limits the result to the largest N queues.</p> */
public int limit( CTX ctx );
/**
* <p>Limits the result to the largest N queues.</p>
*/
public int limit(CTX ctx);

public String filter( CTX ctx);
public String filter(CTX ctx);

/** <p>Called ONCE with the final result.</p> */
/**
* <p>Called ONCE with the final result.</p>
*/
public void onQueueStatistics(List<Queue> queues, CTX ctx);

/**
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,8 +388,9 @@ private void initMicrometerMetrics(RedisquesConfiguration modConfig) {
if(meterRegistry == null) {
meterRegistry = BackendRegistries.getDefaultNow();
}
String metricsIdentifier = modConfig.getMicrometerMetricsIdentifier();
dequeueCounter = Counter.builder(MetricMeter.DEQUEUE.getId())
.description(MetricMeter.DEQUEUE.getDescription()).register(meterRegistry);
.description(MetricMeter.DEQUEUE.getDescription()).tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).register(meterRegistry);

String address = modConfig.getAddress();
int metricRefreshPeriod = modConfig.getMetricRefreshPeriod();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,18 @@ public EnqueueAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log,
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry, String metricsIdentifier
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
this.memoryUsageProvider = memoryUsageProvider;
this.memoryUsageLimitPercent = memoryUsageLimitPercent;

if(meterRegistry != null) {
enqueueCounterSuccess = Counter.builder(MetricMeter.ENQUEUE_SUCCESS.getId()).description(MetricMeter.ENQUEUE_SUCCESS.getDescription()).register(meterRegistry);
enqueueCounterFail = Counter.builder(MetricMeter.ENQUEUE_FAIL.getId()).description(MetricMeter.ENQUEUE_FAIL.getDescription()).register(meterRegistry);
enqueueCounterSuccess = Counter.builder(MetricMeter.ENQUEUE_SUCCESS.getId()).description(MetricMeter.ENQUEUE_SUCCESS.getDescription())
.tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).register(meterRegistry);
enqueueCounterFail = Counter.builder(MetricMeter.ENQUEUE_FAIL.getId()).description(MetricMeter.ENQUEUE_FAIL.getDescription())
.tag(MetricTags.IDENTIFIER.getId(), metricsIdentifier).register(meterRegistry);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ public LockedEnqueueAction(Vertx vertx, RedisProvider redisProvider,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
RedisQuesExceptionFactory exceptionFactory,
QueueStatisticsCollector queueStatisticsCollector, Logger log,
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry) {
MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent, MeterRegistry meterRegistry,
String metricsIdentifier) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix,
locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log, memoryUsageProvider,
memoryUsageLimitPercent, meterRegistry);
memoryUsageLimitPercent, meterRegistry, metricsIdentifier);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ private RedisquesHttpRequestHandler(
this.exceptionFactory = exceptionFactory;
this.queueStatsService = new QueueStatsService(
vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota, meterRegistry);
exceptionFactory, queueStatsRequestQuota, meterRegistry, modConfig.getMicrometerMetricsIdentifier());

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/swisspush/redisques/util/MetricTags.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.swisspush.redisques.util;

public enum MetricTags {

IDENTIFIER("identifier");

private final String id;

MetricTags(String id) {
this.id = id;
}

public String getId() {
return id;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class QueueActionFactory {
private final QueueStatisticsCollector queueStatisticsCollector;
private final int memoryUsageLimitPercent;
private final MeterRegistry meterRegistry;
private final String metricsIdentifier;
private final MemoryUsageProvider memoryUsageProvider;
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore getQueuesItemsCountRedisRequestQuota;
Expand Down Expand Up @@ -60,6 +61,8 @@ public QueueActionFactory(
this.memoryUsageLimitPercent = configurationProvider.configuration().getMemoryUsageLimitPercent();
this.getQueuesItemsCountRedisRequestQuota = getQueuesItemsCountRedisRequestQuota;
this.meterRegistry = meterRegistry;

metricsIdentifier = configurationProvider.configuration().getMicrometerMetricsIdentifier();
}

public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){
Expand Down Expand Up @@ -101,11 +104,11 @@ public QueueAction buildQueueAction(RedisquesAPI.QueueOperation queueOperation){
case enqueue:
return new EnqueueAction(vertx, redisProvider, address, queuesKey, queuesPrefix,
consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log,
memoryUsageProvider, memoryUsageLimitPercent, meterRegistry);
memoryUsageProvider, memoryUsageLimitPercent, meterRegistry, metricsIdentifier);
case lockedEnqueue:
return new LockedEnqueueAction(vertx, redisProvider, address, queuesKey, queuesPrefix,
consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log,
memoryUsageProvider, memoryUsageLimitPercent, meterRegistry);
memoryUsageProvider, memoryUsageLimitPercent, meterRegistry, metricsIdentifier);
case getLock:
return new GetLockAction(vertx, redisProvider, address, queuesKey, queuesPrefix,
consumersPrefix, locksKey, queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class RedisquesConfiguration {
private static final int DEFAULT_PROCESSOR_TIMEOUT_MS = 240000; // 240s
private static final int DEFAULT_METRIC_REFRESH_PERIOD_S = 10; // 10s
private static final String DEFAULT_METRIC_STORAGE_NAME = "queue";
private static final String DEFAULT_MICROMETER_METRICS_IDENTIFIER = "default";
private static final long DEFAULT_PROCESSOR_DELAY_MAX = 0;
private static final int DEFAULT_REDIS_MAX_POOL_SIZE = 200;
private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0;
Expand Down Expand Up @@ -277,6 +278,13 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres
this.maxPipelineWaitSize = maxPipelineWaitSize;
Logger log = LoggerFactory.getLogger(RedisquesConfiguration.class);

String maybeEmptyMicrometerMetricsIdentifier = Strings.nullToEmpty(micrometerMetricsIdentifier).trim();
if(Strings.isNullOrEmpty(maybeEmptyMicrometerMetricsIdentifier)) {
this.micrometerMetricsIdentifier = DEFAULT_MICROMETER_METRICS_IDENTIFIER;
} else {
this.micrometerMetricsIdentifier = maybeEmptyMicrometerMetricsIdentifier;
}

String maybeEmptyMetricStorageName = Strings.nullToEmpty(metricStorageName).trim();
if(Strings.isNullOrEmpty(maybeEmptyMetricStorageName)) {
this.metricStorageName = DEFAULT_METRIC_STORAGE_NAME;
Expand Down Expand Up @@ -319,7 +327,6 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres
this.httpRequestHandlerEnabled = httpRequestHandlerEnabled;
this.httpRequestHandlerAuthenticationEnabled = httpRequestHandlerAuthenticationEnabled;
this.micrometerMetricsEnabled = micrometerMetricsEnabled;
this.micrometerMetricsIdentifier = micrometerMetricsIdentifier;
this.httpRequestHandlerPrefix = httpRequestHandlerPrefix;
this.httpRequestHandlerUsername = httpRequestHandlerUsername;
this.httpRequestHandlerPassword = httpRequestHandlerPassword;
Expand Down
9 changes: 6 additions & 3 deletions src/test/java/org/swisspush/redisques/RedisQuesTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class RedisQuesTest extends AbstractTestCase {
private Counter enqueueCounterFail;
private Counter dequeueCounter;

private final String metricsIdentifier = "foo";

@Rule
public Timeout rule = Timeout.seconds(50);

Expand All @@ -47,6 +49,7 @@ public void deployRedisques(TestContext context) {
JsonObject config = RedisquesConfiguration.with()
.processorAddress(PROCESSOR_ADDRESS)
.micrometerMetricsEnabled(true)
.micrometerMetricsIdentifier(metricsIdentifier)
.refreshPeriod(2)
.publishMetricsAddress("my-metrics-eb-address")
.metricStorageName("foobar")
Expand All @@ -61,9 +64,9 @@ public void deployRedisques(TestContext context) {
.asJsonObject();

MeterRegistry meterRegistry = new SimpleMeterRegistry();
enqueueCounterSuccess = meterRegistry.counter(MetricMeter.ENQUEUE_SUCCESS.getId());
enqueueCounterFail = meterRegistry.counter(MetricMeter.ENQUEUE_FAIL.getId());
dequeueCounter = meterRegistry.counter(MetricMeter.DEQUEUE.getId());
enqueueCounterSuccess = meterRegistry.counter(MetricMeter.ENQUEUE_SUCCESS.getId(), MetricTags.IDENTIFIER.getId(), metricsIdentifier);
enqueueCounterFail = meterRegistry.counter(MetricMeter.ENQUEUE_FAIL.getId(), MetricTags.IDENTIFIER.getId(), metricsIdentifier);
dequeueCounter = meterRegistry.counter(MetricMeter.DEQUEUE.getId(), MetricTags.IDENTIFIER.getId(), metricsIdentifier);

memoryUsageProvider = new TestMemoryUsageProvider(Optional.of(50));
redisQues = RedisQues.builder()
Expand Down
Loading