Skip to content

Commit

Permalink
Merge pull request #163 from hiddenalpha/SDCISA-13746-ConvertReporter…
Browse files Browse the repository at this point in the history
…ToATrulyAsyncLoop-20240403-1013

[SDCISA-13746] Reduce EvLoop blocking time while publishing QueueStats

fixes #162
  • Loading branch information
hiddenalpha authored Apr 4, 2024
2 parents a52531e + c12dac2 commit 7eaa0ca
Showing 1 changed file with 64 additions and 5 deletions.
69 changes: 64 additions & 5 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.*;

import static java.lang.System.currentTimeMillis;

public class RedisQues extends AbstractVerticle {

public static class RedisQuesBuilder {
Expand Down Expand Up @@ -66,7 +69,7 @@ private enum QueueState {
// The queues this verticle is listening to
private final Map<String, QueueState> myQueues = new HashMap<>();

private final Logger log = LoggerFactory.getLogger(RedisQues.class);
private static final Logger log = LoggerFactory.getLogger(RedisQues.class);

private QueueStatisticsCollector queueStatisticsCollector;

Expand Down Expand Up @@ -166,10 +169,8 @@ public void start(Promise<Void> promise) {

int dequeueStatisticReportInterval = modConfig.getDequeueStatisticReportIntervalSec();
if (dequeueStatisticReportInterval > 0) {
vertx.setPeriodic(1000L * dequeueStatisticReportInterval, handler -> {
dequeueStatistic.forEach((queueName, dequeueStatistic) ->
queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic));
});
Runnable publisher = newDequeueStatisticPublisher();
vertx.setPeriodic(1000L * dequeueStatisticReportInterval, time -> publisher.run());
}

queuesKey = modConfig.getRedisPrefix() + "queues";
Expand Down Expand Up @@ -259,6 +260,64 @@ private void initialize() {
registerQueueCheck();
}

private Runnable newDequeueStatisticPublisher() {
return new Runnable() {
final AtomicBoolean isRunning = new AtomicBoolean();
Iterator<Map.Entry<String, DequeueStatistic>> iter;
long startEpochMs;
int i, size;
public void run() {
if (!isRunning.compareAndSet(false, true)) {
log.warn("Previous publish run still in progress at idx {} of {} since {}ms",
i, size, currentTimeMillis() - startEpochMs);
return;
}
try {
// Local copy to prevent changes between 'size' and 'iterator' call, plus
// to prevent changes of the backing set while we're iterating.
Map<String, DequeueStatistic> localCopy = new HashMap<>(dequeueStatistic);
size = localCopy.size();
iter = localCopy.entrySet().iterator();
i = 0;
startEpochMs = currentTimeMillis();
if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size);
else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size);
else log.debug("Going to report {} dequeue statistics towards collector", size);
} catch (Throwable ex) {
isRunning.set(false);
throw ex;
}
resume();
}
void resume() {
try {
long stepEpochMs = currentTimeMillis();
int stepBeginIdx = i;
while (iter.hasNext()) {
var entry = iter.next();
var queueName = entry.getKey();
var dequeueStatistic = entry.getValue();
queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic);
i += 1;
long nowEpochMs = currentTimeMillis();
long stepDurationMs = nowEpochMs - stepEpochMs;
if (stepDurationMs >= 8) {
log.debug("Release EvLoop after step={}x, soFar={}x, size={}x statistics. Took step={}ms, total={}ms.",
i - stepBeginIdx, i, size, stepDurationMs, nowEpochMs - startEpochMs);
vertx.runOnContext(v -> resume());
return;
}
}
log.debug("Done publishing {} queue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
isRunning.set(false);
} catch (Throwable ex) {
isRunning.set(false);
throw ex;
}
}
};
}

private void registerActiveQueueRegistrationRefresh() {
// Periodic refresh of my registrations on active queues.
vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, event -> {
Expand Down

0 comments on commit 7eaa0ca

Please sign in to comment.