From b30f2337a1c68cce11f23be95e320ecf75a1641b Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 7 Oct 2024 06:52:15 +0000 Subject: [PATCH 1/9] updating poms for branch'release-4.1.3' with non-snapshot versions --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 96eb3b42..ad2d6f89 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.3-SNAPSHOT + 4.1.3 redisques A highly scalable redis-persistent queuing system for vertx From c410bb7860bb8d2b831951f03d2e429dd215e27e Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 7 Oct 2024 06:52:15 +0000 Subject: [PATCH 2/9] updating poms for 4.1.4-SNAPSHOT development --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 96eb3b42..78bcdce9 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.3-SNAPSHOT + 4.1.4-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From c2413791f99996bbc96df829a0f12228b9ba1aef Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 7 Oct 2024 06:54:45 +0000 Subject: [PATCH 3/9] updating develop poms to master versions to avoid merge conflicts --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 78bcdce9..ad2d6f89 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.4-SNAPSHOT + 4.1.3 redisques A highly scalable redis-persistent queuing system for vertx From bcded033a932b839f268ce981d607229357347b4 Mon Sep 17 00:00:00 2001 From: runner Date: Mon, 7 Oct 2024 06:54:45 +0000 Subject: [PATCH 4/9] Updating develop poms back to pre merge state --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index ad2d6f89..78bcdce9 100644 --- a/pom.xml +++ b/pom.xml @@ -2,7 +2,7 @@ 4.0.0 org.swisspush redisques - 4.1.3 + 4.1.4-SNAPSHOT redisques A highly scalable redis-persistent queuing system for vertx From a3a5e53c01aaed21ec921e5bec9c6a9b87bd4d24 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Fri, 25 Oct 2024 15:27:28 +0700 Subject: [PATCH 5/9] use PeriodicSkipScheduler with long consumer lock time --- src/main/java/org/swisspush/redisques/RedisQues.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 8fc491b5..e9a270c1 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -332,7 +332,7 @@ public void start(Promise promise) { consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; locksKey = modConfig.getRedisPrefix() + "locks"; queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec"; - consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive + consumerLockTime = 20 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive timer = new RedisQuesTimer(vertx); if (redisProvider == null) { @@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s private void registerQueueCheck() { - vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> { + periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "registerQueueCheck", periodicEvent -> { redisProvider.redis().compose((RedisAPI redisAPI) -> { int checkInterval = configurationProvider.configuration().getCheckInterval(); return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval)); @@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler checkQueues() { + final long startTs = System.currentTimeMillis(); final var ctx = new Object() { long limit; RedisAPI redisAPI; @@ -1158,6 +1159,7 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { + log.debug("zrangebyscore time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1166,6 +1168,7 @@ private Future checkQueues() { var p = Promise.promise(); upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor() { @Override public boolean runOneMore(BiConsumer onDone, Void ctx_) { + log.debug("upperBoundParallel time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); if (ctx.iter.hasNext()) { var queueObject = ctx.iter.next(); // Check if the inactive queue is not empty (i.e. the key exists) @@ -1183,13 +1186,14 @@ private Future checkQueues() { if (notifyConsumerEvent.failed()) log.warn("TODO error handling", exceptionFactory.newException("notifyConsumer(" + queueName + ") failed", notifyConsumerEvent.cause())); + log.debug("refreshRegistration time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); onDone.accept(null, null); }); }); }; ctx.redisAPI.exists(Collections.singletonList(key), event -> { if (event.failed() || event.result() == null) { - log.error("RedisQues is unable to check existence of queue " + queueName, + log.error("RedisQues is unable to check existence of queue {}", queueName, exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause())); onDone.accept(null, null); return; From 39881c4c8104ff68264db73c77048e6e1a5e3103 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Fri, 25 Oct 2024 15:44:01 +0700 Subject: [PATCH 6/9] improve logs --- src/main/java/org/swisspush/redisques/RedisQues.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index e9a270c1..6a2157ce 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1159,7 +1159,7 @@ private Future checkQueues() { redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p); return p.future(); }).compose((Response queues) -> { - log.debug("zrangebyscore time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); + log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs); assert ctx.counter == null; assert ctx.iter == null; ctx.counter = new AtomicInteger(queues.size()); @@ -1168,8 +1168,8 @@ private Future checkQueues() { var p = Promise.promise(); upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor() { @Override public boolean runOneMore(BiConsumer onDone, Void ctx_) { - log.debug("upperBoundParallel time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); if (ctx.iter.hasNext()) { + final long perQueueStartTs = System.currentTimeMillis(); var queueObject = ctx.iter.next(); // Check if the inactive queue is not empty (i.e. the key exists) final String queueName = queueObject.toString(); @@ -1186,7 +1186,7 @@ private Future checkQueues() { if (notifyConsumerEvent.failed()) log.warn("TODO error handling", exceptionFactory.newException("notifyConsumer(" + queueName + ") failed", notifyConsumerEvent.cause())); - log.debug("refreshRegistration time used for queue: {}, is {}", queues, System.currentTimeMillis() - startTs); + log.debug("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs); onDone.accept(null, null); }); }); @@ -1246,6 +1246,7 @@ private Future checkQueues() { } }); } else { + log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs); onDone.accept(null, null); } return ctx.iter.hasNext(); From 9363a5013843c2c051069f032e4cedc24c227bdc Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Sat, 26 Oct 2024 08:56:55 +0700 Subject: [PATCH 7/9] debug hit changed --- src/main/java/org/swisspush/redisques/RedisQues.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 6a2157ce..5dafc346 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s private void registerQueueCheck() { - periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "registerQueueCheck", periodicEvent -> { + periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "checkQueues", periodicEvent -> { redisProvider.redis().compose((RedisAPI redisAPI) -> { int checkInterval = configurationProvider.configuration().getCheckInterval(); return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval)); From 88eb5242f8e948b5b411163147998551ae5c05c2 Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Sat, 26 Oct 2024 11:38:58 +0700 Subject: [PATCH 8/9] allow ConsumerLock Multiplier to be configurable --- .../org/swisspush/redisques/RedisQues.java | 2 +- .../util/RedisquesConfiguration.java | 54 +++++++++++++++++-- .../util/RedisquesConfigurationTest.java | 9 ++++ 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 5dafc346..cec5d69e 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -332,7 +332,7 @@ public void start(Promise promise) { consumersPrefix = modConfig.getRedisPrefix() + "consumers:"; locksKey = modConfig.getRedisPrefix() + "locks"; queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec"; - consumerLockTime = 20 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive + consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive timer = new RedisQuesTimer(vertx); if (redisProvider == null) { diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 20a11b88..6a4ea3b2 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -26,6 +26,7 @@ public class RedisquesConfiguration { private final String metricStorageName; private final int metricRefreshPeriod; private final int refreshPeriod; + private final int consumerLockMultiplier; private final List redisHosts; private final List redisPorts; private boolean redisEnableTls; @@ -66,6 +67,7 @@ public class RedisquesConfiguration { private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0; private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30; private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000; + private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2; // We want to have more than the default of 24 max waiting requests and therefore // set the default here to infinity value. See as well: @@ -88,6 +90,7 @@ public class RedisquesConfiguration { public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name"; public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period"; public static final String PROP_REFRESH_PERIOD = "refresh-period"; + public static final String PROP_CONSUMERLOCKMULTIPLIER = "consumer-lock-multiplier"; public static final String PROP_REDIS_HOST = "redisHost"; public static final String PROP_REDIS_HOST_LIST = "redisHosts"; public static final String PROP_REDIS_PORT = "redisPort"; @@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, List queueConfigurations, boolean enableQueueNameDecoding) { this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, - metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort), + redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, + processorDelayMax, httpRequestHandlerEnabled, + httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, + httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations, + enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE, + DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT, + DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC, + DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC, + DEFAULT_REDIS_READY_CHECK_INTERVAL_MS); + } + + /** + * Constructor with username and password (Redis ACL) + */ + public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, + String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, + int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword, + String redisUser, boolean redisEnableTls, int checkInterval, + int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, + boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, + String httpRequestHandlerUsername, String httpRequestHandlerPassword, + Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader, + List queueConfigurations, boolean enableQueueNameDecoding) { + this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName, + metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort), redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout, processorDelayMax, httpRequestHandlerEnabled, httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername, @@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress, String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod, - List redisHosts, List redisPorts, RedisClientType redisClientType, + int consumerLockMultiplier, List redisHosts, List redisPorts, RedisClientType redisClientType, String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval, int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled, boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix, @@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres this.processorAddress = processorAddress; this.publishMetricsAddress = publishMetricsAddress; this.refreshPeriod = refreshPeriod; + this.consumerLockMultiplier = consumerLockMultiplier; this.redisHosts = redisHosts; this.redisPorts = redisPorts; this.redisClientType = redisClientType; @@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() { private RedisquesConfiguration(RedisquesConfigurationBuilder builder) { this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix, builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod, - builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, + builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth, builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval, builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled, builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix, @@ -354,6 +383,7 @@ public JsonObject asJsonObject() { obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName()); obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod()); obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod()); + obj.put(PROP_CONSUMERLOCKMULTIPLIER, getConsumerLockMultiplier()); obj.put(PROP_REDIS_HOST, getRedisHost()); obj.put(PROP_REDIS_HOST_LIST, getRedisHosts()); obj.put(PROP_REDIS_PORT, getRedisPort()); @@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) { if (json.containsKey(PROP_REFRESH_PERIOD)) { builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD)); } + if (json.containsKey(PROP_CONSUMERLOCKMULTIPLIER)) { + builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER)); + } if (json.containsKey(PROP_REDIS_HOST)) { builder.redisHost(json.getString(PROP_REDIS_HOST)); } @@ -549,6 +582,10 @@ public int getRefreshPeriod() { return refreshPeriod; } + public int getConsumerLockMultiplier() { + return consumerLockMultiplier; + } + public String getRedisHost() { return redisHosts.get(0); } @@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder { private String metricStorageName; private int metricRefreshPeriod; private int refreshPeriod; + private int consumerLockMultiplier; private List redisHosts; private List redisPorts; private boolean redisEnableTls; @@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() { this.processorAddress = "redisques-processor"; this.metricRefreshPeriod = 10; this.refreshPeriod = 10; + this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER; this.redisHosts = Collections.singletonList("localhost"); this.redisPorts = Collections.singletonList(6379); this.redisEnableTls = false; @@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) { return this; } + public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) { + this.consumerLockMultiplier = consumerLockMultiplier; + return this; + } + public RedisquesConfigurationBuilder redisHost(String redisHost) { this.redisHosts = Collections.singletonList(redisHost); return this; diff --git a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java index e9790f29..9cd2da9d 100644 --- a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java +++ b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java @@ -30,6 +30,7 @@ public void testDefaultConfiguration(TestContext testContext) { testContext.assertEquals(config.getProcessorAddress(), "redisques-processor"); testContext.assertEquals(config.getMetricRefreshPeriod(), 10); testContext.assertEquals(config.getRefreshPeriod(), 10); + testContext.assertEquals(config.getConsumerLockMultiplier(), 2); testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); testContext.assertEquals(config.getRedisEnableTls(), false); @@ -77,6 +78,7 @@ public void testOverrideConfiguration(TestContext testContext) { new QueueConfiguration().withPattern("vehicle-.*").withRetryIntervals(10, 20, 30, 60) )) .queueSpeedIntervalSec(1) + .consumerLockMultiplier(9) .memoryUsageLimitPercent(80) .publishMetricsAddress("eventbus-addr-1") .metricStorageName("queue") @@ -110,6 +112,7 @@ public void testOverrideConfiguration(TestContext testContext) { testContext.assertEquals(config.getMemoryUsageLimitPercent(), 80); testContext.assertEquals(config.getPublishMetricsAddress(), "eventbus-addr-1"); testContext.assertEquals(config.getMetricStorageName(), "queue"); + testContext.assertEquals(config.getConsumerLockMultiplier(), 9); // queue configurations testContext.assertEquals(config.getQueueConfigurations().size(), 1); QueueConfiguration queueConfiguration = config.getQueueConfigurations().get(0); @@ -128,6 +131,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getString(PROP_PROCESSOR_ADDRESS), "redisques-processor"); testContext.assertEquals(json.getInteger(PROP_METRIC_REFRESH_PERIOD), 10); testContext.assertEquals(json.getInteger(PROP_REFRESH_PERIOD), 10); + testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 2); testContext.assertEquals(json.getString(PROP_REDIS_HOST), "localhost"); testContext.assertEquals(json.getInteger(PROP_REDIS_PORT), 6379); testContext.assertFalse(json.getBoolean(PROP_REDIS_ENABLE_TLS)); @@ -178,6 +182,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { .memoryUsageLimitPercent(55) .dequeueStatisticReportIntervalSec(44) .redisReadyCheckIntervalMs(5000) + .consumerLockMultiplier(3) .publishMetricsAddress("eventbus-addr-1") .metricStorageName("queue") .build(); @@ -214,6 +219,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getInteger(PROP_REDIS_READY_CHECK_INTERVAL_MS), 5000); testContext.assertEquals(json.getString(PROP_PUBLISH_METRICS_ADDRESS), "eventbus-addr-1"); testContext.assertEquals(json.getString(PROP_METRIC_STORAGE_NAME), "queue"); + testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 3); // queue configurations JsonArray queueConfigurationsJsonArray = json.getJsonArray(PROP_QUEUE_CONFIGURATIONS); List queueConfigurationJsonObjects = queueConfigurationsJsonArray.getList(); @@ -233,6 +239,7 @@ public void testGetDefaultFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getRedisPrefix(), "redisques:"); testContext.assertEquals(config.getProcessorAddress(), "redisques-processor"); testContext.assertEquals(config.getRefreshPeriod(), 10); + testContext.assertEquals(config.getConsumerLockMultiplier(), 2); testContext.assertEquals(config.getMetricRefreshPeriod(), 10); testContext.assertEquals(config.getRedisHost(), "localhost"); testContext.assertEquals(config.getRedisPort(), 6379); @@ -267,6 +274,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { json.put(PROP_REDIS_PREFIX, "new_redis-prefix"); json.put(PROP_PROCESSOR_ADDRESS, "new_processor-address"); json.put(PROP_REFRESH_PERIOD, 99); + json.put(PROP_CONSUMERLOCKMULTIPLIER, 91); json.put(PROP_METRIC_REFRESH_PERIOD, 55); json.put(PROP_REDIS_HOST, "newredishost"); json.put(PROP_REDIS_PORT, 4321); @@ -300,6 +308,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { testContext.assertEquals(config.getRedisPrefix(), "new_redis-prefix"); testContext.assertEquals(config.getProcessorAddress(), "new_processor-address"); testContext.assertEquals(config.getRefreshPeriod(), 99); + testContext.assertEquals(config.getConsumerLockMultiplier(), 91); testContext.assertEquals(config.getMetricRefreshPeriod(), 55); testContext.assertEquals(config.getRedisHost(), "newredishost"); testContext.assertEquals(config.getRedisPort(), 4321); From b064f02372aac573693460d5592fd7d587d48c4d Mon Sep 17 00:00:00 2001 From: Xin Zheng Date: Mon, 28 Oct 2024 11:15:09 +0700 Subject: [PATCH 9/9] rename and log level changed --- src/main/java/org/swisspush/redisques/RedisQues.java | 4 +++- .../swisspush/redisques/util/RedisquesConfiguration.java | 8 ++++---- .../redisques/util/RedisquesConfigurationTest.java | 6 +++--- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index cec5d69e..afc3434d 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -1186,7 +1186,9 @@ private Future checkQueues() { if (notifyConsumerEvent.failed()) log.warn("TODO error handling", exceptionFactory.newException("notifyConsumer(" + queueName + ") failed", notifyConsumerEvent.cause())); - log.debug("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs); + if (log.isTraceEnabled()) { + log.trace("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs); + } onDone.accept(null, null); }); }); diff --git a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java index 6a4ea3b2..00074def 100644 --- a/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java +++ b/src/main/java/org/swisspush/redisques/util/RedisquesConfiguration.java @@ -90,7 +90,7 @@ public class RedisquesConfiguration { public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name"; public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period"; public static final String PROP_REFRESH_PERIOD = "refresh-period"; - public static final String PROP_CONSUMERLOCKMULTIPLIER = "consumer-lock-multiplier"; + public static final String PROP_CONSUMER_LOCK_MULTIPLIER = "consumer-lock-multiplier"; public static final String PROP_REDIS_HOST = "redisHost"; public static final String PROP_REDIS_HOST_LIST = "redisHosts"; public static final String PROP_REDIS_PORT = "redisPort"; @@ -383,7 +383,7 @@ public JsonObject asJsonObject() { obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName()); obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod()); obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod()); - obj.put(PROP_CONSUMERLOCKMULTIPLIER, getConsumerLockMultiplier()); + obj.put(PROP_CONSUMER_LOCK_MULTIPLIER, getConsumerLockMultiplier()); obj.put(PROP_REDIS_HOST, getRedisHost()); obj.put(PROP_REDIS_HOST_LIST, getRedisHosts()); obj.put(PROP_REDIS_PORT, getRedisPort()); @@ -445,8 +445,8 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) { if (json.containsKey(PROP_REFRESH_PERIOD)) { builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD)); } - if (json.containsKey(PROP_CONSUMERLOCKMULTIPLIER)) { - builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER)); + if (json.containsKey(PROP_CONSUMER_LOCK_MULTIPLIER)) { + builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER)); } if (json.containsKey(PROP_REDIS_HOST)) { builder.redisHost(json.getString(PROP_REDIS_HOST)); diff --git a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java index 9cd2da9d..b82e2a70 100644 --- a/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java +++ b/src/test/java/org/swisspush/redisques/util/RedisquesConfigurationTest.java @@ -131,7 +131,7 @@ public void testGetDefaultAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getString(PROP_PROCESSOR_ADDRESS), "redisques-processor"); testContext.assertEquals(json.getInteger(PROP_METRIC_REFRESH_PERIOD), 10); testContext.assertEquals(json.getInteger(PROP_REFRESH_PERIOD), 10); - testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 2); + testContext.assertEquals(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER), 2); testContext.assertEquals(json.getString(PROP_REDIS_HOST), "localhost"); testContext.assertEquals(json.getInteger(PROP_REDIS_PORT), 6379); testContext.assertFalse(json.getBoolean(PROP_REDIS_ENABLE_TLS)); @@ -219,7 +219,7 @@ public void testGetOverriddenAsJsonObject(TestContext testContext) { testContext.assertEquals(json.getInteger(PROP_REDIS_READY_CHECK_INTERVAL_MS), 5000); testContext.assertEquals(json.getString(PROP_PUBLISH_METRICS_ADDRESS), "eventbus-addr-1"); testContext.assertEquals(json.getString(PROP_METRIC_STORAGE_NAME), "queue"); - testContext.assertEquals(json.getInteger(PROP_CONSUMERLOCKMULTIPLIER), 3); + testContext.assertEquals(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER), 3); // queue configurations JsonArray queueConfigurationsJsonArray = json.getJsonArray(PROP_QUEUE_CONFIGURATIONS); List queueConfigurationJsonObjects = queueConfigurationsJsonArray.getList(); @@ -274,7 +274,7 @@ public void testGetOverriddenFromJsonObject(TestContext testContext) { json.put(PROP_REDIS_PREFIX, "new_redis-prefix"); json.put(PROP_PROCESSOR_ADDRESS, "new_processor-address"); json.put(PROP_REFRESH_PERIOD, 99); - json.put(PROP_CONSUMERLOCKMULTIPLIER, 91); + json.put(PROP_CONSUMER_LOCK_MULTIPLIER, 91); json.put(PROP_METRIC_REFRESH_PERIOD, 55); json.put(PROP_REDIS_HOST, "newredishost"); json.put(PROP_REDIS_PORT, 4321);