Skip to content

Commit

Permalink
Merge branch 'synchronize-has-product' into 'master'
Browse files Browse the repository at this point in the history
Synchronize has product

See merge request ghsc/hazdev/pdl!132
  • Loading branch information
jmfee-usgs committed Dec 18, 2020
2 parents 8183231 + b71882a commit 379b684
Show file tree
Hide file tree
Showing 18 changed files with 701 additions and 593 deletions.
4 changes: 2 additions & 2 deletions code.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"name": "Product Distribution Layer",
"organization": "U.S. Geological Survey",
"description": "Distribution system used for derived earthquake information",
"version": "v2.7.1",
"version": "v2.7.2",
"status": "Production",
"permissions": {
"usageType": "openSource",
Expand All @@ -27,7 +27,7 @@
"email": "[email protected]"
},
"date": {
"metadataLastUpdated": "2020-12-15"
"metadataLastUpdated": "2020-12-18"
}
}
]
10 changes: 5 additions & 5 deletions src/main/java/gov/usgs/earthquake/aws/AwsProductReceiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onOpen(Session session) throws IOException {
this.session = session;
// start catch up process
try {
LOGGER.info("[" + getName() + "] Starting catch up");
LOGGER.fine("[" + getName() + "] Starting catch up");
sendProductsCreatedAfter();
} catch (Exception e) {
LOGGER.log(
Expand Down Expand Up @@ -173,19 +173,19 @@ synchronized public void onMessage(String message) throws IOException {
protected void onBroadcast(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.info("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");
LOGGER.fine("[" + getName() + "] onBroadcast(" + notification.getProductId() + ")");

Long broadcastId = json.getJsonObject("notification").getJsonNumber("id").longValue();
if (lastBroadcastId != null && broadcastId != (lastBroadcastId + 1)) {
// sanity check, broadcast ids are expected to increment
// if incoming broadcast is not lastBroadcastId + 1, may have missed a broadcast
LOGGER.warning(
LOGGER.finer(
"[" + getName() + "] broadcast ids out of sequence"
+ " (got " + broadcastId + ", expected " + (lastBroadcastId + 1) + ")");

if (processBroadcast) {
// not in catch up mode, switch back
LOGGER.info("[" + getName() + "] switching to catch up mode");
LOGGER.fine("[" + getName() + "] switching to catch up mode");
processBroadcast = false;
sendProductsCreatedAfter();
}
Expand Down Expand Up @@ -228,7 +228,7 @@ protected void onJsonNotification(final JsonNotification notification) throws Ex
protected void onProduct(final JsonObject json) throws Exception {
final JsonNotification notification = new JsonNotification(
json.getJsonObject("notification"));
LOGGER.info("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
LOGGER.fine("[" + getName() + "] onProduct(" + notification.getProductId() + ")");
onJsonNotification(notification);
}

Expand Down
62 changes: 21 additions & 41 deletions src/main/java/gov/usgs/earthquake/aws/JsonNotificationIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.io.ByteArrayInputStream;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand All @@ -19,7 +17,6 @@

import javax.json.Json;


import gov.usgs.earthquake.distribution.DefaultNotification;
import gov.usgs.earthquake.distribution.Notification;
import gov.usgs.earthquake.distribution.NotificationIndex;
Expand Down Expand Up @@ -68,12 +65,8 @@ public class JsonNotificationIndex
public static final String DEFAULT_URL =
"jdbc:sqlite:json_notification_index.db";

/** JDBC driver classname. */
private String driver;
/** Database table name. */
private String table;
/** JDBC database connect url. */
private String url;

/**
* Construct a JsonNotification using defaults.
Expand All @@ -94,40 +87,26 @@ public JsonNotificationIndex(final String driver, final String url) {
*/
public JsonNotificationIndex(
final String driver, final String url, final String table) {
this.driver = driver;
super(driver, url);
this.table = table;
this.url = url;
}

public String getDriver() { return this.driver; }
public String getTable() { return this.table; }
public String getUrl() { return this.url; }
public void setDriver(final String driver) { this.driver = driver; }
public void setTable(final String table) { this.table = table; }
public void setUrl(final String url) { this.url = url; }

@Override
public void configure(final Config config) throws Exception {
driver = config.getProperty("driver", DEFAULT_DRIVER);
LOGGER.config("[" + getName() + "] driver=" + driver);
table = config.getProperty("table", DEFAULT_TABLE);
LOGGER.config("[" + getName() + "] table=" + table);
url = config.getProperty("url", DEFAULT_URL);
super.configure(config);
if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
if (getUrl() == null) { setUrl(DEFAULT_URL); }

setTable(config.getProperty("table", DEFAULT_TABLE));
LOGGER.config("[" + getName() + "] driver=" + getDriver());
LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout());
LOGGER.config("[" + getName() + "] table=" + getTable());
// do not log url, it may contain user/pass
}

/**
* Connect to database.
*
* Implements abstract JDBCConnection method.
*/
@Override
protected Connection connect() throws Exception {
// load driver if needed
Class.forName(driver);
return DriverManager.getConnection(url);
}

/**
* After normal startup, check whether schema exists and attempt to create.
*/
Expand Down Expand Up @@ -178,7 +157,7 @@ public void createSchema() throws Exception {
try (final Statement statement = getConnection().createStatement()) {
String autoIncrement = "";
String engine = "";
if (driver.contains("mysql")) {
if (getDriver().contains("mysql")) {
autoIncrement = " AUTO_INCREMENT";
engine = " ENGINE=innodb CHARSET=utf8";
}
Expand Down Expand Up @@ -215,7 +194,7 @@ public void createSchema() throws Exception {
* TrackerURLs are ignored.
*/
@Override
public synchronized void addNotification(Notification notification)
public void addNotification(Notification notification)
throws Exception {
// all notifications
Instant expires = notification.getExpirationDate().toInstant();
Expand Down Expand Up @@ -276,7 +255,7 @@ public synchronized void addNotification(Notification notification)
* Tracker URLs are ignored.
*/
@Override
public synchronized void removeNotification(Notification notification) throws Exception {
public void removeNotification(Notification notification) throws Exception {
// all notifications
Instant expires = notification.getExpirationDate().toInstant();
ProductId id = notification.getProductId();
Expand Down Expand Up @@ -339,7 +318,7 @@ public synchronized void removeNotification(Notification notification) throws Ex
* @return list with matching notifications, empty if not found.
*/
@Override
public synchronized List<Notification> findNotifications(
public List<Notification> findNotifications(
String source, String type, String code) throws Exception {
final ArrayList<Object> where = new ArrayList<Object>();
final ArrayList<String> values = new ArrayList<String>();
Expand Down Expand Up @@ -398,7 +377,7 @@ public synchronized List<Notification> findNotifications(
* @return list with matching notifications, empty if not found.
*/
@Override
public synchronized List<Notification> findNotifications(
public List<Notification> findNotifications(
List<String> sources, List<String> types, List<String> codes)
throws Exception {
final ArrayList<Object> where = new ArrayList<Object>();
Expand Down Expand Up @@ -474,7 +453,7 @@ public synchronized List<Notification> findNotifications(
* @return list with matching notifications, empty if not found.
*/
@Override
public synchronized List<Notification> findExpiredNotifications() throws Exception {
public List<Notification> findExpiredNotifications() throws Exception {
final String sql = "SELECT * FROM " + this.table + " WHERE expires <= ?";
// prepare statement
beginTransaction();
Expand All @@ -498,7 +477,6 @@ public synchronized List<Notification> findExpiredNotifications() throws Excepti
}
}
return new ArrayList<Notification>();

}

/**
Expand All @@ -509,7 +487,7 @@ public synchronized List<Notification> findExpiredNotifications() throws Excepti
* @return list with matching notifications, empty if not found.
*/
@Override
public synchronized List<Notification> findNotifications(ProductId id) throws Exception {
public List<Notification> findNotifications(ProductId id) throws Exception {
final String sql = "SELECT * FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
// prepare statement
Expand Down Expand Up @@ -555,8 +533,8 @@ public synchronized List<Notification> findNotifications(ProductId id) throws Ex
* other table.
* @throws Exception
*/
public synchronized List<Notification> getMissingNotifications(
final String otherTable) throws Exception {
public List<Notification> getMissingNotifications( final String otherTable)
throws Exception {
// this is used to requeue a notification index.
// run query in a way that returns list of default notifications,
// (by returning empty created, data, and url)
Expand Down Expand Up @@ -593,8 +571,10 @@ public synchronized List<Notification> getMissingNotifications(

/**
* Parse notifications from a statement ready to be executed.
*
* Should be called in a transaction.
*/
protected synchronized List<Notification> getNotifications(PreparedStatement ps)
protected List<Notification> getNotifications(PreparedStatement ps)
throws Exception {
final List<Notification> n = new ArrayList<Notification>();
try (final ResultSet rs = ps.executeQuery()) {
Expand Down
46 changes: 13 additions & 33 deletions src/main/java/gov/usgs/earthquake/aws/JsonProductStorage.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package gov.usgs.earthquake.aws;

import java.io.ByteArrayInputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -56,12 +54,8 @@ public class JsonProductStorage extends JDBCConnection implements ProductStorage
public static final String DEFAULT_TABLE = "product";
public static final String DEFAULT_URL = "jdbc:sqlite:json_product_index.db";

/** JDBC driver classname. */
private String driver;
/** Database table name. */
private String table;
/** JDBC database connect url. */
private String url;

/**
* Create a JsonProductStorage using defaults.
Expand All @@ -82,38 +76,24 @@ public JsonProductStorage(final String driver, final String url) {
*/
public JsonProductStorage(
final String driver, final String url, final String table) {
this.driver = driver;
super(driver, url);
this.table = table;
this.url = url;
}

public String getDriver() { return this.driver; }
public String getTable() { return this.table; }
public String getUrl() { return this.url; }
public void setDriver(final String driver) { this.driver = driver; }
public void setTable(final String table) { this.table = table; }
public void setUrl(final String url) { this.url = url; }

@Override
public void configure(final Config config) throws Exception {
driver = config.getProperty("driver", DEFAULT_DRIVER);
LOGGER.config("[" + getName() + "] driver=" + driver);
table = config.getProperty("table", DEFAULT_TABLE);
LOGGER.config("[" + getName() + "] table=" + table);
url = config.getProperty("url", DEFAULT_URL);
// do not log url, it may contain user/pass
}
super.configure(config);
if (getDriver() == null) { setDriver(DEFAULT_DRIVER); }
if (getUrl() == null) { setUrl(DEFAULT_URL); }

/**
* Connect to database.
*
* Implements abstract JDBCConnection method.
*/
@Override
protected Connection connect() throws Exception {
// load driver if needed
Class.forName(driver);
return DriverManager.getConnection(url);
setTable(config.getProperty("table", DEFAULT_TABLE));
LOGGER.config("[" + getName() + "] driver=" + getDriver());
LOGGER.config("[" + getName() + "] networkTimeout=" + getNetworkTimeout());
LOGGER.config("[" + getName() + "] table=" + getTable());
// do not log url, it may contain user/pass
}

/**
Expand Down Expand Up @@ -166,7 +146,7 @@ public void createSchema() throws Exception {
try (final Statement statement = getConnection().createStatement()) {
String autoIncrement = "";
String engine = "";
if (driver.contains("mysql")) {
if (getDriver().contains("mysql")) {
autoIncrement = " AUTO_INCREMENT";
engine = " ENGINE=innodb CHARSET=utf8";
}
Expand Down Expand Up @@ -205,7 +185,7 @@ public boolean hasProduct(ProductId id) throws Exception {
* @return product if found, otherwise null.
*/
@Override
public synchronized Product getProduct(ProductId id) throws Exception {
public Product getProduct(ProductId id) throws Exception {
Product product = null;
final String sql = "SELECT * FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
Expand Down Expand Up @@ -253,7 +233,7 @@ public synchronized Product getProduct(ProductId id) throws Exception {
* if product already in storage.
*/
@Override
public synchronized ProductId storeProduct(Product product) throws Exception {
public ProductId storeProduct(Product product) throws Exception {
// prepare statement
beginTransaction();
try (
Expand Down Expand Up @@ -323,7 +303,7 @@ public ProductId storeProductSource(ProductSource input) throws Exception {
* Remove product from storage.
*/
@Override
public synchronized void removeProduct(ProductId id) throws Exception {
public void removeProduct(ProductId id) throws Exception {
// prepare statement
final String sql = "DELETE FROM " + this.table
+ " WHERE source=? AND type=? AND code=? AND updatetime=?";
Expand Down
Loading

0 comments on commit 379b684

Please sign in to comment.