From 913ee4e88714ce7e678e0d787c14d729594bbf62 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 23 Jan 2024 06:56:19 +0000 Subject: [PATCH 01/14] updating poms for branch'release-3.1.1' with non-snapshot versions --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4140a9c..32b19f5 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 3.1.1-SNAPSHOT + 3.1.1 redisques A highly scalable redis-persistent queuing system for vertx From b861e4a4235164276aed3f8c7548e97a3de6da72 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 23 Jan 2024 06:56:19 +0000 Subject: [PATCH 02/14] updating poms for 3.1.2-SNAPSHOT development --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 4140a9c..1a73012 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 3.1.1-SNAPSHOT + 3.1.2-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From 9efaba1f1755d9815b302368855aabe27ead336f Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 23 Jan 2024 06:58:47 +0000 Subject: [PATCH 03/14] updating develop poms to master versions to avoid merge conflicts --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 1a73012..32b19f5 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 3.1.2-SNAPSHOT + 3.1.1 redisques A highly scalable redis-persistent queuing system for vertx From 60d8a17c335f051c9f6e5983e6d01c6ecd2c8878 Mon Sep 17 00:00:00 2001 From: runner Date: Tue, 23 Jan 2024 06:58:47 +0000 Subject: [PATCH 04/14] Updating develop poms back to pre merge state --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 32b19f5..1a73012 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 3.1.1 + 3.1.2-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From 0de96ef98263e9bdd311d5f84d1db46c2d9fc53f Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Tue, 19 Mar 2024 10:07:50 +0100 Subject: [PATCH 05/14] [SDCISA-15223] Drop logger implementation dependency Libraries must not include logger implementations. It is the applications choice which logger implementation to use. Not a libraries one. Including logger implementations from within libraries defeats the whole purpose of having a logging facade at all. And makes logger configuration in applications a nightmare, as they need to configure all logger impls pulled in somewhere by some library deep down the dependency tree. --- pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/pom.xml b/pom.xml index 1a73012..ea6a3bb 100644 --- a/pom.xml +++ b/pom.xml @@ -70,11 +70,6 @@ slf4j-api ${slf4j.version} - - org.slf4j - slf4j-reload4j - ${slf4j.version} - From 52b5ced9e7b3186ca0fc4099a3d4cb89cdc97ad2 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Tue, 26 Mar 2024 17:39:55 +0100 Subject: [PATCH 06/14] [SDCISA-13746] Add simple time measuring log. --- .../util/QueueStatisticsCollector.java | 67 ++++++++++--------- 1 file changed, 34 insertions(+), 33 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index 6f49c6d..ec76982 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -7,11 +7,7 @@ import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; -import io.vertx.redis.client.Command; -import io.vertx.redis.client.Redis; -import io.vertx.redis.client.RedisAPI; -import io.vertx.redis.client.Request; -import io.vertx.redis.client.Response; +import io.vertx.redis.client.*; import io.vertx.redis.client.impl.types.NumberType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,15 +22,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; -import static org.swisspush.redisques.util.RedisquesAPI.OK; -import static org.swisspush.redisques.util.RedisquesAPI.QUEUENAME; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_BACKPRESSURE; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_FAILURES; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_SLOWDOWN; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_SPEED; -import static org.swisspush.redisques.util.RedisquesAPI.STATUS; +import static java.lang.System.currentTimeMillis; +import static org.swisspush.redisques.util.RedisquesAPI.*; /** * Class StatisticsCollector helps collecting statistics information about queue handling and @@ -489,28 +478,40 @@ Future step1(RequestCtx ctx) { Future step2(RequestCtx ctx) { assert ctx.conn != null; final Promise promise = Promise.promise(); + int numQueues = ctx.queueNames.size(); + String fmt1 = "About to perform {} requests to redis just for monitoring"; + if (numQueues > 256) { + log.warn(fmt1, numQueues); + } else { + log.debug(fmt1, numQueues); + } + long begRedisRequestsEpochMs = currentTimeMillis(); List responses = ctx.queueNames.stream() .map(queue -> ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queue))) .collect(Collectors.toList()); - CompositeFuture.all(responses) - .onFailure(ex -> { - promise.fail("Unexpected queue length result"); - }) - .onSuccess(compositeFuture -> { - List queueLengthList = compositeFuture.list(); - if (queueLengthList == null) { - promise.fail("Unexpected queue length result: null"); - return; - } - if (queueLengthList.size() != ctx.queueNames.size()) { - String err = "Unexpected queue length result with unequal size " + - ctx.queueNames.size() + " : " + queueLengthList.size(); - promise.fail(err); - return; - } - ctx.queueLengthList = queueLengthList; - promise.complete(); - }); + CompositeFuture.all(responses).onComplete( ev -> { + long durRedisRequestsMs = currentTimeMillis() - begRedisRequestsEpochMs; + String fmt2 = "All those {} redis requests took {}ms"; + if (durRedisRequestsMs > 3000) log.warn(fmt2, numQueues, durRedisRequestsMs); + else log.debug(fmt2, numQueues, durRedisRequestsMs); + if(ev.failed()){ + promise.fail(new Exception("Unexpected queue length result", ev.cause())); + return; + } + List queueLengthList = ev.result().list(); + if (queueLengthList == null) { + promise.fail("Unexpected queue length result: null"); + return; + } + if (queueLengthList.size() != ctx.queueNames.size()) { + String err = "Unexpected queue length result with unequal size " + + ctx.queueNames.size() + " : " + queueLengthList.size(); + promise.fail(err); + return; + } + ctx.queueLengthList = queueLengthList; + promise.complete(); + }); return promise.future(); } From 973c589cba7ef7fd44a8f01fdbf179bb151f7898 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Wed, 27 Mar 2024 13:03:03 +0100 Subject: [PATCH 07/14] [SDCISA-13746] Add formatting bloat. --- .../redisques/util/QueueStatisticsCollector.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index ec76982..55f1c04 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -492,9 +492,12 @@ Future step2(RequestCtx ctx) { CompositeFuture.all(responses).onComplete( ev -> { long durRedisRequestsMs = currentTimeMillis() - begRedisRequestsEpochMs; String fmt2 = "All those {} redis requests took {}ms"; - if (durRedisRequestsMs > 3000) log.warn(fmt2, numQueues, durRedisRequestsMs); - else log.debug(fmt2, numQueues, durRedisRequestsMs); - if(ev.failed()){ + if (durRedisRequestsMs > 3000) { + log.warn(fmt2, numQueues, durRedisRequestsMs); + } else { + log.debug(fmt2, numQueues, durRedisRequestsMs); + } + if (ev.failed()) { promise.fail(new Exception("Unexpected queue length result", ev.cause())); return; } From 4111082cbdc5fa8885671c16fbc612676a9cd8bb Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Wed, 27 Mar 2024 13:52:15 +0100 Subject: [PATCH 08/14] Fix few bad habits in RedisquesHttpRequestHandler --- .../handler/RedisquesHttpRequestHandler.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 410e21c..4e7879f 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -116,7 +116,7 @@ public class RedisquesHttpRequestHandler implements Handler { private final QueueStatisticsCollector queueStatisticsCollector; public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { - log.info("Enable http request handler: " + modConfig.getHttpRequestHandlerEnabled()); + log.info("Enable http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector); @@ -124,7 +124,7 @@ public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStat HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true); vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> { if (result.succeeded()) { - log.info("Successfully started http request handler on port " + modConfig.getHttpRequestHandlerPort()); + log.info("Successfully started http request handler on port {}", modConfig.getHttpRequestHandlerPort()); } else { log.error("Unable to start http request handler.", result.cause()); } @@ -710,7 +710,8 @@ private void listQueueItems(RoutingContext ctx) { } else { ctx.response().setStatusCode(StatusCode.NOT_FOUND.getStatusCode()); ctx.response().end(replyBody.getString(MESSAGE)); - log.warn("Error in routerMatcher.getWithRegEx. Command = '" + (replyBody.getString("command") == null ? "" : replyBody.getString("command")) + "'."); + log.warn("Error in routerMatcher.getWithRegEx. Command = '{}'.", + replyBody.getString("command") == null ? "" : replyBody.getString("command")); } }); }); @@ -869,7 +870,7 @@ private void bulkDeleteQueues(RoutingContext ctx) { private void respondWith(StatusCode statusCode, String responseMessage, HttpServerRequest request) { final HttpServerResponse response = request.response(); - log.info("Responding with status code " + statusCode + " and message: " + responseMessage); + log.info("Responding with status code {} and message: {}", statusCode, responseMessage); response.setStatusCode(statusCode.getStatusCode()); response.setStatusMessage(statusCode.getStatusMessage()); response.putHeader(CONTENT_TYPE, TEXT_PLAIN); @@ -939,7 +940,7 @@ private int extractLimit(RoutingContext ctx) { return Integer.parseInt(limitParam); } catch (NumberFormatException ex) { if (limitParam != null) { - log.warn("Non-numeric limit parameter value used: " + limitParam); + log.warn("Non-numeric limit parameter value used: {}", limitParam); } return Integer.MAX_VALUE; } From e10d8e1c4fded639d50fc3765bc1c4c8facfa6a2 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Thu, 28 Mar 2024 15:34:24 +0100 Subject: [PATCH 09/14] [SDCISA-13746] ReWrite QueueStats merging from an exponential algorithm to a linear one Likely this is only part of the story. And we likely need more fixes around this request. For now I'll only fix the algorithm part, as this was the simplest one. (wip @ 5af5024a352f0d04e57a8adc4119dd2483acfa53) --- .../redisques/QueueStatsService.java | 219 ++++++++++++++++++ .../handler/RedisquesHttpRequestHandler.java | 219 +++++++----------- .../util/QueueStatisticsCollector.java | 30 +-- 3 files changed, 318 insertions(+), 150 deletions(-) create mode 100644 src/main/java/org/swisspush/redisques/QueueStatsService.java diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java new file mode 100644 index 0000000..7393822 --- /dev/null +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -0,0 +1,219 @@ +package org.swisspush.redisques; + +import io.vertx.core.Vertx; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.Message; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.swisspush.redisques.util.DequeueStatistic; +import org.swisspush.redisques.util.QueueStatisticsCollector; + +import java.util.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +import static java.lang.System.currentTimeMillis; +import static java.util.Collections.emptyList; +import static org.slf4j.LoggerFactory.getLogger; +import static org.swisspush.redisques.util.RedisquesAPI.*; + + +public class QueueStatsService { + + private static final Logger log = getLogger(QueueStatsService.class); + private final Vertx vertx; + private final EventBus eventBus; + private final String redisquesAddress; + private final QueueStatisticsCollector queueStatisticsCollector; + + public QueueStatsService(Vertx vertx, EventBus eventBus, String redisquesAddress, QueueStatisticsCollector queueStatisticsCollector) { + this.vertx = vertx; + this.eventBus = eventBus; + this.redisquesAddress = redisquesAddress; + this.queueStatisticsCollector = queueStatisticsCollector; + } + + public void getQueueStats(CTX mCtx, GetQueueStatsMentor mentor) { + var req0 = new GetQueueStatsRequest(); + req0.mCtx = mCtx; + req0.mentor = mentor; + fetchQueueNamesAndSize(req0, (ex1, req1) -> { + if (ex1 != null) { req1.mentor.onError(ex1, req1.mCtx); return; } + // Prepare a list of queue names as it is needed to fetch retryDetails. + req1.queueNames = new ArrayList<>(req1.queues.size()); + for (Queue q : req1.queues) req1.queueNames.add(q.name); + fetchRetryDetails(req1, (ex2, req2) -> { + if (ex2 != null) { req2.mentor.onError(ex2, req2.mCtx); return; } + mergeRetryDetailsIntoCollectedData(req2, (ex3, req3) -> { + if (ex3 != null) { req3.mentor.onError(ex3, req3.mCtx); return; } + req3.mentor.onQueueStatistics(req3.queues, req3.mCtx); + }); + }); + }); + } + + private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsumer> onDone) { + String filter = req.mentor.filter(req.mCtx); + JsonObject operation = buildGetQueuesItemsCountOperation(filter); + eventBus.request(redisquesAddress, operation, ev -> { + if (ev.failed()) { + onDone.accept(new Exception("eventBus.request()", ev.cause()), null); + return; + } + Message msg = ev.result(); + JsonObject body = msg.body(); + String status = body.getString(STATUS); + if (!OK.equals(status)) { + onDone.accept(new Exception("Unexpected status " + status), null); + return; + } + JsonArray queuesJsonArr = body.getJsonArray(QUEUES); + if (queuesJsonArr == null || queuesJsonArr.isEmpty()) { + log.debug("result was {}, we return an empty result.", queuesJsonArr == null ? "null" : "empty"); + req.queues = emptyList(); + onDone.accept(null, req); + return; + } + boolean includeEmptyQueues = req.mentor.includeEmptyQueues(req.mCtx); + List queues = new ArrayList<>(queuesJsonArr.size()); + for (var it = queuesJsonArr.iterator(); it.hasNext(); ) { + JsonObject queueJson = (JsonObject) it.next(); + String name = queueJson.getString(MONITOR_QUEUE_NAME); + Long size = queueJson.getLong(MONITOR_QUEUE_SIZE); + // No need to process empty queues any further if caller is not interested + // in them anyway. + if (!includeEmptyQueues && (size == null || size == 0)) continue; + Queue queue = new Queue(); + queue.name = name; + queue.size = size; + queues.add(queue); + } + queues.sort(this::compareLargestFirst); + // Only the part with the most filled queues got requested. Get rid of + // all shorter queues then. + int limit = req.mentor.limit(req.mCtx); + if (limit != 0 && queues.size() > limit) queues = queues.subList(0, limit); + req.queues = queues; + onDone.accept(null, req); + }); + } + + private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer> onDone) { + long begGetQueueStatsMs = currentTimeMillis(); + assert req.queueNames != null; + queueStatisticsCollector.getQueueStatistics(req.queueNames).onComplete( ev -> { + req.queueNames = null; // <- no longer needed + long durGetQueueStatsMs = currentTimeMillis() - begGetQueueStatsMs; + if (durGetQueueStatsMs > 42) log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", durGetQueueStatsMs); + if (ev.failed()) { + log.warn("queueStatisticsCollector.getQueueStatistics() failed. Fallback to empty result.", ev.cause()); + req.queuesJsonArr = new JsonArray(); + onDone.accept(null, req); + return; + } + JsonObject queStatsJsonObj = ev.result(); + String status = queStatsJsonObj.getString(STATUS); + if (!OK.equals(status)) { + log.warn("queueStatisticsCollector.getQueueStatistics() responded '" + status + "'. Fallback to empty result.", ev.cause()); + req.queuesJsonArr = new JsonArray(); + onDone.accept(null, req); + return; + } + req.queuesJsonArr = queStatsJsonObj.getJsonArray(QUEUES); + onDone.accept(null, req); + }); + } + + private void mergeRetryDetailsIntoCollectedData(GetQueueStatsRequest req, BiConsumer> onDone) { + // Setup a lookup table as we need to find by name further below. + Map detailsByName = new HashMap<>(req.queuesJsonArr.size()); + for (var it = (Iterator) (Object) req.queuesJsonArr.iterator(); it.hasNext(); ) { + JsonObject detailJson = it.next(); + String name = detailJson.getString(MONITOR_QUEUE_NAME); + detailsByName.put(name, detailJson); + } + for (Queue queue : req.queues) { + JsonObject detail = detailsByName.get(queue.name); + if (detail == null) continue; // no details to enrich. + JsonObject dequeueStatsJson = detail.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC); + if (dequeueStatsJson == null) continue; // no dequeue stats we could enrich + DequeueStatistic dequeueStats = dequeueStatsJson.mapTo(DequeueStatistic.class); + // Attach whatever details we got. + queue.lastDequeueAttemptEpochMs = dequeueStats.lastDequeueAttemptTimestamp; + queue.lastDequeueSuccessEpochMs = dequeueStats.lastDequeueSuccessTimestamp; + queue.nextDequeueDueTimestampEpochMs = dequeueStats.nextDequeueDueTimestamp; + } + onDone.accept(null, req); + } + + private int compareLargestFirst(Queue aq, Queue bq) { + if (aq.size == null && bq.size == null) return 0; + if (aq.size == null) return -1; + if (bq.size == null) return +1; + long as = aq.size, bs = bq.size; + if (as > bs) return -1; + if (as < bs) return +1; + assert as == bs : as +", "+ bs; + return 0; + } + + + private static class GetQueueStatsRequest { + private CTX mCtx; + private GetQueueStatsMentor mentor; + private List queueNames; + private JsonArray queuesJsonArr; + private List queues; + } + + + public static class Queue { + private String name; + private Long size; + private Long lastDequeueAttemptEpochMs; + private Long lastDequeueSuccessEpochMs; + private Long nextDequeueDueTimestampEpochMs; + + public String getName() { return name; } + public long getSize() { return size; } + public Long getLastDequeueAttemptEpochMs() { return lastDequeueAttemptEpochMs; } + public Long getLastDequeueSuccessEpochMs() { return lastDequeueSuccessEpochMs; } + public Long getNextDequeueDueTimestampEpochMs() { return nextDequeueDueTimestampEpochMs; } + } + + + /** + *

Mentors fetching operations and so provides the fetcher the required + * information. Finally it also receives the operations result.

+ * + * @param + * The context object of choice handled back to each callback so the mentor + * knows about what request the fetcher is talking. + */ + public static interface GetQueueStatsMentor { + + /** + *

Returning true means that all queues will be present in the result. If + * false, empty queues won't show up the result.

+ * + * @param ctx See {@link GetQueueStatsMentor}. + */ + public boolean includeEmptyQueues( CTX ctx ); + + /**

Limits the result to the largest N queues.

*/ + public int limit( CTX ctx ); + + public String filter( CTX ctx); + + /**

Called ONCE with the final result.

*/ + public void onQueueStatistics(List queues, CTX ctx); + + /** + *

Called as soon an error occurs. After an error occurred, {@link #onQueueStatistics(List, Object)} + * will NOT be called, as the operation did fail.

+ */ + public void onError(Throwable ex, CTX ctx); + } + +} diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 410e21c..57a382a 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -2,9 +2,7 @@ import io.netty.util.internal.StringUtil; import io.vertx.core.AsyncResult; -import io.vertx.core.Future; import io.vertx.core.Handler; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.eventbus.EventBus; @@ -12,7 +10,6 @@ import io.vertx.core.http.HttpServerOptions; import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.HttpServerResponse; -import io.vertx.core.json.Json; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.ext.auth.authentication.AuthenticationProvider; @@ -21,69 +18,19 @@ import io.vertx.ext.web.handler.BasicAuthHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.swisspush.redisques.util.DequeueStatistic; -import org.swisspush.redisques.util.QueueStatisticsCollector; -import org.swisspush.redisques.util.RedisquesAPI; -import org.swisspush.redisques.util.RedisquesConfiguration; -import org.swisspush.redisques.util.Result; -import org.swisspush.redisques.util.StatusCode; +import org.swisspush.redisques.QueueStatsService; +import org.swisspush.redisques.QueueStatsService.GetQueueStatsMentor; +import org.swisspush.redisques.util.*; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; -import java.util.ArrayList; import java.util.Date; -import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; -import static org.swisspush.redisques.util.HttpServerRequestUtil.decode; -import static org.swisspush.redisques.util.HttpServerRequestUtil.encodePayload; -import static org.swisspush.redisques.util.HttpServerRequestUtil.evaluateUrlParameterToBeEmptyOrTrue; -import static org.swisspush.redisques.util.HttpServerRequestUtil.extractNonEmptyJsonArrayFromBody; -import static org.swisspush.redisques.util.RedisquesAPI.BAD_INPUT; -import static org.swisspush.redisques.util.RedisquesAPI.COUNT; -import static org.swisspush.redisques.util.RedisquesAPI.ERROR_TYPE; -import static org.swisspush.redisques.util.RedisquesAPI.FILTER; -import static org.swisspush.redisques.util.RedisquesAPI.LIMIT; -import static org.swisspush.redisques.util.RedisquesAPI.LOCKS; -import static org.swisspush.redisques.util.RedisquesAPI.MEMORY_FULL; -import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME; -import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE; -import static org.swisspush.redisques.util.RedisquesAPI.NO_SUCH_LOCK; -import static org.swisspush.redisques.util.RedisquesAPI.OK; -import static org.swisspush.redisques.util.RedisquesAPI.QUEUES; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_DEQUEUESTATISTIC; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS; -import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS; -import static org.swisspush.redisques.util.RedisquesAPI.STATUS; -import static org.swisspush.redisques.util.RedisquesAPI.VALUE; -import static org.swisspush.redisques.util.RedisquesAPI.buildAddQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteQueuesOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildBulkPutLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllQueueItemsOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteLockOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetAllLocksOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetConfigurationOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetLockOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsCountOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesItemsCountOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesSpeedOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesStatisticsOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildLockedEnqueueOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildPutLockOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildReplaceQueueItemOperation; -import static org.swisspush.redisques.util.RedisquesAPI.buildSetConfigurationOperation; +import static org.swisspush.redisques.util.HttpServerRequestUtil.*; +import static org.swisspush.redisques.util.RedisquesAPI.*; /** * Handler class for HTTP requests providing access to Redisques over HTTP. @@ -95,6 +42,7 @@ public class RedisquesHttpRequestHandler implements Handler { private static final Logger log = LoggerFactory.getLogger(RedisquesHttpRequestHandler.class); + private final Vertx vertx; private final Router router; private final EventBus eventBus; @@ -107,6 +55,13 @@ public class RedisquesHttpRequestHandler implements Handler { private static final String EMPTY_QUEUES_PARAM = "emptyQueues"; private static final String DELETED = "deleted"; + /** + *

For why we should NOT use such date formats, see SDCISA-15311. We really + * should utilize ISO dates and include timezone information.

+ * + * @deprecated TODO { private final boolean enableQueueNameDecoding; private final int queueSpeedIntervalSec; private final QueueStatisticsCollector queueStatisticsCollector; + private final QueueStatsService queueStatsService; + private final GetQueueStatsMentor queueStatsMentor = new MyQueueStatsMentor(); public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { log.info("Enable http request handler: " + modConfig.getHttpRequestHandlerEnabled()); @@ -149,6 +106,7 @@ private Result checkHttpAuthenticationConfiguration(RedisquesCo } private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { + this.vertx = vertx; this.router = Router.router(vertx); this.eventBus = vertx.eventBus(); this.redisquesAddress = modConfig.getAddress(); @@ -156,6 +114,7 @@ private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfi this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding(); this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec(); this.queueStatisticsCollector = queueStatisticsCollector; + this.queueStatsService = new QueueStatsService(vertx, eventBus, redisquesAddress, queueStatisticsCollector); final String prefix = modConfig.getHttpRequestHandlerPrefix(); @@ -547,90 +506,80 @@ private void setConfiguration(RoutingContext ctx) { } private void getMonitorInformation(RoutingContext ctx) { - final boolean emptyQueues = evaluateUrlParameterToBeEmptyOrTrue(EMPTY_QUEUES_PARAM, ctx.request()); - final int limit = extractLimit(ctx); - String filter = ctx.request().params().get(FILTER); - eventBus.request(redisquesAddress, buildGetQueuesItemsCountOperation(filter), (Handler>>) reply -> { - if (reply.succeeded() && OK.equals(reply.result().body().getString(STATUS))) { - JsonArray queuesArray = reply.result().body().getJsonArray(QUEUES); - if (queuesArray != null && !queuesArray.isEmpty()) { - List queuesList = queuesArray.getList(); - queuesList = sortJsonQueueArrayBySize(queuesList); - if (!emptyQueues) { - queuesList = filterJsonQueueArrayNotEmpty(queuesList); - } - if (limit > 0) { - queuesList = limitJsonQueueArray(queuesList, limit); - } + queueStatsService.getQueueStats(ctx, queueStatsMentor); + } - // this function always succeeds, no need to handle the error case - fillStatisticToQueuesList(queuesList).onSuccess(updatedQueuesList -> { - JsonObject resultObject = new JsonObject(); - resultObject.put(QUEUES, updatedQueuesList); - jsonResponse(ctx.response(), resultObject); - }); - } else { - // there was no result, we as well return an empty result - JsonObject resultObject = new JsonObject(); - resultObject.put(QUEUES, new JsonArray()); - jsonResponse(ctx.response(), resultObject); - } - } else { - String error = "Error gathering names of active queues"; - log.error(error, reply.cause()); - respondWith(StatusCode.INTERNAL_SERVER_ERROR, error, ctx.request()); + private class MyQueueStatsMentor implements GetQueueStatsMentor { + + @Override + public boolean includeEmptyQueues(RoutingContext ctx) { + return evaluateUrlParameterToBeEmptyOrTrue(EMPTY_QUEUES_PARAM, ctx.request()); + } + + @Override + public int limit(RoutingContext ctx) { + return extractLimit(ctx); + } + + @Override + public String filter(RoutingContext ctx) { + return ctx.request().params().get(FILTER); + } + + @Override + public void onQueueStatistics(List queues, RoutingContext ctx) { + var rsp = ctx.request().response(); + rsp.putHeader(CONTENT_TYPE, APPLICATION_JSON); + rsp.setChunked(true); + rsp.write("{\"queues\": ["); + JsonObject queueJson = new JsonObject(); + boolean isFirst = true; + for (QueueStatsService.Queue queue : queues) { + rsp.write(isFirst ? "" : ","); + isFirst = false; + queueJson.clear(); + queueJson.put(MONITOR_QUEUE_NAME, queue.getName()); + queueJson.put(MONITOR_QUEUE_SIZE, queue.getSize()); + // TODO old impl did bloat result with empty strings for whatever undocumented + // reason. Those fields should be set to 'null' (or we could even skip them + // entirely in JSON), as obviously the information is not available. But + // we'll add the same bloat as we don't know if any downstream code now + // relies on this behavior. + Long epochMs; + epochMs = queue.getLastDequeueAttemptEpochMs(); + queueJson.put("lastDequeueAttempt", epochMs == null ? "" : asUglyDate(epochMs)); + epochMs = queue.getLastDequeueSuccessEpochMs(); + queueJson.put("lastDequeueSuccess", epochMs == null ? "" : asUglyDate(epochMs)); + epochMs = queue.getNextDequeueDueTimestampEpochMs(); + queueJson.put("nextDequeueDueTimestamp", epochMs == null ? "" : asUglyDate(epochMs)); + rsp.write(queueJson.encode()); } - }); - } + rsp.end("]}\n"); + } - private Future> fillStatisticToQueuesList(List queuesList) { - Promise> promise = Promise.promise(); - List queueNameList = new ArrayList<>(); - for (JsonObject jsonObject : queuesList) { - queueNameList.add(jsonObject.getString(MONITOR_QUEUE_NAME)); + @Override + public void onError(Throwable ex, RoutingContext ctx) { + String exMsg = ex.getMessage(); + if (!ctx.response().ended()) { + respondWith(StatusCode.INTERNAL_SERVER_ERROR, exMsg, ctx.request()); + } else { + log.warn("_q938hugz_ {}", ctx.request().uri(), ex); + } } - queueStatisticsCollector.getQueueStatistics(queueNameList) - .onFailure(ex -> { - log.error("Failed to fetch QueueStatistics for queue", ex); - promise.complete(queuesList); - }) - .onSuccess(queueStatisticsJsonObject -> { - if (OK.equals(queueStatisticsJsonObject.getString(STATUS)) - && !queueStatisticsJsonObject.getJsonArray(QUEUES).isEmpty()) { - JsonArray queueStatisticsArray = queueStatisticsJsonObject.getJsonArray(QUEUES); - queuesList.forEach(entries -> { - String queueName = entries.getString(MONITOR_QUEUE_NAME); - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, ""); - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, ""); - entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, ""); - queueStatisticsJsonObject.getJsonArray(QUEUES); - - for (Iterator it = queueStatisticsArray.stream().iterator(); it.hasNext(); ) { - JsonObject queueStatistic = (JsonObject) it.next(); - if (queueName.equals(queueStatistic.getString(MONITOR_QUEUE_NAME)) - && queueStatistic.containsKey(STATISTIC_QUEUE_DEQUEUESTATISTIC) - && queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC) != null) { - DequeueStatistic dequeueStatistic = queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC).mapTo(DequeueStatistic.class); - if (dequeueStatistic.lastDequeueAttemptTimestamp != null) { - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp))); - } - if (dequeueStatistic.lastDequeueSuccessTimestamp != null) { - entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp))); - } - if (dequeueStatistic.nextDequeueDueTimestamp != null) { - entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, DATE_FORMAT.format(new Date(dequeueStatistic.nextDequeueDueTimestamp))); - } - break; - } - } - }); - } - promise.complete(queuesList); - }); - return promise.future(); } + /** + *

Old impl did not document WHY this date format got chosen. Now we're + * stuck as consumers likely rely on this format. To get this fixed, we have + * to find and fix all consumers.

+ * + * @deprecated about date formats + */ + @Deprecated + private String asUglyDate(long epochMs) { + return DATE_FORMAT.format(new Date(epochMs)); + } private void listOrCountQueues(RoutingContext ctx) { if (evaluateUrlParameterToBeEmptyOrTrue(COUNT, ctx.request())) { diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index 55f1c04..09a130a 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -450,18 +450,18 @@ public Future getQueueStatistics(final List queues) { var ctx = new RequestCtx(); ctx.queueNames = queues; step1(ctx).compose( - jsonObject1 -> step2(ctx).compose( - jsonObject2 -> step3(ctx).compose( - jsonObject3 -> step4(ctx).compose( - jsonObject4 -> step5(ctx).compose( - jsonObject5 -> step6(ctx)) + nothing1 -> step2(ctx).compose( + nothing2 -> step3(ctx).compose( + nothing3 -> step4(ctx).compose( + nothing4 -> step5(ctx).compose( + nothing5 -> step6(ctx)) )))).onComplete(promise); return promise.future(); } /**

init redis connection.

*/ - Future step1(RequestCtx ctx) { - final Promise promise = Promise.promise(); + Future step1(RequestCtx ctx) { + final Promise promise = Promise.promise(); redisProvider.connection() .onFailure(throwable -> { promise.fail(new Exception("Redis: Failed to get queue length.", throwable)); @@ -475,9 +475,9 @@ Future step1(RequestCtx ctx) { } /**

Query queue lengths.

*/ - Future step2(RequestCtx ctx) { + Future step2(RequestCtx ctx) { assert ctx.conn != null; - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); int numQueues = ctx.queueNames.size(); String fmt1 = "About to perform {} requests to redis just for monitoring"; if (numQueues > 256) { @@ -519,9 +519,9 @@ Future step2(RequestCtx ctx) { } /**

init queue statistics.

*/ - Future step3(RequestCtx ctx) { + Future step3(RequestCtx ctx) { assert ctx.queueLengthList != null; - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); // populate the list of queue statistics in a Hashmap for later fast merging ctx.statistics = new HashMap<>(ctx.queueNames.size()); for (int i = 0; i < ctx.queueNames.size(); i++) { @@ -535,8 +535,8 @@ Future step3(RequestCtx ctx) { } /**

init a resAPI instance we need to get more details.

*/ - Future step4(RequestCtx ctx){ - final Promise promise = Promise.promise(); + Future step4(RequestCtx ctx){ + final Promise promise = Promise.promise(); redisProvider.redis() .onFailure(throwable -> { promise.fail(new Exception("Redis: Error in getQueueStatistics", throwable)); @@ -553,10 +553,10 @@ Future step4(RequestCtx ctx){ *

retrieve all available failure statistics from Redis and merge them * together with the previous populated common queue statistics map

*/ - Future step5(RequestCtx ctx) { + Future step5(RequestCtx ctx) { assert ctx.redisAPI != null; assert ctx.statistics != null; - final Promise promise = Promise.promise(); + final Promise promise = Promise.promise(); ctx.redisAPI.hvals(STATSKEY, statisticsSet -> { if( statisticsSet == null || statisticsSet.failed() ){ promise.fail(new RuntimeException("statistics queue evaluation failed", From 7ec89e12989defd36374ffbec9ee3d9c0ff35cd5 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Thu, 28 Mar 2024 17:32:15 +0100 Subject: [PATCH 10/14] [SDCISA-13746] Apply PR feedback. --- .../redisques/QueueStatsService.java | 27 +++++++++---------- .../handler/RedisquesHttpRequestHandler.java | 2 +- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index 7393822..ea795f3 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -13,6 +13,7 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; +import static java.lang.Long.compare; import static java.lang.System.currentTimeMillis; import static java.util.Collections.emptyList; import static org.slf4j.LoggerFactory.getLogger; @@ -84,9 +85,7 @@ private void fetchQueueNamesAndSize(GetQueueStatsRequest req, BiConsu // No need to process empty queues any further if caller is not interested // in them anyway. if (!includeEmptyQueues && (size == null || size == 0)) continue; - Queue queue = new Queue(); - queue.name = name; - queue.size = size; + Queue queue = new Queue(name, size); queues.add(queue); } queues.sort(this::compareLargestFirst); @@ -105,7 +104,7 @@ private void fetchRetryDetails(GetQueueStatsRequest req, BiConsumer { req.queueNames = null; // <- no longer needed long durGetQueueStatsMs = currentTimeMillis() - begGetQueueStatsMs; - if (durGetQueueStatsMs > 42) log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", durGetQueueStatsMs); + log.debug("queueStatisticsCollector.getQueueStatistics() took {}ms", durGetQueueStatsMs); if (ev.failed()) { log.warn("queueStatisticsCollector.getQueueStatistics() failed. Fallback to empty result.", ev.cause()); req.queuesJsonArr = new JsonArray(); @@ -147,15 +146,8 @@ private void mergeRetryDetailsIntoCollectedData(GetQueueStatsRequest onDone.accept(null, req); } - private int compareLargestFirst(Queue aq, Queue bq) { - if (aq.size == null && bq.size == null) return 0; - if (aq.size == null) return -1; - if (bq.size == null) return +1; - long as = aq.size, bs = bq.size; - if (as > bs) return -1; - if (as < bs) return +1; - assert as == bs : as +", "+ bs; - return 0; + private int compareLargestFirst(Queue a, Queue b) { + return compare(b.size, a.size); } @@ -169,11 +161,16 @@ private static class GetQueueStatsRequest { public static class Queue { - private String name; - private Long size; + private final String name; + private final long size; private Long lastDequeueAttemptEpochMs; private Long lastDequeueSuccessEpochMs; private Long nextDequeueDueTimestampEpochMs; + private Queue(String name, long size){ + assert name != null; + this.name = name; + this.size = size; + } public String getName() { return name; } public long getSize() { return size; } diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 57a382a..13cca4f 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -59,7 +59,7 @@ public class RedisquesHttpRequestHandler implements Handler { *

For why we should NOT use such date formats, see SDCISA-15311. We really * should utilize ISO dates and include timezone information.

* - * @deprecated TODO about date formats */ @Deprecated private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd.MM.yyyy hh:mm:ss"); From 71d648fcd87fd7d82f5fcd0c9c5a34391bb57bb3 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Tue, 2 Apr 2024 09:12:37 +0200 Subject: [PATCH 11/14] [SDCISA-13746] Apply PR feedback (#2) - Reword method to conceal the issue better. - Explain exponential VS linear. --- .../java/org/swisspush/redisques/QueueStatsService.java | 9 +++++++++ .../redisques/handler/RedisquesHttpRequestHandler.java | 8 ++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/QueueStatsService.java b/src/main/java/org/swisspush/redisques/QueueStatsService.java index ea795f3..5ff05ef 100644 --- a/src/main/java/org/swisspush/redisques/QueueStatsService.java +++ b/src/main/java/org/swisspush/redisques/QueueStatsService.java @@ -20,6 +20,15 @@ import static org.swisspush.redisques.util.RedisquesAPI.*; +/** + *

Old impl did fetch all queues (take 2000 as an example from PaISA prod) Did + * a nested iteration, so worst case iterate 2000 times 2000 queues to find the + * matching queue (2'000 * 2'000 = 4'000'000 iterations). New impl now does setup a + * dictionary then does indexed access while iterating the 2000 entries + * (2'000 + 2'000 = 4'000 iterations). Keep in mind 2'000 is just some usual value. + * Under load, this value can increase further and so the old, exponential approach + * just did explode in terms of computational efford.

+ */ public class QueueStatsService { private static final Logger log = getLogger(QueueStatsService.class); diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 13cca4f..5ab5caf 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -547,11 +547,11 @@ public void onQueueStatistics(List queues, RoutingConte // relies on this behavior. Long epochMs; epochMs = queue.getLastDequeueAttemptEpochMs(); - queueJson.put("lastDequeueAttempt", epochMs == null ? "" : asUglyDate(epochMs)); + queueJson.put("lastDequeueAttempt", epochMs == null ? "" : formatAsUIDate(epochMs)); epochMs = queue.getLastDequeueSuccessEpochMs(); - queueJson.put("lastDequeueSuccess", epochMs == null ? "" : asUglyDate(epochMs)); + queueJson.put("lastDequeueSuccess", epochMs == null ? "" : formatAsUIDate(epochMs)); epochMs = queue.getNextDequeueDueTimestampEpochMs(); - queueJson.put("nextDequeueDueTimestamp", epochMs == null ? "" : asUglyDate(epochMs)); + queueJson.put("nextDequeueDueTimestamp", epochMs == null ? "" : formatAsUIDate(epochMs)); rsp.write(queueJson.encode()); } rsp.end("]}\n"); @@ -577,7 +577,7 @@ public void onError(Throwable ex, RoutingContext ctx) { * @deprecated about date formats */ @Deprecated - private String asUglyDate(long epochMs) { + private String formatAsUIDate(long epochMs) { return DATE_FORMAT.format(new Date(epochMs)); } From b9d47aa3491e458ce147ab9c1f0642b7b5c6cc6e Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Tue, 2 Apr 2024 13:34:03 +0200 Subject: [PATCH 12/14] Apply PR feedback. --- .../redisques/handler/RedisquesHttpRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java index 4e7879f..7997310 100644 --- a/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java +++ b/src/main/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandler.java @@ -116,7 +116,7 @@ public class RedisquesHttpRequestHandler implements Handler { private final QueueStatisticsCollector queueStatisticsCollector; public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) { - log.info("Enable http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); + log.info("Enabling http request handler: {}", modConfig.getHttpRequestHandlerEnabled()); if (modConfig.getHttpRequestHandlerEnabled()) { if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) { RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector); From 91309a99a6a4413b13f27147af930dc7bea02043 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Tue, 2 Apr 2024 15:48:01 +0200 Subject: [PATCH 13/14] Update Vert.x to 4.5.2 to solve CVE-2024-1023 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ea6a3bb..bca0b84 100644 --- a/pom.xml +++ b/pom.xml @@ -384,7 +384,7 @@ - 4.5.1 + 4.5.2 2.0.10 5.8.0 4.13.2 From de6bdb1a504ba0b42c6d6c3d07be2e82bfcdb684 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marc-Andr=C3=A9=20Weber?= Date: Tue, 2 Apr 2024 16:45:59 +0200 Subject: [PATCH 14/14] Update pom.xml --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index bca0b84..be82a0f 100644 --- a/pom.xml +++ b/pom.xml @@ -392,7 +392,7 @@ 3.7.0 5.4.0 1.16.0 - 2.15.0 + 2.17.0 UTF8 https://oss.sonatype.org/content/repositories/snapshots/