diff --git a/pom.xml b/pom.xml index 4140a9c..be82a0f 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 3.1.1-SNAPSHOT + 3.1.2-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx @@ -70,11 +70,6 @@ slf4j-api ${slf4j.version} - - org.slf4j - slf4j-reload4j - ${slf4j.version} - @@ -389,7 +384,7 @@ - 4.5.1 + 4.5.2 2.0.10 5.8.0 4.13.2 @@ -397,7 +392,7 @@ 3.7.0 5.4.0 1.16.0 - 2.15.0 + 2.17.0 UTF8 https://oss.sonatype.org/content/repositories/snapshots/ diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java new file mode 100644 index 0000000..5ff05ef --- /dev/null +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -0,0 +1,225 @@ +package org.swisspush.redisques; + +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.QueueStatisticsCollector; + +import java.util.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static java.lang.Long.compare; +import static java.lang.System.currentTimeMillis; +import static java.util.Collections.emptyList; +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.*; + + +/** + *

Old impl did fetch all queues (take 2000 as an example from PaISA prod) Did + * a nested iteration, so worst case iterate 2000 times 2000 queues to find the + * matching queue (2'000 * 2'000 = 4'000'000 iterations). New impl now does setup a + * dictionary then does indexed access while iterating the 2000 entries + * (2'000 + 2'000 = 4'000 iterations). Keep in mind 2'000 is just some usual value. + * Under load, this value can increase further and so the old, exponential approach + * just did explode in terms of computational efford.

+ */ +public class QueueStatsService { + + private static final Logger log = getLogger(QueueStatsService.class); + private final Vertx vertx; + private final EventBus eventBus; + private final String redisquesAddress; + private final QueueStatisticsCollector queueStatisticsCollector; + + public QueueStatsService(Vertx vertx, EventBus eventBus, String redisquesAddress, QueueStatisticsCollector queueStatisticsCollector) { + this.vertx = vertx; + this.eventBus = eventBus; + this.redisquesAddress = redisquesAddress; + this.queueStatisticsCollector = queueStatisticsCollector; + } + + public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { + var req0 = new GetQueueStatsRequest(); + req0.mCtx = mCtx; + req0.mentor = mentor; + fetchQueueNamesAndSize(req0, (ex1, req1) -> { + if (ex1 != null) { req1.mentor.onError(ex1, req1.mCtx); return; } + // Prepare a list of queue names as it is needed to fetch retryDetails. + req1.queueNames = new ArrayList<>(req1.queues.size()); + for (Queue q : req1.queues) req1.queueNames.add(q.name); + fetchRetryDetails(req1, (ex2, req2) -> { + if (ex2 != null) { req2.mentor.onError(ex2, req2.mCtx); return; } + mergeRetryDetailsIntoCollectedData(req2, (ex3, req3) -> { + if (ex3 != null) { req3.mentor.onError(ex3, req3.mCtx); return; } + req3.mentor.onQueueStatistics(req3.queues, req3.mCtx); + }); + }); + }); + } + + private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { + String filter = req.mentor.filter(req.mCtx); + JsonObject operation = buildGetQueuesItemsCountOperation(filter); + eventBus.request(redisquesAddress, operation, ev -> { + if (ev.failed()) { + onDone.accept(new Exception("eventBus.request()", ev.cause()), null); + return; + } + Message msg = ev.result(); + JsonObject body = msg.body(); + String status = body.getString(STATUS); + if (!OK.equals(status)) { + onDone.accept(new Exception("Unexpected status " + status), null); + return; + } + JsonArray queuesJsonArr = body.getJsonArray(QUEUES); + if (queuesJsonArr == null || queuesJsonArr.isEmpty()) { + log.debug("result was {}, we return an empty result.", queuesJsonArr == null ? "null" : "empty"); + req.queues = emptyList(); + onDone.accept(null, req); + return; + } + boolean includeEmptyQueues = req.mentor.includeEmptyQueues(req.mCtx); + List queues = new ArrayList<>(queuesJsonArr.size()); + for (var it = queuesJsonArr.iterator(); it.hasNext(); ) { + JsonObject queueJson = (JsonObject) it.next(); + String name = queueJson.getString(MONITOR_QUEUE_NAME); + Long size = queueJson.getLong(MONITOR_QUEUE_SIZE); + // No need to process empty queues any further if caller is not interested + // in them anyway. + if (!includeEmptyQueues && (size == null || size == 0)) continue; + Queue queue = new Queue(name, size); + queues.add(queue); + } + queues.sort(this::compareLargestFirst); + // Only the part with the most filled queues got requested. Get rid of + // all shorter queues then. + int limit = req.mentor.limit(req.mCtx); + if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit); + req.queues = queues; + onDone.accept(null, req); + }); + } + + private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { + long begGetQueueStatsMs = currentTimeMillis(); + assert req.queueNames != null; + queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> { + req.queueNames = null; // <- no longer needed + long durGetQueueStatsMs = currentTimeMillis() - begGetQueueStatsMs; + log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", durGetQueueStatsMs); + if (ev.failed()) { + log.warn("queueStatisticsCollector.getQueueStatistics() failed. Fallback to empty result.", ev.cause()); + req.queuesJsonArr = new JsonArray(); + onDone.accept(null, req); + return; + } + JsonObject queStatsJsonObj = ev.result(); + String status = queStatsJsonObj.getString(STATUS); + if (!OK.equals(status)) { + log.warn("queueStatisticsCollector.getQueueStatistics() responded '" + status + "'. Fallback to empty result.", ev.cause()); + req.queuesJsonArr = new JsonArray(); + onDone.accept(null, req); + return; + } + req.queuesJsonArr = queStatsJsonObj.getJsonArray(QUEUES); + onDone.accept(null, req); + }); + } + + private void mergeRetryDetailsIntoCollectedData(GetQueueStatsRequest req, BiConsumer> onDone) { + // Setup a lookup table as we need to find by name further below. + Map detailsByName = new HashMap<>(req.queuesJsonArr.size()); + for (var it = (Iterator) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) { + JsonObject detailJson = it.next(); + String name = detailJson.getString(MONITOR_QUEUE_NAME); + detailsByName.put(name, detailJson); + } + for (Queue queue : req.queues) { + JsonObject detail = detailsByName.get(queue.name); + if (detail == null) continue; // no details to enrich. + JsonObject dequeueStatsJson = detail.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC); + if (dequeueStatsJson == null) continue; // no dequeue stats we could enrich + DequeueStatistic dequeueStats = dequeueStatsJson.mapTo(DequeueStatistic.class); + // Attach whatever details we got. + queue.lastDequeueAttemptEpochMs = dequeueStats.lastDequeueAttemptTimestamp; + queue.lastDequeueSuccessEpochMs = dequeueStats.lastDequeueSuccessTimestamp; + queue.nextDequeueDueTimestampEpochMs = dequeueStats.nextDequeueDueTimestamp; + } + onDone.accept(null, req); + } + + private int compareLargestFirst(Queue a, Queue b) { + return compare(b.size, a.size); + } + + + private static class GetQueueStatsRequest { + private CTX mCtx; + private GetQueueStatsMentor mentor; + private List queueNames; + private JsonArray queuesJsonArr; + private List queues; + } + + + public static class Queue { + private final String name; + private final long size; + private Long lastDequeueAttemptEpochMs; + private Long lastDequeueSuccessEpochMs; + private Long nextDequeueDueTimestampEpochMs; + private Queue(String name, long size){ + assert name != null; + this.name = name; + this.size = size; + } + + public String getName() { return name; } + public long getSize() { return size; } + public Long getLastDequeueAttemptEpochMs() { return lastDequeueAttemptEpochMs; } + public Long getLastDequeueSuccessEpochMs() { return lastDequeueSuccessEpochMs; } + public Long getNextDequeueDueTimestampEpochMs() { return nextDequeueDueTimestampEpochMs; } + } + + + /** + *

Mentors fetching operations and so provides the fetcher the required + * information. Finally it also receives the operations result.

+ * + * @param + * The context object of choice handled back to each callback so the mentor + * knows about what request the fetcher is talking. + */ + public static interface GetQueueStatsMentor { + + /** + *

Returning true means that all queues will be present in the result. If + * false, empty queues won't show up the result.

+ * + * @param ctx See {@link GetQueueStatsMentor}. + */ + public boolean includeEmptyQueues( CTX ctx ); + + /**

Limits the result to the largest N queues.

*/ + public int limit( CTX ctx ); + + public String filter( CTX ctx); + + /**

Called ONCE with the final result.

*/ + public void onQueueStatistics(List queues, CTX ctx); + + /** + *

Called as soon an error occurs. After an error occurred, {@link #onQueueStatistics(List, Object)} + * will NOT be called, as the operation did fail.

+ */ + public void onError(Throwable ex, CTX ctx); + } + +} diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 410e21c..6ad031d 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -2,9 +2,7 @@ import io.netty.util.internal.StringUtil; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBus; @@ -12,7 +10,6 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.auth.authentication.AuthenticationProvider; @@ -21,69 +18,19 @@ import io.vertx.ext.web.handler.BasicAuthHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.swisspush.redisques.util.DequeueStatistic; -import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisquesAPI; -import org.swisspush.redisques.util.RedisquesConfiguration; -import org.swisspush.redisques.util.Result; -import org.swisspush.redisques.util.StatusCode; +import org.swisspush.redisques.QueueStatsService; +import org.swisspush.redisques.QueueStatsService.GetQueueStatsMentor; +import org.swisspush.redisques.util.*; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import static org.swisspush.redisques.util.HttpServerRequestUtil.decode; -import static org.swisspush.redisques.util.HttpServerRequestUtil.encodePayload; -import static org.swisspush.redisques.util.HttpServerRequestUtil.evaluateUrlParameterToBeEmptyOrTrue; -import static org.swisspush.redisques.util.HttpServerRequestUtil.extractNonEmptyJsonArrayFromBody; -import static org.swisspush.redisques.util.RedisquesAPI.BAD_INPUT; -import static org.swisspush.redisques.util.RedisquesAPI.COUNT; -import static org.swisspush.redisques.util.RedisquesAPI.ERROR_TYPE; -import static org.swisspush.redisques.util.RedisquesAPI.FILTER; -import static org.swisspush.redisques.util.RedisquesAPI.LIMIT; -import static org.swisspush.redisques.util.RedisquesAPI.LOCKS; -import static org.swisspush.redisques.util.RedisquesAPI.MEMORY_FULL; -import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; -import static org.swisspush.redisques.util.RedisquesAPI.NO_SUCH_LOCK; -import static org.swisspush.redisques.util.RedisquesAPI.OK; -import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_DEQUEUESTATISTIC; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS; -import static org.swisspush.redisques.util.RedisquesAPI.STATUS; -import static org.swisspush.redisques.util.RedisquesAPI.VALUE; -import static org.swisspush.redisques.util.RedisquesAPI.buildAddQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteQueuesOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildBulkPutLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllQueueItemsOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteLockOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetAllLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetConfigurationOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetLockOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsCountOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesItemsCountOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesSpeedOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesStatisticsOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildLockedEnqueueOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildPutLockOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildReplaceQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildSetConfigurationOperation; +import static org.swisspush.redisques.util.HttpServerRequestUtil.*; +import static org.swisspush.redisques.util.RedisquesAPI.*; /** * Handler class for HTTP requests providing access to Redisques over HTTP. @@ -95,6 +42,7 @@ public class RedisquesHttpRequestHandler implements Handler { private static final Logger log = LoggerFactory.getLogger(RedisquesHttpRequestHandler.class); + private final Vertx vertx; private final Router router; private final EventBus eventBus; @@ -107,6 +55,13 @@ public class RedisquesHttpRequestHandler implements Handler { private static final String EMPTY_QUEUES_PARAM = "emptyQueues"; private static final String DELETED = "deleted"; + /** + *

For why we should NOT use such date formats, see SDCISA-15311. We really + * should utilize ISO dates and include timezone information.

+ * + * @deprecated TODO about date formats + */ + @Deprecated private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd.MM.yyyy hh:mm:ss"); private final String redisquesAddress; @@ -114,9 +69,11 @@ public class RedisquesHttpRequestHandler implements Handler { private final boolean enableQueueNameDecoding; private final int queueSpeedIntervalSec; private final QueueStatisticsCollector queueStatisticsCollector; + private final QueueStatsService queueStatsService; + private final GetQueueStatsMentor queueStatsMentor = new MyQueueStatsMentor(); public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { - log.info("Enable http request handler: " + modConfig.getHttpRequestHandlerEnabled()); + log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector); @@ -124,7 +81,7 @@ public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStat HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true); vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> { if (result.succeeded()) { - log.info("Successfully started http request handler on port " + modConfig.getHttpRequestHandlerPort()); + log.info("Successfully started http request handler on port {}", modConfig.getHttpRequestHandlerPort()); } else { log.error("Unable to start http request handler.", result.cause()); } @@ -149,6 +106,7 @@ private Result checkHttpAuthenticationConfiguration(RedisquesCo } private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { + this.vertx = vertx; this.router = Router.router(vertx); this.eventBus = vertx.eventBus(); this.redisquesAddress = modConfig.getAddress(); @@ -156,6 +114,7 @@ private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfi this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding(); this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec(); this.queueStatisticsCollector = queueStatisticsCollector; + this.queueStatsService = new QueueStatsService(vertx, eventBus, redisquesAddress, queueStatisticsCollector); final String prefix = modConfig.getHttpRequestHandlerPrefix(); @@ -547,90 +506,80 @@ private void setConfiguration(RoutingContext ctx) { } private void getMonitorInformation(RoutingContext ctx) { - final boolean emptyQueues = evaluateUrlParameterToBeEmptyOrTrue(EMPTY_QUEUES_PARAM, ctx.request()); - final int limit = extractLimit(ctx); - String filter = ctx.request().params().get(FILTER); - eventBus.request(redisquesAddress, buildGetQueuesItemsCountOperation(filter), (Handler>>) reply -> { - if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) { - JsonArray queuesArray = reply.result().body().getJsonArray(QUEUES); - if (queuesArray != null && !queuesArray.isEmpty()) { - List queuesList = queuesArray.getList(); - queuesList = sortJsonQueueArrayBySize(queuesList); - if (!emptyQueues) { - queuesList = filterJsonQueueArrayNotEmpty(queuesList); - } - if (limit > 0) { - queuesList = limitJsonQueueArray(queuesList, limit); - } + queueStatsService.getQueueStats(ctx, queueStatsMentor); + } - // this function always succeeds, no need to handle the error case - fillStatisticToQueuesList(queuesList).onSuccess(updatedQueuesList -> { - JsonObject resultObject = new JsonObject(); - resultObject.put(QUEUES, updatedQueuesList); - jsonResponse(ctx.response(), resultObject); - }); - } else { - // there was no result, we as well return an empty result - JsonObject resultObject = new JsonObject(); - resultObject.put(QUEUES, new JsonArray()); - jsonResponse(ctx.response(), resultObject); - } - } else { - String error = "Error gathering names of active queues"; - log.error(error, reply.cause()); - respondWith(StatusCode.INTERNAL_SERVER_ERROR, error, ctx.request()); + private class MyQueueStatsMentor implements GetQueueStatsMentor { + + @Override + public boolean includeEmptyQueues(RoutingContext ctx) { + return evaluateUrlParameterToBeEmptyOrTrue(EMPTY_QUEUES_PARAM, ctx.request()); + } + + @Override + public int limit(RoutingContext ctx) { + return extractLimit(ctx); + } + + @Override + public String filter(RoutingContext ctx) { + return ctx.request().params().get(FILTER); + } + + @Override + public void onQueueStatistics(List queues, RoutingContext ctx) { + var rsp = ctx.request().response(); + rsp.putHeader(CONTENT_TYPE, APPLICATION_JSON); + rsp.setChunked(true); + rsp.write("{\"queues\": ["); + JsonObject queueJson = new JsonObject(); + boolean isFirst = true; + for (QueueStatsService.Queue queue : queues) { + rsp.write(isFirst ? "" : ","); + isFirst = false; + queueJson.clear(); + queueJson.put(MONITOR_QUEUE_NAME, queue.getName()); + queueJson.put(MONITOR_QUEUE_SIZE, queue.getSize()); + // TODO old impl did bloat result with empty strings for whatever undocumented + // reason. Those fields should be set to 'null' (or we could even skip them + // entirely in JSON), as obviously the information is not available. But + // we'll add the same bloat as we don't know if any downstream code now + // relies on this behavior. + Long epochMs; + epochMs = queue.getLastDequeueAttemptEpochMs(); + queueJson.put("lastDequeueAttempt", epochMs == null ? "" : formatAsUIDate(epochMs)); + epochMs = queue.getLastDequeueSuccessEpochMs(); + queueJson.put("lastDequeueSuccess", epochMs == null ? "" : formatAsUIDate(epochMs)); + epochMs = queue.getNextDequeueDueTimestampEpochMs(); + queueJson.put("nextDequeueDueTimestamp", epochMs == null ? "" : formatAsUIDate(epochMs)); + rsp.write(queueJson.encode()); } - }); - } + rsp.end("]}\n"); + } - private Future> fillStatisticToQueuesList(List queuesList) { - Promise> promise = Promise.promise(); - List queueNameList = new ArrayList<>(); - for (JsonObject jsonObject : queuesList) { - queueNameList.add(jsonObject.getString(MONITOR_QUEUE_NAME)); + @Override + public void onError(Throwable ex, RoutingContext ctx) { + String exMsg = ex.getMessage(); + if (!ctx.response().ended()) { + respondWith(StatusCode.INTERNAL_SERVER_ERROR, exMsg, ctx.request()); + } else { + log.warn("_q938hugz_ {}", ctx.request().uri(), ex); + } } - queueStatisticsCollector.getQueueStatistics(queueNameList) - .onFailure(ex -> { - log.error("Failed to fetch QueueStatistics for queue", ex); - promise.complete(queuesList); - }) - .onSuccess(queueStatisticsJsonObject -> { - if (OK.equals(queueStatisticsJsonObject.getString(STATUS)) - && !queueStatisticsJsonObject.getJsonArray(QUEUES).isEmpty()) { - JsonArray queueStatisticsArray = queueStatisticsJsonObject.getJsonArray(QUEUES); - queuesList.forEach(entries -> { - String queueName = entries.getString(MONITOR_QUEUE_NAME); - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, ""); - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, ""); - entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, ""); - queueStatisticsJsonObject.getJsonArray(QUEUES); - - for (Iterator it = queueStatisticsArray.stream().iterator(); it.hasNext(); ) { - JsonObject queueStatistic = (JsonObject) it.next(); - if (queueName.equals(queueStatistic.getString(MONITOR_QUEUE_NAME)) - && queueStatistic.containsKey(STATISTIC_QUEUE_DEQUEUESTATISTIC) - && queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC) != null) { - DequeueStatistic dequeueStatistic = queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC).mapTo(DequeueStatistic.class); - if (dequeueStatistic.lastDequeueAttemptTimestamp != null) { - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp))); - } - if (dequeueStatistic.lastDequeueSuccessTimestamp != null) { - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp))); - } - if (dequeueStatistic.nextDequeueDueTimestamp != null) { - entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, DATE_FORMAT.format(new Date(dequeueStatistic.nextDequeueDueTimestamp))); - } - break; - } - } - }); - } - promise.complete(queuesList); - }); - return promise.future(); } + /** + *

Old impl did not document WHY this date format got chosen. Now we're + * stuck as consumers likely rely on this format. To get this fixed, we have + * to find and fix all consumers.

+ * + * @deprecated about date formats + */ + @Deprecated + private String formatAsUIDate(long epochMs) { + return DATE_FORMAT.format(new Date(epochMs)); + } private void listOrCountQueues(RoutingContext ctx) { if (evaluateUrlParameterToBeEmptyOrTrue(COUNT, ctx.request())) { @@ -710,7 +659,8 @@ private void listQueueItems(RoutingContext ctx) { } else { ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); ctx.response().end(replyBody.getString(MESSAGE)); - log.warn("Error in routerMatcher.getWithRegEx. Command = '" + (replyBody.getString("command") == null ? "" : replyBody.getString("command")) + "'."); + log.warn("Error in routerMatcher.getWithRegEx. Command = '{}'.", + replyBody.getString("command") == null ? "" : replyBody.getString("command")); } }); }); @@ -869,7 +819,7 @@ private void bulkDeleteQueues(RoutingContext ctx) { private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) { final HttpServerResponse response = request.response(); - log.info("Responding with status code " + statusCode + " and message: " + responseMessage); + log.info("Responding with status code {} and message: {}", statusCode, responseMessage); response.setStatusCode(statusCode.getStatusCode()); response.setStatusMessage(statusCode.getStatusMessage()); response.putHeader(CONTENT_TYPE, TEXT_PLAIN); @@ -939,7 +889,7 @@ private int extractLimit(RoutingContext ctx) { return Integer.parseInt(limitParam); } catch (NumberFormatException ex) { if (limitParam != null) { - log.warn("Non-numeric limit parameter value used: " + limitParam); + log.warn("Non-numeric limit parameter value used: {}", limitParam); } return Integer.MAX_VALUE; } diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index 6f49c6d..09a130a 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -7,11 +7,7 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.Command; -import io.vertx.redis.client.Redis; -import io.vertx.redis.client.RedisAPI; -import io.vertx.redis.client.Request; -import io.vertx.redis.client.Response; +import io.vertx.redis.client.*; import io.vertx.redis.client.impl.types.NumberType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,15 +22,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; -import static org.swisspush.redisques.util.RedisquesAPI.OK; -import static org.swisspush.redisques.util.RedisquesAPI.QUEUENAME; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_BACKPRESSURE; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_FAILURES; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_SLOWDOWN; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_SPEED; -import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static java.lang.System.currentTimeMillis; +import static org.swisspush.redisques.util.RedisquesAPI.*; /** * Class StatisticsCollector helps collecting statistics information about queue handling and @@ -461,18 +450,18 @@ public Future getQueueStatistics(final List queues) { var ctx = new RequestCtx(); ctx.queueNames = queues; step1(ctx).compose( - jsonObject1 -> step2(ctx).compose( - jsonObject2 -> step3(ctx).compose( - jsonObject3 -> step4(ctx).compose( - jsonObject4 -> step5(ctx).compose( - jsonObject5 -> step6(ctx)) + nothing1 -> step2(ctx).compose( + nothing2 -> step3(ctx).compose( + nothing3 -> step4(ctx).compose( + nothing4 -> step5(ctx).compose( + nothing5 -> step6(ctx)) )))).onComplete(promise); return promise.future(); } /**

init redis connection.

*/ - Future step1(RequestCtx ctx) { - final Promise promise = Promise.promise(); + Future step1(RequestCtx ctx) { + final Promise promise = Promise.promise(); redisProvider.connection() .onFailure(throwable -> { promise.fail(new Exception("Redis: Failed to get queue length.", throwable)); @@ -486,38 +475,53 @@ Future step1(RequestCtx ctx) { } /**

Query queue lengths.

*/ - Future step2(RequestCtx ctx) { + Future step2(RequestCtx ctx) { assert ctx.conn != null; - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); + int numQueues = ctx.queueNames.size(); + String fmt1 = "About to perform {} requests to redis just for monitoring"; + if (numQueues > 256) { + log.warn(fmt1, numQueues); + } else { + log.debug(fmt1, numQueues); + } + long begRedisRequestsEpochMs = currentTimeMillis(); List responses = ctx.queueNames.stream() .map(queue -> ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queue))) .collect(Collectors.toList()); - CompositeFuture.all(responses) - .onFailure(ex -> { - promise.fail("Unexpected queue length result"); - }) - .onSuccess(compositeFuture -> { - List queueLengthList = compositeFuture.list(); - if (queueLengthList == null) { - promise.fail("Unexpected queue length result: null"); - return; - } - if (queueLengthList.size() != ctx.queueNames.size()) { - String err = "Unexpected queue length result with unequal size " + - ctx.queueNames.size() + " : " + queueLengthList.size(); - promise.fail(err); - return; - } - ctx.queueLengthList = queueLengthList; - promise.complete(); - }); + CompositeFuture.all(responses).onComplete( ev -> { + long durRedisRequestsMs = currentTimeMillis() - begRedisRequestsEpochMs; + String fmt2 = "All those {} redis requests took {}ms"; + if (durRedisRequestsMs > 3000) { + log.warn(fmt2, numQueues, durRedisRequestsMs); + } else { + log.debug(fmt2, numQueues, durRedisRequestsMs); + } + if (ev.failed()) { + promise.fail(new Exception("Unexpected queue length result", ev.cause())); + return; + } + List queueLengthList = ev.result().list(); + if (queueLengthList == null) { + promise.fail("Unexpected queue length result: null"); + return; + } + if (queueLengthList.size() != ctx.queueNames.size()) { + String err = "Unexpected queue length result with unequal size " + + ctx.queueNames.size() + " : " + queueLengthList.size(); + promise.fail(err); + return; + } + ctx.queueLengthList = queueLengthList; + promise.complete(); + }); return promise.future(); } /**

init queue statistics.

*/ - Future step3(RequestCtx ctx) { + Future step3(RequestCtx ctx) { assert ctx.queueLengthList != null; - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); // populate the list of queue statistics in a Hashmap for later fast merging ctx.statistics = new HashMap<>(ctx.queueNames.size()); for (int i = 0; i < ctx.queueNames.size(); i++) { @@ -531,8 +535,8 @@ Future step3(RequestCtx ctx) { } /**

init a resAPI instance we need to get more details.

*/ - Future step4(RequestCtx ctx){ - final Promise promise = Promise.promise(); + Future step4(RequestCtx ctx){ + final Promise promise = Promise.promise(); redisProvider.redis() .onFailure(throwable -> { promise.fail(new Exception("Redis: Error in getQueueStatistics", throwable)); @@ -549,10 +553,10 @@ Future step4(RequestCtx ctx){ *

retrieve all available failure statistics from Redis and merge them * together with the previous populated common queue statistics map

*/ - Future step5(RequestCtx ctx) { + Future step5(RequestCtx ctx) { assert ctx.redisAPI != null; assert ctx.statistics != null; - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); ctx.redisAPI.hvals(STATSKEY, statisticsSet -> { if( statisticsSet == null || statisticsSet.failed() ){ promise.fail(new RuntimeException("statistics queue evaluation failed",