From 3b77e8c5a80489ca1257fddceb1ab2f54549db57 Mon Sep 17 00:00:00 2001 From: Andreas Fankhauser <23085769+hiddenalpha@users.noreply.github.com> Date: Wed, 6 Dec 2023 14:40:42 +0100 Subject: [PATCH] [SDCISA-10974, SDCISA-14097] Fix async behavior in timer. Kill callback-hell. Log concealed errors. --- .../util/QueueStatisticsCollector.java | 220 ++++++++++++------ .../redisques/util/RedisQuesTimer.java | 2 +- 2 files changed, 149 insertions(+), 73 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java index 6b0fea15..f5a8465c 100644 --- a/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java +++ b/src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java @@ -2,12 +2,13 @@ import io.vertx.core.CompositeFuture; import io.vertx.core.Future; -import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.redis.client.Command; +import io.vertx.redis.client.Redis; +import io.vertx.redis.client.RedisAPI; import io.vertx.redis.client.Request; import io.vertx.redis.client.Response; import io.vertx.redis.client.impl.types.NumberType; @@ -315,14 +316,21 @@ private void updateStatisticsInRedis(String queueName) { obj.put(QUEUE_FAILURES, failures); obj.put(QUEUE_SLOWDOWNTIME, slowDownTime); obj.put(QUEUE_BACKPRESSURE, backpressureTime); - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), - emptyHandler -> { - })).onFailure(throwable -> log.error("Redis: Error in updateStatisticsInRedis", throwable)); + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), ev -> { + if( ev.failed() ) log.warn("TODO error handling", new Exception(ev.cause())); + }); + }) + .onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex)); } else { - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hdel(List.of(STATSKEY, queueName), - emptyHandler -> { - })).onFailure(throwable -> log.error("Redis: Error in updateStatisticsInRedis", throwable)); - + redisProvider.redis() + .onSuccess(redisAPI -> { + redisAPI.hdel(List.of(STATSKEY, queueName), ev -> { + if (ev.failed()) log.warn("TODO error handling", new Exception(ev.cause())); + }); + }) + .onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex)); } } @@ -409,77 +417,133 @@ public void getQueueStatistics(Message event, final List que event.reply(new JsonObject().put(STATUS, OK).put(RedisquesAPI.QUEUES, new JsonArray())); return; } - redisProvider.connection().onSuccess(conn -> { - List responses = queues.stream().map(queue -> conn.send(Request.cmd(Command.LLEN, queuePrefix + queue)) - ).collect(Collectors.toList()); - CompositeFuture.all(responses).onFailure(throwable -> { - log.error("Unexepected queue length result"); - event.reply(new JsonObject().put(STATUS, ERROR)); - }).onSuccess(compositeFuture -> { - List queueLengthList = compositeFuture.list(); - if (queueLengthList == null) { - log.error("Unexepected queue length result null"); - event.reply(new JsonObject().put(STATUS, ERROR)); - return; - } - if (queueLengthList.size() != queues.size()) { - log.error("Unexpected queue length result with unequal size {} : {}", - queues.size(), queueLengthList.size()); - event.reply(new JsonObject().put(STATUS, ERROR)); - return; - } - // populate the list of queue statistics in a Hashmap for later fast merging - final HashMap statisticsMap = new HashMap<>(); - for (int i = 0; i < queues.size(); i++) { - QueueStatistic qs = new QueueStatistic(queues.get(i)); - qs.setSize(queueLengthList.get(i).toLong()); - qs.setMessageSpeed(getQueueSpeed(qs.queueName)); - statisticsMap.put(qs.queueName, qs); - } - // now retrieve all available failure statistics from Redis and merge them - // together with the previous populated common queue statistics map - redisProvider.redis().onSuccess(redisAPI -> redisAPI.hvals(STATSKEY, statisticsSet -> { - if (statisticsSet == null) { - log.error("Unexpected statistics queue evaluation result result null"); - event.reply(new JsonObject().put(STATUS, ERROR)); + var ctx = new RequestCtx(); + ctx.event = event; + ctx.queueNames = queues; + step1(ctx); + } + + /**

init redis connection.

*/ + void step1(RequestCtx ctx) { + redisProvider.connection() + .onFailure(ex -> { + log.warn("Redis: Failed to get queue length.", new Exception(ex)); + ctx.event.reply(new JsonObject().put(STATUS, ERROR)); + }) + .onSuccess(conn -> { + assert conn != null; + ctx.conn = conn; + step2(ctx); + }); + } + + /**

Query queue lengths.

*/ + void step2(RequestCtx ctx) { + assert ctx.conn != null; + List responses = ctx.queueNames.stream() + .map(queue -> ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queue))) + .collect(Collectors.toList()); + CompositeFuture.all(responses) + .onFailure(ex -> { + log.warn("Unexpected queue length result", new Exception(ex)); + ctx.event.reply(new JsonObject().put(STATUS, ERROR)); + }) + .onSuccess(compositeFuture -> { + List queueLengthList = compositeFuture.list(); + if (queueLengthList == null) { + log.warn("Unexpected queue length result: null"); + ctx.event.reply(new JsonObject().put(STATUS, ERROR)); return; } - // put the received statistics data to the former prepared statistics objects - // per queue - for (Response response : statisticsSet.result()) { - JsonObject jObj = new JsonObject(response.toString()); - String queueName = jObj.getString(RedisquesAPI.QUEUENAME); - QueueStatistic queueStatistic = statisticsMap.get(queueName); - if (queueStatistic != null) { - // if it isn't there, there is obviously no statistic needed - queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L)); - queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L)); - queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L)); - } - } - // build the final resulting statistics list from the former merged queue - // values from various sources - JsonArray result = new JsonArray(); - for (String queueName : queues) { - QueueStatistic stats = statisticsMap.get(queueName); - if (stats != null) { - result.add(stats.getAsJsonObject()); - } + if (queueLengthList.size() != ctx.queueNames.size()) { + log.error("Unexpected queue length result with unequal size {} : {}", + ctx.queueNames.size(), queueLengthList.size()); + ctx.event.reply(new JsonObject().put(STATUS, ERROR)); + return; } - event.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK) - .put(RedisquesAPI.QUEUES, result)); - })).onFailure(throwable -> { - log.error("Redis: Error in getQueueStatistics", throwable); - event.reply(new JsonObject().put(STATUS, ERROR)); - }); + ctx.queueLengthList = queueLengthList; + step3(ctx); + }) + ; + } + + /**

init queue statistics.

*/ + void step3(RequestCtx ctx) { + assert ctx.queueLengthList != null; + // populate the list of queue statistics in a Hashmap for later fast merging + ctx.statistics = new HashMap<>(ctx.queueNames.size()); + for (int i = 0; i < ctx.queueNames.size(); i++) { + QueueStatistic qs = new QueueStatistic(ctx.queueNames.get(i)); + qs.setSize(ctx.queueLengthList.get(i).toLong()); + qs.setMessageSpeed(getQueueSpeed(qs.queueName)); + ctx.statistics.put(qs.queueName, qs); + } + step4(ctx); + } + + /**

init a resAPI instance we need to get more details.

*/ + void step4(RequestCtx ctx){ + redisProvider.redis() + .onFailure(ex -> { + log.error("Redis: Error in getQueueStatistics", new Exception(ex)); + ctx.event.reply(new JsonObject().put(STATUS, ERROR)); + }) + .onSuccess(redisAPI -> { + assert redisAPI != null; + ctx.redisAPI = redisAPI; + step5(ctx); + }) + ; + } - }); - }).onFailure(throwable -> { - log.warn("Redis: Failed to get queue length.", throwable); - event.reply(new JsonObject().put(STATUS, ERROR)); + /** + *

retrieve all available failure statistics from Redis and merge them + * together with the previous populated common queue statistics map

+ */ + void step5(RequestCtx ctx) { + assert ctx.redisAPI != null; + assert ctx.statistics != null; + ctx.redisAPI.hvals(STATSKEY, statisticsSet -> { + if( statisticsSet == null || statisticsSet.failed() ){ + log.error("statistics queue evaluation failed", + statisticsSet == null ? null : statisticsSet.cause()); + ctx.event.reply(new JsonObject().put(STATUS, ERROR)); + return; + } + ctx.redisFailStats = statisticsSet.result(); + assert ctx.redisFailStats != null; + step6(ctx); }); } + /**

put received statistics data to the former prepared statistics objects per + * queue.

*/ + void step6(RequestCtx ctx ){ + assert ctx.redisFailStats != null; + for (Response response : ctx.redisFailStats) { + JsonObject jObj = new JsonObject(response.toString()); + String queueName = jObj.getString(QUEUENAME); + QueueStatistic queueStatistic = ctx.statistics.get(queueName); + if (queueStatistic != null) { + // if it isn't there, there is obviously no statistic needed + queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L)); + queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L)); + queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L)); + } + } + // build the final resulting statistics list from the former merged queue + // values from various sources + JsonArray result = new JsonArray(); + for (String queueName : ctx.queueNames) { + QueueStatistic stats = ctx.statistics.get(queueName); + if (stats != null) { + result.add(stats.getAsJsonObject()); + } + } + ctx.event.reply(new JsonObject().put(STATUS, OK) + .put(RedisquesAPI.QUEUES, result)); + } + /** * Retrieve the summarized queue speed for the requested queues. *

@@ -503,4 +567,16 @@ public void getQueuesSpeed(Message event, final List queues) event.reply(new JsonObject().put(STATUS, OK).put(STATISTIC_QUEUE_SPEED, speed)); } + /**

Holds intermediate state related to a {@link #getQueueStatistics(Message, List)} + * request.

*/ + private static class RequestCtx { + private Message event; // origin event we have to answer + private List queueNames; // Requested queues to analyze + private Redis conn; + private RedisAPI redisAPI; + private List queueLengthList; + private HashMap statistics; // Stats we're going to populate + private Response redisFailStats; // failure stats we got from redis. + } + } diff --git a/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java b/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java index a01b7076..f372b359 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java +++ b/src/main/java/org/swisspush/redisques/util/RedisQuesTimer.java @@ -39,7 +39,7 @@ public Future executeDelayedMax(long delayMs) { log.debug("starting timer with a delay of " + delay + "ms"); vertx.setTimer(delay, delayed -> promise.complete()); } else { - promise.complete(); + vertx.runOnContext(aVoid -> promise.complete()); } return promise.future();