Skip to content

Commit

Permalink
Add sequence number sanity check to aws product receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
jmfee-usgs committed Dec 16, 2020
1 parent 357e73e commit fe2eee8
Showing 1 changed file with 31 additions and 7 deletions.
38 changes: 31 additions & 7 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ public class AwsProductReceiver extends DefaultNotificationReceiver implements W
private Session session;

/* µs timestamp of last message that has been processed */
private Instant createdAfter = null;
private JsonNotification lastBroadcast = null;
private boolean processBroadcast = false;
protected Instant createdAfter = null;
protected JsonNotification lastBroadcast = null;
protected Long lastBroadcastId = null;
protected boolean processBroadcast = false;


@Override
public void configure(Config config) throws Exception {
Expand Down Expand Up @@ -165,12 +167,34 @@ synchronized public void onMessage(String message) throws IOException {
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
lastBroadcast = notification;
LOGGER.info("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");
if (!processBroadcast) {
return;

Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
if (lastBroadcastId != null && broadcastId != (lastBroadcastId + 1)) {
// sanity check, broadcast ids are expected to increment
// if incoming broadcast is not lastBroadcastId + 1, may have missed a broadcast
LOGGER.warning(
"[" + getName() + "] broadcast ids out of sequence"
+ " (got " + broadcastId + ", expected " + (lastBroadcastId + 1) + ")");

if (processBroadcast) {
// not in catch up mode, switch back
LOGGER.info("[" + getName() + "] switching to catch up mode");
processBroadcast = false;
sendProductsCreatedAfter();
}
}

// track last broadcast for catch up process (as long as newer)
if (lastBroadcastId == null || broadcastId > lastBroadcastId) {
lastBroadcastId = broadcastId;
lastBroadcast = notification;
}

// process message if not in catch up mode
if (processBroadcast) {
onJsonNotification(notification);
}
onJsonNotification(notification);
}

/**
Expand Down

0 comments on commit fe2eee8

Please sign in to comment.