Skip to content

Commit

Permalink
Merge pull request #139 from hiddenalpha/SDCISA-10974-DiscloseErrorsI…
Browse files Browse the repository at this point in the history
…nQueueStatisticsCollector-20231206
  • Loading branch information
hiddenalpha authored Dec 7, 2023
2 parents c0b2d1a + 3b77e8c commit 7954e2f
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 73 deletions.
220 changes: 148 additions & 72 deletions src/main/java/org/swisspush/redisques/util/QueueStatisticsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.types.NumberType;
Expand Down Expand Up @@ -315,14 +316,21 @@ private void updateStatisticsInRedis(String queueName) {
obj.put(QUEUE_FAILURES, failures);
obj.put(QUEUE_SLOWDOWNTIME, slowDownTime);
obj.put(QUEUE_BACKPRESSURE, backpressureTime);
redisProvider.redis().onSuccess(redisAPI -> redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()),
emptyHandler -> {
})).onFailure(throwable -> log.error("Redis: Error in updateStatisticsInRedis", throwable));
redisProvider.redis()
.onSuccess(redisAPI -> {
redisAPI.hset(List.of(STATSKEY, queueName, obj.toString()), ev -> {
if( ev.failed() ) log.warn("TODO error handling", new Exception(ev.cause()));
});
})
.onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex));
} else {
redisProvider.redis().onSuccess(redisAPI -> redisAPI.hdel(List.of(STATSKEY, queueName),
emptyHandler -> {
})).onFailure(throwable -> log.error("Redis: Error in updateStatisticsInRedis", throwable));

redisProvider.redis()
.onSuccess(redisAPI -> {
redisAPI.hdel(List.of(STATSKEY, queueName), ev -> {
if (ev.failed()) log.warn("TODO error handling", new Exception(ev.cause()));
});
})
.onFailure(ex -> log.error("Redis: Error in updateStatisticsInRedis", ex));
}
}

Expand Down Expand Up @@ -409,77 +417,133 @@ public void getQueueStatistics(Message<JsonObject> event, final List<String> que
event.reply(new JsonObject().put(STATUS, OK).put(RedisquesAPI.QUEUES, new JsonArray()));
return;
}
redisProvider.connection().onSuccess(conn -> {
List<Future> responses = queues.stream().map(queue -> conn.send(Request.cmd(Command.LLEN, queuePrefix + queue))
).collect(Collectors.toList());
CompositeFuture.all(responses).onFailure(throwable -> {
log.error("Unexepected queue length result");
event.reply(new JsonObject().put(STATUS, ERROR));
}).onSuccess(compositeFuture -> {
List<NumberType> queueLengthList = compositeFuture.list();
if (queueLengthList == null) {
log.error("Unexepected queue length result null");
event.reply(new JsonObject().put(STATUS, ERROR));
return;
}
if (queueLengthList.size() != queues.size()) {
log.error("Unexpected queue length result with unequal size {} : {}",
queues.size(), queueLengthList.size());
event.reply(new JsonObject().put(STATUS, ERROR));
return;
}
// populate the list of queue statistics in a Hashmap for later fast merging
final HashMap<String, QueueStatistic> statisticsMap = new HashMap<>();
for (int i = 0; i < queues.size(); i++) {
QueueStatistic qs = new QueueStatistic(queues.get(i));
qs.setSize(queueLengthList.get(i).toLong());
qs.setMessageSpeed(getQueueSpeed(qs.queueName));
statisticsMap.put(qs.queueName, qs);
}
// now retrieve all available failure statistics from Redis and merge them
// together with the previous populated common queue statistics map
redisProvider.redis().onSuccess(redisAPI -> redisAPI.hvals(STATSKEY, statisticsSet -> {
if (statisticsSet == null) {
log.error("Unexpected statistics queue evaluation result result null");
event.reply(new JsonObject().put(STATUS, ERROR));
var ctx = new RequestCtx();
ctx.event = event;
ctx.queueNames = queues;
step1(ctx);
}

/** <p>init redis connection.</p> */
void step1(RequestCtx ctx) {
redisProvider.connection()
.onFailure(ex -> {
log.warn("Redis: Failed to get queue length.", new Exception(ex));
ctx.event.reply(new JsonObject().put(STATUS, ERROR));
})
.onSuccess(conn -> {
assert conn != null;
ctx.conn = conn;
step2(ctx);
});
}

/** <p>Query queue lengths.</p> */
void step2(RequestCtx ctx) {
assert ctx.conn != null;
List<Future> responses = ctx.queueNames.stream()
.map(queue -> ctx.conn.send(Request.cmd(Command.LLEN, queuePrefix + queue)))
.collect(Collectors.toList());
CompositeFuture.all(responses)
.onFailure(ex -> {
log.warn("Unexpected queue length result", new Exception(ex));
ctx.event.reply(new JsonObject().put(STATUS, ERROR));
})
.onSuccess(compositeFuture -> {
List<NumberType> queueLengthList = compositeFuture.list();
if (queueLengthList == null) {
log.warn("Unexpected queue length result: null");
ctx.event.reply(new JsonObject().put(STATUS, ERROR));
return;
}
// put the received statistics data to the former prepared statistics objects
// per queue
for (Response response : statisticsSet.result()) {
JsonObject jObj = new JsonObject(response.toString());
String queueName = jObj.getString(RedisquesAPI.QUEUENAME);
QueueStatistic queueStatistic = statisticsMap.get(queueName);
if (queueStatistic != null) {
// if it isn't there, there is obviously no statistic needed
queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L));
queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L));
queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L));
}
}
// build the final resulting statistics list from the former merged queue
// values from various sources
JsonArray result = new JsonArray();
for (String queueName : queues) {
QueueStatistic stats = statisticsMap.get(queueName);
if (stats != null) {
result.add(stats.getAsJsonObject());
}
if (queueLengthList.size() != ctx.queueNames.size()) {
log.error("Unexpected queue length result with unequal size {} : {}",
ctx.queueNames.size(), queueLengthList.size());
ctx.event.reply(new JsonObject().put(STATUS, ERROR));
return;
}
event.reply(new JsonObject().put(RedisquesAPI.STATUS, RedisquesAPI.OK)
.put(RedisquesAPI.QUEUES, result));
})).onFailure(throwable -> {
log.error("Redis: Error in getQueueStatistics", throwable);
event.reply(new JsonObject().put(STATUS, ERROR));
});
ctx.queueLengthList = queueLengthList;
step3(ctx);
})
;
}

/** <p>init queue statistics.</p> */
void step3(RequestCtx ctx) {
assert ctx.queueLengthList != null;
// populate the list of queue statistics in a Hashmap for later fast merging
ctx.statistics = new HashMap<>(ctx.queueNames.size());
for (int i = 0; i < ctx.queueNames.size(); i++) {
QueueStatistic qs = new QueueStatistic(ctx.queueNames.get(i));
qs.setSize(ctx.queueLengthList.get(i).toLong());
qs.setMessageSpeed(getQueueSpeed(qs.queueName));
ctx.statistics.put(qs.queueName, qs);
}
step4(ctx);
}

/** <p>init a resAPI instance we need to get more details.</p> */
void step4(RequestCtx ctx){
redisProvider.redis()
.onFailure(ex -> {
log.error("Redis: Error in getQueueStatistics", new Exception(ex));
ctx.event.reply(new JsonObject().put(STATUS, ERROR));
})
.onSuccess(redisAPI -> {
assert redisAPI != null;
ctx.redisAPI = redisAPI;
step5(ctx);
})
;
}

});
}).onFailure(throwable -> {
log.warn("Redis: Failed to get queue length.", throwable);
event.reply(new JsonObject().put(STATUS, ERROR));
/**
* <p>retrieve all available failure statistics from Redis and merge them
* together with the previous populated common queue statistics map</p>
*/
void step5(RequestCtx ctx) {
assert ctx.redisAPI != null;
assert ctx.statistics != null;
ctx.redisAPI.hvals(STATSKEY, statisticsSet -> {
if( statisticsSet == null || statisticsSet.failed() ){
log.error("statistics queue evaluation failed",
statisticsSet == null ? null : statisticsSet.cause());
ctx.event.reply(new JsonObject().put(STATUS, ERROR));
return;
}
ctx.redisFailStats = statisticsSet.result();
assert ctx.redisFailStats != null;
step6(ctx);
});
}

/** <p>put received statistics data to the former prepared statistics objects per
* queue.</p> */
void step6(RequestCtx ctx ){
assert ctx.redisFailStats != null;
for (Response response : ctx.redisFailStats) {
JsonObject jObj = new JsonObject(response.toString());
String queueName = jObj.getString(QUEUENAME);
QueueStatistic queueStatistic = ctx.statistics.get(queueName);
if (queueStatistic != null) {
// if it isn't there, there is obviously no statistic needed
queueStatistic.setFailures(jObj.getLong(QUEUE_FAILURES, 0L));
queueStatistic.setBackpressureTime(jObj.getLong(QUEUE_BACKPRESSURE, 0L));
queueStatistic.setSlowdownTime(jObj.getLong(QUEUE_SLOWDOWNTIME, 0L));
}
}
// build the final resulting statistics list from the former merged queue
// values from various sources
JsonArray result = new JsonArray();
for (String queueName : ctx.queueNames) {
QueueStatistic stats = ctx.statistics.get(queueName);
if (stats != null) {
result.add(stats.getAsJsonObject());
}
}
ctx.event.reply(new JsonObject().put(STATUS, OK)
.put(RedisquesAPI.QUEUES, result));
}

/**
* Retrieve the summarized queue speed for the requested queues.
* <p>
Expand All @@ -503,4 +567,16 @@ public void getQueuesSpeed(Message<JsonObject> event, final List<String> queues)
event.reply(new JsonObject().put(STATUS, OK).put(STATISTIC_QUEUE_SPEED, speed));
}

/** <p>Holds intermediate state related to a {@link #getQueueStatistics(Message, List)}
* request.</p> */
private static class RequestCtx {
private Message<JsonObject> event; // origin event we have to answer
private List<String> queueNames; // Requested queues to analyze
private Redis conn;
private RedisAPI redisAPI;
private List<NumberType> queueLengthList;
private HashMap<String, QueueStatistic> statistics; // Stats we're going to populate
private Response redisFailStats; // failure stats we got from redis.
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public Future<Void> executeDelayedMax(long delayMs) {
log.debug("starting timer with a delay of " + delay + "ms");
vertx.setTimer(delay, delayed -> promise.complete());
} else {
promise.complete();
vertx.runOnContext(aVoid -> promise.complete());
}

return promise.future();
Expand Down

0 comments on commit 7954e2f

Please sign in to comment.