diff --git a/pom.xml b/pom.xml index 57fc93d9..af253ec1 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 3.0.32-SNAPSHOT + 3.0.33-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index a89977e8..7cb8fa75 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -15,6 +15,7 @@ import org.swisspush.redisques.util.*; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import static org.swisspush.redisques.util.RedisquesAPI.*; @@ -93,6 +94,8 @@ private enum QueueState { private Map queueActions = new HashMap<>(); + private Map dequeueStatistic = new ConcurrentHashMap<>(); + public RedisQues() { } @@ -129,22 +132,15 @@ private void handleRegistrationRequest(Message msg) { log.warn("Got message without queue name while handleRegistrationRequest."); // IMO we should 'fail()' here. But we don't, to keep backward compatibility. } - if (log.isDebugEnabled()) { - log.debug( - "RedisQues Got registration request for queue {} from consumer: {}", queueName, uid); - } + log.debug("RedisQues Got registration request for queue {} from consumer: {}", queueName, uid); // Try to register for this queue redisSetWithOptions(consumersPrefix + queueName, uid, true, consumerLockTime, event -> { if (event.succeeded()) { String value = event.result() != null ? event.result().toString() : null; - if (log.isTraceEnabled()) { - log.trace("RedisQues setxn result: {} for queue: {}", value, queueName); - } + log.trace("RedisQues setxn result: {} for queue: {}", value, queueName); if ("OK".equals(value)) { // I am now the registered consumer for this queue. - if (log.isDebugEnabled()) { - log.debug("RedisQues Now registered for queue {}", queueName); - } + log.debug("RedisQues Now registered for queue {}", queueName); myQueues.put(queueName, QueueState.READY); consume(queueName); } else { @@ -168,6 +164,14 @@ public void start(Promise promise) { RedisquesConfiguration modConfig = configurationProvider.configuration(); log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration()); + int dequeueStatisticReportInterval = modConfig.getDequeueStatisticReportIntervalSec(); + if (dequeueStatisticReportInterval > 0) { + vertx.setPeriodic(1000L * dequeueStatisticReportInterval, handler -> { + dequeueStatistic.forEach((queueName, dequeueStatistic) -> + queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic)); + }); + } + queuesKey = modConfig.getRedisPrefix() + "queues"; queuesPrefix = modConfig.getRedisPrefix() + "queues:"; consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; @@ -185,7 +189,7 @@ public void start(Promise promise) { initialize(); promise.complete(); } else { - promise.fail(event.cause()); + promise.fail(new Exception(event.cause())); } }); } @@ -195,7 +199,7 @@ private void initialize() { this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider, queuesPrefix, vertx, configuration.getQueueSpeedIntervalSec()); - RedisquesHttpRequestHandler.init(vertx, configuration); + RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector); // only initialize memoryUsageProvider when not provided in the constructor if (memoryUsageProvider == null) { @@ -275,8 +279,11 @@ private void registerActiveQueueRegistrationRefresh() { final String consumer = Objects.toString(getConsumerEvent.result(), ""); if (uid.equals(consumer)) { log.debug("RedisQues Periodic consumer refresh for active queue {}", queue); - refreshRegistration(queue, refreshRegistrationEvent -> - updateTimestamp(queue, null)); + refreshRegistration(queue, ev -> { + if (ev.failed()) + log.warn("TODO error handling", new Exception(ev.cause())); + updateTimestamp(queue, null); + }); } else { log.debug("RedisQues Removing queue {} from the list", queue); myQueues.remove(queue); @@ -291,14 +298,10 @@ private void registerActiveQueueRegistrationRefresh() { private Handler> operationsHandler() { return event -> { final JsonObject body = event.body(); - if (null == body) { - log.warn("Got msg with empty body from event bus. We'll run directly in a NullPointerException now. address={} replyAddress={} ", event.address(), event.replyAddress()); - // IMO we should 'fail()' here. But we don't, to keep backward compatibility. - } + if( body == null ) + throw new NullPointerException("Why is body empty? addr=" + event.address() + " replyAddr=" + event.replyAddress()); String operation = body.getString(OPERATION); - if (log.isTraceEnabled()) { - log.trace("RedisQues got operation: {}", operation); - } + log.trace("RedisQues got operation: {}", operation); QueueOperation queueOperation = QueueOperation.fromString(operation); if (queueOperation == null) { @@ -315,10 +318,7 @@ private Handler> operationsHandler() { resetConsumers(); return; case stop: - gracefulStop(event1 -> { - JsonObject reply = new JsonObject(); - reply.put(STATUS, OK); - }); + gracefulStop(aVoid -> {/*no-op*/}); return; } @@ -352,19 +352,18 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s } private void registerQueueCheck() { - vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> - { - redisProvider.connection().onSuccess(conn -> { - conn.send(Request.cmd(Command.SET, queueCheckLastexecKey, System.currentTimeMillis(), - "NX", "EX", configurationProvider.configuration().getCheckInterval())) - .onFailure(throwable -> log.error("Unexepected queue check result")).onSuccess(response -> { - log.info("periodic queue check is triggered now"); - checkQueues(); - }); - }).onFailure(throwable -> { - log.warn("Redis: Failed to trigger queue check.", throwable); - });; - + vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> { + redisProvider.connection() + .onFailure(ex -> log.error("TODO error handling", new Exception(ex))) + .onSuccess(conn -> { + conn.send(Request.cmd(Command.SET, queueCheckLastexecKey, System.currentTimeMillis(), + "NX", "EX", configurationProvider.configuration().getCheckInterval())) + .onFailure(ex -> log.error("Unexpected queue check result", new Exception(ex))) + .onSuccess(response -> { + log.info("periodic queue check is triggered now"); + checkQueues(); + }); + }); }); } @@ -383,45 +382,41 @@ public void stop() { } private void gracefulStop(final Handler doneHandler) { - consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> - unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { - stoppedHandler = doneHandler; - if (myQueues.keySet().isEmpty()) { - doneHandler.handle(null); - } - }))); + consumersMessageConsumer.unregister(event -> uidMessageConsumer.unregister(unregisterEvent -> { + if( event.failed() ) log.warn("TODO error handling", new Exception(event.cause())); + unregisterConsumers(false).onComplete(unregisterConsumersEvent -> { + if( unregisterEvent.failed() ) + log.warn("TODO error handling", new Exception(unregisterEvent.cause())); + stoppedHandler = doneHandler; + if (myQueues.keySet().isEmpty()) { + doneHandler.handle(null); + } + }); + })); } private Future unregisterConsumers(boolean force) { final Promise result = Promise.promise(); - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers force: {}", force); - } - log.debug("RedisQues Unregistering consumers"); - final List futureList = new ArrayList<>(); + log.debug("RedisQues unregister consumers. force={}", force); + final List futureList = new ArrayList<>(myQueues.size()); for (final Map.Entry entry : myQueues.entrySet()) { final Promise promise = Promise.promise(); futureList.add(promise.future()); final String queue = entry.getKey(); if (force || entry.getValue() == QueueState.READY) { - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers queue: {}", queue); - } + log.trace("RedisQues unregister consumers queue: {}", queue); refreshRegistration(queue, event -> { + if( event.failed() ) log.warn("TODO error handling", new Exception(event.cause())); // Make sure that I am still the registered consumer String consumerKey = consumersPrefix + queue; - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers get: {}", consumerKey); - } + log.trace("RedisQues unregister consumers get: {}", consumerKey); redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, getEvent -> { if (getEvent.failed()) { log.warn("Failed to retrieve consumer '{}'.", consumerKey, getEvent.cause()); // IMO we should 'fail()' here. But we don't, to keep backward compatibility. } String consumer = Objects.toString(getEvent.result(), ""); - if (log.isTraceEnabled()) { - log.trace("RedisQues unregister consumers get result: {}", consumer); - } + log.trace("RedisQues unregister consumers get result: {}", consumer); if (uid.equals(consumer)) { log.debug("RedisQues remove consumer: {}", uid); myQueues.remove(queue); @@ -436,7 +431,10 @@ private Future unregisterConsumers(boolean force) { promise.complete(); } } - CompositeFuture.all(futureList).onComplete(event1 -> result.complete()); + CompositeFuture.all(futureList).onComplete(ev -> { + if( ev.failed() ) log.warn("TODO error handling", new Exception(ev.cause())); + result.complete(); + }); return result.future(); } @@ -447,9 +445,7 @@ private Future unregisterConsumers(boolean force) { private void resetConsumers() { log.debug("RedisQues Resetting consumers"); String keysPattern = consumersPrefix + "*"; - if (log.isTraceEnabled()) { - log.trace("RedisQues reset consumers keys: {}", keysPattern); - } + log.trace("RedisQues reset consumers keys: {}", keysPattern); redisProvider.redis().onSuccess(redisAPI -> redisAPI.keys(keysPattern, keysResult -> { if (keysResult.failed() || keysResult.result() == null) { log.error("Unable to get redis keys of consumers", keysResult.cause()); @@ -466,8 +462,8 @@ private void resetConsumers() { } redisAPI.del(args, delManyResult -> { if (delManyResult.succeeded()) { - Long count = delManyResult.result().toLong(); - log.debug("Successfully reset {} consumers", count); + if( log.isDebugEnabled() ) + log.debug("Successfully reset {} consumers", delManyResult.result().toLong()); } else { log.error("Unable to delete redis keys of consumers"); } @@ -478,9 +474,7 @@ private void resetConsumers() { private Future consume(final String queueName) { final Promise promise = Promise.promise(); - if (log.isDebugEnabled()) { - log.debug("RedisQues Requested to consume queue {}", queueName); - } + log.debug("RedisQues Requested to consume queue {}", queueName); refreshRegistration(queueName, event -> { if (event.failed()) { log.warn("Failed to refresh registration for queue '{}'.", queueName, event.cause()); @@ -488,23 +482,17 @@ private Future consume(final String queueName) { } // Make sure that I am still the registered consumer String consumerKey = consumersPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues consume get: {}", consumerKey); - } + log.trace("RedisQues consume get: {}", consumerKey); redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, event1 -> { if (event1.failed()) { log.error("Unable to get consumer for queue " + queueName, event1.cause()); return; } String consumer = Objects.toString(event1.result(), ""); - if (log.isTraceEnabled()) { - log.trace("RedisQues refresh registration consumer: {}", consumer); - } + log.trace("RedisQues refresh registration consumer: {}", consumer); if (uid.equals(consumer)) { QueueState state = myQueues.get(queueName); - if (log.isTraceEnabled()) { - log.trace("RedisQues consumer: {} queue: {} state: {}", consumer, queueName, state); - } + log.trace("RedisQues consumer: {} queue: {} state: {}", consumer, queueName, state); // Get the next message only once the previous has // been completely processed if (state != QueueState.CONSUMING) { @@ -514,21 +502,25 @@ private Future consume(final String queueName) { // consumer was restarted log.warn("Received request to consume from a queue I did not know about: {}", queueName); } - if (log.isDebugEnabled()) { - log.debug("RedisQues Starting to consume queue {}", queueName); - } - readQueue(queueName).onComplete(readQueueEvent -> promise.complete()); + log.debug("RedisQues Starting to consume queue {}", queueName); + readQueue(queueName).onComplete(readQueueEvent -> { + if( readQueueEvent.failed() ) + log.warn("TODO error handling", new Exception(readQueueEvent.cause())); + promise.complete(); + }); } else { - if (log.isDebugEnabled()) { - log.debug("RedisQues Queue {} is already being consumed", queueName); - } + log.debug("RedisQues Queue {} is already being consumed", queueName); promise.complete(); } } else { // Somehow registration changed. Let's renotify. log.warn("Registration for queue {} has changed to {}", queueName, consumer); myQueues.remove(queueName); - notifyConsumer(queueName).onComplete(notifyConsumerEvent -> promise.complete()); + notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { + if( notifyConsumerEvent.failed() ) + log.warn("TODO error handling", notifyConsumerEvent.cause()); + promise.complete(); + }); } })) .onFailure(throwable -> log.error("Redis: Unable to get consumer for queue " + queueName, throwable)); @@ -560,19 +552,13 @@ private Future isQueueLocked(final String queue) { private Future readQueue(final String queueName) { final Promise promise = Promise.promise(); - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue: {}", queueName); - } + log.trace("RedisQues read queue: {}", queueName); String queueKey = queuesPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue lindex: {}", queueKey); - } + log.trace("RedisQues read queue lindex: {}", queueKey); isQueueLocked(queueName).onComplete(lockAnswer -> { - if (lockAnswer.failed()) { - log.error("Failed to check if queue '{}' is locked", queueName, lockAnswer.cause()); - // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" - } + if( lockAnswer.failed() ) + throw new UnsupportedOperationException("TODO error handling " + queueName, lockAnswer.cause()); boolean locked = lockAnswer.result(); if (!locked) { redisProvider.redis().onSuccess(redisAPI -> redisAPI.lindex(queueKey, "0", answer -> { @@ -580,20 +566,19 @@ private Future readQueue(final String queueName) { log.error("Failed to peek queue '{}'", queueName, answer.cause()); // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" } - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue lindex result: {}", answer.result()); - } - if (answer.result() != null) { - processMessageWithTimeout(queueName, answer.result().toString(), success -> { + Response response = answer.result(); + log.trace("RedisQues read queue lindex result: {}", response); + if (response != null) { + dequeueStatistic.computeIfAbsent(queueName, s -> new DequeueStatistic()); + dequeueStatistic.get(queueName).lastDequeueAttemptTimestamp = System.currentTimeMillis(); + processMessageWithTimeout(queueName, response.toString(), success -> { // update the queue failure count and get a retry interval int retryInterval = updateQueueFailureCountAndGetRetryInterval(queueName, success); if (success) { // Remove the processed message from the queue - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue lpop: {}", queueKey); - } + log.trace("RedisQues read queue lpop: {}", queueKey); redisAPI.lpop(Collections.singletonList(queueKey), jsonAnswer -> { if (jsonAnswer.failed()) { log.error("Failed to pop from queue '{}'", queueName, jsonAnswer.cause()); @@ -604,13 +589,17 @@ private Future readQueue(final String queueName) { Handler nextMsgHandler = event -> { // Issue notification to consume next message if any - if (log.isTraceEnabled()) { - log.trace("RedisQues read queue: {}", queueKey); - } + log.trace("RedisQues read queue: {}", queueKey); redisAPI.llen(queueKey, answer1 -> { if (answer1.succeeded() && answer1.result() != null && answer1.result().toInteger() > 0) { - notifyConsumer(queueName).onComplete(event1 -> promise.complete()); + notifyConsumer(queueName).onComplete(event1 -> { + if( event1.failed() ) + log.warn("TODO error handling", new Exception(event1.cause())); + promise.complete(); + }); } else { + if( answer1.failed() ) + log.warn("TODO error handling", new Exception(answer1.cause())); promise.complete(); } }); @@ -619,6 +608,8 @@ private Future readQueue(final String queueName) { // Notify that we are stopped in case it was the last active consumer if (stoppedHandler != null) { unregisterConsumers(false).onComplete(event -> { + if( event.failed() ) + log.warn("TODO error handling", new Exception(event.cause())); if (myQueues.isEmpty()) { stoppedHandler.handle(null); } @@ -630,21 +621,18 @@ private Future readQueue(final String queueName) { }); } else { // Failed. Message will be kept in queue and retried later - if (log.isDebugEnabled()) { - log.debug("RedisQues Processing failed for queue {}", queueName); - // reschedule - log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); - } + log.debug("RedisQues Processing failed for queue {}", queueName); + // reschedule + log.debug("RedisQues will re-send the message to queue '{}' in {} seconds", queueName, retryInterval); rescheduleSendMessageAfterFailure(queueName, retryInterval); promise.complete(); } }); } else { // This can happen when requests to consume happen at the same moment the queue is emptied. - if (log.isDebugEnabled()) { - log.debug("Got a request to consume from empty queue {}", queueName); - } + log.debug("Got a request to consume from empty queue {}", queueName); myQueues.put(queueName, QueueState.READY); + dequeueStatistic.remove(queueName); promise.complete(); } })).onFailure(throwable -> { @@ -654,9 +642,7 @@ private Future readQueue(final String queueName) { promise.complete(); }); } else { - if (log.isDebugEnabled()) { - log.debug("Got a request to consume from locked queue {}", queueName); - } + log.debug("Got a request to consume from locked queue {}", queueName); myQueues.put(queueName, QueueState.READY); promise.complete(); } @@ -665,15 +651,16 @@ private Future readQueue(final String queueName) { } private void rescheduleSendMessageAfterFailure(final String queueName, int retryInSeconds) { - if (log.isTraceEnabled()) { - log.trace("RedsQues reschedule after failure for queue: {}", queueName); - } + log.trace("RedsQues reschedule after failure for queue: {}", queueName); vertx.setTimer(retryInSeconds * 1000L, timerId -> { + long retryDelayInMills = retryInSeconds * 1000L; + dequeueStatistic.get(queueName).nextDequeueDueTimestamp = System.currentTimeMillis() + retryDelayInMills; if (log.isDebugEnabled()) { log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis())); } notifyConsumer(queueName).onComplete(event -> { + if( event.failed() ) log.warn("TODO error handling", new Exception(event.cause())); // reset the queue state to be consumed by {@link RedisQues#consume(String)} myQueues.put(queueName, QueueState.READY); }); @@ -687,7 +674,7 @@ private void processMessageWithTimeout(final String queue, final String payload, } timer.executeDelayedMax(processorDelayMax).onComplete(delayed -> { if (delayed.failed()) { - log.error("Delayed execution has failed.", delayed.cause()); + log.error("Delayed execution has failed.", new Exception(delayed.cause())); // TODO: May we should call handler with failed state now. return; } @@ -696,9 +683,7 @@ private void processMessageWithTimeout(final String queue, final String payload, JsonObject message = new JsonObject(); message.put("queue", queue); message.put(PAYLOAD, payload); - if (log.isTraceEnabled()) { - log.trace("RedisQues process message: {} for queue: {} send it to processor: {}", message, queue, processorAddress); - } + log.trace("RedisQues process message: {} for queue: {} send it to processor: {}", message, queue, processorAddress); // send the message to the consumer DeliveryOptions options = new DeliveryOptions().setSendTimeout(configurationProvider.configuration().getProcessorTimeout()); @@ -706,8 +691,11 @@ private void processMessageWithTimeout(final String queue, final String payload, boolean success; if (reply.succeeded()) { success = OK.equals(reply.result().body().getString(STATUS)); + dequeueStatistic.get(queue).lastDequeueSuccessTimestamp = System.currentTimeMillis(); + dequeueStatistic.get(queue).nextDequeueDueTimestamp = null; } else { - log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {} ({})", uid, queue, reply.cause().getMessage()); + log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}", + uid, queue, new Exception(reply.cause())); success = Boolean.FALSE; } handler.handle(success); @@ -722,23 +710,17 @@ private Future notifyConsumer(final String queueName) { final Promise promise = Promise.promise(); // Find the consumer to notify String key = consumersPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues notify consumer get: {}", key); - } + log.trace("RedisQues notify consumer get: {}", key); redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(key, event -> { if (event.failed()) { - log.warn("Failed to get consumer for queue '{}'", queueName, event.cause()); + log.warn("Failed to get consumer for queue '{}'", queueName, new Exception(event.cause())); // We should return here. See: "https://softwareengineering.stackexchange.com/a/190535" } String consumer = Objects.toString(event.result(), null); - if (log.isTraceEnabled()) { - log.trace("RedisQues got consumer: {}", consumer); - } + log.trace("RedisQues got consumer: {}", consumer); if (consumer == null) { // No consumer for this queue, let's make a peer become consumer - if (log.isDebugEnabled()) { - log.debug("RedisQues Sending registration request for queue {}", queueName); - } + log.debug("RedisQues Sending registration request for queue {}", queueName); eb.send(configurationProvider.configuration().getAddress() + "-consumers", queueName); promise.complete(); } else { @@ -757,9 +739,7 @@ private Future notifyConsumer(final String queueName) { } private void refreshRegistration(String queueName, Handler> handler) { - if (log.isDebugEnabled()) { - log.debug("RedisQues Refreshing registration of queue {}, expire in {} s", queueName, consumerLockTime); - } + log.debug("RedisQues Refreshing registration of queue {}, expire in {} s", queueName, consumerLockTime); String consumerKey = consumersPrefix + queueName; if (handler == null) { throw new RuntimeException("Handler must be set"); @@ -781,9 +761,7 @@ private void refreshRegistration(String queueName, Handler */ private void updateTimestamp(final String queueName, Handler> handler) { long ts = System.currentTimeMillis(); - if (log.isTraceEnabled()) { - log.trace("RedisQues update timestamp for queue: {} to: {}", queueName, ts); - } + log.trace("RedisQues update timestamp for queue: {} to: {}", queueName, ts); redisProvider.redis().onSuccess(redisAPI -> { if (handler == null) { redisAPI.zadd(Arrays.asList(queuesKey, String.valueOf(ts), queueName)); @@ -816,25 +794,27 @@ private Future checkQueues() { return; } final AtomicInteger counter = new AtomicInteger(queues.size()); - if (log.isTraceEnabled()) { - log.trace("RedisQues update queues: {}", counter); - } - final List futureList = new ArrayList<>(); + log.trace("RedisQues update queues: {}", counter); + final List futureList = new ArrayList<>(queues.size()); for (Response queueObject : queues) { final Promise promise = Promise.promise(); futureList.add(promise.future()); // Check if the inactive queue is not empty (i.e. the key exists) final String queueName = queueObject.toString(); String key = queuesPrefix + queueName; - if (log.isTraceEnabled()) { - log.trace("RedisQues update queue: {}", key); - } + log.trace("RedisQues update queue: {}", key); Handler refreshRegHandler = event -> { // Make sure its TTL is correctly set (replaces the previous orphan detection mechanism). refreshRegistration(queueName, refreshRegistrationEvent -> { + if( refreshRegistrationEvent.failed() ) + log.warn("TODO error handling", new Exception(refreshRegistrationEvent.cause())); // And trigger its consumer. - notifyConsumer(queueName).onComplete(notifyConsumerEvent -> promise.complete()); + notifyConsumer(queueName).onComplete(notifyConsumerEvent -> { + if( notifyConsumerEvent.failed() ) + log.warn("TODO error handling", new Exception(notifyConsumerEvent.cause())); + promise.complete(); + }); }); }; redisAPI.exists(Collections.singletonList(key), event -> { @@ -853,7 +833,11 @@ private Future checkQueues() { } // Ensure we clean the old queues after having updated all timestamps if (counter.decrementAndGet() == 0) { - removeOldQueues(limit).onComplete(removeOldQueuesEvent -> refreshRegHandler.handle(null)); + removeOldQueues(limit).onComplete(removeOldQueuesEvent -> { + if (removeOldQueuesEvent.failed()) + log.warn("TODO error handling", new Exception(removeOldQueuesEvent.cause())); + refreshRegHandler.handle(null); + }); } else { refreshRegHandler.handle(null); } @@ -863,8 +847,11 @@ private Future checkQueues() { if (log.isTraceEnabled()) { log.trace("RedisQues remove old queue: {}", queueName); } + dequeueStatistic.remove(queueName); if (counter.decrementAndGet() == 0) { removeOldQueues(limit).onComplete(removeOldQueuesEvent -> { + if( removeOldQueuesEvent.failed() ) + log.warn("TODO error handling", new Exception(removeOldQueuesEvent.cause())); queueStatisticsCollector.resetQueueFailureStatistics(queueName); promise.complete(); }); @@ -875,7 +862,10 @@ private Future checkQueues() { } }); } - CompositeFuture.all(futureList).onComplete(event1 -> result.complete()); + CompositeFuture.all(futureList).onComplete(ev1 -> { + if( ev1.failed() ) log.warn("Cannot happen", new Exception(ev1.cause())); + result.complete(); + }); })) .onFailure(throwable -> { log.warn("Redis: Failed to checkQueues", throwable); @@ -892,8 +882,13 @@ private Future checkQueues() { private Future removeOldQueues(long limit) { final Promise promise = Promise.promise(); log.debug("Cleaning old queues"); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.zremrangebyscore(queuesKey, "-inf", String.valueOf(limit), - event -> promise.complete())) + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.zremrangebyscore(queuesKey, "-inf", String.valueOf(limit), event -> { + if( event.failed() ) log.warn("TODO error handling", event.cause()); + promise.complete(); + }); + }) .onFailure(throwable -> { log.warn("Redis: Failed to removeOldQueues", throwable); promise.complete(); diff --git a/src/main/java/org/swisspush/redisques/action/AbstractQueueAction.java b/src/main/java/org/swisspush/redisques/action/AbstractQueueAction.java index 67ebef30..dae9b8a3 100644 --- a/src/main/java/org/swisspush/redisques/action/AbstractQueueAction.java +++ b/src/main/java/org/swisspush/redisques/action/AbstractQueueAction.java @@ -51,7 +51,10 @@ public AbstractQueueAction(Vertx vertx, RedisProvider redisProvider, String addr } protected Handler replyErrorMessageHandler(Message event) { - return throwable -> event.reply(new JsonObject().put(STATUS, ERROR)); + return ex -> { + log.warn("Concealed error", new Exception(ex)); + event.reply(new JsonObject().put(STATUS, ERROR)); + }; } protected long getMaxAgeTimestamp() { diff --git a/src/main/java/org/swisspush/redisques/action/AddQueueItemAction.java b/src/main/java/org/swisspush/redisques/action/AddQueueItemAction.java index 7e094da0..5919f6ea 100644 --- a/src/main/java/org/swisspush/redisques/action/AddQueueItemAction.java +++ b/src/main/java/org/swisspush/redisques/action/AddQueueItemAction.java @@ -16,19 +16,22 @@ public class AddQueueItemAction extends AbstractQueueAction { - public AddQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public AddQueueItemAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override public void execute(Message event) { String key1 = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); String valueAddItem = event.body().getJsonObject(PAYLOAD).getString(BUFFER); - redisProvider.redis().onSuccess( - redisAPI -> redisAPI.rpush(Arrays.asList(key1, valueAddItem), new AddQueueItemHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.rpush(Arrays.asList(key1, valueAddItem), new AddQueueItemHandler(event))); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java b/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java index 6b2410ba..83c50a89 100644 --- a/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java +++ b/src/main/java/org/swisspush/redisques/action/BulkDeleteQueuesAction.java @@ -15,11 +15,13 @@ public class BulkDeleteQueuesAction extends AbstractQueueAction { - public BulkDeleteQueuesAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public BulkDeleteQueuesAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -39,15 +41,17 @@ public void execute(Message event) { event.reply(createErrorReply().put(ERROR_TYPE, BAD_INPUT).put(MESSAGE, "Queues must be string values")); return; } - redisProvider.redis().onSuccess(redisAPI -> redisAPI.del(buildQueueKeys(queues), delManyReply -> { - queueStatisticsCollector.resetQueueStatistics(queues); - if (delManyReply.succeeded()) { - event.reply(createOkReply().put(VALUE, delManyReply.result().toLong())); - } else { - log.error("Failed to bulkDeleteQueues", delManyReply.cause()); - event.reply(createErrorReply()); - } - })) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.del(buildQueueKeys(queues), delManyReply -> { + queueStatisticsCollector.resetQueueStatistics(queues); + if (delManyReply.succeeded()) { + event.reply(createOkReply().put(VALUE, delManyReply.result().toLong())); + } else { + log.error("Failed to bulkDeleteQueues", new Exception(delManyReply.cause())); + event.reply(createErrorReply()); + } + })); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/action/BulkPutLocksAction.java b/src/main/java/org/swisspush/redisques/action/BulkPutLocksAction.java index 5adc7338..6e6ca833 100644 --- a/src/main/java/org/swisspush/redisques/action/BulkPutLocksAction.java +++ b/src/main/java/org/swisspush/redisques/action/BulkPutLocksAction.java @@ -16,12 +16,13 @@ public class BulkPutLocksAction extends AbstractQueueAction { - - public BulkPutLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public BulkPutLocksAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -43,8 +44,11 @@ public void execute(Message event) { return; } - redisProvider.redis().onSuccess(redisAPI -> - redisAPI.hmset(buildLocksItems(locksKey, locks, lockInfo), new PutLockHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> { + redisAPI.hmset(buildLocksItems(locksKey, locks, lockInfo), new PutLockHandler(event)); + }); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/action/DeleteAllLocksAction.java b/src/main/java/org/swisspush/redisques/action/DeleteAllLocksAction.java index dc2a9eff..dc816a86 100644 --- a/src/main/java/org/swisspush/redisques/action/DeleteAllLocksAction.java +++ b/src/main/java/org/swisspush/redisques/action/DeleteAllLocksAction.java @@ -15,11 +15,13 @@ public class DeleteAllLocksAction extends AbstractQueueAction { - public DeleteAllLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public DeleteAllLocksAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -29,13 +31,14 @@ public void execute(Message event) { Response locks = locksResult.result(); deleteLocks(event, locks); } else { - replyError(event, locksResult.cause().getMessage()); + replyError(event, locksResult.cause()); } - })).onFailure(throwable -> replyError(event, throwable.getMessage())); + })).onFailure(ex -> replyError(event, ex)); } - private void replyError(Message event, String message) { - log.warn("failed to delete all locks. Message: {}", message); - event.reply(createErrorReply().put(MESSAGE, message)); + private void replyError(Message event, Throwable ex) { + if( log.isWarnEnabled() ) log.warn("failed to delete all locks.", new Exception(ex)); + event.reply(createErrorReply().put(MESSAGE, ex.getMessage())); } + } diff --git a/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java b/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java index 9bb0a3a1..035cda2e 100644 --- a/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java +++ b/src/main/java/org/swisspush/redisques/action/DeleteAllQueueItemsAction.java @@ -18,11 +18,13 @@ public class DeleteAllQueueItemsAction extends AbstractQueueAction { - public DeleteAllQueueItemsAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public DeleteAllQueueItemsAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -30,37 +32,42 @@ public void execute(Message event) { JsonObject payload = event.body().getJsonObject(PAYLOAD); boolean unlock = payload.getBoolean(UNLOCK, false); String queue = payload.getString(QUEUENAME); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.del(Collections.singletonList(buildQueueKey(queue)), - deleteReply -> { - if (deleteReply.failed()) { - log.warn("Failed to deleteAllQueueItems. But we'll continue anyway", deleteReply.cause()); - // May we should 'fail()' here. But: - // 1st: We don't, to keep backward compatibility - // 2nd: We don't, to may unlock below. - } - queueStatisticsCollector.resetQueueFailureStatistics(queue); - if (unlock) { - redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> { - if (unlockReply.failed()) { - log.warn("Failed to unlock queue '{}'. Will continue anyway", queue, unlockReply.cause()); - // IMO we should 'fail()' here. But we don't, to keep backward compatibility. - } - handleDeleteQueueReply(event, deleteReply); - }); - } else { + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> { + redisAPI.del(Collections.singletonList(buildQueueKey(queue)), deleteReply -> { + if (deleteReply.failed()) { + log.warn("Failed to deleteAllQueueItems. But we'll continue anyway", + new Exception(deleteReply.cause())); + // May we should 'fail()' here. But: + // 1st: We don't, to keep backward compatibility + // 2nd: We don't, to may unlock below. + } + queueStatisticsCollector.resetQueueFailureStatistics(queue); + if (unlock) { + redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> { + if (unlockReply.failed()) { + log.warn("Failed to unlock queue '{}'. Will continue anyway", + queue, unlockReply.cause()); + // IMO we should 'fail()' here. But we don't, to keep backward compatibility. + } handleDeleteQueueReply(event, deleteReply); - } - })).onFailure(throwable -> { - log.error("Redis: Failed to delete all queue items", throwable); - event.reply(createErrorReply()); - }); + }); + } else { + handleDeleteQueueReply(event, deleteReply); + } + }); + }); + p.onFailure(ex -> { + log.error("Redis: Failed to delete all queue items", new Exception(ex)); + event.reply(createErrorReply()); + }); } private void handleDeleteQueueReply(Message event, AsyncResult reply) { if (reply.succeeded()) { event.reply(createOkReply().put(VALUE, reply.result().toLong())); } else { - log.error("Failed to replyResultGreaterThanZero", reply.cause()); + log.error("Failed to replyResultGreaterThanZero", new Exception(reply.cause())); event.reply(createErrorReply()); } } diff --git a/src/main/java/org/swisspush/redisques/action/DeleteLockAction.java b/src/main/java/org/swisspush/redisques/action/DeleteLockAction.java index 69a304bd..530c8a7a 100644 --- a/src/main/java/org/swisspush/redisques/action/DeleteLockAction.java +++ b/src/main/java/org/swisspush/redisques/action/DeleteLockAction.java @@ -18,23 +18,29 @@ public class DeleteLockAction extends AbstractQueueAction { - public DeleteLockAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public DeleteLockAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override public void execute(Message event) { String queueName = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); - redisProvider.redis().onSuccess(redisAPI -> - redisAPI.exists(Collections.singletonList(queuesPrefix + queueName), event1 -> { - if (event1.succeeded() && event1.result() != null && event1.result().toInteger() == 1) { - notifyConsumer(queueName); - } - redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(event)); - })) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> { + redisAPI.exists(Collections.singletonList(queuesPrefix + queueName), event1 -> { + if( event1.failed() ) log.warn("Concealed error", new Exception(event1.cause())); + if (event1.succeeded() && event1.result() != null && event1.result().toInteger() == 1) { + notifyConsumer(queueName); + } + redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(event)); + }); + }); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/action/DeleteQueueItemAction.java b/src/main/java/org/swisspush/redisques/action/DeleteQueueItemAction.java index 0da94e4f..2e0ce4ee 100644 --- a/src/main/java/org/swisspush/redisques/action/DeleteQueueItemAction.java +++ b/src/main/java/org/swisspush/redisques/action/DeleteQueueItemAction.java @@ -14,11 +14,13 @@ public class DeleteQueueItemAction extends AbstractQueueAction { - public DeleteQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public DeleteQueueItemAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -31,17 +33,18 @@ public void execute(Message event) { String keyLrem = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); redisAPI.lrem(keyLrem, "0", "TO_DELETE", replyLrem -> { if (replyLrem.failed()) { - log.warn("Redis 'lrem' command failed. But will continue anyway.", replyLrem.cause()); + log.warn("Redis 'lrem' command failed. But will continue anyway.", + new Exception(replyLrem.cause())); // IMO we should 'fail()' here. But we don't, to keep backward compatibility. } event.reply(createOkReply()); }); } else { - log.error("Failed to 'lset' while deleteQueueItem.", event1.cause()); + log.error("Failed to 'lset' while deleteQueueItem.", new Exception(event1.cause())); event.reply(createErrorReply()); } - })).onFailure(throwable -> { - log.error("Redis: Failed to deleteQueueItem.", throwable); + })).onFailure(ex -> { + log.error("Redis: Failed to deleteQueueItem.", new Exception(ex)); event.reply(createErrorReply()); }); } diff --git a/src/main/java/org/swisspush/redisques/action/EnqueueAction.java b/src/main/java/org/swisspush/redisques/action/EnqueueAction.java index 488e62a1..b2f1c779 100644 --- a/src/main/java/org/swisspush/redisques/action/EnqueueAction.java +++ b/src/main/java/org/swisspush/redisques/action/EnqueueAction.java @@ -19,11 +19,14 @@ public class EnqueueAction extends AbstractQueueAction { private final MemoryUsageProvider memoryUsageProvider; private final int memoryUsageLimitPercent; - public EnqueueAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log, MemoryUsageProvider memoryUsageProvider, int memoryUsageLimitPercent) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public EnqueueAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, + String consumersPrefix, String locksKey, List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log, MemoryUsageProvider memoryUsageProvider, + int memoryUsageLimitPercent + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); this.memoryUsageProvider = memoryUsageProvider; this.memoryUsageLimitPercent = memoryUsageLimitPercent; } @@ -45,7 +48,8 @@ public void execute(Message event) { String keyEnqueue = queuesPrefix + queueName; String valueEnqueue = event.body().getString(MESSAGE); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.rpush(Arrays.asList(keyEnqueue, valueEnqueue)).onComplete(enqueueEvent -> { + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.rpush(Arrays.asList(keyEnqueue, valueEnqueue)).onComplete(enqueueEvent -> { JsonObject reply = new JsonObject(); if (enqueueEvent.succeeded()) { if (log.isDebugEnabled()) { @@ -79,13 +83,13 @@ public void execute(Message event) { } else { replyError(event, queueName, enqueueEvent.cause()); } - })).onFailure(throwable -> replyError(event, queueName, throwable)); + })).onFailure(ex -> replyError(event, queueName, ex)); }); } - private void replyError(Message event, String queueName, Throwable cause) { + private void replyError(Message event, String queueName, Throwable ex) { String message = "RedisQues QUEUE_ERROR: Error while enqueueing message into queue " + queueName; - log.error(message, cause); + log.error(message, new Exception(ex)); JsonObject reply = new JsonObject(); reply.put(STATUS, ERROR); reply.put(MESSAGE, message); diff --git a/src/main/java/org/swisspush/redisques/action/GetAllLocksAction.java b/src/main/java/org/swisspush/redisques/action/GetAllLocksAction.java index b44be126..5aae3aa2 100644 --- a/src/main/java/org/swisspush/redisques/action/GetAllLocksAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetAllLocksAction.java @@ -15,20 +15,26 @@ public class GetAllLocksAction extends AbstractQueueAction { - public GetAllLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetAllLocksAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override public void execute(Message event) { Result, String> result = MessageUtil.extractFilterPattern(event); if (result.isOk()) { - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hkeys(locksKey, new GetAllLocksHandler(event, result.getOk()))).onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.hkeys(locksKey, new GetAllLocksHandler(event, result.getOk()))); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } else { event.reply(createErrorReply().put(ERROR_TYPE, BAD_INPUT).put(MESSAGE, result.getErr())); } } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetLockAction.java b/src/main/java/org/swisspush/redisques/action/GetLockAction.java index 09fb36a0..b7f86c62 100644 --- a/src/main/java/org/swisspush/redisques/action/GetLockAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetLockAction.java @@ -17,25 +17,30 @@ public class GetLockAction extends AbstractQueueAction { - public GetLockAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetLockAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } - @Override public void execute(Message event) { final JsonObject body = event.body(); - if (null == body) { - log.warn("Got msg with empty body from event bus. We'll run directly in " + - "a NullPointerException now. address={} replyAddress={} ", event.address(), event.replyAddress()); - // IMO we should 'fail()' here. But we don't, to keep backward compatibility. + if (body == null) { + replyErrorMessageHandler(event).handle(new NullPointerException("" + + "Got msg with no body from event bus. address=" + + event.address() + " replyAddress=" + event.replyAddress())); + return; } - redisProvider.redis().onSuccess(redisAPI -> - redisAPI.hget(locksKey, body.getJsonObject(PAYLOAD).getString(QUEUENAME), new GetLockHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> { + redisAPI.hget(locksKey, body.getJsonObject(PAYLOAD).getString(QUEUENAME), new GetLockHandler(event)); + }); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueueItemAction.java b/src/main/java/org/swisspush/redisques/action/GetQueueItemAction.java index 5e33aa70..78a7b1b4 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueueItemAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueueItemAction.java @@ -16,19 +16,25 @@ public class GetQueueItemAction extends AbstractQueueAction { - public GetQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetQueueItemAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override public void execute(Message event) { String key = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); int index = event.body().getJsonObject(PAYLOAD).getInteger(INDEX); - redisProvider.redis().onSuccess(redisAPI -> - redisAPI.lindex(key, String.valueOf(index), new GetQueueItemHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> { + redisAPI.lindex(key, String.valueOf(index), new GetQueueItemHandler(event)); + }); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueueItemsAction.java b/src/main/java/org/swisspush/redisques/action/GetQueueItemsAction.java index 737579df..6f44fb53 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueueItemsAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueueItemsAction.java @@ -29,20 +29,25 @@ public void execute(Message event) { String queueName = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); String keyListRange = queuesPrefix + queueName; int maxQueueItemCountIndex = getMaxQueueItemCountIndex(event.body().getJsonObject(PAYLOAD).getString(LIMIT)); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.llen(keyListRange, countReply -> { - Long queueItemCount = countReply.result().toLong(); - if (countReply.succeeded() && queueItemCount != null) { - redisAPI.lrange(keyListRange, "0", String.valueOf(maxQueueItemCountIndex), - new GetQueueItemsHandler(event, queueItemCount)); - } else { - log.warn("Operation getQueueItems failed. But I'll not notify my caller :)", countReply.cause()); - // IMO we should 'event.fail(countReply.cause())' here. But we don't, to keep backward compatibility. - } - })) - .onFailure(throwable -> { - log.warn("Operation getQueueItems failed. But I'll not notify my caller :)", throwable); - // IMO we should 'event.fail(countReply.cause())' here. But we don't, to keep backward compatibility. - }); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.llen(keyListRange, countReply -> { + Long queueItemCount = countReply.result().toLong(); + if (countReply.succeeded() && queueItemCount != null) { + redisAPI.lrange(keyListRange, "0", String.valueOf(maxQueueItemCountIndex), + new GetQueueItemsHandler(event, queueItemCount)); + } else { + if( countReply.failed() ) { + log.warn("Operation getQueueItems failed. But I'll not notify my caller :)", + countReply.cause()); + // IMO we should 'event.fail(countReply.cause())' here. But we don't, to keep + // backward compatibility. + } + } + })); + p.onFailure(ex -> { + log.warn("Operation getQueueItems failed. But I'll not notify my caller :)", ex); + // IMO we should 'event.fail(countReply.cause())' here. But we don't, to keep backward compatibility. + }); } private int getMaxQueueItemCountIndex(String limit) { @@ -55,9 +60,11 @@ private int getMaxQueueItemCountIndex(String limit) { } log.info("use limit parameter " + maxIndex); } catch (NumberFormatException ex) { - log.warn("Invalid limit parameter '{}' configured for max queue item count. Using default {}", limit, DEFAULT_MAX_QUEUEITEM_COUNT); + log.warn("Invalid limit parameter '{}' configured for max queue item count. Using default {}", + limit, DEFAULT_MAX_QUEUEITEM_COUNT); } } return defaultMaxIndex; } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java b/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java index 5977587c..04d8ca5b 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueueItemsCountAction.java @@ -19,17 +19,22 @@ */ public class GetQueueItemsCountAction extends AbstractQueueAction { - public GetQueueItemsCountAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetQueueItemsCountAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override public void execute(Message event) { String queue = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.llen(queuesPrefix + queue, new GetQueueItemsCountHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.llen(queuesPrefix + queue, new GetQueueItemsCountHandler(event))); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesAction.java index 889eaf1a..2fa4d510 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesAction.java @@ -16,11 +16,14 @@ public class GetQueuesAction extends AbstractQueueAction { - public GetQueuesAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetQueuesAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -33,10 +36,12 @@ protected void getQueues(Message event, boolean countOnly, Result redisAPI.zrangebyscore( - Arrays.asList(queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf"), - new GetQueuesHandler(event, filterPatternResult.getOk(), countOnly))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.zrangebyscore( + Arrays.asList(queuesKey, String.valueOf(getMaxAgeTimestamp()), "+inf"), + new GetQueuesHandler(event, filterPatternResult.getOk(), countOnly))); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesCountAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesCountAction.java index 2ad1d1c4..4fbb1056 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesCountAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesCountAction.java @@ -15,11 +15,14 @@ public class GetQueuesCountAction extends GetQueuesAction { - public GetQueuesCountAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetQueuesCountAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -38,7 +41,8 @@ public void execute(Message event) { } else { redisProvider.redis().onSuccess(redisAPI -> redisAPI.zcount(queuesKey, String.valueOf(getMaxAgeTimestamp()), String.valueOf(Double.MAX_VALUE), new GetQueuesCountHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + .onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java index 076dbd8c..77637b6b 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesItemsCountAction.java @@ -36,7 +36,8 @@ public void execute(Message event) { String.valueOf(getMaxAgeTimestamp()), "+inf"), new GetQueuesItemsCountHandler(event, filterPattern.getOk(), queuesPrefix, redisProvider))) - .onFailure(replyErrorMessageHandler(event)); + .onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } + } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesSpeedAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesSpeedAction.java index fabf8c80..4cbe70ea 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesSpeedAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesSpeedAction.java @@ -18,11 +18,14 @@ */ public class GetQueuesSpeedAction extends AbstractQueueAction { - public GetQueuesSpeedAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetQueuesSpeedAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -47,7 +50,7 @@ private void getQueuesSpeed(Message event, new GetQueuesSpeedHandler(event, filterPattern.getOk(), queueStatisticsCollector)); }) - .onFailure(replyErrorMessageHandler(event)); + .onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } diff --git a/src/main/java/org/swisspush/redisques/action/GetQueuesStatisticsAction.java b/src/main/java/org/swisspush/redisques/action/GetQueuesStatisticsAction.java index 561bcf40..99ada01a 100644 --- a/src/main/java/org/swisspush/redisques/action/GetQueuesStatisticsAction.java +++ b/src/main/java/org/swisspush/redisques/action/GetQueuesStatisticsAction.java @@ -18,11 +18,14 @@ */ public class GetQueuesStatisticsAction extends AbstractQueueAction { - public GetQueuesStatisticsAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public GetQueuesStatisticsAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, QueueStatisticsCollector queueStatisticsCollector, + Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -47,8 +50,7 @@ private void getQueuesStatistics(Message event, new GetQueuesStatisticsHandler(event, filterPattern.getOk(), queueStatisticsCollector)); }) - .onFailure(replyErrorMessageHandler(event)); - + .onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } } diff --git a/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java b/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java index 53fe650a..f7fc06a4 100644 --- a/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java +++ b/src/main/java/org/swisspush/redisques/action/LockedEnqueueAction.java @@ -36,20 +36,22 @@ public void execute(Message event) { } JsonObject lockInfo = extractLockInfo(event.body().getJsonObject(PAYLOAD).getString(REQUESTED_BY)); if (lockInfo != null) { - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hmset(Arrays.asList(locksKey, queueName, lockInfo.encode()), - putLockResult -> { - if (putLockResult.succeeded()) { - log.debug("RedisQues lockedEnqueue locking successful, now going to enqueue"); - enqueueActionExecute(event); - } else { - log.warn("RedisQues lockedEnqueue locking failed. Skip enqueue"); - event.reply(createErrorReply()); - } - })) - .onFailure(throwable -> { - log.warn("Redis: RedisQues lockedEnqueue locking failed. Skip enqueue"); - event.reply(createErrorReply()); - }); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.hmset(Arrays.asList(locksKey, queueName, lockInfo.encode()), putLockResult -> { + if (putLockResult.succeeded()) { + log.debug("RedisQues lockedEnqueue locking successful, now going to enqueue"); + enqueueActionExecute(event); + } else { + log.warn("RedisQues lockedEnqueue locking failed. Skip enqueue", + new Exception(putLockResult.cause())); + event.reply(createErrorReply()); + } + })); + p.onFailure(ex -> { + log.warn("Redis: RedisQues lockedEnqueue locking failed. Skip enqueue", + new Exception(ex)); + event.reply(createErrorReply()); + }); } else { log.warn("RedisQues lockedEnqueue failed because property '{}' was missing", REQUESTED_BY); event.reply(createErrorReply().put(MESSAGE, "Property '" + REQUESTED_BY + "' missing")); diff --git a/src/main/java/org/swisspush/redisques/action/PutLockAction.java b/src/main/java/org/swisspush/redisques/action/PutLockAction.java index 5b8ac78c..87311908 100644 --- a/src/main/java/org/swisspush/redisques/action/PutLockAction.java +++ b/src/main/java/org/swisspush/redisques/action/PutLockAction.java @@ -16,11 +16,14 @@ public class PutLockAction extends AbstractQueueAction { - public PutLockAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public PutLockAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, + queueConfigurations, queueStatisticsCollector, log); } @Override @@ -32,9 +35,11 @@ public void execute(Message event) { event.reply(createErrorReply().put(ERROR_TYPE, BAD_INPUT).put(MESSAGE, "Lock must be a string value")); return; } - redisProvider.redis().onSuccess(redisAPI -> - redisAPI.hmset(buildLocksItems(locksKey, lockNames, lockInfo), new PutLockHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> { + redisAPI.hmset(buildLocksItems(locksKey, lockNames, lockInfo), new PutLockHandler(event)); + }); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } else { event.reply(createErrorReply().put(MESSAGE, "Property '" + REQUESTED_BY + "' missing")); } diff --git a/src/main/java/org/swisspush/redisques/action/ReplaceQueueItemAction.java b/src/main/java/org/swisspush/redisques/action/ReplaceQueueItemAction.java index e5f80546..ebf81a08 100644 --- a/src/main/java/org/swisspush/redisques/action/ReplaceQueueItemAction.java +++ b/src/main/java/org/swisspush/redisques/action/ReplaceQueueItemAction.java @@ -15,11 +15,14 @@ public class ReplaceQueueItemAction extends AbstractQueueAction { - public ReplaceQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix, - String consumersPrefix, String locksKey, List queueConfigurations, - QueueStatisticsCollector queueStatisticsCollector, Logger log) { - super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations, - queueStatisticsCollector, log); + public ReplaceQueueItemAction( + Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, + String queuesPrefix, String consumersPrefix, String locksKey, + List queueConfigurations, + QueueStatisticsCollector queueStatisticsCollector, Logger log + ) { + super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, + locksKey, queueConfigurations, queueStatisticsCollector, log); } @Override @@ -27,8 +30,10 @@ public void execute(Message event) { String keyReplaceItem = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME); int indexReplaceItem = event.body().getJsonObject(PAYLOAD).getInteger(INDEX); String bufferReplaceItem = event.body().getJsonObject(PAYLOAD).getString(BUFFER); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.lset(keyReplaceItem, - String.valueOf(indexReplaceItem), bufferReplaceItem, new ReplaceQueueItemHandler(event))) - .onFailure(replyErrorMessageHandler(event)); + var p = redisProvider.redis(); + p.onSuccess(redisAPI -> redisAPI.lset(keyReplaceItem, String.valueOf(indexReplaceItem), + bufferReplaceItem, new ReplaceQueueItemHandler(event))); + p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex)); } + } diff --git a/src/main/java/org/swisspush/redisques/handler/AddQueueItemHandler.java b/src/main/java/org/swisspush/redisques/handler/AddQueueItemHandler.java index c8f20a93..74735582 100644 --- a/src/main/java/org/swisspush/redisques/handler/AddQueueItemHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/AddQueueItemHandler.java @@ -5,8 +5,12 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; /** * Class AddQueueItemHandler. @@ -14,6 +18,8 @@ * @author baldim, https://github.com/mcweba [Marc-Andre Weber] */ public class AddQueueItemHandler implements Handler> { + + private static final Logger log = getLogger(AddQueueItemHandler.class); private final Message event; public AddQueueItemHandler(Message event) { @@ -25,6 +31,7 @@ public void handle(AsyncResult reply) { if(reply.succeeded()){ event.reply(new JsonObject().put(STATUS, OK)); } else { + log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } diff --git a/src/main/java/org/swisspush/redisques/handler/DeleteLockHandler.java b/src/main/java/org/swisspush/redisques/handler/DeleteLockHandler.java index 6ad985ed..7dbf9f71 100644 --- a/src/main/java/org/swisspush/redisques/handler/DeleteLockHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/DeleteLockHandler.java @@ -1,31 +1,38 @@ -package org.swisspush.redisques.handler; - -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.Response; - -import static org.swisspush.redisques.util.RedisquesAPI.*; - -/** - * Class DeleteLockHandler. - * - * @author baldim - */ -public class DeleteLockHandler implements Handler> { - private final Message event; - - public DeleteLockHandler(Message event) { - this.event = event; - } - - @Override - public void handle(AsyncResult reply) { - if (reply.succeeded()) { - event.reply(new JsonObject().put(STATUS, OK)); - } else { - event.reply(new JsonObject().put(STATUS, ERROR)); - } - } -} +package org.swisspush.redisques.handler; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import io.vertx.redis.client.Response; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; + +/** + * Class DeleteLockHandler. + * + * @author baldim + */ +public class DeleteLockHandler implements Handler> { + + private static final Logger log = getLogger(DeleteLockHandler.class); + private final Message event; + + public DeleteLockHandler(Message event) { + this.event = event; + } + + @Override + public void handle(AsyncResult reply) { + if (reply.succeeded()) { + event.reply(new JsonObject().put(STATUS, OK)); + } else { + log.warn("Concealed error", new Exception(reply.cause())); + event.reply(new JsonObject().put(STATUS, ERROR)); + } + } +} diff --git a/src/main/java/org/swisspush/redisques/handler/GetAllLocksHandler.java b/src/main/java/org/swisspush/redisques/handler/GetAllLocksHandler.java index c1dbbe4f..7c49c9cb 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetAllLocksHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetAllLocksHandler.java @@ -1,44 +1,52 @@ -package org.swisspush.redisques.handler; - -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonArray; -import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.Response; -import org.swisspush.redisques.util.HandlerUtil; - -import java.util.List; -import java.util.Optional; -import java.util.regex.Pattern; - -import static org.swisspush.redisques.util.RedisquesAPI.*; - -/** - * Class GetAllLocksHandler. - * - * @author baldim - */ -public class GetAllLocksHandler implements Handler> { - - private final Message event; - private final Optional filterPattern; - - public GetAllLocksHandler(Message event, Optional filterPattern) { - this.event = event; - this.filterPattern = filterPattern; - } - - @Override - public void handle(AsyncResult reply) { - if (reply.succeeded() && reply.result() != null) { - JsonObject result = new JsonObject(); - Response locks = reply.result(); - List filteredLocks = HandlerUtil.filterByPattern(locks, filterPattern); - result.put("locks", new JsonArray(filteredLocks)); - event.reply(new JsonObject().put(STATUS, OK).put(VALUE, result)); - } else { - event.reply(new JsonObject().put(STATUS, ERROR)); - } - } +package org.swisspush.redisques.handler; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import io.vertx.redis.client.Response; +import org.slf4j.Logger; +import org.swisspush.redisques.util.HandlerUtil; + +import java.util.List; +import java.util.Optional; +import java.util.regex.Pattern; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; + +/** + * Class GetAllLocksHandler. + * + * @author baldim + */ +public class GetAllLocksHandler implements Handler> { + + private static final Logger log = getLogger(GetAllLocksHandler.class); + private final Message event; + private final Optional filterPattern; + + public GetAllLocksHandler(Message event, Optional filterPattern) { + this.event = event; + this.filterPattern = filterPattern; + } + + @Override + public void handle(AsyncResult reply) { + if (reply.succeeded() && reply.result() != null) { + JsonObject result = new JsonObject(); + Response locks = reply.result(); + List filteredLocks = HandlerUtil.filterByPattern(locks, filterPattern); + result.put("locks", new JsonArray(filteredLocks)); + event.reply(new JsonObject().put(STATUS, OK).put(VALUE, result)); + } else { + if( reply.failed() ) log.warn("Concealed error", new Exception(reply.cause())); + event.reply(new JsonObject().put(STATUS, ERROR)); + } + } + } \ No newline at end of file diff --git a/src/main/java/org/swisspush/redisques/handler/GetLockHandler.java b/src/main/java/org/swisspush/redisques/handler/GetLockHandler.java index 27d9bd6f..d40b6734 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetLockHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetLockHandler.java @@ -1,35 +1,45 @@ -package org.swisspush.redisques.handler; - -import io.vertx.core.AsyncResult; -import io.vertx.core.Handler; -import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.Response; - -import static org.swisspush.redisques.util.RedisquesAPI.*; - -/** - * Class GetLockHandler. - * - * @author baldim - */ -public class GetLockHandler implements Handler> { - private final Message event; - - public GetLockHandler(Message event) { - this.event = event; - } - - @Override - public void handle(AsyncResult reply) { - if (reply.succeeded()) { - if (reply.result() != null) { - event.reply(new JsonObject().put(STATUS, OK).put(VALUE, reply.result().toString())); - } else { - event.reply(new JsonObject().put(STATUS, NO_SUCH_LOCK)); - } - } else { - event.reply(new JsonObject().put(STATUS, ERROR)); - } - } -} +package org.swisspush.redisques.handler; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import io.vertx.redis.client.Response; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.NO_SUCH_LOCK; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; + +/** + * Class GetLockHandler. + * + * @author baldim + */ +public class GetLockHandler implements Handler> { + + private static final Logger log = getLogger(GetLockHandler.class); + private final Message event; + + public GetLockHandler(Message event) { + this.event = event; + } + + @Override + public void handle(AsyncResult reply) { + if (reply.succeeded()) { + if (reply.result() != null) { + event.reply(new JsonObject().put(STATUS, OK).put(VALUE, reply.result().toString())); + } else { + event.reply(new JsonObject().put(STATUS, NO_SUCH_LOCK)); + } + } else { + log.warn("Concealed error", new Exception(reply.cause())); + event.reply(new JsonObject().put(STATUS, ERROR)); + } + } + +} diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueueItemHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueueItemHandler.java index 8f82b558..57c7915f 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueueItemHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueueItemHandler.java @@ -1,11 +1,17 @@ package org.swisspush.redisques.handler; import io.vertx.core.AsyncResult; -import static org.swisspush.redisques.util.RedisquesAPI.*; import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; /** * Class GetQueueItemHandler. @@ -13,6 +19,8 @@ * @author baldim, https://github.com/mcweba [Marc-Andre Weber] */ public class GetQueueItemHandler implements Handler> { + + private static final Logger log = getLogger(GetQueueItemHandler.class); private final Message event; public GetQueueItemHandler(Message event) { @@ -24,7 +32,9 @@ public void handle(AsyncResult reply) { if (reply.succeeded() && reply.result() != null) { event.reply(new JsonObject().put(STATUS, OK).put(VALUE, reply.result().toString())); } else { + if( reply.failed() ) log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } + } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueueItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueueItemsCountHandler.java index 0a292f80..a63fa62a 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueueItemsCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueueItemsCountHandler.java @@ -5,8 +5,13 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; /** * Class GetQueueItemsCountHandler. @@ -14,6 +19,8 @@ * @author https://github.com/mcweba [Marc-Andre Weber] */ public class GetQueueItemsCountHandler implements Handler> { + + private static final Logger log = getLogger(GetQueueItemsCountHandler.class); private final Message event; public GetQueueItemsCountHandler(Message event) { @@ -26,6 +33,7 @@ public void handle(AsyncResult reply) { Long queueItemCount = reply.result().toLong(); event.reply(new JsonObject().put(STATUS, OK).put(VALUE, queueItemCount)); } else { + log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueueItemsHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueueItemsHandler.java index 4eb4b61d..770adbd7 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueueItemsHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueueItemsHandler.java @@ -1,14 +1,19 @@ package org.swisspush.redisques.handler; import io.vertx.core.AsyncResult; -import io.vertx.core.json.JsonArray; - -import static org.swisspush.redisques.util.RedisquesAPI.*; - import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.INFO; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; /** * Class GetQueueItemsHandler. @@ -16,6 +21,8 @@ * @author baldim, https://github.com/mcweba [Marc-Andre Weber] */ public class GetQueueItemsHandler implements Handler> { + + private static final Logger log = getLogger(GetQueueItemsHandler.class); private final Message event; private final Long queueItemCount; @@ -39,7 +46,9 @@ public void handle(AsyncResult reply) { } event.reply(new JsonObject().put(STATUS, OK).put(VALUE, values).put(INFO, countInfo)); } else { + log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } + } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesCountHandler.java index d6737fec..2b241f41 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesCountHandler.java @@ -5,8 +5,13 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; /** * Class GetQueuesCountHandler. @@ -14,6 +19,8 @@ * @author https://github.com/mcweba [Marc-Andre Weber] */ public class GetQueuesCountHandler implements Handler> { + + private static final Logger log = getLogger(GetQueuesCountHandler.class); private final Message event; public GetQueuesCountHandler(Message event) { @@ -26,7 +33,9 @@ public void handle(AsyncResult reply) { Long queueCount = reply.result().toLong(); event.reply(new JsonObject().put(STATUS, OK).put(VALUE, queueCount)); } else { + log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } + } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesHandler.java index 1cf75857..3a794a83 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesHandler.java @@ -6,19 +6,26 @@ import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; import org.swisspush.redisques.util.HandlerUtil; import java.util.List; import java.util.Optional; import java.util.regex.Pattern; -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; /** * @author https://github.com/mcweba [Marc-Andre Weber] */ public class GetQueuesHandler implements Handler> { + private static final Logger log = getLogger(GetQueuesHandler.class); private final Message event; private final Optional filterPattern; private final boolean countOnly; @@ -42,6 +49,7 @@ public void handle(AsyncResult reply) { event.reply(new JsonObject().put(STATUS, OK).put(VALUE, jsonRes)); } } else { + log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java index 9997bbdf..6f51dd1e 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesItemsCountHandler.java @@ -39,7 +39,8 @@ public GetQueuesItemsCountHandler( Message event, Optional filterPattern, String queuesPrefix, - RedisProvider redisProvider) { + RedisProvider redisProvider + ) { this.event = event; this.filterPattern = filterPattern; this.queuesPrefix = queuesPrefix; @@ -57,17 +58,18 @@ public void handle(AsyncResult handleQueues) { return; } - redisProvider.connection().onSuccess(conn -> { - List responses = queues.stream().map(queue -> conn.send(Request.cmd(Command.LLEN, queuesPrefix + queue)) - ).collect(Collectors.toList()); - CompositeFuture.all(responses).onFailure(throwable -> { - log.error("Unexepected queue length result"); + List responses = queues.stream() + .map(queue -> conn.send(Request.cmd(Command.LLEN, queuesPrefix + queue))) + .collect(Collectors.toList()); + CompositeFuture.all(responses).onFailure(ex -> { + log.error("Unexpected queue length result", new Exception(ex)); event.reply(new JsonObject().put(STATUS, ERROR)); }).onSuccess(compositeFuture -> { List queueLengthList = compositeFuture.list(); if (queueLengthList == null) { - log.error("Unexepected queue length result null"); + log.error("Unexpected queue length result null", + new Exception(compositeFuture.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); return; } @@ -87,12 +89,13 @@ public void handle(AsyncResult handleQueues) { event.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK) .put(QUEUES, result)); }); - }).onFailure(throwable -> { - log.warn("Redis: Failed to get queue length.", throwable); + }).onFailure(ex -> { + log.warn("Redis: Failed to get queue length.", new Exception(ex)); event.reply(new JsonObject().put(STATUS, ERROR)); }); } else { + log.warn("Concealed error", new Exception(handleQueues.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesSpeedHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesSpeedHandler.java index 233fa9b4..faa7fb8e 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesSpeedHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesSpeedHandler.java @@ -1,18 +1,21 @@ package org.swisspush.redisques.handler; -import static org.swisspush.redisques.util.RedisquesAPI.ERROR; -import static org.swisspush.redisques.util.RedisquesAPI.STATUS; - import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.redisques.util.HandlerUtil; +import org.swisspush.redisques.util.QueueStatisticsCollector; + import java.util.List; import java.util.Optional; import java.util.regex.Pattern; -import org.swisspush.redisques.util.HandlerUtil; -import org.swisspush.redisques.util.QueueStatisticsCollector; + +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; /** * Retrieves in it's AsyncResult handler the speed summary of the queues matching the given filter @@ -20,13 +23,16 @@ */ public class GetQueuesSpeedHandler implements Handler> { + private static final Logger log = LoggerFactory.getLogger(GetQueuesSpeedHandler.class); private final Message event; private final Optional filterPattern; private final QueueStatisticsCollector queueStatisticsCollector; - public GetQueuesSpeedHandler(Message event, - Optional filterPattern, - QueueStatisticsCollector queueStatisticsCollector) { + public GetQueuesSpeedHandler( + Message event, + Optional filterPattern, + QueueStatisticsCollector queueStatisticsCollector + ) { this.event = event; this.filterPattern = filterPattern; this.queueStatisticsCollector = queueStatisticsCollector; @@ -40,6 +46,7 @@ public void handle(AsyncResult handleQueues) { .filterByPattern(handleQueues.result(), filterPattern); queueStatisticsCollector.getQueuesSpeed(event, queues); } else { + log.warn("Concealed error", new Exception(handleQueues.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } diff --git a/src/main/java/org/swisspush/redisques/handler/GetQueuesStatisticsHandler.java b/src/main/java/org/swisspush/redisques/handler/GetQueuesStatisticsHandler.java index 91f96794..832170f7 100644 --- a/src/main/java/org/swisspush/redisques/handler/GetQueuesStatisticsHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/GetQueuesStatisticsHandler.java @@ -1,18 +1,21 @@ package org.swisspush.redisques.handler; -import static org.swisspush.redisques.util.RedisquesAPI.ERROR; -import static org.swisspush.redisques.util.RedisquesAPI.STATUS; - import io.vertx.core.AsyncResult; import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.swisspush.redisques.util.HandlerUtil; +import org.swisspush.redisques.util.QueueStatisticsCollector; + import java.util.List; import java.util.Optional; import java.util.regex.Pattern; -import org.swisspush.redisques.util.HandlerUtil; -import org.swisspush.redisques.util.QueueStatisticsCollector; + +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; /** * Retrieves in it's AsyncResult handler for all given queue names the queue statistics information @@ -20,13 +23,16 @@ */ public class GetQueuesStatisticsHandler implements Handler> { + private static final Logger log = LoggerFactory.getLogger(GetQueuesStatisticsHandler.class); private final Message event; private final Optional filterPattern; private final QueueStatisticsCollector queueStatisticsCollector; - public GetQueuesStatisticsHandler(Message event, - Optional filterPattern, - QueueStatisticsCollector queueStatisticsCollector) { + public GetQueuesStatisticsHandler( + Message event, + Optional filterPattern, + QueueStatisticsCollector queueStatisticsCollector + ) { this.event = event; this.filterPattern = filterPattern; this.queueStatisticsCollector = queueStatisticsCollector; @@ -36,10 +42,17 @@ public GetQueuesStatisticsHandler(Message event, public void handle(AsyncResult handleQueues) { if (handleQueues.succeeded()) { List queues = HandlerUtil - .filterByPattern(handleQueues.result(), filterPattern); - queueStatisticsCollector.getQueueStatistics(event, queues); + .filterByPattern(handleQueues.result(), filterPattern); + queueStatisticsCollector.getQueueStatistics(queues) + .onFailure(ex -> { + log.error("", ex); + event.reply(new JsonObject().put(STATUS, ERROR)); + }) + .onSuccess(event::reply); } else { + log.warn("Concealed error", new Exception(handleQueues.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } + } diff --git a/src/main/java/org/swisspush/redisques/handler/PutLockHandler.java b/src/main/java/org/swisspush/redisques/handler/PutLockHandler.java index 9247b1f5..0f733421 100644 --- a/src/main/java/org/swisspush/redisques/handler/PutLockHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/PutLockHandler.java @@ -1,30 +1,39 @@ -package org.swisspush.redisques.handler; - -import io.vertx.core.AsyncResult; -import static org.swisspush.redisques.util.RedisquesAPI.*; -import io.vertx.core.Handler; -import io.vertx.core.eventbus.Message; -import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.Response; - -/** - * Class PutLock. - * - * @author baldim - */ -public class PutLockHandler implements Handler> { - private final Message event; - - public PutLockHandler(Message event) { - this.event = event; - } - - @Override - public void handle(AsyncResult reply) { - if(reply.succeeded()){ - event.reply(new JsonObject().put(STATUS, OK)); - } else { - event.reply(new JsonObject().put(STATUS, ERROR)); - } - } -} +package org.swisspush.redisques.handler; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Handler; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonObject; +import io.vertx.redis.client.Response; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; + +/** + * Class PutLock. + * + * @author baldim + */ +public class PutLockHandler implements Handler> { + + private static final Logger log = getLogger(PutLockHandler.class); + private final Message event; + + public PutLockHandler(Message event) { + this.event = event; + } + + @Override + public void handle(AsyncResult reply) { + if(reply.succeeded()){ + event.reply(new JsonObject().put(STATUS, OK)); + } else { + log.warn("Concealed error", new Exception(reply.cause())); + event.reply(new JsonObject().put(STATUS, ERROR)); + } + } + +} diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index ee2e453f..410e21cb 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -2,7 +2,9 @@ import io.netty.util.internal.StringUtil; import io.vertx.core.AsyncResult; +import io.vertx.core.Future; import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBus; @@ -10,6 +12,7 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.auth.authentication.AuthenticationProvider; @@ -18,18 +21,69 @@ import io.vertx.ext.web.handler.BasicAuthHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.QueueStatisticsCollector; import org.swisspush.redisques.util.RedisquesAPI; import org.swisspush.redisques.util.RedisquesConfiguration; import org.swisspush.redisques.util.Result; import org.swisspush.redisques.util.StatusCode; import java.nio.charset.StandardCharsets; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import static org.swisspush.redisques.util.HttpServerRequestUtil.*; -import static org.swisspush.redisques.util.RedisquesAPI.*; +import static org.swisspush.redisques.util.HttpServerRequestUtil.decode; +import static org.swisspush.redisques.util.HttpServerRequestUtil.encodePayload; +import static org.swisspush.redisques.util.HttpServerRequestUtil.evaluateUrlParameterToBeEmptyOrTrue; +import static org.swisspush.redisques.util.HttpServerRequestUtil.extractNonEmptyJsonArrayFromBody; +import static org.swisspush.redisques.util.RedisquesAPI.BAD_INPUT; +import static org.swisspush.redisques.util.RedisquesAPI.COUNT; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR_TYPE; +import static org.swisspush.redisques.util.RedisquesAPI.FILTER; +import static org.swisspush.redisques.util.RedisquesAPI.LIMIT; +import static org.swisspush.redisques.util.RedisquesAPI.LOCKS; +import static org.swisspush.redisques.util.RedisquesAPI.MEMORY_FULL; +import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; +import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; +import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; +import static org.swisspush.redisques.util.RedisquesAPI.NO_SUCH_LOCK; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_DEQUEUESTATISTIC; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS; +import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static org.swisspush.redisques.util.RedisquesAPI.VALUE; +import static org.swisspush.redisques.util.RedisquesAPI.buildAddQueueItemOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteLocksOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteQueuesOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildBulkPutLocksOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllLocksOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllQueueItemsOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteLockOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteQueueItemOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetAllLocksOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetConfigurationOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetLockOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsCountOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesItemsCountOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesSpeedOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesStatisticsOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildLockedEnqueueOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildPutLockOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildReplaceQueueItemOperation; +import static org.swisspush.redisques.util.RedisquesAPI.buildSetConfigurationOperation; /** * Handler class for HTTP requests providing access to Redisques over HTTP. @@ -53,16 +107,19 @@ public class RedisquesHttpRequestHandler implements Handler { private static final String EMPTY_QUEUES_PARAM = "emptyQueues"; private static final String DELETED = "deleted"; + private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd.MM.yyyy hh:mm:ss"); + private final String redisquesAddress; private final String userHeader; private final boolean enableQueueNameDecoding; private final int queueSpeedIntervalSec; + private final QueueStatisticsCollector queueStatisticsCollector; - public static void init(Vertx vertx, RedisquesConfiguration modConfig) { + public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { log.info("Enable http request handler: " + modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { - RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig); + RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector); // in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default. HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true); vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> { @@ -91,13 +148,14 @@ private Result checkHttpAuthenticationConfiguration(RedisquesCo return Result.ok(false); } - private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig) { + private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { this.router = Router.router(vertx); this.eventBus = vertx.eventBus(); this.redisquesAddress = modConfig.getAddress(); this.userHeader = modConfig.getHttpRequestHandlerUserHeader(); this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding(); this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec(); + this.queueStatisticsCollector = queueStatisticsCollector; final String prefix = modConfig.getHttpRequestHandlerPrefix(); @@ -504,9 +562,13 @@ private void getMonitorInformation(RoutingContext ctx) { if (limit > 0) { queuesList = limitJsonQueueArray(queuesList, limit); } - JsonObject resultObject = new JsonObject(); - resultObject.put(QUEUES, queuesList); - jsonResponse(ctx.response(), resultObject); + + // this function always succeeds, no need to handle the error case + fillStatisticToQueuesList(queuesList).onSuccess(updatedQueuesList -> { + JsonObject resultObject = new JsonObject(); + resultObject.put(QUEUES, updatedQueuesList); + jsonResponse(ctx.response(), resultObject); + }); } else { // there was no result, we as well return an empty result JsonObject resultObject = new JsonObject(); @@ -515,12 +577,60 @@ private void getMonitorInformation(RoutingContext ctx) { } } else { String error = "Error gathering names of active queues"; - log.error(error); + log.error(error, reply.cause()); respondWith(StatusCode.INTERNAL_SERVER_ERROR, error, ctx.request()); } }); } + private Future> fillStatisticToQueuesList(List queuesList) { + Promise> promise = Promise.promise(); + List queueNameList = new ArrayList<>(); + for (JsonObject jsonObject : queuesList) { + queueNameList.add(jsonObject.getString(MONITOR_QUEUE_NAME)); + } + + queueStatisticsCollector.getQueueStatistics(queueNameList) + .onFailure(ex -> { + log.error("Failed to fetch QueueStatistics for queue", ex); + promise.complete(queuesList); + }) + .onSuccess(queueStatisticsJsonObject -> { + if (OK.equals(queueStatisticsJsonObject.getString(STATUS)) + && !queueStatisticsJsonObject.getJsonArray(QUEUES).isEmpty()) { + JsonArray queueStatisticsArray = queueStatisticsJsonObject.getJsonArray(QUEUES); + queuesList.forEach(entries -> { + String queueName = entries.getString(MONITOR_QUEUE_NAME); + entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, ""); + entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, ""); + entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, ""); + queueStatisticsJsonObject.getJsonArray(QUEUES); + + for (Iterator it = queueStatisticsArray.stream().iterator(); it.hasNext(); ) { + JsonObject queueStatistic = (JsonObject) it.next(); + if (queueName.equals(queueStatistic.getString(MONITOR_QUEUE_NAME)) + && queueStatistic.containsKey(STATISTIC_QUEUE_DEQUEUESTATISTIC) + && queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC) != null) { + DequeueStatistic dequeueStatistic = queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC).mapTo(DequeueStatistic.class); + if (dequeueStatistic.lastDequeueAttemptTimestamp != null) { + entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp))); + } + if (dequeueStatistic.lastDequeueSuccessTimestamp != null) { + entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp))); + } + if (dequeueStatistic.nextDequeueDueTimestamp != null) { + entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, DATE_FORMAT.format(new Date(dequeueStatistic.nextDequeueDueTimestamp))); + } + break; + } + } + }); + } + promise.complete(queuesList); + }); + return promise.future(); + } + private void listOrCountQueues(RoutingContext ctx) { if (evaluateUrlParameterToBeEmptyOrTrue(COUNT, ctx.request())) { @@ -533,6 +643,7 @@ private void listOrCountQueues(RoutingContext ctx) { private void getQueuesCount(RoutingContext ctx) { String filter = ctx.request().params().get(FILTER); eventBus.request(redisquesAddress, buildGetQueuesCountOperation(filter), (Handler>>) reply -> { + if( reply.failed() ) log.warn("TODO error handling", reply.cause()); if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) { JsonObject result = new JsonObject(); result.put(COUNT, reply.result().body().getLong(VALUE)); @@ -552,6 +663,7 @@ private void getQueuesCount(RoutingContext ctx) { private void listQueues(RoutingContext ctx) { String filter = ctx.request().params().get(FILTER); eventBus.request(redisquesAddress, buildGetQueuesOperation(filter), (Handler>>) reply -> { + if( reply.failed() ) log.warn("TODO error handling", reply.cause()); if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) { jsonResponse(ctx.response(), reply.result().body().getJsonObject(VALUE)); } else { diff --git a/src/main/java/org/swisspush/redisques/handler/ReplaceQueueItemHandler.java b/src/main/java/org/swisspush/redisques/handler/ReplaceQueueItemHandler.java index 73307bba..f789db43 100644 --- a/src/main/java/org/swisspush/redisques/handler/ReplaceQueueItemHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/ReplaceQueueItemHandler.java @@ -1,11 +1,16 @@ package org.swisspush.redisques.handler; import io.vertx.core.AsyncResult; -import static org.swisspush.redisques.util.RedisquesAPI.*; import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Response; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.ERROR; +import static org.swisspush.redisques.util.RedisquesAPI.OK; +import static org.swisspush.redisques.util.RedisquesAPI.STATUS; /** * Class ReplaceQueueItemHandler. @@ -13,6 +18,8 @@ * @author baldim, https://github.com/mcweba [Marc-Andre Weber] */ public class ReplaceQueueItemHandler implements Handler> { + + private static final Logger log = getLogger(ReplaceQueueItemHandler.class); private final Message event; public ReplaceQueueItemHandler(Message event) { @@ -24,7 +31,9 @@ public void handle(AsyncResult reply) { if(reply.succeeded()){ event.reply(new JsonObject().put(STATUS, OK)); } else { + log.warn("Concealed error", new Exception(reply.cause())); event.reply(new JsonObject().put(STATUS, ERROR)); } } + } diff --git a/src/main/java/org/swisspush/redisques/util/DefaultMemoryUsageProvider.java b/src/main/java/org/swisspush/redisques/util/DefaultMemoryUsageProvider.java index 1fb7c572..f6664e94 100644 --- a/src/main/java/org/swisspush/redisques/util/DefaultMemoryUsageProvider.java +++ b/src/main/java/org/swisspush/redisques/util/DefaultMemoryUsageProvider.java @@ -29,7 +29,9 @@ public Optional currentMemoryUsagePercentage() { } private void updateCurrentMemoryUsage() { - redisProvider.redis().onSuccess(redisAPI -> redisAPI.info(Collections.singletonList("memory")) + redisProvider.redis() + .onFailure( ex -> log.warn("TODO error handling", ex)) + .onSuccess(redisAPI -> redisAPI.info(Collections.singletonList("memory")) .onComplete(memoryInfoEvent -> { if (memoryInfoEvent.failed()) { log.error("Unable to get memory information from redis", memoryInfoEvent.cause()); diff --git a/src/main/java/org/swisspush/redisques/util/DefaultRedisProvider.java b/src/main/java/org/swisspush/redisques/util/DefaultRedisProvider.java index 79d796f8..3cfc97f0 100644 --- a/src/main/java/org/swisspush/redisques/util/DefaultRedisProvider.java +++ b/src/main/java/org/swisspush/redisques/util/DefaultRedisProvider.java @@ -15,7 +15,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Function; /** * Default implementation for a Provider for {@link RedisAPI} @@ -70,7 +69,7 @@ private Future setupRedisClient() { connectToRedis().onComplete(event -> { connectPromiseRef.getAndSet(null); if (event.failed()) { - currentPromise.fail(event.cause()); + currentPromise.fail(new Exception(event.cause())); } else { redisAPI = event.result(); currentPromise.complete(redisAPI); @@ -104,7 +103,7 @@ private Future connectToRedis() { .setMaxPoolWaiting(redisMaxPoolWaitingSize) .setPoolRecycleTimeout(redisPoolRecycleTimeoutMs) .setMaxWaitingHandlers(redisMaxPipelineWaitingSize) - .setType(config.getRedisClientType());; + .setType(config.getRedisClientType()); createConnectStrings().forEach(redisOptions::addConnectionString); @@ -122,7 +121,10 @@ private Future connectToRedis() { // eg, the underlying TCP connection is closed but the client side doesn't know it yet // the client tries to use the staled connection to talk to server. An exceptions will be raised if (reconnectEnabled()) { - conn.exceptionHandler(e -> attemptReconnect(0)); + conn.exceptionHandler(ex -> { + log.warn("Connection broken. Attempt reconnect.", ex); + attemptReconnect(0); + }); } // make sure the client is reconnected on connection close @@ -154,9 +156,10 @@ private List createConnectStrings() { StringBuilder connectionStringPrefixBuilder = new StringBuilder(); connectionStringPrefixBuilder.append(config.getRedisEnableTls() ? "rediss://" : "redis://"); if (redisUser != null && !redisUser.isEmpty()) { - connectionStringPrefixBuilder.append(redisUser).append(":").append((redisPassword == null ? "" : redisPassword)).append("@"); + connectionStringPrefixBuilder.append(redisUser).append(":") + .append((redisPassword == null ? "" : redisPassword)).append("@"); } - List connectionString = new ArrayList<>(); + List connectionString = new ArrayList<>(config.getRedisHosts().size()); String connectionStringPrefix = connectionStringPrefixBuilder.toString(); for (int i = 0; i < config.getRedisHosts().size(); i++) { String host = config.getRedisHosts().get(i); @@ -183,7 +186,10 @@ private void attemptReconnect(int retry) { private void doReconnect(int retry) { long backoffMs = (long) (Math.pow(2, Math.min(retry, 10)) * configurationProvider.configuration().getRedisReconnectDelaySec()); log.debug("Schedule reconnect #{} in {}ms.", retry, backoffMs); - vertx.setTimer(backoffMs, timer -> connectToRedis() - .onFailure(t -> attemptReconnect(retry + 1))); + vertx.setTimer(backoffMs, timer -> connectToRedis().onFailure(ex -> { + log.info("Reconnect failed. Try again.", ex); + attemptReconnect(retry + 1); + })); } + } diff --git a/src/main/java/org/swisspush/redisques/util/DefaultRedisquesConfigurationProvider.java b/src/main/java/org/swisspush/redisques/util/DefaultRedisquesConfigurationProvider.java index ecd0e022..2c3c0e4f 100644 --- a/src/main/java/org/swisspush/redisques/util/DefaultRedisquesConfigurationProvider.java +++ b/src/main/java/org/swisspush/redisques/util/DefaultRedisquesConfigurationProvider.java @@ -78,7 +78,9 @@ private List findNotAllowedConfigurationValues(Set configuration if (configurationValues == null) { return Collections.emptyList(); } - return configurationValues.stream().filter(p -> !ALLOWED_CONFIGURATION_VALUES.contains(p)).collect(Collectors.toList()); + return configurationValues.stream() + .filter(p -> !ALLOWED_CONFIGURATION_VALUES.contains(p)) + .collect(Collectors.toList()); } private void changeProperty(Long propertyValue, String propertyName){ diff --git a/src/main/java/org/swisspush/redisques/util/DequeueStatistic.java b/src/main/java/org/swisspush/redisques/util/DequeueStatistic.java new file mode 100644 index 00000000..740c4e6f --- /dev/null +++ b/src/main/java/org/swisspush/redisques/util/DequeueStatistic.java @@ -0,0 +1,14 @@ +package org.swisspush.redisques.util; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class DequeueStatistic { + public Long lastDequeueAttemptTimestamp = null; + public Long lastDequeueSuccessTimestamp = null; + public Long nextDequeueDueTimestamp = null; + + public boolean isEmpty() { + return lastDequeueAttemptTimestamp == null && lastDequeueSuccessTimestamp == null && nextDequeueDueTimestamp == null; + } +} diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index 6b0fea15..6f49c6dd 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -2,12 +2,14 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; -import io.vertx.core.Handler; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; import io.vertx.redis.client.Request; import io.vertx.redis.client.Response; import io.vertx.redis.client.impl.types.NumberType; @@ -24,7 +26,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static org.swisspush.redisques.util.RedisquesAPI.ERROR; import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; import static org.swisspush.redisques.util.RedisquesAPI.OK; @@ -58,10 +59,12 @@ public class QueueStatisticsCollector { private final static String QUEUE_FAILURES = "failures"; private final static String QUEUE_BACKPRESSURE = "backpressureTime"; private final static String QUEUE_SLOWDOWNTIME = "slowdownTime"; + private final static String QUEUE_DEQUEUE_STATISTIC = "dequeueStatistic"; private final Map queueFailureCount = new HashMap<>(); private final Map queueBackpressureTime = new HashMap<>(); private final Map queueSlowDownTime = new HashMap<>(); + private final Map dequeueStatistics = new HashMap<>(); private final ConcurrentMap queueMessageSpeedCtr = new ConcurrentHashMap<>(); private volatile Map queueMessageSpeed = new HashMap<>(); private final RedisProvider redisProvider; @@ -299,6 +302,30 @@ private long getQueueSlowDownTime(String queueName) { return queueSlowDownTime.getOrDefault(queueName, 0L); } + /** + * Sets the {@link DequeueStatistic} for the given queue. Note that this is done in memory + * but as well persisted in redis. + * + * @param queueName The name of the queue for which the stats must be set. + * @param dequeueStatistic The {@link DequeueStatistic} + */ + public void setDequeueStatistic(String queueName, DequeueStatistic dequeueStatistic) { + if (!dequeueStatistic.isEmpty()) { + dequeueStatistics.put(queueName, dequeueStatistic); + updateStatisticsInRedis(queueName); + } + } + + /** + * Retrieves the current {@link DequeueStatistic} of the given queue we have in memory for this redisques instance. + *

+ * @param queueName The queue name for which we want to retrieve the current failure count + * @return The last {@link DequeueStatistic} + */ + private DequeueStatistic getDequeueStatistic(String queueName) { + return dequeueStatistics.getOrDefault(queueName, new DequeueStatistic()); + } + /** * Write all the collected failure statistics for the given Queue to * redis for later usage if somebody requests the queue statistics. @@ -309,20 +336,29 @@ private void updateStatisticsInRedis(String queueName) { long failures = getQueueFailureCount(queueName); long slowDownTime = getQueueSlowDownTime(queueName); long backpressureTime = getQueueBackPressureTime(queueName); - if (failures > 0 || slowDownTime > 0 || backpressureTime > 0) { + DequeueStatistic dequeueStatistic = getDequeueStatistic(queueName); + if (failures > 0 || slowDownTime > 0 || backpressureTime > 0 || !dequeueStatistic.isEmpty()) { JsonObject obj = new JsonObject(); obj.put(QUEUENAME, queueName); obj.put(QUEUE_FAILURES, failures); obj.put(QUEUE_SLOWDOWNTIME, slowDownTime); obj.put(QUEUE_BACKPRESSURE, backpressureTime); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), - emptyHandler -> { - })).onFailure(throwable -> log.error("Redis: Error in updateStatisticsInRedis", throwable)); + obj.put(QUEUE_DEQUEUE_STATISTIC, JsonObject.mapFrom(dequeueStatistic)); + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), ev -> { + if( ev.failed() ) log.warn("TODO error handling", new Exception(ev.cause())); + }); + }) + .onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex)); } else { - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hdel(List.of(STATSKEY, queueName), - emptyHandler -> { - })).onFailure(throwable -> log.error("Redis: Error in updateStatisticsInRedis", throwable)); - + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.hdel(List.of(STATSKEY, queueName), ev -> { + if (ev.failed()) log.warn("TODO error handling", new Exception(ev.cause())); + }); + }) + .onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex)); } } @@ -337,6 +373,7 @@ private static class QueueStatistic { private long backpressureTime; private long slowdownTime; private long speed; + private JsonObject dequeueStatistic; QueueStatistic(String queueName) { this.queueName = queueName; @@ -382,6 +419,14 @@ void setMessageSpeed(Long speed) { this.speed = 0; } + void setDequeueStatistic(DequeueStatistic dequeueStatistic) { + if (dequeueStatistic != null && !dequeueStatistic.isEmpty()) { + this.dequeueStatistic = JsonObject.mapFrom(dequeueStatistic); + return; + } + this.dequeueStatistic = JsonObject.mapFrom(new DequeueStatistic()); + } + JsonObject getAsJsonObject() { return new JsonObject() .put(MONITOR_QUEUE_NAME, queueName) @@ -389,7 +434,8 @@ JsonObject getAsJsonObject() { .put(STATISTIC_QUEUE_FAILURES, failures) .put(STATISTIC_QUEUE_BACKPRESSURE, backpressureTime) .put(STATISTIC_QUEUE_SLOWDOWN, slowdownTime) - .put(STATISTIC_QUEUE_SPEED, speed); + .put(STATISTIC_QUEUE_SPEED, speed) + .put(QUEUE_DEQUEUE_STATISTIC, dequeueStatistic); } } @@ -400,84 +446,157 @@ JsonObject getAsJsonObject() { * for all queues requested (independent of the redisques instance for which the queues are * registered). Therefore this method must be used with care and not be called too often! * - * @param event The event on which we will answer finally * @param queues The queues for which we are interested in the statistics + * + * @return A Future */ - public void getQueueStatistics(Message event, final List queues) { + + public Future getQueueStatistics(final List queues) { + final Promise promise = Promise.promise(); if (queues == null || queues.isEmpty()) { log.debug("Queue statistics evaluation with empty queues, returning empty result"); - event.reply(new JsonObject().put(STATUS, OK).put(RedisquesAPI.QUEUES, new JsonArray())); - return; + promise.complete(new JsonObject().put(STATUS, OK).put(RedisquesAPI.QUEUES, new JsonArray())); + return promise.future(); } - redisProvider.connection().onSuccess(conn -> { - List responses = queues.stream().map(queue -> conn.send(Request.cmd(Command.LLEN, queuePrefix + queue)) - ).collect(Collectors.toList()); - CompositeFuture.all(responses).onFailure(throwable -> { - log.error("Unexepected queue length result"); - event.reply(new JsonObject().put(STATUS, ERROR)); - }).onSuccess(compositeFuture -> { - List queueLengthList = compositeFuture.list(); - if (queueLengthList == null) { - log.error("Unexepected queue length result null"); - event.reply(new JsonObject().put(STATUS, ERROR)); - return; - } - if (queueLengthList.size() != queues.size()) { - log.error("Unexpected queue length result with unequal size {} : {}", - queues.size(), queueLengthList.size()); - event.reply(new JsonObject().put(STATUS, ERROR)); - return; - } - // populate the list of queue statistics in a Hashmap for later fast merging - final HashMap statisticsMap = new HashMap<>(); - for (int i = 0; i < queues.size(); i++) { - QueueStatistic qs = new QueueStatistic(queues.get(i)); - qs.setSize(queueLengthList.get(i).toLong()); - qs.setMessageSpeed(getQueueSpeed(qs.queueName)); - statisticsMap.put(qs.queueName, qs); - } - // now retrieve all available failure statistics from Redis and merge them - // together with the previous populated common queue statistics map - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hvals(STATSKEY, statisticsSet -> { - if (statisticsSet == null) { - log.error("Unexpected statistics queue evaluation result result null"); - event.reply(new JsonObject().put(STATUS, ERROR)); + var ctx = new RequestCtx(); + ctx.queueNames = queues; + step1(ctx).compose( + jsonObject1 -> step2(ctx).compose( + jsonObject2 -> step3(ctx).compose( + jsonObject3 -> step4(ctx).compose( + jsonObject4 -> step5(ctx).compose( + jsonObject5 -> step6(ctx)) + )))).onComplete(promise); + return promise.future(); + } + + /**

init redis connection.

*/ + Future step1(RequestCtx ctx) { + final Promise promise = Promise.promise(); + redisProvider.connection() + .onFailure(throwable -> { + promise.fail(new Exception("Redis: Failed to get queue length.", throwable)); + }) + .onSuccess(conn -> { + assert conn != null; + ctx.conn = conn; + promise.complete(); + }); + return promise.future(); + } + + /**

Query queue lengths.

*/ + Future step2(RequestCtx ctx) { + assert ctx.conn != null; + final Promise promise = Promise.promise(); + List responses = ctx.queueNames.stream() + .map(queue -> ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queue))) + .collect(Collectors.toList()); + CompositeFuture.all(responses) + .onFailure(ex -> { + promise.fail("Unexpected queue length result"); + }) + .onSuccess(compositeFuture -> { + List queueLengthList = compositeFuture.list(); + if (queueLengthList == null) { + promise.fail("Unexpected queue length result: null"); return; } - // put the received statistics data to the former prepared statistics objects - // per queue - for (Response response : statisticsSet.result()) { - JsonObject jObj = new JsonObject(response.toString()); - String queueName = jObj.getString(RedisquesAPI.QUEUENAME); - QueueStatistic queueStatistic = statisticsMap.get(queueName); - if (queueStatistic != null) { - // if it isn't there, there is obviously no statistic needed - queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L)); - queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L)); - queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L)); - } - } - // build the final resulting statistics list from the former merged queue - // values from various sources - JsonArray result = new JsonArray(); - for (String queueName : queues) { - QueueStatistic stats = statisticsMap.get(queueName); - if (stats != null) { - result.add(stats.getAsJsonObject()); - } + if (queueLengthList.size() != ctx.queueNames.size()) { + String err = "Unexpected queue length result with unequal size " + + ctx.queueNames.size() + " : " + queueLengthList.size(); + promise.fail(err); + return; } - event.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK) - .put(RedisquesAPI.QUEUES, result)); - })).onFailure(throwable -> { - log.error("Redis: Error in getQueueStatistics", throwable); - event.reply(new JsonObject().put(STATUS, ERROR)); + ctx.queueLengthList = queueLengthList; + promise.complete(); }); + return promise.future(); + } + + /**

init queue statistics.

*/ + Future step3(RequestCtx ctx) { + assert ctx.queueLengthList != null; + final Promise promise = Promise.promise(); + // populate the list of queue statistics in a Hashmap for later fast merging + ctx.statistics = new HashMap<>(ctx.queueNames.size()); + for (int i = 0; i < ctx.queueNames.size(); i++) { + QueueStatistic qs = new QueueStatistic(ctx.queueNames.get(i)); + qs.setSize(ctx.queueLengthList.get(i).toLong()); + qs.setMessageSpeed(getQueueSpeed(qs.queueName)); + ctx.statistics.put(qs.queueName, qs); + } + promise.complete(); + return promise.future(); + } + + /**

init a resAPI instance we need to get more details.

*/ + Future step4(RequestCtx ctx){ + final Promise promise = Promise.promise(); + redisProvider.redis() + .onFailure(throwable -> { + promise.fail(new Exception("Redis: Error in getQueueStatistics", throwable)); + }) + .onSuccess(redisAPI -> { + assert redisAPI != null; + ctx.redisAPI = redisAPI; + promise.complete(); + }); + return promise.future(); + } - }); - }).onFailure(throwable -> { - log.warn("Redis: Failed to get queue length.", throwable); - event.reply(new JsonObject().put(STATUS, ERROR)); + /** + *

retrieve all available failure statistics from Redis and merge them + * together with the previous populated common queue statistics map

+ */ + Future step5(RequestCtx ctx) { + assert ctx.redisAPI != null; + assert ctx.statistics != null; + final Promise promise = Promise.promise(); + ctx.redisAPI.hvals(STATSKEY, statisticsSet -> { + if( statisticsSet == null || statisticsSet.failed() ){ + promise.fail(new RuntimeException("statistics queue evaluation failed", + statisticsSet == null ? null : statisticsSet.cause())); + return; + } + ctx.redisFailStats = statisticsSet.result(); + assert ctx.redisFailStats != null; + promise.complete(); }); + return promise.future(); + } + + /**

put received statistics data to the former prepared statistics objects per + * queue.

*/ + Future step6(RequestCtx ctx){ + assert ctx.redisFailStats != null; + final Promise promise = Promise.promise(); + for (Response response : ctx.redisFailStats) { + JsonObject jObj = new JsonObject(response.toString()); + String queueName = jObj.getString(QUEUENAME); + QueueStatistic queueStatistic = ctx.statistics.get(queueName); + if (queueStatistic != null) { + // if it isn't there, there is obviously no statistic needed + queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L)); + queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L)); + queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L)); + if (jObj.containsKey(QUEUE_DEQUEUE_STATISTIC)) { + queueStatistic.setDequeueStatistic(jObj.getJsonObject(QUEUE_DEQUEUE_STATISTIC).mapTo(DequeueStatistic.class)); + } + } + } + // build the final resulting statistics list from the former merged queue + // values from various sources + JsonArray result = new JsonArray(); + for (String queueName : ctx.queueNames) { + QueueStatistic stats = ctx.statistics.get(queueName); + if (stats != null) { + result.add(stats.getAsJsonObject()); + } + } + promise.complete(new JsonObject().put(STATUS, OK) + .put(RedisquesAPI.QUEUES, result)); + return promise.future(); } /** @@ -503,4 +622,16 @@ public void getQueuesSpeed(Message event, final List queues) event.reply(new JsonObject().put(STATUS, OK).put(STATISTIC_QUEUE_SPEED, speed)); } + /**

Holds intermediate state related to a {@link #getQueueStatistics(Message, List)} + * request.

*/ + private static class RequestCtx { + private Message event; // origin event we have to answer + private List queueNames; // Requested queues to analyze + private Redis conn; + private RedisAPI redisAPI; + private List queueLengthList; + private HashMap statistics; // Stats we're going to populate + private Response redisFailStats; // failure stats we got from redis. + } + } diff --git a/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java b/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java index a01b7076..f372b359 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java +++ b/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java @@ -39,7 +39,7 @@ public Future executeDelayedMax(long delayMs) { log.debug("starting timer with a delay of " + delay + "ms"); vertx.setTimer(delay, delayed -> promise.complete()); } else { - promise.complete(); + vertx.runOnContext(aVoid -> promise.complete()); } return promise.future(); diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java b/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java index 3dfac162..0fe3e255 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesAPI.java @@ -41,10 +41,14 @@ public class RedisquesAPI { public static final String MONITOR_QUEUE_NAME = "name"; public static final String MONITOR_QUEUE_SIZE = "size"; + public static final String STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT = "lastDequeueAttempt"; + public static final String STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS = "lastDequeueSuccess"; + public static final String STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS = "nextDequeueDueTimestamp"; public static final String STATISTIC_QUEUE_FAILURES = "failures"; public static final String STATISTIC_QUEUE_BACKPRESSURE = "backpressureTime"; public static final String STATISTIC_QUEUE_SLOWDOWN = "slowdownTime"; public static final String STATISTIC_QUEUE_SPEED = "speed"; + public final static String STATISTIC_QUEUE_DEQUEUESTATISTIC = "dequeueStatistic"; public static final String STATISTIC_QUEUE_SPEED_INTERVAL_UNIT= "unitSec"; private static final Logger log = LoggerFactory.getLogger(RedisquesAPI.class); diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 5547e5ec..4d396fda 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -50,6 +50,7 @@ public class RedisquesConfiguration { private final int redisReconnectAttempts; private final int redisReconnectDelaySec; private final int redisPoolRecycleTimeoutMs; + private final int dequeueStatisticReportIntervalSec; private static final int DEFAULT_CHECK_INTERVAL_S = 60; // 60s private static final int DEFAULT_PROCESSOR_TIMEOUT_MS = 240000; // 240s @@ -69,6 +70,7 @@ public class RedisquesConfiguration { private static final int DEFAULT_QUEUE_SPEED_INTERVAL_SEC = 60; private static final int DEFAULT_MEMORY_USAGE_LIMIT_PCT = 100; private static final int DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC = 60; + private static final int DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC = 30; public static final String PROP_ADDRESS = "address"; public static final String PROP_CONFIGURATION_UPDATED_ADDRESS = "configuration-updated-address"; @@ -109,6 +111,7 @@ public class RedisquesConfiguration { public static final String PROP_QUEUE_SPEED_INTERVAL_SEC = "queueSpeedIntervalSec"; public static final String PROP_MEMORY_USAGE_LIMIT_PCT = "memoryUsageLimitPercent"; public static final String PROP_MEMORY_USAGE_CHECK_INTERVAL_SEC = "memoryUsageCheckIntervalSec"; + public static final String PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC = "dequeueStatisticReportIntervalSec"; /** * Constructor with default values. Use the {@link RedisquesConfigurationBuilder} class @@ -138,7 +141,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC, - DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS); + DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC); } /** @@ -159,7 +162,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC, - DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS); + DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC); } /** @@ -179,7 +182,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC, - DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS); + DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC); } private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, int refreshPeriod, @@ -191,7 +194,8 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres List queueConfigurations, boolean enableQueueNameDecoding, int maxPoolSize, int maxPoolWaitSize, int maxPipelineWaitSize, int queueSpeedIntervalSec, int memoryUsageLimitPercent, int memoryUsageCheckIntervalSec, - int redisReconnectAttempts, int redisReconnectDelaySec, int redisPoolRecycleTimeoutMs) { + int redisReconnectAttempts, int redisReconnectDelaySec, int redisPoolRecycleTimeoutMs, + int dequeueStatisticReportIntervalSec) { this.address = address; this.configurationUpdatedAddress = configurationUpdatedAddress; this.redisPrefix = redisPrefix; @@ -268,6 +272,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres } this.redisPoolRecycleTimeoutMs = redisPoolRecycleTimeoutMs; + this.dequeueStatisticReportIntervalSec = dequeueStatisticReportIntervalSec; } public static RedisquesConfigurationBuilder with() { @@ -289,7 +294,8 @@ private RedisquesConfiguration(RedisquesConfigurationBuilder builder) { builder.memoryUsageCheckIntervalSec, builder.redisReconnectAttempts, builder.redisReconnectDelaySec, - builder.redisPoolRecycleTimeoutMs); + builder.redisPoolRecycleTimeoutMs, + builder.dequeueStatisticReportIntervalSec); } public JsonObject asJsonObject() { @@ -329,6 +335,7 @@ public JsonObject asJsonObject() { obj.put(PROP_QUEUE_SPEED_INTERVAL_SEC, getQueueSpeedIntervalSec()); obj.put(PROP_MEMORY_USAGE_LIMIT_PCT, getMemoryUsageLimitPercent()); obj.put(PROP_MEMORY_USAGE_CHECK_INTERVAL_SEC, getMemoryUsageCheckIntervalSec()); + obj.put(PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC, getDequeueStatisticReportIntervalSec()); return obj; } @@ -442,6 +449,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) { if (json.containsKey(PROP_MEMORY_USAGE_CHECK_INTERVAL_SEC)) { builder.memoryUsageCheckIntervalSec(json.getInteger(PROP_MEMORY_USAGE_CHECK_INTERVAL_SEC)); } + if (json.containsKey(PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC)) { + builder.dequeueStatisticReportIntervalSec(json.getInteger(PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC)); + } return builder.build(); } @@ -491,6 +501,9 @@ public int getRedisReconnectDelaySec() { public int getRedisPoolRecycleTimeoutMs() { return redisPoolRecycleTimeoutMs; } + public int getDequeueStatisticReportIntervalSec() { + return dequeueStatisticReportIntervalSec; + } public String getRedisAuth() { return redisAuth; @@ -636,6 +649,7 @@ public static class RedisquesConfigurationBuilder { private int redisReconnectAttempts; private int redisReconnectDelaySec; private int redisPoolRecycleTimeoutMs; + private int dequeueStatisticReportIntervalSec; private RedisClientType redisClientType; private String redisAuth; private String redisPassword; @@ -691,6 +705,7 @@ public RedisquesConfigurationBuilder() { this.queueSpeedIntervalSec = DEFAULT_QUEUE_SPEED_INTERVAL_SEC; this.memoryUsageLimitPercent = DEFAULT_MEMORY_USAGE_LIMIT_PCT; this.memoryUsageCheckIntervalSec = DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC; + this.dequeueStatisticReportIntervalSec = DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC; } public RedisquesConfigurationBuilder address(String address) { @@ -868,6 +883,10 @@ public RedisquesConfigurationBuilder memoryUsageCheckIntervalSec(int memoryUsage return this; } + public RedisquesConfigurationBuilder dequeueStatisticReportIntervalSec(int dequeueStatisticReportIntervalSec) { + this.dequeueStatisticReportIntervalSec = dequeueStatisticReportIntervalSec; + return this; + } public RedisquesConfiguration build() { return new RedisquesConfiguration(this); } diff --git a/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java b/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java index 233d603c..842d2579 100644 --- a/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java +++ b/src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java @@ -1,5 +1,8 @@ package org.swisspush.redisques.handler; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.sun.istack.NotNull; import com.sun.istack.Nullable; import io.restassured.RestAssured; @@ -123,7 +126,7 @@ public class RedisquesHttpRequestHandlerTest extends AbstractTestCase { private static final String configurationEmpty = "{}"; @Rule - public Timeout rule = Timeout.seconds(15); + public Timeout rule = Timeout.seconds(35); @BeforeClass public static void beforeClass() { @@ -151,6 +154,7 @@ public void deployRedisques(TestContext context) { .withEnqueueDelayMillisPerSize(5).withEnqueueMaxDelayMillis(100) )) .queueSpeedIntervalSec(4) + .dequeueStatisticReportIntervalSec(5) .build() .asJsonObject(); @@ -1867,10 +1871,10 @@ public void deleteSingleLockEncoded(TestContext context) { } @Test - public void getMonitorInformation(TestContext context) { + public void getMonitorInformation(TestContext context) throws JsonProcessingException { Async async = context.async(); flushAll(); - + ObjectMapper jsonMapper = new ObjectMapper(); when().get("/queuing/monitor").then().assertThat().statusCode(200) .body("queues", empty()); @@ -1888,6 +1892,12 @@ public void getMonitorInformation(TestContext context) { given().body("{}").when().put("/queuing/locks/queue_3").then().assertThat().statusCode(200); when().delete("/queuing/queues/queue_3/0").then().assertThat().statusCode(200); + // wait 5.1 second, because the update time is 5 seconds + try { + Thread.sleep(5100); + } catch (InterruptedException iex) { + // ignore + } String expectedNoEmptyQueuesNoLimit = "{\n" + " \"queues\": [\n" + " {\n" + @@ -1900,10 +1910,15 @@ public void getMonitorInformation(TestContext context) { " }\n" + " ]\n" + "}"; - - when().get("/queuing/monitor").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedNoEmptyQueuesNoLimit).toString())); - + JsonNode expectedStaticJson = jsonMapper.readTree(expectedNoEmptyQueuesNoLimit); + JsonNode receivedJson = jsonMapper.readTree(when().get("/queuing/monitor").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); + // wait 5.1 second, because the update time is 5 seconds + try { + Thread.sleep(5100); + } catch (InterruptedException iex) { + // ignore + } String expectedWithEmptyQueuesNoLimit = "{\n" + " \"queues\": [\n" + " {\n" + @@ -1921,9 +1936,15 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?emptyQueues").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedWithEmptyQueuesNoLimit).toString())); - + expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesNoLimit); + receivedJson = jsonMapper.readTree(when().get("/queuing/monitor?emptyQueues").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); + // wait 5.1 second, because the update time is 5 seconds + try { + Thread.sleep(5100); + } catch (InterruptedException iex) { + // ignore + } String expectedNoEmptyQueuesAndLimit3 = "{\n" + " \"queues\": [\n" + " {\n" + @@ -1937,9 +1958,15 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?limit=3").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedNoEmptyQueuesAndLimit3).toString())); - + expectedStaticJson = jsonMapper.readTree(expectedNoEmptyQueuesAndLimit3); + receivedJson = jsonMapper.readTree(when().get("/queuing/monitor?limit=3").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); + // wait 5.1 second, because the update time is 5 seconds + try { + Thread.sleep(5100); + } catch (InterruptedException iex) { + // ignore + } String expectedWithEmptyQueuesAndLimit3 = "{\n" + " \"queues\": [\n" + " {\n" + @@ -1957,9 +1984,15 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?limit=3&emptyQueues").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedWithEmptyQueuesAndLimit3).toString())); - + expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesAndLimit3); + receivedJson = jsonMapper.readTree(when().get("/queuing/monitor?limit=3&emptyQueues").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); + // wait 5.1 second, because the update time is 5 seconds + try { + Thread.sleep(5100); + } catch (InterruptedException iex) { + // ignore + } String expectedWithEmptyQueuesAndInvalidLimit = "{\n" + " \"queues\": [\n" + " {\n" + @@ -1977,12 +2010,26 @@ public void getMonitorInformation(TestContext context) { " ]\n" + "}"; - when().get("/queuing/monitor?limit=xx99xx&emptyQueues").then().assertThat().statusCode(200) - .body(equalTo(new JsonObject(expectedWithEmptyQueuesAndInvalidLimit).toString())); + expectedStaticJson = jsonMapper.readTree(expectedWithEmptyQueuesAndInvalidLimit); + receivedJson = jsonMapper.readTree(when().get("/queuing/monitor?limit=limit=xx99xx&emptyQueues").then().assertThat().statusCode(200).extract().asString()); + verifyResponse(context, expectedStaticJson, receivedJson); async.complete(); } + private static void verifyResponse(TestContext context, JsonNode expectedStaticJson, JsonNode receivedJson) { + context.assertEquals(expectedStaticJson.size(), receivedJson.size()); + context.assertEquals(expectedStaticJson.get("queues").size(), receivedJson.get("queues").size()); + for (int i = 0; i < expectedStaticJson.get("queues").size(); i++){ + context.assertEquals(expectedStaticJson.get("queues").get(i).get("name"), receivedJson.get("queues").get(i).get("name")); + context.assertEquals(expectedStaticJson.get("queues").get(i).get("size"), receivedJson.get("queues").get(i).get("size")); + + context.assertFalse(receivedJson.get("queues").get(i).get("lastDequeueAttempt").asText().isEmpty()); + context.assertTrue(receivedJson.get("queues").get(i).get("lastDequeueSuccess").asText().isEmpty()); // No handlers for address processor-address + context.assertFalse(receivedJson.get("queues").get(i).get("nextDequeueDueTimestamp").asText().isEmpty()); // So will retry + } + } + @Test public void getStatisticsEmpty(TestContext context) { flushAll(); diff --git a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java index 1a3535af..6890eac0 100644 --- a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java +++ b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java @@ -137,6 +137,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getJsonArray(PROP_QUEUE_CONFIGURATIONS).getList().size(), 0); testContext.assertEquals(json.getInteger(PROP_QUEUE_SPEED_INTERVAL_SEC), 60); testContext.assertEquals(json.getInteger(PROP_MEMORY_USAGE_LIMIT_PCT), 100); + testContext.assertEquals(json.getInteger(PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC), 30); } @Test @@ -163,6 +164,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { )) .queueSpeedIntervalSec(1) .memoryUsageLimitPercent(55) + .dequeueStatisticReportIntervalSec(44) .build(); JsonObject json = config.asJsonObject(); @@ -192,7 +194,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getString(PROP_HTTP_REQUEST_HANDLER_USER_HEADER), "x-custom-user-header"); testContext.assertEquals(json.getInteger(PROP_QUEUE_SPEED_INTERVAL_SEC), 1); testContext.assertEquals(json.getInteger(PROP_MEMORY_USAGE_LIMIT_PCT), 55); - + testContext.assertEquals(json.getInteger(PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC), 44); // queue configurations JsonArray queueConfigurationsJsonArray = json.getJsonArray(PROP_QUEUE_CONFIGURATIONS); List queueConfigurationJsonObjects = queueConfigurationsJsonArray.getList(); @@ -230,6 +232,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getQueueConfigurations().size(), 0); testContext.assertEquals(config.getQueueSpeedIntervalSec(), 60); testContext.assertEquals(config.getMemoryUsageLimitPercent(), 100); + testContext.assertEquals(config.getDequeueStatisticReportIntervalSec(), 30); } @Test @@ -257,6 +260,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { json.put(PROP_HTTP_REQUEST_HANDLER_USER_HEADER, "x-custom-user-header"); json.put(PROP_QUEUE_SPEED_INTERVAL_SEC, 1); json.put(PROP_MEMORY_USAGE_LIMIT_PCT, 75); + json.put(PROP_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC, 22); json.put(PROP_QUEUE_CONFIGURATIONS, new JsonArray(Collections.singletonList( new QueueConfiguration().withPattern("vehicle-.*") .withRetryIntervals(10, 20, 30, 60) @@ -285,6 +289,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getHttpRequestHandlerUserHeader(), "x-custom-user-header"); testContext.assertEquals(config.getQueueSpeedIntervalSec(), 1); testContext.assertEquals(config.getMemoryUsageLimitPercent(), 75); + testContext.assertEquals(config.getDequeueStatisticReportIntervalSec(), 22); // queue configurations testContext.assertEquals(config.getQueueConfigurations().size(), 1);