Skip to content

Commit

Permalink
Merge pull request #67 from oli-h/issue-66-makeProcessorTimeoutValueF…
Browse files Browse the repository at this point in the history
…unctional

Use DeliveryOption instead of a timer to detect timeouts on the EventBus
  • Loading branch information
mcweba authored Nov 2, 2018
2 parents 0cc1063 + 6354bbc commit a37cb12
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 24 deletions.
33 changes: 9 additions & 24 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
Expand Down Expand Up @@ -975,12 +976,12 @@ private void readQueue(final String queue) {
log.trace("RedisQues read queue lindex result: " + answer.result());
}
if (answer.result() != null) {
processMessageWithTimeout(queue, answer.result(), sendResult -> {
processMessageWithTimeout(queue, answer.result(), success -> {

// update the queue failure count and get a retry interval
int retryInterval = updateQueueFailureCountAndGetRetryInterval(queue, sendResult.success);
int retryInterval = updateQueueFailureCountAndGetRetryInterval(queue, success);

if (sendResult.success) {
if (success) {
// Remove the processed message from the
// queue
String key1 = getQueuesPrefix() + queue;
Expand All @@ -994,7 +995,6 @@ private void readQueue(final String queue) {
}
log.debug("RedisQues Message removed, queue " + queue + " is ready again");
myQueues.put(queue, QueueState.READY);
vertx.cancelTimer(sendResult.timeoutId);
// Notify that we are stopped in
// case it
// was the last active consumer
Expand All @@ -1021,7 +1021,6 @@ private void readQueue(final String queue) {

// reschedule
log.debug("RedisQues will re-send the message to queue '" + queue + "' in " + retryInterval + " seconds");
vertx.cancelTimer(sendResult.timeoutId);
rescheduleSendMessageAfterFailure(queue, retryInterval);
}
});
Expand Down Expand Up @@ -1052,7 +1051,7 @@ private void rescheduleSendMessageAfterFailure(final String queue, int retryInte
});
}

private void processMessageWithTimeout(final String queue, final String payload, final Handler<SendResult> handler) {
private void processMessageWithTimeout(final String queue, final String payload, final Handler<Boolean> handler) {
if (processorDelayMax > 0) {
log.info("About to process message for queue " + queue + " with a maximum delay of " + processorDelayMax + "ms");
}
Expand All @@ -1070,36 +1069,22 @@ private void processMessageWithTimeout(final String queue, final String payload,
log.trace("RedisQues process message: " + message + " for queue: " + queue + " send it to processor: " + processorAddress);
}

// start a timer, which will cancel the processing, if the consumer didn't respond
final long timeoutId = vertx.setTimer(processorTimeout, timeoutId1 -> {
log.info("RedisQues QUEUE_ERROR: Consumer timeout " + uid + " queue: " + queue);
handler.handle(new SendResult(false, timeoutId1));
});

// send the message to the consumer
eb.send(processorAddress, message, (Handler<AsyncResult<Message<JsonObject>>>) reply -> {
DeliveryOptions options = new DeliveryOptions().setSendTimeout(processorTimeout);
eb.send(processorAddress, message, options, (Handler<AsyncResult<Message<JsonObject>>>) reply -> {
Boolean success;
if (reply.succeeded()) {
success = OK.equals(reply.result().body().getString(STATUS));
} else {
log.info("RedisQues QUEUE_ERROR: Consumer failed " + uid + " queue: " + queue + " (" + reply.cause().getMessage() + ")");
success = Boolean.FALSE;
}
handler.handle(new SendResult(success, timeoutId));
handler.handle(success);
});
updateTimestamp(queue, null);
});
}

private class SendResult {
public final Boolean success;
public final Long timeoutId;

public SendResult(Boolean success, Long timeoutId) {
this.success = success;
this.timeoutId = timeoutId;
}
}

private void notifyConsumer(final String queue) {
log.debug("RedisQues Notifying consumer of queue " + queue);
final EventBus eb = vertx.eventBus();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ protected void deployRedisques(TestContext context) {
.processorAddress("processor-address")
.redisEncoding("ISO-8859-1")
.refreshPeriod(2)
.processorTimeout(10)
.build()
.asJsonObject();

Expand Down

0 comments on commit a37cb12

Please sign in to comment.