Skip to content

Commit

Permalink
Merge pull request #214 from swisspost/#213_http-api-very-slow
Browse files Browse the repository at this point in the history
#213 http api very slow
  • Loading branch information
mcweba authored Oct 7, 2024
2 parents 70dc264 + 824bbb9 commit 36ff2e7
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 6 deletions.
9 changes: 4 additions & 5 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,6 @@ public void start(Promise<Void> promise) {
this.configurationProvider = new DefaultRedisquesConfigurationProvider(vertx, config());
}

if (this.dequeueStatisticCollector == null) {
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx);
}

if (this.periodicSkipScheduler == null) {
this.periodicSkipScheduler = new PeriodicSkipScheduler(vertx);
}
Expand All @@ -323,11 +319,14 @@ public void start(Promise<Void> promise) {
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

int dequeueStatisticReportIntervalSec = modConfig.getDequeueStatisticReportIntervalSec();
if (dequeueStatisticReportIntervalSec > 0) {
if (modConfig.isDequeueStatsEnabled()) {
dequeueStatisticEnabled = true;
Runnable publisher = newDequeueStatisticPublisher();
vertx.setPeriodic(1000L * dequeueStatisticReportIntervalSec, time -> publisher.run());
}
if (this.dequeueStatisticCollector == null) {
this.dequeueStatisticCollector = new DequeueStatisticCollector(vertx,dequeueStatisticEnabled);
}
queuesKey = modConfig.getRedisPrefix() + "queues";
queuesPrefix = modConfig.getRedisPrefix() + "queues:";
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,19 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;

public class DequeueStatisticCollector {
private static final Logger log = LoggerFactory.getLogger(DequeueStatisticCollector.class);
private final static String DEQUEUE_STATISTIC_DATA = "dequeueStatisticData";
private final static String DEQUEUE_STATISTIC_LOCK_PREFIX = "dequeueStatisticLock.";
private final SharedData sharedData;
private final boolean dequeueStatisticEnabled;

public DequeueStatisticCollector(Vertx vertx) {
public DequeueStatisticCollector(Vertx vertx, boolean dequeueStatisticEnabled) {
this.sharedData = vertx.sharedData();
this.dequeueStatisticEnabled = dequeueStatisticEnabled;
}

/**
Expand Down Expand Up @@ -107,6 +110,10 @@ public Future<Void> setDequeueStatistic(final String queueName, final DequeueSta
}

public Future<Map<String, DequeueStatistic>> getAllDequeueStatistics() {
// Check if dequeue statistics are enabled
if (!dequeueStatisticEnabled) {
return Future.succeededFuture(Collections.emptyMap()); // Return an empty map to avoid NullPointerExceptions
}
Promise<Map<String, DequeueStatistic>> promise = Promise.promise();
sharedData.getAsyncMap(DEQUEUE_STATISTIC_DATA, (Handler<AsyncResult<AsyncMap<String, DequeueStatistic>>>) asyncResult -> {
if (asyncResult.failed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,10 @@ public String toString() {
return asJsonObject().toString();
}

public boolean isDequeueStatsEnabled() {
return getDequeueStatisticReportIntervalSec() > 0;
}

/**
* RedisquesConfigurationBuilder class for simplified configuration.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package org.swisspush.redisques.util;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.shareddata.AsyncMap;
import io.vertx.core.shareddata.SharedData;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.HashMap;
import java.util.Map;

import static org.mockito.Mockito.*;

@RunWith(VertxUnitRunner.class)
public class DequeueStatisticCollectorTest {

private Vertx vertx;
private SharedData sharedData;
private AsyncMap<String, DequeueStatistic> asyncMap;
private DequeueStatisticCollector dequeueStatisticCollectorEnabled;
private DequeueStatisticCollector dequeueStatisticCollectorDisabled;

@Before
public void setUp() {
vertx = mock(Vertx.class);
sharedData = mock(SharedData.class);
asyncMap = mock(AsyncMap.class);

// Mock sharedData.getAsyncMap to return asyncMap
doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.succeededFuture(asyncMap));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

when(vertx.sharedData()).thenReturn(sharedData);

// Initialize DequeueStatisticCollector with enabled/disabled stats collection
dequeueStatisticCollectorEnabled = new DequeueStatisticCollector(vertx, true);
dequeueStatisticCollectorDisabled = new DequeueStatisticCollector(vertx, false);
}

@Test
public void testGetAllDequeueStatisticsEnabled(TestContext context) {
// Mock asyncMap.entries() to return a non-empty map
Map<String, DequeueStatistic> dequeueStats = new HashMap<>();
dequeueStats.put("queue1", new DequeueStatistic());
when(asyncMap.entries()).thenReturn(Future.succeededFuture(dequeueStats));

// Test when dequeue statistics are enabled
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.succeeded());
context.assertEquals(1, result.result().size());
async.complete();
});

// Verify that sharedData and asyncMap were used correctly
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verify(asyncMap, times(1)).entries();
}

@Test
public void testGetAllDequeueStatisticsDisabled(TestContext context) {
// Test when dequeue statistics are disabled
Async async = context.async();
dequeueStatisticCollectorDisabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.succeeded());
context.assertTrue(result.result().isEmpty());
async.complete();
});

// Verify that sharedData and asyncMap were NOT used
verifyNoInteractions(sharedData);
verifyNoInteractions(asyncMap);
}

@Test
public void testGetAllDequeueStatisticsAsyncMapFailure(TestContext context) {
// Simulate failure in sharedData.getAsyncMap
doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.failedFuture(new RuntimeException("Failed to retrieve async map")));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

// Test when asyncMap retrieval fails
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.failed());
context.assertEquals("Failed to retrieve async map", result.cause().getMessage());
async.complete();
});

// Verify that sharedData.getAsyncMap was used, but asyncMap.entries() was not
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verifyNoInteractions(asyncMap);
}

@Test
public void testGetAllDequeueStatisticsEntriesFailure(TestContext context) {
// Simulate success in sharedData.getAsyncMap, but failure in asyncMap.entries
doAnswer(invocation -> {
io.vertx.core.Handler<io.vertx.core.AsyncResult<AsyncMap<String, DequeueStatistic>>> handler = invocation.getArgument(1);
handler.handle(Future.succeededFuture(asyncMap));
return null;
}).when(sharedData).getAsyncMap(anyString(), any());

when(asyncMap.entries()).thenReturn(Future.failedFuture(new RuntimeException("Failed to retrieve entries")));

// Test when asyncMap.entries fails
Async async = context.async();
dequeueStatisticCollectorEnabled.getAllDequeueStatistics().onComplete(result -> {
context.assertTrue(result.failed());
context.assertEquals("Failed to retrieve entries", result.cause().getMessage());
async.complete();
});

// Verify that sharedData.getAsyncMap and asyncMap.entries were used correctly
verify(sharedData, times(1)).getAsyncMap(anyString(), any());
verify(asyncMap, times(1)).entries();
}
}

0 comments on commit 36ff2e7

Please sign in to comment.