From a9e3281bef88159c362d4f0db87157f814dca00d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Thu, 23 Nov 2023 14:03:33 +0700 Subject: [PATCH] fixed tests, add next dequeue due time --- .../org/swisspush/redisques/RedisQues.java | 7 ++- .../handler/RedisquesHttpRequestHandler.java | 18 +++++--- .../redisques/util/RedisquesAPI.java | 1 + .../RedisquesHttpRequestHandlerTest.java | 46 +++++++++++++------ 4 files changed, 50 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 4aa0612f..36872cab 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -99,6 +99,7 @@ private enum QueueState { public class DequeueStatistic{ public Long lastDequeueAttemptTimestamp; public Long lastDequeueSuccessTimestamp; + public Long nextDequeueDueTimestamp; } public RedisQues() { @@ -683,8 +684,9 @@ private void rescheduleSendMessageAfterFailure(final String queueName, int retry if (log.isTraceEnabled()) { log.trace("RedsQues reschedule after failure for queue: {}", queueName); } - - vertx.setTimer(retryInSeconds * 1000L, timerId -> { + long retryDelayInMills = retryInSeconds * 1000L; + dequeueStatistic.get(queueName).nextDequeueDueTimestamp = new Date().getTime() + retryDelayInMills; + vertx.setTimer(retryDelayInMills, timerId -> { if (log.isDebugEnabled()) { log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis())); } @@ -722,6 +724,7 @@ private void processMessageWithTimeout(final String queue, final String payload, if (reply.succeeded()) { success = OK.equals(reply.result().body().getString(STATUS)); dequeueStatistic.get(queue).lastDequeueSuccessTimestamp = new Date().getTime(); + dequeueStatistic.get(queue).nextDequeueDueTimestamp = null; } else { log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {} ({})", uid, queue, reply.cause().getMessage()); success = Boolean.FALSE; diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 1fe711c1..34779bca 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -512,9 +512,9 @@ private void getMonitorInformation(RoutingContext ctx) { if (limit > 0) { queuesList = limitJsonQueueArray(queuesList, limit); } - Map processTimeList = redisQues.getDequeueStatistic(); - if (processTimeList.size() > 0){ - fillTheProcessTimeToQueuesList(queuesList, processTimeList); + Map dequeueProcessStatistic = redisQues.getDequeueStatistic(); + if (dequeueProcessStatistic.size() > 0) { + fillStatisticToQueuesList(queuesList, dequeueProcessStatistic); } JsonObject resultObject = new JsonObject(); resultObject.put(QUEUES, queuesList); @@ -533,19 +533,23 @@ private void getMonitorInformation(RoutingContext ctx) { }); } - private void fillTheProcessTimeToQueuesList(List queuesList, Map processTimeList) { + private void fillStatisticToQueuesList(List queuesList, Map dequeueProcessStatistic) { queuesList.forEach(entries -> { String queueName = entries.getString(MONITOR_QUEUE_NAME); entries.put(MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT, ""); entries.put(MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS, ""); - if (processTimeList.containsKey(queueName)) { - RedisQues.DequeueStatistic dequeueStatistic = processTimeList.get(queueName); + entries.put(MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS, ""); + if (dequeueProcessStatistic.containsKey(queueName)) { + RedisQues.DequeueStatistic dequeueStatistic = dequeueProcessStatistic.get(queueName); if (dequeueStatistic.lastDequeueAttemptTimestamp != null) { entries.put(MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp))); } - if (dequeueStatistic.lastDequeueAttemptTimestamp != null) { + if (dequeueStatistic.lastDequeueSuccessTimestamp != null) { entries.put(MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp))); } + if (dequeueStatistic.nextDequeueDueTimestamp != null) { + entries.put(MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS, dequeueStatistic.nextDequeueDueTimestamp.toString()); + } } }); } diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java b/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java index 0bbd8d6e..77c0bf38 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java @@ -43,6 +43,7 @@ public class RedisquesAPI { public static final String MONITOR_QUEUE_SIZE = "size"; public static final String MONITOR_QUEUE_LAST_DEQUEUE_ATTEMPT = "lastDequeueAttempt"; public static final String MONITOR_QUEUE_LAST_DEQUEUE_SUCCESS = "lastDequeueSuccess"; + public static final String MONITOR_QUEUE_NEXT_DEQUEUE_DUE_TS = "nextDequeueDueTimestamp"; public static final String STATISTIC_QUEUE_FAILURES = "failures"; public static final String STATISTIC_QUEUE_BACKPRESSURE = "backpressureTime"; public static final String STATISTIC_QUEUE_SLOWDOWN = "slowdownTime"; diff --git a/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java b/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java index 233d603c..a69b5008 100644 --- a/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java +++ b/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java @@ -1,5 +1,8 @@ package org.swisspush.redisques.handler; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.sun.istack.NotNull; import com.sun.istack.Nullable; import io.restassured.RestAssured; @@ -1867,10 +1870,10 @@ public void deleteSingleLockEncoded(TestContext context) { } @Test - public void getMonitorInformation(TestContext context) { + public void getMonitorInformation(TestContext context) throws JsonProcessingException { Async async = context.async(); flushAll(); - + ObjectMapper jsonMapper = new ObjectMapper(); when().get("/queuing/monitor").then().assertThat().statusCode(200) .body("queues", empty()); @@ -1900,9 +1903,9 @@ public void getMonitorInformation(TestContext context) { " }\n" + " ]\n" + "}"; - - when().get("/queuing/monitor").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedNoEmptyQueuesNoLimit).toString())); + JsonNode expectedStaticJson = jsonMapper.readTree(expectedNoEmptyQueuesNoLimit); + JsonNode receivedJson = jsonMapper.readTree( when().get("/queuing/monitor").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); String expectedWithEmptyQueuesNoLimit = "{\n" + " \"queues\": [\n" + @@ -1921,8 +1924,9 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?emptyQueues").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedWithEmptyQueuesNoLimit).toString())); + expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesNoLimit); + receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?emptyQueues").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); String expectedNoEmptyQueuesAndLimit3 = "{\n" + " \"queues\": [\n" + @@ -1937,8 +1941,9 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?limit=3").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedNoEmptyQueuesAndLimit3).toString())); + expectedStaticJson = jsonMapper.readTree(expectedNoEmptyQueuesAndLimit3); + receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?limit=3").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); String expectedWithEmptyQueuesAndLimit3 = "{\n" + " \"queues\": [\n" + @@ -1957,8 +1962,9 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?limit=3&emptyQueues").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedWithEmptyQueuesAndLimit3).toString())); + expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesAndLimit3); + receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?limit=3&emptyQueues").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); String expectedWithEmptyQueuesAndInvalidLimit = "{\n" + " \"queues\": [\n" + @@ -1977,12 +1983,26 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?limit=xx99xx&emptyQueues").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedWithEmptyQueuesAndInvalidLimit).toString())); + expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesAndInvalidLimit); + receivedJson = jsonMapper.readTree( when().get("/queuing/monitor?limit=limit=xx99xx&emptyQueues").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); async.complete(); } + private static void verifyResponse(TestContext context, JsonNode expectedStaticJson, JsonNode receivedJson) { + context.assertEquals(expectedStaticJson.size(), receivedJson.size()); + context.assertEquals(expectedStaticJson.get("queues").size(), receivedJson.get("queues").size()); + for (int i = 0; i < expectedStaticJson.get("queues").size(); i++){ + context.assertEquals(expectedStaticJson.get("queues").get(i).get("name"), receivedJson.get("queues").get(i).get("name")); + context.assertEquals(expectedStaticJson.get("queues").get(i).get("size"), receivedJson.get("queues").get(i).get("size")); + + context.assertFalse(receivedJson.get("queues").get(i).get("lastDequeueAttempt").asText().isEmpty()); + context.assertTrue(receivedJson.get("queues").get(i).get("lastDequeueSuccess").asText().isEmpty()); // No handlers for address processor-address + context.assertFalse(receivedJson.get("queues").get(i).get("nextDequeueDueTimestamp").asText().isEmpty()); // So will retry + } + } + @Test public void getStatisticsEmpty(TestContext context) { flushAll();