diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index a89977e8..8282f964 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -129,22 +129,15 @@ private void handleRegistrationRequest(Message msg) { log.warn("Got message without queue name while handleRegistrationRequest."); // IMO we should 'fail()' here. But we don't, to keep backward compatibility. } - if (log.isDebugEnabled()) { - log.debug( - "RedisQues Got registration request for queue {} from consumer: {}", queueName, uid); - } + log.debug("RedisQues Got registration request for queue {} from consumer: {}", queueName, uid); // Try to register for this queue redisSetWithOptions(consumersPrefix + queueName, uid, true, consumerLockTime, event -> { if (event.succeeded()) { String value = event.result() != null ? event.result().toString() : null; - if (log.isTraceEnabled()) { - log.trace("RedisQues setxn result: {} for queue: {}", value, queueName); - } + log.trace("RedisQues setxn result: {} for queue: {}", value, queueName); if ("OK".equals(value)) { // I am now the registered consumer for this queue. - if (log.isDebugEnabled()) { - log.debug("RedisQues Now registered for queue {}", queueName); - } + log.debug("RedisQues Now registered for queue {}", queueName); myQueues.put(queueName, QueueState.READY); consume(queueName); } else { @@ -185,7 +178,7 @@ public void start(Promise promise) { initialize(); promise.complete(); } else { - promise.fail(event.cause()); + promise.fail(new Exception(event.cause())); } }); } @@ -275,8 +268,11 @@ private void registerActiveQueueRegistrationRefresh() { final String consumer = Objects.toString(getConsumerEvent.result(), ""); if (uid.equals(consumer)) { log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); - refreshRegistration(queue, refreshRegistrationEvent -> - updateTimestamp(queue, null)); + refreshRegistration(queue, ev -> { + if (ev.failed()) + log.warn("TODO error handling", new Exception(ev.cause())); + updateTimestamp(queue, null); + }); } else { log.debug("RedisQues Removing queue {} from the list", queue); myQueues.remove(queue); @@ -291,14 +287,10 @@ private void registerActiveQueueRegistrationRefresh() { private Handler> operationsHandler() { return event -> { final JsonObject body = event.body(); - if (null == body) { - log.warn("Got msg with empty body from event bus. We'll run directly in a NullPointerException now. address={} replyAddress={} ", event.address(), event.replyAddress()); - // IMO we should 'fail()' here. But we don't, to keep backward compatibility. - } + if( body == null ) + throw new NullPointerException("Why is body empty? addr=" + event.address() + " replyAddr=" + event.replyAddress()); String operation = body.getString(OPERATION); - if (log.isTraceEnabled()) { - log.trace("RedisQues got operation: {}", operation); - } + log.trace("RedisQues got operation: {}", operation); QueueOperation queueOperation = QueueOperation.fromString(operation); if (queueOperation == null) { @@ -315,10 +307,7 @@ private Handler> operationsHandler() { resetConsumers(); return; case stop: - gracefulStop(event1 -> { - JsonObject reply = new JsonObject(); - reply.put(STATUS, OK); - }); + gracefulStop(aVoid -> {/*no-op*/}); return; } @@ -352,19 +341,18 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s } private void registerQueueCheck() { - vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> - { - redisProvider.connection().onSuccess(conn -> { - conn.send(Request.cmd(Command.SET, queueCheckLastexecKey, System.currentTimeMillis(), - "NX", "EX", configurationProvider.configuration().getCheckInterval())) - .onFailure(throwable -> log.error("Unexepected queue check result")).onSuccess(response -> { - log.info("periodic queue check is triggered now"); - checkQueues(); - }); - }).onFailure(throwable -> { - log.warn("Redis: Failed to trigger queue check.", throwable); - });; - + vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> { + redisProvider.connection() + .onFailure(ex -> log.error("TODO error handling", new Exception(ex))) + .onSuccess(conn -> { + conn.send(Request.cmd(Command.SET, queueCheckLastexecKey, System.currentTimeMillis(), + "NX", "EX", configurationProvider.configuration().getCheckInterval())) + .onFailure(ex -> log.error("Unexpected queue check result", new Exception(ex))) + .onSuccess(response -> { + log.info("periodic queue check is triggered now"); + checkQueues(); + }); + }); }); } @@ -383,45 +371,41 @@ public void stop() { } private void gracefulStop(final Handler doneHandler) { - consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> - unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - stoppedHandler = doneHandler; - if (myQueues.keySet().isEmpty()) { - doneHandler.handle(null); - } - }))); + consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { + if( event.failed() ) log.warn("TODO error handling", new Exception(event.cause())); + unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { + if( unregisterEvent.failed() ) + log.warn("TODO error handling", new Exception(unregisterEvent.cause())); + stoppedHandler = doneHandler; + if (myQueues.keySet().isEmpty()) { + doneHandler.handle(null); + } + }); + })); } private Future unregisterConsumers(boolean force) { final Promise result = Promise.promise(); - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers force: {}", force); - } - log.debug("RedisQues Unregistering consumers"); - final List futureList = new ArrayList<>(); + log.debug("RedisQues unregister consumers. force={}", force); + final List futureList = new ArrayList<>(myQueues.size()); for (final Map.Entry entry : myQueues.entrySet()) { final Promise promise = Promise.promise(); futureList.add(promise.future()); final String queue = entry.getKey(); if (force || entry.getValue() == QueueState.READY) { - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers queue: {}", queue); - } + log.trace("RedisQues unregister consumers queue: {}", queue); refreshRegistration(queue, event -> { + if( event.failed() ) log.warn("TODO error handling", new Exception(event.cause())); // Make sure that I am still the registered consumer String consumerKey = consumersPrefix + queue; - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers get: {}", consumerKey); - } + log.trace("RedisQues unregister consumers get: {}", consumerKey); redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, getEvent -> { if (getEvent.failed()) { log.warn("Failed to retrieve consumer '{}'.", consumerKey, getEvent.cause()); // IMO we should 'fail()' here. But we don't, to keep backward compatibility. } String consumer = Objects.toString(getEvent.result(), ""); - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers get result: {}", consumer); - } + log.trace("RedisQues unregister consumers get result: {}", consumer); if (uid.equals(consumer)) { log.debug("RedisQues remove consumer: {}", uid); myQueues.remove(queue); @@ -436,7 +420,10 @@ private Future unregisterConsumers(boolean force) { promise.complete(); } } - CompositeFuture.all(futureList).onComplete(event1 -> result.complete()); + CompositeFuture.all(futureList).onComplete(ev -> { + if( ev.failed() ) log.warn("TODO error handling", new Exception(ev.cause())); + result.complete(); + }); return result.future(); } @@ -447,9 +434,7 @@ private Future unregisterConsumers(boolean force) { private void resetConsumers() { log.debug("RedisQues Resetting consumers"); String keysPattern = consumersPrefix + "*"; - if (log.isTraceEnabled()) { - log.trace("RedisQues reset consumers keys: {}", keysPattern); - } + log.trace("RedisQues reset consumers keys: {}", keysPattern); redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> { if (keysResult.failed() || keysResult.result() == null) { log.error("Unable to get redis keys of consumers", keysResult.cause()); @@ -466,8 +451,8 @@ private void resetConsumers() { } redisAPI.del(args, delManyResult -> { if (delManyResult.succeeded()) { - Long count = delManyResult.result().toLong(); - log.debug("Successfully reset {} consumers", count); + if( log.isDebugEnabled() ) + log.debug("Successfully reset {} consumers", delManyResult.result().toLong()); } else { log.error("Unable to delete redis keys of consumers"); } @@ -478,9 +463,7 @@ private void resetConsumers() { private Future consume(final String queueName) { final Promise promise = Promise.promise(); - if (log.isDebugEnabled()) { - log.debug("RedisQues Requested to consume queue {}", queueName); - } + log.debug("RedisQues Requested to consume queue {}", queueName); refreshRegistration(queueName, event -> { if (event.failed()) { log.warn("Failed to refresh registration for queue '{}'.", queueName, event.cause()); @@ -488,23 +471,17 @@ private Future consume(final String queueName) { } // Make sure that I am still the registered consumer String consumerKey = consumersPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues consume get: {}", consumerKey); - } + log.trace("RedisQues consume get: {}", consumerKey); redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, event1 -> { if (event1.failed()) { log.error("Unable to get consumer for queue " + queueName, event1.cause()); return; } String consumer = Objects.toString(event1.result(), ""); - if (log.isTraceEnabled()) { - log.trace("RedisQues refresh registration consumer: {}", consumer); - } + log.trace("RedisQues refresh registration consumer: {}", consumer); if (uid.equals(consumer)) { QueueState state = myQueues.get(queueName); - if (log.isTraceEnabled()) { - log.trace("RedisQues consumer: {} queue: {} state: {}", consumer, queueName, state); - } + log.trace("RedisQues consumer: {} queue: {} state: {}", consumer, queueName, state); // Get the next message only once the previous has // been completely processed if (state != QueueState.CONSUMING) { @@ -514,21 +491,25 @@ private Future consume(final String queueName) { // consumer was restarted log.warn("Received request to consume from a queue I did not know about: {}", queueName); } - if (log.isDebugEnabled()) { - log.debug("RedisQues Starting to consume queue {}", queueName); - } - readQueue(queueName).onComplete(readQueueEvent -> promise.complete()); + log.debug("RedisQues Starting to consume queue {}", queueName); + readQueue(queueName).onComplete(readQueueEvent -> { + if( readQueueEvent.failed() ) + log.warn("TODO error handling", new Exception(readQueueEvent.cause())); + promise.complete(); + }); } else { - if (log.isDebugEnabled()) { - log.debug("RedisQues Queue {} is already being consumed", queueName); - } + log.debug("RedisQues Queue {} is already being consumed", queueName); promise.complete(); } } else { // Somehow registration changed. Let's renotify. log.warn("Registration for queue {} has changed to {}", queueName, consumer); myQueues.remove(queueName); - notifyConsumer(queueName).onComplete(notifyConsumerEvent -> promise.complete()); + notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { + if( notifyConsumerEvent.failed() ) + log.warn("TODO error handling", notifyConsumerEvent.cause()); + promise.complete(); + }); } })) .onFailure(throwable -> log.error("Redis: Unable to get consumer for queue " + queueName, throwable)); @@ -560,19 +541,13 @@ private Future isQueueLocked(final String queue) { private Future readQueue(final String queueName) { final Promise promise = Promise.promise(); - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue: {}", queueName); - } + log.trace("RedisQues read queue: {}", queueName); String queueKey = queuesPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue lindex: {}", queueKey); - } + log.trace("RedisQues read queue lindex: {}", queueKey); isQueueLocked(queueName).onComplete(lockAnswer -> { - if (lockAnswer.failed()) { - log.error("Failed to check if queue '{}' is locked", queueName, lockAnswer.cause()); - // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" - } + if( lockAnswer.failed() ) + throw new UnsupportedOperationException("TODO error handling " + queueName, lockAnswer.cause()); boolean locked = lockAnswer.result(); if (!locked) { redisProvider.redis().onSuccess(redisAPI -> redisAPI.lindex(queueKey, "0", answer -> { @@ -580,20 +555,17 @@ private Future readQueue(final String queueName) { log.error("Failed to peek queue '{}'", queueName, answer.cause()); // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" } - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue lindex result: {}", answer.result()); - } - if (answer.result() != null) { - processMessageWithTimeout(queueName, answer.result().toString(), success -> { + Response response = answer.result(); + log.trace("RedisQues read queue lindex result: {}", response); + if (response != null) { + processMessageWithTimeout(queueName, response.toString(), success -> { // update the queue failure count and get a retry interval int retryInterval = updateQueueFailureCountAndGetRetryInterval(queueName, success); if (success) { // Remove the processed message from the queue - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue lpop: {}", queueKey); - } + log.trace("RedisQues read queue lpop: {}", queueKey); redisAPI.lpop(Collections.singletonList(queueKey), jsonAnswer -> { if (jsonAnswer.failed()) { log.error("Failed to pop from queue '{}'", queueName, jsonAnswer.cause()); @@ -604,13 +576,17 @@ private Future readQueue(final String queueName) { Handler nextMsgHandler = event -> { // Issue notification to consume next message if any - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue: {}", queueKey); - } + log.trace("RedisQues read queue: {}", queueKey); redisAPI.llen(queueKey, answer1 -> { if (answer1.succeeded() && answer1.result() != null && answer1.result().toInteger() > 0) { - notifyConsumer(queueName).onComplete(event1 -> promise.complete()); + notifyConsumer(queueName).onComplete(event1 -> { + if( event1.failed() ) + log.warn("TODO error handling", new Exception(event1.cause())); + promise.complete(); + }); } else { + if( answer1.failed() ) + log.warn("TODO error handling", new Exception(answer1.cause())); promise.complete(); } }); @@ -619,6 +595,8 @@ private Future readQueue(final String queueName) { // Notify that we are stopped in case it was the last active consumer if (stoppedHandler != null) { unregisterConsumers(false).onComplete(event -> { + if( event.failed() ) + log.warn("TODO error handling", new Exception(event.cause())); if (myQueues.isEmpty()) { stoppedHandler.handle(null); } @@ -630,20 +608,16 @@ private Future readQueue(final String queueName) { }); } else { // Failed. Message will be kept in queue and retried later - if (log.isDebugEnabled()) { - log.debug("RedisQues Processing failed for queue {}", queueName); - // reschedule - log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); - } + log.debug("RedisQues Processing failed for queue {}", queueName); + // reschedule + log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); rescheduleSendMessageAfterFailure(queueName, retryInterval); promise.complete(); } }); } else { // This can happen when requests to consume happen at the same moment the queue is emptied. - if (log.isDebugEnabled()) { - log.debug("Got a request to consume from empty queue {}", queueName); - } + log.debug("Got a request to consume from empty queue {}", queueName); myQueues.put(queueName, QueueState.READY); promise.complete(); } @@ -654,9 +628,7 @@ private Future readQueue(final String queueName) { promise.complete(); }); } else { - if (log.isDebugEnabled()) { - log.debug("Got a request to consume from locked queue {}", queueName); - } + log.debug("Got a request to consume from locked queue {}", queueName); myQueues.put(queueName, QueueState.READY); promise.complete(); } @@ -665,15 +637,14 @@ private Future readQueue(final String queueName) { } private void rescheduleSendMessageAfterFailure(final String queueName, int retryInSeconds) { - if (log.isTraceEnabled()) { - log.trace("RedsQues reschedule after failure for queue: {}", queueName); - } + log.trace("RedsQues reschedule after failure for queue: {}", queueName); vertx.setTimer(retryInSeconds * 1000L, timerId -> { if (log.isDebugEnabled()) { log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis())); } notifyConsumer(queueName).onComplete(event -> { + if( event.failed() ) log.warn("TODO error handling", new Exception(event.cause())); // reset the queue state to be consumed by {@link RedisQues#consume(String)} myQueues.put(queueName, QueueState.READY); }); @@ -687,7 +658,7 @@ private void processMessageWithTimeout(final String queue, final String payload, } timer.executeDelayedMax(processorDelayMax).onComplete(delayed -> { if (delayed.failed()) { - log.error("Delayed execution has failed.", delayed.cause()); + log.error("Delayed execution has failed.", new Exception(delayed.cause())); // TODO: May we should call handler with failed state now. return; } @@ -696,9 +667,7 @@ private void processMessageWithTimeout(final String queue, final String payload, JsonObject message = new JsonObject(); message.put("queue", queue); message.put(PAYLOAD, payload); - if (log.isTraceEnabled()) { - log.trace("RedisQues process message: {} for queue: {} send it to processor: {}", message, queue, processorAddress); - } + log.trace("RedisQues process message: {} for queue: {} send it to processor: {}", message, queue, processorAddress); // send the message to the consumer DeliveryOptions options = new DeliveryOptions().setSendTimeout(configurationProvider.configuration().getProcessorTimeout()); @@ -707,7 +676,8 @@ private void processMessageWithTimeout(final String queue, final String payload, if (reply.succeeded()) { success = OK.equals(reply.result().body().getString(STATUS)); } else { - log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {} ({})", uid, queue, reply.cause().getMessage()); + log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}", + uid, queue, new Exception(reply.cause())); success = Boolean.FALSE; } handler.handle(success); @@ -722,23 +692,17 @@ private Future notifyConsumer(final String queueName) { final Promise promise = Promise.promise(); // Find the consumer to notify String key = consumersPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues notify consumer get: {}", key); - } + log.trace("RedisQues notify consumer get: {}", key); redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(key, event -> { if (event.failed()) { - log.warn("Failed to get consumer for queue '{}'", queueName, event.cause()); + log.warn("Failed to get consumer for queue '{}'", queueName, new Exception(event.cause())); // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" } String consumer = Objects.toString(event.result(), null); - if (log.isTraceEnabled()) { - log.trace("RedisQues got consumer: {}", consumer); - } + log.trace("RedisQues got consumer: {}", consumer); if (consumer == null) { // No consumer for this queue, let's make a peer become consumer - if (log.isDebugEnabled()) { - log.debug("RedisQues Sending registration request for queue {}", queueName); - } + log.debug("RedisQues Sending registration request for queue {}", queueName); eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); } else { @@ -757,9 +721,7 @@ private Future notifyConsumer(final String queueName) { } private void refreshRegistration(String queueName, Handler> handler) { - if (log.isDebugEnabled()) { - log.debug("RedisQues Refreshing registration of queue {}, expire in {} s", queueName, consumerLockTime); - } + log.debug("RedisQues Refreshing registration of queue {}, expire in {} s", queueName, consumerLockTime); String consumerKey = consumersPrefix + queueName; if (handler == null) { throw new RuntimeException("Handler must be set"); @@ -781,9 +743,7 @@ private void refreshRegistration(String queueName, Handler */ private void updateTimestamp(final String queueName, Handler> handler) { long ts = System.currentTimeMillis(); - if (log.isTraceEnabled()) { - log.trace("RedisQues update timestamp for queue: {} to: {}", queueName, ts); - } + log.trace("RedisQues update timestamp for queue: {} to: {}", queueName, ts); redisProvider.redis().onSuccess(redisAPI -> { if (handler == null) { redisAPI.zadd(Arrays.asList(queuesKey, String.valueOf(ts), queueName)); @@ -816,25 +776,27 @@ private Future checkQueues() { return; } final AtomicInteger counter = new AtomicInteger(queues.size()); - if (log.isTraceEnabled()) { - log.trace("RedisQues update queues: {}", counter); - } - final List futureList = new ArrayList<>(); + log.trace("RedisQues update queues: {}", counter); + final List futureList = new ArrayList<>(queues.size()); for (Response queueObject : queues) { final Promise promise = Promise.promise(); futureList.add(promise.future()); // Check if the inactive queue is not empty (i.e. the key exists) final String queueName = queueObject.toString(); String key = queuesPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues update queue: {}", key); - } + log.trace("RedisQues update queue: {}", key); Handler refreshRegHandler = event -> { // Make sure its TTL is correctly set (replaces the previous orphan detection mechanism). refreshRegistration(queueName, refreshRegistrationEvent -> { + if( refreshRegistrationEvent.failed() ) + log.warn("TODO error handling", new Exception(refreshRegistrationEvent.cause())); // And trigger its consumer. - notifyConsumer(queueName).onComplete(notifyConsumerEvent -> promise.complete()); + notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { + if( notifyConsumerEvent.failed() ) + log.warn("TODO error handling", new Exception(notifyConsumerEvent.cause())); + promise.complete(); + }); }); }; redisAPI.exists(Collections.singletonList(key), event -> { @@ -853,18 +815,22 @@ private Future checkQueues() { } // Ensure we clean the old queues after having updated all timestamps if (counter.decrementAndGet() == 0) { - removeOldQueues(limit).onComplete(removeOldQueuesEvent -> refreshRegHandler.handle(null)); + removeOldQueues(limit).onComplete(removeOldQueuesEvent -> { + if (removeOldQueuesEvent.failed()) + log.warn("TODO error handling", new Exception(removeOldQueuesEvent.cause())); + refreshRegHandler.handle(null); + }); } else { refreshRegHandler.handle(null); } }); } else { // Ensure we clean the old queues also in the case of empty queue. - if (log.isTraceEnabled()) { - log.trace("RedisQues remove old queue: {}", queueName); - } + log.trace("RedisQues remove old queue: {}", queueName); if (counter.decrementAndGet() == 0) { removeOldQueues(limit).onComplete(removeOldQueuesEvent -> { + if( removeOldQueuesEvent.failed() ) + log.warn("TODO error handling", new Exception(removeOldQueuesEvent.cause())); queueStatisticsCollector.resetQueueFailureStatistics(queueName); promise.complete(); }); @@ -875,7 +841,10 @@ private Future checkQueues() { } }); } - CompositeFuture.all(futureList).onComplete(event1 -> result.complete()); + CompositeFuture.all(futureList).onComplete(ev1 -> { + if( ev1.failed() ) log.warn("Cannot happen", new Exception(ev1.cause())); + result.complete(); + }); })) .onFailure(throwable -> { log.warn("Redis: Failed to checkQueues", throwable); @@ -892,8 +861,13 @@ private Future checkQueues() { private Future removeOldQueues(long limit) { final Promise promise = Promise.promise(); log.debug("Cleaning old queues"); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.zremrangebyscore(queuesKey, "-inf", String.valueOf(limit), - event -> promise.complete())) + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.zremrangebyscore(queuesKey, "-inf", String.valueOf(limit), event -> { + if( event.failed() ) log.warn("TODO error handling", event.cause()); + promise.complete(); + }); + }) .onFailure(throwable -> { log.warn("Redis: Failed to removeOldQueues", throwable); promise.complete();