From a3a5e53c01aaed21ec921e5bec9c6a9b87bd4d24 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Fri, 25 Oct 2024 15:27:28 +0700 Subject: [PATCH] use PeriodicSkipScheduler with long consumer lock time --- src/main/java/org/swisspush/redisques/RedisQues.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 8fc491b..e9a270c 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -332,7 +332,7 @@ public void start(Promise promise) { consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; locksKey = modConfig.getRedisPrefix() + "locks"; queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec"; - consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive + consumerLockTime = 20 * modConfig.getRefreshPeriod(); // lock is kept 20 times as long as its refresh interval -> never expires as long as the consumer ('we') are alive timer = new RedisQuesTimer(vertx); if (redisProvider == null) { @@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s private void registerQueueCheck() { - vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> { + periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "registerQueueCheck", periodicEvent -> { redisProvider.redis().compose((RedisAPI redisAPI) -> { int checkInterval = configurationProvider.configuration().getCheckInterval(); return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval)); @@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler checkQueues() { + final long startTs = 
System.currentTimeMillis(); final var ctx = new Object() { long limit; RedisAPI redisAPI; @@ -1158,6 +1159,7 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { + log.debug("zrangebyscore time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1166,6 +1168,7 @@ private Future checkQueues() { var p = Promise.promise(); upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor() { @Override public boolean runOneMore(BiConsumer onDone, Void ctx_) { + log.debug("upperBoundParallel time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); if (ctx.iter.hasNext()) { var queueObject = ctx.iter.next(); // Check if the inactive queue is not empty (i.e. the key exists) @@ -1183,13 +1186,14 @@ private Future checkQueues() { if (notifyConsumerEvent.failed()) log.warn("TODO error handling", exceptionFactory.newException("notifyConsumer(" + queueName + ") failed", notifyConsumerEvent.cause())); + log.debug("refreshRegistration time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); onDone.accept(null, null); }); }); }; ctx.redisAPI.exists(Collections.singletonList(key), event -> { if (event.failed() || event.result() == null) { - log.error("RedisQues is unable to check existence of queue " + queueName, + log.error("RedisQues is unable to check existence of queue {}", queueName, exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause())); onDone.accept(null, null); return;