use PeriodicSkipScheduler with long consumer lock time
Xin Zheng committed Oct 25, 2024
1 parent bcded03 commit a3a5e53
1 changed file with 7 additions and 3 deletions: src/main/java/org/swisspush/redisques/RedisQues.java
@@ -332,7 +332,7 @@ public void start(Promise<Void> promise) {
 consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
 locksKey = modConfig.getRedisPrefix() + "locks";
 queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec";
-consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
+consumerLockTime = 20 * modConfig.getRefreshPeriod(); // lock is kept 20 times as long as its refresh interval -> never expires as long as the consumer ('we') is alive
 timer = new RedisQuesTimer(vertx);

 if (redisProvider == null) {
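
The factor change above is what the commit title calls the "long consumer lock time": the Redis entry that marks a queue's consumer must outlive its own refresh interval, and a 20x TTL tolerates many late or skipped refreshes, for example while a long queue check delays the periodic tasks. A minimal, runnable sketch of that relationship, using assumed example values rather than RedisQues configuration defaults:

// Illustrative sketch only; the values below are assumptions, not RedisQues defaults.
public class ConsumerLockTtlSketch {
    public static void main(String[] args) {
        int refreshPeriodSeconds = 10;                             // hypothetical refresh interval
        long consumerLockTimeSeconds = 20L * refreshPeriodSeconds; // TTL of the consumer registration

        // Number of consecutive refreshes that may be late or skipped before the lock expires:
        long toleratedMissedRefreshes = consumerLockTimeSeconds / refreshPeriodSeconds - 1;
        System.out.println("lock TTL = " + consumerLockTimeSeconds + "s, tolerates "
                + toleratedMissedRefreshes + " missed refresh cycles");
    }
}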
@@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s


 private void registerQueueCheck() {
-vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> {
+periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "registerQueueCheck", periodicEvent -> {
 redisProvider.redis().compose((RedisAPI redisAPI) -> {
 int checkInterval = configurationProvider.configuration().getCheckInterval();
 return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval));
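
Swapping vertx.setPeriodic for periodicSkipScheduler.setPeriodic pairs with the longer lock time: when a queue check runs longer than the timer interval, the next tick should be skipped instead of piling up overlapping runs. The sketch below only illustrates that skip idea on top of Vert.x; it is not the actual PeriodicSkipScheduler implementation, and the real handler signature and completion signalling may differ.

// Sketch of the "skip while still running" idea; NOT the real PeriodicSkipScheduler API.
import io.vertx.core.Vertx;
import java.util.concurrent.atomic.AtomicBoolean;

class SkippingPeriodicSketch {
    private final Vertx vertx;

    SkippingPeriodicSketch(Vertx vertx) {
        this.vertx = vertx;
    }

    /** Runs 'task' every 'intervalMs' but skips ticks while a previous run is still in flight. */
    long setPeriodic(long intervalMs, String debugHint, Runnable task) {
        // 'debugHint' mirrors the name passed in the diff ("registerQueueCheck");
        // a real implementation would typically use it for logging skipped ticks.
        AtomicBoolean inFlight = new AtomicBoolean(false);
        return vertx.setPeriodic(intervalMs, timerId -> {
            if (!inFlight.compareAndSet(false, true)) {
                return; // previous run still in flight -> skip this tick
            }
            try {
                task.run();
            } finally {
                // Simplification: the real scheduler would reset this when the async work completes.
                inFlight.set(false);
            }
        });
    }
}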
@@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler<AsyncResult<Respons
 * This uses a sorted set of queue names scored by last update timestamp.
 */
 private Future<Void> checkQueues() {
+final long startTs = System.currentTimeMillis();
 final var ctx = new Object() {
 long limit;
 RedisAPI redisAPI;
@@ -1158,6 +1159,7 @@ private Future<Void> checkQueues() {
 redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
 return p.future();
 }).compose((Response queues) -> {
+log.debug("zrangebyscore time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs);
 assert ctx.counter == null;
 assert ctx.iter == null;
 ctx.counter = new AtomicInteger(queues.size());
@@ -1166,6 +1168,7 @@
 var p = Promise.<Void>promise();
 upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor<Void>() {
 @Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx_) {
+log.debug("upperBoundParallel time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs);
 if (ctx.iter.hasNext()) {
 var queueObject = ctx.iter.next();
 // Check if the inactive queue is not empty (i.e. the key exists)
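
Note that the debug lines added in this and the neighbouring hunks all subtract the single startTs captured at the top of checkQueues(), so each logged value is the cumulative time since the check started, not the duration of the individual stage. A small runnable sketch of that pattern, with illustrative stage names:

// Illustrates the cumulative-elapsed-time pattern used by the added debug logs.
public class ElapsedTimeSketch {
    public static void main(String[] args) throws InterruptedException {
        final long startTs = System.currentTimeMillis();

        Thread.sleep(50); // stand-in for the zrangebyscore stage
        System.out.println("after stage 1: " + (System.currentTimeMillis() - startTs) + " ms since start");

        Thread.sleep(50); // stand-in for the per-queue existence checks
        // Prints roughly 100 ms: time since startTs, not the cost of stage 2 alone.
        System.out.println("after stage 2: " + (System.currentTimeMillis() - startTs) + " ms since start");
    }
}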
@@ -1183,13 +1186,14 @@
 if (notifyConsumerEvent.failed()) log.warn("TODO error handling",
 exceptionFactory.newException("notifyConsumer(" + queueName + ") failed",
 notifyConsumerEvent.cause()));
+log.debug("refreshRegistration time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs);
 onDone.accept(null, null);
 });
 });
 };
 ctx.redisAPI.exists(Collections.singletonList(key), event -> {
 if (event.failed() || event.result() == null) {
-log.error("RedisQues is unable to check existence of queue " + queueName,
+log.error("RedisQues is unable to check existence of queue {}", queueName,
 exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause()));
 onDone.accept(null, null);
 return;
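
The last change switches the error log from string concatenation to a '{}' placeholder. Assuming an SLF4J-style logger (which the placeholder syntax suggests), the message is only formatted when the level is enabled, and a trailing Throwable argument is still printed with its stack trace. A hedged example of that call shape, with illustrative class and method names:

// Hedged example, assuming an SLF4J logger; names here are illustrative, not RedisQues code.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class PlaceholderLoggingSketch {
    private static final Logger log = LoggerFactory.getLogger(PlaceholderLoggingSketch.class);

    void reportExistsFailure(String queueName, Exception cause) {
        // '{}' is replaced by queueName; 'cause' is logged as the exception with its stack trace.
        log.error("RedisQues is unable to check existence of queue {}", queueName, cause);
    }
}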