Skip to content

Commit

Permalink
fixed tests, add next dequeue due time
Browse files Browse the repository at this point in the history
  • Loading branch information
Xin Zheng committed Nov 23, 2023
1 parent b582b8b commit a9e3281
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 22 deletions.
7 changes: 5 additions & 2 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ private enum QueueState {
public class DequeueStatistic{
public Long lastDequeueAttemptTimestamp;
public Long lastDequeueSuccessTimestamp;
public Long nextDequeueDueTimestamp;
}

public RedisQues() {
Expand Down Expand Up @@ -683,8 +684,9 @@ private void rescheduleSendMessageAfterFailure(final String queueName, int retry
if (log.isTraceEnabled()) {
log.trace("RedsQues reschedule after failure for queue: {}", queueName);
}

vertx.setTimer(retryInSeconds * 1000L, timerId -> {
long retryDelayInMills = retryInSeconds * 1000L;
dequeueStatistic.get(queueName).nextDequeueDueTimestamp = new Date().getTime() + retryDelayInMills;
vertx.setTimer(retryDelayInMills, timerId -> {
if (log.isDebugEnabled()) {
log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis()));
}
Expand Down Expand Up @@ -722,6 +724,7 @@ private void processMessageWithTimeout(final String queue, final String payload,
if (reply.succeeded()) {
success = OK.equals(reply.result().body().getString(STATUS));
dequeueStatistic.get(queue).lastDequeueSuccessTimestamp = new Date().getTime();
dequeueStatistic.get(queue).nextDequeueDueTimestamp = null;
} else {
log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {} ({})", uid, queue, reply.cause().getMessage());
success = Boolean.FALSE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,9 +512,9 @@ private void getMonitorInformation(RoutingContext ctx) {
if (limit > 0) {
queuesList = limitJsonQueueArray(queuesList, limit);
}
Map<String, RedisQues.DequeueStatistic> processTimeList = redisQues.getDequeueStatistic();
if (processTimeList.size() > 0){
fillTheProcessTimeToQueuesList(queuesList, processTimeList);
Map<String, RedisQues.DequeueStatistic> dequeueProcessStatistic = redisQues.getDequeueStatistic();
if (dequeueProcessStatistic.size() > 0) {
fillStatisticToQueuesList(queuesList, dequeueProcessStatistic);
}
JsonObject resultObject = new JsonObject();
resultObject.put(QUEUES, queuesList);
Expand All @@ -533,19 +533,23 @@ private void getMonitorInformation(RoutingContext ctx) {
});
}

private void fillTheProcessTimeToQueuesList(List<JsonObject> queuesList, Map<String, RedisQues.DequeueStatistic> processTimeList) {
private void fillStatisticToQueuesList(List<JsonObject> queuesList, Map<String, RedisQues.DequeueStatistic> dequeueProcessStatistic) {
queuesList.forEach(entries -> {
String queueName = entries.getString(MONITOR_QUEUE_NAME);
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT, "");
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS, "");
if (processTimeList.containsKey(queueName)) {
RedisQues.DequeueStatistic dequeueStatistic = processTimeList.get(queueName);
entries.put(MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS, "");
if (dequeueProcessStatistic.containsKey(queueName)) {
RedisQues.DequeueStatistic dequeueStatistic = dequeueProcessStatistic.get(queueName);
if (dequeueStatistic.lastDequeueAttemptTimestamp != null) {
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp)));
}
if (dequeueStatistic.lastDequeueAttemptTimestamp != null) {
if (dequeueStatistic.lastDequeueSuccessTimestamp != null) {
entries.put(MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp)));
}
if (dequeueStatistic.nextDequeueDueTimestamp != null) {
entries.put(MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS, dequeueStatistic.nextDequeueDueTimestamp.toString());
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class RedisquesAPI {
public static final String MONITOR_QUEUE_SIZE = "size";
public static final String MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT = "lastDequeueAttempt";
public static final String MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS = "lastDequeueSuccess";
public static final String MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS = "nextDequeueDueTimestamp";
public static final String STATISTIC_QUEUE_FAILURES = "failures";
public static final String STATISTIC_QUEUE_BACKPRESSURE = "backpressureTime";
public static final String STATISTIC_QUEUE_SLOWDOWN = "slowdownTime";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.swisspush.redisques.handler;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sun.istack.NotNull;
import com.sun.istack.Nullable;
import io.restassured.RestAssured;
Expand Down Expand Up @@ -1867,10 +1870,10 @@ public void deleteSingleLockEncoded(TestContext context) {
}

@Test
public void getMonitorInformation(TestContext context) {
public void getMonitorInformation(TestContext context) throws JsonProcessingException {
Async async = context.async();
flushAll();

ObjectMapper jsonMapper = new ObjectMapper();
when().get("/queuing/monitor").then().assertThat().statusCode(200)
.body("queues", empty());

Expand Down Expand Up @@ -1900,9 +1903,9 @@ public void getMonitorInformation(TestContext context) {
" }\n" +
" ]\n" +
"}";

when().get("/queuing/monitor").then().assertThat().statusCode(200)
.body(equalTo(new JsonObject(expectedNoEmptyQueuesNoLimit).toString()));
JsonNode expectedStaticJson = jsonMapper.readTree(expectedNoEmptyQueuesNoLimit);
JsonNode receivedJson = jsonMapper.readTree( when().get("/queuing/monitor").then().assertThat().statusCode(200).extract().asString());
verifyResponse(context, expectedStaticJson, receivedJson);

String expectedWithEmptyQueuesNoLimit = "{\n" +
" \"queues\": [\n" +
Expand All @@ -1921,8 +1924,9 @@ public void getMonitorInformation(TestContext context) {
" ]\n" +
"}";

when().get("/queuing/monitor?emptyQueues").then().assertThat().statusCode(200)
.body(equalTo(new JsonObject(expectedWithEmptyQueuesNoLimit).toString()));
expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesNoLimit);
receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?emptyQueues").then().assertThat().statusCode(200).extract().asString());
verifyResponse(context, expectedStaticJson, receivedJson);

String expectedNoEmptyQueuesAndLimit3 = "{\n" +
" \"queues\": [\n" +
Expand All @@ -1937,8 +1941,9 @@ public void getMonitorInformation(TestContext context) {
" ]\n" +
"}";

when().get("/queuing/monitor?limit=3").then().assertThat().statusCode(200)
.body(equalTo(new JsonObject(expectedNoEmptyQueuesAndLimit3).toString()));
expectedStaticJson = jsonMapper.readTree(expectedNoEmptyQueuesAndLimit3);
receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?limit=3").then().assertThat().statusCode(200).extract().asString());
verifyResponse(context, expectedStaticJson, receivedJson);

String expectedWithEmptyQueuesAndLimit3 = "{\n" +
" \"queues\": [\n" +
Expand All @@ -1957,8 +1962,9 @@ public void getMonitorInformation(TestContext context) {
" ]\n" +
"}";

when().get("/queuing/monitor?limit=3&emptyQueues").then().assertThat().statusCode(200)
.body(equalTo(new JsonObject(expectedWithEmptyQueuesAndLimit3).toString()));
expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesAndLimit3);
receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?limit=3&emptyQueues").then().assertThat().statusCode(200).extract().asString());
verifyResponse(context, expectedStaticJson, receivedJson);

String expectedWithEmptyQueuesAndInvalidLimit = "{\n" +
" \"queues\": [\n" +
Expand All @@ -1977,12 +1983,26 @@ public void getMonitorInformation(TestContext context) {
" ]\n" +
"}";

when().get("/queuing/monitor?limit=xx99xx&emptyQueues").then().assertThat().statusCode(200)
.body(equalTo(new JsonObject(expectedWithEmptyQueuesAndInvalidLimit).toString()));
expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesAndInvalidLimit);
receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?limit=limit=xx99xx&emptyQueues").then().assertThat().statusCode(200).extract().asString());
verifyResponse(context, expectedStaticJson, receivedJson);

async.complete();
}

private static void verifyResponse(TestContext context, JsonNode expectedStaticJson, JsonNode receivedJson) {
context.assertEquals(expectedStaticJson.size(), receivedJson.size());
context.assertEquals(expectedStaticJson.get("queues").size(), receivedJson.get("queues").size());
for (int i = 0; i < expectedStaticJson.get("queues").size(); i++){
context.assertEquals(expectedStaticJson.get("queues").get(i).get("name"), receivedJson.get("queues").get(i).get("name"));
context.assertEquals(expectedStaticJson.get("queues").get(i).get("size"), receivedJson.get("queues").get(i).get("size"));

context.assertFalse(receivedJson.get("queues").get(i).get("lastDequeueAttempt").asText().isEmpty());
context.assertTrue(receivedJson.get("queues").get(i).get("lastDequeueSuccess").asText().isEmpty()); // No handlers for address processor-address
context.assertFalse(receivedJson.get("queues").get(i).get("nextDequeueDueTimestamp").asText().isEmpty()); // So will retry
}
}

@Test
public void getStatisticsEmpty(TestContext context) {
flushAll();
Expand Down

0 comments on commit a9e3281

Please sign in to comment.