Skip to content

Commit

Permalink
Add throttling during catch up process, add tests for aws product rec…
Browse files Browse the repository at this point in the history
…eiver
  • Loading branch information
jmfee-usgs committed Dec 16, 2020
1 parent 59532f3 commit 70edea1
Show file tree
Hide file tree
Showing 5 changed files with 582 additions and 2 deletions.
91 changes: 89 additions & 2 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements W
protected Long lastBroadcastId = null;
protected boolean processBroadcast = false;

/** Used to coordinate sending products_created_after message. */
protected boolean sendProductsCreatedAfterFlag = false;
protected boolean sendProductsCreatedAfterRunning = false;
protected Object sendProductsCreatedAfterSync = new Object();
protected Thread sendProductsCreatedAfterThread;


@Override
public void configure(Config config) throws Exception {
Expand Down Expand Up @@ -260,6 +266,81 @@ protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
}
}

/**
* Request background thread to send "products_created_after" message.
*
* @throws IOException
*/
protected void sendProductsCreatedAfter() throws IOException {
synchronized(sendProductsCreatedAfterSync) {
// set flag that we want to send products created after
sendProductsCreatedAfterFlag = true;
// wake up background thread that sends message
sendProductsCreatedAfterSync.notifyAll();
}
}

/**
* Start background thread that sends "products_created_after" messages.
*
* @return started thread.
*/
protected void startSendProductsCreatedAfterThread() {
if (sendProductsCreatedAfterThread != null) {
throw new IllegalStateException("sendProductsCreatedThread already exists");
}
sendProductsCreatedAfterFlag = false;
sendProductsCreatedAfterRunning = true;
sendProductsCreatedAfterThread = new Thread(() -> {
while (sendProductsCreatedAfterRunning) {
try {
synchronized (sendProductsCreatedAfterSync) {
if (!sendProductsCreatedAfterFlag) {
// wait until ready to send
sendProductsCreatedAfterSync.wait();
continue;
}
}

// ready to send, try to keep queue size manageable
throttleQueues();

synchronized (sendProductsCreatedAfterSync) {
// now send actual products created after message
try {
_sendProductsCreatedAfter();
// message sent, reset flag
sendProductsCreatedAfterFlag = false;
} catch (IOException e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] Exception sending products_created_after",
e);
}
}
} catch (InterruptedException ie) {
// interrupted usually means shutting down thread
}
}
});
sendProductsCreatedAfterThread.start();
}

protected void stopProductsCreatedAfterThread() {
try {
sendProductsCreatedAfterRunning = false;
sendProductsCreatedAfterThread.interrupt();
sendProductsCreatedAfterThread.join();
} catch (Exception e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] exception stopping sendProductsCreatedAfterThread",
e);
} finally {
sendProductsCreatedAfterThread = null;
}
}

/**
* Send an "action"="products_created_after" request, which is part of the
* catch up process.
Expand All @@ -268,7 +349,7 @@ protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
* then one "action"="products_created_after" message to indicate the request
* is complete.
*/
protected void sendProductsCreatedAfter() throws IOException {
protected void _sendProductsCreatedAfter() throws IOException {
// set default for created after
if (this.createdAfter == null) {
this.createdAfter = Instant.now().minusSeconds(7 * 86400);
Expand Down Expand Up @@ -323,6 +404,9 @@ public void startup() throws Exception{
createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
}

// start background thread for products_create_after messages
startSendProductsCreatedAfterThread();

//open websocket
client = new WebSocketClient(uri, this, attempts, timeout, true);
}
Expand All @@ -333,8 +417,11 @@ public void startup() throws Exception{
*/
@Override
public void shutdown() throws Exception{
stopProductsCreatedAfterThread();
//close socket
client.shutdown();
try {
client.shutdown();
} catch (Exception e) {}
super.shutdown();
}

Expand Down
194 changes: 194 additions & 0 deletions src/test/java/gov/usgs/earthquake/aws/AwsProductReceiverTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package gov.usgs.earthquake.aws;

import java.time.Instant;

import javax.json.Json;
import javax.json.JsonObject;

import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.product.Product;
import gov.usgs.earthquake.product.ProductId;
import gov.usgs.earthquake.product.io.JsonProduct;

public class AwsProductReceiverTest {

TestAwsProductReceiver receiver;

@BeforeEach
public void before() throws Exception {
receiver = new TestAwsProductReceiver();
receiver.startSendProductsCreatedAfterThread();
}

@AfterEach
public void after() throws Exception {
receiver.stopProductsCreatedAfterThread();
receiver = null;
}

@Test
public void testSwitchToBroadcast() throws Exception {
TestSession testSession = new TestSession();
// connect
receiver.onOpen(testSession);

// receive broadcast
Instant created = Instant.now();
receiver.onMessage(getNotification("broadcast", 10, created).toString());
Assert.assertFalse("not in broadcast mode yet", receiver.isProcessBroadcast());
Assert.assertNull("didn't process broadcast", receiver.lastJsonNotification);
Assert.assertEquals("saved broadcast id",
Long.valueOf(10L), receiver.getLastBroadcastId());

// receive response to products_created_after
receiver.onMessage(getNotification("product", 10, created).toString());
Assert.assertNotNull(
"processed product during catch up",
receiver.lastJsonNotification);

// receive end of products_created_after response
receiver.onMessage(getProductsCreatedAfter(created, 1).toString());
Assert.assertTrue("switched to broadcast mode", receiver.isProcessBroadcast());
}


@Test
public void testBroadcastOutOfOrder() throws Exception {
TestSession testSession = new TestSession();
// connect
receiver.onOpen(testSession);
// enable broadcast mode
receiver.setProcessBroadcast(true);
receiver.setLastBroadcastId(10L);

// receive broadcast in order
receiver.onMessage(getNotification("broadcast", 11, Instant.now()).toString());
Assert.assertTrue("still in broadcast mode", receiver.isProcessBroadcast());
Assert.assertNotNull("processed broadcast", receiver.lastJsonNotification);
Assert.assertEquals("saved broadcast id",
Long.valueOf(11L), receiver.getLastBroadcastId());

// clear any previous products created after message
testSession.testBasicRemote.lastSendText = null;
receiver.lastJsonNotification = null;

// receive broadcast out of order
receiver.onMessage(getNotification("broadcast", 13, Instant.now()).toString());
Assert.assertFalse("no longer in broadcast mode", receiver.isProcessBroadcast());
Assert.assertNull("did not broadcast", receiver.lastJsonNotification);
Assert.assertEquals("still saved broadcast id",
Long.valueOf(13L), receiver.getLastBroadcastId());
String sent = testSession.waitForBasicSendText(100L);
Assert.assertTrue(
"sent products_created_after",
sent.contains("\"action\":\"products_created_after\""));
}

@Test
public void testStartCatchUpWhenConnected() throws Exception {
// set flags being tested to opposite values
receiver.setProcessBroadcast(true);
TestSession testSession = new TestSession();

// call onOpen to simulate connection
receiver.onOpen(testSession);

// processBroadcast disabled and created after request sent
Assert.assertFalse("not in process broadcast mode", receiver.isProcessBroadcast());

String sent = testSession.waitForBasicSendText(1000L);
Assert.assertTrue(
"sent products_created_after",
sent.contains("\"action\":\"products_created_after\""));
}

@Test
public void testThrottleQueue() throws Exception {
TestSession testSession = new TestSession();
TestListenerNotifier testNotifier = new TestListenerNotifier(receiver);
receiver.setNotifier(testNotifier);
// simulate queue that needs to be throttled
testNotifier.queueSize = 5001;
testNotifier.setThrottleStartThreshold(5000);
testNotifier.setThrottleStopThreshold(2500);
testNotifier.setThrottleWaitInterval(100L);

// call onOpen to simulate connection
receiver.onOpen(testSession);
Assert.assertNull(
"throttling should prevent message",
testSession.waitForBasicSendText(100L));

// now simulate queue that is done throttling
testNotifier.queueSize = 2499;
String sent = testSession.waitForBasicSendText(500L);
Assert.assertTrue(
"sent products_created_after",
sent.contains("\"action\":\"products_created_after\""));
}

static JsonObject getNotification(final String action, final long id, final Instant created) throws Exception {
Product product = new Product(new ProductId("source", "type", "code"));
return Json.createObjectBuilder()
.add("action", action)
.add("notification",
Json.createObjectBuilder()
.add("id", id)
.add("created", created.toString())
.add("product", new JsonProduct().getJsonObject(product)))
.build();
}

static JsonObject getProductsCreatedAfter(final Instant createdAfter, final int count) {
return Json.createObjectBuilder()
.add("action", "products_created_after")
.add("created_after", createdAfter.toString())
.add("count", count)
.build();
}

/**
* Stub socket connections to test message handling behavior.
*/
static class TestAwsProductReceiver extends AwsProductReceiver {
public JsonNotification lastJsonNotification;
public boolean onJsonNotificationCalled = false;

@Override
protected void onJsonNotification(JsonNotification notification) throws Exception {
onJsonNotificationCalled = true;
lastJsonNotification = notification;
super.onJsonNotification(notification);
}

@Override
public void receiveNotification(Notification notification) throws Exception {
// skip actual processing
}

@Override
public void writeTrackingData() {
// skip tracking
}

// getter/setter to control state for testing

public Instant getCreatedAfter() { return this.createdAfter; }
public void setCreatedAfter(final Instant c) { this.createdAfter = c; }

public JsonNotification getLastBroadcast() { return this.lastBroadcast; }
public void setLastBroadcast(final JsonNotification j) { this.lastBroadcast = j; }

public Long getLastBroadcastId() { return this.lastBroadcastId; }
public void setLastBroadcastId(final Long l) { this.lastBroadcastId = l; }

public boolean isProcessBroadcast() { return this.processBroadcast; }
public void setProcessBroadcast(final boolean b) { this.processBroadcast = b; }
}

}
Loading

0 comments on commit 70edea1

Please sign in to comment.