diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 8fc491b..afc3434 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -332,7 +332,7 @@ public void start(Promise promise) { consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; locksKey = modConfig.getRedisPrefix() + "locks"; queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec"; - consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive + consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive timer = new RedisQuesTimer(vertx); if (redisProvider == null) { @@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s private void registerQueueCheck() { - vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> { + periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "checkQueues", periodicEvent -> { redisProvider.redis().compose((RedisAPI redisAPI) -> { int checkInterval = configurationProvider.configuration().getCheckInterval(); return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval)); @@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler checkQueues() { + final long startTs = System.currentTimeMillis(); final var ctx = new Object() { long limit; RedisAPI redisAPI; @@ -1158,6 +1159,7 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { + log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1167,6 +1169,7 @@ private Future checkQueues() { upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor() { @Override public boolean runOneMore(BiConsumer onDone, Void ctx_) { if (ctx.iter.hasNext()) { + final long perQueueStartTs = System.currentTimeMillis(); var queueObject = ctx.iter.next(); // Check if the inactive queue is not empty (i.e. the key exists) final String queueName = queueObject.toString(); @@ -1183,13 +1186,16 @@ private Future checkQueues() { if (notifyConsumerEvent.failed()) log.warn("TODO error handling", exceptionFactory.newException("notifyConsumer(" + queueName + ") failed", notifyConsumerEvent.cause())); + if (log.isTraceEnabled()) { + log.trace("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs); + } onDone.accept(null, null); }); }); }; ctx.redisAPI.exists(Collections.singletonList(key), event -> { if (event.failed() || event.result() == null) { - log.error("RedisQues is unable to check existence of queue " + queueName, + log.error("RedisQues is unable to check existence of queue {}", queueName, exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause())); onDone.accept(null, null); return; @@ -1242,6 +1248,7 @@ private Future checkQueues() { } }); } else { + log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs); onDone.accept(null, null); } return ctx.iter.hasNext(); diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 20a11b8..00074de 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -26,6 +26,7 @@ public class RedisquesConfiguration { private final String metricStorageName; private final int metricRefreshPeriod; private final int refreshPeriod; + private final int consumerLockMultiplier; private final List redisHosts; private final List redisPorts; private boolean redisEnableTls; @@ -66,6 +67,7 @@ public class RedisquesConfiguration { private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0; private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30; private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000; + private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2; // We want to have more than the default of 24 max waiting requests and therefore // set the default here to infinity value. See as well: @@ -88,6 +90,7 @@ public class RedisquesConfiguration { public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name"; public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period"; public static final String PROP_REFRESH_PERIOD = "refresh-period"; + public static final String PROP_CONSUMER_LOCK_MULTIPLIER = "consumer-lock-multiplier"; public static final String PROP_REDIS_HOST = "redisHost"; public static final String PROP_REDIS_HOST_LIST = "redisHosts"; public static final String PROP_REDIS_PORT = "redisPort"; @@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, + processorDelayMax, httpRequestHandlerEnabled, + httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, + enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, + DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, + DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC, + DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC, + DEFAULT_REDIS_READY_CHECK_INTERVAL_MS); + } + + /** + * Constructor with username and password (Redis ACL) + */ + public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, + String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, + int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword, + String redisUser, boolean redisEnableTls, int checkInterval, + int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, + boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + String httpRequestHandlerUsername, String httpRequestHandlerPassword, + Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, + List queueConfigurations, boolean enableQueueNameDecoding) { + this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, + metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort), redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, - List redisHosts, List redisPorts, RedisClientType redisClientType, + int consumerLockMultiplier, List redisHosts, List redisPorts, RedisClientType redisClientType, String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, @@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres this.processorAddress = processorAddress; this.publishMetricsAddress = publishMetricsAddress; this.refreshPeriod = refreshPeriod; + this.consumerLockMultiplier = consumerLockMultiplier; this.redisHosts = redisHosts; this.redisPorts = redisPorts; this.redisClientType = redisClientType; @@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() { private RedisquesConfiguration(RedisquesConfigurationBuilder builder) { this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix, builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod, - builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, + builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval, builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled, builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix, @@ -354,6 +383,7 @@ public JsonObject asJsonObject() { obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName()); obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod()); obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod()); + obj.put(PROP_CONSUMER_LOCK_MULTIPLIER, getConsumerLockMultiplier()); obj.put(PROP_REDIS_HOST, getRedisHost()); obj.put(PROP_REDIS_HOST_LIST, getRedisHosts()); obj.put(PROP_REDIS_PORT, getRedisPort()); @@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) { if (json.containsKey(PROP_REFRESH_PERIOD)) { builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD)); } + if (json.containsKey(PROP_CONSUMER_LOCK_MULTIPLIER)) { + builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER)); + } if (json.containsKey(PROP_REDIS_HOST)) { builder.redisHost(json.getString(PROP_REDIS_HOST)); } @@ -549,6 +582,10 @@ public int getRefreshPeriod() { return refreshPeriod; } + public int getConsumerLockMultiplier() { + return consumerLockMultiplier; + } + public String getRedisHost() { return redisHosts.get(0); } @@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder { private String metricStorageName; private int metricRefreshPeriod; private int refreshPeriod; + private int consumerLockMultiplier; private List redisHosts; private List redisPorts; private boolean redisEnableTls; @@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() { this.processorAddress = "redisques-processor"; this.metricRefreshPeriod = 10; this.refreshPeriod = 10; + this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER; this.redisHosts = Collections.singletonList("localhost"); this.redisPorts = Collections.singletonList(6379); this.redisEnableTls = false; @@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) { return this; } + public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) { + this.consumerLockMultiplier = consumerLockMultiplier; + return this; + } + public RedisquesConfigurationBuilder redisHost(String redisHost) { this.redisHosts = Collections.singletonList(redisHost); return this; diff --git a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java index e9790f2..b82e2a7 100644 --- a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java +++ b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java @@ -30,6 +30,7 @@ public void testDefaultConfiguration(TestContext testContext) { testContext.assertEquals(config.getProcessorAddress(), "redisques-processor"); testContext.assertEquals(config.getMetricRefreshPeriod(), 10); testContext.assertEquals(config.getRefreshPeriod(), 10); + testContext.assertEquals(config.getConsumerLockMultiplier(), 2); testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); testContext.assertEquals(config.getRedisEnableTls(), false); @@ -77,6 +78,7 @@ public void testOverrideConfiguration(TestContext testContext) { new QueueConfiguration().withPattern("vehicle-.*").withRetryIntervals(10, 20, 30, 60) )) .queueSpeedIntervalSec(1) + .consumerLockMultiplier(9) .memoryUsageLimitPercent(80) .publishMetricsAddress("eventbus-addr-1") .metricStorageName("queue") @@ -110,6 +112,7 @@ public void testOverrideConfiguration(TestContext testContext) { testContext.assertEquals(config.getMemoryUsageLimitPercent(), 80); testContext.assertEquals(config.getPublishMetricsAddress(), "eventbus-addr-1"); testContext.assertEquals(config.getMetricStorageName(), "queue"); + testContext.assertEquals(config.getConsumerLockMultiplier(), 9); // queue configurations testContext.assertEquals(config.getQueueConfigurations().size(), 1); QueueConfiguration queueConfiguration = config.getQueueConfigurations().get(0); @@ -128,6 +131,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getString(PROP_PROCESSOR_ADDRESS), "redisques-processor"); testContext.assertEquals(json.getInteger(PROP_METRIC_REFRESH_PERIOD), 10); testContext.assertEquals(json.getInteger(PROP_REFRESH_PERIOD), 10); + testContext.assertEquals(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER), 2); testContext.assertEquals(json.getString(PROP_REDIS_HOST), "localhost"); testContext.assertEquals(json.getInteger(PROP_REDIS_PORT), 6379); testContext.assertFalse(json.getBoolean(PROP_REDIS_ENABLE_TLS)); @@ -178,6 +182,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { .memoryUsageLimitPercent(55) .dequeueStatisticReportIntervalSec(44) .redisReadyCheckIntervalMs(5000) + .consumerLockMultiplier(3) .publishMetricsAddress("eventbus-addr-1") .metricStorageName("queue") .build(); @@ -214,6 +219,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getInteger(PROP_REDIS_READY_CHECK_INTERVAL_MS), 5000); testContext.assertEquals(json.getString(PROP_PUBLISH_METRICS_ADDRESS), "eventbus-addr-1"); testContext.assertEquals(json.getString(PROP_METRIC_STORAGE_NAME), "queue"); + testContext.assertEquals(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER), 3); // queue configurations JsonArray queueConfigurationsJsonArray = json.getJsonArray(PROP_QUEUE_CONFIGURATIONS); List queueConfigurationJsonObjects = queueConfigurationsJsonArray.getList(); @@ -233,6 +239,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getRedisPrefix(), "redisques:"); testContext.assertEquals(config.getProcessorAddress(), "redisques-processor"); testContext.assertEquals(config.getRefreshPeriod(), 10); + testContext.assertEquals(config.getConsumerLockMultiplier(), 2); testContext.assertEquals(config.getMetricRefreshPeriod(), 10); testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); @@ -267,6 +274,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { json.put(PROP_REDIS_PREFIX, "new_redis-prefix"); json.put(PROP_PROCESSOR_ADDRESS, "new_processor-address"); json.put(PROP_REFRESH_PERIOD, 99); + json.put(PROP_CONSUMER_LOCK_MULTIPLIER, 91); json.put(PROP_METRIC_REFRESH_PERIOD, 55); json.put(PROP_REDIS_HOST, "newredishost"); json.put(PROP_REDIS_PORT, 4321); @@ -300,6 +308,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getRedisPrefix(), "new_redis-prefix"); testContext.assertEquals(config.getProcessorAddress(), "new_processor-address"); testContext.assertEquals(config.getRefreshPeriod(), 99); + testContext.assertEquals(config.getConsumerLockMultiplier(), 91); testContext.assertEquals(config.getMetricRefreshPeriod(), 55); testContext.assertEquals(config.getRedisHost(), "newredishost"); testContext.assertEquals(config.getRedisPort(), 4321);