Skip to content

Commit

Permalink
Merge branch 'develop' into feature/update_vertx
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/test/java/org/swisspush/redisques/handler/RedisquesHttpRequestHandlerTest.java
  • Loading branch information
mcweba committed Jan 10, 2024
2 parents 4b9cbb8 + c86baf0 commit b7661ae
Show file tree
Hide file tree
Showing 9 changed files with 359 additions and 79 deletions.
25 changes: 23 additions & 2 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.swisspush.redisques.util.*;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import static org.swisspush.redisques.util.RedisquesAPI.*;
Expand Down Expand Up @@ -93,6 +94,8 @@ private enum QueueState {

private Map<QueueOperation, QueueAction> queueActions = new HashMap<>();

private Map<String, DequeueStatistic> dequeueStatistic = new ConcurrentHashMap<>();

public RedisQues() {
}

Expand Down Expand Up @@ -161,6 +164,14 @@ public void start(Promise<Void> promise) {
RedisquesConfiguration modConfig = configurationProvider.configuration();
log.info("Starting Redisques module with configuration: {}", configurationProvider.configuration());

int dequeueStatisticReportInterval = modConfig.getDequeueStatisticReportIntervalSec();
if (dequeueStatisticReportInterval > 0) {
vertx.setPeriodic(1000L * dequeueStatisticReportInterval, handler -> {
dequeueStatistic.forEach((queueName, dequeueStatistic) ->
queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic));
});
}

queuesKey = modConfig.getRedisPrefix() + "queues";
queuesPrefix = modConfig.getRedisPrefix() + "queues:";
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
Expand Down Expand Up @@ -188,7 +199,7 @@ private void initialize() {
this.queueStatisticsCollector = new QueueStatisticsCollector(redisProvider,
queuesPrefix, vertx, configuration.getQueueSpeedIntervalSec());

RedisquesHttpRequestHandler.init(vertx, configuration);
RedisquesHttpRequestHandler.init(vertx, configuration, queueStatisticsCollector);

// only initialize memoryUsageProvider when not provided in the constructor
if (memoryUsageProvider == null) {
Expand Down Expand Up @@ -558,6 +569,8 @@ private Future<Void> readQueue(final String queueName) {
Response response = answer.result();
log.trace("RedisQues read queue lindex result: {}", response);
if (response != null) {
dequeueStatistic.computeIfAbsent(queueName, s -> new DequeueStatistic());
dequeueStatistic.get(queueName).lastDequeueAttemptTimestamp = System.currentTimeMillis();
processMessageWithTimeout(queueName, response.toString(), success -> {

// update the queue failure count and get a retry interval
Expand Down Expand Up @@ -619,6 +632,7 @@ private Future<Void> readQueue(final String queueName) {
// This can happen when requests to consume happen at the same moment the queue is emptied.
log.debug("Got a request to consume from empty queue {}", queueName);
myQueues.put(queueName, QueueState.READY);
dequeueStatistic.remove(queueName);
promise.complete();
}
})).onFailure(throwable -> {
Expand All @@ -640,6 +654,8 @@ private void rescheduleSendMessageAfterFailure(final String queueName, int retry
log.trace("RedsQues reschedule after failure for queue: {}", queueName);

vertx.setTimer(retryInSeconds * 1000L, timerId -> {
long retryDelayInMills = retryInSeconds * 1000L;
dequeueStatistic.get(queueName).nextDequeueDueTimestamp = System.currentTimeMillis() + retryDelayInMills;
if (log.isDebugEnabled()) {
log.debug("RedisQues re-notify the consumer of queue '{}' at {}", queueName, new Date(System.currentTimeMillis()));
}
Expand Down Expand Up @@ -675,6 +691,8 @@ private void processMessageWithTimeout(final String queue, final String payload,
boolean success;
if (reply.succeeded()) {
success = OK.equals(reply.result().body().getString(STATUS));
dequeueStatistic.get(queue).lastDequeueSuccessTimestamp = System.currentTimeMillis();
dequeueStatistic.get(queue).nextDequeueDueTimestamp = null;
} else {
log.info("RedisQues QUEUE_ERROR: Consumer failed {} queue: {}",
uid, queue, new Exception(reply.cause()));
Expand Down Expand Up @@ -826,7 +844,10 @@ private Future<Void> checkQueues() {
});
} else {
// Ensure we clean the old queues also in the case of empty queue.
log.trace("RedisQues remove old queue: {}", queueName);
if (log.isTraceEnabled()) {
log.trace("RedisQues remove old queue: {}", queueName);
}
dequeueStatistic.remove(queueName);
if (counter.decrementAndGet() == 0) {
removeOldQueues(limit).onComplete(removeOldQueuesEvent -> {
if( removeOldQueuesEvent.failed() )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.util.HandlerUtil;
import org.swisspush.redisques.util.QueueStatisticsCollector;

import java.util.List;
import java.util.Optional;
import java.util.regex.Pattern;

import static org.slf4j.LoggerFactory.getLogger;
import static org.swisspush.redisques.util.RedisquesAPI.ERROR;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;

Expand All @@ -23,7 +23,7 @@
*/
public class GetQueuesStatisticsHandler implements Handler<AsyncResult<Response>> {

private static final Logger log = getLogger(GetQueuesStatisticsHandler.class);
private static final Logger log = LoggerFactory.getLogger(GetQueuesStatisticsHandler.class);
private final Message<JsonObject> event;
private final Optional<Pattern> filterPattern;
private final QueueStatisticsCollector queueStatisticsCollector;
Expand All @@ -42,8 +42,13 @@ public GetQueuesStatisticsHandler(
public void handle(AsyncResult<Response> handleQueues) {
if (handleQueues.succeeded()) {
List<String> queues = HandlerUtil
.filterByPattern(handleQueues.result(), filterPattern);
queueStatisticsCollector.getQueueStatistics(event, queues);
.filterByPattern(handleQueues.result(), filterPattern);
queueStatisticsCollector.getQueueStatistics(queues)
.onFailure(ex -> {
log.error("", ex);
event.reply(new JsonObject().put(STATUS, ERROR));
})
.onSuccess(event::reply);
} else {
log.warn("Concealed error", new Exception(handleQueues.cause()));
event.reply(new JsonObject().put(STATUS, ERROR));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

import io.netty.util.internal.StringUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.Message;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.authentication.AuthenticationProvider;
Expand All @@ -18,18 +21,69 @@
import io.vertx.ext.web.handler.BasicAuthHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.swisspush.redisques.util.DequeueStatistic;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisquesAPI;
import org.swisspush.redisques.util.RedisquesConfiguration;
import org.swisspush.redisques.util.Result;
import org.swisspush.redisques.util.StatusCode;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.swisspush.redisques.util.HttpServerRequestUtil.*;
import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.HttpServerRequestUtil.decode;
import static org.swisspush.redisques.util.HttpServerRequestUtil.encodePayload;
import static org.swisspush.redisques.util.HttpServerRequestUtil.evaluateUrlParameterToBeEmptyOrTrue;
import static org.swisspush.redisques.util.HttpServerRequestUtil.extractNonEmptyJsonArrayFromBody;
import static org.swisspush.redisques.util.RedisquesAPI.BAD_INPUT;
import static org.swisspush.redisques.util.RedisquesAPI.COUNT;
import static org.swisspush.redisques.util.RedisquesAPI.ERROR_TYPE;
import static org.swisspush.redisques.util.RedisquesAPI.FILTER;
import static org.swisspush.redisques.util.RedisquesAPI.LIMIT;
import static org.swisspush.redisques.util.RedisquesAPI.LOCKS;
import static org.swisspush.redisques.util.RedisquesAPI.MEMORY_FULL;
import static org.swisspush.redisques.util.RedisquesAPI.MESSAGE;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_NAME;
import static org.swisspush.redisques.util.RedisquesAPI.MONITOR_QUEUE_SIZE;
import static org.swisspush.redisques.util.RedisquesAPI.NO_SUCH_LOCK;
import static org.swisspush.redisques.util.RedisquesAPI.OK;
import static org.swisspush.redisques.util.RedisquesAPI.QUEUES;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_DEQUEUESTATISTIC;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS;
import static org.swisspush.redisques.util.RedisquesAPI.STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS;
import static org.swisspush.redisques.util.RedisquesAPI.STATUS;
import static org.swisspush.redisques.util.RedisquesAPI.VALUE;
import static org.swisspush.redisques.util.RedisquesAPI.buildAddQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildBulkDeleteQueuesOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildBulkPutLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteAllQueueItemsOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteLockOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildDeleteQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildEnqueueOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetAllLocksOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetConfigurationOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetLockOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueueItemsOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesItemsCountOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesSpeedOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildGetQueuesStatisticsOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildLockedEnqueueOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildPutLockOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildReplaceQueueItemOperation;
import static org.swisspush.redisques.util.RedisquesAPI.buildSetConfigurationOperation;

/**
* Handler class for HTTP requests providing access to Redisques over HTTP.
Expand All @@ -53,16 +107,19 @@ public class RedisquesHttpRequestHandler implements Handler<HttpServerRequest> {
private static final String EMPTY_QUEUES_PARAM = "emptyQueues";
private static final String DELETED = "deleted";

private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("dd.MM.yyyy hh:mm:ss");

private final String redisquesAddress;
private final String userHeader;
private final boolean enableQueueNameDecoding;
private final int queueSpeedIntervalSec;
private final QueueStatisticsCollector queueStatisticsCollector;

public static void init(Vertx vertx, RedisquesConfiguration modConfig) {
public static void init(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) {
log.info("Enable http request handler: " + modConfig.getHttpRequestHandlerEnabled());
if (modConfig.getHttpRequestHandlerEnabled()) {
if (modConfig.getHttpRequestHandlerPort() != null && modConfig.getHttpRequestHandlerUserHeader() != null) {
RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig);
RedisquesHttpRequestHandler handler = new RedisquesHttpRequestHandler(vertx, modConfig, queueStatisticsCollector);
// in Vert.x 2x 100-continues was activated per default, in vert.x 3x it is off per default.
HttpServerOptions options = new HttpServerOptions().setHandle100ContinueAutomatically(true);
vertx.createHttpServer(options).requestHandler(handler).listen(modConfig.getHttpRequestHandlerPort(), result -> {
Expand Down Expand Up @@ -91,13 +148,14 @@ private Result<Boolean, String> checkHttpAuthenticationConfiguration(RedisquesCo
return Result.ok(false);
}

private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig) {
private RedisquesHttpRequestHandler(Vertx vertx, RedisquesConfiguration modConfig, QueueStatisticsCollector queueStatisticsCollector) {
this.router = Router.router(vertx);
this.eventBus = vertx.eventBus();
this.redisquesAddress = modConfig.getAddress();
this.userHeader = modConfig.getHttpRequestHandlerUserHeader();
this.enableQueueNameDecoding = modConfig.getEnableQueueNameDecoding();
this.queueSpeedIntervalSec = modConfig.getQueueSpeedIntervalSec();
this.queueStatisticsCollector = queueStatisticsCollector;

final String prefix = modConfig.getHttpRequestHandlerPrefix();

Expand Down Expand Up @@ -504,9 +562,13 @@ private void getMonitorInformation(RoutingContext ctx) {
if (limit > 0) {
queuesList = limitJsonQueueArray(queuesList, limit);
}
JsonObject resultObject = new JsonObject();
resultObject.put(QUEUES, queuesList);
jsonResponse(ctx.response(), resultObject);

// this function always succeeds, no need to handle the error case
fillStatisticToQueuesList(queuesList).onSuccess(updatedQueuesList -> {
JsonObject resultObject = new JsonObject();
resultObject.put(QUEUES, updatedQueuesList);
jsonResponse(ctx.response(), resultObject);
});
} else {
// there was no result, we as well return an empty result
JsonObject resultObject = new JsonObject();
Expand All @@ -521,6 +583,54 @@ private void getMonitorInformation(RoutingContext ctx) {
});
}

private Future<List<JsonObject>> fillStatisticToQueuesList(List<JsonObject> queuesList) {
Promise<List<JsonObject>> promise = Promise.promise();
List<String> queueNameList = new ArrayList<>();
for (JsonObject jsonObject : queuesList) {
queueNameList.add(jsonObject.getString(MONITOR_QUEUE_NAME));
}

queueStatisticsCollector.getQueueStatistics(queueNameList)
.onFailure(ex -> {
log.error("Failed to fetch QueueStatistics for queue", ex);
promise.complete(queuesList);
})
.onSuccess(queueStatisticsJsonObject -> {
if (OK.equals(queueStatisticsJsonObject.getString(STATUS))
&& !queueStatisticsJsonObject.getJsonArray(QUEUES).isEmpty()) {
JsonArray queueStatisticsArray = queueStatisticsJsonObject.getJsonArray(QUEUES);
queuesList.forEach(entries -> {
String queueName = entries.getString(MONITOR_QUEUE_NAME);
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, "");
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, "");
entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, "");
queueStatisticsJsonObject.getJsonArray(QUEUES);

for (Iterator<Object> it = queueStatisticsArray.stream().iterator(); it.hasNext(); ) {
JsonObject queueStatistic = (JsonObject) it.next();
if (queueName.equals(queueStatistic.getString(MONITOR_QUEUE_NAME))
&& queueStatistic.containsKey(STATISTIC_QUEUE_DEQUEUESTATISTIC)
&& queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC) != null) {
DequeueStatistic dequeueStatistic = queueStatistic.getJsonObject(STATISTIC_QUEUE_DEQUEUESTATISTIC).mapTo(DequeueStatistic.class);
if (dequeueStatistic.lastDequeueAttemptTimestamp != null) {
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_ATTEMPT, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueAttemptTimestamp)));
}
if (dequeueStatistic.lastDequeueSuccessTimestamp != null) {
entries.put(STATISTIC_QUEUE_LAST_DEQUEUE_SUCCESS, DATE_FORMAT.format(new Date(dequeueStatistic.lastDequeueSuccessTimestamp)));
}
if (dequeueStatistic.nextDequeueDueTimestamp != null) {
entries.put(STATISTIC_QUEUE_NEXT_DEQUEUE_DUE_TS, DATE_FORMAT.format(new Date(dequeueStatistic.nextDequeueDueTimestamp)));
}
break;
}
}
});
}
promise.complete(queuesList);
});
return promise.future();
}


private void listOrCountQueues(RoutingContext ctx) {
if (evaluateUrlParameterToBeEmptyOrTrue(COUNT, ctx.request())) {
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/swisspush/redisques/util/DequeueStatistic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.swisspush.redisques.util;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class DequeueStatistic {
public Long lastDequeueAttemptTimestamp = null;
public Long lastDequeueSuccessTimestamp = null;
public Long nextDequeueDueTimestamp = null;

public boolean isEmpty() {
return lastDequeueAttemptTimestamp == null && lastDequeueSuccessTimestamp == null && nextDequeueDueTimestamp == null;
}
}
Loading

0 comments on commit b7661ae

Please sign in to comment.