Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that checkQueues() is not executed in parallel, more logs added #216

Merged
merged 5 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public void start(Promise<Void> promise) {
consumersPrefix = modConfig.getRedisPrefix() + "consumers:";
locksKey = modConfig.getRedisPrefix() + "locks";
queueCheckLastexecKey = modConfig.getRedisPrefix() + "check:lastexec";
consumerLockTime = 2 * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
consumerLockTime = modConfig.getConsumerLockMultiplier() * modConfig.getRefreshPeriod(); // lock is kept twice as long as its refresh interval -> never expires as long as the consumer ('we') are alive
timer = new RedisQuesTimer(vertx);

if (redisProvider == null) {
Expand Down Expand Up @@ -670,7 +670,7 @@ int updateQueueFailureCountAndGetRetryInterval(final String queueName, boolean s


private void registerQueueCheck() {
vertx.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), periodicEvent -> {
periodicSkipScheduler.setPeriodic(configurationProvider.configuration().getCheckIntervalTimerMs(), "checkQueues", periodicEvent -> {
redisProvider.redis().compose((RedisAPI redisAPI) -> {
int checkInterval = configurationProvider.configuration().getCheckInterval();
return redisAPI.send(Command.SET, queueCheckLastexecKey, String.valueOf(currentTimeMillis()), "NX", "EX", String.valueOf(checkInterval));
Expand Down Expand Up @@ -1141,6 +1141,7 @@ private void updateTimestamp(final String queueName, Handler<AsyncResult<Respons
* This uses a sorted set of queue names scored by last update timestamp.
*/
private Future<Void> checkQueues() {
final long startTs = System.currentTimeMillis();
final var ctx = new Object() {
long limit;
RedisAPI redisAPI;
Expand All @@ -1158,6 +1159,7 @@ private Future<Void> checkQueues() {
redisAPI.zrangebyscore(Arrays.asList(queuesKey, "-inf", String.valueOf(ctx.limit)), p);
return p.future();
}).compose((Response queues) -> {
log.debug("zrangebyscore time used is {} ms", System.currentTimeMillis() - startTs);
assert ctx.counter == null;
assert ctx.iter == null;
ctx.counter = new AtomicInteger(queues.size());
Expand All @@ -1167,6 +1169,7 @@ private Future<Void> checkQueues() {
upperBoundParallel.request(checkQueueRequestsQuota, null, new UpperBoundParallel.Mentor<Void>() {
@Override public boolean runOneMore(BiConsumer<Throwable, Void> onDone, Void ctx_) {
if (ctx.iter.hasNext()) {
final long perQueueStartTs = System.currentTimeMillis();
var queueObject = ctx.iter.next();
// Check if the inactive queue is not empty (i.e. the key exists)
final String queueName = queueObject.toString();
Expand All @@ -1183,13 +1186,16 @@ private Future<Void> checkQueues() {
if (notifyConsumerEvent.failed()) log.warn("TODO error handling",
exceptionFactory.newException("notifyConsumer(" + queueName + ") failed",
notifyConsumerEvent.cause()));
if (log.isTraceEnabled()) {
log.trace("refreshRegistration for queue {} time used is {} ms", queueName, System.currentTimeMillis() - perQueueStartTs);
}
onDone.accept(null, null);
});
});
};
ctx.redisAPI.exists(Collections.singletonList(key), event -> {
if (event.failed() || event.result() == null) {
log.error("RedisQues is unable to check existence of queue " + queueName,
log.error("RedisQues is unable to check existence of queue {}", queueName,
exceptionFactory.newException("redisAPI.exists(" + key + ") failed", event.cause()));
onDone.accept(null, null);
return;
Expand Down Expand Up @@ -1242,6 +1248,7 @@ private Future<Void> checkQueues() {
}
});
} else {
log.debug("all queue items time used is {} ms", System.currentTimeMillis() - startTs);
onDone.accept(null, null);
}
return ctx.iter.hasNext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class RedisquesConfiguration {
private final String metricStorageName;
private final int metricRefreshPeriod;
private final int refreshPeriod;
private final int consumerLockMultiplier;
private final List<String> redisHosts;
private final List<Integer> redisPorts;
private boolean redisEnableTls;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class RedisquesConfiguration {
private static final int DEFAULT_REDIS_RECONNECT_ATTEMPTS = 0;
private static final int DEFAULT_REDIS_RECONNECT_DELAY_SEC = 30;
private static final int DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS = 180_000;
private static final int DEFAULT_CONSUMER_LOCK_MULTIPLIER = 2;

// We want to have more than the default of 24 max waiting requests and therefore
// set the default here to infinity value. See as well:
Expand All @@ -88,6 +90,7 @@ public class RedisquesConfiguration {
public static final String PROP_METRIC_STORAGE_NAME = "metric-storage-name";
public static final String PROP_METRIC_REFRESH_PERIOD = "metric-refresh-period";
public static final String PROP_REFRESH_PERIOD = "refresh-period";
public static final String PROP_CONSUMER_LOCK_MULTIPLIER = "consumer-lock-multiplier";
public static final String PROP_REDIS_HOST = "redisHost";
public static final String PROP_REDIS_HOST_LIST = "redisHosts";
public static final String PROP_REDIS_PORT = "redisPort";
Expand Down Expand Up @@ -148,7 +151,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
RedisClientType.STANDALONE, redisAuth, null, null, false, checkInterval,
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -173,7 +176,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
RedisClientType.STANDALONE, null, redisPassword, redisUser, redisEnableTls, checkInterval,
processorTimeout, processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -198,7 +201,32 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
metricRefreshPeriod, refreshPeriod, DEFAULT_CONSUMER_LOCK_MULTIPLIER, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
httpRequestHandlerPassword, httpRequestHandlerPort, httpRequestHandlerUserHeader, queueConfigurations,
enableQueueNameDecoding, DEFAULT_REDIS_MAX_POOL_SIZE, DEFAULT_REDIS_MAX_POOL_WAIT_SIZE,
DEFAULT_REDIS_MAX_PIPELINE_WAIT_SIZE, DEFAULT_QUEUE_SPEED_INTERVAL_SEC, DEFAULT_MEMORY_USAGE_LIMIT_PCT,
DEFAULT_MEMORY_USAGE_CHECK_INTERVAL_SEC, DEFAULT_REDIS_RECONNECT_ATTEMPTS, DEFAULT_REDIS_RECONNECT_DELAY_SEC,
DEFAULT_REDIS_POOL_RECYCLE_TIMEOUT_MS, DEFAULT_DEQUEUE_STATISTIC_REPORT_INTERVAL_SEC,
DEFAULT_REDIS_READY_CHECK_INTERVAL_MS);
}

/**
* Constructor with username and password (Redis ACL)
*/
public RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
int consumerLockMultiplier, String redisHost, int redisPort, RedisClientType redisClientType, String redisPassword,
String redisUser, boolean redisEnableTls, int checkInterval,
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
String httpRequestHandlerUsername, String httpRequestHandlerPassword,
Integer httpRequestHandlerPort, String httpRequestHandlerUserHeader,
List<QueueConfiguration> queueConfigurations, boolean enableQueueNameDecoding) {
this(address, configurationUpdatedAddress, redisPrefix, processorAddress, publishMetricsAddress, metricStorageName,
metricRefreshPeriod, refreshPeriod, consumerLockMultiplier, Collections.singletonList(redisHost), Collections.singletonList(redisPort),
redisClientType, null, redisPassword, redisUser, redisEnableTls, checkInterval, processorTimeout,
processorDelayMax, httpRequestHandlerEnabled,
httpRequestHandlerAuthenticationEnabled, httpRequestHandlerPrefix, httpRequestHandlerUsername,
Expand All @@ -212,7 +240,7 @@ public RedisquesConfiguration(String address, String configurationUpdatedAddress

private RedisquesConfiguration(String address, String configurationUpdatedAddress, String redisPrefix, String processorAddress,
String publishMetricsAddress, String metricStorageName, int metricRefreshPeriod, int refreshPeriod,
List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
int consumerLockMultiplier, List<String> redisHosts, List<Integer> redisPorts, RedisClientType redisClientType,
String redisAuth, String redisPassword, String redisUser, boolean redisEnableTls, int checkInterval,
int processorTimeout, long processorDelayMax, boolean httpRequestHandlerEnabled,
boolean httpRequestHandlerAuthenticationEnabled, String httpRequestHandlerPrefix,
Expand All @@ -229,6 +257,7 @@ private RedisquesConfiguration(String address, String configurationUpdatedAddres
this.processorAddress = processorAddress;
this.publishMetricsAddress = publishMetricsAddress;
this.refreshPeriod = refreshPeriod;
this.consumerLockMultiplier = consumerLockMultiplier;
this.redisHosts = redisHosts;
this.redisPorts = redisPorts;
this.redisClientType = redisClientType;
Expand Down Expand Up @@ -326,7 +355,7 @@ public static RedisquesConfigurationBuilder with() {
private RedisquesConfiguration(RedisquesConfigurationBuilder builder) {
this(builder.address, builder.configurationUpdatedAddress, builder.redisPrefix,
builder.processorAddress, builder.publishMetricsAddress, builder.metricStorageName, builder.metricRefreshPeriod,
builder.refreshPeriod, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
builder.refreshPeriod, builder.consumerLockMultiplier, builder.redisHosts, builder.redisPorts, builder.redisClientType, builder.redisAuth,
builder.redisPassword, builder.redisUser, builder.redisEnableTls, builder.checkInterval,
builder.processorTimeout, builder.processorDelayMax, builder.httpRequestHandlerEnabled,
builder.httpRequestHandlerAuthenticationEnabled, builder.httpRequestHandlerPrefix,
Expand Down Expand Up @@ -354,6 +383,7 @@ public JsonObject asJsonObject() {
obj.put(PROP_METRIC_STORAGE_NAME, getMetricStorageName());
obj.put(PROP_METRIC_REFRESH_PERIOD, getMetricRefreshPeriod());
obj.put(PROP_REFRESH_PERIOD, getRefreshPeriod());
obj.put(PROP_CONSUMER_LOCK_MULTIPLIER, getConsumerLockMultiplier());
obj.put(PROP_REDIS_HOST, getRedisHost());
obj.put(PROP_REDIS_HOST_LIST, getRedisHosts());
obj.put(PROP_REDIS_PORT, getRedisPort());
Expand Down Expand Up @@ -415,6 +445,9 @@ public static RedisquesConfiguration fromJsonObject(JsonObject json) {
if (json.containsKey(PROP_REFRESH_PERIOD)) {
builder.refreshPeriod(json.getInteger(PROP_REFRESH_PERIOD));
}
if (json.containsKey(PROP_CONSUMER_LOCK_MULTIPLIER)) {
builder.consumerLockMultiplier(json.getInteger(PROP_CONSUMER_LOCK_MULTIPLIER));
}
if (json.containsKey(PROP_REDIS_HOST)) {
builder.redisHost(json.getString(PROP_REDIS_HOST));
}
Expand Down Expand Up @@ -549,6 +582,10 @@ public int getRefreshPeriod() {
return refreshPeriod;
}

public int getConsumerLockMultiplier() {
return consumerLockMultiplier;
}

public String getRedisHost() {
return redisHosts.get(0);
}
Expand Down Expand Up @@ -728,6 +765,7 @@ public static class RedisquesConfigurationBuilder {
private String metricStorageName;
private int metricRefreshPeriod;
private int refreshPeriod;
private int consumerLockMultiplier;
private List<String> redisHosts;
private List<Integer> redisPorts;
private boolean redisEnableTls;
Expand Down Expand Up @@ -767,6 +805,7 @@ public RedisquesConfigurationBuilder() {
this.processorAddress = "redisques-processor";
this.metricRefreshPeriod = 10;
this.refreshPeriod = 10;
this.consumerLockMultiplier = DEFAULT_CONSUMER_LOCK_MULTIPLIER;
this.redisHosts = Collections.singletonList("localhost");
this.redisPorts = Collections.singletonList(6379);
this.redisEnableTls = false;
Expand Down Expand Up @@ -836,6 +875,11 @@ public RedisquesConfigurationBuilder refreshPeriod(int refreshPeriod) {
return this;
}

public RedisquesConfigurationBuilder consumerLockMultiplier(int consumerLockMultiplier) {
this.consumerLockMultiplier = consumerLockMultiplier;
return this;
}

public RedisquesConfigurationBuilder redisHost(String redisHost) {
this.redisHosts = Collections.singletonList(redisHost);
return this;
Expand Down
Loading
Loading