diff --git a/src/main/java/org/swisspush/redisques/RedisQues.java b/src/main/java/org/swisspush/redisques/RedisQues.java index 348963e..380f2c1 100644 --- a/src/main/java/org/swisspush/redisques/RedisQues.java +++ b/src/main/java/org/swisspush/redisques/RedisQues.java @@ -16,11 +16,14 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static org.swisspush.redisques.util.RedisquesAPI.*; import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.*; +import static java.lang.System.currentTimeMillis; + public class RedisQues extends AbstractVerticle { public static class RedisQuesBuilder { @@ -66,7 +69,7 @@ private enum QueueState { // The queues this verticle is listening to private final Map myQueues = new HashMap<>(); - private final Logger log = LoggerFactory.getLogger(RedisQues.class); + private static final Logger log = LoggerFactory.getLogger(RedisQues.class); private QueueStatisticsCollector queueStatisticsCollector; @@ -166,10 +169,8 @@ public void start(Promise promise) { int dequeueStatisticReportInterval = modConfig.getDequeueStatisticReportIntervalSec(); if (dequeueStatisticReportInterval > 0) { - vertx.setPeriodic(1000L * dequeueStatisticReportInterval, handler -> { - dequeueStatistic.forEach((queueName, dequeueStatistic) -> - queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic)); - }); + Runnable publisher = newDequeueStatisticPublisher(); + vertx.setPeriodic(1000L * dequeueStatisticReportInterval, time -> publisher.run()); } queuesKey = modConfig.getRedisPrefix() + "queues"; @@ -259,6 +260,64 @@ private void initialize() { registerQueueCheck(); } + private Runnable newDequeueStatisticPublisher() { + return new Runnable() { + final AtomicBoolean isRunning = new AtomicBoolean(); + Iterator> iter; + long startEpochMs; + int i, size; + public void run() { + if (!isRunning.compareAndSet(false, true)) { + log.warn("Previous publish run still in progress at idx {} of {} since {}ms", + i, size, currentTimeMillis() - startEpochMs); + return; + } + try { + // Local copy to prevent changes between 'size' and 'iterator' call, plus + // to prevent changes of the backing set while we're iterating. + Map localCopy = new HashMap<>(dequeueStatistic); + size = localCopy.size(); + iter = localCopy.entrySet().iterator(); + i = 0; + startEpochMs = currentTimeMillis(); + if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size); + else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size); + else log.debug("Going to report {} dequeue statistics towards collector", size); + } catch (Throwable ex) { + isRunning.set(false); + throw ex; + } + resume(); + } + void resume() { + try { + long stepEpochMs = currentTimeMillis(); + int stepBeginIdx = i; + while (iter.hasNext()) { + var entry = iter.next(); + var queueName = entry.getKey(); + var dequeueStatistic = entry.getValue(); + queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic); + i += 1; + long nowEpochMs = currentTimeMillis(); + long stepDurationMs = nowEpochMs - stepEpochMs; + if (stepDurationMs >= 8) { + log.debug("Release EvLoop after step={}x, soFar={}x, size={}x statistics. Took step={}ms, total={}ms.", + i - stepBeginIdx, i, size, stepDurationMs, nowEpochMs - startEpochMs); + vertx.runOnContext(v -> resume()); + return; + } + } + log.debug("Done publishing {} queue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs); + isRunning.set(false); + } catch (Throwable ex) { + isRunning.set(false); + throw ex; + } + } + }; + } + private void registerActiveQueueRegistrationRefresh() { // Periodic refresh of my registrations on active queues. vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, event -> {