Skip to content

Commit

Permalink
Add throttling during requeuing to avoid running out of memory
Browse files Browse the repository at this point in the history
  • Loading branch information
jmfee-usgs committed Dec 16, 2020
1 parent fe2eee8 commit 59532f3
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,12 @@ public Map<String, Integer> getQueueStatus() {
return null;
}

public void throttleQueues() throws InterruptedException {
if (notifier instanceof ExecutorListenerNotifier) {
((ExecutorListenerNotifier) notifier).throttleQueues();
}
}

public Long getReceiverCleanupInterval() {
return receiverCleanupInterval;
}
Expand All @@ -701,6 +707,14 @@ public void setConnectTimeout(int connectTimeout) {
this.connectTimeout = connectTimeout;
}

public ListenerNotifier getNotifier() {
return this.notifier;
}

public void setNotifier(final ListenerNotifier notifier) {
this.notifier = notifier;
}

public int getReadTimeout() {
return readTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ public class ExecutorListenerNotifier extends DefaultConfigurable implements
*/
protected Timer retryTimer = new Timer();

/** When queue size reaches this level, start throttling */
protected int throttleStartThreshold = 50000;

/** When queue size reaches this level, stop throttling */
protected int throttleStopThreshold = 25000;

/** When throttling, wait this many milliseconds between queue size checks. */
protected long throttleWaitInterval = 5000L;

public ExecutorListenerNotifier(final DefaultNotificationReceiver receiver) {
this.receiver = receiver;
}
Expand Down Expand Up @@ -242,6 +251,9 @@ public void startup() throws Exception {
// still valid
this.notifyListeners(event, gracefulListeners);
}

// try to keep queue size managable during restart
throttleQueues();
}
LOGGER.info("All notifications queued");

Expand Down Expand Up @@ -279,6 +291,58 @@ public Map<String, Integer> getStatus() {
return status;
}

/**
* Check queue status and return length of longest queue.
*
* @return length of longest queue, or null if no queue lengths.
*/
public Integer getMaxQueueSize() {
final Map<String, Integer> status = getStatus();
Integer maxQueueSize = null;
for (Integer queueSize : status.values()) {
if (queueSize != null && (maxQueueSize == null || queueSize > maxQueueSize)) {
maxQueueSize = queueSize;
}
}
return maxQueueSize;
}

/**
* If longest queue has more than 50k notifications,
* wait until longest queue has 25k notifications before returning.
*
* @throws InterruptedException
*/
public void throttleQueues() throws InterruptedException {
// try to keep queue size managable during restart
int maxSize = throttleStartThreshold;
// track whether any throttles occurred
boolean throttled = false;

while (true) {
final Integer size = getMaxQueueSize();
if (size == null || size <= maxSize) {
// within limit
if (throttled) {
LOGGER.info("[" + getName() + "] done throttling (size = " + size + ")");
}
break;
}

throttled = true;
LOGGER.info("[" + getName() + "]"
+ " queueing throttled until below "
+ throttleStopThreshold
+ " (size = " + size + ")");
// too many messages queued
// set maxSize to stop threshold
maxSize = throttleStopThreshold;
// wait for listener to do some processing
// 5s is a little low, but don't want to wait too long
Thread.sleep(throttleWaitInterval);
}
}

/**
* NOTE: messing with the executors map is not a good idea.
*
Expand All @@ -288,4 +352,13 @@ public Map<NotificationListener, ExecutorService> getExecutors() {
return notificationListeners;
}

public int getThrottleStartThreshold() { return this.throttleStartThreshold; }
public void setThrottleStartThreshold(final int n) { this.throttleStartThreshold = n; }

public int getThrottleStopThreshold() { return this.throttleStopThreshold; }
public void setThrottleStopThreshold(final int n) { this.throttleStopThreshold = n; }

public long getThrottleWaitInterval() { return this.throttleWaitInterval; }
public void setThrottleWaitInterval(final long ms) { this.throttleWaitInterval = ms; }

}

0 comments on commit 59532f3

Please sign in to comment.