Skip to content

Commit

Permalink
Merge pull request #216 from swisspost/PeriodicSkipScheduler-test
Browse files Browse the repository at this point in the history
Ensure that checkQueues() is not executed in parallel, more logs added
  • Loading branch information
dominik-cnx authored Oct 29, 2024
2 parents bcded03 + b064f02 commit 05e4a2e
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 8 deletions.
13 changes: 10 additions & 3 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void start(Promise<Void> promise) {
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
locksKey = modConfig.getRedisPrefix() + "locks";
queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec";
consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
timer = new RedisQuesTimer(vertx);

if (redisProvider == null) {
Expand Down Expand Up @@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s


private void registerQueueCheck() {
vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> {
periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "checkQueues", periodicEvent -> {
redisProvider.redis().compose((RedisAPI redisAPI) -> {
int checkInterval = configurationProvider.configuration().getCheckInterval();
return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval));
Expand Down Expand Up @@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler<AsyncResult<Respons
* This uses a sorted set of queue names scored by last update timestamp.
*/
private Future<Void> checkQueues() {
final long startTs = System.currentTimeMillis();
final var ctx = new Object() {
long limit;
RedisAPI redisAPI;
Expand All @@ -1158,6 +1159,7 @@ private Future<Void> checkQueues() {
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
return p.future();
}).compose((Response queues) -> {
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
assert ctx.counter == null;
assert ctx.iter == null;
ctx.counter = new AtomicInteger(queues.size());
Expand All @@ -1167,6 +1169,7 @@ private Future<Void> checkQueues() {
upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor<Void>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx_) {
if (ctx.iter.hasNext()) {
final long perQueueStartTs = System.currentTimeMillis();
var queueObject = ctx.iter.next();
// Check if the inactive queue is not empty (i.e. the key exists)
final String queueName = queueObject.toString();
Expand All @@ -1183,13 +1186,16 @@ private Future<Void> checkQueues() {
if (notifyConsumerEvent.failed()) log.warn("TODO error handling",
exceptionFactory.newException("notifyConsumer(" + queueName + ") failed",
notifyConsumerEvent.cause()));
if (log.isTraceEnabled()) {
log.trace("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs);
}
onDone.accept(null, null);
});
});
};
ctx.redisAPI.exists(Collections.singletonList(key), event -> {
if (event.failed() || event.result() == null) {
log.error("RedisQues is unable to check existence of queue " + queueName,
log.error("RedisQues is unable to check existence of queue {}", queueName,
exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause()));
onDone.accept(null, null);
return;
Expand Down Expand Up @@ -1242,6 +1248,7 @@ private Future<Void> checkQueues() {
}
});
} else {
log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs);
onDone.accept(null, null);
}
return ctx.iter.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class RedisquesConfiguration {
private final String metricStorageName;
private final int metricRefreshPeriod;
private final int refreshPeriod;
private final int consumerLockMultiplier;
private final List<String> redisHosts;
private final List<Integer> redisPorts;
private boolean redisEnableTls;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class RedisquesConfiguration {
private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0;
private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30;
private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000;
private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2;

// We want to have more than the default of 24 max waiting requests and therefore
// set the default here to infinity value. See as well:
Expand All @@ -88,6 +90,7 @@ public class RedisquesConfiguration {
public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name";
public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period";
public static final String PROP_REFRESH_PERIOD = "refresh-period";
public static final String PROP_CONSUMER_LOCK_MULTIPLIER = "consumer-lock-multiplier";
public static final String PROP_REDIS_HOST = "redisHost";
public static final String PROP_REDIS_HOST_LIST = "redisHosts";
public static final String PROP_REDIS_PORT = "redisPort";
Expand Down Expand Up @@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval,
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval,
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations,
enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE,
DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT,
DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC,
DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC,
DEFAULT_REDIS_READY_CHECK_INTERVAL_MS);
}

/**
* Constructor with username and password (Redis ACL)
*/
public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword,
String redisUser, boolean redisEnableTls, int checkInterval,
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
String httpRequestHandlerUsername, String httpRequestHandlerPassword,
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress

private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
int consumerLockMultiplier, List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval,
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
Expand All @@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres
this.processorAddress = processorAddress;
this.publishMetricsAddress = publishMetricsAddress;
this.refreshPeriod = refreshPeriod;
this.consumerLockMultiplier = consumerLockMultiplier;
this.redisHosts = redisHosts;
this.redisPorts = redisPorts;
this.redisClientType = redisClientType;
Expand Down Expand Up @@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() {
private RedisquesConfiguration(RedisquesConfigurationBuilder builder) {
this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix,
builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod,
builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval,
builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled,
builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix,
Expand Down Expand Up @@ -354,6 +383,7 @@ public JsonObject asJsonObject() {
obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName());
obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod());
obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod());
obj.put(PROP_CONSUMER_LOCK_MULTIPLIER, getConsumerLockMultiplier());
obj.put(PROP_REDIS_HOST, getRedisHost());
obj.put(PROP_REDIS_HOST_LIST, getRedisHosts());
obj.put(PROP_REDIS_PORT, getRedisPort());
Expand Down Expand Up @@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) {
if (json.containsKey(PROP_REFRESH_PERIOD)) {
builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD));
}
if (json.containsKey(PROP_CONSUMER_LOCK_MULTIPLIER)) {
builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER));
}
if (json.containsKey(PROP_REDIS_HOST)) {
builder.redisHost(json.getString(PROP_REDIS_HOST));
}
Expand Down Expand Up @@ -549,6 +582,10 @@ public int getRefreshPeriod() {
return refreshPeriod;
}

public int getConsumerLockMultiplier() {
return consumerLockMultiplier;
}

public String getRedisHost() {
return redisHosts.get(0);
}
Expand Down Expand Up @@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder {
private String metricStorageName;
private int metricRefreshPeriod;
private int refreshPeriod;
private int consumerLockMultiplier;
private List<String> redisHosts;
private List<Integer> redisPorts;
private boolean redisEnableTls;
Expand Down Expand Up @@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() {
this.processorAddress = "redisques-processor";
this.metricRefreshPeriod = 10;
this.refreshPeriod = 10;
this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER;
this.redisHosts = Collections.singletonList("localhost");
this.redisPorts = Collections.singletonList(6379);
this.redisEnableTls = false;
Expand Down Expand Up @@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) {
return this;
}

public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) {
this.consumerLockMultiplier = consumerLockMultiplier;
return this;
}

public RedisquesConfigurationBuilder redisHost(String redisHost) {
this.redisHosts = Collections.singletonList(redisHost);
return this;
Expand Down
Loading

0 comments on commit 05e4a2e

Please sign in to comment.