Skip to content

Commit

Permalink
Merge branch 'from271' into 'master'
Browse files Browse the repository at this point in the history
From271

See merge request ghsc/hazdev/pdl!133
  • Loading branch information
jmfee-usgs committed Dec 23, 2020
2 parents 379b684 + a092447 commit 35d0a16
Show file tree
Hide file tree
Showing 24 changed files with 702 additions and 639 deletions.
4 changes: 2 additions & 2 deletions code.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "Product Distribution Layer",
"organization": "U.S. Geological Survey",
"description": "Distribution system used for derived earthquake information",
"version": "v2.7.2",
"version": "v2.7.3",
"status": "Production",
"permissions": {
"usageType": "openSource",
Expand All @@ -27,7 +27,7 @@
"email": "[email protected]"
},
"date": {
"metadataLastUpdated": "2020-12-18"
"metadataLastUpdated": "2020-12-23"
}
}
]
24 changes: 22 additions & 2 deletions docs/userguide/configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,12 @@ <h3 id="NotificationReceiver">NotificationReceiver</h3>
notifications are processed in the order they are received.
</dd>

<dt>future</dt>
<dd>
Based on Executor notifier but uses Futures to implement timeouts.
May reduce thread usage.
</dd>

<dt>roundrobin</dt>
<dd>Round robin uses one queue per listener for each product
source+type and takes one product from the front of each queue.
Expand Down Expand Up @@ -1264,17 +1270,31 @@ <h3 id="IndexerListener">IndexerListener</h3>

<h3 id="IndexerModule">IndexerModule</h3>
<dl>
<dt><em>DefaultIndexerModule</em></dt>
<dd>
Automatically configured, and implements ANSS Authoritative region weights.
All other modules extend this class and can use these options:

<dl>
<dt>ignoreRegions</dt>
<dd>
Comma separate list of regions to ignore.<br/>
Products from sources in this list ignore authorative regions,
and instead default to preferred weight <code>1</code>.
</dd>
</dl>
</dd>
<dt>gov.usgs.earthquake.shakemap.ShakeMapIndexerModule</dt>
<dd>
Provides support for
<code>shakemap</code>
type products. There are no configuration parameters for this module.
type products. There are no specific configuration parameters for this module.
</dd>
<dt>gov.usgs.earthquake.momenttensor.MTIndexerModule</dt>
<dd>
Provides support for
<code>moment-tensor</code>
and adjusts the weight of a given product. There are no configuration
and adjusts the weight of a given product. There are no specific configuration
parameters for this module.
</dd>
</dl>
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onOpen(Session session) throws IOException {
this.session = session;
// start catch up process
try {
LOGGER.fine("[" + getName() + "] Starting catch up");
LOGGER.info("[" + getName() + "] Starting catch up");
sendProductsCreatedAfter();
} catch (Exception e) {
LOGGER.log(
Expand Down Expand Up @@ -173,19 +173,19 @@ synchronized public void onMessage(String message) throws IOException {
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.fine("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");
LOGGER.info("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");

Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
if (lastBroadcastId != null && broadcastId != (lastBroadcastId + 1)) {
// sanity check, broadcast ids are expected to increment
// if incoming broadcast is not lastBroadcastId + 1, may have missed a broadcast
LOGGER.finer(
LOGGER.warning(
"[" + getName() + "] broadcast ids out of sequence"
+ " (got " + broadcastId + ", expected " + (lastBroadcastId + 1) + ")");

if (processBroadcast) {
// not in catch up mode, switch back
LOGGER.fine("[" + getName() + "] switching to catch up mode");
LOGGER.info("[" + getName() + "] switching to catch up mode");
processBroadcast = false;
sendProductsCreatedAfter();
}
Expand Down Expand Up @@ -228,7 +228,7 @@ protected void onJsonNotification(final JsonNotification notification) throws Ex
protected void onProduct(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.fine("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
LOGGER.info("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
onJsonNotification(notification);
}

Expand Down
35 changes: 22 additions & 13 deletions src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import javax.json.Json;


import gov.usgs.earthquake.distribution.DefaultNotification;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationIndex;
Expand Down Expand Up @@ -102,7 +103,6 @@ public void configure(final Config config) throws Exception {

setTable(config.getProperty("table", DEFAULT_TABLE));
LOGGER.config("[" + getName() + "] driver=" + getDriver());
LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout());
LOGGER.config("[" + getName() + "] table=" + getTable());
// do not log url, it may contain user/pass
}
Expand Down Expand Up @@ -131,6 +131,7 @@ public boolean schemaExists() throws Exception {
beginTransaction();
try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
// should throw exception if table does not exist
test.setQueryTimeout(60);
try (final ResultSet rs = test.executeQuery()) {
rs.next();
}
Expand Down Expand Up @@ -194,7 +195,7 @@ public void createSchema() throws Exception {
* TrackerURLs are ignored.
*/
@Override
public void addNotification(Notification notification)
public synchronized void addNotification(Notification notification)
throws Exception {
// all notifications
Instant expires = notification.getExpirationDate().toInstant();
Expand All @@ -220,6 +221,7 @@ public void addNotification(Notification notification)
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
) {
try {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, created != null ? created.toString() : "");
statement.setString(2, expires.toString());
Expand Down Expand Up @@ -255,7 +257,7 @@ public void addNotification(Notification notification)
* Tracker URLs are ignored.
*/
@Override
public void removeNotification(Notification notification) throws Exception {
public synchronized void removeNotification(Notification notification) throws Exception {
// all notifications
Instant expires = notification.getExpirationDate().toInstant();
ProductId id = notification.getProductId();
Expand All @@ -279,6 +281,7 @@ public void removeNotification(Notification notification) throws Exception {
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, created != null ? created.toString() : "");
statement.setString(2, expires.toString());
Expand Down Expand Up @@ -318,7 +321,7 @@ public void removeNotification(Notification notification) throws Exception {
* @return list with matching notifications, empty if not found.
*/
@Override
public List<Notification> findNotifications(
public synchronized List<Notification> findNotifications(
String source, String type, String code) throws Exception {
final ArrayList<Object> where = new ArrayList<Object>();
final ArrayList<String> values = new ArrayList<String>();
Expand All @@ -342,6 +345,7 @@ public List<Notification> findNotifications(
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);

// set parameters
for (int i = 0, len=values.size(); i < len; i++) {
Expand Down Expand Up @@ -377,7 +381,7 @@ public List<Notification> findNotifications(
* @return list with matching notifications, empty if not found.
*/
@Override
public List<Notification> findNotifications(
public synchronized List<Notification> findNotifications(
List<String> sources, List<String> types, List<String> codes)
throws Exception {
final ArrayList<Object> where = new ArrayList<Object>();
Expand Down Expand Up @@ -425,6 +429,8 @@ public List<Notification> findNotifications(
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);

// set parameters
for (int i = 0, len=values.size(); i < len; i++) {
statement.setString(i+1, values.get(i));
Expand Down Expand Up @@ -453,12 +459,14 @@ public List<Notification> findNotifications(
* @return list with matching notifications, empty if not found.
*/
@Override
public List<Notification> findExpiredNotifications() throws Exception {
final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ?";
public synchronized List<Notification> findExpiredNotifications() throws Exception {
final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ? LIMIT 1000";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);

// set parameters
statement.setString(1, Instant.now().toString());

Expand All @@ -477,6 +485,7 @@ public List<Notification> findExpiredNotifications() throws Exception {
}
}
return new ArrayList<Notification>();

}

/**
Expand All @@ -487,13 +496,14 @@ public List<Notification> findExpiredNotifications() throws Exception {
* @return list with matching notifications, empty if not found.
*/
@Override
public List<Notification> findNotifications(ProductId id) throws Exception {
public synchronized List<Notification> findNotifications(ProductId id) throws Exception {
final String sql = "SELECT * FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(30);
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
Expand Down Expand Up @@ -533,8 +543,8 @@ public List<Notification> findNotifications(ProductId id) throws Exception {
* other table.
* @throws Exception
*/
public List<Notification> getMissingNotifications( final String otherTable)
throws Exception {
public synchronized List<Notification> getMissingNotifications(
final String otherTable) throws Exception {
// this is used to requeue a notification index.
// run query in a way that returns list of default notifications,
// (by returning empty created, data, and url)
Expand All @@ -552,6 +562,7 @@ public List<Notification> getMissingNotifications( final String otherTable)
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
try {
statement.setQueryTimeout(1000);
// execute and commit if successful
final List<Notification> notifications = getNotifications(statement);
commitTransaction();
Expand All @@ -571,10 +582,8 @@ public List<Notification> getMissingNotifications( final String otherTable)

/**
* Parse notifications from a statement ready to be executed.
*
* Should be called in a transaction.
*/
protected List<Notification> getNotifications(PreparedStatement ps)
protected synchronized List<Notification> getNotifications(PreparedStatement ps)
throws Exception {
final List<Notification> n = new ArrayList<Notification>();
try (final ResultSet rs = ps.executeQuery()) {
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void configure(final Config config) throws Exception {

setTable(config.getProperty("table", DEFAULT_TABLE));
LOGGER.config("[" + getName() + "] driver=" + getDriver());
LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout());
LOGGER.config("[" + getName() + "] table=" + getTable());
// do not log url, it may contain user/pass
}
Expand Down Expand Up @@ -119,6 +118,7 @@ public boolean schemaExists() throws Exception {
final String sql = "select * from " + this.table + " limit 1";
beginTransaction();
try (final PreparedStatement test = getConnection().prepareStatement(sql)) {
test.setQueryTimeout(60);
// should throw exception if table does not exist
try (final ResultSet rs = test.executeQuery()) {
rs.next();
Expand Down Expand Up @@ -185,13 +185,14 @@ public boolean hasProduct(ProductId id) throws Exception {
* @return product if found, otherwise null.
*/
@Override
public Product getProduct(ProductId id) throws Exception {
public synchronized Product getProduct(ProductId id) throws Exception {
Product product = null;
final String sql = "SELECT * FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
// prepare statement
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
Expand Down Expand Up @@ -233,7 +234,7 @@ public Product getProduct(ProductId id) throws Exception {
* if product already in storage.
*/
@Override
public ProductId storeProduct(Product product) throws Exception {
public synchronized ProductId storeProduct(Product product) throws Exception {
// prepare statement
beginTransaction();
try (
Expand All @@ -242,6 +243,7 @@ public ProductId storeProduct(Product product) throws Exception {
+ " (source, type, code, updatetime, data)"
+ " VALUES (?, ?, ?, ?, ?)")
) {
statement.setQueryTimeout(60);
final ProductId id = product.getId();
// set parameters
statement.setString(1, id.getSource());
Expand Down Expand Up @@ -303,12 +305,13 @@ public ProductId storeProductSource(ProductSource input) throws Exception {
* Remove product from storage.
*/
@Override
public void removeProduct(ProductId id) throws Exception {
public synchronized void removeProduct(ProductId id) throws Exception {
// prepare statement
final String sql = "DELETE FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
beginTransaction();
try (final PreparedStatement statement = getConnection().prepareStatement(sql)) {
statement.setQueryTimeout(60);
// set parameters
statement.setString(1, id.getSource());
statement.setString(2, id.getType());
Expand Down
Loading

0 comments on commit 35d0a16

Please sign in to comment.