From aec37f1a464e7776c3f60ecb6d1fb019961949fe Mon Sep 17 00:00:00 2001 From: Oli Henning Date: Thu, 1 Nov 2018 17:06:39 +0100 Subject: [PATCH 1/3] Solves #66 - get rid of this RedisQues internal 'delivery timeout' timer - use Vertx' DeliveryOption when sending the message to the QueueProcessor and set the processorTimeout value (240 seconds) here This simplifies the code and effectively enables delivery timeouts > 30 seconds --- .../org/swisspush/redisques/RedisQues.java | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 65d54558..cf5738cc 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -4,6 +4,7 @@ import io.vertx.core.AsyncResult; import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.EventBus; import io.vertx.core.eventbus.Message; import io.vertx.core.eventbus.MessageConsumer; @@ -994,7 +995,6 @@ private void readQueue(final String queue) { } log.debug("RedisQues Message removed, queue " + queue + " is ready again"); myQueues.put(queue, QueueState.READY); - vertx.cancelTimer(sendResult.timeoutId); // Notify that we are stopped in // case it // was the last active consumer @@ -1021,7 +1021,6 @@ private void readQueue(final String queue) { // reschedule log.debug("RedisQues will re-send the message to queue '" + queue + "' in " + retryInterval + " seconds"); - vertx.cancelTimer(sendResult.timeoutId); rescheduleSendMessageAfterFailure(queue, retryInterval); } }); @@ -1070,21 +1069,17 @@ private void processMessageWithTimeout(final String queue, final String payload, log.trace("RedisQues process message: " + message + " for queue: " + queue + " send it to processor: " + processorAddress); } - // start a timer, which will cancel the processing, if the consumer didn't respond - final long timeoutId = vertx.setTimer(processorTimeout, timeoutId1 -> { - log.info("RedisQues QUEUE_ERROR: Consumer timeout " + uid + " queue: " + queue); - handler.handle(new SendResult(false, timeoutId1)); - }); - // send the message to the consumer - eb.send(processorAddress, message, (Handler>>) reply -> { + DeliveryOptions options = new DeliveryOptions().setSendTimeout(processorTimeout); + eb.send(processorAddress, message, options, (Handler>>) reply -> { Boolean success; if (reply.succeeded()) { success = OK.equals(reply.result().body().getString(STATUS)); } else { + log.info("RedisQues QUEUE_ERROR: Consumer failed " + uid + " queue: " + queue + " (" + reply.cause().getMessage() + ")"); success = Boolean.FALSE; } - handler.handle(new SendResult(success, timeoutId)); + handler.handle(new SendResult(success)); }); updateTimestamp(queue, null); }); @@ -1092,11 +1087,9 @@ private void processMessageWithTimeout(final String queue, final String payload, private class SendResult { public final Boolean success; - public final Long timeoutId; - public SendResult(Boolean success, Long timeoutId) { + public SendResult(Boolean success) { this.success = success; - this.timeoutId = timeoutId; } } From 07d07e31485b6b37ccd0d681ad0348de0ba6a705 Mon Sep 17 00:00:00 2001 From: Oli Henning Date: Thu, 1 Nov 2018 17:30:06 +0100 Subject: [PATCH 2/3] Solves #66 - need to set a small timeout - otherwise the test would now take >240 seconds (>4 minutes) --- .../java/org/swisspush/redisques/RedisQuesProcessorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/org/swisspush/redisques/RedisQuesProcessorTest.java b/src/test/java/org/swisspush/redisques/RedisQuesProcessorTest.java index 8afdc484..39bb20da 100644 --- a/src/test/java/org/swisspush/redisques/RedisQuesProcessorTest.java +++ b/src/test/java/org/swisspush/redisques/RedisQuesProcessorTest.java @@ -70,6 +70,7 @@ protected void deployRedisques(TestContext context) { .processorAddress("processor-address") .redisEncoding("ISO-8859-1") .refreshPeriod(2) + .processorTimeout(10) .build() .asJsonObject(); From 6354bbcd45aed8535fb68ceaf759a62d62ecdf67 Mon Sep 17 00:00:00 2001 From: Oli Henning Date: Fri, 2 Nov 2018 13:42:47 +0100 Subject: [PATCH 3/3] Solves #66 - replace the (now useless) inner class "SendResult" with a simple Boolean --- .../org/swisspush/redisques/RedisQues.java | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index cf5738cc..a3030be0 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -976,12 +976,12 @@ private void readQueue(final String queue) { log.trace("RedisQues read queue lindex result: " + answer.result()); } if (answer.result() != null) { - processMessageWithTimeout(queue, answer.result(), sendResult -> { + processMessageWithTimeout(queue, answer.result(), success -> { // update the queue failure count and get a retry interval - int retryInterval = updateQueueFailureCountAndGetRetryInterval(queue, sendResult.success); + int retryInterval = updateQueueFailureCountAndGetRetryInterval(queue, success); - if (sendResult.success) { + if (success) { // Remove the processed message from the // queue String key1 = getQueuesPrefix() + queue; @@ -1051,7 +1051,7 @@ private void rescheduleSendMessageAfterFailure(final String queue, int retryInte }); } - private void processMessageWithTimeout(final String queue, final String payload, final Handler handler) { + private void processMessageWithTimeout(final String queue, final String payload, final Handler handler) { if (processorDelayMax > 0) { log.info("About to process message for queue " + queue + " with a maximum delay of " + processorDelayMax + "ms"); } @@ -1079,20 +1079,12 @@ private void processMessageWithTimeout(final String queue, final String payload, log.info("RedisQues QUEUE_ERROR: Consumer failed " + uid + " queue: " + queue + " (" + reply.cause().getMessage() + ")"); success = Boolean.FALSE; } - handler.handle(new SendResult(success)); + handler.handle(success); }); updateTimestamp(queue, null); }); } - private class SendResult { - public final Boolean success; - - public SendResult(Boolean success) { - this.success = success; - } - } - private void notifyConsumer(final String queue) { log.debug("RedisQues Notifying consumer of queue " + queue); final EventBus eb = vertx.eventBus();