Skip to content

Commit

Permalink
Merge branch 'develop' into dequeue-monitor-timestamps-incorrect
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/org/swisspush/redisques/RedisQues.java
  • Loading branch information
Xin Zheng committed Apr 10, 2024
2 parents 83fa814 + 7eaa0ca commit a4c58ab
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions src/main/java/org/swisspush/redisques/RedisQues.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.swisspush.redisques.util.RedisquesAPI.*;
import static org.swisspush.redisques.util.RedisquesAPI.QueueOperation.*;

import static java.lang.System.currentTimeMillis;

public class RedisQues extends AbstractVerticle {

public static class RedisQuesBuilder {
Expand Down Expand Up @@ -66,9 +69,9 @@ private enum QueueState {
// The queues this verticle is listening to
private final Map<String, QueueState> myQueues = new HashMap<>();

private final Logger log = LoggerFactory.getLogger(RedisQues.class);
private DequeueStatisticCollector dequeueStatisticCollector;
private static final Logger log = LoggerFactory.getLogger(RedisQues.class);

private DequeueStatisticCollector dequeueStatisticCollector;
private QueueStatisticsCollector queueStatisticsCollector;

private Handler<Void> stoppedHandler = null;
Expand Down Expand Up @@ -272,6 +275,64 @@ private void initialize() {
registerQueueCheck();
}

private Runnable newDequeueStatisticPublisher() {
return new Runnable() {
final AtomicBoolean isRunning = new AtomicBoolean();
Iterator<Map.Entry<String, DequeueStatistic>> iter;
long startEpochMs;
int i, size;
public void run() {
if (!isRunning.compareAndSet(false, true)) {
log.warn("Previous publish run still in progress at idx {} of {} since {}ms",
i, size, currentTimeMillis() - startEpochMs);
return;
}
try {
// Local copy to prevent changes between 'size' and 'iterator' call, plus
// to prevent changes of the backing set while we're iterating.
Map<String, DequeueStatistic> localCopy = new HashMap<>(dequeueStatistic);
size = localCopy.size();
iter = localCopy.entrySet().iterator();
i = 0;
startEpochMs = currentTimeMillis();
if (size > 5_000) log.warn("Going to report {} dequeue statistics towards collector", size);
else if (size > 500) log.info("Going to report {} dequeue statistics towards collector", size);
else log.debug("Going to report {} dequeue statistics towards collector", size);
} catch (Throwable ex) {
isRunning.set(false);
throw ex;
}
resume();
}
void resume() {
try {
long stepEpochMs = currentTimeMillis();
int stepBeginIdx = i;
while (iter.hasNext()) {
var entry = iter.next();
var queueName = entry.getKey();
var dequeueStatistic = entry.getValue();
queueStatisticsCollector.setDequeueStatistic(queueName, dequeueStatistic);
i += 1;
long nowEpochMs = currentTimeMillis();
long stepDurationMs = nowEpochMs - stepEpochMs;
if (stepDurationMs >= 8) {
log.debug("Release EvLoop after step={}x, soFar={}x, size={}x statistics. Took step={}ms, total={}ms.",
i - stepBeginIdx, i, size, stepDurationMs, nowEpochMs - startEpochMs);
vertx.runOnContext(v -> resume());
return;
}
}
log.debug("Done publishing {} queue statistics. Took {}ms", i, currentTimeMillis() - startEpochMs);
isRunning.set(false);
} catch (Throwable ex) {
isRunning.set(false);
throw ex;
}
}
};
}

private void registerActiveQueueRegistrationRefresh() {
// Periodic refresh of my registrations on active queues.
vertx.setPeriodic(configurationProvider.configuration().getRefreshPeriod() * 1000L, event -> {
Expand Down

0 comments on commit a4c58ab

Please sign in to comment.