From 783539b72115c1395582a507e612945964b791b3 Mon Sep 17 00:00:00 2001 From: runner Date: Fri, 12 Jul 2024 09:37:00 +0000 Subject: [PATCH 01/14] updating poms for branch'release-4.1.2' with non-snapshot versions --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 237ad106..1132e0af 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.2-SNAPSHOT + 4.1.2 redisques A highly scalable redis-persistent queuing system for vertx From 3ad042f0d1ea00b6c7d1f2ce021c2c1b0a0390ed Mon Sep 17 00:00:00 2001 From: runner Date: Fri, 12 Jul 2024 09:37:00 +0000 Subject: [PATCH 02/14] updating poms for 4.1.3-SNAPSHOT development --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 237ad106..96eb3b42 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.2-SNAPSHOT + 4.1.3-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From ee41ba8672acf7cc4c4c906b0365b036f5d6f85a Mon Sep 17 00:00:00 2001 From: runner Date: Fri, 12 Jul 2024 09:39:36 +0000 Subject: [PATCH 03/14] updating develop poms to master versions to avoid merge conflicts --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 96eb3b42..1132e0af 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.3-SNAPSHOT + 4.1.2 redisques A highly scalable redis-persistent queuing system for vertx From 19d788a67bf785e3654e12dd02e221691c8b346e Mon Sep 17 00:00:00 2001 From: runner Date: Fri, 12 Jul 2024 09:39:36 +0000 Subject: [PATCH 04/14] Updating develop poms back to pre merge state --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1132e0af..96eb3b42 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.2 + 4.1.3-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From 974a287ded92501a0062d73528ad5dfb235fc097 Mon Sep 17 00:00:00 2001 From: "Andreas Fankhauser hiddenalpha.ch" <23085769+hiddenalpha@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:12:08 +0200 Subject: [PATCH 05/14] [SDCISA-16207] Fix typo in log message. --- .../redisques/handler/GetQueuesItemsCountHandler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java index 589b35aa..35c83411 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java @@ -134,10 +134,10 @@ public void handle(AsyncResult handleQueues) { var obj = new JsonObject().put(STATUS, OK).put(QUEUES, result); long jsonCreateDurationMs = currentTimeMillis() - beginEpchMs; if (jsonCreateDurationMs > 10) { - log.info("Creating JSON with {} entries did block this tread for {}ms", + log.info("Creating JSON with {} entries did block this thread for {}ms", ctx.queueLengths.length, jsonCreateDurationMs); }else{ - log.debug("Creating JSON with {} entries did block this tread for {}ms", + log.debug("Creating JSON with {} entries did block this thread for {}ms", ctx.queueLengths.length, jsonCreateDurationMs); } workerPromise.complete(obj); From e887b3d2e4a90d742cc5e18a41cf5b2c6a3fd91f Mon Sep 17 00:00:00 2001 From: "Andreas Fankhauser hiddenalpha.ch" <23085769+hiddenalpha@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:19:40 +0200 Subject: [PATCH 06/14] [SDCISA-16207] Cleanup dead code. --- .../java/org/swisspush/redisques/QueueStatsService.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 71e1534e..f16cc811 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -187,14 +187,6 @@ private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { - // Setup a lookup table as we need to find by name further below. - Map detailsByName = new HashMap<>(req.queuesJsonArr.size()); - for (var it = (Iterator) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) { - JsonObject detailJson = it.next(); - String name = detailJson.getString(MONITOR_QUEUE_NAME); - detailsByName.put(name, detailJson); - } - dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> { for (Queue queue : req.queues) { if (event.containsKey(queue.name)) { @@ -221,6 +213,7 @@ private static class GetQueueStatsRequest { private CTX mCtx; private GetQueueStatsMentor mentor; private List queueNames; + /* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */ private JsonArray queuesJsonArr; private List queues; } From 2218daeb7469218e887997a8d9599b531800f76f Mon Sep 17 00:00:00 2001 From: "Andreas Fankhauser hiddenalpha.ch" <23085769+hiddenalpha@users.noreply.github.com> Date: Mon, 15 Jul 2024 10:31:12 +0200 Subject: [PATCH 07/14] [SDCISA-16207] Reduce duplication by delegating to already existing ctor. --- src/main/java/org/swisspush/redisques/RedisQues.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index afb61e95..bf2318d2 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -230,12 +230,9 @@ private enum QueueState { private final Semaphore getQueuesItemsCountRedisRequestQuota; public RedisQues() { - this.exceptionFactory = newThriftyExceptionFactory(); + this(null, null, null, newThriftyExceptionFactory(), new Semaphore(Integer.MAX_VALUE), + new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE), new Semaphore(Integer.MAX_VALUE)); log.warn("Fallback to legacy behavior and allow up to {} simultaneous requests to redis", Integer.MAX_VALUE); - this.redisMonitoringReqQuota = new Semaphore(Integer.MAX_VALUE); - this.checkQueueRequestsQuota = new Semaphore(Integer.MAX_VALUE); - this.queueStatsRequestQuota = new Semaphore(Integer.MAX_VALUE); - this.getQueuesItemsCountRedisRequestQuota = new Semaphore(Integer.MAX_VALUE); } public RedisQues( From c5a4c194618f78b7ccfb9c09c45a278b26960b55 Mon Sep 17 00:00:00 2001 From: almeidast Date: Mon, 9 Sep 2024 12:30:38 +0100 Subject: [PATCH 08/14] Verify the config RedisquesConfiguration if the parameter getDequeueStatisticReportIntervalSec is active to process attachDequeueStats. --- .../redisques/QueueStatsService.java | 28 +++++++++++-------- .../handler/RedisquesHttpRequestHandler.java | 2 +- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index f16cc811..34856255 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -9,6 +9,7 @@ import org.swisspush.redisques.util.DequeueStatistic; import org.swisspush.redisques.util.DequeueStatisticCollector; import org.swisspush.redisques.util.QueueStatisticsCollector; +import org.swisspush.redisques.util.RedisquesConfiguration; import java.util.ArrayList; import java.util.HashMap; @@ -50,15 +51,17 @@ public class QueueStatsService { private final DequeueStatisticCollector dequeueStatisticCollector; private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore incomingRequestQuota; + private final RedisquesConfiguration modConfig; public QueueStatsService( - Vertx vertx, - EventBus eventBus, - String redisquesAddress, - QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector, - RedisQuesExceptionFactory exceptionFactory, - Semaphore incomingRequestQuota + Vertx vertx, + EventBus eventBus, + String redisquesAddress, + QueueStatisticsCollector queueStatisticsCollector, + DequeueStatisticCollector dequeueStatisticCollector, + RedisQuesExceptionFactory exceptionFactory, + Semaphore incomingRequestQuota, + RedisquesConfiguration modConfig ) { this.vertx = vertx; this.eventBus = eventBus; @@ -67,6 +70,7 @@ public QueueStatsService( this.dequeueStatisticCollector = dequeueStatisticCollector; this.exceptionFactory = exceptionFactory; this.incomingRequestQuota = incomingRequestQuota; + this.modConfig = modConfig; } public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { @@ -97,10 +101,12 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { for (Queue q : req1.queues) req1.queueNames.add(q.name); fetchRetryDetails(req1, (ex2, req2) -> { if (ex2 != null) { onDone.accept(ex2, null); return; } - attachDequeueStats(req2, (ex3, req3) -> { - if (ex3 != null) { onDone.accept(ex3, null); return; } - onDone.accept(null, req3.queues); - }); + if (modConfig.getDequeueStatisticReportIntervalSec() > 0){ + attachDequeueStats(req2, (ex3, req3) -> { + if (ex3 != null) { onDone.accept(ex3, null); return; } + onDone.accept(null, req3.queues); + }); + } }); }); } catch (Exception ex) { diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index a5af7a1f..d19c6c98 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -138,7 +138,7 @@ private RedisquesHttpRequestHandler( this.exceptionFactory = exceptionFactory; this.queueStatsService = new QueueStatsService( vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota); + exceptionFactory, queueStatsRequestQuota,modConfig); final String prefix = modConfig.getHttpRequestHandlerPrefix(); From e0fbb6f74b32db3c51697e28e8c53475af41cd5a Mon Sep 17 00:00:00 2001 From: almeidast Date: Tue, 10 Sep 2024 16:43:59 +0100 Subject: [PATCH 09/14] Create tests to check when RedisquesConfiguration if the parameter getDequeueStatisticReportIntervalSec is active to process attachDequeueStats. --- .../redisques/QueueStatsService.java | 12 +- .../redisques/QueueStatsServiceTest.java | 191 ++++++++++++++++++ 2 files changed, 197 insertions(+), 6 deletions(-) create mode 100644 src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 34856255..f8d2da2d 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -119,7 +119,7 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { } } - private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { + void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { String filter = req.mentor.filter(req.mCtx); JsonObject operation = buildGetQueuesItemsCountOperation(filter); eventBus.request(redisquesAddress, operation, ev -> { @@ -166,7 +166,7 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu }); } - private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { + void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { long begGetQueueStatsMs = currentTimeMillis(); assert req.queueNames != null; queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> { @@ -192,7 +192,7 @@ private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { + void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> { for (Queue queue : req.queues) { if (event.containsKey(queue.name)) { @@ -215,13 +215,13 @@ private int compareLargestFirst(Queue a, Queue b) { } - private static class GetQueueStatsRequest { + static class GetQueueStatsRequest { private CTX mCtx; private GetQueueStatsMentor mentor; private List queueNames; /* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */ private JsonArray queuesJsonArr; - private List queues; + List queues; } @@ -231,7 +231,7 @@ public static class Queue { private Long lastDequeueAttemptEpochMs; private Long lastDequeueSuccessEpochMs; private Long nextDequeueDueTimestampEpochMs; - private Queue(String name, long size){ + Queue(String name, long size){ assert name != null; this.name = name; this.size = size; diff --git a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java b/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java new file mode 100644 index 00000000..77223e6f --- /dev/null +++ b/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java @@ -0,0 +1,191 @@ +package org.swisspush.redisques; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.DequeueStatisticCollector; +import org.swisspush.redisques.util.QueueStatisticsCollector; +import org.swisspush.redisques.util.RedisquesConfiguration; +import org.swisspush.redisques.exception.RedisQuesExceptionFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.concurrent.Semaphore; +import java.util.function.BiConsumer; + +@RunWith(VertxUnitRunner.class) +public class QueueStatsServiceTest { + + private Vertx vertx; + private EventBus eventBus; + private QueueStatsService queueStatsService; + private RedisquesConfiguration config; + private DequeueStatisticCollector dequeueStatisticCollector; + + @Before + public void setUp() { + // Initialize Vertx and mock dependencies + vertx = Vertx.vertx(); + eventBus = Mockito.mock(EventBus.class); + QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); + dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class); + RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class); + Semaphore semaphore = new Semaphore(1); + config = Mockito.spy(new RedisquesConfiguration()); + + // Create the QueueStatsService instance with mocked dependencies + queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, + dequeueStatisticCollector, exceptionFactory, semaphore, config)); + } + + @Test + public void testDequeueStatsCalledWhenIntervalGreaterThanZero(TestContext testContext) { + // Mock the config so that getDequeueStatisticReportIntervalSec returns a value greater than 0 + Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(10); // Value greater than 0 + + // Mock the getAllDequeueStatistics method to return a successful Future + HashMap dequeueStatistics = new HashMap<>(); + dequeueStatistics.put("testQueue", new DequeueStatistic()); + Mockito.when(dequeueStatisticCollector.getAllDequeueStatistics()).thenReturn(Future.succeededFuture(dequeueStatistics)); + + // Set up async for the test + Async async = testContext.async(); + + // Mock a context and mentor + Object mockContext = new Object(); + QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { + @Override + public boolean includeEmptyQueues(Object ctx) { + return true; + } + + @Override + public int limit(Object ctx) { + return 10; + } + + @Override + public String filter(Object ctx) { + return "*"; + } + + @Override + public void onQueueStatistics(java.util.List queues, Object ctx) { + // Assert the dequeue stats were attached + testContext.assertTrue(queues.get(0).getSize() >0); + async.complete(); + } + + @Override + public void onError(Throwable ex, Object ctx) { + testContext.fail(ex); + } + }; + + // Mock fetchQueueNamesAndSize to return a valid list of queues + Mockito.doAnswer(invocation -> { + BiConsumer> callback = invocation.getArgument(1); + QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); + request.queues = Collections.singletonList(new QueueStatsService.Queue("testQueue", 1)); + callback.accept(null, request); + return null; + }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); + + // Mock fetchRetryDetails to do nothing and call the next step + Mockito.doAnswer(invocation -> { + BiConsumer> callback = invocation.getArgument(1); + QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); + callback.accept(null, request); + return null; + }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); + + // Call getQueueStats to trigger the flow + queueStatsService.getQueueStats(mockContext, mentor); + + // Use ArgumentCaptor to capture the arguments passed to attachDequeueStats + ArgumentCaptor> requestCaptor = ArgumentCaptor.forClass(QueueStatsService.GetQueueStatsRequest.class); + ArgumentCaptor>> consumerCaptor = ArgumentCaptor.forClass(BiConsumer.class); + + // Verify that attachDequeueStats was called with the correct arguments + Mockito.verify(queueStatsService).attachDequeueStats(requestCaptor.capture(), consumerCaptor.capture()); + + // Assert that the arguments are not null and match expectations + testContext.assertNotNull(requestCaptor.getValue()); + testContext.assertNotNull(consumerCaptor.getValue()); + } + + @Test + public void testDequeueStatsNotCalledWhenIntervalIsZeroOrNegative(TestContext testContext) { + // Mock the config so that getDequeueStatisticReportIntervalSec returns 0 or a negative value + Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(0); // Value equal to 0 + // Alternatively: Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(-1); // For negative value + + // Set up async for the test + Async async = testContext.async(); + + // Mock a context and mentor + Object mockContext = new Object(); + QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { + @Override + public boolean includeEmptyQueues(Object ctx) { + return true; + } + + @Override + public int limit(Object ctx) { + return 10; + } + + @Override + public String filter(Object ctx) { + return "*"; + } + + @Override + public void onQueueStatistics(java.util.List queues, Object ctx) { + // Assert the dequeue stats were not attached + testContext.assertTrue(queues.get(0).getNextDequeueDueTimestampEpochMs() == null); + async.complete(); + } + + @Override + public void onError(Throwable ex, Object ctx) { + testContext.fail(ex); + } + }; + + // Mock fetchQueueNamesAndSize to return a valid list of queues + Mockito.doAnswer(invocation -> { + BiConsumer> callback = invocation.getArgument(1); + QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); + request.queues = Collections.singletonList(new QueueStatsService.Queue("testQueue", 1)); + callback.accept(null, request); + return null; + }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); + + // Mock fetchRetryDetails to do nothing and call the next step + Mockito.doAnswer(invocation -> { + BiConsumer> callback = invocation.getArgument(1); + QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); + callback.accept(null, request); + return null; + }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); + + // Call getQueueStats to trigger the flow + queueStatsService.getQueueStats(mockContext, mentor); + + // Verify that attachDequeueStats was NOT called + Mockito.verify(queueStatsService, Mockito.never()).attachDequeueStats(Mockito.any(), Mockito.any()); + + async.complete(); + } +} From f64f24d6d801aece408adeeec19d07d50c225170 Mon Sep 17 00:00:00 2001 From: almeidast Date: Wed, 11 Sep 2024 16:51:16 +0100 Subject: [PATCH 10/14] Fix QueueStatsService, remove no necessary imports, create a helper to avoid pass all RedisConfiguration class, adjusts tests. --- .../redisques/QueueStatsService.java | 12 ++- .../handler/RedisquesHttpRequestHandler.java | 2 +- .../util/RedisquesConfiguration.java | 2 + .../redisques/QueueStatsServiceTest.java | 89 +++++++++++++------ 4 files changed, 69 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index f8d2da2d..96148795 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -12,10 +12,7 @@ import org.swisspush.redisques.util.RedisquesConfiguration; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -51,7 +48,7 @@ public class QueueStatsService { private final DequeueStatisticCollector dequeueStatisticCollector; private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore incomingRequestQuota; - private final RedisquesConfiguration modConfig; + private final Boolean fetchQueueStats; public QueueStatsService( Vertx vertx, @@ -61,7 +58,7 @@ public QueueStatsService( DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, Semaphore incomingRequestQuota, - RedisquesConfiguration modConfig + Boolean fetchQueueStats ) { this.vertx = vertx; this.eventBus = eventBus; @@ -70,7 +67,7 @@ public QueueStatsService( this.dequeueStatisticCollector = dequeueStatisticCollector; this.exceptionFactory = exceptionFactory; this.incomingRequestQuota = incomingRequestQuota; - this.modConfig = modConfig; + this.fetchQueueStats = fetchQueueStats; } public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { @@ -101,12 +98,13 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { for (Queue q : req1.queues) req1.queueNames.add(q.name); fetchRetryDetails(req1, (ex2, req2) -> { if (ex2 != null) { onDone.accept(ex2, null); return; } - if (modConfig.getDequeueStatisticReportIntervalSec() > 0){ + if (fetchQueueStats) { attachDequeueStats(req2, (ex3, req3) -> { if (ex3 != null) { onDone.accept(ex3, null); return; } onDone.accept(null, req3.queues); }); } + onDone.accept(null, req2.queues); }); }); } catch (Exception ex) { diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index d19c6c98..0cf47cec 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -138,7 +138,7 @@ private RedisquesHttpRequestHandler( this.exceptionFactory = exceptionFactory; this.queueStatsService = new QueueStatsService( vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota,modConfig); + exceptionFactory, queueStatsRequestQuota,modConfig.isDequeueStatsEnabled()); final String prefix = modConfig.getHttpRequestHandlerPrefix(); diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index e44cfc1a..01148b98 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -704,6 +704,8 @@ public String toString() { return asJsonObject().toString(); } + public boolean isDequeueStatsEnabled() { return getDequeueStatisticReportIntervalSec()>0; } + /** * RedisquesConfigurationBuilder class for simplified configuration. * diff --git a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java b/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java index 77223e6f..b36231b0 100644 --- a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java +++ b/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java @@ -31,9 +31,11 @@ public class QueueStatsServiceTest { private RedisquesConfiguration config; private DequeueStatisticCollector dequeueStatisticCollector; + /** + * Test setup for cases where dequeue stats are enabled. + */ @Before - public void setUp() { - // Initialize Vertx and mock dependencies + public void setUpForEnabledDequeueStats() { vertx = Vertx.vertx(); eventBus = Mockito.mock(EventBus.class); QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); @@ -42,25 +44,48 @@ public void setUp() { Semaphore semaphore = new Semaphore(1); config = Mockito.spy(new RedisquesConfiguration()); - // Create the QueueStatsService instance with mocked dependencies + // Ensure fetchQueueStats is true queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, - dequeueStatisticCollector, exceptionFactory, semaphore, config)); + dequeueStatisticCollector, exceptionFactory, semaphore, true)); } + /** + * Test setup for cases where dequeue stats are disabled. + */ + @Before + public void setUpForDisabledDequeueStats() { + vertx = Vertx.vertx(); + eventBus = Mockito.mock(EventBus.class); + QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); + dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class); + RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class); + Semaphore semaphore = new Semaphore(1); + config = Mockito.spy(new RedisquesConfiguration()); + + // Ensure fetchQueueStats is false + queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, + dequeueStatisticCollector, exceptionFactory, semaphore, false)); + } + + /** + * Test to verify that dequeue statistics are collected when the interval is greater than zero. + */ @Test public void testDequeueStatsCalledWhenIntervalGreaterThanZero(TestContext testContext) { - // Mock the config so that getDequeueStatisticReportIntervalSec returns a value greater than 0 - Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(10); // Value greater than 0 + setUpForEnabledDequeueStats(); - // Mock the getAllDequeueStatistics method to return a successful Future + // Mock for enabling dequeue stats + Mockito.when(config.isDequeueStatsEnabled()).thenReturn(true); + + // Mock method to return successful Future with dequeue statistics HashMap dequeueStatistics = new HashMap<>(); dequeueStatistics.put("testQueue", new DequeueStatistic()); Mockito.when(dequeueStatisticCollector.getAllDequeueStatistics()).thenReturn(Future.succeededFuture(dequeueStatistics)); - // Set up async for the test + // Async setup for the test Async async = testContext.async(); - // Mock a context and mentor + // Mock the context and mentor Object mockContext = new Object(); QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { @Override @@ -80,8 +105,8 @@ public String filter(Object ctx) { @Override public void onQueueStatistics(java.util.List queues, Object ctx) { - // Assert the dequeue stats were attached - testContext.assertTrue(queues.get(0).getSize() >0); + testContext.assertNotNull(queues); + testContext.assertEquals(1, queues.size()); async.complete(); } @@ -91,7 +116,7 @@ public void onError(Throwable ex, Object ctx) { } }; - // Mock fetchQueueNamesAndSize to return a valid list of queues + // Mock fetchQueueNamesAndSize Mockito.doAnswer(invocation -> { BiConsumer> callback = invocation.getArgument(1); QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); @@ -100,7 +125,7 @@ public void onError(Throwable ex, Object ctx) { return null; }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); - // Mock fetchRetryDetails to do nothing and call the next step + // Mock fetchRetryDetails Mockito.doAnswer(invocation -> { BiConsumer> callback = invocation.getArgument(1); QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); @@ -108,31 +133,38 @@ public void onError(Throwable ex, Object ctx) { return null; }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); - // Call getQueueStats to trigger the flow + // Call the method getQueueStats queueStatsService.getQueueStats(mockContext, mentor); - // Use ArgumentCaptor to capture the arguments passed to attachDequeueStats + // Capture arguments passed to attachDequeueStats ArgumentCaptor> requestCaptor = ArgumentCaptor.forClass(QueueStatsService.GetQueueStatsRequest.class); ArgumentCaptor>> consumerCaptor = ArgumentCaptor.forClass(BiConsumer.class); - // Verify that attachDequeueStats was called with the correct arguments + // Verify that attachDequeueStats was called Mockito.verify(queueStatsService).attachDequeueStats(requestCaptor.capture(), consumerCaptor.capture()); - // Assert that the arguments are not null and match expectations + // Verify that the request contains the correct queue testContext.assertNotNull(requestCaptor.getValue()); - testContext.assertNotNull(consumerCaptor.getValue()); + testContext.assertEquals("testQueue", requestCaptor.getValue().queues.get(0).getName()); + + // Verify that async completed correctly + testContext.assertTrue(async.count() == 0); } + /** + * Test to verify that dequeue statistics are not collected when the interval is zero or negative. + */ @Test public void testDequeueStatsNotCalledWhenIntervalIsZeroOrNegative(TestContext testContext) { - // Mock the config so that getDequeueStatisticReportIntervalSec returns 0 or a negative value - Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(0); // Value equal to 0 - // Alternatively: Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(-1); // For negative value + setUpForDisabledDequeueStats(); + + // Mock the configuration to disable dequeue stats + Mockito.when(config.isDequeueStatsEnabled()).thenReturn(false); - // Set up async for the test + // Async setup for the test Async async = testContext.async(); - // Mock a context and mentor + // Mock the context and mentor Object mockContext = new Object(); QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { @Override @@ -152,7 +184,7 @@ public String filter(Object ctx) { @Override public void onQueueStatistics(java.util.List queues, Object ctx) { - // Assert the dequeue stats were not attached + // Ensure that nextDequeueDueTimestampEpochMs is null since stats are not attached testContext.assertTrue(queues.get(0).getNextDequeueDueTimestampEpochMs() == null); async.complete(); } @@ -163,7 +195,7 @@ public void onError(Throwable ex, Object ctx) { } }; - // Mock fetchQueueNamesAndSize to return a valid list of queues + // Mock fetchQueueNamesAndSize Mockito.doAnswer(invocation -> { BiConsumer> callback = invocation.getArgument(1); QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); @@ -172,7 +204,7 @@ public void onError(Throwable ex, Object ctx) { return null; }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); - // Mock fetchRetryDetails to do nothing and call the next step + // Mock fetchRetryDetails Mockito.doAnswer(invocation -> { BiConsumer> callback = invocation.getArgument(1); QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); @@ -180,12 +212,13 @@ public void onError(Throwable ex, Object ctx) { return null; }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); - // Call getQueueStats to trigger the flow + // Call the method getQueueStats queueStatsService.getQueueStats(mockContext, mentor); // Verify that attachDequeueStats was NOT called Mockito.verify(queueStatsService, Mockito.never()).attachDequeueStats(Mockito.any(), Mockito.any()); - async.complete(); + // Verify that async completed correctly + async.awaitSuccess(); } } From 8ad573892d7cf18293d9cc3c7c536aa898c9f6ae Mon Sep 17 00:00:00 2001 From: almeidast Date: Thu, 19 Sep 2024 13:07:24 +0200 Subject: [PATCH 11/14] Implement suggestions from code review. Create a test in the different way. remove unnecessary code and imports Rollback private declarations --- .../redisques/QueueStatsService.java | 15 +- .../org/swisspush/redisques/RedisQues.java | 9 +- .../swisspush/redisques/RedisQuesRunner.java | 1 + .../handler/RedisquesHttpRequestHandler.java | 2 +- .../util/DequeueStatisticCollector.java | 9 +- .../util/RedisquesConfiguration.java | 4 +- .../DequeueStatisticCollectorTest.java | 83 +++++++ .../redisques/QueueStatsServiceTest.java | 224 ------------------ 8 files changed, 104 insertions(+), 243 deletions(-) create mode 100644 src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java delete mode 100644 src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 96148795..e635d8f8 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -9,7 +9,6 @@ import org.swisspush.redisques.util.DequeueStatistic; import org.swisspush.redisques.util.DequeueStatisticCollector; import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisquesConfiguration; import java.util.ArrayList; import java.util.List; @@ -48,7 +47,6 @@ public class QueueStatsService { private final DequeueStatisticCollector dequeueStatisticCollector; private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore incomingRequestQuota; - private final Boolean fetchQueueStats; public QueueStatsService( Vertx vertx, @@ -57,8 +55,7 @@ public QueueStatsService( QueueStatisticsCollector queueStatisticsCollector, DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, - Semaphore incomingRequestQuota, - Boolean fetchQueueStats + Semaphore incomingRequestQuota ) { this.vertx = vertx; this.eventBus = eventBus; @@ -67,7 +64,6 @@ public QueueStatsService( this.dequeueStatisticCollector = dequeueStatisticCollector; this.exceptionFactory = exceptionFactory; this.incomingRequestQuota = incomingRequestQuota; - this.fetchQueueStats = fetchQueueStats; } public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { @@ -98,13 +94,10 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { for (Queue q : req1.queues) req1.queueNames.add(q.name); fetchRetryDetails(req1, (ex2, req2) -> { if (ex2 != null) { onDone.accept(ex2, null); return; } - if (fetchQueueStats) { attachDequeueStats(req2, (ex3, req3) -> { if (ex3 != null) { onDone.accept(ex3, null); return; } onDone.accept(null, req3.queues); }); - } - onDone.accept(null, req2.queues); }); }); } catch (Exception ex) { @@ -117,7 +110,7 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { } } - void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { + private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { String filter = req.mentor.filter(req.mCtx); JsonObject operation = buildGetQueuesItemsCountOperation(filter); eventBus.request(redisquesAddress, operation, ev -> { @@ -164,7 +157,7 @@ void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { + private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { long begGetQueueStatsMs = currentTimeMillis(); assert req.queueNames != null; queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> { @@ -190,7 +183,7 @@ void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { + private void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> { for (Queue queue : req.queues) { if (event.containsKey(queue.name)) { diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index bf2318d2..8fc491b5 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -311,10 +311,6 @@ public void start(Promise promise) { this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config()); } - if (this.dequeueStatisticCollector == null) { - this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx); - } - if (this.periodicSkipScheduler == null) { this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx); } @@ -323,11 +319,14 @@ public void start(Promise promise) { log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec(); - if (dequeueStatisticReportIntervalSec > 0) { + if (modConfig.isDequeueStatsEnabled()) { dequeueStatisticEnabled = true; Runnable publisher = newDequeueStatisticPublisher(); vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run()); } + if (this.dequeueStatisticCollector == null) { + this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled); + } queuesKey = modConfig.getRedisPrefix() + "queues"; queuesPrefix = modConfig.getRedisPrefix() + "queues:"; consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; diff --git a/src/main/java/org/swisspush/redisques/RedisQuesRunner.java b/src/main/java/org/swisspush/redisques/RedisQuesRunner.java index ca78c38d..f920e1e6 100644 --- a/src/main/java/org/swisspush/redisques/RedisQuesRunner.java +++ b/src/main/java/org/swisspush/redisques/RedisQuesRunner.java @@ -21,6 +21,7 @@ public static void main(String[] args) { .redisReconnectAttempts(-1) .redisPoolRecycleTimeoutMs(-1) .redisReadyCheckIntervalMs(5000) + //.dequeueStatisticReportIntervalSec(10) .build().asJsonObject(); Vertx.vertx().deployVerticle(new RedisQues(), new DeploymentOptions().setConfig(configuration), diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 0cf47cec..a5af7a1f 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -138,7 +138,7 @@ private RedisquesHttpRequestHandler( this.exceptionFactory = exceptionFactory; this.queueStatsService = new QueueStatsService( vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota,modConfig.isDequeueStatsEnabled()); + exceptionFactory, queueStatsRequestQuota); final String prefix = modConfig.getHttpRequestHandlerPrefix(); diff --git a/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java b/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java index 5f180186..bc5e1470 100644 --- a/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java +++ b/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Map; public class DequeueStatisticCollector { @@ -18,9 +19,11 @@ public class DequeueStatisticCollector { private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData"; private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock."; private final SharedData sharedData; + private final boolean dequeueStatisticEnabled; - public DequeueStatisticCollector(Vertx vertx) { + public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) { this.sharedData = vertx.sharedData(); + this.dequeueStatisticEnabled = dequeueStatisticEnabled; } /** @@ -107,6 +110,10 @@ public Future setDequeueStatistic(final String queueName, final DequeueSta } public Future> getAllDequeueStatistics() { + // Check if dequeue statistics are enabled + if (!dequeueStatisticEnabled) { + return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions + } Promise> promise = Promise.promise(); sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler>>) asyncResult -> { if (asyncResult.failed()) { diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 01148b98..20a11b88 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -704,7 +704,9 @@ public String toString() { return asJsonObject().toString(); } - public boolean isDequeueStatsEnabled() { return getDequeueStatisticReportIntervalSec()>0; } + public boolean isDequeueStatsEnabled() { + return getDequeueStatisticReportIntervalSec() > 0; + } /** * RedisquesConfigurationBuilder class for simplified configuration. diff --git a/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java b/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java new file mode 100644 index 00000000..219d49dd --- /dev/null +++ b/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java @@ -0,0 +1,83 @@ +package org.swisspush.redisques; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.DequeueStatisticCollector; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.*; + +@RunWith(VertxUnitRunner.class) +public class DequeueStatisticCollectorTest { + + private Vertx vertx; + private SharedData sharedData; + private AsyncMap asyncMap; + private DequeueStatisticCollector dequeueStatisticCollectorEnabled; + private DequeueStatisticCollector dequeueStatisticCollectorDisabled; + + @Before + public void setUp() { + vertx = mock(Vertx.class); + sharedData = mock(SharedData.class); + asyncMap = mock(AsyncMap.class); + + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.succeededFuture(asyncMap)); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + when(vertx.sharedData()).thenReturn(sharedData); + + // Set up the enabled and disabled DequeueStatisticCollector + dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true); + dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false); + } + + @Test + public void testGetAllDequeueStatisticsEnabled(TestContext context) { + // Mocking asyncMap.entries() to return a non-empty map + Map dequeueStats = new HashMap<>(); + dequeueStats.put("queue1", new DequeueStatistic()); + when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats)); + + // Test for when dequeue statistics are enabled + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.succeeded()); + context.assertEquals(1, result.result().size()); + async.complete(); + }); + + // Verify that sharedData and asyncMap were used correctly + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verify(asyncMap, times(1)).entries(); + } + + @Test + public void testGetAllDequeueStatisticsDisabled(TestContext context) { + // Test for when dequeue statistics are disabled + Async async = context.async(); + dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.succeeded()); + context.assertTrue(result.result().isEmpty()); + async.complete(); + }); + + // Verify that sharedData and asyncMap were NOT used + verify(sharedData, never()).getAsyncMap(anyString(), any()); + verify(asyncMap, never()).entries(); + } +} diff --git a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java b/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java deleted file mode 100644 index b36231b0..00000000 --- a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java +++ /dev/null @@ -1,224 +0,0 @@ -package org.swisspush.redisques; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.swisspush.redisques.util.DequeueStatistic; -import org.swisspush.redisques.util.DequeueStatisticCollector; -import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisquesConfiguration; -import org.swisspush.redisques.exception.RedisQuesExceptionFactory; - -import java.util.Collections; -import java.util.HashMap; -import java.util.concurrent.Semaphore; -import java.util.function.BiConsumer; - -@RunWith(VertxUnitRunner.class) -public class QueueStatsServiceTest { - - private Vertx vertx; - private EventBus eventBus; - private QueueStatsService queueStatsService; - private RedisquesConfiguration config; - private DequeueStatisticCollector dequeueStatisticCollector; - - /** - * Test setup for cases where dequeue stats are enabled. - */ - @Before - public void setUpForEnabledDequeueStats() { - vertx = Vertx.vertx(); - eventBus = Mockito.mock(EventBus.class); - QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); - dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class); - RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class); - Semaphore semaphore = new Semaphore(1); - config = Mockito.spy(new RedisquesConfiguration()); - - // Ensure fetchQueueStats is true - queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, - dequeueStatisticCollector, exceptionFactory, semaphore, true)); - } - - /** - * Test setup for cases where dequeue stats are disabled. - */ - @Before - public void setUpForDisabledDequeueStats() { - vertx = Vertx.vertx(); - eventBus = Mockito.mock(EventBus.class); - QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); - dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class); - RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class); - Semaphore semaphore = new Semaphore(1); - config = Mockito.spy(new RedisquesConfiguration()); - - // Ensure fetchQueueStats is false - queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, - dequeueStatisticCollector, exceptionFactory, semaphore, false)); - } - - /** - * Test to verify that dequeue statistics are collected when the interval is greater than zero. - */ - @Test - public void testDequeueStatsCalledWhenIntervalGreaterThanZero(TestContext testContext) { - setUpForEnabledDequeueStats(); - - // Mock for enabling dequeue stats - Mockito.when(config.isDequeueStatsEnabled()).thenReturn(true); - - // Mock method to return successful Future with dequeue statistics - HashMap dequeueStatistics = new HashMap<>(); - dequeueStatistics.put("testQueue", new DequeueStatistic()); - Mockito.when(dequeueStatisticCollector.getAllDequeueStatistics()).thenReturn(Future.succeededFuture(dequeueStatistics)); - - // Async setup for the test - Async async = testContext.async(); - - // Mock the context and mentor - Object mockContext = new Object(); - QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { - @Override - public boolean includeEmptyQueues(Object ctx) { - return true; - } - - @Override - public int limit(Object ctx) { - return 10; - } - - @Override - public String filter(Object ctx) { - return "*"; - } - - @Override - public void onQueueStatistics(java.util.List queues, Object ctx) { - testContext.assertNotNull(queues); - testContext.assertEquals(1, queues.size()); - async.complete(); - } - - @Override - public void onError(Throwable ex, Object ctx) { - testContext.fail(ex); - } - }; - - // Mock fetchQueueNamesAndSize - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - request.queues = Collections.singletonList(new QueueStatsService.Queue("testQueue", 1)); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); - - // Mock fetchRetryDetails - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); - - // Call the method getQueueStats - queueStatsService.getQueueStats(mockContext, mentor); - - // Capture arguments passed to attachDequeueStats - ArgumentCaptor> requestCaptor = ArgumentCaptor.forClass(QueueStatsService.GetQueueStatsRequest.class); - ArgumentCaptor>> consumerCaptor = ArgumentCaptor.forClass(BiConsumer.class); - - // Verify that attachDequeueStats was called - Mockito.verify(queueStatsService).attachDequeueStats(requestCaptor.capture(), consumerCaptor.capture()); - - // Verify that the request contains the correct queue - testContext.assertNotNull(requestCaptor.getValue()); - testContext.assertEquals("testQueue", requestCaptor.getValue().queues.get(0).getName()); - - // Verify that async completed correctly - testContext.assertTrue(async.count() == 0); - } - - /** - * Test to verify that dequeue statistics are not collected when the interval is zero or negative. - */ - @Test - public void testDequeueStatsNotCalledWhenIntervalIsZeroOrNegative(TestContext testContext) { - setUpForDisabledDequeueStats(); - - // Mock the configuration to disable dequeue stats - Mockito.when(config.isDequeueStatsEnabled()).thenReturn(false); - - // Async setup for the test - Async async = testContext.async(); - - // Mock the context and mentor - Object mockContext = new Object(); - QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { - @Override - public boolean includeEmptyQueues(Object ctx) { - return true; - } - - @Override - public int limit(Object ctx) { - return 10; - } - - @Override - public String filter(Object ctx) { - return "*"; - } - - @Override - public void onQueueStatistics(java.util.List queues, Object ctx) { - // Ensure that nextDequeueDueTimestampEpochMs is null since stats are not attached - testContext.assertTrue(queues.get(0).getNextDequeueDueTimestampEpochMs() == null); - async.complete(); - } - - @Override - public void onError(Throwable ex, Object ctx) { - testContext.fail(ex); - } - }; - - // Mock fetchQueueNamesAndSize - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - request.queues = Collections.singletonList(new QueueStatsService.Queue("testQueue", 1)); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); - - // Mock fetchRetryDetails - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); - - // Call the method getQueueStats - queueStatsService.getQueueStats(mockContext, mentor); - - // Verify that attachDequeueStats was NOT called - Mockito.verify(queueStatsService, Mockito.never()).attachDequeueStats(Mockito.any(), Mockito.any()); - - // Verify that async completed correctly - async.awaitSuccess(); - } -} From f2941cf3ee7529b7c51c1d3b130212d287cf132b Mon Sep 17 00:00:00 2001 From: almeidast Date: Thu, 19 Sep 2024 13:30:31 +0200 Subject: [PATCH 12/14] Rollback wrong commit. --- src/main/java/org/swisspush/redisques/RedisQuesRunner.java | 1 - 1 file changed, 1 deletion(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQuesRunner.java b/src/main/java/org/swisspush/redisques/RedisQuesRunner.java index f920e1e6..ca78c38d 100644 --- a/src/main/java/org/swisspush/redisques/RedisQuesRunner.java +++ b/src/main/java/org/swisspush/redisques/RedisQuesRunner.java @@ -21,7 +21,6 @@ public static void main(String[] args) { .redisReconnectAttempts(-1) .redisPoolRecycleTimeoutMs(-1) .redisReadyCheckIntervalMs(5000) - //.dequeueStatisticReportIntervalSec(10) .build().asJsonObject(); Vertx.vertx().deployVerticle(new RedisQues(), new DeploymentOptions().setConfig(configuration), From 6973703b7d2e1504e01163a3fa3a90d91448cef1 Mon Sep 17 00:00:00 2001 From: almeidast Date: Thu, 19 Sep 2024 13:35:09 +0200 Subject: [PATCH 13/14] Rollback class. --- .../redisques/QueueStatsService.java | 37 ++++++++++--------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index e635d8f8..f16cc811 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -11,7 +11,10 @@ import org.swisspush.redisques.util.QueueStatisticsCollector; import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; @@ -49,13 +52,13 @@ public class QueueStatsService { private final Semaphore incomingRequestQuota; public QueueStatsService( - Vertx vertx, - EventBus eventBus, - String redisquesAddress, - QueueStatisticsCollector queueStatisticsCollector, - DequeueStatisticCollector dequeueStatisticCollector, - RedisQuesExceptionFactory exceptionFactory, - Semaphore incomingRequestQuota + Vertx vertx, + EventBus eventBus, + String redisquesAddress, + QueueStatisticsCollector queueStatisticsCollector, + DequeueStatisticCollector dequeueStatisticCollector, + RedisQuesExceptionFactory exceptionFactory, + Semaphore incomingRequestQuota ) { this.vertx = vertx; this.eventBus = eventBus; @@ -94,10 +97,10 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { for (Queue q : req1.queues) req1.queueNames.add(q.name); fetchRetryDetails(req1, (ex2, req2) -> { if (ex2 != null) { onDone.accept(ex2, null); return; } - attachDequeueStats(req2, (ex3, req3) -> { - if (ex3 != null) { onDone.accept(ex3, null); return; } - onDone.accept(null, req3.queues); - }); + attachDequeueStats(req2, (ex3, req3) -> { + if (ex3 != null) { onDone.accept(ex3, null); return; } + onDone.accept(null, req3.queues); + }); }); }); } catch (Exception ex) { @@ -110,7 +113,7 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { } } - private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { + private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { String filter = req.mentor.filter(req.mCtx); JsonObject operation = buildGetQueuesItemsCountOperation(filter); eventBus.request(redisquesAddress, operation, ev -> { @@ -157,7 +160,7 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu }); } - private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { + private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { long begGetQueueStatsMs = currentTimeMillis(); assert req.queueNames != null; queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> { @@ -183,7 +186,7 @@ private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { + private void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> { for (Queue queue : req.queues) { if (event.containsKey(queue.name)) { @@ -206,13 +209,13 @@ private int compareLargestFirst(Queue a, Queue b) { } - static class GetQueueStatsRequest { + private static class GetQueueStatsRequest { private CTX mCtx; private GetQueueStatsMentor mentor; private List queueNames; /* TODO: Why is 'queuesJsonArr' never accessed? Isn't this the reason of our class in the first place? */ private JsonArray queuesJsonArr; - List queues; + private List queues; } @@ -222,7 +225,7 @@ public static class Queue { private Long lastDequeueAttemptEpochMs; private Long lastDequeueSuccessEpochMs; private Long nextDequeueDueTimestampEpochMs; - Queue(String name, long size){ + private Queue(String name, long size){ assert name != null; this.name = name; this.size = size; From 824bbb9c31881ca821a4f75bbc77d05fe7ed4c04 Mon Sep 17 00:00:00 2001 From: almeidast Date: Wed, 25 Sep 2024 18:41:03 +0100 Subject: [PATCH 14/14] fix package of DequeueStatisticCollectorTest add more testes for failure cases simplify code as suggested in PR --- .../DequeueStatisticCollectorTest.java | 63 ++++++++++++++++--- 1 file changed, 54 insertions(+), 9 deletions(-) rename src/test/java/org/swisspush/redisques/{ => util}/DequeueStatisticCollectorTest.java (50%) diff --git a/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java b/src/test/java/org/swisspush/redisques/util/DequeueStatisticCollectorTest.java similarity index 50% rename from src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java rename to src/test/java/org/swisspush/redisques/util/DequeueStatisticCollectorTest.java index 219d49dd..a9ea3332 100644 --- a/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java +++ b/src/test/java/org/swisspush/redisques/util/DequeueStatisticCollectorTest.java @@ -1,4 +1,4 @@ -package org.swisspush.redisques; +package org.swisspush.redisques.util; import io.vertx.core.Future; import io.vertx.core.Vertx; @@ -10,8 +10,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.swisspush.redisques.util.DequeueStatistic; -import org.swisspush.redisques.util.DequeueStatisticCollector; import java.util.HashMap; import java.util.Map; @@ -33,6 +31,7 @@ public void setUp() { sharedData = mock(SharedData.class); asyncMap = mock(AsyncMap.class); + // Mock sharedData.getAsyncMap to return asyncMap doAnswer(invocation -> { io.vertx.core.Handler>> handler = invocation.getArgument(1); handler.handle(Future.succeededFuture(asyncMap)); @@ -41,19 +40,19 @@ public void setUp() { when(vertx.sharedData()).thenReturn(sharedData); - // Set up the enabled and disabled DequeueStatisticCollector + // Initialize DequeueStatisticCollector with enabled/disabled stats collection dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true); dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false); } @Test public void testGetAllDequeueStatisticsEnabled(TestContext context) { - // Mocking asyncMap.entries() to return a non-empty map + // Mock asyncMap.entries() to return a non-empty map Map dequeueStats = new HashMap<>(); dequeueStats.put("queue1", new DequeueStatistic()); when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats)); - // Test for when dequeue statistics are enabled + // Test when dequeue statistics are enabled Async async = context.async(); dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { context.assertTrue(result.succeeded()); @@ -68,7 +67,7 @@ public void testGetAllDequeueStatisticsEnabled(TestContext context) { @Test public void testGetAllDequeueStatisticsDisabled(TestContext context) { - // Test for when dequeue statistics are disabled + // Test when dequeue statistics are disabled Async async = context.async(); dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> { context.assertTrue(result.succeeded()); @@ -77,7 +76,53 @@ public void testGetAllDequeueStatisticsDisabled(TestContext context) { }); // Verify that sharedData and asyncMap were NOT used - verify(sharedData, never()).getAsyncMap(anyString(), any()); - verify(asyncMap, never()).entries(); + verifyNoInteractions(sharedData); + verifyNoInteractions(asyncMap); + } + + @Test + public void testGetAllDequeueStatisticsAsyncMapFailure(TestContext context) { + // Simulate failure in sharedData.getAsyncMap + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.failedFuture(new RuntimeException("Failed to retrieve async map"))); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + // Test when asyncMap retrieval fails + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.failed()); + context.assertEquals("Failed to retrieve async map", result.cause().getMessage()); + async.complete(); + }); + + // Verify that sharedData.getAsyncMap was used, but asyncMap.entries() was not + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verifyNoInteractions(asyncMap); + } + + @Test + public void testGetAllDequeueStatisticsEntriesFailure(TestContext context) { + // Simulate success in sharedData.getAsyncMap, but failure in asyncMap.entries + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.succeededFuture(asyncMap)); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + when(asyncMap.entries()).thenReturn(Future.failedFuture(new RuntimeException("Failed to retrieve entries"))); + + // Test when asyncMap.entries fails + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.failed()); + context.assertEquals("Failed to retrieve entries", result.cause().getMessage()); + async.complete(); + }); + + // Verify that sharedData.getAsyncMap and asyncMap.entries were used correctly + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verify(asyncMap, times(1)).entries(); } }