Skip to content

Commit

Permalink
Fix QueueStatsService, remove no necessary imports, create a helper t…
Browse files Browse the repository at this point in the history
…o avoid pass all RedisConfiguration class, adjusts tests.
  • Loading branch information
almeidast committed Sep 11, 2024
1 parent e0fbb6f commit f64f24d
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 36 deletions.
12 changes: 5 additions & 7 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,7 @@
import org.swisspush.redisques.util.RedisquesConfiguration;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -51,7 +48,7 @@ public class QueueStatsService {
private final DequeueStatisticCollector dequeueStatisticCollector;
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;
private final RedisquesConfiguration modConfig;
private final Boolean fetchQueueStats;

public QueueStatsService(
Vertx vertx,
Expand All @@ -61,7 +58,7 @@ public QueueStatsService(
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
RedisquesConfiguration modConfig
Boolean fetchQueueStats
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -70,7 +67,7 @@ public QueueStatsService(
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;
this.modConfig = modConfig;
this.fetchQueueStats = fetchQueueStats;
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
Expand Down Expand Up @@ -101,12 +98,13 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { onDone.accept(ex2, null); return; }
if (modConfig.getDequeueStatisticReportIntervalSec() > 0){
if (fetchQueueStats) {
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
onDone.accept(null, req3.queues);
});
}
onDone.accept(null, req2.queues);
});
});
} catch (Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private RedisquesHttpRequestHandler(
this.exceptionFactory = exceptionFactory;
this.queueStatsService = new QueueStatsService(
vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota,modConfig);
exceptionFactory, queueStatsRequestQuota,modConfig.isDequeueStatsEnabled());

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,8 @@ public String toString() {
return asJsonObject().toString();
}

public boolean isDequeueStatsEnabled() { return getDequeueStatisticReportIntervalSec()>0; }

/**
* RedisquesConfigurationBuilder class for simplified configuration.
*
Expand Down
89 changes: 61 additions & 28 deletions src/test/java/org/swisspush/redisques/QueueStatsServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ public class QueueStatsServiceTest {
private RedisquesConfiguration config;
private DequeueStatisticCollector dequeueStatisticCollector;

/**
* Test setup for cases where dequeue stats are enabled.
*/
@Before
public void setUp() {
// Initialize Vertx and mock dependencies
public void setUpForEnabledDequeueStats() {
vertx = Vertx.vertx();
eventBus = Mockito.mock(EventBus.class);
QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class);
Expand All @@ -42,25 +44,48 @@ public void setUp() {
Semaphore semaphore = new Semaphore(1);
config = Mockito.spy(new RedisquesConfiguration());

// Create the QueueStatsService instance with mocked dependencies
// Ensure fetchQueueStats is true
queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector,
dequeueStatisticCollector, exceptionFactory, semaphore, config));
dequeueStatisticCollector, exceptionFactory, semaphore, true));
}

/**
* Test setup for cases where dequeue stats are disabled.
*/
@Before
public void setUpForDisabledDequeueStats() {
vertx = Vertx.vertx();
eventBus = Mockito.mock(EventBus.class);
QueueStatisticsCollector queueStatisticsCollector = Mockito.mock(QueueStatisticsCollector.class);
dequeueStatisticCollector = Mockito.mock(DequeueStatisticCollector.class);
RedisQuesExceptionFactory exceptionFactory = Mockito.mock(RedisQuesExceptionFactory.class);
Semaphore semaphore = new Semaphore(1);
config = Mockito.spy(new RedisquesConfiguration());

// Ensure fetchQueueStats is false
queueStatsService = Mockito.spy(new QueueStatsService(vertx, eventBus, "redisques", queueStatisticsCollector,
dequeueStatisticCollector, exceptionFactory, semaphore, false));
}

/**
* Test to verify that dequeue statistics are collected when the interval is greater than zero.
*/
@Test
public void testDequeueStatsCalledWhenIntervalGreaterThanZero(TestContext testContext) {
// Mock the config so that getDequeueStatisticReportIntervalSec returns a value greater than 0
Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(10); // Value greater than 0
setUpForEnabledDequeueStats();

// Mock the getAllDequeueStatistics method to return a successful Future
// Mock for enabling dequeue stats
Mockito.when(config.isDequeueStatsEnabled()).thenReturn(true);

// Mock method to return successful Future with dequeue statistics
HashMap<String, DequeueStatistic> dequeueStatistics = new HashMap<>();
dequeueStatistics.put("testQueue", new DequeueStatistic());
Mockito.when(dequeueStatisticCollector.getAllDequeueStatistics()).thenReturn(Future.succeededFuture(dequeueStatistics));

// Set up async for the test
// Async setup for the test
Async async = testContext.async();

// Mock a context and mentor
// Mock the context and mentor
Object mockContext = new Object();
QueueStatsService.GetQueueStatsMentor<Object> mentor = new QueueStatsService.GetQueueStatsMentor<>() {
@Override
Expand All @@ -80,8 +105,8 @@ public String filter(Object ctx) {

@Override
public void onQueueStatistics(java.util.List<QueueStatsService.Queue> queues, Object ctx) {
// Assert the dequeue stats were attached
testContext.assertTrue(queues.get(0).getSize() >0);
testContext.assertNotNull(queues);
testContext.assertEquals(1, queues.size());
async.complete();
}

Expand All @@ -91,7 +116,7 @@ public void onError(Throwable ex, Object ctx) {
}
};

// Mock fetchQueueNamesAndSize to return a valid list of queues
// Mock fetchQueueNamesAndSize
Mockito.doAnswer(invocation -> {
BiConsumer<Throwable, QueueStatsService.GetQueueStatsRequest<Object>> callback = invocation.getArgument(1);
QueueStatsService.GetQueueStatsRequest<Object> request = invocation.getArgument(0);
Expand All @@ -100,39 +125,46 @@ public void onError(Throwable ex, Object ctx) {
return null;
}).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any());

// Mock fetchRetryDetails to do nothing and call the next step
// Mock fetchRetryDetails
Mockito.doAnswer(invocation -> {
BiConsumer<Throwable, QueueStatsService.GetQueueStatsRequest<Object>> callback = invocation.getArgument(1);
QueueStatsService.GetQueueStatsRequest<Object> request = invocation.getArgument(0);
callback.accept(null, request);
return null;
}).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any());

// Call getQueueStats to trigger the flow
// Call the method getQueueStats
queueStatsService.getQueueStats(mockContext, mentor);

// Use ArgumentCaptor to capture the arguments passed to attachDequeueStats
// Capture arguments passed to attachDequeueStats
ArgumentCaptor<QueueStatsService.GetQueueStatsRequest<Object>> requestCaptor = ArgumentCaptor.forClass(QueueStatsService.GetQueueStatsRequest.class);
ArgumentCaptor<BiConsumer<Throwable, QueueStatsService.GetQueueStatsRequest<Object>>> consumerCaptor = ArgumentCaptor.forClass(BiConsumer.class);

// Verify that attachDequeueStats was called with the correct arguments
// Verify that attachDequeueStats was called
Mockito.verify(queueStatsService).attachDequeueStats(requestCaptor.capture(), consumerCaptor.capture());

// Assert that the arguments are not null and match expectations
// Verify that the request contains the correct queue
testContext.assertNotNull(requestCaptor.getValue());
testContext.assertNotNull(consumerCaptor.getValue());
testContext.assertEquals("testQueue", requestCaptor.getValue().queues.get(0).getName());

// Verify that async completed correctly
testContext.assertTrue(async.count() == 0);
}

/**
* Test to verify that dequeue statistics are not collected when the interval is zero or negative.
*/
@Test
public void testDequeueStatsNotCalledWhenIntervalIsZeroOrNegative(TestContext testContext) {
// Mock the config so that getDequeueStatisticReportIntervalSec returns 0 or a negative value
Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(0); // Value equal to 0
// Alternatively: Mockito.when(config.getDequeueStatisticReportIntervalSec()).thenReturn(-1); // For negative value
setUpForDisabledDequeueStats();

// Mock the configuration to disable dequeue stats
Mockito.when(config.isDequeueStatsEnabled()).thenReturn(false);

// Set up async for the test
// Async setup for the test
Async async = testContext.async();

// Mock a context and mentor
// Mock the context and mentor
Object mockContext = new Object();
QueueStatsService.GetQueueStatsMentor<Object> mentor = new QueueStatsService.GetQueueStatsMentor<>() {
@Override
Expand All @@ -152,7 +184,7 @@ public String filter(Object ctx) {

@Override
public void onQueueStatistics(java.util.List<QueueStatsService.Queue> queues, Object ctx) {
// Assert the dequeue stats were not attached
// Ensure that nextDequeueDueTimestampEpochMs is null since stats are not attached
testContext.assertTrue(queues.get(0).getNextDequeueDueTimestampEpochMs() == null);
async.complete();
}
Expand All @@ -163,7 +195,7 @@ public void onError(Throwable ex, Object ctx) {
}
};

// Mock fetchQueueNamesAndSize to return a valid list of queues
// Mock fetchQueueNamesAndSize
Mockito.doAnswer(invocation -> {
BiConsumer<Throwable, QueueStatsService.GetQueueStatsRequest<Object>> callback = invocation.getArgument(1);
QueueStatsService.GetQueueStatsRequest<Object> request = invocation.getArgument(0);
Expand All @@ -172,20 +204,21 @@ public void onError(Throwable ex, Object ctx) {
return null;
}).when(queueStatsService).fetchQueueNamesAndSize(Mockito.any(), Mockito.any());

// Mock fetchRetryDetails to do nothing and call the next step
// Mock fetchRetryDetails
Mockito.doAnswer(invocation -> {
BiConsumer<Throwable, QueueStatsService.GetQueueStatsRequest<Object>> callback = invocation.getArgument(1);
QueueStatsService.GetQueueStatsRequest<Object> request = invocation.getArgument(0);
callback.accept(null, request);
return null;
}).when(queueStatsService).fetchRetryDetails(Mockito.any(), Mockito.any());

// Call getQueueStats to trigger the flow
// Call the method getQueueStats
queueStatsService.getQueueStats(mockContext, mentor);

// Verify that attachDequeueStats was NOT called
Mockito.verify(queueStatsService, Mockito.never()).attachDequeueStats(Mockito.any(), Mockito.any());

async.complete();
// Verify that async completed correctly
async.awaitSuccess();
}
}

0 comments on commit f64f24d

Please sign in to comment.