diff --git a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java index addf139a..c207ddb0 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java +++ b/src/main/java/gov/usgs/earthquake/distribution/DefaultNotificationReceiver.java @@ -685,6 +685,12 @@ public Map getQueueStatus() { return null; } + public void throttleQueues() throws InterruptedException { + if (notifier instanceof ExecutorListenerNotifier) { + ((ExecutorListenerNotifier) notifier).throttleQueues(); + } + } + public Long getReceiverCleanupInterval() { return receiverCleanupInterval; } @@ -701,6 +707,14 @@ public void setConnectTimeout(int connectTimeout) { this.connectTimeout = connectTimeout; } + public ListenerNotifier getNotifier() { + return this.notifier; + } + + public void setNotifier(final ListenerNotifier notifier) { + this.notifier = notifier; + } + public int getReadTimeout() { return readTimeout; } diff --git a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java index 245db2c5..5508dfb8 100644 --- a/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java +++ b/src/main/java/gov/usgs/earthquake/distribution/ExecutorListenerNotifier.java @@ -48,6 +48,15 @@ public class ExecutorListenerNotifier extends DefaultConfigurable implements */ protected Timer retryTimer = new Timer(); + /** When queue size reaches this level, start throttling */ + protected int throttleStartThreshold = 50000; + + /** When queue size reaches this level, stop throttling */ + protected int throttleStopThreshold = 25000; + + /** When throttling, wait this many milliseconds between queue size checks. */ + protected long throttleWaitInterval = 5000L; + public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) { this.receiver = receiver; } @@ -242,6 +251,9 @@ public void startup() throws Exception { // still valid this.notifyListeners(event, gracefulListeners); } + + // try to keep queue size managable during restart + throttleQueues(); } LOGGER.info("All notifications queued"); @@ -279,6 +291,58 @@ public Map getStatus() { return status; } + /** + * Check queue status and return length of longest queue. + * + * @return length of longest queue, or null if no queue lengths. + */ + public Integer getMaxQueueSize() { + final Map status = getStatus(); + Integer maxQueueSize = null; + for (Integer queueSize : status.values()) { + if (queueSize != null && (maxQueueSize == null || queueSize > maxQueueSize)) { + maxQueueSize = queueSize; + } + } + return maxQueueSize; + } + + /** + * If longest queue has more than 50k notifications, + * wait until longest queue has 25k notifications before returning. + * + * @throws InterruptedException + */ + public void throttleQueues() throws InterruptedException { + // try to keep queue size managable during restart + int maxSize = throttleStartThreshold; + // track whether any throttles occurred + boolean throttled = false; + + while (true) { + final Integer size = getMaxQueueSize(); + if (size == null || size <= maxSize) { + // within limit + if (throttled) { + LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")"); + } + break; + } + + throttled = true; + LOGGER.info("[" + getName() + "]" + + " queueing throttled until below " + + throttleStopThreshold + + " (size = " + size + ")"); + // too many messages queued + // set maxSize to stop threshold + maxSize = throttleStopThreshold; + // wait for listener to do some processing + // 5s is a little low, but don't want to wait too long + Thread.sleep(throttleWaitInterval); + } + } + /** * NOTE: messing with the executors map is not a good idea. * @@ -288,4 +352,13 @@ public Map getExecutors() { return notificationListeners; } + public int getThrottleStartThreshold() { return this.throttleStartThreshold; } + public void setThrottleStartThreshold(final int n) { this.throttleStartThreshold = n; } + + public int getThrottleStopThreshold() { return this.throttleStopThreshold; } + public void setThrottleStopThreshold(final int n) { this.throttleStopThreshold = n; } + + public long getThrottleWaitInterval() { return this.throttleWaitInterval; } + public void setThrottleWaitInterval(final long ms) { this.throttleWaitInterval = ms; } + }