Skip to content

Commit

Permalink
Merge branch 'catch-up-updates' into 'master'
Browse files Browse the repository at this point in the history
Catch up updates

See merge request ghsc/hazdev/pdl!134
  • Loading branch information
jmfee-usgs committed Dec 24, 2020
2 parents 35d0a16 + 4cc977c commit 713c5e5
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 129 deletions.
2 changes: 1 addition & 1 deletion code.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "Product Distribution Layer",
"organization": "U.S. Geological Survey",
"description": "Distribution system used for derived earthquake information",
"version": "v2.7.3",
"version": "v2.7.4",
"status": "Production",
"permissions": {
"usageType": "openSource",
Expand Down
258 changes: 153 additions & 105 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.io.StringReader;
import java.net.URI;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -28,7 +29,7 @@
* last product matches the last broadcast or there are no more products after
* the latest notification "created" timestamp.
*/
public class AwsProductReceiver extends DefaultNotificationReceiver implements WebSocketListener {
public class AwsProductReceiver extends DefaultNotificationReceiver implements Runnable, WebSocketListener {

public static final Logger LOGGER = Logger
.getLogger(AwsProductReceiver.class.getName());
Expand Down Expand Up @@ -57,16 +58,22 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements W

/* µs timestamp of last message that has been processed */
protected Instant createdAfter = null;
/* last broadcast message that has been processed (used for catch up) */
protected JsonNotification lastBroadcast = null;
protected Long lastBroadcastId = null;
/* whether to process broadcast messages (after catching up). */
protected boolean processBroadcast = false;

/** Used to coordinate sending products_created_after message. */
protected boolean sendProductsCreatedAfterFlag = false;
protected boolean sendProductsCreatedAfterRunning = false;
protected Object sendProductsCreatedAfterSync = new Object();
protected Thread sendProductsCreatedAfterThread;

/* whether currenting catching up. */
protected boolean catchUpRunning = false;
/* sync object for catchUp state. */
protected final Object catchUpSync = new Object();
/* thread where catch up process runs. */
protected Thread catchUpThread = null;
/* whether thread should continue running (shutdown flag) */
protected boolean catchUpThreadRunning = false;
/* last catch up message sent (for response timeouts) */
protected Instant lastCatchUpSent = null;

@Override
public void configure(Config config) throws Exception {
Expand Down Expand Up @@ -110,20 +117,13 @@ public void configure(Config config) throws Exception {
@Override
public void onOpen(Session session) throws IOException {
LOGGER.info("[" + getName() + "] onOpen connection_id=" + session.getId());
// ignore broadcast until caught up
processBroadcast = false;
// save session
this.session = session;
// start catch up process
try {
LOGGER.info("[" + getName() + "] Starting catch up");
sendProductsCreatedAfter();
} catch (Exception e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] exception while starting catch up",
e);
}
LOGGER.info("[" + getName() + "] Starting catch up");
// ignore broadcast until caught up
processBroadcast = false;
startCatchUp();
}

/**
Expand Down Expand Up @@ -173,22 +173,22 @@ synchronized public void onMessage(String message) throws IOException {
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.info("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");

Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
if (lastBroadcastId != null && broadcastId != (lastBroadcastId + 1)) {
// sanity check, broadcast ids are expected to increment
// if incoming broadcast is not lastBroadcastId + 1, may have missed a broadcast
LOGGER.warning(
"[" + getName() + "] broadcast ids out of sequence"
+ " (got " + broadcastId + ", expected " + (lastBroadcastId + 1) + ")");

if (processBroadcast) {
// not in catch up mode, switch back
LOGGER.info("[" + getName() + "] switching to catch up mode");
processBroadcast = false;
sendProductsCreatedAfter();
}
LOGGER.finer("[" + getName() + "]"
+ " onBroadcast(" + notification.getProductId() + ")"
+ " sequence=" + broadcastId + ", lastSequence=" + lastBroadcastId);

if (processBroadcast &&
// sanity check, broadcast ids are expected to increment
lastBroadcastId != null
&& broadcastId != (lastBroadcastId + 1)
) {
// may have missed message
LOGGER.info("[" + getName() + "] broadcast ids out of sequence"
+ " (at " + lastBroadcastId + ", received " + broadcastId + ")"
+ ", switching to catch up mode");
processBroadcast = false;
startCatchUp();
}

// track last broadcast for catch up process (as long as newer)
Expand Down Expand Up @@ -228,7 +228,7 @@ protected void onJsonNotification(final JsonNotification notification) throws Ex
protected void onProduct(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.info("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
LOGGER.finer("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
onJsonNotification(notification);
}

Expand All @@ -248,6 +248,11 @@ protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
final int count = json.getInt("count");
LOGGER.finer("[" + getName() + "] onProductsCreatedAfter(" + after
+ ", " + count + " products)");

// notify background thread that a response was received,
// as well as pausing messages until restarted below (if needed)
stopCatchUp();

// check whether caught up
if (
// if a broadcast received during catchup,
Expand All @@ -262,82 +267,63 @@ protected void onProductsCreatedAfter(final JsonObject json) throws Exception {
processBroadcast = true;
} else {
// keep catching up
sendProductsCreatedAfter();
startCatchUp();
}
}

/**
* Request background thread to send "products_created_after" message.
* Catch up process.
*
* @throws IOException
*/
protected void sendProductsCreatedAfter() throws IOException {
synchronized(sendProductsCreatedAfterSync) {
// set flag that we want to send products created after
sendProductsCreatedAfterFlag = true;
// wake up background thread that sends message
sendProductsCreatedAfterSync.notifyAll();
}
}

/**
* Start background thread that sends "products_created_after" messages.
* Do not run directly, use {@link #startCatchUpThread()} and
* {@link #stopCatchUpThread()} to start and stop the process.
*
* @return started thread.
* Process waits until {@link #startCatchUp()} is called,
* and uses {@link #throttleQueues()} between sends.
*/
protected void startSendProductsCreatedAfterThread() {
if (sendProductsCreatedAfterThread != null) {
throw new IllegalStateException("sendProductsCreatedThread already exists");
}
sendProductsCreatedAfterFlag = false;
sendProductsCreatedAfterRunning = true;
sendProductsCreatedAfterThread = new Thread(() -> {
while (sendProductsCreatedAfterRunning) {
try {
synchronized (sendProductsCreatedAfterSync) {
if (!sendProductsCreatedAfterFlag) {
// wait until ready to send
sendProductsCreatedAfterSync.wait();
@Override
public void run() {
while (catchUpThreadRunning) {
try {
synchronized (catchUpSync) {
if (!catchUpRunning) {
catchUpSync.wait();
continue;
}
if (lastCatchUpSent != null) {
// message already sent, wait for timeout
Instant now = Instant.now();
Instant timeout = lastCatchUpSent.plus(60, ChronoUnit.SECONDS);
if (now.isBefore(timeout)) {
catchUpSync.wait(now.until(timeout, ChronoUnit.MILLIS));
continue;
} else {
// timed out
LOGGER.warning("No products_created_response"
+ ", sent at " + lastCatchUpSent.toString());
// fall through
}
}
}

// ready to send, try to keep queue size manageable
throttleQueues();

synchronized (sendProductsCreatedAfterSync) {
// now send actual products created after message
try {
_sendProductsCreatedAfter();
// message sent, reset flag
sendProductsCreatedAfterFlag = false;
} catch (IOException e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] Exception sending products_created_after",
e);
}
// ready to send, but block until done throttling
throttleQueues();

try {
synchronized (catchUpSync) {
// track when sent
lastCatchUpSent = Instant.now();
sendProductsCreatedAfter();
}
} catch (Exception e){
LOGGER.log(Level.WARNING, "Exception sending products_created_after", e);
if (catchUpThreadRunning && catchUpRunning) {
// wait before next attempt
Thread.sleep(5000);
}
} catch (InterruptedException ie) {
// interrupted usually means shutting down thread
}
} catch (InterruptedException e) {
// probably stopping
}
});
sendProductsCreatedAfterThread.start();
}

protected void stopProductsCreatedAfterThread() {
try {
sendProductsCreatedAfterRunning = false;
sendProductsCreatedAfterThread.interrupt();
sendProductsCreatedAfterThread.join();
} catch (Exception e) {
LOGGER.log(
Level.WARNING,
"[" + getName() + "] exception stopping sendProductsCreatedAfterThread",
e);
} finally {
sendProductsCreatedAfterThread = null;
}
}

Expand All @@ -349,7 +335,7 @@ protected void stopProductsCreatedAfterThread() {
* then one "action"="products_created_after" message to indicate the request
* is complete.
*/
protected void _sendProductsCreatedAfter() throws IOException {
protected void sendProductsCreatedAfter() throws IOException {
// set default for created after
if (this.createdAfter == null) {
this.createdAfter = Instant.now().minusSeconds(7 * 86400);
Expand All @@ -364,6 +350,66 @@ protected void _sendProductsCreatedAfter() throws IOException {
this.session.getBasicRemote().sendText(request);
}


/**
* Notify running background thread to start catch up process.
*/
protected void startCatchUp() {
// notify background thread to start catch up
synchronized (catchUpSync) {
catchUpRunning = true;
catchUpSync.notify();
}
}

/**
* Start background thread for catch up process.
*/
protected void startCatchUpThread() {
if (catchUpThread != null) {
throw new IllegalStateException("catchUp thread already started");
}
synchronized (catchUpSync) {
catchUpThreadRunning = true;
catchUpThread = new Thread(this);
}
catchUpThread.start();
}

/**
* Notify running background thread to stop catch up process.
*/
protected void stopCatchUp() {
synchronized (catchUpSync) {
// stop catch up
catchUpRunning = false;
// clear sent time
lastCatchUpSent = null;
catchUpSync.notify();
}
}

/**
* Stop background thread for catch up process.
*/
protected void stopCatchUpThread() {
// stop catch up thread
try {
synchronized (catchUpSync) {
// orderly shutdown
catchUpThreadRunning = false;
catchUpSync.notify();
}
// interrupt just in case
catchUpThread.interrupt();
catchUpThread.join();
} catch (Exception e) {
LOGGER.log(Level.INFO, "Error stopping catchUpThread", e);
} finally {
catchUpThread = null;
}
}

/**
* Called when connection is closed, either because shutdown on this end or
* closed by server.
Expand Down Expand Up @@ -404,21 +450,22 @@ public void startup() throws Exception{
createdAfter = Instant.parse(json.getString(CREATED_AFTER_PROPERTY));
}

// start background thread for products_create_after messages
startSendProductsCreatedAfterThread();

//open websocket
// open websocket
client = new WebSocketClient(uri, this, attempts, timeout, true);

// start catch up process
startCatchUpThread();
}

/**
* Closes web socket
* @throws Exception
*/
@Override
public void shutdown() throws Exception{
stopProductsCreatedAfterThread();
//close socket
public void shutdown() throws Exception {
// stop catch up process
stopCatchUpThread();
// close socket
try {
client.shutdown();
} catch (Exception e) {}
Expand Down Expand Up @@ -489,4 +536,5 @@ public long getTimeout() {
public void setTimeout(final long timeout) {
this.timeout = timeout;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class ProductClient extends DefaultConfigurable implements
ProductClientMBean, Bootstrappable {

/** The "release" version number. */
public static final String RELEASE_VERSION = "Version 2.7.3 2020-12-22";
public static final String RELEASE_VERSION = "Version 2.7.4 2020-12-23";

/** Property name used on products for current RELEASE_VERSION. */
public static final String PDL_CLIENT_VERSION_PROPERTY = "pdl-client-version";
Expand Down
Loading

0 comments on commit 713c5e5

Please sign in to comment.