diff --git a/pom.xml b/pom.xml index 237ad106..96eb3b42 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.2-SNAPSHOT + 4.1.3-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 71e1534e..f16cc811 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -187,14 +187,6 @@ private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { - // Setup a lookup table as we need to find by name further below. - Map detailsByName = new HashMap<>(req.queuesJsonArr.size()); - for (var it = (Iterator) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) { - JsonObject detailJson = it.next(); - String name = detailJson.getString(MONITOR_QUEUE_NAME); - detailsByName.put(name, detailJson); - } - dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> { for (Queue queue : req.queues) { if (event.containsKey(queue.name)) { @@ -221,6 +213,7 @@ private static class GetQueueStatsRequest { private CTX mCtx; private GetQueueStatsMentor mentor; private List queueNames; + /* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */ private JsonArray queuesJsonArr; private List queues; } diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index afb61e95..8fc491b5 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -230,12 +230,9 @@ private enum QueueState { private final Semaphore getQueuesItemsCountRedisRequestQuota; public RedisQues() { - this.exceptionFactory = newThriftyExceptionFactory(); + this(null, null, null, newThriftyExceptionFactory(), new Semaphore(Integer.MAX_VALUE), + new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE)); log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); - this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE); - this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE); - this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE); - this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE); } public RedisQues( @@ -314,10 +311,6 @@ public void start(Promise promise) { this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config()); } - if (this.dequeueStatisticCollector == null) { - this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx); - } - if (this.periodicSkipScheduler == null) { this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx); } @@ -326,11 +319,14 @@ public void start(Promise promise) { log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec(); - if (dequeueStatisticReportIntervalSec > 0) { + if (modConfig.isDequeueStatsEnabled()) { dequeueStatisticEnabled = true; Runnable publisher = newDequeueStatisticPublisher(); vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run()); } + if (this.dequeueStatisticCollector == null) { + this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled); + } queuesKey = modConfig.getRedisPrefix() + "queues"; queuesPrefix = modConfig.getRedisPrefix() + "queues:"; consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java index 589b35aa..35c83411 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java @@ -134,10 +134,10 @@ public void handle(AsyncResult handleQueues) { var obj = new JsonObject().put(STATUS, OK).put(QUEUES, result); long jsonCreateDurationMs = currentTimeMillis() - beginEpchMs; if (jsonCreateDurationMs > 10) { - log.info("Creating JSON with {} entries did block this tread for {}ms", + log.info("Creating JSON with {} entries did block this thread for {}ms", ctx.queueLengths.length, jsonCreateDurationMs); }else{ - log.debug("Creating JSON with {} entries did block this tread for {}ms", + log.debug("Creating JSON with {} entries did block this thread for {}ms", ctx.queueLengths.length, jsonCreateDurationMs); } workerPromise.complete(obj); diff --git a/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java b/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java index 5f180186..bc5e1470 100644 --- a/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java +++ b/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Map; public class DequeueStatisticCollector { @@ -18,9 +19,11 @@ public class DequeueStatisticCollector { private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData"; private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock."; private final SharedData sharedData; + private final boolean dequeueStatisticEnabled; - public DequeueStatisticCollector(Vertx vertx) { + public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) { this.sharedData = vertx.sharedData(); + this.dequeueStatisticEnabled = dequeueStatisticEnabled; } /** @@ -107,6 +110,10 @@ public Future setDequeueStatistic(final String queueName, final DequeueSta } public Future> getAllDequeueStatistics() { + // Check if dequeue statistics are enabled + if (!dequeueStatisticEnabled) { + return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions + } Promise> promise = Promise.promise(); sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler>>) asyncResult -> { if (asyncResult.failed()) { diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index e44cfc1a..20a11b88 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -704,6 +704,10 @@ public String toString() { return asJsonObject().toString(); } + public boolean isDequeueStatsEnabled() { + return getDequeueStatisticReportIntervalSec() > 0; + } + /** * RedisquesConfigurationBuilder class for simplified configuration. * diff --git a/src/test/java/org/swisspush/redisques/util/DequeueStatisticCollectorTest.java b/src/test/java/org/swisspush/redisques/util/DequeueStatisticCollectorTest.java new file mode 100644 index 00000000..a9ea3332 --- /dev/null +++ b/src/test/java/org/swisspush/redisques/util/DequeueStatisticCollectorTest.java @@ -0,0 +1,128 @@ +package org.swisspush.redisques.util; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.*; + +@RunWith(VertxUnitRunner.class) +public class DequeueStatisticCollectorTest { + + private Vertx vertx; + private SharedData sharedData; + private AsyncMap asyncMap; + private DequeueStatisticCollector dequeueStatisticCollectorEnabled; + private DequeueStatisticCollector dequeueStatisticCollectorDisabled; + + @Before + public void setUp() { + vertx = mock(Vertx.class); + sharedData = mock(SharedData.class); + asyncMap = mock(AsyncMap.class); + + // Mock sharedData.getAsyncMap to return asyncMap + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.succeededFuture(asyncMap)); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + when(vertx.sharedData()).thenReturn(sharedData); + + // Initialize DequeueStatisticCollector with enabled/disabled stats collection + dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true); + dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false); + } + + @Test + public void testGetAllDequeueStatisticsEnabled(TestContext context) { + // Mock asyncMap.entries() to return a non-empty map + Map dequeueStats = new HashMap<>(); + dequeueStats.put("queue1", new DequeueStatistic()); + when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats)); + + // Test when dequeue statistics are enabled + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.succeeded()); + context.assertEquals(1, result.result().size()); + async.complete(); + }); + + // Verify that sharedData and asyncMap were used correctly + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verify(asyncMap, times(1)).entries(); + } + + @Test + public void testGetAllDequeueStatisticsDisabled(TestContext context) { + // Test when dequeue statistics are disabled + Async async = context.async(); + dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.succeeded()); + context.assertTrue(result.result().isEmpty()); + async.complete(); + }); + + // Verify that sharedData and asyncMap were NOT used + verifyNoInteractions(sharedData); + verifyNoInteractions(asyncMap); + } + + @Test + public void testGetAllDequeueStatisticsAsyncMapFailure(TestContext context) { + // Simulate failure in sharedData.getAsyncMap + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.failedFuture(new RuntimeException("Failed to retrieve async map"))); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + // Test when asyncMap retrieval fails + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.failed()); + context.assertEquals("Failed to retrieve async map", result.cause().getMessage()); + async.complete(); + }); + + // Verify that sharedData.getAsyncMap was used, but asyncMap.entries() was not + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verifyNoInteractions(asyncMap); + } + + @Test + public void testGetAllDequeueStatisticsEntriesFailure(TestContext context) { + // Simulate success in sharedData.getAsyncMap, but failure in asyncMap.entries + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.succeededFuture(asyncMap)); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + when(asyncMap.entries()).thenReturn(Future.failedFuture(new RuntimeException("Failed to retrieve entries"))); + + // Test when asyncMap.entries fails + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.failed()); + context.assertEquals("Failed to retrieve entries", result.cause().getMessage()); + async.complete(); + }); + + // Verify that sharedData.getAsyncMap and asyncMap.entries were used correctly + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verify(asyncMap, times(1)).entries(); + } +}