Skip to content

Commit

Permalink
Implement suggestions from code review.
Browse files Browse the repository at this point in the history
Create a test in the different way.
remove unnecessary code and imports
Rollback private declarations
  • Loading branch information
almeidast committed Sep 19, 2024
1 parent f64f24d commit 8ad5738
Show file tree
Hide file tree
Showing 8 changed files with 104 additions and 243 deletions.
15 changes: 4 additions & 11 deletions src/main/java/org/swisspush/redisques/QueueStatsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisquesConfiguration;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -48,7 +47,6 @@ public class QueueStatsService {
private final DequeueStatisticCollector dequeueStatisticCollector;
private final RedisQuesExceptionFactory exceptionFactory;
private final Semaphore incomingRequestQuota;
private final Boolean fetchQueueStats;

public QueueStatsService(
Vertx vertx,
Expand All @@ -57,8 +55,7 @@ public QueueStatsService(
QueueStatisticsCollector queueStatisticsCollector,
DequeueStatisticCollector dequeueStatisticCollector,
RedisQuesExceptionFactory exceptionFactory,
Semaphore incomingRequestQuota,
Boolean fetchQueueStats
Semaphore incomingRequestQuota
) {
this.vertx = vertx;
this.eventBus = eventBus;
Expand All @@ -67,7 +64,6 @@ public QueueStatsService(
this.dequeueStatisticCollector = dequeueStatisticCollector;
this.exceptionFactory = exceptionFactory;
this.incomingRequestQuota = incomingRequestQuota;
this.fetchQueueStats = fetchQueueStats;
}

public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
Expand Down Expand Up @@ -98,13 +94,10 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
for (Queue q : req1.queues) req1.queueNames.add(q.name);
fetchRetryDetails(req1, (ex2, req2) -> {
if (ex2 != null) { onDone.accept(ex2, null); return; }
if (fetchQueueStats) {
attachDequeueStats(req2, (ex3, req3) -> {
if (ex3 != null) { onDone.accept(ex3, null); return; }
onDone.accept(null, req3.queues);
});
}
onDone.accept(null, req2.queues);
});
});
} catch (Exception ex) {
Expand All @@ -117,7 +110,7 @@ public <CTX> void getQueueStats(CTX mCtx, GetQueueStatsMentor<CTX> mentor) {
}
}

<CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
private <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
String filter = req.mentor.filter(req.mCtx);
JsonObject operation = buildGetQueuesItemsCountOperation(filter);
eventBus.<JsonObject>request(redisquesAddress, operation, ev -> {
Expand Down Expand Up @@ -164,7 +157,7 @@ <CTX> void fetchQueueNamesAndSize(GetQueueStatsRequest<CTX> req, BiConsumer<Thro
});
}

<CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
private <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
long begGetQueueStatsMs = currentTimeMillis();
assert req.queueNames != null;
queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> {
Expand All @@ -190,7 +183,7 @@ <CTX> void fetchRetryDetails(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable
});
}

<CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
private <CTX> void attachDequeueStats(GetQueueStatsRequest<CTX> req, BiConsumer<Throwable, GetQueueStatsRequest<CTX>> onDone) {
dequeueStatisticCollector.getAllDequeueStatistics().onSuccess(event -> {
for (Queue queue : req.queues) {
if (event.containsKey(queue.name)) {
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,6 @@ public void start(Promise<Void> promise) {
this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config());
}

if (this.dequeueStatisticCollector == null) {
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx);
}

if (this.periodicSkipScheduler == null) {
this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx);
}
Expand All @@ -323,11 +319,14 @@ public void start(Promise<Void> promise) {
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
if (dequeueStatisticReportIntervalSec > 0) {
if (modConfig.isDequeueStatsEnabled()) {
dequeueStatisticEnabled = true;
Runnable publisher = newDequeueStatisticPublisher();
vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run());
}
if (this.dequeueStatisticCollector == null) {
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled);
}
queuesKey = modConfig.getRedisPrefix() + "queues";
queuesPrefix = modConfig.getRedisPrefix() + "queues:";
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/swisspush/redisques/RedisQuesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public static void main(String[] args) {
.redisReconnectAttempts(-1)
.redisPoolRecycleTimeoutMs(-1)
.redisReadyCheckIntervalMs(5000)
//.dequeueStatisticReportIntervalSec(10)
.build().asJsonObject();

Vertx.vertx().deployVerticle(new RedisQues(), new DeploymentOptions().setConfig(configuration),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private RedisquesHttpRequestHandler(
this.exceptionFactory = exceptionFactory;
this.queueStatsService = new QueueStatsService(
vertx, eventBus, redisquesAddress, queueStatisticsCollector, dequeueStatisticCollector,
exceptionFactory, queueStatsRequestQuota,modConfig.isDequeueStatsEnabled());
exceptionFactory, queueStatsRequestQuota);

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

public class DequeueStatisticCollector {
private static final Logger log = LoggerFactory.getLogger(DequeueStatisticCollector.class);
private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData";
private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock.";
private final SharedData sharedData;
private final boolean dequeueStatisticEnabled;

public DequeueStatisticCollector(Vertx vertx) {
public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) {
this.sharedData = vertx.sharedData();
this.dequeueStatisticEnabled = dequeueStatisticEnabled;
}

/**
Expand Down Expand Up @@ -107,6 +110,10 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
}

public Future<Map<String, DequeueStatistic>> getAllDequeueStatistics() {
// Check if dequeue statistics are enabled
if (!dequeueStatisticEnabled) {
return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions
}
Promise<Map<String, DequeueStatistic>> promise = Promise.promise();
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, DequeueStatistic>>>) asyncResult -> {
if (asyncResult.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,9 @@ public String toString() {
return asJsonObject().toString();
}

public boolean isDequeueStatsEnabled() { return getDequeueStatisticReportIntervalSec()>0; }
public boolean isDequeueStatsEnabled() {
return getDequeueStatisticReportIntervalSec() > 0;
}

/**
* RedisquesConfigurationBuilder class for simplified configuration.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.swisspush.redisques;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.DequeueStatisticCollector;

import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.*;

@RunWith(VertxUnitRunner.class)
public class DequeueStatisticCollectorTest {

private Vertx vertx;
private SharedData sharedData;
private AsyncMap<String, DequeueStatistic> asyncMap;
private DequeueStatisticCollector dequeueStatisticCollectorEnabled;
private DequeueStatisticCollector dequeueStatisticCollectorDisabled;

@Before
public void setUp() {
vertx = mock(Vertx.class);
sharedData = mock(SharedData.class);
asyncMap = mock(AsyncMap.class);

doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.succeededFuture(asyncMap));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

when(vertx.sharedData()).thenReturn(sharedData);

// Set up the enabled and disabled DequeueStatisticCollector
dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true);
dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false);
}

@Test
public void testGetAllDequeueStatisticsEnabled(TestContext context) {
// Mocking asyncMap.entries() to return a non-empty map
Map<String, DequeueStatistic> dequeueStats = new HashMap<>();
dequeueStats.put("queue1", new DequeueStatistic());
when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats));

// Test for when dequeue statistics are enabled
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.succeeded());
context.assertEquals(1, result.result().size());
async.complete();
});

// Verify that sharedData and asyncMap were used correctly
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verify(asyncMap, times(1)).entries();
}

@Test
public void testGetAllDequeueStatisticsDisabled(TestContext context) {
// Test for when dequeue statistics are disabled
Async async = context.async();
dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.succeeded());
context.assertTrue(result.result().isEmpty());
async.complete();
});

// Verify that sharedData and asyncMap were NOT used
verify(sharedData, never()).getAsyncMap(anyString(), any());
verify(asyncMap, never()).entries();
}
}
Loading

0 comments on commit 8ad5738

Please sign in to comment.