Skip to content

Commit

Permalink
Merge pull request #145 from swisspost/develop
Browse files Browse the repository at this point in the history
PR for release
  • Loading branch information
mcweba authored Jan 10, 2024
2 parents 9c2f20f + c86baf0 commit d8f6f24
Show file tree
Hide file tree
Showing 49 changed files with 1,211 additions and 664 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>3.0.32-SNAPSHOT</version>
<version>3.0.33-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
303 changes: 149 additions & 154 deletions src/main/java/org/swisspush/redisques/RedisQues.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ public AbstractQueueAction(Vertx vertx, RedisProvider redisProvider, String addr
}

protected Handler<Throwable> replyErrorMessageHandler(Message<JsonObject> event) {
return throwable -> event.reply(new JsonObject().put(STATUS, ERROR));
return ex -> {
log.warn("Concealed error", new Exception(ex));
event.reply(new JsonObject().put(STATUS, ERROR));
};
}

protected long getMaxAgeTimestamp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@

public class AddQueueItemAction extends AbstractQueueAction {

public AddQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public AddQueueItemAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
public void execute(Message<JsonObject> event) {
String key1 = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME);
String valueAddItem = event.body().getJsonObject(PAYLOAD).getString(BUFFER);
redisProvider.redis().onSuccess(
redisAPI -> redisAPI.rpush(Arrays.asList(key1, valueAddItem), new AddQueueItemHandler(event)))
.onFailure(replyErrorMessageHandler(event));
var p = redisProvider.redis();
p.onSuccess(redisAPI -> redisAPI.rpush(Arrays.asList(key1, valueAddItem), new AddQueueItemHandler(event)));
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

public class BulkDeleteQueuesAction extends AbstractQueueAction {

public BulkDeleteQueuesAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public BulkDeleteQueuesAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
Expand All @@ -39,15 +41,17 @@ public void execute(Message<JsonObject> event) {
event.reply(createErrorReply().put(ERROR_TYPE, BAD_INPUT).put(MESSAGE, "Queues must be string values"));
return;
}
redisProvider.redis().onSuccess(redisAPI -> redisAPI.del(buildQueueKeys(queues), delManyReply -> {
queueStatisticsCollector.resetQueueStatistics(queues);
if (delManyReply.succeeded()) {
event.reply(createOkReply().put(VALUE, delManyReply.result().toLong()));
} else {
log.error("Failed to bulkDeleteQueues", delManyReply.cause());
event.reply(createErrorReply());
}
}))
.onFailure(replyErrorMessageHandler(event));
var p = redisProvider.redis();
p.onSuccess(redisAPI -> redisAPI.del(buildQueueKeys(queues), delManyReply -> {
queueStatisticsCollector.resetQueueStatistics(queues);
if (delManyReply.succeeded()) {
event.reply(createOkReply().put(VALUE, delManyReply.result().toLong()));
} else {
log.error("Failed to bulkDeleteQueues", new Exception(delManyReply.cause()));
event.reply(createErrorReply());
}
}));
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

public class BulkPutLocksAction extends AbstractQueueAction {


public BulkPutLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public BulkPutLocksAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
Expand All @@ -43,8 +44,11 @@ public void execute(Message<JsonObject> event) {
return;
}

redisProvider.redis().onSuccess(redisAPI ->
redisAPI.hmset(buildLocksItems(locksKey, locks, lockInfo), new PutLockHandler(event)))
.onFailure(replyErrorMessageHandler(event));
var p = redisProvider.redis();
p.onSuccess(redisAPI -> {
redisAPI.hmset(buildLocksItems(locksKey, locks, lockInfo), new PutLockHandler(event));
});
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@

public class DeleteAllLocksAction extends AbstractQueueAction {

public DeleteAllLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public DeleteAllLocksAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
Expand All @@ -29,13 +31,14 @@ public void execute(Message<JsonObject> event) {
Response locks = locksResult.result();
deleteLocks(event, locks);
} else {
replyError(event, locksResult.cause().getMessage());
replyError(event, locksResult.cause());
}
})).onFailure(throwable -> replyError(event, throwable.getMessage()));
})).onFailure(ex -> replyError(event, ex));
}

private void replyError(Message<JsonObject> event, String message) {
log.warn("failed to delete all locks. Message: {}", message);
event.reply(createErrorReply().put(MESSAGE, message));
private void replyError(Message<JsonObject> event, Throwable ex) {
if( log.isWarnEnabled() ) log.warn("failed to delete all locks.", new Exception(ex));
event.reply(createErrorReply().put(MESSAGE, ex.getMessage()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,49 +18,56 @@

public class DeleteAllQueueItemsAction extends AbstractQueueAction {

public DeleteAllQueueItemsAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public DeleteAllQueueItemsAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
public void execute(Message<JsonObject> event) {
JsonObject payload = event.body().getJsonObject(PAYLOAD);
boolean unlock = payload.getBoolean(UNLOCK, false);
String queue = payload.getString(QUEUENAME);
redisProvider.redis().onSuccess(redisAPI -> redisAPI.del(Collections.singletonList(buildQueueKey(queue)),
deleteReply -> {
if (deleteReply.failed()) {
log.warn("Failed to deleteAllQueueItems. But we'll continue anyway", deleteReply.cause());
// May we should 'fail()' here. But:
// 1st: We don't, to keep backward compatibility
// 2nd: We don't, to may unlock below.
}
queueStatisticsCollector.resetQueueFailureStatistics(queue);
if (unlock) {
redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> {
if (unlockReply.failed()) {
log.warn("Failed to unlock queue '{}'. Will continue anyway", queue, unlockReply.cause());
// IMO we should 'fail()' here. But we don't, to keep backward compatibility.
}
handleDeleteQueueReply(event, deleteReply);
});
} else {
var p = redisProvider.redis();
p.onSuccess(redisAPI -> {
redisAPI.del(Collections.singletonList(buildQueueKey(queue)), deleteReply -> {
if (deleteReply.failed()) {
log.warn("Failed to deleteAllQueueItems. But we'll continue anyway",
new Exception(deleteReply.cause()));
// May we should 'fail()' here. But:
// 1st: We don't, to keep backward compatibility
// 2nd: We don't, to may unlock below.
}
queueStatisticsCollector.resetQueueFailureStatistics(queue);
if (unlock) {
redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> {
if (unlockReply.failed()) {
log.warn("Failed to unlock queue '{}'. Will continue anyway",
queue, unlockReply.cause());
// IMO we should 'fail()' here. But we don't, to keep backward compatibility.
}
handleDeleteQueueReply(event, deleteReply);
}
})).onFailure(throwable -> {
log.error("Redis: Failed to delete all queue items", throwable);
event.reply(createErrorReply());
});
});
} else {
handleDeleteQueueReply(event, deleteReply);
}
});
});
p.onFailure(ex -> {
log.error("Redis: Failed to delete all queue items", new Exception(ex));
event.reply(createErrorReply());
});
}

private void handleDeleteQueueReply(Message<JsonObject> event, AsyncResult<Response> reply) {
if (reply.succeeded()) {
event.reply(createOkReply().put(VALUE, reply.result().toLong()));
} else {
log.error("Failed to replyResultGreaterThanZero", reply.cause());
log.error("Failed to replyResultGreaterThanZero", new Exception(reply.cause()));
event.reply(createErrorReply());
}
}
Expand Down
32 changes: 19 additions & 13 deletions src/main/java/org/swisspush/redisques/action/DeleteLockAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,29 @@

public class DeleteLockAction extends AbstractQueueAction {

public DeleteLockAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public DeleteLockAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
public void execute(Message<JsonObject> event) {
String queueName = event.body().getJsonObject(PAYLOAD).getString(QUEUENAME);
redisProvider.redis().onSuccess(redisAPI ->
redisAPI.exists(Collections.singletonList(queuesPrefix + queueName), event1 -> {
if (event1.succeeded() && event1.result() != null && event1.result().toInteger() == 1) {
notifyConsumer(queueName);
}
redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(event));
}))
.onFailure(replyErrorMessageHandler(event));
var p = redisProvider.redis();
p.onSuccess(redisAPI -> {
redisAPI.exists(Collections.singletonList(queuesPrefix + queueName), event1 -> {
if( event1.failed() ) log.warn("Concealed error", new Exception(event1.cause()));
if (event1.succeeded() && event1.result() != null && event1.result().toInteger() == 1) {
notifyConsumer(queueName);
}
redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(event));
});
});
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

public class DeleteQueueItemAction extends AbstractQueueAction {

public DeleteQueueItemAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
public DeleteQueueItemAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
}

@Override
Expand All @@ -31,17 +33,18 @@ public void execute(Message<JsonObject> event) {
String keyLrem = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME);
redisAPI.lrem(keyLrem, "0", "TO_DELETE", replyLrem -> {
if (replyLrem.failed()) {
log.warn("Redis 'lrem' command failed. But will continue anyway.", replyLrem.cause());
log.warn("Redis 'lrem' command failed. But will continue anyway.",
new Exception(replyLrem.cause()));
// IMO we should 'fail()' here. But we don't, to keep backward compatibility.
}
event.reply(createOkReply());
});
} else {
log.error("Failed to 'lset' while deleteQueueItem.", event1.cause());
log.error("Failed to 'lset' while deleteQueueItem.", new Exception(event1.cause()));
event.reply(createErrorReply());
}
})).onFailure(throwable -> {
log.error("Redis: Failed to deleteQueueItem.", throwable);
})).onFailure(ex -> {
log.error("Redis: Failed to deleteQueueItem.", new Exception(ex));
event.reply(createErrorReply());
});
}
Expand Down
Loading

0 comments on commit d8f6f24

Please sign in to comment.