From 8ad573892d7cf18293d9cc3c7c536aa898c9f6ae Mon Sep 17 00:00:00 2001 From: almeidast Date: Thu, 19 Sep 2024 13:07:24 +0200 Subject: [PATCH] Implement suggestions from code review. Create a test in the different way. remove unnecessary code and imports Rollback private declarations --- .../redisques/QueueStatsService.java | 15 +- .../org/swisspush/redisques/RedisQues.java | 9 +- .../swisspush/redisques/RedisQuesRunner.java | 1 + .../handler/RedisquesHttpRequestHandler.java | 2 +- .../util/DequeueStatisticCollector.java | 9 +- .../util/RedisquesConfiguration.java | 4 +- .../DequeueStatisticCollectorTest.java | 83 +++++++ .../redisques/QueueStatsServiceTest.java | 224 ------------------ 8 files changed, 104 insertions(+), 243 deletions(-) create mode 100644 src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java delete mode 100644 src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 96148795..e635d8f8 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -9,7 +9,6 @@ import org.swisspush.redisques.util.DequeueStatistic; import org.swisspush.redisques.util.DequeueStatisticCollector; import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisquesConfiguration; import java.util.ArrayList; import java.util.List; @@ -48,7 +47,6 @@ public class QueueStatsService { private final DequeueStatisticCollector dequeueStatisticCollector; private final RedisQuesExceptionFactory exceptionFactory; private final Semaphore incomingRequestQuota; - private final Boolean fetchQueueStats; public QueueStatsService( Vertx vertx, @@ -57,8 +55,7 @@ public QueueStatsService( QueueStatisticsCollector queueStatisticsCollector, DequeueStatisticCollector dequeueStatisticCollector, RedisQuesExceptionFactory exceptionFactory, - Semaphore incomingRequestQuota, - Boolean fetchQueueStats + Semaphore incomingRequestQuota ) { this.vertx = vertx; this.eventBus = eventBus; @@ -67,7 +64,6 @@ public QueueStatsService( this.dequeueStatisticCollector = dequeueStatisticCollector; this.exceptionFactory = exceptionFactory; this.incomingRequestQuota = incomingRequestQuota; - this.fetchQueueStats = fetchQueueStats; } public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { @@ -98,13 +94,10 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { for (Queue q : req1.queues) req1.queueNames.add(q.name); fetchRetryDetails(req1, (ex2, req2) -> { if (ex2 != null) { onDone.accept(ex2, null); return; } - if (fetchQueueStats) { attachDequeueStats(req2, (ex3, req3) -> { if (ex3 != null) { onDone.accept(ex3, null); return; } onDone.accept(null, req3.queues); }); - } - onDone.accept(null, req2.queues); }); }); } catch (Exception ex) { @@ -117,7 +110,7 @@ public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { } } - void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { + private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { String filter = req.mentor.filter(req.mCtx); JsonObject operation = buildGetQueuesItemsCountOperation(filter); eventBus.request(redisquesAddress, operation, ev -> { @@ -164,7 +157,7 @@ void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { + private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { long begGetQueueStatsMs = currentTimeMillis(); assert req.queueNames != null; queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> { @@ -190,7 +183,7 @@ void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { + private void attachDequeueStats(GetQueueStatsRequest req, BiConsumer> onDone) { dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> { for (Queue queue : req.queues) { if (event.containsKey(queue.name)) { diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index bf2318d2..8fc491b5 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -311,10 +311,6 @@ public void start(Promise promise) { this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config()); } - if (this.dequeueStatisticCollector == null) { - this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx); - } - if (this.periodicSkipScheduler == null) { this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx); } @@ -323,11 +319,14 @@ public void start(Promise promise) { log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec(); - if (dequeueStatisticReportIntervalSec > 0) { + if (modConfig.isDequeueStatsEnabled()) { dequeueStatisticEnabled = true; Runnable publisher = newDequeueStatisticPublisher(); vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run()); } + if (this.dequeueStatisticCollector == null) { + this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled); + } queuesKey = modConfig.getRedisPrefix() + "queues"; queuesPrefix = modConfig.getRedisPrefix() + "queues:"; consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; diff --git a/src/main/java/org/swisspush/redisques/RedisQuesRunner.java b/src/main/java/org/swisspush/redisques/RedisQuesRunner.java index ca78c38d..f920e1e6 100644 --- a/src/main/java/org/swisspush/redisques/RedisQuesRunner.java +++ b/src/main/java/org/swisspush/redisques/RedisQuesRunner.java @@ -21,6 +21,7 @@ public static void main(String[] args) { .redisReconnectAttempts(-1) .redisPoolRecycleTimeoutMs(-1) .redisReadyCheckIntervalMs(5000) + //.dequeueStatisticReportIntervalSec(10) .build().asJsonObject(); Vertx.vertx().deployVerticle(new RedisQues(), new DeploymentOptions().setConfig(configuration), diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 0cf47cec..a5af7a1f 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -138,7 +138,7 @@ private RedisquesHttpRequestHandler( this.exceptionFactory = exceptionFactory; this.queueStatsService = new QueueStatsService( vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector, - exceptionFactory, queueStatsRequestQuota,modConfig.isDequeueStatsEnabled()); + exceptionFactory, queueStatsRequestQuota); final String prefix = modConfig.getHttpRequestHandlerPrefix(); diff --git a/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java b/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java index 5f180186..bc5e1470 100644 --- a/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java +++ b/src/main/java/org/swisspush/redisques/util/DequeueStatisticCollector.java @@ -11,6 +11,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.Map; public class DequeueStatisticCollector { @@ -18,9 +19,11 @@ public class DequeueStatisticCollector { private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData"; private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock."; private final SharedData sharedData; + private final boolean dequeueStatisticEnabled; - public DequeueStatisticCollector(Vertx vertx) { + public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) { this.sharedData = vertx.sharedData(); + this.dequeueStatisticEnabled = dequeueStatisticEnabled; } /** @@ -107,6 +110,10 @@ public Future setDequeueStatistic(final String queueName, final DequeueSta } public Future> getAllDequeueStatistics() { + // Check if dequeue statistics are enabled + if (!dequeueStatisticEnabled) { + return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions + } Promise> promise = Promise.promise(); sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler>>) asyncResult -> { if (asyncResult.failed()) { diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 01148b98..20a11b88 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -704,7 +704,9 @@ public String toString() { return asJsonObject().toString(); } - public boolean isDequeueStatsEnabled() { return getDequeueStatisticReportIntervalSec()>0; } + public boolean isDequeueStatsEnabled() { + return getDequeueStatisticReportIntervalSec() > 0; + } /** * RedisquesConfigurationBuilder class for simplified configuration. diff --git a/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java b/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java new file mode 100644 index 00000000..219d49dd --- /dev/null +++ b/src/test/java/org/swisspush/redisques/DequeueStatisticCollectorTest.java @@ -0,0 +1,83 @@ +package org.swisspush.redisques; + +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.shareddata.AsyncMap; +import io.vertx.core.shareddata.SharedData; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.DequeueStatisticCollector; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.*; + +@RunWith(VertxUnitRunner.class) +public class DequeueStatisticCollectorTest { + + private Vertx vertx; + private SharedData sharedData; + private AsyncMap asyncMap; + private DequeueStatisticCollector dequeueStatisticCollectorEnabled; + private DequeueStatisticCollector dequeueStatisticCollectorDisabled; + + @Before + public void setUp() { + vertx = mock(Vertx.class); + sharedData = mock(SharedData.class); + asyncMap = mock(AsyncMap.class); + + doAnswer(invocation -> { + io.vertx.core.Handler>> handler = invocation.getArgument(1); + handler.handle(Future.succeededFuture(asyncMap)); + return null; + }).when(sharedData).getAsyncMap(anyString(), any()); + + when(vertx.sharedData()).thenReturn(sharedData); + + // Set up the enabled and disabled DequeueStatisticCollector + dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true); + dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false); + } + + @Test + public void testGetAllDequeueStatisticsEnabled(TestContext context) { + // Mocking asyncMap.entries() to return a non-empty map + Map dequeueStats = new HashMap<>(); + dequeueStats.put("queue1", new DequeueStatistic()); + when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats)); + + // Test for when dequeue statistics are enabled + Async async = context.async(); + dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.succeeded()); + context.assertEquals(1, result.result().size()); + async.complete(); + }); + + // Verify that sharedData and asyncMap were used correctly + verify(sharedData, times(1)).getAsyncMap(anyString(), any()); + verify(asyncMap, times(1)).entries(); + } + + @Test + public void testGetAllDequeueStatisticsDisabled(TestContext context) { + // Test for when dequeue statistics are disabled + Async async = context.async(); + dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> { + context.assertTrue(result.succeeded()); + context.assertTrue(result.result().isEmpty()); + async.complete(); + }); + + // Verify that sharedData and asyncMap were NOT used + verify(sharedData, never()).getAsyncMap(anyString(), any()); + verify(asyncMap, never()).entries(); + } +} diff --git a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java b/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java deleted file mode 100644 index b36231b0..00000000 --- a/src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java +++ /dev/null @@ -1,224 +0,0 @@ -package org.swisspush.redisques; - -import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.core.eventbus.EventBus; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.swisspush.redisques.util.DequeueStatistic; -import org.swisspush.redisques.util.DequeueStatisticCollector; -import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisquesConfiguration; -import org.swisspush.redisques.exception.RedisQuesExceptionFactory; - -import java.util.Collections; -import java.util.HashMap; -import java.util.concurrent.Semaphore; -import java.util.function.BiConsumer; - -@RunWith(VertxUnitRunner.class) -public class QueueStatsServiceTest { - - private Vertx vertx; - private EventBus eventBus; - private QueueStatsService queueStatsService; - private RedisquesConfiguration config; - private DequeueStatisticCollector dequeueStatisticCollector; - - /** - * Test setup for cases where dequeue stats are enabled. - */ - @Before - public void setUpForEnabledDequeueStats() { - vertx = Vertx.vertx(); - eventBus = Mockito.mock(EventBus.class); - QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); - dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class); - RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class); - Semaphore semaphore = new Semaphore(1); - config = Mockito.spy(new RedisquesConfiguration()); - - // Ensure fetchQueueStats is true - queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, - dequeueStatisticCollector, exceptionFactory, semaphore, true)); - } - - /** - * Test setup for cases where dequeue stats are disabled. - */ - @Before - public void setUpForDisabledDequeueStats() { - vertx = Vertx.vertx(); - eventBus = Mockito.mock(EventBus.class); - QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class); - dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class); - RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class); - Semaphore semaphore = new Semaphore(1); - config = Mockito.spy(new RedisquesConfiguration()); - - // Ensure fetchQueueStats is false - queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector, - dequeueStatisticCollector, exceptionFactory, semaphore, false)); - } - - /** - * Test to verify that dequeue statistics are collected when the interval is greater than zero. - */ - @Test - public void testDequeueStatsCalledWhenIntervalGreaterThanZero(TestContext testContext) { - setUpForEnabledDequeueStats(); - - // Mock for enabling dequeue stats - Mockito.when(config.isDequeueStatsEnabled()).thenReturn(true); - - // Mock method to return successful Future with dequeue statistics - HashMap dequeueStatistics = new HashMap<>(); - dequeueStatistics.put("testQueue", new DequeueStatistic()); - Mockito.when(dequeueStatisticCollector.getAllDequeueStatistics()).thenReturn(Future.succeededFuture(dequeueStatistics)); - - // Async setup for the test - Async async = testContext.async(); - - // Mock the context and mentor - Object mockContext = new Object(); - QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { - @Override - public boolean includeEmptyQueues(Object ctx) { - return true; - } - - @Override - public int limit(Object ctx) { - return 10; - } - - @Override - public String filter(Object ctx) { - return "*"; - } - - @Override - public void onQueueStatistics(java.util.List queues, Object ctx) { - testContext.assertNotNull(queues); - testContext.assertEquals(1, queues.size()); - async.complete(); - } - - @Override - public void onError(Throwable ex, Object ctx) { - testContext.fail(ex); - } - }; - - // Mock fetchQueueNamesAndSize - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - request.queues = Collections.singletonList(new QueueStatsService.Queue("testQueue", 1)); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); - - // Mock fetchRetryDetails - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); - - // Call the method getQueueStats - queueStatsService.getQueueStats(mockContext, mentor); - - // Capture arguments passed to attachDequeueStats - ArgumentCaptor> requestCaptor = ArgumentCaptor.forClass(QueueStatsService.GetQueueStatsRequest.class); - ArgumentCaptor>> consumerCaptor = ArgumentCaptor.forClass(BiConsumer.class); - - // Verify that attachDequeueStats was called - Mockito.verify(queueStatsService).attachDequeueStats(requestCaptor.capture(), consumerCaptor.capture()); - - // Verify that the request contains the correct queue - testContext.assertNotNull(requestCaptor.getValue()); - testContext.assertEquals("testQueue", requestCaptor.getValue().queues.get(0).getName()); - - // Verify that async completed correctly - testContext.assertTrue(async.count() == 0); - } - - /** - * Test to verify that dequeue statistics are not collected when the interval is zero or negative. - */ - @Test - public void testDequeueStatsNotCalledWhenIntervalIsZeroOrNegative(TestContext testContext) { - setUpForDisabledDequeueStats(); - - // Mock the configuration to disable dequeue stats - Mockito.when(config.isDequeueStatsEnabled()).thenReturn(false); - - // Async setup for the test - Async async = testContext.async(); - - // Mock the context and mentor - Object mockContext = new Object(); - QueueStatsService.GetQueueStatsMentor mentor = new QueueStatsService.GetQueueStatsMentor<>() { - @Override - public boolean includeEmptyQueues(Object ctx) { - return true; - } - - @Override - public int limit(Object ctx) { - return 10; - } - - @Override - public String filter(Object ctx) { - return "*"; - } - - @Override - public void onQueueStatistics(java.util.List queues, Object ctx) { - // Ensure that nextDequeueDueTimestampEpochMs is null since stats are not attached - testContext.assertTrue(queues.get(0).getNextDequeueDueTimestampEpochMs() == null); - async.complete(); - } - - @Override - public void onError(Throwable ex, Object ctx) { - testContext.fail(ex); - } - }; - - // Mock fetchQueueNamesAndSize - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - request.queues = Collections.singletonList(new QueueStatsService.Queue("testQueue", 1)); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any()); - - // Mock fetchRetryDetails - Mockito.doAnswer(invocation -> { - BiConsumer> callback = invocation.getArgument(1); - QueueStatsService.GetQueueStatsRequest request = invocation.getArgument(0); - callback.accept(null, request); - return null; - }).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any()); - - // Call the method getQueueStats - queueStatsService.getQueueStats(mockContext, mentor); - - // Verify that attachDequeueStats was NOT called - Mockito.verify(queueStatsService, Mockito.never()).attachDequeueStats(Mockito.any(), Mockito.any()); - - // Verify that async completed correctly - async.awaitSuccess(); - } -}