Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PR for release #206

Merged
merged 28 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
562c508
#192 Adding missing error handling
mcweba Jul 1, 2024
ce90b79
#192 Adding more missing error handling
mcweba Jul 1, 2024
b8da5e9
#192 Adding more missing error handling
mcweba Jul 2, 2024
bffe62d
#192 Adding more missing error handling
mcweba Jul 2, 2024
4dc98d6
#192 Adding more missing error handling
mcweba Jul 2, 2024
456d8a1
#192 Adding more missing error handling
mcweba Jul 2, 2024
cd093ff
#192 Adding more missing error handling
mcweba Jul 2, 2024
ffae879
#192 Adding more missing error handling
mcweba Jul 3, 2024
c87ac31
Merge branch 'refs/heads/develop' into feature/issue192_missing_error…
mcweba Jul 3, 2024
aa6d854
#192 Fixed hanging response when adding invalid queue item
mcweba Jul 3, 2024
ce5c5c6
#192 Adding more missing error handling
mcweba Jul 3, 2024
75870ef
#192 Adding more missing error handling
mcweba Jul 3, 2024
2d4dd94
#192 Fixed javadoc links
mcweba Jul 3, 2024
ec59f55
#192 Code cleanup
mcweba Jul 3, 2024
e13f77b
Merge branch 'refs/heads/develop' into feature/issue192_missing_error…
mcweba Jul 3, 2024
5117707
#192 Adding more missing error handling
mcweba Jul 3, 2024
8af5e66
#192 Adding more missing error handling
mcweba Jul 4, 2024
b6d0cbf
#192 Adding more missing error handling
mcweba Jul 4, 2024
49e12d7
#192 Increased major version since breaking backward compatibility
mcweba Jul 4, 2024
baa42f2
Merge branch 'refs/heads/develop' into feature/issue192_missing_error…
mcweba Jul 9, 2024
d62bb15
updating poms for branch'release-3.1.13' with non-snapshot versions
Jul 9, 2024
f5b1080
updating poms for 3.1.14-SNAPSHOT development
Jul 9, 2024
f145a7a
Merge branch 'release-3.1.13'
Jul 9, 2024
58d9124
updating develop poms to master versions to avoid merge conflicts
Jul 9, 2024
5230588
Merge branch 'master' into develop
Jul 9, 2024
ff7153f
Updating develop poms back to pre merge state
Jul 9, 2024
b52f788
Merge branch 'refs/heads/develop' into feature/issue192_missing_error…
mcweba Jul 9, 2024
8e127e6
Merge pull request #203 from swisspost/feature/issue192_missing_error…
mcweba Jul 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.swisspush</groupId>
<artifactId>redisques</artifactId>
<version>3.1.13-SNAPSHOT</version>
<version>4.1.1-SNAPSHOT</version>
<name>redisques</name>
<description>
A highly scalable redis-persistent queuing system for vertx
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -675,10 +675,10 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s

private void registerQueueCheck() {
vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> {
redisProvider.redis().<Response>compose((RedisAPI redisAPI) -> {
redisProvider.redis().compose((RedisAPI redisAPI) -> {
int checkInterval = configurationProvider.configuration().getCheckInterval();
return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval));
}).<Void>compose((Response todoExplainWhyThisIsIgnored) -> {
}).compose((Response todoExplainWhyThisIsIgnored) -> {
log.info("periodic queue check is triggered now");
return checkQueues();
}).onFailure((Throwable ex) -> {
Expand Down Expand Up @@ -815,7 +815,7 @@ private Future<Void> consume(final String queueName) {
log.trace("RedisQues consume get: {}", consumerKey);
redisProvider.redis().onSuccess(redisAPI -> redisAPI.get(consumerKey, event1 -> {
if (event1.failed()) {
log.error("Unable to get consumer for queue " + queueName, event1.cause());
log.error("Unable to get consumer for queue {}", queueName, event1.cause());
return;
}
String consumer = Objects.toString(event1.result(), "");
Expand Down Expand Up @@ -857,7 +857,7 @@ private Future<Void> consume(final String queueName) {
});
}
}))
.onFailure(throwable -> log.error("Redis: Unable to get consumer for queue " + queueName, throwable));
.onFailure(throwable -> log.error("Redis: Unable to get consumer for queue {}", queueName, throwable));
});
return promise.future();
}
Expand Down Expand Up @@ -1151,17 +1151,17 @@ private Future<Void> checkQueues() {
AtomicInteger counter;
Iterator<Response> iter;
};
return Future.<Void>succeededFuture().<RedisAPI>compose((Void v) -> {
return Future.<Void>succeededFuture().compose((Void v) -> {
log.debug("Checking queues timestamps");
// List all queues that look inactive (i.e. that have not been updated since 3 periods).
ctx.limit = currentTimeMillis() - 3L * configurationProvider.configuration().getRefreshPeriod() * 1000;
return redisProvider.redis();
}).<Response>compose((RedisAPI redisAPI) -> {
}).compose((RedisAPI redisAPI) -> {
ctx.redisAPI = redisAPI;
var p = Promise.<Response>promise();
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
return p.future();
}).<Void>compose((Response queues) -> {
}).compose((Response queues) -> {
assert ctx.counter == null;
assert ctx.iter == null;
ctx.counter = new AtomicInteger(queues.size());
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/swisspush/redisques/RedisQuesRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* Deploys vertx-redisques to vert.x.
* Used in the standalone scenario.
*
* @author https://github.com/mcweba [Marc-Andre Weber]
* @author <a href="https://github.com/mcweba">Marc-André Weber</a>
*/
public class RedisQuesRunner {

Expand All @@ -20,7 +20,7 @@ public static void main(String[] args) {
.httpRequestHandlerEnabled(true)
.redisReconnectAttempts(-1)
.redisPoolRecycleTimeoutMs(-1)
.redisReadyCheckIntervalMs(10000)
.redisReadyCheckIntervalMs(5000)
.build().asJsonObject();

Vertx.vertx().deployVerticle(new RedisQues(), new DeploymentOptions().setConfig(configuration),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -33,11 +34,12 @@ public abstract class AbstractQueueAction implements QueueAction {
protected final String consumersPrefix;
protected final String locksKey;
protected final List<QueueConfiguration> queueConfigurations;
protected final RedisQuesExceptionFactory exceptionFactory;
protected final QueueStatisticsCollector queueStatisticsCollector;

public AbstractQueueAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey,
String queuesPrefix, String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log) {
this.vertx = vertx;
this.redisProvider = redisProvider;
this.address = address;
Expand All @@ -46,6 +48,7 @@ public AbstractQueueAction(Vertx vertx, RedisProvider redisProvider, String addr
this.consumersPrefix = consumersPrefix;
this.locksKey = locksKey;
this.queueConfigurations = queueConfigurations;
this.exceptionFactory = exceptionFactory;
this.queueStatisticsCollector = queueStatisticsCollector;
this.log = log;
}
Expand All @@ -57,6 +60,11 @@ protected Handler<Throwable> replyErrorMessageHandler(Message<JsonObject> event)
};
}

protected void handleFail(Message<JsonObject> event, String message, Throwable throwable) {
log.warn(message, exceptionFactory.newException(throwable));
event.fail(0, throwable.getMessage());
}

protected long getMaxAgeTimestamp() {
return System.currentTimeMillis() - MAX_AGE_MILLISECONDS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.AddQueueItemHandler;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
Expand All @@ -19,19 +20,19 @@ public class AddQueueItemAction extends AbstractQueueAction {
public AddQueueItemAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
}

@Override
public void execute(Message<JsonObject> event) {
String key1 = queuesPrefix + event.body().getJsonObject(PAYLOAD).getString(QUEUENAME);
String valueAddItem = event.body().getJsonObject(PAYLOAD).getString(BUFFER);
var p = redisProvider.redis();
p.onSuccess(redisAPI -> redisAPI.rpush(Arrays.asList(key1, valueAddItem), new AddQueueItemHandler(event)));
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
p.onSuccess(redisAPI -> redisAPI.rpush(Arrays.asList(key1, valueAddItem), new AddQueueItemHandler(event, exceptionFactory)));
p.onFailure(ex -> handleFail(event,"Operation AddQueueItemAction failed", ex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -20,9 +21,9 @@ public class BulkDeleteLocksAction extends AbstractQueueAction {

public BulkDeleteLocksAction(Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log) {
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey, queueConfigurations,
queueStatisticsCollector, log);
exceptionFactory, queueStatisticsCollector, log);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -18,10 +19,10 @@ public class BulkDeleteQueuesAction extends AbstractQueueAction {
public BulkDeleteQueuesAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
}

@Override
Expand Down Expand Up @@ -49,11 +50,10 @@ public void execute(Message<JsonObject> event) {
if (delManyReply.succeeded()) {
event.reply(createOkReply().put(VALUE, delManyReply.result().toLong()));
} else {
log.error("Failed to bulkDeleteQueues", new Exception(delManyReply.cause()));
event.reply(createErrorReply());
handleFail(event, "Failed to bulkDeleteQueues", delManyReply.cause());
}
}));
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
p.onFailure(ex -> handleFail(event, "Operation BulkDeleteQueuesAction failed", ex));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.PutLockHandler;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
Expand All @@ -19,10 +20,10 @@ public class BulkPutLocksAction extends AbstractQueueAction {
public BulkPutLocksAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
}

@Override
Expand All @@ -45,9 +46,7 @@ public void execute(Message<JsonObject> event) {
}

var p = redisProvider.redis();
p.onSuccess(redisAPI -> {
redisAPI.hmset(buildLocksItems(locksKey, locks, lockInfo), new PutLockHandler(event));
});
p.onSuccess(redisAPI -> redisAPI.hmset(buildLocksItems(locksKey, locks, lockInfo), new PutLockHandler(event, exceptionFactory)));
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -18,10 +19,10 @@ public class DeleteAllLocksAction extends AbstractQueueAction {
public DeleteAllLocksAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.vertx.core.json.JsonObject;
import io.vertx.redis.client.Response;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
import org.swisspush.redisques.util.RedisProvider;
Expand All @@ -21,10 +22,10 @@ public class DeleteAllQueueItemsAction extends AbstractQueueAction {
public DeleteAllQueueItemsAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
}

@Override
Expand All @@ -36,40 +37,33 @@ public void execute(Message<JsonObject> event) {
p.onSuccess(redisAPI -> {
redisAPI.del(Collections.singletonList(buildQueueKey(queue)), deleteReply -> {
if (deleteReply.failed()) {
log.warn("Failed to deleteAllQueueItems. But we'll continue anyway",
new Exception(deleteReply.cause()));
// May we should 'fail()' here. But:
// 1st: We don't, to keep backward compatibility
// 2nd: We don't, to may unlock below.
handleFail(event, "Operation DeleteAllQueueItems failed", deleteReply.cause());
return;
}
queueStatisticsCollector.resetQueueFailureStatistics(queue, (Throwable ex, Void v) -> {
if (ex != null) log.warn("TODO_2958iouhj error handling", ex);
});
if (unlock) {
redisAPI.hdel(Arrays.asList(locksKey, queue), unlockReply -> {
if (unlockReply.failed()) {
log.warn("Failed to unlock queue '{}'. Will continue anyway",
queue, unlockReply.cause());
// IMO we should 'fail()' here. But we don't, to keep backward compatibility.
handleFail(event, "Failed to unlock queue " + queue, unlockReply.cause());
} else {
handleDeleteQueueReply(event, deleteReply);
}
handleDeleteQueueReply(event, deleteReply);
});
} else {
handleDeleteQueueReply(event, deleteReply);
}
});
});
p.onFailure(ex -> {
log.error("Redis: Failed to delete all queue items", new Exception(ex));
event.reply(createErrorReply());
});
p.onFailure(ex -> handleFail(event, "Operation DeleteAllQueueItems failed", ex));
}

private void handleDeleteQueueReply(Message<JsonObject> event, AsyncResult<Response> reply) {
if (reply.succeeded()) {
event.reply(createOkReply().put(VALUE, reply.result().toLong()));
} else {
log.error("Failed to replyResultGreaterThanZero", new Exception(reply.cause()));
log.error("Failed to replyResultGreaterThanZero", exceptionFactory.newException(reply.cause()));
event.reply(createErrorReply());
}
}
Expand Down
15 changes: 10 additions & 5 deletions src/main/java/org/swisspush/redisques/action/DeleteLockAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.swisspush.redisques.exception.RedisQuesExceptionFactory;
import org.swisspush.redisques.handler.DeleteLockHandler;
import org.swisspush.redisques.util.QueueConfiguration;
import org.swisspush.redisques.util.QueueStatisticsCollector;
Expand All @@ -21,10 +22,10 @@ public class DeleteLockAction extends AbstractQueueAction {
public DeleteLockAction(
Vertx vertx, RedisProvider redisProvider, String address, String queuesKey, String queuesPrefix,
String consumersPrefix, String locksKey, List<QueueConfiguration> queueConfigurations,
QueueStatisticsCollector queueStatisticsCollector, Logger log
RedisQuesExceptionFactory exceptionFactory, QueueStatisticsCollector queueStatisticsCollector, Logger log
) {
super(vertx, redisProvider, address, queuesKey, queuesPrefix, consumersPrefix, locksKey,
queueConfigurations, queueStatisticsCollector, log);
queueConfigurations, exceptionFactory, queueStatisticsCollector, log);
}

@Override
Expand All @@ -33,14 +34,18 @@ public void execute(Message<JsonObject> event) {
var p = redisProvider.redis();
p.onSuccess(redisAPI -> {
redisAPI.exists(Collections.singletonList(queuesPrefix + queueName), event1 -> {
if( event1.failed() ) log.warn("Concealed error", new Exception(event1.cause()));
if (event1.failed()) {
log.warn("Concealed error", exceptionFactory.newException(event1.cause()));
}

if (event1.succeeded() && event1.result() != null && event1.result().toInteger() == 1) {
notifyConsumer(queueName);
}
redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(event));

redisAPI.hdel(Arrays.asList(locksKey, queueName), new DeleteLockHandler(event, exceptionFactory));
});
});
p.onFailure(ex -> replyErrorMessageHandler(event).handle(ex));
p.onFailure(ex -> handleFail(event, "Operation DeleteLockAction failed", ex));
}

}
Loading
Loading